Define the schema for the database

In [40]:
schema = {
    'node': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'lat': {'required': True, 'type': 'float', 'coerce': float},
            'lon': {'required': True, 'type': 'float', 'coerce': float},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'node_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    },
    'way': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'way_nodes': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'node_id': {'required': True, 'type': 'integer', 'coerce': int},
                'position': {'required': True, 'type': 'integer', 'coerce': int}
            }
        }
    },
    'way_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string', 'required': True}
            }
        }
    }
}

Create "sample.osm" from "cosprings.osm" in order to have a smaller file to test the python code on

In [3]:
import xml.etree.ElementTree as ET  # Use cElementTree or lxml if too slow

OSM = "cosprings.osm" 
SAMPLE_FILE = "sample.osm"

k = 30 # Parameter: take every k-th top level element

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


with open(SAMPLE_FILE, 'wb') as output:
    output.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    output.write('<osm>\n  ')

    # Write every kth top level element
    for i, element in enumerate(get_element(OSM)):
        if i % k == 0:
            output.write(ET.tostring(element, encoding='utf-8'))

    output.write('</osm>')

Return the types of tags and the number of each

In [30]:
def count_tags(filename):
    tags = {}
    for event, elem in ET.iterparse(filename,events=('start',)):
        if elem.tag in tags.keys():
            tags[elem.tag] += 1
        else:
            tags[elem.tag] = 1
    return tags
count_tags(OSM)

{'bounds': 1,
 'member': 1328,
 'nd': 605664,
 'node': 506830,
 'osm': 1,
 'relation': 210,
 'tag': 222996,
 'way': 59672}

Find the number of keys with the specified patterns: lower, lower_colon, problemchars, and other

In [31]:
lower = re.compile(r'^([a-z]|_)*$')
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

keys = {}
def key_type(element, keys):
    if element.tag == "tag":
        k_val = element.get('k')
        if bool(lower.search(k_val)):
            keys['lower'] += 1
        elif bool(lower_colon.search(k_val)):
            keys['lower_colon'] += 1
        elif bool(problemchars.search(k_val)):
            keys['problemchars'] += 1
        else:
            keys['other'] += 1    
    return keys

def process_map(filename):
    keys = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0}
    for _, element in ET.iterparse(filename):
        keys = key_type(element, keys)
    return keys

process_map(OSM)

{'lower': 140069, 'lower_colon': 80651, 'other': 2275, 'problemchars': 1}

Find the number of users who have contributed to the OSM file

In [98]:
def process_map(filename):
    users = set()
    for _, element in ET.iterparse(filename):
        if bool(element.get('uid')):
            users.add(element.attrib['uid'])
    return len(users)

process_map(OSM)

403

Return the different values for key='amenity' and the number of each value

In [103]:
def amenities(osmfile):
    osm_file = open(osmfile, "r")
    amenity = {}
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if tag.attrib['k'] == "amenity":
                    if tag.attrib['v'] in amenity.keys():
                        amenity[tag.attrib['v']] += 1
                    else:
                        amenity[tag.attrib['v']] = 1
    osm_file.close()
    return amenity

amenities(OSM)

{'Brewpub': 1,
 'Fishbowl': 1,
 'Health Clinic': 1,
 'John Cesara Park': 1,
 'apartments': 1,
 'arts_centre': 6,
 'atm': 11,
 'bank': 64,
 'bar': 31,
 'bbq': 3,
 'beauty': 2,
 'bench': 99,
 'bicycle_parking': 10,
 'bicycle_repair_station': 2,
 'cafe': 34,
 'cafeteria': 1,
 'car_wash': 27,
 'childcare': 2,
 'chiropractor': 2,
 'cinema': 6,
 'clinic': 4,
 'clock': 1,
 'college': 3,
 'community_centre': 8,
 'courthouse': 2,
 'dentist': 9,
 'doctors': 4,
 'dojo': 2,
 'dormitory': 4,
 'drinking_water': 6,
 'emergency_phone': 20,
 'fast_food': 170,
 'fire_station': 32,
 'fountain': 7,
 'fuel': 111,
 'grave_yard': 9,
 'grocery store': 1,
 'gym': 1,
 'hospital': 20,
 'ice_cream': 3,
 'kindergarten': 2,
 'library': 11,
 'nightclub': 1,
 'nursing_home': 2,
 'parking': 1870,
 'parking_space': 35,
 'pharmacy': 21,
 'picnic_table': 1,
 'place_of_worship': 69,
 'police': 6,
 'post_box': 103,
 'post_office': 19,
 'prison': 2,
 'pub': 20,
 'public_building': 10,
 'pup': 1,
 'recycling': 4,
 'register_

Audit the street types to determine which types need to be programatically corrected. Add the types to be corrected programatically to mapping 

In [33]:
from collections import defaultdict



street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)


expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", 
            "Trail", "Parkway", "Commons","Circle","Highway","Parkway","Terrace","Trafficway","Way","Loop","Alley",
            "Point"]

# UPDATE THIS VARIABLE
mapping = { "St": "Street",
            "St.": "Street",
            "Ct": "Court",
            "Rd": "Road",
            "Rd.": "Road",
            "Ave": "Avenue",
            "Blvd":"Boulevard",
            "Blvd.":"Boulevard",
            "Ct":"Court",
            "Dr":"Drive",
            "Dr.":"Drive",
            "HWY":"Highway",
            "Hwy":"Highway",
            "Ln":"Lane",
            "Pkwy":"Parkway",
            "RD":"Road",
            "ST":"Street",
            "STREET":"Street",
            "Ter":"Terrace",
            "Trfy":"Trafficway",
            "ave":"Avenue",
            "circle":"Circle",
            "ct":"Court",
            "dr":"Drive",
            "Pl":"Place",
            "rd":"Road",
            "st":"Street",
            "st.":"Street",
            "street":"Street",
            "ter":"Terrace",
            "terrace":"Terrace"
            }


def audit_street_type(street_types, street_name):
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)


def is_street_name(elem):
    return (elem.attrib['k'] == "addr:street")

def audit_streets(osmfile):
    osm_file = open(osmfile, "r")
    street_types = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    osm_file.close()
    return street_types


audit_streets(OSM)


defaultdict(set,
            {'130': {'North Nevada Avenue Ste 130'},
             '80829': {'El Paso Blvd, Manitou Springs, CO 80829'},
             '80919': {'230 Point of the Pines Drive  Colorado Springs, CO 80919'},
             '83': {'Highway 83'},
             '85': {'South Highway 85'},
             'Aly': {'Kempton Aly'},
             'Ave': {'E Pikes Peak Ave',
              'East Pikes Peak Ave',
              'Gilpin Ave',
              'N Nevada Ave',
              'North Stone Ave',
              'Skyline Ave',
              'W Colorado Ave',
              'West Colorado Ave',
              'West Platte Ave'},
             'Ave.': {'Colorado Ave.', 'W. Moreno Ave.'},
             'B102-B112': {'Lakewood Cir # B102-B112'},
             'Blvd': {'N Academy Blvd', 'N Union Blvd'},
             'Boulefard': {'North Academy Boulefard'},
             'Building': {'East Pikes Peak Building'},
             'CO': {'1 Lake Avenue  Colorado Springs, CO'},
             'Cascade': {'

Define a procedure to programatically replace mapping keys with the values.  For instance '1106 W 47th street' should return '1106 W 47th Street'.  

In [42]:
def update_name(name, mapping):
    s = street_type_re.search(name)
    if s:
        street_type = s.group()
        if street_type not in expected:
            name = re.sub(street_type_re,mapping[street_type],name)
    return name

update_name('1106 W 47th street',mapping)

'1106 W 47th Street'

Audit postcodes to determine if any postcodes do not conform to the 5 digit format

In [35]:
def audit_postcodes(osmfile):
    osm_file = open(osmfile, "r")
    postcodes = set()
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if tag.attrib['k'] == 'addr:postcode':
                    postcode = tag.attrib['v']
                    if len(postcode) != 5:
                        postcodes.add(postcode)
    osm_file.close()
    return postcodes

audit_postcodes(OSM)

{'1875', 'CO'}

Define a procedure to correct postcodes that do not conform to the 5 digit standard.  For instance, 'CO 55555-4444' should be converted to '55555'.  Postcodes all conforming to a standardized format allows postcode queries to be more meaningful.

In [43]:
postcode_re = re.compile(r'ddddd', re.IGNORECASE)

def update_code(code):
    c = postcode_re.search(code)
    if c:
        codes = c.group()
        code = codes
    return code

update_code('KS 66210-1415')

'KS 66210-1415'

Audit phone numbers to catch any not conforming to the standard format '+1-555-555-5555'.

In [37]:
def phone(osmfile):
    osm_file = open(osmfile, "r")
    phones = set()
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if tag.attrib['k'] == 'phone':
                    number = tag.attrib['v']
                    if len(number) != 15:
                        phones.add(number)
    osm_file.close()
    return phones
phone(OSM)

{'(719) 227-0380',
 '(719) 265-5547',
 '(719) 418-7999',
 '(719) 534-0277',
 '(719) 635-2111',
 '(719) 941-1858',
 '+1-719- 892-7003',
 '+1-719-328-7700  ',
 '+1-719-368-7677 ',
 '+1-719-442-2201 ',
 '+1-719-473-8105 ',
 '+1-719-520-0123 ',
 '+1-719-520-5111 ',
 '+1-719-577-4366 ',
 '+1-719-578-7771 ',
 '+1-719-578-8847 ',
 '+1-719-591-0707 ',
 '+1-719-632-1754 x1200',
 '+1-719-632-5151 ',
 '+1-719-632-6122 ',
 '+1-719-634-3163 ',
 '+1-719-634-6073 ',
 '+1719-635-7925',
 '1-719-471-0880',
 '1-719-597-1700',
 '18667262676',
 '471-1562',
 '598-1891',
 '719 282 1182',
 '719 632 6278',
 '719 955 0966',
 '719-203-4413',
 '719-234-1200',
 '719-266-9900',
 '719-268-6874',
 '719-305-8000',
 '719-305-9000',
 '719-321-4727',
 '719-322-0255',
 '719-322-9351',
 '719-333-2606',
 '719-344-5897',
 '719-359-5026',
 '719-365-5000',
 '719-444-5238',
 '719-477-2121',
 '719-527-0500',
 '719-548-1404',
 '719-570-7656',
 '719-597-3014',
 '719-597-4282',
 '719-632-4066',
 '719-634-5299',
 '719-635-1389',
 '7

Define a procedure that will update phone numbers to the format defined above.  For instance, '(555) 555-5555' returns '+1-555-555-5555'.  This allows queries on phone numbers to be more valid.  

In [97]:
def update_phone(phone):
    phone = phone.replace(" ","")
    p = list(phone) 
    if len(phone)>8:
        if p[0] != '+':
            p[1:] = p[0:]
            p[0] = '+'
        if p[1] != '1':
            p[2:] = p[1:]
            p[1] = '1'
        if p[2] != '-':
            if p[2] == '(':
                p[2] = '-'
            else:
                p[3:] = p[2:]
                p[2] = '-'
        if p[6] != '-':
            if p[6] == ')':
                p[6] = '-'
            else:
                p[7:] = p[6:]
                p[6] = '-' 
        if p[10] != '-':
            p[11:] = p[10:]
            p[10] = '-'
    return "".join(p)
    
update_phone('(913) 6261531')    

'+1-913-626-1531'

Using the schema defined at the top of the code, write csv files to create the database.

In [None]:
import csv
import codecs
import cerberus

OSM_PATH = OSM

NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

SCHEMA = schema


NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict"""

    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []  # Handle secondary tags the same way for both node and way elements

    if element.tag == 'node':
        for node in NODE_FIELDS:
            node_attribs[node] = element.attrib[node]
        for child in element:
            tag = {}
            if PROBLEMCHARS.search(child.attrib["k"]):
                continue
            
            elif LOWER_COLON.search(child.attrib["k"]):
                tag_type = child.attrib["k"].split(':',1)[0]
                tag_key = child.attrib["k"].split(':',1)[1]
                tag["key"] = tag_key
                if tag_type:
                    tag["type"] = tag_type
                else:
                    tag["type"] = 'regular'
                
                tag["id"] = element.attrib["id"]
                tag["value"] = child.attrib["v"]
                
            else:
                tag["value"] = child.attrib["v"]
                tag["key"] = child.attrib["k"]
                tag["type"] = "regular"
                tag["id"] = element.attrib["id"]
            if tag:
                    tags.append(tag)
        print {'node': node_attribs, 'node_tags': tags}        
        return {'node': node_attribs, 'node_tags': tags}
    elif element.tag == 'way':
        for way in WAY_FIELDS:
            way_attribs[way] = element.attrib[way]
        for child in element:
            nd = {}
            tag = {}
            if child.tag == 'tag':
                if PROBLEMCHARS.search(child.attrib["k"]):
                    continue
                elif LOWER_COLON.search(child.attrib["k"]):
                    tag_type = child.attrib["k"].split(':',1)[0]
                    tag_key = child.attrib["k"].split(':',1)[1]
                    tag["key"] = tag_key
                    if tag_type:
                        tag["type"] = tag_type
                    else:
                        tag["type"] = 'regular'
                    tag["id"] = element.attrib["id"]
                    tag["value"] = child.attrib["v"]
                    
                else:
                    tag["value"] = child.attrib["v"]
                    tag["key"] = child.attrib["k"]
                    tag["type"] = "regular"
                    tag["id"] = element.attrib["id"]
                if tag:
                    tags.append(tag)
            elif child.tag == 'nd':
                nd['id'] = element.attrib["id"]
                nd['node_id'] = child.attrib["ref"]
                nd['position'] = len(way_nodes)
                
                if nd:
                    way_nodes.append(nd)
            else:
                continue
        print {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}        
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}



def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_strings = (
            "{0}: {1}".format(k, v if isinstance(v, str) else ", ".join(v))
            for k, v in errors.iteritems()
        )
        raise cerberus.ValidationError(
            message_string.format(field, "\n".join(error_strings))
        )


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)



def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])
                    
process_map(OSM,validate=False)

{'node': {'changeset': '16933109', 'uid': '227972', 'timestamp': '2013-07-13T05:00:28Z', 'lon': '-104.8302179', 'version': '37', 'user': 'Your Village Maps', 'lat': '38.8970934', 'id': '32658042'}, 'node_tags': []}
{'node': {'changeset': '16933109', 'uid': '227972', 'timestamp': '2013-07-13T05:00:28Z', 'lon': '-104.8304538', 'version': '6', 'user': 'Your Village Maps', 'lat': '38.8965024', 'id': '32658043'}, 'node_tags': []}
{'node': {'changeset': '14256922', 'uid': '719784', 'timestamp': '2012-12-13T07:11:21Z', 'lon': '-104.8122016', 'version': '35', 'user': 'FrozenFlame22', 'lat': '38.9837277', 'id': '32658050'}, 'node_tags': []}
{'node': {'changeset': '14315133', 'uid': '719784', 'timestamp': '2012-12-18T07:14:47Z', 'lon': '-104.8013559', 'version': '38', 'user': 'FrozenFlame22', 'lat': '38.9610899', 'id': '32658055'}, 'node_tags': []}
{'node': {'changeset': '14315133', 'uid': '719784', 'timestamp': '2012-12-18T07:14:47Z', 'lon': '-104.8013309', 'version': '38', 'user': 'FrozenFlame