Project Summary: 

In this project, I use data wrangling techniques, such as assessing the quality of the data for validity, accuracy, completeness, consistency and uniformity, to clean OpenStreetMap data. Then I convert the dataset from XML to CSV format, import the cleaned .csv files into database, conduct SQL queries to provide a statistical overview of the dataset. Finally, I give some additional suggestions for improving and analyzing the data.
Map Area: Stokholm
Split osm file into a smaller sample (SAMPLE_FILE). The original file (Stockholm) is 2GB. Challenges: activating python2 via source activate py2 to be able to run the following code(the code is written for py2). I stared with k=10

In [1]:
import xml.etree.ElementTree as ET  # we can use cElementTree or lxml if too slow

OSM_FILE = "stockholm_sweden.osm"  
SAMPLE_FILE = "sample.osm"

k = 10 # Parameter: take every k-th top level element

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


with open(SAMPLE_FILE, 'wb') as output:
    output.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    output.write('<osm>\n  ')

    # Write every kth top level element
    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k == 0:
            output.write(ET.tostring(element, encoding='utf-8'))

    output.write('</osm>')

Parse data-set and identify different tags, using iterative parsing.

In [2]:
import xml.etree.cElementTree as ET
import pprint
from collections import defaultdict
import collections

def count_tags(filename):
        all_tags=ET.iterparse(filename)
        nodes= defaultdict(int)
        for node in all_tags:
            nodes[node[1].tag] +=1
        return dict(nodes)           
    
def test():

    tags = count_tags(SAMPLE_FILE)
    pprint.pprint(tags)

if __name__ == "__main__":
    test()

{'member': 19825,
 'nd': 745157,
 'node': 610544,
 'osm': 1,
 'relation': 1012,
 'tag': 216022,
 'way': 69782}


Unique users contributed to the map in this particular area:

In [3]:
def process_map(filename):
    users = set()
    for _, element in ET.iterparse(filename):
        if "uid" in element.attrib:
            users.add(element.get('uid'))

    return users

def test():

    users = process_map(SAMPLE_FILE)
    pprint.pprint(len(users))
#    assert len(users) == 6

if __name__ == "__main__":
    test()

1883


# auditing 
One of the usual problems in openstreetmap dataset is from the street name abbreviation. However, I have not found any problems by only looking at the osm file. Here I will try to find something via my code.
1-Building the regular expression to match the last element in the string, where usually the street type is based. 
2-Then based on the street abbreviation, create a mapping that need to be cleaned.

I tried all sort of changes in my code, however the resut looks pretty good(Swedes are really good at documentation afterall ;). in addition, Swedish wording is differnet in many ways. Namely for a street-name+street_type it usually only one world 'namestreet'; for example 'axfordstreet' as only one word. therefore, if the code recognises the last word which is not detected as an expected will return it as a name to be updated. to avoid the confusion and to avoid printing 2G worth of streettypes, I only dupdate street types with lower case in their first letter with most common swidish street types.

In [15]:
import re
import xml.etree.cElementTree as ET
import pprint
from collections import defaultdict
import collections

street_types = collections.defaultdict(set)
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

expected = [ "Väg", "Gatan", "Alle","Allé", "väg","torg","gatan","alle", "Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road",
            "Trail", "Parkway", "Commons", "Cove", "Alley", "Park", "Way", "Walk" "Circle", "Highway",
            "Plaza", "Path", "Center", "Mission", "Kyrka", "kyrka"]

mapping = { "väg": "Väg" ,
           "torg": "Torg",

            "gata":"Gatan",
            "gatan": "Gatan" ,
            "allé" :"Alle",
           "boulevard":"Boulevard",
           

            }

def audit_street_type(street_types, street_name):
#    print street_name
    m = street_type_re.search(street_name) #finds the pattern of last words
    if m:
        street_type = m.group() #returns the last word

        if street_type in expected:  ## here is my own interpretation of expected (for english speaking countries i would use #"if street_type in expected: )#

            street_types[street_type].add(street_name)
            print street_types

def is_street_name(elem):
    return (elem.attrib['k'] == "addr:street")


def audit(osmfile):
    osm_file = open(osmfile, "r")
   
    for event, elem in ET.iterparse(osmfile, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    osm_file.close()
    return street_types

if __name__ == "__main__":
    
    audit(SAMPLE_FILE)
    pprint.pprint(audit(SAMPLE_FILE) )

#    for name, street in street_types.items():

#        print("/nSet:", name, "Entries:"),
#        for item in street:

#            print (item)




defaultdict(<type 'set'>, {'torg': set(['Valla torg'])})
defaultdict(<type 'set'>, {u'gatan': set([u'Gr\xf6na gatan']), 'torg': set(['Valla torg'])})
defaultdict(<type 'set'>, {u'gatan': set([u'Gr\xf6na gatan']), 'torg': set(['Valla torg'])})
defaultdict(<type 'set'>, {u'gatan': set([u'Gr\xf6na gatan']), 'torg': set(['Valla torg'])})
defaultdict(<type 'set'>, {u'gatan': set([u'Gr\xf6na gatan']), 'torg': set(['Valla torg'])})
defaultdict(<type 'set'>, {u'gatan': set([u'Gr\xf6na gatan']), 'torg': set(['Valla torg'])})
defaultdict(<type 'set'>, {u'gatan': set([u'Gr\xf6na gatan']), 'torg': set(['Valla torg', 'Kista torg'])})
defaultdict(<type 'set'>, {u'gatan': set([u'Gr\xf6na gatan']), 'Gatan': set(['Tysta Gatan']), 'torg': set(['Valla torg', 'Kista torg'])})
defaultdict(<type 'set'>, {u'gatan': set([u'Gr\xf6na gatan']), 'Gatan': set(['Tysta Gatan']), 'torg': set(['Valla torg', 'Kista torg'])})
defaultdict(<type 'set'>, {u'gatan': set([u'Gr\xf6na gatan']), 'Gatan': set(['Tysta Gatan']), '

In [16]:
def update_name(name, mapping, regex):
    m = regex.search(name)
    if m:
        st_type = m.group()
        if st_type in mapping:
            name = re.sub(regex, mapping[st_type], name)
    return name
for street_type, ways in street_types.iteritems():
    for name in ways:
        better_name = update_name(name, mapping, street_type_re)
        print name, "=>", better_name

Lugna gatan => Lugna Gatan
Gröna gatan => Gröna Gatan
Finska gatan => Finska Gatan
Östra Ågatan => Östra ÅGatan
Västra Ågatan => Västra ÅGatan
Breda Gatan => Breda Gatan
Långa Gatan => Långa Gatan
Tysta Gatan => Tysta Gatan
Gustaf de Lavals torg => Gustaf de Lavals Torg
Valla torg => Valla Torg
Kista torg => Kista Torg
Gustav III:s Boulevard => Gustav III:s Boulevard


Checking ‘k’ value for each tag. creating a dictionary of the different tags. Regular expressions: lower is for valid only-lowercase-letter tags. lower_colon is for other valid tags with a colon in the value. problemchars is for tags with problematic characters.

In [None]:
lower = re.compile(r'^([a-z]|_)*$')
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')


def key_type(element, keys):
    if element.tag == "tag":
        if re.match(lower, element.attrib['k']):
            keys["lower"] += 1
        elif re.match(lower_colon, element.attrib['k']):
            keys["lower_colon"] += 1
        elif re.search(problemchars, element.attrib['k']):
            keys["problemchars"] += 1
        else:
            keys['other'] += 1
    return keys


def process_map(filename):
    keys = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0}
    for _, element in ET.iterparse(filename):
        keys = key_type(element, keys)

    return keys

sf_all_keys = process_map(SAMPLE_FILE)
print sf_all_keys

Auditing postal codes. The first two digit of postal codes in sweden is 72.

In [None]:
import collections
def audit_zipcode(invalid_zipcodes, zipcode):
    twoDigits = zipcode[0:2]
    
    if twoDigits != 72 or not twoDigits.isdigit():
        invalid_zipcodes[twoDigits].add(zipcode)
        
def is_zipcode(elem):
    return (elem.attrib['k'] == "addr:postcode")

def audit_zip(osmfile):
    osm_file = open(osmfile, "r")
    invalid_zipcodes = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):

        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_zipcode(tag):
                    audit_zipcode(invalid_zipcodes,tag.attrib['v'])

    return invalid_zipcodes

sf_zipcode = audit_zip(SAMPLE_FILE)
pprint.pprint(dict(sf_zipcode))


Problems Encountered: Inconsistent postal codes! Although I indicated invalid zipcode in a broad set, many of the above zipcodes are valid and have no promlebs. In Stockholm area zip codes all begin with “72” or “41”, however some of zip codes were outside this region.
In the following code, I modify the function to clean zip code, change xxx xx-xxxx format into 5 digits format, to remove the blank in the middle and create a consistant zipcode. 

In [None]:
def update_zip(zipcode):
    zipcode= zipcode.replace(" ","")
    zipChar = re.findall('[a-zA-Z]*', zipcode)
    if zipChar:
        zipChar = zipChar[0]
    zipChar.strip()
    if zipChar == "u":
        updateZip = re.findall(r'\d+', zipcode)
        if updateZip:
            return ((re.findall(r'\d+', zipcode))[0])
    else:
            
        d=((re.findall(r'\d+', zipcode))[0])
        return d
        


for street_type, ways in sf_zipcode.iteritems():
    for name in ways:
        better_name = update_zip(name)
        print name, "=>", better_name

After auditing is completed the next step is to prepare the data to be inserted into a SQL database.
To do so I did parse the elements in the OSM XML file, transforming them from document format to
tabular format, thus making it possible to write to .csv files.  These csv files can then easily be
imported to a SQL database as tables.

In [None]:
import csv
import codecs
import re
import xml.etree.cElementTree as ET

import cerberus

import schema

OSM_PATH = "example.osm"

NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
                       # r'^([a-z]|_)*:([a-z]|_)*$'
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

SCHEMA = schema.schema

# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


def load_new_tag(element, secondary, default_tag_type):
    """
    Load a new tag dict to go into the list of dicts for way_tags, node_tags
    """
    new = {}
    new['id'] = element.attrib['id']
    if ":" not in secondary.attrib['k']:
        new['key'] = secondary.attrib['k']
        new['type'] = default_tag_type
    else:
        post_colon = secondary.attrib['k'].index(":") + 1
        new['key'] = secondary.attrib['k'][post_colon:]
        new['type'] = secondary.attrib['k'][:post_colon - 1]
    new['value'] = secondary.attrib['v']
    print "!23123"
    print secondary.attrib['v']
    print"!2312"
    return new


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict"""

    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []  # Handle secondary tags the same way for both node and way elements

    if element.tag == 'node':
        for attrib, value in element.attrib.iteritems():
            if attrib in node_attr_fields:
                node_attribs[attrib] = value
        
        # for elements within the top element
        for secondary in element.iter():
            if secondary.tag == 'tag':
                if problem_chars.match(secondary.attrib['k']) is not None:
                    continue
                else:
                    new = load_new_tag(element, secondary, default_tag_type)
                    tags.append(new)
        return {'node': node_attribs, 'node_tags': tags}
    elif element.tag == 'way':
        for attrib, value in element.attrib.iteritems():
            if attrib in way_attr_fields:
                way_attribs[attrib] = value
                
        counter = 0
        for secondary in element.iter():
            if secondary.tag == 'tag':
                if problem_chars.match(secondary.attrib['k']) is not None:
                    continue
                else:
                    new = load_new_tag(element, secondary, default_tag_type)
                    tags.append(new)
            if secondary.tag == 'nd':
                newnd = {}
                newnd['id'] = element.attrib['id']
                newnd['node_id'] = secondary.attrib['ref']
                newnd['position'] = counter
                counter += 1
                way_nodes.append(newnd)
        
        # print {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}



#               Helper Functions                     #

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_strings = (
            "{0}: {1}".format(k, v if isinstance(v, str) else ", ".join(v))
            for k, v in errors.iteritems()
        )
        raise cerberus.ValidationError(
            message_string.format(field, "\n".join(error_strings))
        )


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)


#               Main Function                        #

def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
        codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    process_map(SAMPLE_FILE, validate=True)

Some helper functions. additional help!

In [13]:

sqlite> SELECT * FROM nodes WHERE id IN (SELECT DISTINCT(id) FROM nodes_tags_file WHERE key='postcode' AND value='48009')

SyntaxError: invalid syntax (<ipython-input-13-a79ad7212b60>, line 2)

Overview of the data
This section contains basic statistics about the dataset, the SQL queries used to gather them, and some additional ideas about the data in context.
File Size
Number of Nodes
Number of Ways
Number of unique users
Top 10 contrinuters 



Additional ideas:
List of top 20 Amenities in Stockholm

In [17]:
import sqlite3
import  pandas as pd

conn = sqlite3.connect(process_map(SAMPLE_FILE, validate=True))

def ll(c):
    c = conn.cursor()
    c.execute("SELECT value, COUNT(*) as num \
            FROM nodes_tags \
           WHERE key='amenity' \
           GROUP BY value \
           ORDER BY num DESC \
           LIMIT 20;")
    
    return cursor.fetchall()
if __name__ == '__main__':
    pprint.pprint(ll(c))

TypeError: process_map() got an unexpected keyword argument 'validate'

In [28]:
import sqlite3
import  pandas as pd
con = sqlite3.connect("nodes_tags.csv")
tracks = pd.read_sql_query("SELECT COUNT(*) FROM con", con=con)
tracks

DatabaseError: Execution failed on sql 'SELECT COUNT(*) FROM con': file is encrypted or is not a database

In [None]:
sqlite> SELECT nodes_tags.value, COUNT(*) as num
FROM nodes_tags 
    JOIN (SELECT DISTINCT(id) FROM nodes_tags WHERE value='restaurant') i 
    ON nodes_tags.id=i.id
WHERE nodes_tags.key="cuisine"
GROUP BY nodes_tags.value
ORDER BY num DESC;

Conclusion
From the process of auditing we can see the dataset is fairly well-cleaned even though there are some minor error such as inconsistent postal codes. Since there are thousands of contributing users, so it is inevitable to have many human input error. My thought is: is it possible to create a monitor system to check everybody’s contribution regularly. In addition, because OpenStreetMaps is an open source project, there’re still a lot of areas map outdated such as my fav city, Stockholm. So I hope OpenStreetMaps can obtain these data from other open data sources.

Additional Suggestion and Ideas

Control typo errors

We can build parser which parse every word input by the users.
We can make some rules or patterns to input data which users follow everytime to input their data. This will also restrict users input in their native language.
We can develope script or bot to clean the data regularly or certain period.