In [87]:
# import pyspark to process large files
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

In [88]:
# start (or reuse) a local Spark session that uses every available core
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('2019_insights')
    .getOrCreate()
)
print("Spark version: ", spark.version)

Spark version:  2.4.0


In [89]:
# read the taxi zone lookup table; its first line supplies the column names
taxi_zones = spark.read.option("header", "true").csv('csv/taxi+_zone_lookup.csv')

In [90]:
# preview the first five taxi zones
# columns of interest: LocationID, Zone
taxi_zones.show(n=5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [91]:
# read the processed 2019 fhv and fhvhv trip files (each with a header line);
# the generator is consumed in order, so fhv is read first, then fhvhv
fhv_tripdata, fhvhv_tripdata = (
    spark.read.csv(trip_path, header=True)
    for trip_path in ('csv/2019/processed_fhv_data.csv', 'csv/2019/processed_fhvhv_data.csv')
)

In [92]:
# drop the timestamp and SR_Flag columns so only the pickup/dropoff location IDs remain
pu_do_fhv, pu_do_fhvhv = map(
    lambda trips: trips.drop('pickup_datetime', 'dropoff_datetime', 'SR_Flag'),
    (fhv_tripdata, fhvhv_tripdata),
)

In [93]:
# preview five fhv pickup/dropoff pairs
pu_do_fhv.show(n=5)

+------------+------------+
|PULocationID|DOLocationID|
+------------+------------+
|         140|          52|
|         141|         237|
|         237|         236|
|         162|          85|
|         237|         246|
+------------+------------+
only showing top 5 rows



In [94]:
# replace null values in string columns with '0'
# (a string fill value only applies to string columns)
pu_do_fhv = pu_do_fhv.na.fill('0')

In [95]:
# convert the pickup location column to int (values that are not integers become null)
pu_do_fhv = pu_do_fhv.withColumn("PULocationID", pu_do_fhv["PULocationID"].cast('int'))

In [96]:
# convert the dropoff location column to int (values that are not integers become null)
pu_do_fhv = pu_do_fhv.withColumn("DOLocationID", pu_do_fhv["DOLocationID"].cast('int'))

In [97]:
# preview five fhvhv pickup/dropoff pairs
pu_do_fhvhv.show(n=5)

+------------+------------+
|PULocationID|DOLocationID|
+------------+------------+
|         245|         251|
|         216|         197|
|         261|         234|
|          87|          87|
|          87|         198|
+------------+------------+
only showing top 5 rows



In [98]:
# replace null values in string columns with '0'
# (a string fill value only applies to string columns)
pu_do_fhvhv = pu_do_fhvhv.na.fill('0')

In [99]:
# convert the pickup location column to int (values that are not integers become null)
pu_do_fhvhv = pu_do_fhvhv.withColumn("PULocationID", pu_do_fhvhv["PULocationID"].cast('int'))

In [100]:
# convert the dropoff location column to int (values that are not integers become null)
pu_do_fhvhv = pu_do_fhvhv.withColumn("DOLocationID", pu_do_fhvhv["DOLocationID"].cast('int'))

In [101]:
# keep only trips whose PU and DO location IDs are both positive: nulls were filled
# with '0' and non-integer strings cast to null, and neither passes the > 0 test
pu_do_fhv.createOrReplaceTempView("FHV_VIEW")
pu_do_fhvhv.createOrReplaceTempView("FHVHV_VIEW")

pu_do_fhv_tripdata = spark.sql("SELECT * FROM FHV_VIEW WHERE PULocationID > 0 AND DOLocationID > 0")
pu_do_fhvhv_tripdata = spark.sql("SELECT * FROM FHVHV_VIEW WHERE PULocationID > 0 AND DOLocationID > 0")

In [102]:
# allocate an all-zero origin x destination count matrix that is indexed directly
# by LocationID; one extra row/column so that index == LocationID (index 0 unused)
od_matrix_size = taxi_zones.count() + 1
od_matrix = [[0] * od_matrix_size for row_idx in range(od_matrix_size)]

In [103]:
# count fhv trips per (PULocationID, DOLocationID) pair in Spark, then add those
# counts to the OD matrix. Only the aggregated pairs (at most one row per zone pair)
# reach the driver, instead of streaming every individual trip through Python.
fhv_pair_counts = pu_do_fhv_tripdata.groupBy("PULocationID", "DOLocationID").count().collect()

for fhv_row in fhv_pair_counts:
    # Row.count is the inherited tuple method, so read the aggregate column by key
    od_matrix[fhv_row.PULocationID][fhv_row.DOLocationID] += fhv_row["count"]

In [104]:
# count fhvhv trips per (PULocationID, DOLocationID) pair in Spark, then add those
# counts to the OD matrix. Only the aggregated pairs (at most one row per zone pair)
# reach the driver, instead of streaming every individual trip through Python.
fhvhv_pair_counts = pu_do_fhvhv_tripdata.groupBy("PULocationID", "DOLocationID").count().collect()

for fhvhv_row in fhvhv_pair_counts:
    # Row.count is the inherited tuple method, so read the aggregate column by key
    od_matrix[fhvhv_row.PULocationID][fhvhv_row.DOLocationID] += fhvhv_row["count"]

In [105]:
# column names for the OD dataframe. Column j of od_matrix counts trips whose
# DOLocationID == j (column 0 is the unused placeholder), so the names must be
# the indices "0".."N" themselves, not whatever order the lookup CSV lists.
# Verify that the lookup holds exactly LocationIDs 1..N, the assumption that
# od_matrix_size (count + 1) and index == LocationID both rely on.
lookup_ids = sorted(int(row.LocationID) for row in taxi_zones.select("LocationID").collect())
assert lookup_ids == list(range(1, od_matrix_size)), \
    "taxi zone LocationIDs are not exactly 1..N; OD matrix columns would be mislabeled"
zone_ids = [str(zone_id) for zone_id in range(od_matrix_size)]

In [106]:
# turn the OD matrix into a Spark dataframe: one row per origin index,
# one column per destination index (named by zone_ids)
od_df = spark.createDataFrame(od_matrix, zone_ids)

In [107]:
# append an "O/D" column holding each row's origin index (row i of od_df is origin i).
# monotonically_increasing_id() is only consecutive within a partition (it stores the
# partition ID in the upper bits), so with several partitions it would not equal the
# row position. RDD.zipWithIndex assigns consecutive 0-based indices in partition
# order, i.e. the original row order.
od_df_with_index = spark.createDataFrame(
    od_df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],)),
    schema=od_df.columns + ["O/D"],
)

In [108]:
# write the OD dataframe (with its O/D column) as a single CSV part file.
# coalesce(1) merges partitions without a full shuffle; repartition(1) shuffles,
# which need not keep row order. mode("overwrite") lets the cell be re-run without
# failing because the output path already exists.
# Note: Spark writes a directory at this path that contains the part file.
(
    od_df_with_index
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("csv/2019/fhv_fhvhv_od.csv")
)

In [131]:
# for every origin zone, find the destination zone with the most trips.
# od_df was built from od_matrix, whose row index is PULocationID and column index
# is DOLocationID, so the position of a row's maximum is the destination ID.
# Ties resolve to the lowest DOLocationID (list.index returns the first match).
# Row 0 is the unused placeholder and yields (0, 0, 0).
od_greatest_values = []
for origin, trip_counts in enumerate(od_matrix):
    max_val = max(trip_counts)
    od_greatest_values.append((origin, trip_counts.index(max_val), max_val))

# create dataframe from od_greatest_values list
od_gr_df_cols = ["PULocationID", "DOLocationID", "TripQty"]
od_gr_df = spark.createDataFrame(data=od_greatest_values, schema=od_gr_df_cols)

In [132]:
# preview the busiest destination for the first five origin indices
od_gr_df.show(n=5)

+------------+------------+-------+
|PULocationID|DOLocationID|TripQty|
+------------+------------+-------+
|           0|           0|      0|
|           1|         265|   6794|
|           2|         124|     42|
|           3|           3|  41602|
|           4|          79|  48830|
+------------+------------+-------+
only showing top 5 rows

