In [1]:
#!/usr/bin/env python
#-*- coding: utf-8 -*-

import xml.etree.ElementTree as ET  # Use cElementTree or lxml if too slow

OSM_FILE = "san-antonio_texas.osm"  # Replace this with your osm file
SAMPLE_FILE = "sample.osm"

k = 10 # Parameter: take every k-th top level element

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


with open(SAMPLE_FILE, 'wb') as output:
    output.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    output.write('<osm>\n  ')

    # Write every kth top level element
    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k == 0:
            output.write(ET.tostring(element, encoding='utf-8'))

    output.write('</osm>')

In [2]:
def count_tags(filename):
    tags = {}
    for event, elem in ET.iterparse(filename):
        if elem.tag in tags.keys():
            tags[elem.tag] += 1
        else:
            tags[elem.tag] = 1

    return tags

count_tags(OSM_FILE)

{'bounds': 1,
 'member': 23537,
 'nd': 1479783,
 'node': 1244193,
 'osm': 1,
 'relation': 1718,
 'tag': 751039,
 'way': 144603}

In [3]:
# some regular expressions
import pprint
import re
lower = re.compile(r'^([a-z]|_)*$')
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)
postcode_re = re.compile(r'^\d{5}$')
 
# expected street names
expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", "Trail", "Parkway",
            "Commons","Bend","Causeway", "Circle","Concession","County","Drive","Highway","Manor","Passage","Path","Plaza",
            "Point","Terrace","Trail","Way"]

In [26]:
mapping = {"AVE":"Avenue","Ave":"Avenue","Ave.":"Avenue","ave":"Avenue" ,"avenue":"Avenue",
    "BLVD":"Boulevard", "Blvd":"Boulevard","Blvd.":"Boulevard","Bnd":"Bend",
        "Cir": "Circle","Cirlce":"Circle","Ct":"County","Cv":"County","ct":"County",
            "DRIVE":"Drive","Dr":"Drive","Dr.":"Drive","E":"Drive",
                "Hwy":"Highway","Ln":"Lane","Mnr":"Manor","North":"Drive",
                    "Pkwy":"Parkway","Poinciana":"Point","Pt":"Point",
                        "RD":"Road","Rd.":"Road","Rd":"Road","rd":"Road","road":"Road",
                            "ST":"Street","St":"Street","St.":"Street","st":"Street",
                                "Ter":"Terrace","W":"Way", "Trl":"Trail",
                                    "okeechobee":"Okeechobee Road","street":"Street","US Highway 1":"US Highway","3331": "33310"}

In [4]:
def audit_street_type(street_types, street_name):
    # Function to add street names not in the common_street_types list
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)

In [5]:
def is_street_name(elem):
    # To determine if an element is a street name
    return (elem.attrib['k'] == "addr:street")
 

In [6]:
from collections import defaultdict
def audit_street(osmfile):
    # iter through all street name tag under node or way and audit the street name value
    osm_file = open(osmfile, "r")
    street_types = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    osm_file.close()#####
    return street_types

In [7]:
pprint.pprint(dict(audit_street(SAMPLE_FILE)))

{'100': set(['W Ave #100']),
 '103': set(['Broadway St #103']),
 '112': set(['US Hwy 281 North, Suite 112']),
 '12': set(['RM 12']),
 '1346': set(['FM 1346']),
 '151': set(['State Highway 151']),
 '2011': set(['Huebner Rd #2011']),
 '281': set(['North US Highway 281']),
 '3351': set(['Farm-to-Market Road 3351']),
 '35': set(['North IH 35',
            'North Interstate Highway 35',
            'South Interstate Highway 35']),
 '410': set(['Northeast Loop 410', 'Northwest Loop 410']),
 '46': set(['North State Highway 46', 'State Highway 46']),
 '464': set(['FM 464']),
 '775': set(['FM 775']),
 '78': set(['Farm-to-Market Road 78']),
 'A': set(['Avenue A']),
 'Audrey': set(['Ashton Audrey']),
 'B': set(['Avenue B']),
 'Bay': set(['Bourdeaux Bay', 'Goldenrain Bay']),
 'Bluff': set(['River Bluff']),
 'Bois': set(['Clos Du Bois']),
 'Broadway': set(['Broadway']),
 'Bypass': set(['North Highway 123 Bypass']),
 'C': set(['Avenue C']),
 'Casbury': set(['Casbury']),
 'Cedar': set(['Gray Cedar'])

process_map(OSM_FILE)

In [8]:
def is_post_code(elem):
    return (elem.attrib['k'] == "addr:postcode")

In [9]:
def audit_postcode_type(postcode_types, postcode):
    m = postcode_re.search(postcode)
    if m:
        # pc = len(m.group(0))
        pc = m.group()
        postcode_types[pc].add(postcode)

In [13]:
def audit_postcode(osmfile):
    # iter through all street name tag under node or way and audit the street name value
    osm_file = open(osmfile, "r")
    postcode_types = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_post_code(tag):
                    audit_postcode_type(postcode_types, tag.attrib['v'])
    return postcode_types
 


In [14]:
pprint.pprint(dict(audit_postcode(SAMPLE_FILE)))

{'78006': set(['78006']),
 '78023': set(['78023']),
 '78052': set(['78052']),
 '78070': set(['78070']),
 '78108': set(['78108']),
 '78109': set(['78109']),
 '78112': set(['78112']),
 '78114': set(['78114']),
 '78121': set(['78121']),
 '78130': set(['78130']),
 '78154': set(['78154']),
 '78155': set(['78155']),
 '78204': set(['78204']),
 '78205': set(['78205']),
 '78207': set(['78207']),
 '78209': set(['78209']),
 '78210': set(['78210']),
 '78211': set(['78211']),
 '78212': set(['78212']),
 '78213': set(['78213']),
 '78216': set(['78216']),
 '78217': set(['78217']),
 '78218': set(['78218']),
 '78219': set(['78219']),
 '78222': set(['78222']),
 '78223': set(['78223']),
 '78226': set(['78226']),
 '78227': set(['78227']),
 '78228': set(['78228']),
 '78229': set(['78229']),
 '78230': set(['78230']),
 '78231': set(['78231']),
 '78232': set(['78232']),
 '78233': set(['78233']),
 '78238': set(['78238']),
 '78239': set(['78239']),
 '78240': set(['78240']),
 '78244': set(['78244']),
 '78245': se

In [33]:
def update_postcode(postcode, mapping):
   
    m = re.findall(r'^(\d{5})-\d{4}$', postcode)
   
    if m:
        postcode = m[0]
    elif postcode in mapping:
        postcode = mapping[postcode]
   
    return postcode

In [15]:
def update_name(name, mapping):
    m = street_type_re.search(name)
    if m.group() not in expected:
        if m.group() in mapping.keys():
            print "BEFORE"
            print name
            name = re.sub(m.group(), mapping[m.group()], name)
            print "AFTER"
            print name
            
    return name

In [16]:
import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET

import cerberus
import schema

OSM_PATH = "sample.osm"

NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

SCHEMA = schema.schema

# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

In [40]:
def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict"""
   
    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []
   
    if element.tag == 'node':
        for attrib in element.attrib:
            if attrib in NODE_FIELDS:
                node_attribs[attrib] = element.attrib[attrib]
       
        for child in element:
            node_tag = {}
            if LOWER_COLON.match(child.attrib['k']):
                node_tag['type'] = child.attrib['k'].split(':',1)[0]
                node_tag['key'] = child.attrib['k'].split(':',1)[1]
                node_tag['id'] = element.attrib['id']
                node_tag['value'] = child.attrib['v']
                if child.attrib['k'] == 'addr:street':
                    # check if your function returns a value
                    if update_name(child.attrib["v"], mapping):
                        node_tag["value"] = update_name(child.attrib["v"], mapping)
                    else:
                        continue
                #elif child.attrib['k'] == 'addr:postcode':
                    # check if your function returns a value
                #    if update_postcode(child.attrib['v']):
                #        tag['value'] = update_postcode(child.attrib['v'])
                #    else:
                        continue
                tags.append(node_tag)
            elif PROBLEMCHARS.match(child.attrib['k']):
                continue
            else:
                node_tag['type'] = 'regular'
                node_tag['key'] = child.attrib['k']
                node_tag['id'] = element.attrib['id']
                node_tag['value'] = child.attrib['v']
                tags.append(node_tag)
       
        return {'node': node_attribs, 'node_tags': tags}
 
    elif element.tag == 'way':
        for attrib in element.attrib:
            if attrib in WAY_FIELDS:
                way_attribs[attrib] = element.attrib[attrib]
       
        position = 0
        for child in element:
            way_tag = {}
            way_node = {}
           
            if child.tag == 'tag':
                if LOWER_COLON.match(child.attrib['k']):
                    way_tag['type'] = child.attrib['k'].split(':',1)[0]
                    way_tag['key'] = child.attrib['k'].split(':',1)[1]
                    way_tag['id'] = element.attrib['id']
                    way_tag['value'] = child.attrib['v']
                    if child.attrib["k"] == 'addr:street':
                    # check if your function returns a value
                        if update_name(child.attrib["v"], mapping):
                            way_tag["value"] = update_name(child.attrib["v"], mapping)
                        else:
                            continue
       
                #elif child.attrib['k'] == 'addr:postcode':
                # check if your function returns a value
                #    if update_postcode(child.attrib['v']):
                #        tag['value'] = update_postcode(child.attrib['v'])
                #    else:
                #        continue
                    tags.append(way_tag)
                elif PROBLEMCHARS.match(child.attrib['k']):
                    continue
                else:
                    way_tag['type'] = 'regular'
                    way_tag['key'] = child.attrib['k']
                    way_tag['id'] = element.attrib['id']
                    way_tag['value'] = child.attrib['v']
                    tags.append(way_tag)
       
            elif child.tag == 'nd':
                way_node['id'] = element.attrib['id']
                way_node['node_id'] = child.attrib['ref']
                way_node['position'] = position
                position += 1
                way_nodes.append(way_node)
 
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
 

In [41]:
# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)
        
        raise Exception(message_string.format(field, error_string))


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)

In [42]:
# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    process_map(OSM_PATH, validate=False)


BEFORE
Interstate Highway 35 North
AFTER
Interstate Highway 35 Drive
BEFORE
Interstate Highway 35 North
AFTER
Interstate Highway 35 Drive
BEFORE
W Josephine St
AFTER
W Josephine Street
BEFORE
W Josephine St
AFTER
W Josephine Street
BEFORE
W Commerce St
AFTER
W Commerce Street
BEFORE
W Commerce St
AFTER
W Commerce Street
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
West Loop 1604 North
AFTER
West Loop 1604 Drive
BEFORE
East Loop 1604 North
AFTER
East Loop 1604 Drive
BEFORE
East Loop 1604 North
AFTER
East Loop 1604 Drive
BEFORE
Interstate 