In [12]:
import xml.etree.cElementTree as ET
from collections import defaultdict
import re
import csv
import codecs
import cerberus
from schema import Schema
import pprint

## How many tags in the map

In [2]:
def count_tags(filename):
    tags={}
    for event, element in ET.iterparse(filename, events=("start",)):
        if element.tag in tags:
            tags[element.tag] += 1
        else:
            tags[element.tag] = 1        
    return tags

In [3]:
print count_tags("wuhan_china.osm")

{'node': 314952, 'nd': 372704, 'bounds': 1, 'member': 8400, 'tag': 88530, 'osm': 1, 'way': 34267, 'relation': 330}


## key

In [4]:
lower = re.compile(r'^([a-z]|_)*$')
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')


def key_type(element, keys):
    if element.tag == "tag":
        if lower.search(element.attrib['k']):
            keys['lower']+=1
        elif lower_colon.search(element.attrib['k']):
            keys['lower_colon']+=1
        elif problemchars.search(element.attrib['k']):
            keys['problemchars']+=1
        else:
            keys['other']+=1
        
        
    return keys



def process_map(filename):
    keys = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0}
    for _, element in ET.iterparse(filename):
        keys = key_type(element, keys)

    return keys

In [5]:
process_map('wuhan_china.osm')

{'lower': 85564, 'lower_colon': 2865, 'other': 101, 'problemchars': 0}

## The only users in the map

In [6]:
def get_user(element):
    return element.get('uid')


def process_map(filename):
    users = set()
    for _, element in ET.iterparse(filename):
        u= get_user(element)
        if u != None:
            users.add(u)
    return users

In [7]:
users = process_map('wuhan_china.osm')
len(users)

530

## Clean the street data

In [13]:
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)


expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", 
            "Trail", "Parkway", "Commons"]

# UPDATE THIS VARIABLE
mapping = { "St": "Street",
            "St.": "Street",
            "Ave": "Avenue",
            "Ave.": "Avenue",
            "Rd.": "Road",
            "Rd": "Road"
            }


def audit_street_type(street_types, street_name):
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)


def is_street_name(elem):
    return (elem.attrib['k'] == "addr:street")


def audit_street(osmfile):
    osm_file = open(osmfile, "r")
    street_types = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    osm_file.close()
    return street_types


In [14]:
st_types = audit_street('wuhan_china.osm')
pprint.pprint(st_types)

The history saving thread hit an unexpected error (OperationalError('disk I/O error',)).History will not be written to the database.
defaultdict(<type 'set'>, {'shop': set(['small shop']), u'87hao': set([u'\u8f66\u7ad9\u675187hao']), u'2\u53f7': set([u'\u6e56\u5317\u7701.\u6b66\u6c49\u5e02\u94c1\u6865\u5357\u67512\u53f7']), 'S104': set(['S104']), u'86hao': set([u'\u6cb9\u7eb1\u8def86hao']), 'Rd': set(['Sanjiaohu Rd']), u'106\u8857\u574a': set([u'106\u8857\u574a']), u'6\u53f7': set([u'\u4e2d\u570b\u6e56\u5317\u7701\u6b66\u6c49\u5e02\u6b66\u660c\u533a\u4e2d\u5357\u5546\u5708\u4e2d\u5357\u4e8c\u8def6\u53f7', u'\u73de\u55bb\u8def6\u53f7', u'\u6d25\u6c34\u8def6\u53f7']), u'129\u53f7': set([u'\u73de\u55bb\u8def129\u53f7']), u'chezhancun': set([u'\u8f66\u7ad9\u6751 chezhancun'])})


In [15]:
def update_street_name(name, mapping):
    for ma in mapping.keys():
        if name.endswith(ma):
            name=name.replace(ma,mapping[ma])
    return name


In [16]:
for st_type, ways in st_types.iteritems():
    for name in ways:
        better_name = update_street_name(name, mapping)
        print name, "=>", better_name
            


small shop => small shop
车站村87hao => 车站村87hao
湖北省.武汉市铁桥南村2号 => 湖北省.武汉市铁桥南村2号
S104 => S104
油纱路86hao => 油纱路86hao
Sanjiaohu Rd => Sanjiaohu Road
106街坊 => 106街坊
中國湖北省武汉市武昌区中南商圈中南二路6号 => 中國湖北省武汉市武昌区中南商圈中南二路6号
珞喻路6号 => 珞喻路6号
津水路6号 => 津水路6号
珞喻路129号 => 珞喻路129号
车站村 chezhancun => 车站村 chezhancun


In [25]:
schema = {
    'node': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'lat': {'required': True, 'type': 'float', 'coerce': float},
            'lon': {'required': True, 'type': 'float', 'coerce': float},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'node_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    },
    'way': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'way_nodes': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'node_id': {'required': True, 'type': 'integer', 'coerce': int},
                'position': {'required': True, 'type': 'integer', 'coerce': int}
            }
        }
    },
    'way_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    }
}


## Seperate the tags into "CSV" documents

In [26]:
OSM_PATH = "wuhan_china.osm"
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

SCHEMA = schema

# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict"""

    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []  # Handle secondary tags the same way for both node and way elements

    if element.tag == 'node':
        for tag in element.iter('tag'):
            if PROBLEMCHARS.search(tag.attrib['k']):
                return None
            else:
                node_tag = {}
                node_tag['id'] = element.attrib['id']
                node_tag['value'] = tag.attrib['v']
                if LOWER_COLON.match(tag.attrib['k']):
                    node_tag['type'], _, node_tag['key'] = tag.attrib['k'].partition(':')
                    if node_tag['type'] == 'addr' and node_tag['key'] == 'street:name':
                        node_tag['value'] = update_street_name(tag.attrib['v'], mapping)
                else:
                    node_tag['type'] = 'regular'
                    node_tag['key'] = tag.attrib['k']
            tags.append(node_tag)
        for node_attr_field in node_attr_fields:
            node_attribs[node_attr_field] = element.attrib[node_attr_field]
        return {'node': node_attribs, 'node_tags': tags}
        
                
    if element.tag == 'way':
        for tag in element.iter('tag'):
            if PROBLEMCHARS.search(tag.attrib['k']):
                return None
            else:
                way_tag = {}
                way_tag['id'] = element.attrib['id']
                way_tag['value'] = tag.attrib['v']
                if LOWER_COLON.match(tag.attrib['k']):
                    way_tag['type'], _, way_tag['key'] = tag.attrib['k'].partition(':')
                    if way_tag['type'] == 'addr' and way_tag['key'] == 'street:name':
                        way_tag['value'] = update_street_name(tag.attrib['v'], mapping)  
                else:
                    way_tag['type'] = 'regular'
                    way_tag['key'] = tag.attrib['k']  
            tags.append(way_tag)
        
        for i, nd in enumerate(element.iter('nd')):
            node = {}
            node['id'] = element.attrib['id']
            node['node_id'] = nd.attrib['ref']
            node['position'] = i
            way_nodes.append(node)
              
        for way_attr_field in way_attr_fields:
            way_attribs[way_attr_field] = element.attrib[way_attr_field]
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
 

In [27]:
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()
            
def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)
        
        raise Exception(message_string.format(field, error_string))


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)


# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    process_map(OSM_PATH, validate=False)
    