In [17]:
import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET

import cerberus

import schema

In [18]:
# Input OSM extract and the names of the CSV files generated from it.
OSM_PATH = "chicago.osm"

NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

# Matches strings that begin with lowercase letters/underscores, a colon,
# and at least one more lowercase letter or underscore (e.g. "addr:street").
LOWER_COLON = re.compile(
    r'^([a-z]|_)+:([a-z]|_)+'
)
# Matches any single "problem" character: punctuation such as = + / & < > ; ' "
# ? % # $ @ , . as well as space, tab, CR and LF.
PROBLEMCHARS = re.compile(
    r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]'
)
# Matches the last whitespace-free token of a string (optionally ending in
# a period); used to pick out the street type at the end of a street name.
street_type_re = re.compile(
    r'\b\S+\.?$',
    re.IGNORECASE
)

# Validation schema taken from the local `schema` module; it is the default
# schema argument of validate_element.
SCHEMA = schema.schema

# Lookup table used while cleaning: abbreviation -> full spelling.
mapping = {
    "St": "Street",
    "St.": "Street",
    "Rd.": "Road",
    "Ave": "Avenue",
    "Rd": "Road",
    "IL": "Illinois",
}

In [19]:
# Column headers of the generated CSV files, in output order.
NODE_FIELDS = [
    'id', 'lat', 'lon',
    'user', 'uid', 'version', 'changeset', 'timestamp',
]
NODE_TAGS_FIELDS = [
    'id', 'key', 'value', 'type',
]
WAY_FIELDS = [
    'id',
    'user', 'uid', 'version', 'changeset', 'timestamp',
]
WAY_TAGS_FIELDS = [
    'id', 'key', 'value', 'type',
]
WAY_NODES_FIELDS = [
    'id', 'node_id', 'position',
]

In [20]:
# Cleaning helper: spell out an abbreviated trailing word of a name.
def update_name(name, abbreviations=None, type_pattern=None):
    """Return ``name`` with its trailing abbreviation replaced by the full word.

    ``type_pattern`` locates the last token of ``name``; when that token is a
    key of ``abbreviations`` only that trailing token is rewritten.  Earlier
    occurrences of the same text are left untouched, so "St Charles St"
    becomes "St Charles Street" rather than "Street Charles Street".
    A name in which the pattern finds nothing (for example an empty string or
    a string ending in whitespace) is returned unchanged instead of raising.

    Args:
        name: the string to clean.
        abbreviations: optional dict mapping abbreviation -> replacement;
            defaults to the module-level ``mapping``.
        type_pattern: optional compiled regex used to find the trailing
            token; defaults to the module-level ``street_type_re``.

    Returns:
        The cleaned string (``name`` itself when nothing needs replacing).
    """
    # Resolve defaults lazily so the module-level tables are looked up at
    # call time, exactly as the function did before the parameters existed.
    if abbreviations is None:
        abbreviations = mapping
    if type_pattern is None:
        type_pattern = street_type_re

    match = type_pattern.search(name)
    if match is None:
        return name

    replacement = abbreviations.get(match.group())
    if replacement is None:
        return name

    # Splice by position so only the matched trailing token changes.
    return name[:match.start()] + replacement + name[match.end():]

In [21]:
# Clean and reshape the "tag" children of a "node" or "way" element.
def get_tags(element, problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Build the cleaned tag rows for one ``node`` or ``way`` element.

    Every direct ``<tag>`` child becomes a dict with the keys ``id``, ``key``,
    ``value`` and ``type``:

    * the tag key is lower-cased;
    * a tag is skipped when ``problem_chars`` matches its key or its value;
    * a key matched by ``LOWER_COLON`` is split at its first colon into
      ``type`` (before the colon) and ``key`` (everything after it), e.g.
      "is_in:country_code" -> type "is_in", key "country_code"; any other
      key keeps its full text and gets ``default_tag_type``;
    * ``addr:street`` values and values of the keys ``is_in``, ``county`` and
      ``state`` are passed through ``update_name``;
    * the value 'Cook,Illinois,Ill.,IL,USA' is shortened to 'Cook,Illinois';
    * underscores become spaces in the values of a fixed set of keys.

    The element and its children are only read; cleaned values are kept in
    local variables rather than written back into ``attrib``.

    Args:
        element: parsed element providing ``attrib['id']`` and ``findall``.
        problem_chars: compiled regex used to reject keys and values.
        default_tag_type: ``type`` assigned to keys without a colon prefix.

    Returns:
        A list of tag dicts (empty when the element has no usable tags).

    Raises:
        KeyError: if a ``<tag>`` lacks ``k``/``v`` or, when at least one tag
            is kept, the element lacks ``id``.
    """
    state_keys = ('is_in', 'county', 'state')
    underscore_keys = ('highway', 'railway', 'amenity', 'service', 'leisure', 'grass')

    tags = []
    for tag in element.findall('./tag'):
        # Keys are normalised to lower case before any check.
        raw_key = tag.attrib['k'].lower()
        value = tag.attrib['v']

        # NOTE(review): the default PROBLEMCHARS contains space, ',' and '.',
        # so multi-word values are dropped here before any cleaning and the
        # 'Cook,Illinois,Ill.,IL,USA' rewrite below can only trigger with a
        # custom problem_chars -- confirm the value check is intended.
        if problem_chars.search(raw_key) or problem_chars.search(value):
            continue

        if LOWER_COLON.search(raw_key):
            # Split once at the first colon; the remainder may hold colons.
            tag_type, key = raw_key.split(':', 1)
            # Clean street names.
            if tag_type == 'addr' and key == 'street':
                value = update_name(value)
        else:
            tag_type = default_tag_type
            key = raw_key

        # Expand abbreviated state names.
        if key in state_keys:
            value = update_name(value)
        # Collapse the redundant county/state listing.
        if value == 'Cook,Illinois,Ill.,IL,USA':
            value = 'Cook,Illinois'
        # Replace '_' separators with spaces.
        if key in underscore_keys:
            value = value.replace('_', ' ')

        tags.append({
            'id': element.attrib['id'],
            'key': key,
            'value': value,
            'type': tag_type,
        })
    return tags


In [22]:
# Build the dict for a node or way that is later written to the CSV files.
def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS):
    """Convert a ``node`` or ``way`` element into plain dicts and lists.

    Args:
        element: parsed element; its ``tag`` selects the output shape.
        node_attr_fields: attribute names copied for a ``node``.
        way_attr_fields: attribute names copied for a ``way``.

    Returns:
        For a node: ``{'node': attribs, 'node_tags': tags}``.
        For a way: ``{'way': attribs, 'way_nodes': refs, 'way_tags': tags}``
        where ``refs`` lists the ``nd`` children with their 0-based position.
        ``None`` for any other element tag.

    Raises:
        KeyError: if a requested attribute is missing from the element.
    """
    kind = element.tag

    if kind == 'node':
        node_attribs = {field: element.attrib[field] for field in node_attr_fields}
        return {'node': node_attribs, 'node_tags': get_tags(element)}

    if kind == 'way':
        way_attribs = {field: element.attrib[field] for field in way_attr_fields}
        tags = get_tags(element)
        way_nodes = [
            {'id': element.attrib['id'], 'node_id': ref.attrib['ref'], 'position': index}
            for index, ref in enumerate(element.findall('./nd'))
        ]
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}

    return None

In [23]:
# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Lazily yield each fully parsed element whose tag is listed in ``tags``.

    The file is parsed incrementally.  Once the consumer asks for the next
    element, the document root is cleared, which drops the children it has
    accumulated so far.
    """
    events = ET.iterparse(osm_file, events=('start', 'end'))
    # The first event is the start of the document root.
    root = next(events)[1]
    for kind, node in events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise ``Exception`` when ``element`` does not validate against ``schema``.

    The message names one field taken from ``validator.errors`` together
    with the pretty-printed errors recorded for it.  Nothing is returned
    when validation succeeds.
    """
    if validator.validate(element, schema) is True:
        return

    field, errors = next(validator.errors.iteritems())
    template = "\nElement of type '{0}' has the following errors:\n{1}"
    raise Exception(template.format(field, pprint.pformat(errors)))


class UnicodeDictWriter(csv.DictWriter, object):
    """csv.DictWriter variant that UTF-8 encodes ``unicode`` values on write"""

    def writerow(self, row):
        """Write one row, encoding each ``unicode`` value as UTF-8 first."""
        encoded = {}
        for field, value in row.iteritems():
            if isinstance(value, unicode):
                value = value.encode('utf-8')
            encoded[field] = value
        super(UnicodeDictWriter, self).writerow(encoded)

    def writerows(self, rows):
        """Write every row through ``writerow`` so the encoding step applies."""
        for record in rows:
            self.writerow(record)

In [24]:
# ================================================== #
#                   Main Program                     #
# ================================================== #
def process_map(file_in, validate):
    """Stream the OSM file and write its nodes and ways to the five CSV files.

    Args:
        file_in: OSM XML source handed to ``get_element``.
        validate: when exactly ``True``, each shaped element is checked with
            ``validate_element`` before it is written.
    """
    with codecs.open(NODES_PATH, 'wb') as nodes_out, \
         codecs.open(NODE_TAGS_PATH, 'wb') as node_tags_out, \
         codecs.open(WAYS_PATH, 'wb') as ways_out, \
         codecs.open(WAY_NODES_PATH, 'wb') as way_nodes_out, \
         codecs.open(WAY_TAGS_PATH, 'wb') as way_tags_out:

        nodes_writer = UnicodeDictWriter(nodes_out, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(node_tags_out, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_out, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_out, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_out, WAY_TAGS_FIELDS)

        # Every CSV starts with its header row.
        for writer in (nodes_writer, node_tags_writer, ways_writer,
                       way_nodes_writer, way_tags_writer):
            writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            shaped = shape_element(element)
            if not shaped:
                continue

            if validate is True:
                validate_element(shaped, validator)

            if element.tag == 'node':
                nodes_writer.writerow(shaped['node'])
                node_tags_writer.writerows(shaped['node_tags'])
            elif element.tag == 'way':
                ways_writer.writerow(shaped['way'])
                way_nodes_writer.writerows(shaped['way_nodes'])
                way_tags_writer.writerows(shaped['way_tags'])


if __name__ == '__main__':
    # The Chicago map file is very large, so format validity was only checked
    # on a sample; this full run is started with validation switched off.
    process_map(OSM_PATH, validate=False)

In [26]:
# Signal that the conversion cell above has completed.
print('Finish')

Finish
