### This notebook does the following:
- processes raw ridership data
- performs feature selection and cleaning
- performs hourly aggregation
- saves results in one file

In [38]:
import pandas as pd
import glob
import os
from datetime import date, timedelta
import itertools

In [39]:
# raw data directory
dataDir = '/Users/hemingyi/Documents/UrbanTemporalNetworks/rawData/'

In [40]:
files = glob.glob(dataDir+'*csv')
len(files)

24

### Run following script for each hub

In [41]:
# hub = 'Penn'
hub = 'JFK'

In [42]:
# lat1,lon1,lat2,lon2 = 40.751497, -73.993968, 40.749707, -73.991550


In [43]:
processedFileDir = "/Users/hemingyi/Documents/UrbanTemporalNetworks/processedData/"
processedFile = processedFileDir+hub+"VehiceByHour2015.csv"

In [44]:
import geopandas as gpd
zones = gpd.read_file('../Data/NYC Taxi Zones/geo_export_420696f7-2c26-4eab-be38-199c5fb5c185.shp')
zones.loc[zones['objectid']==132.0]

Unnamed: 0,borough,location_i,objectid,shape_area,shape_leng,zone,geometry
136,Queens,132.0,132.0,0.002038,0.245479,JFK Airport,(POLYGON ((-73.8250346749999 40.66358013399984...


In [55]:
for file in files:
    print("Processing "+str(file).split('/')[-1])
    
    vehicleType = str(file).split('/')[-1].split('_')[0]
    df = pd.read_csv(file)
    print("DataFrame Shape: "+str(df.shape))
#     print(df.columns)
    # rename columns for consistency
    # set passenger count to 1 for fhv
        
    if vehicleType == 'green':
        # fixing column problem
        names = df.columns.tolist()
        df = pd.read_csv(file,names=names)
        df =df.iloc[1:]
        df.rename(columns={'lpep_pickup_datetime': 'tpep_pickup_datetime'},inplace=True)
        
    if vehicleType == 'yellow':
        df.rename(columns={'pickup_longitude': 'Pickup_longitude','pickup_latitude':'Pickup_latitude',
                          'dropoff_longitude':'Dropoff_longitude','dropoff_latitude':'Dropoff_latitude',
                          'passenger_count':'Passenger_count'},inplace=True) 
        


    # treat for na values
    df = df.dropna(subset=['tpep_pickup_datetime','Pickup_longitude', 'Pickup_latitude',
                           'Dropoff_longitude', 'Dropoff_latitude'])
    df.fillna(value={'passenger_count':1}, inplace = True)
    
    # correct data types
    df['Pickup_longitude'] = df['Pickup_longitude'].astype('float')
    df['Pickup_latitude'] = df['Pickup_latitude'].astype('float')
    df['Dropoff_longitude'] = df['Dropoff_longitude'].astype('float')
    df['Dropoff_latitude'] = df['Dropoff_latitude'].astype('float')
    
    # filter to get outgoing traffic from selected hub
#     df = df[((df['Pickup_latitude'].between(lat1-0.0009,lat1+0.0009))\
#                            &(df['Pickup_longitude'].between(lon1-0.0009,lon1+0.0009)))\
#            |((df['Pickup_latitude'].between(lat2-0.0009,lat2+0.0009))\
#                            &(df['Pickup_longitude'].between(lon2-0.0009,lon2+0.0009)))]
    lat1,lon1,lat2,lon2 = 40.669409, -73.814252, 40.615843, -73.747190
    df = df[(df['Pickup_latitude'].between(lat2,lat1))\
                           &(df['Pickup_longitude'].between(lon1,lon2))]
    df['JFK'] = df.apply(lambda x:geom.Point(x['Pickup_longitude'],x['Pickup_latitude']),axis=1)
    df['JFK'] = df['JFK'] = df['JFK'].apply(lambda x: zones.geometry[136].contains(x))
    df = df[df['JFK']==True]
    del df['JFK']
    print(hub+" out DataFrame Shape: "+str(df.shape))
    # treat datetime
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['Date'] = df['tpep_pickup_datetime'].dt.date
    df['Hour'] = df['tpep_pickup_datetime'].dt.hour
    
    #df['vehicle_type'] = vehicleType
    
    # select rquired columns
    #df = df[['vehicle_type', 'Date', 'Hour', 'DOLocationID','passenger_count']]
    df = df[['Date', 'Hour', 'Dropoff_longitude','Dropoff_latitude','Passenger_count']]

    df.rename(columns={'Passenger_count': 'vehicle_count'},inplace=True)


    #aggregatedDf = pd.merge(df_count,df_sum, on=['vehicle_type', 'Date', 'Hour', 'DOLocationID'], how='inner')
    aggregatedDf = df
    
    print("Aggregated DataFrame Shape: "+str(aggregatedDf.shape))
    print(aggregatedDf.head(3))
    # save file
    if os.path.exists(processedFile):
        print('append to results...')
        aggregatedDf.to_csv(processedFile,index=False, header=False, mode='a+')      
    else:
        print('create results file...')
        aggregatedDf.to_csv(processedFile,index=False)
    print('file saved..')
    print("------------------------------------------------")

Processing yellow_tripdata_2015-05.csv
DataFrame Shape: (13158262, 19)
JFK out DataFrame Shape: (282415, 19)
Aggregated DataFrame Shape: (282415, 5)
           Date  Hour  Dropoff_longitude  Dropoff_latitude  vehicle_count
133  2015-05-02    23         -73.983109         40.607601              1
142  2015-05-02    23         -73.964142         40.672272              1
226  2015-05-14     9         -73.984077         40.757462              1
create results file...
file saved..
------------------------------------------------
Processing yellow_tripdata_2015-11.csv
DataFrame Shape: (11312676, 19)
JFK out DataFrame Shape: (249928, 19)
Aggregated DataFrame Shape: (249928, 5)
           Date  Hour  Dropoff_longitude  Dropoff_latitude  vehicle_count
48   2015-11-01     0         -73.784996         40.665409              2
183  2015-11-23     0         -73.894051         40.771526              1
184  2015-11-23     0         -73.980110         40.733639              1
append to results...
file

  interactivity=interactivity, compiler=compiler, result=result)


JFK out DataFrame Shape: (383, 21)
Aggregated DataFrame Shape: (383, 5)
             Date  Hour  Dropoff_longitude  Dropoff_latitude vehicle_count
15647  2015-08-01     5         -73.789673         40.647007             1
15650  2015-08-01     5         -73.793015         40.660606             1
19718  2015-08-01     9         -73.803795         40.659042             2
append to results...
file saved..
------------------------------------------------
Processing green_tripdata_2015-09.csv
DataFrame Shape: (1494926, 21)
JFK out DataFrame Shape: (345, 21)
Aggregated DataFrame Shape: (345, 5)
             Date  Hour  Dropoff_longitude  Dropoff_latitude vehicle_count
7192   2015-09-01     8         -73.867226         40.770748             2
14116  2015-09-01    12         -73.785019         40.648418             2
14645  2015-09-01    12         -73.790123         40.643616             1
append to results...
file saved..
------------------------------------------------
Processing yellow_tri

### Assign pickup to taxi zone

In [56]:
sc

In [57]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import csv
import geopandas as gpd

In [58]:
rdd = sc.textFile(processedFile).cache()
rdd.take(10)

['Date,Hour,Dropoff_longitude,Dropoff_latitude,vehicle_count',
 '2015-05-02,23,-73.98310852050781,40.60760116577149,1',
 '2015-05-02,23,-73.96414184570312,40.672271728515625,1',
 '2015-05-14,9,-73.98407745361328,40.75746154785156,1',
 '2015-05-24,15,-74.00306701660156,40.72015380859375,1',
 '2015-05-24,15,-73.98428344726562,40.75151062011719,1',
 '2015-05-14,10,-73.89244842529298,40.747955322265625,2',
 '2015-05-14,10,-74.00601959228516,40.70886611938477,1',
 '2015-05-19,9,-73.8107681274414,40.69195556640625,1',
 '2015-05-13,16,-73.97402954101562,40.673992156982415,1']

In [61]:
zones = gpd.read_file('../Data/NYC Taxi Zones/geo_export_420696f7-2c26-4eab-be38-199c5fb5c185.shp')
len(zones.location_i.unique())

260

In [62]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

def createIndex(shapefile):# target geojson
    import rtree
    import fiona.crs
    import geopandas as gpd
    zones = gpd.read_file(shapefile)
    index = rtree.Rtree()
    for idx,geometry in enumerate(zones.geometry):
        index.insert(idx, geometry.bounds)
    return (index, zones)

def findZone(p, index, zones):
    if p.is_valid:
        rtreeInter = index.intersection((p.x, p.y, p.x, p.y))
        for idx in rtreeInter:
            if zones.geometry[idx].is_valid:
                if zones.geometry[idx].contains(p):
                    return zones.location_i[idx]
#                     print(zones.location_i[idx])
#                 print('none')
                return None

def TripTaxizone(pid,trip):
    from pyproj import Proj, transform
    import csv
    import shapely.geometry as geom
    import re
    if pid == 0:
        next(trip)
    index, zones = createIndex('../Data/NYC Taxi Zones/geo_export_420696f7-2c26-4eab-be38-199c5fb5c185.shp')
    inProj = Proj(init='epsg:4326', preserve_units=True)
    outProj = Proj(init='epsg:2263')
    reader = csv.reader(trip)
    for row in reader:
        Lon, Lat = float(row[2]), float(row[3])
#         Lon,Lat = transform(inProj,outProj,Lon, Lat)
        Location = geom.Point(Lon, Lat)
        TaxiZone = findZone(Location, index, zones)
        if TaxiZone is not None:
            yield ((row[0],row[1],TaxiZone),row[4])
        else:
            yield (('out'),0)


In [63]:
rdd = sc.textFile(processedFile)
counts = rdd.mapPartitionsWithIndex(TripTaxizone).reduceByKey(lambda x,y:int(x)+int(y)).\
filter(lambda x: x[0]!='out').\
map(lambda x:(x[0][0],x[0][1],int(x[0][2]),x[1])).\
cache()

counts.collect()

[('2015-05-21', '21', 61, 7),
 ('2015-05-24', '17', 256, 8),
 ('2015-05-02', '14', 160, '1'),
 ('2015-05-02', '21', 162, 16),
 ('2015-05-03', '16', 231, 12),
 ('2015-05-11', '17', 74, 3),
 ('2015-05-03', '19', 95, 24),
 ('2015-05-06', '9', 49, 3),
 ('2015-05-16', '19', 191, 4),
 ('2015-05-30', '15', 216, '1'),
 ('2015-05-19', '14', 141, 4),
 ('2015-05-17', '0', 162, 25),
 ('2015-05-11', '14', 10, 8),
 ('2015-05-19', '16', 132, 32),
 ('2015-05-03', '20', 90, 7),
 ('2015-05-11', '16', 181, 3),
 ('2015-05-25', '5', 228, '1'),
 ('2015-05-25', '10', 132, 12),
 ('2015-05-09', '12', 114, 3),
 ('2015-05-04', '9', 162, 8),
 ('2015-05-06', '23', 95, 18),
 ('2015-05-06', '23', 170, 25),
 ('2015-05-07', '0', 80, 8),
 ('2015-05-22', '20', 206, '1'),
 ('2015-05-11', '18', 68, 7),
 ('2015-05-28', '6', 180, '1'),
 ('2015-05-08', '23', 79, 17),
 ('2015-05-31', '17', 1, 8),
 ('2015-05-04', '16', 142, '1'),
 ('2015-05-04', '16', 189, 4),
 ('2015-05-22', '13', 158, 14),
 ('2015-05-22', '13', 145, 13),
 ('

In [64]:
processedFile

'/Users/hemingyi/Documents/UrbanTemporalNetworks/processedData/JFKVehiceByHour2015.csv'

In [65]:
df = sqlContext.createDataFrame(counts,('Date','Hour','DOLocationID','vehicle_count'))
df.toPandas().to_csv(processedFile, index=False)
print('done')

done


### Further processing

In [66]:
def getcCompleteGridDf(minDate,maxDate, locations):
    minDate = [int(x) for x in minDate.split('-')]
    maxDate = [int(x) for x in maxDate.split('-')]
    sdate = date(minDate[0], minDate[1], minDate[2])   
    edate = date(maxDate[0], maxDate[1], maxDate[2])    

    delta = edate - sdate       
    days = []
    for i in range(delta.days + 1):
        days.append(sdate + timedelta(days=i))
    hours = list(range(24))
    print(len(days))
    print(len(hours))
    
    combList = list(itertools.product(*[days,hours,locations]))
    dfList = [{'Date':d, 'Hour':h, 'DOLocationID':l} for d,h,l in combList]
 
    dateHourDf = pd.DataFrame(dfList)
    dateHourDf['Date'] = pd.to_datetime(dateHourDf['Date']).dt.date
    return dateHourDf

In [67]:
processedDf = pd.read_csv(processedFile)
processedDf.head(2)

Unnamed: 0,Date,Hour,DOLocationID,vehicle_count
0,2015-05-21,21,61,7.0
1,2015-05-24,17,256,8.0


In [68]:
processedDf.shape

(622779, 4)

In [69]:
# ensuring proper grouping since files were grouped by independently
processedDf = processedDf.groupby(['Date', 'Hour', 'DOLocationID']).sum().reset_index()
processedDf.shape

(622779, 4)

In [70]:
# sanity checks
validYears = [2015]
processedDf = processedDf[processedDf.Date.apply(lambda x: int(x.split('-')[0]) in validYears)]

validMonths = list(range(1,13))
processedDf = processedDf[processedDf.Date.apply(lambda x: int(x.split('-')[1]) in validMonths)]

processedDf.shape    

(622779, 4)

In [71]:
minDate, maxDate = (processedDf.Date.min(), processedDf.Date.max()) 
#v_types = list(set(processedDf.vehicle_type))
locations = list(zones.location_i.unique())

#print(len(v_types))
print(len(locations))

dateHourDf = getcCompleteGridDf(minDate,maxDate,locations)
dateHourDf.shape

260
365
24


(2277600, 3)

In [72]:
260*365*24

2277600

In [73]:
dateHourDf['Date'] = pd.to_datetime(dateHourDf['Date'])
processedDf['Date'] = pd.to_datetime(processedDf['Date'])

In [74]:
mergedDf = pd.merge(dateHourDf,processedDf, on=['Date', 'Hour', 'DOLocationID'], how='left')
mergedDf.fillna(0, inplace=True)
mergedDf['Date'] = mergedDf['Date'].dt.date
print(mergedDf.shape)
mergedDf.head(3)

(2277600, 4)


Unnamed: 0,DOLocationID,Date,Hour,vehicle_count
0,1.0,2015-01-01,0,0.0
1,2.0,2015-01-01,0,0.0
2,3.0,2015-01-01,0,0.0


In [75]:
# sanity check
print(processedDf.vehicle_count.sum())
print(mergedDf.vehicle_count.sum())

2261378.0
2261378.0


In [76]:
mergedDf['Date'] = mergedDf['Date'].astype('str')
mergedDf['Hour'] = mergedDf['Hour'].astype('str')
mergedDf['DOLocationID'] = mergedDf['DOLocationID'].astype('str')
mergedDf.to_csv('/Users/hemingyi/Documents/UrbanTemporalNetworks/processedData/'+hub+'VehiceByHour2015.csv',index=False)