In [10]:
# Echo the object bound to `sc`. No visible cell above defines it —
# presumably the notebook kernel pre-creates a SparkContext under this name
# (TODO confirm against the kernel configuration).
sc

In [11]:
import fiona
import fiona.crs
import shapely
import rtree
import pandas as pd
import geopandas as gpd

# Read the neighborhood polygons, then reproject them to EPSG:2263 so they
# share a coordinate system with the points built later in the notebook.
neighborfile = 'neighborhoods.geojson'
neighborhoods = gpd.read_file(neighborfile)
neighborhoods = neighborhoods.to_crs(fiona.crs.from_epsg(2263))

# Preview the first rows as the cell output.
neighborhoods.head()

Unnamed: 0,neighborhood,borough,geometry
0,Arverne,Queens,(POLYGON ((1042695.412681766 157932.5909777609...
1,Broad Channel,Queens,(POLYGON ((1041567.549112549 166236.1995676621...
2,City Island,Bronx,(POLYGON ((1044339.234277936 246814.3387770021...
3,Edgemere,Queens,"(POLYGON ((1048643.510334172 157165.035308915,..."
4,Ellis Island,Manhattan,(POLYGON ((972697.5892621228 193015.8087967168...


In [12]:
# Read the borough polygons, then reproject them to EPSG:2263 (the same CRS
# used for the neighborhoods frame).
boroughsfile = 'boroughs.geojson'
boroughs = gpd.read_file(boroughsfile)
boroughs = boroughs.to_crs(fiona.crs.from_epsg(2263))

# Preview the first rows as the cell output.
boroughs.head()

Unnamed: 0,boroname,geometry
0,Staten Island,(POLYGON ((970217.0401832734 145643.3197584194...
1,Queens,(POLYGON ((1029606.004483169 156073.9244595101...
2,Brooklyn,(POLYGON ((1021176.616410234 151374.8056603043...
3,Manhattan,"(POLYGON ((981219.17021961 188655.1436204575, ..."
4,Bronx,"(POLYGON ((1012821.759526813 229228.102345575,..."


In [16]:
def createIndex(shapefile):
    """Load a zone file and build an R-tree over its geometries' bounding boxes.

    The file is read with geopandas and reprojected to EPSG:2263. Every
    geometry's bounding box is inserted into the index under that geometry's
    position (0, 1, 2, ...) in the frame's geometry column.

    Args:
        shapefile: Path handed to ``gpd.read_file``.

    Returns:
        A tuple ``(index, zones)``: the populated ``rtree.Rtree`` and the
        reprojected GeoDataFrame it was built from.
    """
    # Function-local imports are kept from the original; presumably they let
    # the function run on Spark executors — TODO confirm.
    import rtree
    import fiona.crs
    import geopandas as gpd

    raw_zones = gpd.read_file(shapefile)
    zones = raw_zones.to_crs(fiona.crs.from_epsg(2263))

    spatial_index = rtree.Rtree()
    position = 0
    for shape in zones.geometry:
        # Only the bounding box is indexed; exact containment is checked by callers.
        spatial_index.insert(position, shape.bounds)
        position += 1

    return (spatial_index, zones)


def findPickupNeighborZone(p, index, zones):
    """Return the 'neighborhood' value of the first zone whose geometry contains ``p``.

    Args:
        p: Point-like object exposing ``x`` and ``y`` and accepted by the
            geometries' ``contains`` method.
        index: Spatial index whose ``intersection`` method yields candidate
            zone ids for a bounding box.
        zones: Frame-like object with a ``geometry`` collection and a
            ``'neighborhood'`` column, both addressable by those ids.

    Returns:
        The ``'neighborhood'`` entry of the first candidate that contains
        ``p``, or ``None`` when no candidate does.
    """
    # A degenerate box (min == max) queries the index for a single point.
    point_box = (p.x, p.y, p.x, p.y)
    containing = (
        candidate
        for candidate in index.intersection(point_box)
        if zones.geometry[candidate].contains(p)
    )
    first_hit = next(containing, None)
    if first_hit is None:
        return None
    return zones['neighborhood'][first_hit]

def findDropoffBroughZone(p, index, zones):
    """Return the 'boroname' value of the first zone whose geometry contains ``p``.

    Args:
        p: Point-like object exposing ``x`` and ``y`` and accepted by the
            geometries' ``contains`` method.
        index: Spatial index whose ``intersection`` method yields candidate
            zone ids for a bounding box.
        zones: Frame-like object with a ``geometry`` collection and a
            ``'boroname'`` column, both addressable by those ids.

    Returns:
        The ``'boroname'`` entry of the first candidate that contains ``p``,
        or ``None`` when no candidate does.
    """
    # A degenerate box (min == max) queries the index for a single point.
    point_box = (p.x, p.y, p.x, p.y)
    containing = (
        candidate
        for candidate in index.intersection(point_box)
        if zones.geometry[candidate].contains(p)
    )
    first_hit = next(containing, None)
    if first_hit is None:
        return None
    return zones['boroname'][first_hit]

def processTrips(pid, records):
    """Turn one partition of trip CSV lines into ((pickup zone, dropoff zone), 1) pairs.

    Written for ``RDD.mapPartitionsWithIndex``. For each CSV row the pickup
    point is matched against the neighborhood polygons and the dropoff point
    against the borough polygons; a pair is emitted only when both lookups
    succeed.

    Args:
        pid: Index of the partition. The first line of partition 0 is dropped
            as the header.
        records: Iterator over the raw text lines of the partition.

    Yields:
        ``((pickupZone, dropOffZone), 1)`` for every row whose pickup and
        dropoff points each fall inside a known zone.
    """
    import csv
    import pyproj
    import shapely.geometry as geom

    # Projection and spatial indexes are built once per partition, not per row.
    proj = pyproj.Proj(init="epsg:2263", preserve_units=True)
    neighborIndex, neighborZones = createIndex('neighborhoods.geojson')
    broughIndex, broughZones = createIndex('boroughs.geojson')

    if pid == 0:
        # Drop the header line. The default matters: with an empty partition a
        # bare next() raises StopIteration, which inside a generator surfaces
        # as RuntimeError (PEP 479).
        next(records, None)

    for row in csv.reader(records):
        # Blank or truncated lines do not have columns 2..5; skip them instead
        # of failing the whole task with IndexError.
        if len(row) < 6:
            continue
        # Columns 2..5 hold the pickup/dropoff coordinates; 'NULL' marks a missing value.
        if 'NULL' in row[2:6]:
            continue

        # proj is called as proj(row[3], row[2]) — presumably (longitude,
        # latitude); verify against the CSV header.
        pickupP = geom.Point(proj(float(row[3]), float(row[2])))
        dropoffP = geom.Point(proj(float(row[5]), float(row[4])))

        pickupZone = findPickupNeighborZone(pickupP, neighborIndex, neighborZones)
        dropOffZone = findDropoffBroughZone(dropoffP, broughIndex, broughZones)

        if pickupZone is not None and dropOffZone is not None:
            yield ((pickupZone, dropOffZone), 1)
            
from pyspark import SparkContext

if __name__ == "__main__":
    # getOrCreate() reuses a SparkContext that is already running (the first
    # cell of this notebook references `sc`); a second SparkContext() in the
    # same process raises ValueError. With no active context it creates one.
    sc = SparkContext.getOrCreate()
    rdd = sc.textFile('green.csv')
    # Count trips per (pickup neighborhood, dropoff borough) pair and write
    # the totals as text files under 'output7'.
    rdd.mapPartitionsWithIndex(processTrips) \
        .reduceByKey(lambda x, y: x + y) \
        .saveAsTextFile('output7')


KeyboardInterrupt: 