In [1]:
import csv
import codecs
import re
import xml.etree.cElementTree as ET
import cerberus
import schema

In [2]:
# Input OSM XML extract that process_map() is run on at the end of the notebook.
OSM_PATH = "honolulu_hawaii.osm"
# Output CSV files written by process_map(), one per target table.
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"
# Matches tag keys that begin with lowercase letters/underscores, then a colon,
# then at least one more lowercase letter/underscore (e.g. "addr:street").
# Only the start of the key is anchored.
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Matches any single character that makes a tag key unusable (punctuation,
# quotes, whitespace); shape_element() drops tags whose key contains one.
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')
# Validation schema taken from the imported `schema` module; it is the default
# schema used by validate_element().
SCHEMA = schema.schema

In [3]:
# Column order for each CSV file. Keep the order of these fields in sync with
# the column order of the corresponding SQL table schema.
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']
# Matches the trailing run of non-whitespace characters (starting at a word
# boundary) at the end of a string, e.g. the "St." in "King St.".
# update_name() uses it to pick out the street type of a street name.
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

In [4]:
# Street-name endings (the last word of a street name) that are considered
# acceptable as they are and need no correction.
# NOTE(review): `expected` is not referenced by any code visible in this
# notebook; presumably it was used while auditing the data -- confirm.
expected = ["106", "Avenue", "Boulevard", "Center", "Circle", "Drive", "Highway", "Ike",
            "Lane", "Loop", "Mall", "Parkway", "Place", "Road", "Street", "Walk", "Way", 
            "Honolulu", "Kailua", "King", "Momi", "Terrace"]

In [5]:
# Maps an abbreviated street type (as it appears at the end of a street name)
# to its full spelling. The keys of this mapping were obtained by checking the
# unique street-name endings in the CSV files produced by the provided data.py.
# Lookups are exact and case-sensitive: only the spellings listed here are
# rewritten by update_name().
mapping = { "St": "Street",
            "St.": "Street",
            "Rd.": "Road",
            "Ave": "Avenue",
            "Blvd": "Boulevard",
            "Dr": "Drive",
            "Hwy": "Highway",
            "Pkwy": "Parkway"
            }

In [6]:
# Expand an abbreviated street type at the end of a street name.
def update_name(name, mapping):
    """Return ``name`` with its trailing street type replaced via ``mapping``.

    Args:
        name: Street name as found in the tag value, e.g. ``"King St."``.
        mapping: Dict from abbreviated street type to its full spelling.

    Returns:
        The street name with its last token expanded when that token is a key
        of ``mapping``; otherwise ``name`` unchanged. A name in which no
        trailing token can be found (e.g. an empty string) is also returned
        unchanged.
    """
    match = street_type_re.search(name)
    if match is None:
        # Nothing that looks like a street type; leave the value alone.
        return name

    street_type = match.group()
    if street_type in mapping:
        # Cut at the position of the matched suffix itself. Searching the
        # string for the suffix text would find its first occurrence, which is
        # wrong for names such as "St Louis St".
        name = name[:match.start()] + mapping[street_type]
    return name

In [7]:
# Normalize a postal-code tag value to its first run of digits.
def update_postalcode(num):
    """Return the first run of digits found in ``num``.

    This strips prefixes such as ``"HI "`` and suffixes such as the ``-1234``
    extension of a ZIP+4 code.

    Args:
        num: Postal-code tag value as a string.

    Returns:
        The first run of consecutive digits in ``num``. If ``num`` contains no
        digit at all it is returned unchanged instead of raising.
    """
    match = re.search('[0-9]+', num)
    if match is None:
        # No digits to extract; keep the original value rather than crash
        # the whole conversion on one malformed tag.
        return num
    return match.group()

In [8]:
def _shape_tag(element_id, tag, problem_chars, default_tag_type):
    """Convert one <tag> child into a tag row dict, or None if it is dropped."""
    key = tag.attrib['k']
    if problem_chars.search(key):
        # Keys containing problematic characters are skipped entirely.
        return None

    if LOWER_COLON.search(key):
        # Split on the first colon only: "addr:street:name" becomes
        # type "addr" and key "street:name".
        tag_type, _, tag_key = key.partition(':')
    else:
        tag_type, tag_key = default_tag_type, key

    value = tag.attrib['v']
    if tag_key == "street":
        value = update_name(value, mapping)
    if tag_key in ("postcode", "postal_code"):
        value = update_postalcode(value)

    return {'id': element_id, 'key': tag_key, 'value': value, 'type': tag_type}


def _shape_tags(element, problem_chars, default_tag_type):
    """Build the list of tag row dicts for the <tag> children of ``element``."""
    element_id = element.attrib['id']
    shaped = (
        _shape_tag(element_id, tag, problem_chars, default_tag_type)
        for tag in element.findall('tag')
    )
    return [row for row in shaped if row is not None]


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape a node or way XML element into a Python dict.

    Secondary tags are handled the same way for both element kinds: tags whose
    key matches ``problem_chars`` are dropped, keys matching LOWER_COLON are
    split at the first colon into type and key, street values are passed
    through update_name() and postcode values through update_postalcode().

    Args:
        element: ElementTree element to shape.
        node_attr_fields: Attribute names copied from a node element.
        way_attr_fields: Attribute names copied from a way element.
        problem_chars: Compiled regex; a tag whose key matches it is skipped.
        default_tag_type: Tag type used when the key has no type prefix.

    Returns:
        ``{'node': ..., 'node_tags': [...]}`` for a node,
        ``{'way': ..., 'way_nodes': [...], 'way_tags': [...]}`` for a way,
        and None for any other element.

    Raises:
        KeyError: If the element lacks one of the requested attributes, or a
            tag child lacks ``k``/``v``.
    """
    if element.tag == 'node':
        node_attribs = {field: element.attrib[field] for field in node_attr_fields}
        tags = _shape_tags(element, problem_chars, default_tag_type)
        return {'node': node_attribs, 'node_tags': tags}

    if element.tag == 'way':
        way_attribs = {field: element.attrib[field] for field in way_attr_fields}
        way_id = element.attrib['id']
        # Position is the 0-based index of each <nd> reference within the way.
        way_nodes = [
            {'id': way_id, 'node_id': nd.attrib['ref'], 'position': position}
            for position, nd in enumerate(element.findall('nd'))
        ]
        tags = _shape_tags(element, problem_chars, default_tag_type)
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}

    # Other element types (e.g. relations) are not shaped.
    return None

# ================================================== #
#               Helper Functions                     #
# ================================================== #

In [9]:
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Lazily yield each fully parsed element whose tag is listed in ``tags``.

    The document is parsed incrementally; after the consumer is done with a
    yielded element, the root element is cleared before parsing continues.
    """
    parser = ET.iterparse(osm_file, events=('start', 'end'))
    # The very first event is the start of the document root.
    root = next(parser)[1]
    for kind, node in parser:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()

In [10]:
def validate_element(element, validator, schema=SCHEMA):
    """Check ``element`` against ``schema`` and raise on the first failing field.

    Returns None when ``validator.validate`` reports success. Otherwise a
    cerberus.ValidationError is raised whose message lists the errors recorded
    for the first field in ``validator.errors``.
    """
    if validator.validate(element, schema) is True:
        return

    field, errors = next(validator.errors.iteritems())
    lines = []
    for key, detail in errors.iteritems():
        # A detail is either a single message or a sequence of messages.
        text = detail if isinstance(detail, str) else ", ".join(detail)
        lines.append("{0}: {1}".format(key, text))

    template = "\nElement of type '{0}' has the following errors:\n{1}"
    raise cerberus.ValidationError(template.format(field, "\n".join(lines)))

In [11]:
class UnicodeDictWriter(csv.DictWriter, object):
    """csv.DictWriter variant that UTF-8 encodes unicode values before writing."""

    def writerow(self, row):
        """Write one row dict, encoding every unicode value as UTF-8 bytes."""
        encoded = {}
        for key, value in row.iteritems():
            if isinstance(value, unicode):
                value = value.encode('utf-8')
            encoded[key] = value
        super(UnicodeDictWriter, self).writerow(encoded)

    def writerows(self, rows):
        """Write every row dict in ``rows`` through writerow()."""
        for record in rows:
            self.writerow(record)

# ================================================== #
#               Main Function                        #
# ================================================== #

In [12]:
def process_map(file_in, validate):
    """Stream the OSM file ``file_in`` and write its nodes and ways to the CSVs.

    Each node/way element is shaped with shape_element(); when ``validate`` is
    True the shaped dict is checked with validate_element() before its rows
    are written to the matching CSV files.
    """
    with codecs.open(NODES_PATH, 'w') as nodes_out, \
         codecs.open(NODE_TAGS_PATH, 'w') as node_tags_out, \
         codecs.open(WAYS_PATH, 'w') as ways_out, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_out, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_out:

        def new_writer(handle, columns):
            return UnicodeDictWriter(handle, columns, lineterminator='\n')

        node_rows = new_writer(nodes_out, NODE_FIELDS)
        node_tag_rows = new_writer(node_tags_out, NODE_TAGS_FIELDS)
        way_rows = new_writer(ways_out, WAY_FIELDS)
        way_node_rows = new_writer(way_nodes_out, WAY_NODES_FIELDS)
        way_tag_rows = new_writer(way_tags_out, WAY_TAGS_FIELDS)

        # Every CSV starts with its header line.
        for writer in (node_rows, node_tag_rows, way_rows, way_node_rows, way_tag_rows):
            writer.writeheader()

        checker = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            shaped = shape_element(element)
            if not shaped:
                continue

            if validate is True:
                validate_element(shaped, checker)

            kind = element.tag
            if kind == 'node':
                node_rows.writerow(shaped['node'])
                node_tag_rows.writerows(shaped['node_tags'])
            elif kind == 'way':
                way_rows.writerow(shaped['way'])
                way_node_rows.writerows(shaped['way_nodes'])
                way_tag_rows.writerows(shaped['way_tags'])

In [13]:
# Start of program: convert the OSM extract at OSM_PATH into the CSV files,
# validating every shaped element against the schema (validate=True).
process_map(OSM_PATH, validate=True)