# Preamble

In [2]:
from google.colab import drive
# Attach Google Drive to the Colab filesystem; the dataset and output paths
# used further down live under this mount point.
drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Install PySpark into the runtime before importing it.
# NOTE(review): consider `%pip install -q pyspark==<version>` instead — `%pip`
# targets the running kernel's environment, and a pinned version keeps
# re-runs reproducible. The right version to pin is not visible from here.
!pip install pyspark
# Entry point used to build the Spark session in the next cell.
from pyspark.sql import SparkSession



In [4]:
# Start a local Spark session that uses every available core ("local[*]"),
# or reuse the existing one if this cell is re-run.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("capstone-project")
    .getOrCreate()
)

# Load the data

In [5]:
# Folder on the mounted Drive that holds the parquet files.
data_directory = "/content/drive/MyDrive/datasets/nyc-taxi/"

# recursiveFileLookup makes Spark pick up parquet files in sub-folders too.
df = (
    spark.read
    .option("recursiveFileLookup", "true")
    .parquet(data_directory)
)

# count() triggers a full scan of the dataset.
row_count = df.count()
print(f"Row count: {row_count}")

Row count: 239470448


# Preview data

In [6]:
# Print the first 10 rows without truncating wide cell values.
df.show(n=10, truncate=False)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|request_datetime   |on_scene_datetime  |pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls|bcf |sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

# Check schema

In [7]:
# Print column names, Spark data types and nullability as a tree.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_

# Filter to Uber trips only

In [8]:
# Licence number this notebook treats as Uber; every later step works on
# this subset only.
uber_license_num = "HV0003"
df = df.where(df.hvfhs_license_num == uber_license_num)

# count() forces a full pass over the filtered data.
uber_trip_count = df.count()
print(f"Uber trip count: {uber_trip_count}")

Uber trip count: 179125798


# Exploratory analysis

## Trip duration in minutes

In [9]:
from pyspark.sql.functions import col

# Derive the trip duration in minutes from trip_time (divided by 60).
seconds_per_minute = 60
trip_minutes = col("trip_time") / seconds_per_minute
df = df.withColumn("trip_duration_min", trip_minutes)

# Spot-check the new column next to the trip distance.
df.select("trip_duration_min", "trip_miles").show(n=5)

+------------------+----------+
| trip_duration_min|trip_miles|
+------------------+----------+
|37.516666666666666|      2.83|
|               7.2|      1.57|
|12.183333333333334|      1.98|
|              15.5|      1.99|
|12.683333333333334|      2.65|
+------------------+----------+
only showing top 5 rows


## Average trip speed (mph)



In [10]:
from pyspark.sql.functions import col, when

# Average speed in mph = miles / hours, where hours = trip_time / 3600.
#
# The division is guarded so that rows with trip_time == 0 yield NULL rather
# than hitting a divide-by-zero. With ANSI mode disabled Spark already returns
# NULL for x / 0, but with ANSI mode enabled the same expression raises
# DIVIDE_BY_ZERO and aborts the job. `when` without `otherwise` returns NULL
# for non-matching rows, so results are identical to the unguarded expression
# wherever that expression succeeded.
trip_hours = col("trip_time") / 3600
df = df.withColumn(
    "avg_speed_mph",
    when(col("trip_time") != 0, col("trip_miles") / trip_hours),
)

# Spot-check the derived speed against its inputs.
df.select("trip_miles", "trip_time", "avg_speed_mph").show(5)

+----------+---------+------------------+
|trip_miles|trip_time|     avg_speed_mph|
+----------+---------+------------------+
|      2.83|     2251| 4.525988449577965|
|      1.57|      432|13.083333333333334|
|      1.98|      731| 9.751025991792066|
|      1.99|      930| 7.703225806451612|
|      2.65|      761|12.536136662286465|
+----------+---------+------------------+
only showing top 5 rows


# Data cleaning

## Remove zero or negative trip durations, distances, and speeds

In [11]:
# Keep only rows where duration, distance and speed are all strictly positive.
# Rows where any of the three is NULL fail the comparison and are dropped too.
positive_duration = col("trip_duration_min") > 0
positive_miles = col("trip_miles") > 0
positive_speed = col("avg_speed_mph") > 0

df_clean = df.filter(positive_duration & positive_miles & positive_speed)

## Remove duration, distance, and speed outliers
- Remove trips longer than 2 hours or shorter than 1 minute.
- Remove trips longer than 100 miles or shorter than 0.1 miles.
- Remove trips with an average speed greater than 80 mph or less than 1 mph.

In [12]:
# Hard plausibility limits; both ends of every range are inclusive.
duration_col = col("trip_duration_min")
miles_col = col("trip_miles")
speed_col = col("avg_speed_mph")

duration_ok = (duration_col <= 120) & (duration_col >= 1)   # 1 min .. 2 h
miles_ok = (miles_col <= 100) & (miles_col >= 0.1)          # 0.1 .. 100 miles
speed_ok = (speed_col <= 80) & (speed_col >= 1)             # 1 .. 80 mph

df_clean = df_clean.filter(duration_ok & miles_ok & speed_ok)

## Re-run descriptive statistics

In [13]:
# Count / mean / stddev / min / max of the three cleaned metrics.
df_clean.describe("trip_duration_min", "trip_miles", "avg_speed_mph").show()

+-------+------------------+-----------------+------------------+
|summary| trip_duration_min|       trip_miles|     avg_speed_mph|
+-------+------------------+-----------------+------------------+
|  count|         178954150|        178954150|         178954150|
|   mean|20.090790506302188|5.086722704167528|13.638021637340781|
| stddev|14.206753896513533|5.715895275234833| 7.190297379177025|
|    min|               1.0|              0.1|               1.0|
|    max|             120.0|            100.0|              76.8|
+-------+------------------+-----------------+------------------+



## Check IQR

In [14]:
columns_to_check = ["trip_duration_min", "trip_miles", "avg_speed_mph"]

# Approximate quartiles (relative error 0.01) for all columns in ONE call.
# approxQuantile accepts a list of column names and returns one [q1, q3] list
# per column, in the same order. Calling it once per column would launch a
# separate Spark job per column over the whole dataset.
quartiles = df_clean.approxQuantile(columns_to_check, [0.25, 0.75], 0.01)

# iqr_dict maps column name -> (Q1, Q3, IQR); later cells read it by index.
iqr_dict = {}
for col_name, (q1, q3) in zip(columns_to_check, quartiles):
    iqr = q3 - q1
    iqr_dict[col_name] = (q1, q3, iqr)
    print(f"IQR for {col_name}: {iqr}")
    print(f"  Q1 for {col_name}: {q1}")
    print(f"  Q3 for {col_name}: {q3}")

IQR for trip_duration_min: 15.866666666666669
  Q1 for trip_duration_min: 9.983333333333333
  Q3 for trip_duration_min: 25.85
IQR for trip_miles: 4.83
  Q1 for trip_miles: 1.57
  Q3 for trip_miles: 6.4
IQR for avg_speed_mph: 7.726130022002787
  Q1 for avg_speed_mph: 8.740237691001697
  Q3 for avg_speed_mph: 16.466367713004484


## Filter data based on IQR (upper fence only: Q3 + 1.5 × IQR)

In [15]:
# Multiplier applied to the IQR when computing the outlier fence.
iqr_factor = 1.5


def _upper_fence(column_name):
    """Return Q3 + iqr_factor * IQR for `column_name`, read from iqr_dict."""
    _, q3, iqr = iqr_dict[column_name]
    return q3 + iqr_factor * iqr


duration_upper = _upper_fence("trip_duration_min")
miles_upper = _upper_fence("trip_miles")
speed_upper = _upper_fence("avg_speed_mph")

# Only the upper fence is applied; no lower (Q1 - factor * IQR) bound is used.
below_duration_fence = col("trip_duration_min") <= duration_upper
below_miles_fence = col("trip_miles") <= miles_upper
below_speed_fence = col("avg_speed_mph") <= speed_upper

df_final = df_clean.filter(
    below_duration_fence & below_miles_fence & below_speed_fence
)

## Recheck the descriptive statistics

In [16]:
df_final.describe(["trip_duration_min", "trip_miles", "avg_speed_mph"]).show()

+-------+-----------------+------------------+------------------+
|summary|trip_duration_min|        trip_miles|     avg_speed_mph|
+-------+-----------------+------------------+------------------+
|  count|        158568296|         158568296|         158568296|
|   mean|17.07090532103181|3.5879410359558808|12.101417515492919|
| stddev|9.832613878586045|2.8431905192515905| 5.101892774349794|
|    min|              1.0|               0.1|               1.0|
|    max|            49.65|             13.64|28.055555555555557|
+-------+-----------------+------------------+------------------+



# Aggregate to daily average (cleaned Uber data)

Uber filter was applied at the start of the pipeline. Aggregation below uses the cleaned Uber dataset (df_final).

In [17]:
# df is already Uber-only (filter applied at start). Add pickup_date to cleaned data for aggregation.
from pyspark.sql.functions import to_date

# Truncate the pickup timestamp to a calendar date; this is the grouping key
# for the daily aggregation.
pickup_day = to_date(col("pickup_datetime"))
df_final = df_final.withColumn("pickup_date", pickup_day)

# Group by date and compute average duration (rounded to whole minutes)

In [18]:
# Import the functions module under an alias rather than
# `from pyspark.sql.functions import round`, which rebinds Python's built-in
# round() for every cell executed afterwards in this kernel.
from pyspark.sql import functions as F

# One row per pickup date holding the mean trip duration in minutes,
# sorted chronologically.
df_daily_avg = (
    df_final.groupBy("pickup_date")
    .agg(F.avg("trip_duration_min").alias("avg_duration_min"))
    .orderBy("pickup_date")
)

# Round the daily mean to a whole number of minutes (Spark's round() uses
# scale 0 when none is given).
# NOTE(review): whole-minute rounding leaves very little day-to-day variation
# in the series — confirm this precision is intended.
df_daily_avg = df_daily_avg.withColumn("avg_duration_min", F.round("avg_duration_min"))

df_daily_avg.show(10, truncate=False)

+-----------+----------------+
|pickup_date|avg_duration_min|
+-----------+----------------+
|2024-01-01 |15.0            |
|2024-01-02 |16.0            |
|2024-01-03 |16.0            |
|2024-01-04 |16.0            |
|2024-01-05 |16.0            |
|2024-01-06 |15.0            |
|2024-01-07 |14.0            |
|2024-01-08 |16.0            |
|2024-01-09 |16.0            |
|2024-01-10 |16.0            |
+-----------+----------------+
only showing top 10 rows


# Group by date and compute average duration

In [19]:
# Aggregation done in cell above (uses df_final).

# Save the final, cleaned time series

In [21]:
# Destination on the mounted Drive.
# NOTE(review): Spark's CSV writer normally creates a directory at this path
# containing part files, not a single .csv file — confirm downstream readers
# expect that.
final_path = "/content/drive/MyDrive/datasets/fhvhv_tripdata_2024_uber_timeseries.csv"

# coalesce(1) collapses the result into a single partition before writing.
daily_series = df_daily_avg.select("pickup_date", "avg_duration_min").coalesce(1)

# Replace any previous output and include a header row.
(
    daily_series.write
    .mode("overwrite")
    .option("header", True)
    .csv(final_path)
)