# OpenStreetMap - Prague
## Choose file and show summary about its tags

In [34]:
import xml.etree.cElementTree as ET
import pprint
import re
from collections import defaultdict
import csv
import codecs
import cerberus

# Input OSM file. The commented-out names are alternative inputs
# (presumably smaller samples of the full extract -- confirm); exactly one
# assignment should be active at a time.
#osm_filename = "prague_sample_k1000.osm"
#osm_filename = "prague_sample_k500.osm"
#osm_filename = "prague_sample_k250.osm"
#osm_filename = "prague_sample_k100.osm"
#osm_filename = "prague_sample_k25.osm"
osm_filename = "prague_czech-republic.osm"

# Output CSV files, one per table; they are opened for writing by process_map().
csv_filename_nodes = "nodes.csv"
csv_filename_nodes_tags = "nodes_tags.csv"
csv_filename_ways = "ways.csv"
csv_filename_ways_nodes = "ways_nodes.csv"
csv_filename_ways_tags = "ways_tags.csv"
csv_filename_users = "users.csv"

In [35]:
def count_tags(filename):
    """Count how many times each XML tag occurs in a file.

    The file is streamed with ``iterparse`` and every finished top-level
    subtree is discarded immediately, so memory use stays bounded even for
    a very large extract (the previous version kept every parsed element
    alive until the end of the run).

    Args:
        filename: path of the XML file, or a binary file object.

    Returns:
        A plain dict mapping tag name -> number of occurrences.
    """
    tags = defaultdict(int)
    root = None
    depth = 0
    for event, elem in ET.iterparse(filename, events=("start", "end")):
        if event == "start":
            if root is None:
                # the very first start event belongs to the document root
                root = elem
            tags[elem.tag] += 1
            depth += 1
        else:
            depth -= 1
            # A direct child of the root has just been closed: it has been
            # counted already, so drop it (and its subtree) from memory.
            if depth == 1:
                root.clear()
    # hand back an ordinary dict, as before
    return dict(tags)

# Print the per-tag element counts of the selected OSM file.
pprint.pprint(count_tags(osm_filename))

KeyboardInterrupt: 

## Utilities

In [36]:
# Key of the form "abc:def": lowercase letters/underscores, a colon, then at
# least one more lowercase letter/underscore. Only the start is anchored, so
# anything may follow the second part.
word_colon_word = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Any single character that makes a tag key unacceptable (punctuation listed
# in the class, plus space, tab, CR and LF).
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\, \t\r\n]') # we allow for a dot as opposed to the case-study code

# Returns parsed type or None if not possible
def ensure_type(str_value, expected_type):
    """Convert ``str_value`` to ``expected_type``.

    Args:
        str_value: the raw attribute value (normally a string).
        expected_type: callable used for the conversion, e.g. ``int``,
            ``float`` or ``str``.

    Returns:
        The converted value, or ``None`` when the conversion fails.
        When ``expected_type`` is ``str`` the input is returned untouched.
    """
    # avoids converting strings to strings - so it doesn't have to handle non-ascii chars
    if expected_type is str:
        return str_value

    try:
        return expected_type(str_value)
    except (TypeError, ValueError):
        # Only genuine conversion failures are mapped to None; anything else
        # (e.g. KeyboardInterrupt) must propagate.
        return None
    
# Pretty printer that copes with utf8 text
# source:
# https://stackoverflow.com/questions/10883399/unable-to-encode-decode-pprint-output
class PP(pprint.PrettyPrinter):
    """PrettyPrinter that shows ``unicode`` objects as utf8 instead of escapes."""

    def format(self, object, context, maxlevels, level):
        """Return the (string, readable, recursive) triple for ``object``.

        Everything except ``unicode`` objects is delegated to the stock
        PrettyPrinter implementation.
        """
        if not isinstance(object, unicode):
            return pprint.PrettyPrinter.format(self, object, context, maxlevels, level)
        # unicode text: emit the utf8 bytes as-is, flagged readable and non-recursive
        return (object.encode('utf8'), True, False)

# shared printer instance used by the rest of the notebook
pp = PP()


def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Stream the file and yield every completed element whose tag is in ``tags``.

    After the consumer is done with a yielded element the document root is
    cleared, so already processed elements do not pile up in memory.
    """

    parser = ET.iterparse(osm_file, events=('start', 'end'))
    # the first event is the start of the document root
    _, root = next(parser)
    for event, elem in parser:
        if event != 'end' or elem.tag not in tags:
            continue
        yield elem
        root.clear()

## Audit

In [37]:
# mappings for parsing .osm into python dictionary/csv
# structure of the following variables is (original_name, type, target_name)
#
# The mappings are materialised as lists: they are iterated once per parsed
# element, and a bare zip() object can only be consumed a single time on
# Python 3 (every element after the first would see an empty mapping).

node_fields_names = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
node_fields = list(zip(node_fields_names,
                       [int, float, float, str, int, int, int, str],
                       node_fields_names))

way_fields_names = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
way_fields = list(zip(way_fields_names,
                      [int, str, int, int, int, str],
                      way_fields_names))

# <tag k=... v=...> attributes are stored under 'key' / 'value'
tag_fields = list(zip(['k', 'v'],
                      [str, str],
                      ['key', 'value']))

# <nd ref=...> attribute is stored under 'node_id'
way_nodes_fields = list(zip(['ref'],
                            [int],
                            ['node_id']))

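All audit functions follow the same return convention, i.e., (result1, result2, ..., errs), where each result is what you would expect, or None if the audit failed. Whenever the results are None, errs is a non-empty list describing the problem. (Two small helpers deviate from this: audit_country returns only errs, and audit_housenumber returns only the tag.)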

In [38]:
# Parse any element into dictionary using the given fields
def audit_fields(elem, fields):
    """Read the attributes of ``elem`` described by ``fields`` into a dict.

    Args:
        elem: XML element whose attributes are read.
        fields: iterable of (attribute_name, type, target_name) triples.

    Returns:
        (parsed, errs). ``parsed`` maps target names to converted values, or
        is None when at least one attribute is missing or not convertible.
        ``errs`` holds ('missing value', name) / ('wrong type', name) entries.
    """
    errs = []
    parsed = {}
    for field, field_type, dict_field in fields:
        if field not in elem.attrib:
            errs.append(('missing value', field))
            continue

        value = ensure_type(elem.get(field), field_type)
        # None means the conversion failed; an empty string is rejected as
        # before. A numeric zero (e.g. lat="0.0") is a perfectly valid value
        # and must not be reported as a type error.
        if value is None or value == '':
            errs.append(('wrong type', field))
        else:
            parsed[dict_field] = value

    if errs:
        parsed = None
    return parsed, errs

In [39]:
# Reject keys containing problematic characters and
# split a key of the form abc:def
#      -> 'type' becomes abc and 'key' becomes def
def audit_tag_key(tag):
    """Validate ``tag['key']`` and split off an ``abc:`` prefix into ``tag['type']``.

    Returns (tag, errs); ``tag`` is None when the key contains a problematic
    character, in which case ``errs`` names the first offending character.
    The tag dict is updated in place when the key is split.
    """
    key = tag['key']
    offending = re.search(problemchars, key)
    if offending:
        return None, [('problematic characters in key', offending.group(0))]

    if re.match(word_colon_word, key):
        # split at the first colon only; later colons stay part of the key
        prefix, _, remainder = key.partition(':')
        tag['type'] = prefix
        tag['key'] = remainder
    return tag, []

In [40]:
# Alternative postcode notation with a space after the third digit, e.g. "110 00"
pattern_postcode_alternative = re.compile(r'^[0-9]{3} [0-9]{2}$')

# Check bounds of the postcode.
# Prague region included fully, Central Bohemian and Usti nad Labem regions partially
def audit_postcode(tag):
    """Normalise and range-check the postcode stored in ``tag['value']``.

    A value written as "NNN NN" is rewritten in place without the space.

    Returns:
        (tag, []) when the postcode is an integer inside one of the accepted
        ranges, otherwise (None, [message]).
    """
    try:
        if pattern_postcode_alternative.match(tag['value']):
            tag['value'] = tag['value'].replace(' ', '')
        postcode = int(tag['value'])
    except (TypeError, ValueError):
        # only conversion problems mean "not an integer"; other errors
        # (including KeyboardInterrupt) are no longer swallowed
        return None, ['postcode not integer']

    if not (10000 <= postcode <= 29599 or 40000 <= postcode <= 44199):
        return None, ['postcode outside of valid range']
    return tag, []

In [41]:
def audit_streetname(tag):
    """Expand the abbreviation 'nám.' in the street name to 'náměstí'.

    The tag is updated in place; the error list is always empty.
    """
    expanded = tag['value'].replace(u'nám.', u'náměstí')
    tag['value'] = expanded
    return tag, []

In [42]:
def audit_country(tag):
    """Return an error list: empty for country 'CZ', ['wrong country'] otherwise.

    Unlike the other audit helpers this one returns only the error list.
    """
    return [] if tag['value'] == 'CZ' else ['wrong country']

In [43]:
def audit_housenumber(tag):
    """Normalise 'ev. N' to 'ev.N' in the house number; returns the same tag."""
    normalised = tag['value'].replace('ev. ', 'ev.')
    tag['value'] = normalised
    return tag

In [44]:
# Auxiliary definitions for checking consistency of house ids

# Patterns for checking all types of house ids in the Czech system.
# Each number is a digit string that does not start with zero; the street
# number may carry one trailing letter.
pattern_conscription = r'[1-9][0-9]*'
pattern_street = r'[1-9][0-9]*[a-zA-Z]?'
pattern_provisional = r'[1-9][0-9]*'
patterns_house_id = {
    'conscription': re.compile(r'^' + pattern_conscription + r'$'),  # popisne
    'street': re.compile(r'^' + pattern_street + r'$'),  # orientacni
    'provisional': re.compile(r'^' + pattern_provisional + r'$'),  # evidencni
    # ((ev.)evidencni | popisne)(/orientacni)?
    # The alternation is wrapped in a group so that '^' and the optional
    # '/street' + '$' apply to BOTH alternatives; without the group a value
    # such as 'ev.12abc' was accepted because the first alternative had no
    # end anchor.
    'house': re.compile(r'^(?:(ev\.' + pattern_provisional + r')|('
                        + pattern_conscription + r'))(/' + pattern_street + r')?$'),
}

def has_some_house_id(tags):
    """Tell whether any tag key, minus its last six characters, names a house id type.

    Six is the length of the 'number' suffix, so e.g. 'housenumber' maps to
    the 'house' entry of ``patterns_house_id``.
    """
    return any(tag['key'][:-6] in patterns_house_id for tag in tags)

def get_house_ids(tags):
    """Collect house id values from ``tags`` as {id_type: value}.

    ``id_type`` is the tag key without its last six characters; only types
    present in ``patterns_house_id`` are kept. If a type occurs more than
    once the last tag wins.
    """
    return {
        tag['key'][:-6]: tag['value']
        for tag in tags
        if tag['key'][:-6] in patterns_house_id
    }

In [45]:
# Verify that all house id tags are consistent
def audit_house_id(tags):
    """
    Verify that all house id tags are consistent and try to
    fill in the missing ones if they can be inferred.

    Args:
        tags: list of tag dicts (keys 'id', 'key', 'value', 'type') that
            belong to one element. Inferred tags are appended to this list,
            so the caller's list may be modified.

    Returns:
        (tags, errs). When a value does not match its pattern the function
        stops early and ``errs`` is a one-item list
        ``[('House id pattern not matching', details, tags)]``. Otherwise
        ``errs`` lists any inconsistencies found between the house id tags
        (empty when everything agrees).
    """

    house_id = get_house_ids(tags)
    errs = []

    # check that the patterns are correct
    # Iterate over a snapshot: the loop body adds/removes entries of
    # house_id, which is not allowed while iterating a live dict view.
    for id_type, id_value in list(house_id.items()):

        match = re.match(patterns_house_id[id_type], id_value)

        # unusual format, maybe that house is a street number only
        if id_type == 'house':

            # this handles a few cases which are in the database
            # (startswith() rather than [0]: a value such as '?' becomes
            # empty after stripping and must be reported, not crash)
            if id_value.startswith('?'):
                id_value = id_value[1:]
            if id_value.startswith('/'):
                id_value = id_value[1:]

            # either nothing else is known, so test street pattern
            # or street is known so test equality
            if (len(house_id) == 1 and not match and re.match(patterns_house_id['street'], id_value)) \
                    or ('street' in house_id and house_id['street'] == id_value):

                # keep only street tag - house will be composed correctly later
                house_id['street'] = id_value
                del house_id['house']
                tags = [tag for tag in tags if tag['key'] != 'housenumber']
                match = True

        if not match:
            errs.append((id_type + ' does not match pattern', id_value))

    if errs:
        return tags, [('House id pattern not matching', errs, tags)]

    if 'provisional' in house_id and 'conscription' in house_id:
        errs.append(('We cannot have both provisional and conscription', house_id))

    # house unknown, use other fields to compose it
    elif 'house' not in house_id:
        housenumber = ''
        if 'conscription' in house_id:
            housenumber = house_id['conscription']
        elif 'provisional' in house_id:
            housenumber = 'ev.' + house_id['provisional']

        if 'street' in house_id:
            housenumber = housenumber + '/' + house_id['street']

        if housenumber != '':
            tags.append({'id': tags[0]['id'], # all tags have parent's id
                    'key': 'housenumber',
                    'value': housenumber,
                    'type': 'addr'})

    # use house to check or fill in other fields
    else:
        is_provisional = house_id['house'][:3] == 'ev.'
        if is_provisional:
            # drop the 'ev.' prefix so only the numbers remain
            house_id['house'] = house_id['house'][3:]
            first_name = 'provisional'
        elif house_id['house'].startswith('/'):
            first_name = ''
        else:
            first_name = 'conscription'

        sep = house_id['house'].find('/')
        has_street = sep >= 0

        # first = part before '/', second = street number after it
        first = house_id['house']
        if has_street:
            second = first[sep+1:]
            first = first[:sep]

        if first_name in house_id:
            if first != house_id[first_name]:
                errs.append(('First number is not consistent', house_id))
        elif first != '' and first_name != '':
            tags.append({'id': tags[0]['id'], # all tags have parent's id
                    'key': first_name + 'number',
                    'value': first,
                    'type': 'addr'})

        if has_street:
            if 'street' in house_id:
                if second != house_id['street']:
                    errs.append(('Street number is not consistent', house_id))
            else:
                tags.append({'id': tags[0]['id'], # all tags have parent's id
                        'key': 'streetnumber',
                        'value': second,
                        'type': 'addr'})

    return tags, errs

In [46]:
def audit_tags(elem):
    """Parse and audit every <tag> child of ``elem``.

    Each tag is parsed, given the parent's id and a default type of
    'regular', then passed through the key audit and, for 'addr' tags, the
    matching value audit. Finally the house id tags are cross-checked.

    Returns:
        (tags, errs): the list of accepted tag dicts and a list of error
        records (empty when nothing was wrong).
    """
    kept = []
    problems = []
    for tag_elem in elem.iter('tag'):
        record, tag_errs = audit_fields(tag_elem, tag_fields)
        if record:
            record['id'] = int(elem.get('id'))
            record['type'] = 'regular'
            record, tag_errs = audit_tag_key(record)

        if record and record['type'] == 'addr':
            addr_key = record['key']
            if addr_key == 'postcode':
                record, tag_errs = audit_postcode(record)
            elif addr_key == 'street':
                record, tag_errs = audit_streetname(record)
            elif addr_key == 'country':
                # only reports; the tag itself is kept
                tag_errs = audit_country(record)
            elif addr_key == 'housenumber':
                record = audit_housenumber(record)

        if tag_errs:
            problems.append(('ignored tag', tag_elem.attrib, tag_errs))
        if record:
            kept.append(record)

    # wrap the per-tag errors together with the tags that survived
    if problems:
        problems = [(problems, kept)]

    if has_some_house_id(kept):
        kept, house_errs = audit_house_id(kept)
        if house_errs:
            # appended as one nested list, exactly as produced
            problems.append(house_errs)

    return kept, problems

In [47]:
# Latitude must lie within [-90, 90] and longitude within [-180, 180]
def audit_location(node):
    """Check the 'lat' and 'lon' values of ``node`` against their bounds.

    Returns (node, []) when both are in range, otherwise
    (None, [('invalid value', field, node)]) for the first field that fails.
    """
    limits = (('lat', 90.), ('lon', 180.))
    for field, bound in limits:
        if -bound <= node[field] <= bound:
            continue
        return None, [('invalid value', field, node)]
    return node, []

In [48]:
# Parse a node element together with its tags
def audit_node(node):
    """Audit a <node> element.

    Returns (parsed, tags, errs). ``parsed`` and ``tags`` are both None when
    the node's own attributes or its location are invalid.
    """
    parsed, errs = audit_fields(node, node_fields)
    if not parsed:
        # attributes missing or malformed: nothing else is checked
        return parsed, None, errs

    parsed, loc_errs = audit_location(parsed)
    tags, tag_errs = audit_tags(node)
    if not parsed:
        tags = None
    return parsed, tags, loc_errs + tag_errs

In [49]:
# Parse nd tags which define nodes of a way
def audit_way_nodes(way):
    """Parse the <nd> children of ``way`` into way-node records.

    Each valid record holds the referenced 'node_id', the way's 'id' and its
    0-based 'position' among the valid references.

    Returns:
        (nodes, errs): the list of records and a list of
        ('Bad way node', attributes, details) tuples for rejected children.
    """
    nodes = []
    errs = []
    position = 0
    for node in way.iter('nd'):
        parsed, errs_curr = audit_fields(node, way_nodes_fields)

        if parsed:
            parsed['id'] = int(way.get('id'))
            parsed['position'] = position
            nodes.append(parsed)
            position += 1

        else:
            # list.append() takes exactly one argument, so the error record
            # must be a single tuple (the old three-argument call raised
            # TypeError on the first bad reference)
            errs.append(('Bad way node', node.attrib, errs_curr))

    return nodes, errs

In [50]:
# Parse a way element together with its node references and tags
def audit_way(way):
    """Audit a <way> element.

    Returns (parsed, nodes, tags, errs). ``nodes`` and ``tags`` are None when
    the way's own attributes could not be parsed.
    """
    parsed, errs = audit_fields(way, way_fields)
    if not parsed:
        return parsed, None, None, errs

    nodes, node_errs = audit_way_nodes(way)
    tags, tag_errs = audit_tags(way)
    return parsed, nodes, tags, node_errs + tag_errs

In [51]:
def categorize_errs(errs):
    """Group error records by category.

    The category is read from ``err[2][0][0][0]``. Records that do not have
    that shape, or whose category value cannot be used as a dict key, are
    collected under 'other'.

    Returns:
        defaultdict(list) mapping category -> list of error records.
    """
    errs_cat = defaultdict(list)
    for err in errs:
        try:
            cat = err[2][0][0][0]
            errs_cat[cat].append(err)
        except (IndexError, KeyError, TypeError):
            # IndexError/KeyError: record is too shallow;
            # TypeError: not subscriptable, or category is unhashable
            errs_cat['other'].append(err)
    return errs_cat

In [19]:
# Errors of the most recent audit() run, kept for inspection in later cells
errs_glob = []
def audit(osm_filename):
    """Audit every node and way of the file and print an error summary.

    The collected error records are stored in the module-level ``errs_glob``;
    the total and the per-category counts are printed.
    """
    global errs_glob

    failures = []
    for elem in get_element(osm_filename, tags=('node', 'way')):

        # both audit functions return the error list as their last item
        if elem.tag == "node":
            elem_errs = audit_node(elem)[-1]
        elif elem.tag == "way":
            elem_errs = audit_way(elem)[-1]

        if elem_errs:
            failures.append(('bad elem', elem.attrib, elem_errs))

    errs_glob = failures
    print("Num errors: {}".format(len(failures)))
    for cat, members in categorize_errs(failures).items():
        print("{}: {}".format(cat, len(members)))

audit(osm_filename)

Num errors: 1
House id pattern not matching: 1


In [20]:
# Re-categorize the errors stored by the last audit() run and print the
# number of records in each category.
errs_cat = categorize_errs(errs_glob)

for cat, errs in errs_cat.items():
    print("{}: {}".format(cat, len(errs)))

print("")
# Show the details of one category. errs_cat is a defaultdict(list), so a
# category without any records simply yields nothing here.
for err in errs_cat['First number is not consistent']:
    pp.pprint(err[2][0])

House id pattern not matching: 1



## Parse and save as CSVs

In [52]:
def _required(type_name, coerce=None):
    """Build one field rule: required, of ``type_name``, optionally coerced."""
    rule = {'required': True, 'type': type_name}
    if coerce is not None:
        rule['coerce'] = coerce
    return rule


def _table(columns, many=False):
    """Wrap column rules as a single-row dict schema, or a list of such rows."""
    row = {'type': 'dict', 'schema': columns}
    if many:
        return {'type': 'list', 'schema': row}
    return row


# Rows of the two tag tables (nodes_tags and ways_tags share this schema)
tags_schema = _table({
    'id': _required('integer', int),
    'key': _required('string'),
    'value': _required('string'),
    'type': _required('string'),
}, many=True)

# Validation schema for one parsed element, keyed by output table name
SCHEMA = {
    'nodes': _table({
        'id': _required('integer', int),
        'lat': _required('float', float),
        'lon': _required('float', float),
        'uid': _required('integer', int),
        'version': _required('integer', int),
        'changeset': _required('integer', int),
        'timestamp': _required('string'),
    }),
    'nodes_tags': tags_schema,
    'ways': _table({
        'id': _required('integer', int),
        'uid': _required('integer', int),
        'version': _required('integer', int),
        'changeset': _required('integer', int),
        'timestamp': _required('string'),
    }),
    'ways_nodes': _table({
        'id': _required('integer', int),
        'node_id': _required('integer', int),
        'position': _required('integer', int),
    }, many=True),
    'ways_tags': tags_schema,
    'users': _table({
        'id': _required('integer', int),
        'username': _required('string'),
    }),
}

In [53]:
# Column names of each CSV table; process_map() passes them to the writers as
# field names. 'user' is not a node/way column because process_map() moves
# the username into the separate users table.
node_fields_csv_names = ['id', 'lat', 'lon', 'uid', 'version', 'changeset', 'timestamp']
way_fields_csv_names = ['id', 'uid', 'version', 'changeset', 'timestamp']
tag_fields_csv_names = ['id', 'key', 'value', 'type']
way_nodes_fields_csv_names = ['id', 'node_id', 'position']
user_fields_csv_names = ['id', 'username']

In [54]:
def validate_element(element, validator, schema=SCHEMA):
    """Raise an Exception if element does not match schema.

    Args:
        element: dict of parsed tables to check.
        validator: object with a ``validate(element, schema)`` method and an
            ``errors`` mapping (a cerberus.Validator in process_map).
        schema: validation schema, defaults to SCHEMA.

    Raises:
        Exception: describing the first reported field and its errors; the
            offending element is pretty-printed beforehand.
    """
    if validator.validate(element, schema) is True:
        return

    # .items() instead of the Python-2-only .iteritems(); works on both
    field, errors = next(iter(validator.errors.items()))
    message_string = "\nElement of type '{0}' has the following errors:\n{1}"
    error_string = pprint.pformat(errors)
    pp.pprint(element)
    raise Exception(message_string.format(field, error_string))

In [55]:
class UnicodeDictWriter(csv.DictWriter, object):
    """csv.DictWriter variant that utf-8 encodes unicode values before writing."""

    def writerow(self, row):
        """Write one row dict, encoding every unicode value as utf-8."""
        encoded = {}
        for k, v in row.iteritems():
            encoded[k] = v.encode('utf-8') if isinstance(v, unicode) else v
        super(UnicodeDictWriter, self).writerow(encoded)

    def writerows(self, rows):
        """Write each row dict in ``rows`` through writerow()."""
        for row in rows:
            self.writerow(row)

    def write(self, rowrows):
        """Write either a single row (dict) or several rows (list of dicts)."""
        kind = type(rowrows)
        if kind is list:
            self.writerows(rowrows)
        elif kind is dict:
            self.writerow(rowrows)
        else:
            raise Exception("Wrong type to write, list or dict expected")

In [56]:
def process_map(file_in, validate):
    """Audit every node and way of ``file_in`` and write the results to the CSV files.

    Usernames are moved out of the node/way rows into the users table, one
    row per distinct uid. When ``validate`` is True each parsed element is
    checked against the schema before it is written.
    """

    with codecs.open(csv_filename_nodes, 'wb') as nodes_file, \
         codecs.open(csv_filename_nodes_tags, 'wb') as nodes_tags_file, \
         codecs.open(csv_filename_ways, 'wb') as ways_file, \
         codecs.open(csv_filename_ways_nodes, 'wb') as ways_nodes_file, \
         codecs.open(csv_filename_ways_tags, 'wb') as ways_tags_file, \
         codecs.open(csv_filename_users, 'wb') as users_file:

        # One writer per output table, keyed by table name.
        # Header rows are deliberately not written: it seems that sqlite
        # does not like headers.
        writers = {
            'nodes': UnicodeDictWriter(nodes_file, node_fields_csv_names),
            'nodes_tags': UnicodeDictWriter(nodes_tags_file, tag_fields_csv_names),
            'ways': UnicodeDictWriter(ways_file, way_fields_csv_names),
            'ways_nodes': UnicodeDictWriter(ways_nodes_file, way_nodes_fields_csv_names),
            'ways_tags': UnicodeDictWriter(ways_tags_file, tag_fields_csv_names),
            'users': UnicodeDictWriter(users_file, user_fields_csv_names),
        }

        validator = cerberus.Validator()
        errs = []
        seen_uids = set()

        for elem in get_element(file_in, tags=('node', 'way')):

            parsed = {}
            if elem.tag == "node":
                node, tags, errs_curr = audit_node(elem)
                if node:
                    parsed['nodes'] = node
                    parsed['nodes_tags'] = tags

            elif elem.tag == "way":
                way, way_nodes, tags, errs_curr = audit_way(elem)
                if way:
                    parsed['ways'] = way
                    parsed['ways_nodes'] = way_nodes
                    parsed['ways_tags'] = tags

            if errs_curr:
                errs.append(('bad elem', elem.attrib, errs_curr))

            # element rejected by the audit: nothing to write
            if not parsed:
                continue

            # move username to the users table
            record = parsed[elem.tag + 's']
            uid = record['uid']
            username = record.pop('user')

            if uid not in seen_uids:
                seen_uids.add(uid)
                parsed['users'] = {'id': uid,
                                   'username': username}

            if validate is True:
                validate_element(parsed, validator)

            for table in parsed:
                writers[table].write(parsed[table])

process_map(osm_filename, validate=True)