In [1]:
import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET 
import schema
import cerberus


In [2]:
# %load 'schema.py'


# In[3]:

def _field(type_name, coerce=None):
    """Return a fresh 'required' validation rule of the given type.

    When *coerce* is given it is stored under the 'coerce' key. A new dict is
    built on every call so no two fields share the same rule object.
    """
    rule = {'required': True, 'type': type_name}
    if coerce is not None:
        rule['coerce'] = coerce
    return rule


def _element_meta():
    """Rules for the attributes that both the 'node' and 'way' entries require."""
    return {
        'id': _field('integer', int),
        'user': _field('string'),
        'uid': _field('integer', int),
        'version': _field('string'),
        'changeset': _field('integer', int),
        'timestamp': _field('string'),
    }


def _row_list(row_rules):
    """Wrap *row_rules* as 'a list whose items are dicts following these rules'."""
    return {'type': 'list', 'schema': {'type': 'dict', 'schema': row_rules}}


def _tag_rows():
    """Rules for a list of secondary-tag rows (same shape for node and way tags)."""
    return _row_list({
        'id': _field('integer', int),
        'key': _field('string'),
        'value': _field('string'),
        'type': _field('string'),
    })


# Validation schema with one entry per output table. A node carries the shared
# element attributes plus its coordinates; a way carries only the shared ones.
schema = {
    'node': {
        'type': 'dict',
        'schema': dict(_element_meta(),
                       lat=_field('float', float),
                       lon=_field('float', float)),
    },
    'node_tags': _tag_rows(),
    'way': {'type': 'dict', 'schema': _element_meta()},
    'way_nodes': _row_list({
        'id': _field('integer', int),
        'node_id': _field('integer', int),
        'position': _field('integer', int),
    }),
    'way_tags': _tag_rows(),
}


# source
# https://gist.github.com/swwelch/f1144229848b407e0a5d13fcb7fbbd6f#file-data_wrangling_schema-sql


In [3]:
# Input OpenStreetMap XML file handed to process_map at the bottom of the notebook.
OSM_FILE='sample.osm'

# Output CSV files, one per table.
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

# Tag keys that start with lowercase/underscore characters, a colon, and more
# lowercase/underscore characters (e.g. "addr:street"). audit_ktagsvalue splits
# such keys at the first colon into a tag type and a key.
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Punctuation and whitespace characters.
# NOTE(review): this pattern is passed to the shaping helpers but never applied
# in the code visible here - confirm whether matching keys should be filtered.
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Schema passed to the validator in validate_element.
SCHEMA = schema

# The field order in each CSV must match the column order of the corresponding SQL table.
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


In [4]:
# Street-name cleaning.
# street_type_re matches the final whitespace-free token of a street name
# (starting at a word boundary, optionally ending in a period), e.g. the
# "St." in "Main St.".
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

# Maps a trailing token observed in the data (abbreviation, wrong case,
# misspelling, or compass-direction initial) to its canonical spelling.
street_mapping = {
    "St": "Street",
    "St.": "Street",
    "STREET": "Street",
    "Rd": "Road",
    "Rd.": "Road",
    "Ave": "Avenue",
    "Ave.": "Avenue",
    "avenue": "Avenue",
    "AVENUE": "Avenue",
    "Avene": "Avenue",
    "S": "South",
    "W": "West",
    "N": "North",
    "E": "East",
    "Pkwy": "Parkway",
    "Cir": "Circle",
    "Blvd": "Boulevard",
}


def is_street_name(elem):
    """Return True when the tag element's 'k' attribute is "addr:street"."""
    return (elem.attrib['k'] == "addr:street")


def update_street_name(name, mapping):
    """Replace the trailing token of *name* with its canonical form.

    Args:
        name: street name as found in the data.
        mapping: dict from an unexpected trailing token to its replacement.

    Returns:
        *name* with only its final token rewritten when that token is a key of
        *mapping*; otherwise *name* unchanged.
    """
    m = street_type_re.search(name)
    if m is None:
        return name
    street_type = m.group()
    if street_type not in mapping:
        return name
    # Splice the replacement into the matched span only. A plain
    # name.replace(street_type, ...) would also rewrite the same letters
    # earlier in the name ("Stanton St" -> "Streetanton Street").
    return name[:m.start()] + mapping[street_type] + name[m.end():]

# Postcode cleaning.
# Five digits at the end of the value. Written as a raw string so that "\d"
# is not treated as an (invalid) string escape. update_zipcode_name does not
# use this pattern; it is kept under its original name and meaning.
zip_code_re = re.compile(r'\d{5}$', re.IGNORECASE)

# A run of exactly five digits that is not part of a longer digit run.
_ZIP5_RE = re.compile(r'(?<!\d)\d{5}(?!\d)')


def is_zipcode_name(elem):
    """Return True when the tag element's 'k' attribute is "addr:postcode"."""
    return (elem.attrib['k'] == "addr:postcode")


def update_zipcode_name(name):
    """Reduce a postcode value to five characters.

    The first standalone five-digit group is preferred, so "10001-1234" and
    "NY 10001" both give "10001". When the value holds no such group the first
    five characters are returned, which is what this function always did.
    """
    m = _ZIP5_RE.search(name)
    if m:
        return m.group()
    return name[:5]

# State cleaning.
# Matches a full US state name anywhere in the value, ignoring case. The
# pattern used to be wrapped in Perl-style "qr/.../" delimiters, which Python
# treats as literal text, so it could never match a real state name.
state_type_re = re.compile(
    r'(Alabama|Alaska|Arizona|Arkansas|California|Colorado|Connecticut|Delaware|'
    r'Florida|Georgia|Hawaii|Idaho|Illinois|Indiana|Iowa|Kansas|Kentucky|Louisiana|'
    r'Maine|Maryland|Massachusetts|Michigan|Minnesota|Mississippi|Missouri|Montana|'
    r'Nebraska|Nevada|New\sHampshire|New\sJersey|New\sMexico|New\sYork|'
    r'North\sCarolina|North\sDakota|Ohio|Oklahoma|Oregon|Pennsylvania|Rhode\sIsland|'
    r'South\sCarolina|South\sDakota|Tennessee|Texas|Utah|Vermont|Virginia|Washington|'
    r'West\sVirginia|Wisconsin|Wyoming)',
    re.IGNORECASE)


def is_state_addr(elem):
    """Return True when the tag element's 'k' attribute is "addr:state"."""
    return (elem.attrib['k'] == "addr:state")


# Maps state abbreviations observed in the data to the full state name.
state_mapping = {
    "NY": "New York",
    "NJ": "New Jersey",
    "ny": "New York",
    "Ny": "New York",
    "NJ.": "New Jersey",
    "Nj": "New Jersey",
    "nj": "New Jersey",
    "CT": "Connecticut",
}


def update_state(name, mapping):
    """Expand a state abbreviation to the full state name.

    Args:
        name: state value as found in the data.
        mapping: dict from abbreviation to full state name.

    Returns:
        *name* unchanged when it already contains a full state name or has no
        entry in *mapping*; otherwise the mapped full name. Surrounding
        whitespace is ignored for the lookup only.
    """
    if state_type_re.search(name):
        return name
    return mapping.get(name.strip(), name)


In [5]:
def add_to_dict(element, node_attr_fields, attribs):
    """Copy the listed XML attributes of *element* into *attribs*.

    Args:
        element: object exposing an ``attrib`` mapping.
        node_attr_fields: names of the attributes to copy.
        attribs: dict that receives the copied values; it is modified in place.

    Returns:
        The same *attribs* dict.

    Raises:
        KeyError: if a listed attribute is missing from ``element.attrib``.
    """
    attribs.update((field, element.attrib[field]) for field in node_attr_fields)
    return attribs

def audit_ktagsvalue(element, k, node_tags, problemchars=PROBLEMCHARS, lower_colon=LOWER_COLON):
    """Fill *node_tags* with the key, type and cleaned value of one tag element.

    Args:
        element: tag element exposing an ``attrib`` mapping with the key
            attribute named by *k* and a "v" attribute.
        k: name of the attribute that holds the tag key.
        node_tags: dict that receives "key", "type" and "value"; modified in place.
        problemchars: accepted for interface compatibility; not used here.
        lower_colon: compiled pattern deciding whether the key is split at a colon.

    Returns:
        The same *node_tags* dict.

    A key matching *lower_colon* is split at its first colon: the part before
    it becomes the type and the rest becomes the key. Any other key is stored
    whole with type "regular". Street, postcode and state values go through
    their cleaning functions; every other value is copied as is.
    """
    raw_key = element.attrib[k]
    if lower_colon.search(raw_key):
        tag_type, key = raw_key.split(":", 1)
    else:
        tag_type, key = "regular", raw_key
    node_tags["key"] = key
    node_tags["type"] = tag_type

    # The value cleaning does not depend on how the key was split, so it is
    # done once here instead of being repeated in both branches.
    if is_street_name(element):
        node_tags["value"] = update_street_name(element.attrib["v"], street_mapping)
    elif is_zipcode_name(element):
        node_tags["value"] = update_zipcode_name(element.attrib["v"])
    elif is_state_addr(element):
        node_tags["value"] = update_state(element.attrib["v"], state_mapping)
    else:
        node_tags["value"] = element.attrib["v"]
    return node_tags

In [6]:
def _collect_tag_rows(element, problem_chars):
    """Build one row dict per direct <tag> child of *element*, stamped with the parent's id."""
    rows = []
    for child in element:
        if child.tag != "tag":
            continue
        row = audit_ktagsvalue(child, "k", {}, problemchars=problem_chars)
        row['id'] = element.attrib['id']
        rows.append(row)
    return rows


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict

    Args:
        element: XML element whose ``tag`` is inspected.
        node_attr_fields: attribute names copied for a node.
        way_attr_fields: attribute names copied for a way.
        problem_chars: compiled pattern forwarded to audit_ktagsvalue.
        default_tag_type: accepted for interface compatibility; not used here
            (audit_ktagsvalue assigns the fallback type itself).

    Returns:
        For a node: ``{'node': attribs, 'node_tags': [tag rows]}``.
        For a way: ``{'way': attribs, 'way_nodes': [rows], 'way_tags': [tag rows]}``,
        where each way_nodes row holds the way id, the referenced node id and
        the zero-based position of that <nd> child among the way's <nd> children.
        For any other element: None.
    """
    if element.tag == 'node':
        node_attribs = add_to_dict(element, node_attr_fields, {})
        # Only <tag> children are shaped, the same rule the way branch applies.
        return {'node': node_attribs,
                'node_tags': _collect_tag_rows(element, problem_chars)}

    if element.tag == 'way':
        way_attribs = add_to_dict(element, way_attr_fields, {})
        way_id = element.attrib["id"]
        way_nodes = [
            {"id": way_id, "node_id": nd.attrib["ref"], "position": position}
            for position, nd in enumerate(element.findall("nd"))
        ]
        return {'way': way_attribs,
                'way_nodes': way_nodes,
                'way_tags': _collect_tag_rows(element, problem_chars)}

    return None




In [7]:

# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Generate each fully parsed element whose tag is listed in *tags*.

    The file is parsed incrementally. The first event supplies the root
    element, which is cleared each time the generator resumes after a yield,
    so a yielded element should be used before the next one is requested.
    """
    parse_events = ET.iterparse(osm_file, events=('start', 'end'))
    root = next(parse_events)[1]
    for kind, node in parse_events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise an Exception if element does not match schema

    Args:
        element: shaped element dict to check.
        validator: object exposing ``validate(document, schema)`` and an
            ``errors`` dict filled in by a failed validation.
        schema: validation schema forwarded to ``validator.validate``.

    Raises:
        Exception: when validation does not return True. The message names
            the first field reported in ``validator.errors`` and its errors.
    """
    if validator.validate(element, schema) is not True:
        # items() exists on both Python 2 and Python 3 dicts, whereas
        # iteritems() is Python 2 only; either way the first entry is taken.
        field, errors = next(iter(validator.errors.items()))
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)

        raise Exception(message_string.format(field, error_string))


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input

    On Python 2 every ``unicode`` value is encoded to UTF-8 before the row is
    written. On Python 3 rows are passed through untouched, because there
    ``str`` is already text and the ``unicode`` name does not exist.
    """

    # True only where ``str`` is the byte-string type, i.e. on Python 2.
    _encode_text = str is bytes

    def writerow(self, row):
        """Write one row dict, encoding text values first when needed."""
        if self._encode_text:
            # Runs on Python 2 only, so the ``unicode`` lookup is safe here.
            row = {
                k: (v.encode('utf-8') if isinstance(v, unicode) else v)  # noqa: F821
                for k, v in row.items()
            }
        return super(UnicodeDictWriter, self).writerow(row)

    def writerows(self, rows):
        """Write each row through writerow so the same handling applies."""
        for row in rows:
            self.writerow(row)


# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Stream node and way elements out of *file_in* into the five CSV files.

    Args:
        file_in: OSM XML source handed to get_element.
        validate: each shaped element is checked with validate_element only
            when this is exactly True.

    Every output file is opened for writing and given a header row before any
    element is processed. Elements for which shape_element returns nothing
    are skipped.
    """
    with codecs.open(NODES_PATH, 'w') as nodes_out, \
         codecs.open(NODE_TAGS_PATH, 'w') as node_tags_out, \
         codecs.open(WAYS_PATH, 'w') as ways_out, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_out, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_out:

        # One writer per output table, using the column order of its field list.
        node_rows = UnicodeDictWriter(nodes_out, NODE_FIELDS)
        node_tag_rows = UnicodeDictWriter(node_tags_out, NODE_TAGS_FIELDS)
        way_rows = UnicodeDictWriter(ways_out, WAY_FIELDS)
        way_node_rows = UnicodeDictWriter(way_nodes_out, WAY_NODES_FIELDS)
        way_tag_rows = UnicodeDictWriter(way_tags_out, WAY_TAGS_FIELDS)

        for writer in (node_rows, node_tag_rows, way_rows, way_node_rows, way_tag_rows):
            writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            shaped = shape_element(element)
            if not shaped:
                continue

            if validate is True:
                validate_element(shaped, validator)

            kind = element.tag
            if kind == 'node':
                node_rows.writerow(shaped['node'])
                node_tag_rows.writerows(shaped['node_tags'])
            elif kind == 'way':
                way_rows.writerow(shaped['way'])
                way_node_rows.writerows(shaped['way_nodes'])
                way_tag_rows.writerows(shaped['way_tags'])


if __name__ == '__main__':
    # Validation is switched off for this run because it is roughly 10X
    # slower. To validate, consider running on a small sample of the map.
    process_map(file_in=OSM_FILE, validate=False)
