In [1]:
sc

<pyspark.context.SparkContext at 0x110e94d90>

In [2]:
import rtree
import datetime
import operator
import os
import sys
import time
import pyspark
from operator import add
import numpy as np
import matplotlib.path as mplPath
start = time.time()

In [3]:
def geojson_create(filename,data):
    import json
    coordinatesList = {}
    with open ('block-groups-polygons.geojson') as dataFile:
        blockData = json.load(dataFile)
    count = 0
    for i in data:
        for block in blockData['features']:
            if i == block['properties']['OBJECTID']:
                coordinatesList[count] = [block['geometry'],block['properties']]
                count+=1

    template = \
        ''' \
        { "type" : "Feature",
            "id" : %s,
            "properties" : %s,
            "geometry" : %s
            },
        '''

    # the head of the geojson file
    output = \
        ''' \
    { "type" : "FeatureCollection",
        "features" : [
        '''

    for k,v in coordinatesList.iteritems():
        output += template % (k,json.dumps(v[1]),json.dumps(v[0]))

    # the tail of the geojson file
    output += \
        ''' \
        ]
    }
        '''

    # opens an geoJSON file to write the output to
    outFileHandle = open(filename+".geojson", "w")
    outFileHandle.write(output)
    outFileHandle.close()

In [4]:
def indexZones(shapeFilename):
    import rtree
    import fiona.crs
    import geopandas as gpd
    index = rtree.Rtree()
    zones = gpd.read_file(shapeFilename).to_crs(fiona.crs.from_epsg(2263))
    for idx,geometry in enumerate(zones.geometry):
        index.insert(idx, geometry.bounds)
    return (index, zones)

def find_Block(p, index, zones):
    match = index.intersection((p.x, p.y, p.x, p.y))
    for idx in match:
        z = mplPath.Path(np.array(zones.geometry[idx].exterior))
        if z.contains_point(np.array(p)):
            return zones['OBJECTID'][idx]
    return -1

def find_Borough(p, index, zones):
    match = index.intersection((p.x, p.y, p.x, p.y))
    for idx in match:
        if any(map(lambda x: x.contains(p), zones.geometry[idx])):
            return zones['boroname'][idx]
    return -1

In [5]:
def mapToZone(parts):
    import pyproj
    import shapely.geometry as geom
    proj = pyproj.Proj(init="epsg:2263", preserve_units=True)    
    index, zones = indexZones('block-groups-polygons-simple.geojson')
    index2, zones2 = indexZones('boroughs.geojson')
    for line in parts:
        if line.startswith('vendor_id'): continue 
        fields = line.strip('').split(',')
        if fields ==['']: continue
        if all((fields[5],fields[6],fields[9],fields[10])) and float(fields[4])<=2:
            pickup_location  = geom.Point(proj(float(fields[5]), float(fields[6])))
            dropoff_location = geom.Point(proj(float(fields[9]), float(fields[10])))
            pickup_block = find_Block(pickup_location, index, zones)
            dropoff_block = find_Block(dropoff_location, index, zones)
            pickup_borough = find_Borough(pickup_location, index2, zones2)
            dropoff_borough = find_Borough(dropoff_location, index2, zones2)
            if pickup_block>=0 and pickup_borough>0 and dropoff_block>0 and dropoff_borough>0:#np.array(pickup_block.exterior)
                yield (pickup_block,pickup_borough,dropoff_block,dropoff_borough)
                
                
def mapper2(k2v2):
    from heapq import nlargest
    k, values = k2v2
    top10 = nlargest(10, values, lambda x:x[1])
    return (k,top10)

In [6]:
trips = sc.textFile('../yellow_tripdata_2012-07.csv')
#trips = sc.textFile('../yellow_tripdata_2014-07.csv')

output = sc.parallelize(mapToZone(trips.take(20000)))
print output.take(10)
print (time.time()-start)/60.0

[(5439, u'Bronx', 5287, u'Bronx'), (9321, u'Manhattan', 9470, u'Manhattan'), (9000, u'Manhattan', 9572, u'Manhattan'), (8989, u'Manhattan', 9014, u'Manhattan'), (9723, u'Manhattan', 10086, u'Manhattan'), (9394, u'Manhattan', 9192, u'Manhattan'), (10136, u'Manhattan', 9066, u'Manhattan'), (9627, u'Manhattan', 9247, u'Manhattan'), (9326, u'Manhattan', 9055, u'Manhattan'), (9027, u'Manhattan', 9549, u'Manhattan')]
4.14784815311


In [7]:
pickup = output.map(lambda x: ((x[0],x[1]),1)).reduceByKey(lambda x,y: x+y).map(lambda x:(x[0][1], (x[0][0],x[1]))).groupByKey().map(mapper2)
pickup.collect()
dropoff = output.map(lambda x: ((x[2],x[3]),1)).reduceByKey(lambda x,y: x+y).map(lambda x:(x[0][1], (x[0][0],x[1]))).groupByKey().map(mapper2)

In [8]:
pickup.collect()

[(u'Bronx',
  [(6214, 1),
   (6118, 1),
   (5835, 1),
   (6223, 1),
   (5439, 1),
   (5267, 1),
   (6219, 1),
   (5376, 1),
   (5888, 1),
   (5889, 1)]),
 (u'Manhattan',
  [(9144, 248),
   (8986, 144),
   (9052, 142),
   (9493, 130),
   (9245, 118),
   (9139, 105),
   (9594, 99),
   (9509, 81),
   (9085, 80),
   (9365, 77)]),
 (u'Brooklyn',
  [(12349, 7),
   (12860, 5),
   (12756, 5),
   (11291, 5),
   (12043, 5),
   (12757, 4),
   (12882, 4),
   (11331, 4),
   (11292, 3),
   (12736, 3)]),
 (u'Staten Island', [(7649, 1)]),
 (u'Queens',
  [(2281, 27),
   (3722, 13),
   (2282, 8),
   (3431, 5),
   (2169, 5),
   (2165, 4),
   (3023, 3),
   (2815, 3),
   (3147, 3),
   (2371, 3)])]

In [9]:
dropoff.collect()

[(u'Bronx',
  [(6198, 1),
   (5486, 1),
   (5194, 1),
   (5398, 1),
   (5282, 1),
   (6039, 1),
   (6123, 1),
   (6223, 1),
   (5287, 1),
   (6219, 1)]),
 (u'Manhattan',
  [(9144, 359),
   (9139, 146),
   (8986, 137),
   (9493, 132),
   (9245, 116),
   (9143, 115),
   (9052, 103),
   (9509, 97),
   (9023, 97),
   (9608, 85)]),
 (u'Brooklyn',
  [(12049, 6),
   (12750, 6),
   (12856, 4),
   (12349, 3),
   (12730, 3),
   (11290, 3),
   (12746, 3),
   (11459, 3),
   (11023, 3),
   (12044, 2)]),
 (u'Staten Island', [(7649, 1)]),
 (u'Queens',
  [(2281, 20),
   (3722, 12),
   (2282, 8),
   (3431, 5),
   (2169, 4),
   (2388, 4),
   (2562, 3),
   (2167, 2),
   (2691, 2),
   (2395, 2)])]