In [0]:
print("Big Data Analysis of NYC Taxi Trips with PySpark")

Big Data Analysis of NYC Taxi Trips with PySpark


In [0]:
#Upload & Load the Dataset

# Mount the uploaded dataset in Databricks FileStore
file_path = "dbfs:/FileStore/tables/yellow_tripdata_2025_02.csv"

# Load the CSV using PySpark
df = spark.read.option("header", True).option("inferSchema", True).csv(file_path)

# Preview
df.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2025-02-01 00:12:18|  2025-02-01 00:32:33|              3|         3.12|         1|                 N|         246|    

In [0]:
#Feature Engineering – Add Trip Duration & Rush Hour Flag

from pyspark.sql.functions import col, to_timestamp, hour, when, round

# Convert timestamps
df = df.withColumn("pickup_datetime", to_timestamp("tpep_pickup_datetime")) \
       .withColumn("dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))

# Trip duration in minutes
df = df.withColumn("trip_duration_mins", 
                   round((col("dropoff_datetime").cast("long") - col("pickup_datetime").cast("long")) / 60, 2))

# Rush Hour flag (7–9 AM or 4–6 PM)
df = df.withColumn("pickup_hour", hour("pickup_datetime")) \
       .withColumn("is_rush_hour", when((col("pickup_hour").between(7, 9)) | (col("pickup_hour").between(16, 18)), 1).otherwise(0))

df.select("pickup_datetime", "dropoff_datetime", "trip_duration_mins", "is_rush_hour").show(5)


+-------------------+-------------------+------------------+------------+
|    pickup_datetime|   dropoff_datetime|trip_duration_mins|is_rush_hour|
+-------------------+-------------------+------------------+------------+
|2025-02-01 00:12:18|2025-02-01 00:32:33|             20.25|           0|
|2025-02-01 00:40:04|2025-02-01 00:49:15|              9.18|           0|
|2025-02-01 00:06:09|2025-02-01 00:11:51|               5.7|           0|
|2025-02-01 00:15:13|2025-02-01 00:20:19|               5.1|           0|
|2025-02-01 00:02:52|2025-02-01 00:20:25|             17.55|           0|
+-------------------+-------------------+------------------+------------+
only showing top 5 rows



In [0]:
#Aggregation

from pyspark.sql.functions import avg, round

# Average trip duration and distance by vendor and rush hour
agg_df = df.groupBy("VendorID", "is_rush_hour").agg(
    round(avg("trip_distance"), 2).alias("avg_distance"),
    round(avg("trip_duration_mins"), 2).alias("avg_duration"),
    round(avg("passenger_count"), 2).alias("avg_passengers")
)

agg_df.show()



+--------+------------+------------+------------+--------------+
|VendorID|is_rush_hour|avg_distance|avg_duration|avg_passengers|
+--------+------------+------------+------------+--------------+
|       1|           0|        3.07|       15.78|          1.14|
|       7|           1|        2.06|         0.0|          1.15|
|       1|           1|        2.86|       16.64|           1.1|
|       2|           1|        7.99|       15.85|          1.29|
|       2|           0|        6.33|       14.99|          1.33|
|       7|           0|        2.31|         0.0|          1.23|
|       6|           1|        8.39|       32.24|          null|
|       6|           0|        8.65|       28.58|          null|
+--------+------------+------------+------------+--------------+



In [0]:
#Performance Optimizations

# Repartition based on VendorID
df = df.repartition("VendorID")

# Caching the dataframe in memory
df.cache()
df.count()  # Triggers caching

# Apply a filter to test filter pushdown (assuming zone is available)
filtered_df = df.filter(col("passenger_count") > 1)
filtered_df.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------------+-------------------+------------------+-----------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|    pickup_datetime|   dropoff_datetime|trip_duration_mins|pickup_hour|is_rush_hour|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+---------------

In [0]:
#Write to Parquet and Delta Lake Formats

# Save as Parquet
df.write.mode("overwrite").parquet("/tmp/nyc_taxi_parquet")

# Save as Delta Lake
df.write.format("delta").mode("overwrite").save("/tmp/nyc_taxi_delta")


In [0]:
#Created SQL Table from Delta Lake

# Register Delta Table
spark.sql("DROP TABLE IF EXISTS nyc_taxi_delta")
spark.sql("CREATE TABLE nyc_taxi_delta USING DELTA LOCATION '/tmp/nyc_taxi_delta'")

# Query using Spark SQL
spark.sql("SELECT VendorID, COUNT(*) as trips FROM nyc_taxi_delta GROUP BY VendorID").show()


+--------+-------+
|VendorID|  trips|
+--------+-------+
|       2|2817803|
|       1| 754990|
|       6|    330|
|       7|   4420|
+--------+-------+

