In [63]:
import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET

from cerberus import validator

import schema


OSM_PATH = "CSsample.xml"
##file_in = open("CSsample2")
##root = ET.parse("CSsample2").getroot()

def find_element():
    element = []
    for item in root.find("node"):
        element = ET.dump(item)
        return element
    
element = find_element()

In [64]:
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

##SCHEMA = schema.Schema
SCHEMA = "schema.py"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

In [11]:
##Run only once
'''Create smaller, sample file for running tests'''
'''
OSM_FILE = "CSMap2"
SAMPLE_FILE = "CSsample2"

k = 10 # Parameter: take every k-th top level element

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


with open(SAMPLE_FILE, 'wb') as output:
    output.write(('<?xml version="1.0" encoding="UTF-8"?>\n').encode())
    output.write(('<osm>\n  ').encode())

    # Write every kth top level element
    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k == 0:
            output.write(ET.tostring(element, encoding='utf-8'))

    output.write(('</osm>').encode())'''

In [65]:
'''helper functions for tag assignments in shape_element function'''
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()

def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)
        
        raise Exception(message_string.format(field, error_string))

class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, bytes) else v) for k, v in row.items()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)

In [66]:
def create_key(key_string):
    '''
    find "k" values and assign seperate keys and tag types
    '''
    if ":" in key_string:
        new_string = key_string.find(":")
        tag_type = key_string[:new_string]
        new_key = key_string[new_string+1:]
        return [new_key, tag_type]
        
    else:
        new_key = key_string
        tag_type = "regular"
        return [new_key, tag_type]

In [67]:
'''Find secondary tags and add them to tags list of dictionaries'''
def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict"""
    """element is a node or way"""

    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []
    if element.tag == 'node':
        for childName, childValue in element.attrib.items():
            if childName in NODE_FIELDS:
                node_attribs[childName] = childValue
            
        for child in element.iter('tag'):    
            dict = {}
            if PROBLEMCHARS.search(child.attrib['k']):
                continue
            else:
                dict['id'] = element.attrib['id']
                dict['key'] = create_key(child.attrib['k'])[0]
                dict['type'] = create_key(child.attrib['k'])[1]
                dict['value'] = child.attrib['v']
            tags.append(dict)
        
        return {'node': node_attribs, 'node_tags': tags}
        
    elif element.tag == 'way':
        
        for childName, childValue in element.attrib.items():
            if childName in WAY_FIELDS:
                way_attribs[childName] = childValue
                
        '''Process way tags like node tags'''
        for child in element.iter('tag'):
            dict = {}
            if PROBLEMCHARS.search(child.attrib['k']):
                continue
            else:
                dict['id'] = element.attrib['id']
                dict['key'] = create_key(child.attrib['k'])[0]
                dict['type'] = create_key(child.attrib['k'])[1]
                dict['value'] = child.attrib['v']
            tags.append(dict)
        print(tags)
        
        for num, child in enumerate(element.iter('nd')):
            dict = {}
            dict['id'] = element.attrib['id']
            dict['node_id'] = child.attrib['ref']
            dict['position'] = num
            way_nodes.append(dict)
            
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}


In [68]:
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "node_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "way_nodes.csv"
WAY_TAGS_PATH = "way_tags.csv"

def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()                
            
        validator = cerberus.Validator()
        
        
        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])

In [69]:
if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    process_map(OSM_PATH, validate=True)

SchemaError: schema definition for field 'schema.py' must be a dict

##Sources

https://github.com/SpecCRA/quiz_preparing_for_database_sql/blob/master/quiz_preparing_for_database_sql.py
