In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, hour, dayofweek, col, count, when, date_format, expr,
    round)

# Start Spark session
spark = SparkSession.builder \
    .appName("NYC Yellow Cab Analysis") \
    .getOrCreate()

# Load the Parquet file (make sure it's in the same folder or give full path)
df = spark.read.parquet("yellow_tripdata_2025-01.parquet")


In [2]:
df.limit(5).toPandas()


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1,1.6,1,N,229,237,1,10.0,3.5,0.5,3.0,0.0,1.0,18.0,2.5,0.0
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1,0.5,1,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1,0.6,1,N,141,141,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3,0.52,1,N,244,244,2,7.2,1.0,0.5,0.0,0.0,1.0,9.7,0.0,0.0
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3,0.66,1,N,244,116,2,5.8,1.0,0.5,0.0,0.0,1.0,8.3,0.0,0.0


df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()


In [4]:
# Create a DataFrame of null counts for all columns
null_counts = df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df.columns
])

# Convert to Pandas for cleaner display 
null_counts_pd = null_counts.toPandas().transpose()
null_counts_pd.columns = ['null_count']
null_counts_pd


Unnamed: 0,null_count
VendorID,0
tpep_pickup_datetime,0
tpep_dropoff_datetime,0
passenger_count,540149
trip_distance,0
RatecodeID,540149
store_and_fwd_flag,540149
PULocationID,0
DOLocationID,0
payment_type,0


In [5]:
df.printSchema()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [6]:
# 3. Load location lookup table 
zone_lookup_path = "taxi-zone-lookup.csv"  
zones = spark.read.option("header", True).csv(zone_lookup_path)
zones = zones.withColumn("LocationID", col("LocationID").cast("int"))

zones.show(5, truncate = False )  # First 5 rows



+----------+-------------+-----------------------+
|LocationID|Borough      |Zone                   |
+----------+-------------+-----------------------+
|1         |EWR          |Newark Airport         |
|2         |Queens       |Jamaica Bay            |
|3         |Bronx        |Allerton/Pelham Gardens|
|4         |Manhattan    |Alphabet City          |
|5         |Staten Island|Arden Heights          |
+----------+-------------+-----------------------+
only showing top 5 rows



In [7]:
# 4. Join pickup and dropoff locations
df = df.join(zones.withColumnRenamed("LocationID", "PULocationID")
                 .withColumnRenamed("Zone", "PU_Zone")
                 .withColumnRenamed("Borough", "PU_Borough"),
             on="PULocationID", how="left")

df = df.join(zones.withColumnRenamed("LocationID", "DOLocationID")
                 .withColumnRenamed("Zone", "DO_Zone")
                 .withColumnRenamed("Borough", "DO_Borough"),
             on="DOLocationID", how="left")


In [8]:
# 5. Convert timestamp columns
df = df.withColumn("pickup_datetime", to_timestamp("tpep_pickup_datetime")) \
       .withColumn("dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))


In [9]:
# 6. Create trip duration (in minutes)
df = df.withColumn("trip_duration", 
               (col("dropoff_datetime").cast("long") - col("pickup_datetime").cast("long")) / 60)

In [10]:
# 7. Data cleaning
df = df.filter(
    (col("trip_duration") > 0) &
    (col("trip_distance") > 0) &
    (col("fare_amount") > 0) &
    (col("passenger_count") > 0)
)


In [11]:
# 8. Feature engineering
df = df.withColumn("pickup_hour", hour("pickup_datetime")) \
       .withColumn("day_of_week", date_format("pickup_datetime", "E")) \
       .withColumn("tip_percent", round((col("tip_amount") / col("fare_amount")) * 100, 2))


In [47]:
# 9A. Average trip duration & distance by hour (safe approach)
avg_trip_by_hour = df.groupBy("pickup_hour").agg(
    round(expr("avg(trip_duration)"), 2).alias("avg_duration_min"),
    round(expr("avg(trip_distance)"), 2).alias("avg_distance_km"),
    round(expr("avg(fare_amount)"), 2).alias("avg_fare_amount")
).orderBy("pickup_hour")

avg_trip_by_hour.show(24)


+-----------+----------------+---------------+---------------+
|pickup_hour|avg_duration_min|avg_distance_km|avg_fare_amount|
+-----------+----------------+---------------+---------------+
|          0|           13.99|           3.81|          19.18|
|          1|           12.83|           3.25|          17.16|
|          2|           12.63|           3.01|          16.22|
|          3|           12.41|            3.3|          17.25|
|          4|           14.38|            4.8|           23.2|
|          5|           16.86|           6.04|          27.08|
|          6|           15.84|           5.86|          21.63|
|          7|            15.3|           3.53|          18.57|
|          8|           15.26|           2.93|           17.0|
|          9|           15.05|           3.24|          17.13|
|         10|           15.02|           2.94|          17.42|
|         11|           15.41|           2.84|          17.24|
|         12|           15.31|           2.96|         

In [13]:
# 9B. Total trips per day
trips_per_day = df.withColumn("pickup_date", date_format("pickup_datetime", "yyyy-MM-dd")) \
    .groupBy("pickup_date").count().orderBy("pickup_date")
trips_per_day.show()

+-----------+------+
|pickup_date| count|
+-----------+------+
| 2024-12-31|    21|
| 2025-01-01| 70353|
| 2025-01-02| 77542|
| 2025-01-03| 83756|
| 2025-01-04| 89264|
| 2025-01-05| 72211|
| 2025-01-06| 72852|
| 2025-01-07| 88599|
| 2025-01-08| 95886|
| 2025-01-09| 99956|
| 2025-01-10| 94130|
| 2025-01-11| 99342|
| 2025-01-12| 81063|
| 2025-01-13| 84903|
| 2025-01-14| 99977|
| 2025-01-15|102707|
| 2025-01-16|109721|
| 2025-01-17| 95748|
| 2025-01-18| 92357|
| 2025-01-19| 77510|
+-----------+------+
only showing top 20 rows



In [14]:
# 9C. Top 10 pickup zones
top_pu_zones = df.groupBy("PU_Zone").count() \
    .orderBy(col("count").desc()).limit(10)
top_pu_zones.show(truncate=False)

+----------------------------+------+
|PU_Zone                     |count |
+----------------------------+------+
|Upper East Side South       |148003|
|Midtown Center              |146750|
|Upper East Side North       |137810|
|JFK Airport                 |133722|
|Penn Station/Madison Sq West|108175|
|Times Sq/Theatre District   |107524|
|Midtown East                |105056|
|Lincoln Square East         |97557 |
|LaGuardia Airport           |85429 |
|Midtown North               |84648 |
+----------------------------+------+



In [15]:
# 9D. Top 10 drop-off zones
top_do_zones = df.groupBy("DO_Zone").count() \
    .orderBy(col("count").desc()).limit(10)
top_do_zones.show(truncate=False)

+-------------------------+------+
|DO_Zone                  |count |
+-------------------------+------+
|Upper East Side North    |144476|
|Upper East Side South    |133560|
|Midtown Center           |111895|
|Times Sq/Theatre District|89103 |
|Lincoln Square East      |85342 |
|Upper West Side South    |84955 |
|Murray Hill              |82651 |
|Lenox Hill West          |80319 |
|Midtown East             |79603 |
|Midtown North            |72780 |
+-------------------------+------+



In [16]:
# 9E. Top 10 most common routes
top_routes = df.groupBy("PU_Zone", "DO_Zone").count() \
    .orderBy(col("count").desc()).limit(10)
top_routes.show(truncate=False)

+---------------------+---------------------+-----+
|PU_Zone              |DO_Zone              |count|
+---------------------+---------------------+-----+
|Upper East Side South|Upper East Side North|23437|
|Upper East Side North|Upper East Side South|20241|
|Upper East Side North|Upper East Side North|16384|
|Upper East Side South|Upper East Side South|15613|
|Midtown Center       |Upper East Side South|10703|
|Upper East Side South|Midtown Center       |9363 |
|Midtown Center       |Upper East Side North|9142 |
|Upper West Side South|Upper West Side North|8875 |
|Lincoln Square East  |Upper West Side South|8562 |
|Upper West Side South|Lincoln Square East  |8052 |
+---------------------+---------------------+-----+

