# Preamble

In [8]:
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
!pip install pyspark
from pyspark.sql import SparkSession



In [10]:
spark = SparkSession.builder.master("local[*]").appName("IDC6145-project").getOrCreate()

# Load the data

In [4]:
data_directory = "/content/drive/MyDrive/datasets/nyc-taxi/"
df = spark.read.format("parquet").option("recursiveFileLookup", "true").load(data_directory)

# Remove unecessary columns

In [11]:
keep_columns = ["hvfhs_license_num", "pickup_datetime", "trip_miles", "trip_time", "congestion_surcharge"]
df_min = df.select(*keep_columns)
df_min.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



# Add derived columns

In [15]:
from pyspark.sql.functions import col, hour, date_format

# Trip duration in minutes
df_min = df_min.withColumn("trip_duration_min", col("trip_time") / 60)

# Average trip speed
df_min = df_min.withColumn("avg_speed_mph", (col("trip_miles") / (col("trip_time") / 3600)))

# Trip hour of day
df_min = df_min.withColumn("pickup_hour", hour("pickup_datetime"))

# Trip day of week
df_min = df_min.withColumn("pickup_day", date_format("pickup_datetime", "E"))

# Data cleaning

## Remove zero or negative trip durations

In [16]:
df_clean = df_min.filter(
    (col("trip_duration_min") > 0) &
    (col("trip_miles") > 0) &
    (col("avg_speed_mph") > 0)
)

## Remove duration, distance, and speed outliers
- Remove trips longer than 2 hours and shorter than 1 minute.
- Remove trips longer than 100 miles and less than 0.1 miles.
- Remove average speed of greater than 80 mph and less than 1 mph.

In [17]:
df_clean = df_clean.filter(
    (col("trip_duration_min") <= 120) &
    (col("trip_duration_min") >= 1) &
    (col("trip_miles") <= 100) &
    (col("trip_miles") >= 0.1) &
    (col("avg_speed_mph") <= 80) &
    (col("avg_speed_mph") >= 1)
)

## Check IQR

In [18]:
columns_to_check = ["trip_duration_min", "trip_miles", "avg_speed_mph"]
iqr_dict = {}

for col_name in columns_to_check:
    q1, q3 = df_clean.approxQuantile(col_name, [0.25, 0.75], 0.01)
    iqr = q3 - q1
    iqr_dict[col_name] = (q1, q3, iqr)
    print(f"IQR for {col_name}: {iqr_dict[col_name][2]}")
    print(f"  Q1 for {col_name}: {iqr_dict[col_name][0]}")
    print(f"  Q3 for {col_name}: {iqr_dict[col_name][1]}")

IQR for trip_duration_min: 15.799999999999999
  Q1 for trip_duration_min: 10.016666666666667
  Q3 for trip_duration_min: 25.816666666666666
IQR for trip_miles: 4.746
  Q1 for trip_miles: 1.56
  Q3 for trip_miles: 6.306
IQR for avg_speed_mph: 7.663284078867818
  Q1 for avg_speed_mph: 8.683417085427134
  Q3 for avg_speed_mph: 16.346701164294952


## Filter data based on IQR

In [19]:
iqr_duration = iqr_dict["trip_duration_min"][2]
iqr_miles = iqr_dict["trip_miles"][2]
iqr_speed = iqr_dict["avg_speed_mph"][2]

iqr_factor = 1.5

iqr_duration_15 = iqr_factor * iqr_duration
iqr_miles_15 = iqr_factor * iqr_miles
iqr_speed_15 = iqr_factor * iqr_speed

duration_upper = iqr_dict["trip_duration_min"][1] + iqr_duration_15
miles_upper = iqr_dict["trip_miles"][1] + iqr_miles_15
speed_upper = iqr_dict["avg_speed_mph"][1] + iqr_speed_15

df_final = df_clean.filter(
    (col("trip_duration_min") <= duration_upper) &
    (col("trip_miles") <= miles_upper) &
    (col("avg_speed_mph") <= speed_upper)
)

# Map the license numbers to trip providers

In [20]:
from pyspark.sql.functions import when

df_final = df_final.withColumn(
    "trip_provider",
    when(col("hvfhs_license_num") == "HV0002", "J")
    .when(col("hvfhs_license_num") == "HV0003", "U")
    .when(col("hvfhs_license_num") == "HV0004", "V")
    .when(col("hvfhs_license_num") == "HV0005", "L")
    .otherwise("Unknown")
)

df_final = df_final.drop("hvfhs_license_num")

df_final.groupBy("trip_provider").count().show()

+-------------+---------+
|trip_provider|    count|
+-------------+---------+
|            L| 53650389|
|            U|158093467|
+-------------+---------+



# Preview data

In [None]:
df.show(10, truncate=False)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|request_datetime   |on_scene_datetime  |pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls|bcf |sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

# Check schema

In [None]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_

# Save the final, cleaned dataset

In [22]:
final_path = "/content/drive/MyDrive/datasets/fhvhv_tripdata_2024_final.parquet"
df_final.write.parquet(final_path, mode="overwrite")

# Exploratory analysis

## Get number of trips

In [6]:
df_min.count()

239470448

## Check range of trip dates

In [None]:
from pyspark.sql.functions import min, max

df_min.select(
    min("pickup_datetime").alias("earliest_pickup"),
    max("pickup_datetime").alias("latest_pickup")
).show()


+-------------------+-------------------+
|    earliest_pickup|      latest_pickup|
+-------------------+-------------------+
|2024-01-01 00:00:00|2024-12-31 23:59:59|
+-------------------+-------------------+



## Trip duration in minutes

In [7]:
from pyspark.sql.functions import col

df_min = df_min.withColumn("trip_duration_min", col("trip_time") / 60)
df_min.select("trip_duration_min", "trip_miles").show(5)

+------------------+----------+
| trip_duration_min|trip_miles|
+------------------+----------+
|37.516666666666666|      2.83|
|               7.2|      1.57|
|12.183333333333334|      1.98|
|              15.5|      1.99|
|12.683333333333334|      2.65|
+------------------+----------+
only showing top 5 rows



## Average trip speed

In [None]:
df_min = df_min.withColumn("avg_speed_mph", (col("trip_miles") / (col("trip_time") / 3600)))
df_min.select("trip_miles", "trip_time", "avg_speed_mph").show(5)

+----------+---------+------------------+
|trip_miles|trip_time|     avg_speed_mph|
+----------+---------+------------------+
|      2.83|     2251| 4.525988449577965|
|      1.57|      432|13.083333333333334|
|      1.98|      731| 9.751025991792066|
|      1.99|      930| 7.703225806451612|
|      2.65|      761|12.536136662286465|
+----------+---------+------------------+
only showing top 5 rows



## Trips by hour of day

In [None]:
from pyspark.sql.functions import hour

df_min = df_min.withColumn("pickup_hour", hour("pickup_datetime"))
df_min.groupBy("pickup_hour").count().orderBy("pickup_hour").show(24)

+-----------+--------+
|pickup_hour|   count|
+-----------+--------+
|          0| 8850616|
|          1| 6195085|
|          2| 4432124|
|          3| 3565774|
|          4| 3698972|
|          5| 4309730|
|          6| 6674818|
|          7| 9990060|
|          8|12061861|
|          9|11135713|
|         10|10235532|
|         11|10054757|
|         12|10302206|
|         13|10692929|
|         14|11549695|
|         15|11848345|
|         16|12220722|
|         17|13521043|
|         18|14132501|
|         19|13711602|
|         20|13045453|
|         21|12868552|
|         22|12814289|
|         23|11558069|
+-----------+--------+



## Average speed by trip hour

In [None]:
df_min.groupBy("pickup_hour").avg("avg_speed_mph").orderBy("pickup_hour").show(24)

+-----------+------------------+
|pickup_hour|avg(avg_speed_mph)|
+-----------+------------------+
|          0|15.892756336088146|
|          1|16.325043808156742|
|          2|16.953963286026177|
|          3|  18.5969798355312|
|          4|20.779781498455005|
|          5| 20.39932756350472|
|          6| 17.42317344713429|
|          7|  14.1064628664629|
|          8|12.841786014374426|
|          9|13.208563747534262|
|         10|13.218810740521793|
|         11|12.896039900914538|
|         12|12.600851299647239|
|         13|12.316307747106318|
|         14|11.621182588672829|
|         15|11.168523903947138|
|         16|10.960524369260611|
|         17|10.917344630332131|
|         18|11.594776096688337|
|         19|12.616647591647903|
|         20|13.659680066907146|
|         21|14.374254345312206|
|         22|14.770447841645177|
|         23|15.299248801529991|
+-----------+------------------+



## Trips by day of week

In [None]:
from pyspark.sql.functions import date_format

df_min = df_min.withColumn("pickup_day", date_format("pickup_datetime", "E"))
df_min.groupBy("pickup_day").count().orderBy("pickup_day").show()

+----------+--------+
|pickup_day|   count|
+----------+--------+
|       Fri|37385964|
|       Mon|29842126|
|       Sat|40012026|
|       Sun|34318757|
|       Thu|34199906|
|       Tue|31257546|
|       Wed|32454123|
+----------+--------+



## Basic descriptive statistics

In [None]:
df_min.describe(["trip_duration_min", "trip_miles", "avg_speed_mph", "congestion_surcharge"]).show()

+-------+------------------+-----------------+------------------+--------------------+
|summary| trip_duration_min|       trip_miles|     avg_speed_mph|congestion_surcharge|
+-------+------------------+-----------------+------------------+--------------------+
|  count|         239470448|        239470448|         239470418|           239470448|
|   mean| 20.12791464008397|5.081384419049408|13.563081717472029|  1.0391516430453247|
| stddev|14.484228281515866|5.896975440728102|  7.19349392425426|  1.3273029585704301|
|    min|               0.0|              0.0|               0.0|                 0.0|
|    max| 918.9666666666667|           555.25|189.24590163934425|                8.25|
+-------+------------------+-----------------+------------------+--------------------+



## Re-run descriptive statistics

In [None]:
df_clean.describe(["trip_duration_min", "trip_miles", "avg_speed_mph"]).show()

+-------+------------------+-----------------+------------------+
|summary| trip_duration_min|       trip_miles|     avg_speed_mph|
+-------+------------------+-----------------+------------------+
|  count|         239246922|        239246922|         239246922|
|   mean|20.068129666540365|5.055787728558447|13.562186810622977|
| stddev|14.181214475132856|5.702410852488753| 7.179451612441374|
|    min|               1.0|              0.1|               1.0|
|    max|             120.0|            100.0| 79.80560747663552|
+-------+------------------+-----------------+------------------+



## Recheck the descriptive statistics

In [None]:
df_final.describe(["trip_duration_min", "trip_miles", "avg_speed_mph"]).show()

+-------+------------------+------------------+------------------+
|summary| trip_duration_min|        trip_miles|     avg_speed_mph|
+-------+------------------+------------------+------------------+
|  count|         211743856|         211743856|         211743856|
|   mean|  17.0333728900894|3.5507916982677523|12.010186126840958|
| stddev| 9.759759994076635|2.8023403897664427| 5.064293342779409|
|    min|               1.0|               0.1|               1.0|
|    max|49.516666666666666|            13.425|27.841621621621623|
+-------+------------------+------------------+------------------+



In [None]:
df_final.printSchema()

root
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- trip_duration_min: double (nullable = true)
 |-- avg_speed_mph: double (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- trip_provider: string (nullable = false)



In [None]:
df_final.show(10, truncate=False)

+-------------------+----------+---------+--------------------+------------------+------------------+-----------+----------+-------------+
|pickup_datetime    |trip_miles|trip_time|congestion_surcharge|trip_duration_min |avg_speed_mph     |pickup_hour|pickup_day|trip_provider|
+-------------------+----------+---------+--------------------+------------------+------------------+-----------+----------+-------------+
|2024-01-01 00:28:08|2.83      |2251     |2.75                |37.516666666666666|4.525988449577965 |0          |Mon       |U            |
|2024-01-01 00:12:53|1.57      |432      |2.75                |7.2               |13.083333333333334|0          |Mon       |U            |
|2024-01-01 00:23:05|1.98      |731      |2.75                |12.183333333333334|9.751025991792066 |0          |Mon       |U            |
|2024-01-01 00:41:04|1.99      |930      |2.75                |15.5              |7.703225806451612 |0          |Mon       |U            |
|2024-01-01 00:57:21|2.65  

In [None]:
df_final.describe(["congestion_surcharge"]).show()

+-------+--------------------+
|summary|congestion_surcharge|
+-------+--------------------+
|  count|           211743856|
|   mean|  1.0354683514406198|
| stddev|  1.3267091227893713|
|    min|                 0.0|
|    max|                8.25|
+-------+--------------------+



# File Sizes

- Original dataset - 5.51GB
- Final dataset - 3.49GB

In [21]:
df_final.count()

211743856