# Task

Please write a Spark application that takes the above file, yellow_tripdata_2011-05.csv on HDFS, as its input and produces the top 3 origin neighborhoods that delivered passengers to each of the five boroughs, ranked by the number of trips served.

In [1]:
import datetime
import operator
import os
import sys
import time

import pyspark

In [2]:
import rtree
import fiona.crs
import geopandas as gpd
# Load the neighborhood polygons and reproject them to EPSG:2263.
zones = gpd.read_file('neighborhoods.geojson')
zones = zones.to_crs(fiona.crs.from_epsg(2263))

# Start with an empty R-tree; the bounding boxes are inserted in a separate cell.
index = rtree.Rtree()

In [36]:
# Register every zone's bounding box in the R-tree under its positional id.
for zone_id, shape in enumerate(zones.geometry):
    index.insert(zone_id, shape.bounds)

In [37]:
# Display the R-tree object as a quick check that it exists.
index

<rtree.index.Index at 0x10ff74290>

In [39]:
# Look at one borough value; the recorded output shows it is a unicode string.
zones.borough[1]

u'Queens'

In [40]:
# Same value converted with str() to get a plain byte string.
str(zones.borough[1])

'Queens'

In [41]:
# Display the zones table loaded from neighborhoods.geojson.
zones

Unnamed: 0,borough,geometry,neighborhood
0,Queens,"(POLYGON ((1042695.41268177 157932.5909777562,...",Arverne
1,Queens,(POLYGON ((1041567.549112553 166236.1995676575...,Broad Channel
2,Bronx,"(POLYGON ((1044339.23427794 246814.3387769974,...",City Island
3,Queens,(POLYGON ((1048643.510334175 157165.0353089103...,Edgemere
4,Manhattan,(POLYGON ((972697.5892621264 193015.8087967121...,Ellis Island
5,Manhattan,(POLYGON ((988855.2701673042 209379.9600719489...,Flatiron District
6,Brooklyn,(POLYGON ((1002290.931614941 158511.0111480284...,Gerritsen Beach
7,Brooklyn,(POLYGON ((983172.2402874186 151181.7700830466...,Sea Gate
8,Manhattan,(POLYGON ((988393.9578039527 213817.0774394134...,Theater District
9,Bronx,"(POLYGON ((1032350.96826855 234897.0683896698,...",Throgs Neck


In [3]:
from pyspark.sql import SQLContext
# Wrap the SparkContext `sc` (expected to be provided by the session) in a SQLContext.
sqlContext = SQLContext(sc)

In [17]:
# Read output.csv line by line as raw byte strings (use_unicode=False), then
# mark the RDD as cached so repeated actions do not re-read the file.
df = sc.textFile("output.csv", use_unicode=False)
df = df.cache()

In [18]:
def extractScores(index, lines):
    """Turn one partition of CSV text into ((field0, field1), 1) pairs.

    Args:
        index: Partition number passed in by mapPartitionsWithIndex; unused.
        lines: Iterable of CSV-formatted text lines.

    Yields:
        ((field0, field1), 1) for every row that has at least two fields,
        with leading/trailing single quotes stripped from both fields. In the
        recorded sample output these are (borough, neighborhood) names.
    """
    import csv
    for row in csv.reader(lines):
        # Blank lines parse to [] and truncated lines to a single field.
        # Skip them so one malformed line cannot abort the whole job with
        # an IndexError.
        if len(row) < 2:
            continue
        first = row[0].strip("'")
        second = row[1].strip("'")
        yield ((first, second), 1)
        
# Run extractScores over every partition; the partition index it receives is not used.
df1 = df.mapPartitionsWithIndex(extractScores)

In [19]:
# Peek at the first 20 ((field0, field1), 1) pairs.
df1.take(20)

[(('Manhattan', 'Chelsea'), 1),
 (('Manhattan', 'EastVillage'), 1),
 (('Manhattan', "Hell'sKitchen"), 1),
 (('Manhattan', 'Midtown'), 1),
 (('Manhattan', 'GreenwichVillage'), 1),
 (('Manhattan', 'TheaterDistrict'), 1),
 (('Manhattan', 'UpperEastSide'), 1),
 (('Manhattan', 'Gramercy'), 1),
 (('Manhattan', 'UpperWestSide'), 1),
 (('Manhattan', 'Midtown'), 1),
 (('Manhattan', 'KipsBay'), 1),
 (('Queens', 'UpperEastSide'), 1),
 (('Manhattan', 'EastVillage'), 1),
 (('Manhattan', 'Chelsea'), 1),
 (('Manhattan', 'Midtown'), 1),
 (('Manhattan', 'Midtown'), 1),
 (('Manhattan', 'Midtown'), 1),
 (('Manhattan', 'GreenwichVillage'), 1),
 (('Manhattan', 'EastVillage'), 1),
 (('Manhattan', 'FlatironDistrict'), 1)]

In [52]:
# Sum the 1s per compound key, then order the totals from largest to smallest.
df2 = (
    df1.reduceByKey(operator.add)
    .sortBy(lambda pair: -pair[1])
)


In [69]:
# Peek at the 20 keys with the highest totals.
df2.take(20)

[(('Manhattan', 'Midtown'), 2396124),
 (('Manhattan', 'UpperEastSide'), 2029549),
 (('Manhattan', 'UpperWestSide'), 1308388),
 (('Manhattan', 'Chelsea'), 1264311),
 (('Manhattan', "Hell'sKitchen"), 751111),
 (('Manhattan', 'EastVillage'), 555723),
 (('Manhattan', 'WestVillage'), 553480),
 (('Manhattan', 'TheaterDistrict'), 522833),
 (('Manhattan', 'MurrayHill'), 364961),
 (('Manhattan', 'SoHo'), 343788),
 (('Manhattan', 'GreenwichVillage'), 317738),
 (('Manhattan', 'Gramercy'), 310155),
 (('Manhattan', 'KipsBay'), 295427),
 (('Manhattan', 'Tribeca'), 251971),
 (('Manhattan', 'FinancialDistrict'), 244442),
 (('Manhattan', 'CentralPark'), 229968),
 (('Manhattan', 'LaGuardiaAirport'), 229932),
 (('Manhattan', 'FlatironDistrict'), 223023),
 (('Manhattan', 'LowerEastSide'), 187992),
 (('Manhattan', 'EastHarlem'), 133881)]

In [55]:
# Drop the totals and keep only the compound keys, in df2's order.
df3 = df2.keys()

In [122]:
# Peek at the first 10 keys.
df3.take(10)

[('Manhattan', 'Midtown'),
 ('Manhattan', 'UpperEastSide'),
 ('Manhattan', 'UpperWestSide'),
 ('Manhattan', 'Chelsea'),
 ('Manhattan', "Hell'sKitchen"),
 ('Manhattan', 'EastVillage'),
 ('Manhattan', 'WestVillage'),
 ('Manhattan', 'TheaterDistrict'),
 ('Manhattan', 'MurrayHill'),
 ('Manhattan', 'SoHo')]

In [155]:
# One comma-joined string of neighborhoods per borough, highest trip count
# first. The ranking is computed inside each group from df2's counts:
# reduceByKey requires a commutative function and merges shuffled values in no
# guaranteed order, so concatenating names there could scramble the ranking.
df4 = (
    df2.map(lambda rec: (rec[0][0], (rec[0][1], rec[1])))  # (borough, (name, trips))
    .groupByKey()
    .mapValues(lambda pairs: ','.join(
        name for name, _ in sorted(pairs, key=lambda p: -p[1])))
)

In [156]:
# Inspect the comma-joined neighborhood string built for each key.
df4.take(10)

[('Manhattan',
  "Midtown,UpperEastSide,UpperWestSide,Chelsea,Hell'sKitchen,EastVillage,WestVillage,TheaterDistrict,MurrayHill,SoHo,GreenwichVillage,Gramercy,KipsBay,Tribeca,FinancialDistrict,CentralPark,LaGuardiaAirport,FlatironDistrict,LowerEastSide,EastHarlem,JohnF.KennedyInternationalAirport,MorningsideHeights,BatteryParkCity,NoHo,Harlem,StuyvesantTown,Nolita,Chinatown,CivicCenter,Williamsburg,LittleItaly,LongIslandCity,DowntownBrooklyn,BrooklynHeights,Astoria,Sunnyside,WashingtonHeights,CarrollGardens,FortGreene,CobbleHill,ParkSlope,BoerumHill,EastElmhurst,Woodside,DUMBO,Greenpoint,DitmarsSteinway,TwoBridges,ProspectHeights,Jamaica,SouthOzonePark,ClintonHill,Inwood,Randall'sIsland,RooseveltIsland,ForestHills,Elmhurst,Bedford-Stuyvesant,SouthSlope,MottHaven,Gowanus,FlushingMeadowsCoronaPark,JacksonHeights,RegoPark,Maspeth,CrownHeights,Concourse,SunsetPark,Flushing,Bushwick,PortMorris,ColumbiaSt,Kensington,Prospect-LeffertsGardens,Briarwood,RichmondHill,ProspectPark,Flatbush,Corona,

In [131]:
# Split each comma-joined string and keep only its first three names.
df5 = df4.mapValues(
    lambda joined: joined.split(",")[:3]
)

In [132]:
# Show the three-name list kept for each key.
df5.take(10)

[('Manhattan', ['Midtown', 'UpperEastSide', 'UpperWestSide']),
 ('Bronx', ['UpperEastSide', 'Midtown', 'UpperWestSide']),
 ('StatenIsland',
  ['JohnF.KennedyInternationalAirport', 'Midtown', 'FinancialDistrict']),
 ('Brooklyn', ['Midtown', 'EastVillage', 'Williamsburg']),
 ('Queens', ['Midtown', 'UpperEastSide', 'JohnF.KennedyInternationalAirport'])]

In [19]:
# Top three origin neighborhoods per borough, ranked by trip count.
# The ranking happens inside each borough's group rather than relying on
# sortBy() order surviving reduceByKey(): reduceByKey requires a commutative
# function and merges shuffled values in no guaranteed order, so string
# concatenation there can scramble the ranking. Working on (name, trips)
# pairs also avoids the join/split round trip, which would break on a name
# containing a comma.
df_final = (
    df1.reduceByKey(lambda a, b: a + b)
    .map(lambda rec: (rec[0][0], (rec[0][1], rec[1])))  # (borough, (name, trips))
    .groupByKey()
    .mapValues(lambda pairs: [
        name for name, _ in sorted(pairs, key=lambda p: -p[1])[:3]])
)

In [20]:
# Bring the full per-borough result back to the driver.
df_final.collect()

[('Manhattan', ['Midtown', 'UpperEastSide', 'UpperWestSide']),
 ('Bronx', ['UpperEastSide', 'Midtown', 'UpperWestSide']),
 ('StatenIsland',
  ['JohnF.KennedyInternationalAirport', 'Midtown', 'FinancialDistrict']),
 ('Brooklyn', ['Midtown', 'EastVillage', 'Williamsburg']),
 ('Queens', ['Midtown', 'UpperEastSide', 'JohnF.KennedyInternationalAirport'])]

In [161]:
# Re-key each record by the first part of its compound key:
# ((k0, k1), total) -> (k0, (k1, total)).
dffff3 = df2.map(
    lambda rec: (rec[0][0], (rec[0][1], rec[1]))
)

In [162]:
# Peek at the first 10 re-keyed records.
dffff3.take(10)

[('Manhattan', ('Midtown', 2396124)),
 ('Manhattan', ('UpperEastSide', 2029549)),
 ('Manhattan', ('UpperWestSide', 1308388)),
 ('Manhattan', ('Chelsea', 1264311)),
 ('Manhattan', ("Hell'sKitchen", 751111)),
 ('Manhattan', ('EastVillage', 555723)),
 ('Manhattan', ('WestVillage', 553480)),
 ('Manhattan', ('TheaterDistrict', 522833)),
 ('Manhattan', ('MurrayHill', 364961)),
 ('Manhattan', ('SoHo', 343788))]

In [173]:
# Flatten each borough's (name, trips) pairs into one tuple
# (name, trips, name, trips, ...), highest trip count first.
# The pairs are sorted inside the group because reduceByKey requires a
# commutative function and merges shuffled values in no guaranteed order, so
# tuple concatenation there cannot be trusted to keep the ranking.
dfff4 = dffff3.groupByKey().mapValues(
    lambda pairs: tuple(
        field
        for pair in sorted(pairs, key=lambda p: -p[1])
        for field in pair))

In [175]:
# Keep the names at positions 0, 2 and 4 of the flat (name, trips, ...) tuple.
# A stride slice gives the same result as (x[0], x[2], x[4]) when at least
# three pairs exist, and returns fewer names instead of raising IndexError
# when a key has fewer than three.
dfff5 = dfff4.mapValues(lambda flat: flat[0:6:2])

In [176]:
# Inspect the per-key result.
# NOTE(review): the recorded output below shows six-element (name, count, ...)
# tuples, which does not match the three-element tuples produced by the
# mapValues that defines dfff5. The output looks stale, so re-run to confirm.
dfff5.take(5)

[('Manhattan',
  ('Midtown', 2396124, 'UpperEastSide', 2029549, 'UpperWestSide', 1308388)),
 ('Bronx', ('UpperEastSide', 9189, 'Midtown', 7943, 'UpperWestSide', 7225)),
 ('StatenIsland',
  ('JohnF.KennedyInternationalAirport',
   415,
   'Midtown',
   385,
   'FinancialDistrict',
   257)),
 ('Brooklyn', ('Midtown', 55328, 'EastVillage', 54647, 'Williamsburg', 48727)),
 ('Queens',
  ('Midtown',
   132378,
   'UpperEastSide',
   66421,
   'JohnF.KennedyInternationalAirport',
   59232))]

In [177]:
# Top three origin neighborhoods per borough as a tuple of names, ranked by
# trip count. The pairs are sorted inside each group because reduceByKey
# requires a commutative function and merges shuffled values in no guaranteed
# order, so a global sortBy followed by tuple concatenation is not reliable.
# Slicing with [:3] also avoids an IndexError when a borough has fewer than
# three neighborhoods.
df_final_1 = (
    df1.reduceByKey(lambda a, b: a + b)
    .map(lambda rec: (rec[0][0], (rec[0][1], rec[1])))  # (borough, (name, trips))
    .groupByKey()
    .mapValues(lambda pairs: tuple(
        name for name, _ in sorted(pairs, key=lambda p: -p[1])[:3]))
)

In [178]:
# Show the three names kept for each key.
df_final_1.take(5)

[('Manhattan', ('Midtown', 'UpperEastSide', 'UpperWestSide')),
 ('Bronx', ('UpperEastSide', 'Midtown', 'UpperWestSide')),
 ('StatenIsland',
  ('JohnF.KennedyInternationalAirport', 'Midtown', 'FinancialDistrict')),
 ('Brooklyn', ('Midtown', 'EastVillage', 'Williamsburg')),
 ('Queens', ('Midtown', 'UpperEastSide', 'JohnF.KennedyInternationalAirport'))]

In [20]:
# Top three origin neighborhoods per borough as a tuple of names, ranked by
# trip count. Sorting is done per group after groupByKey: reduceByKey requires
# a commutative function and merges shuffled values in no guaranteed order, so
# sorting before it and concatenating tuples does not reliably keep the
# ranking. Slicing with [:3] also tolerates boroughs with fewer than three
# neighborhoods instead of raising IndexError.
df_final_2 = (
    df1.reduceByKey(lambda a, b: a + b)
    .map(lambda rec: (rec[0][0], (rec[0][1], rec[1])))  # (borough, (name, trips))
    .groupByKey()
    .mapValues(lambda pairs: tuple(
        name for name, _ in sorted(pairs, key=lambda p: -p[1])[:3]))
)

In [21]:
# Show the three names kept for each key.
df_final_2.take(10)

[('Manhattan', ('Midtown', 'UpperEastSide', 'UpperWestSide')),
 ('Bronx', ('UpperEastSide', 'Midtown', 'UpperWestSide')),
 ('StatenIsland',
  ('JohnF.KennedyInternationalAirport', 'Midtown', 'FinancialDistrict')),
 ('Brooklyn', ('Midtown', 'EastVillage', 'Williamsburg')),
 ('Queens', ('Midtown', 'UpperEastSide', 'JohnF.KennedyInternationalAirport'))]