# Preamble

In [None]:
!pip install pyspark
from pyspark.sql import SparkSession



In [None]:
# Local Spark session on all available cores; getOrCreate() hands back the
# already-running session when this cell is executed again.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("IDC6145-project")
    .getOrCreate()
)

# Load the data

In [None]:
data_directory = "/content/drive/MyDrive/datasets/nyc-taxi/"

# Read every Parquet file found at any depth below data_directory into one DataFrame.
reader = spark.read.option("recursiveFileLookup", "true")
df = reader.parquet(data_directory)

# Preview data

In [None]:
# First ten rows with full (untruncated) cell values.
df.show(n=10, truncate=False)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|request_datetime   |on_scene_datetime  |pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls|bcf |sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

# Check schema

In [None]:
# Print each column's name, type and nullability as read from the Parquet files.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_

# Exploratory analysis

## Trip duration in minutes

In [None]:
from pyspark.sql.functions import col

# trip_time is treated as seconds here (and as seconds again in the speed cell,
# which divides by 3600); convert to minutes.
SECONDS_PER_MINUTE = 60
df = df.withColumn("trip_duration_min", col("trip_time") / SECONDS_PER_MINUTE)
df.select(["trip_duration_min", "trip_miles"]).show(n=5)

+------------------+
| trip_duration_min|
+------------------+
|37.516666666666666|
|               7.2|
|12.183333333333334|
|              15.5|
|12.683333333333334|
+------------------+
only showing top 5 rows



## Average trip speed (mph)



In [None]:
from pyspark.sql.functions import col, when

# Average speed = distance / duration in hours (trip_time treated as seconds).
# The trip_time > 0 guard matters when ANSI mode is enabled
# (spark.sql.ansi.enabled, on by default in Spark 4.x): dividing by zero then
# raises an error instead of yielding null. Non-positive durations now yield
# null, which the `avg_speed_mph > 0` cleaning filter drops.
df = df.withColumn(
    "avg_speed_mph",
    when(col("trip_time") > 0, col("trip_miles") / (col("trip_time") / 3600)),
)
df.select("trip_miles", "trip_time", "avg_speed_mph").show(5)

+----------+---------+------------------+
|trip_miles|trip_time|     avg_speed_mph|
+----------+---------+------------------+
|      2.83|     2251| 4.525988449577965|
|      1.57|      432|13.083333333333334|
|      1.98|      731| 9.751025991792066|
|      1.99|      930| 7.703225806451612|
|      2.65|      761|12.536136662286465|
+----------+---------+------------------+
only showing top 5 rows



# Data cleaning

## Remove non-positive trip durations, distances, and speeds

In [None]:
# Keep only rows whose duration, distance and speed are all strictly positive.
# Rows with a null in any of these columns are dropped as well, because a null
# comparison never satisfies the predicate.
df_clean = df.filter("trip_duration_min > 0 AND trip_miles > 0 AND avg_speed_mph > 0")

## Remove duration, distance, and speed outliers
- Remove trips longer than 2 hours or shorter than 1 minute.
- Remove trips longer than 100 miles or shorter than 0.1 miles.
- Remove trips with an average speed above 80 mph or below 1 mph.

In [None]:
from pyspark.sql.functions import col

# Inclusive plausibility bounds: 1-120 minutes, 0.1-100 miles, 1-80 mph.
within_bounds = (
    col("trip_duration_min").between(1, 120)
    & col("trip_miles").between(0.1, 100)
    & col("avg_speed_mph").between(1, 80)
)
df_clean = df_clean.filter(within_bounds)

## Descriptive statistics after cleaning

In [None]:
# count / mean / stddev / min / max of the three trip metrics after hard-bound cleaning.
summary_columns = ["trip_duration_min", "trip_miles", "avg_speed_mph"]
df_clean.describe(*summary_columns).show()

+-------+------------------+-----------------+------------------+
|summary| trip_duration_min|       trip_miles|     avg_speed_mph|
+-------+------------------+-----------------+------------------+
|  count|         239246922|        239246922|         239246922|
|   mean|20.068129666540365|5.055787728558447|13.562186810622977|
| stddev|14.181214475132856|5.702410852488753| 7.179451612441374|
|    min|               1.0|              0.1|               1.0|
|    max|             120.0|            100.0| 79.80560747663552|
+-------+------------------+-----------------+------------------+



## Compute the interquartile range (IQR) of each metric

In [None]:
columns_to_check = ["trip_duration_min", "trip_miles", "avg_speed_mph"]

# approxQuantile accepts a list of columns and computes every column's
# quartiles in a single pass. Calling it once per column rescans df_clean each
# time; no cache is visible on df_clean, so each scan re-reads the source data.
# Relative error 0.01 keeps the estimate cheap on ~240M rows.
quartiles = df_clean.approxQuantile(columns_to_check, [0.25, 0.75], 0.01)

# iqr_dict[column] = (Q1, Q3, IQR); the outlier-filter cell indexes these tuples.
iqr_dict = {}
for col_name, (q1, q3) in zip(columns_to_check, quartiles):
    iqr = q3 - q1
    iqr_dict[col_name] = (q1, q3, iqr)
    print(f"IQR for {col_name}: {iqr}")
    print(f"  Q1 for {col_name}: {q1}")
    print(f"  Q3 for {col_name}: {q3}")

IQR for trip_duration_min: 15.799999999999999
  Q1 for trip_duration_min: 10.016666666666667
  Q3 for trip_duration_min: 25.816666666666666
IQR for trip_miles: 4.746
  Q1 for trip_miles: 1.56
  Q3 for trip_miles: 6.306
IQR for avg_speed_mph: 7.663284078867818
  Q1 for avg_speed_mph: 8.683417085427134
  Q3 for avg_speed_mph: 16.346701164294952


## Remove outliers with the 1.5 × IQR rule

In [None]:
from pyspark.sql.functions import col

# Tukey's fences: a value is an outlier when it lies outside
# [Q1 - k*IQR, Q3 + k*IQR]. Previously only the upper fence was applied.
# With the current quartiles the lower fences are negative, so this is a no-op
# on today's data, but it keeps the rule correct if the inputs change.
iqr_factor = 1.5


def _tukey_fences(q1, q3, iqr, k=iqr_factor):
    """Return (lower, upper) outlier fences for one column."""
    return q1 - k * iqr, q3 + k * iqr


duration_lower, duration_upper = _tukey_fences(*iqr_dict["trip_duration_min"])
miles_lower, miles_upper = _tukey_fences(*iqr_dict["trip_miles"])
speed_lower, speed_upper = _tukey_fences(*iqr_dict["avg_speed_mph"])

# between() is inclusive on both ends, matching the original `<= upper` test.
df_final = df_clean.filter(
    col("trip_duration_min").between(duration_lower, duration_upper)
    & col("trip_miles").between(miles_lower, miles_upper)
    & col("avg_speed_mph").between(speed_lower, speed_upper)
)

## Recheck the descriptive statistics

In [None]:
# Summary statistics once IQR-based outliers have been removed.
df_final.describe("trip_duration_min", "trip_miles", "avg_speed_mph").show()

+-------+------------------+------------------+------------------+
|summary| trip_duration_min|        trip_miles|     avg_speed_mph|
+-------+------------------+------------------+------------------+
|  count|         211743856|         211743856|         211743856|
|   mean|  17.0333728900894|3.5507916982677523|12.010186126840958|
| stddev| 9.759759994076635|2.8023403897664427| 5.064293342779409|
|    min|               1.0|               0.1|               1.0|
|    max|49.516666666666666|            13.425|27.841621621621623|
+-------+------------------+------------------+------------------+



# Filter to Uber trips only

In [None]:
from pyspark.sql.functions import col

# License number this notebook treats as Uber (see the section header).
UBER_LICENSE = "HV0003"

# Start from df_final, the cleaned and IQR-filtered data, not the raw `df`.
# Filtering `df` silently skipped every cleaning step above, so the saved
# "cleaned" time series still contained the rows those filters remove.
df_uber = df_final.filter(col("hvfhs_license_num") == UBER_LICENSE)
df_uber.count()

179125798

# Convert pickup datetime to date

In [None]:
from pyspark.sql.functions import to_date

# Reduce each pickup timestamp to its calendar date; this is the daily grouping key.
pickup_day = to_date("pickup_datetime")
df_uber = df_uber.withColumn("pickup_date", pickup_day)

# Group by date and compute average duration

In [None]:
from pyspark.sql import functions as F

# Decimal places kept for the daily mean. Rounding to whole minutes (scale 0)
# maps many consecutive days to the same value (e.g. 18.0 five days running),
# discarding the day-to-day variation the time series is meant to capture.
DURATION_DECIMALS = 2

# Spark functions are accessed through `F` so Spark's round() does not shadow
# Python's built-in round() for the rest of the notebook.
df_daily_avg = (
    df_uber.groupBy("pickup_date")
    .agg(F.round(F.avg("trip_duration_min"), DURATION_DECIMALS).alias("avg_duration_min"))
    .orderBy("pickup_date")
)

df_daily_avg.show(10, truncate=False)

+-----------+----------------+
|pickup_date|avg_duration_min|
+-----------+----------------+
|2024-01-01 |18.0            |
|2024-01-02 |18.0            |
|2024-01-03 |18.0            |
|2024-01-04 |18.0            |
|2024-01-05 |18.0            |
|2024-01-06 |17.0            |
|2024-01-07 |16.0            |
|2024-01-08 |19.0            |
|2024-01-09 |18.0            |
|2024-01-10 |19.0            |
+-----------+----------------+
only showing top 10 rows



# Save the final, cleaned time series

In [None]:
final_path = "/content/drive/MyDrive/datasets/fhvhv_tripdata_2024_uber_timeseries.csv"

# Spark writes a *directory* at final_path holding part-*.csv file(s);
# coalesce(1) keeps the output to a single part file.
# mode("overwrite") makes the cell re-runnable. The default "errorifexists"
# mode fails as soon as the path exists from a previous run.
(
    df_daily_avg.select("pickup_date", "avg_duration_min")
    .coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(final_path)
)