In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Spark session & context.
# Master "local" runs Spark in-process with a single worker thread;
# getOrCreate() reuses an already-running session when the cell is re-executed.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
# Read the October 2019 FHV (For-Hire Vehicle) trip records into a Spark DataFrame.
# The first row is the header; inferSchema makes Spark scan the data once more
# to pick column types (e.g. timestamps for the pickup/dropoff columns).
csv_path = "/sparkdata/taxi_trip/fhv/2019/10/fhv_tripdata_2019_10.csv.gz"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_path)
)

In [3]:
# Preview the first 20 rows; long cell values are truncated to 20 characters.
df.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [6]:
# Repartition the DataFrame into 6 partitions and save it as parquet.
# mode('overwrite') keeps the cell re-runnable: the default save mode
# ('errorifexists') raises as soon as the target directory already exists.
df.repartition(6).write.mode('overwrite').parquet('/sparkdata/target/fhv/2019/10')

In [5]:
## How many taxi trips were there on the 15th of October?
# Derive the pickup calendar date into a separately named DataFrame instead of
# reassigning `df`: the cell stays idempotent, and other cells (e.g. the parquet
# write) keep seeing the raw CSV schema regardless of execution order.
fhv_with_dates = df.withColumn('pickup_date', F.to_date('pickup_datetime'))

# Compare against an explicit date literal rather than relying on implicit string coercion.
fhv_with_dates.filter(F.col('pickup_date') == F.lit('2019-10-15').cast('date')).count()

62610


In [9]:
## What is the length of the longest trip in the dataset in hours?
# Casting a timestamp to long yields epoch seconds, so the dropoff - pickup
# difference divided by 3600 is the trip length in hours (rounded to 2 decimals).
# NOTE(review): the result shown below (631152.5 h, roughly 72 years) points to
# bad dropoff timestamps in the source data rather than a real trip.
(
    df
    .select(
        F.round(
            (F.col("dropOff_datetime").cast("long") - F.col("pickup_datetime").cast("long")) / 3600,
            2,
        ).alias('trip_time')
    )
    .agg(F.max('trip_time'))
    .show()
)

+--------------+
|max(trip_time)|
+--------------+
|      631152.5|
+--------------+



In [21]:
## Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?
location = spark.read.csv("/sparkdata/taxi_trip/taxi_zone_lookup.csv", header=True, inferSchema=True)

# Inner join: trips with a null PUlocationID, or one missing from the lookup, are excluded.
# Grouping is by zone name, so if several LocationIDs share a Zone name their counts merge.
(
    df.alias('fhv')
    .join(location.alias('l'), F.col('l.LocationID') == F.col('fhv.PUlocationID'))
    .groupBy(F.col('l.Zone'))
    .count()
    # Secondary sort on Zone makes the ranking deterministic when counts tie,
    # so the first row ("least frequent") is stable across runs.
    .orderBy(F.col('count').asc(), F.col('Zone').asc())
    .show()
)

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows

