In [1]:
import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET
import cerberus

In [2]:
SCHEMA= {
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": True
   },
   "outputs": [],
   "source": [
    "# Note: The schema is stored in a .py file in order to take advantage of the\n",
    "# int() and float() type coercion functions. Otherwise it could easily stored as\n",
    "# as JSON or another serialized format.\n",
    "\n",
    "schema = {\n",
    "    'node': {\n",
    "        'type': 'dict',\n",
    "        'schema': {\n",
    "            'id': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "            'lat': {'required': True, 'type': 'float', 'coerce': float},\n",
    "            'lon': {'required': True, 'type': 'float', 'coerce': float},\n",
    "            'user': {'required': True, 'type': 'string'},\n",
    "            'uid': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "            'version': {'required': True, 'type': 'string'},\n",
    "            'changeset': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "            'timestamp': {'required': True, 'type': 'string'}\n",
    "        }\n",
    "    },\n",
    "    'node_tags': {\n",
    "        'type': 'list',\n",
    "        'schema': {\n",
    "            'type': 'dict',\n",
    "            'schema': {\n",
    "                'id': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "                'key': {'required': True, 'type': 'string'},\n",
    "                'value': {'required': True, 'type': 'string'},\n",
    "                'type': {'required': True, 'type': 'string'}\n",
    "            }\n",
    "        }\n",
    "    },\n",
    "    'way': {\n",
    "        'type': 'dict',\n",
    "        'schema': {\n",
    "            'id': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "            'user': {'required': True, 'type': 'string'},\n",
    "            'uid': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "            'version': {'required': True, 'type': 'string'},\n",
    "            'changeset': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "            'timestamp': {'required': True, 'type': 'string'}\n",
    "        }\n",
    "    },\n",
    "    'way_nodes': {\n",
    "        'type': 'list',\n",
    "        'schema': {\n",
    "            'type': 'dict',\n",
    "            'schema': {\n",
    "                'id': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "                'node_id': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "                'position': {'required': True, 'type': 'integer', 'coerce': int}\n",
    "            }\n",
    "        }\n",
    "    },\n",
    "    'way_tags': {\n",
    "        'type': 'list',\n",
    "        'schema': {\n",
    "            'type': 'dict',\n",
    "            'schema': {\n",
    "                'id': {'required': True, 'type': 'integer', 'coerce': int},\n",
    "                'key': {'required': True, 'type': 'string'},\n",
    "                'value': {'required': True, 'type': 'string'},\n",
    "                'type': {'required': True, 'type': 'string'}\n",
    "            }\n",
    "        }\n",
    "    }\n",
    "}\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": None,
   "metadata": {
    "collapsed": True
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}


In [3]:
OSM_PATH = "Downloads/data_wrangling/hcm.osm"

In [4]:
# Functions to update and clean the data
expected = ["Duong","duong", "đường", "Đường", 'DUONG', "Hẻm", "hẻm",'Hem','HẺM']

def is_street(string):
    for i in expected:
        if i in string:
            return True
        
def update_street(tag):
    if is_street(tag.attrib['v']):
        tag.attrib['k'] = 'street'
            
def update_route(tag):
    tag.attrib['k'] = 'street'

In [5]:
NODES_PATH = "Downloads/data_wrangling/csv/nodes.csv"
NODE_TAGS_PATH = "Downloads/data_wrangling/csv/nodes_tags.csv"
WAYS_PATH = "Downloads/data_wrangling/csv/ways.csv"
WAY_NODES_PATH = "Downloads/data_wrangling/csv/ways_nodes.csv"
WAY_TAGS_PATH = "Downloads/data_wrangling/csv/ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict"""
    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []  # Handle secondary tags the same way for both node and way elements
    if element.tag == 'node':
        for attrib in element.attrib:
            if attrib in node_attr_fields:
                node_attribs[attrib] = element.attrib[attrib]
    
    
        for child in element:
            if child.tag == "tag" and problem_chars.match(child.attrib['k']) == None:
                    tag = {}
                    tag['id'] = element.attrib['id']
                    key = child.attrib['k']
                    if ':' in key:
                        tag['key'] = key.split(':', 1)[1]
                    else:
                        if key == 'name':
                            tag['key'] = update_street(child)
                        elif key == 'route':
                            tag['key'] = update_route(child)
                        else:
                            tag['key'] = key
                    tag['value'] = child.attrib['v']
                    if key.find(':') == -1:
                        tag['type'] = default_tag_type
                    else:
                        tag['type'] = key[:key.find(':')]
                    tags.append(tag)
    
        return {'node': node_attribs, 'node_tags': tags}
    
    elif element.tag == 'way':
        for attrib in element.attrib:
            if attrib in way_attr_fields:
                way_attribs[attrib] = element.attrib[attrib]
        position = 0
        for child in element:
            if child.tag == "nd":
                for attrib in child.attrib:
                    way_node = {}
                    way_node['id'] = element.attrib['id']
                    way_node['node_id'] = child.attrib['ref']
                    way_node['position'] = position
                    position += 1
                    way_nodes.append(way_node)
            
            if child.tag == "tag" and problem_chars.match(child.attrib['k']) == None:
                    tag = {}
                    tag['id'] = element.attrib['id']
                    key = child.attrib['k']
                    key = child.attrib['k']
                    if ':' in key:
                        tag['key'] = key.split(':', 1)[1]
                    else:
                        if key == 'name':
                            tag['key'] = update_street(child)
                        elif key == 'route':
                            tag['key'] = update_route(child)
                        else:
                            tag['key'] = key
                    tag['value'] = child.attrib['v']
                    if key.find(':') == -1:
                        tag['type'] = default_tag_type
                    else:
                        tag['type'] = key[:key.find(':')]
                    tags.append(tag)
    
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}


# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_strings = (
            "{0}: {1}".format(k, v if isinstance(v, bytes) else ", ".join(v))
            for k, v in errors.iteritems()
        )
        raise cerberus.ValidationError(
            message_string.format(field, "\n".join(error_strings))
        )


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, str) else v) for k, v in row.items()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)



# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = csv.DictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = csv.DictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = csv.DictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = csv.DictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = csv.DictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    process_map(OSM_PATH, validate=False)
