# Initialize Spark Session


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("TaxiDataAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print("Spark Version:", spark.version)
print("Spark UI:", spark.sparkContext.uiWebUrl)

Spark Version: 3.1.2
Spark UI: http://itvdelab:4040


# Load Datasets

In [2]:
# Load taxi trip data
trips_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/user/root/landing_zone/trips/taxi_trip_data.csv")

# Load taxi zone data  
zones_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/user/root/landing_zone/zones/taxi_zone_geo.csv")

print("Trips DataFrame loaded successfully")
print("Zones DataFrame loaded successfully")

Trips DataFrame loaded successfully
Zones DataFrame loaded successfully


# Initial Data Inspection

In [3]:
# Check basic info about trips dataset
print("=== TRIPS DATASET ===")
print(f"Number of rows: {trips_df.count():,}")
print(f"Number of columns: {len(trips_df.columns)}")
print("\nSchema:")
trips_df.printSchema()

print("\nFirst 5 rows:")
trips_df.show(5)

=== TRIPS DATASET ===
Number of rows: 1,048,575
Number of columns: 16

Schema:
root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- imp_surcharge: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)


First 5 rows:
+---------+----------------+----------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------

In [4]:
# Check basic info about zones dataset
print("=== ZONES DATASET ===")
print(f"Number of rows: {zones_df.count():,}")
print(f"Number of columns: {len(zones_df.columns)}")
print("\nSchema:")
zones_df.printSchema()

print("\nFirst 5 rows:")
zones_df.show(5)

=== ZONES DATASET ===
Number of rows: 263
Number of columns: 4

Schema:
root
 |-- zone_id: integer (nullable = true)
 |-- zone_name: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- zone_geom: string (nullable = true)


First 5 rows:
+-------+--------------------+-------+--------------------+
|zone_id|           zone_name|borough|           zone_geom|
+-------+--------------------+-------+--------------------+
|      1|      Newark Airport|    EWR|POLYGON((-74.1856...|
|      3|Allerton/Pelham G...|  Bronx|POLYGON((-73.8485...|
|     18|        Bedford Park|  Bronx|POLYGON((-73.8844...|
|     20|             Belmont|  Bronx|POLYGON((-73.8839...|
|     31|          Bronx Park|  Bronx|POLYGON((-73.8710...|
+-------+--------------------+-------+--------------------+
only showing top 5 rows



# MISSING/INVALID VALUES ANALYSIS 

In [5]:
# Check for null values in trips dataset
print("NULL VALUES IN TRIPS DATASET:")
for column in trips_df.columns:
    null_count = trips_df.filter(col(column).isNull()).count()
    if null_count > 0:
        print(f"{column}: {null_count:,} nulls")
    else:
        print(f"{column}: 0 nulls")

print("\nZERO VALUES IN NUMERIC COLUMNS:")
numeric_columns = ['passenger_count', 'trip_distance', 'fare_amount', 'tip_amount', 'extra', 'mta_tax', 'tolls_amount', 'imp_surcharge']
for column in numeric_columns:
    zero_count = trips_df.filter(col(column) == 0).count()
    print(f"{column}: {zero_count:,} zeros")

print("\nNEGATIVE VALUES CHECK:")
for column in numeric_columns:
    negative_count = trips_df.filter(col(column) < 0).count()
    if negative_count > 0:
        print(f"{column}: {negative_count:,} negative values")

print("\n=== ZONES DATASET NULL CHECK ===")
for column in zones_df.columns:
    null_count = zones_df.filter(col(column).isNull()).count()
    if null_count > 0:
        print(f"{column}: {null_count:,} nulls")
    else:
        print(f"{column}: 0 nulls")

NULL VALUES IN TRIPS DATASET:
vendor_id: 0 nulls
pickup_datetime: 0 nulls
dropoff_datetime: 0 nulls
passenger_count: 0 nulls
trip_distance: 0 nulls
rate_code: 0 nulls
store_and_fwd_flag: 0 nulls
payment_type: 0 nulls
fare_amount: 0 nulls
extra: 0 nulls
mta_tax: 0 nulls
tip_amount: 0 nulls
tolls_amount: 0 nulls
imp_surcharge: 0 nulls
pickup_location_id: 0 nulls
dropoff_location_id: 0 nulls

ZERO VALUES IN NUMERIC COLUMNS:
passenger_count: 9,408 zeros
trip_distance: 7,143 zeros
fare_amount: 280 zeros
tip_amount: 356,125 zeros
extra: 558,667 zeros
mta_tax: 4,707 zeros
tolls_amount: 990,766 zeros
imp_surcharge: 312 zeros

NEGATIVE VALUES CHECK:
fare_amount: 668 negative values
tip_amount: 8 negative values
extra: 320 negative values
mta_tax: 645 negative values
tolls_amount: 9 negative values
imp_surcharge: 668 negative values

=== ZONES DATASET NULL CHECK ===
zone_id: 0 nulls
zone_name: 0 nulls
borough: 0 nulls
zone_geom: 0 nulls


# CLEANING INVALID VALUES

In [6]:

print(f"Original dataset: {trips_df.count():,} rows")

# Remove records with invalid data
trips_cleaned = trips_df.filter(
    # Remove zero or negative passenger counts
    (col("passenger_count") > 0) & (col("passenger_count") <= 8) &
    
    # Remove negative fares and costs
    (col("fare_amount") >= 0) &
    (col("tip_amount") >= 0) &
    (col("extra") >= 0) &
    (col("mta_tax") >= 0) &
    (col("tolls_amount") >= 0) &
    (col("imp_surcharge") >= 0) &
    
    # Remove negative trip distances
    (col("trip_distance") >= 0) &
    
    # Remove extreme outliers
    (col("fare_amount") <= 500) &  # Fares over $500 are suspicious
    (col("tip_amount") <= 100)  # Tips over $100 are suspicious
)

cleaned_count = trips_cleaned.count()
removed_count = trips_df.count() - cleaned_count

print(f"After cleaning: {cleaned_count:,} rows")
print(f"Removed: {removed_count:,} rows ({removed_count/trips_df.count()*100:.1f}%)")


Original dataset: 1,048,575 rows
After cleaning: 1,038,478 rows
Removed: 10,097 rows (1.0%)


# DUPLICATE ANALYSIS AND REMOVAL

In [7]:

print(f"Current dataset: {trips_cleaned.count():,} rows")

# Check for exact duplicates
distinct_count = trips_cleaned.distinct().count()
duplicate_count = trips_cleaned.count() - distinct_count

print(f"Distinct rows: {distinct_count:,}")
print(f"Exact duplicates: {duplicate_count:,}")

# Remove exact duplicates if any exist
if duplicate_count > 0:
    trips_no_duplicates = trips_cleaned.distinct()
    print(f"After removing exact duplicates: {trips_no_duplicates.count():,} rows")
else:
    trips_no_duplicates = trips_cleaned
    print("No exact duplicates found")



Current dataset: 1,038,478 rows
Distinct rows: 1,037,650
Exact duplicates: 828
After removing exact duplicates: 1,037,650 rows


In [8]:

trips_final_clean = trips_no_duplicates


# CREATING DERIVED COLUMNS

In [9]:

# 1. Create trip duration in minutes
trips_with_features = trips_final_clean.withColumn(
    "trip_duration_minutes",
    (unix_timestamp(to_timestamp(col("dropoff_datetime"), "M/d/yyyy H:mm")) - 
     unix_timestamp(to_timestamp(col("pickup_datetime"), "M/d/yyyy H:mm"))) / 60
)

print("Trip duration added:")
trips_with_features.select("pickup_datetime", "dropoff_datetime", "trip_duration_minutes").show(5)

# Check for invalid durations
invalid_durations = trips_with_features.filter(
    (col("trip_duration_minutes") <= 0) | (col("trip_duration_minutes") > 1440)
).count()

print(f"Invalid trip durations (<=0 or >24 hours): {invalid_durations:,}")

# Remove invalid durations
trips_with_features = trips_with_features.filter(
    (col("trip_duration_minutes") > 0) & (col("trip_duration_minutes") <= 1440)
)

print(f"After removing invalid durations: {trips_with_features.count():,} rows")


Trip duration added:
+---------------+----------------+---------------------+
|pickup_datetime|dropoff_datetime|trip_duration_minutes|
+---------------+----------------+---------------------+
|10/8/2018 10:00| 10/8/2018 10:37|                 37.0|
| 9/3/2018 18:49|  9/3/2018 18:55|                  6.0|
| 9/20/2018 7:35|  9/20/2018 7:44|                  9.0|
|3/10/2018 10:20| 3/10/2018 10:30|                 10.0|
| 6/27/2018 7:58|  6/27/2018 8:06|                  8.0|
+---------------+----------------+---------------------+
only showing top 5 rows

Invalid trip durations (<=0 or >24 hours): 5,826
After removing invalid durations: 1,031,824 rows


# CREATING TOTAL TRIP COST COLUMN 

In [10]:

# 2. Create total trip cost (fare + extra + mta_tax + tip + tolls + surcharge)
trips_with_cost = trips_with_features.withColumn(
    "total_trip_cost",
    col("fare_amount") + 
    col("extra") + 
    col("mta_tax") + 
    col("tip_amount") + 
    col("tolls_amount") + 
    col("imp_surcharge")
)

print("Total trip cost added:")
trips_with_cost.select(
    "fare_amount", "extra", "mta_tax", "tip_amount", 
    "tolls_amount", "imp_surcharge", "total_trip_cost"
).show(5)



Total trip cost added:
+-----------+-----+-------+----------+------------+-------------+------------------+
|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|   total_trip_cost|
+-----------+-----+-------+----------+------------+-------------+------------------+
|       31.0|  0.0|    0.5|       0.0|         0.0|          0.3|              31.8|
|        6.5|  0.0|    0.5|      1.46|         0.0|          0.3| 8.760000000000002|
|        7.5|  0.0|    0.5|       0.0|         0.0|          0.3|               8.3|
|       10.0|  0.0|    0.5|      2.16|         0.0|          0.3|             12.96|
|        8.5|  0.0|    0.5|      2.32|         0.0|          0.3|11.620000000000001|
+-----------+-----+-------+----------+------------+-------------+------------------+
only showing top 5 rows



# JOINING WITH ZONES

In [11]:

# Join with zones for pickup and dropoff information
trips_with_zones = trips_with_cost.join(
    zones_df.select(
        col("zone_id").alias("pickup_zone_id"),
        col("zone_name").alias("pickup_zone_name"),
        col("borough").alias("pickup_borough")
    ),
    trips_with_cost.pickup_location_id == col("pickup_zone_id"),
    "left"
).drop("pickup_zone_id").join(
    zones_df.select(
        col("zone_id").alias("dropoff_zone_id"),
        col("zone_name").alias("dropoff_zone_name"),
        col("borough").alias("dropoff_borough")
    ),
    col("dropoff_location_id") == col("dropoff_zone_id"),
    "left"
).drop("dropoff_zone_id")



#  CREATING TIME OF DAY 

In [12]:
# Add time of day derived column
trips_enriched = trips_with_zones.withColumn(
    "pickup_hour",
    hour(to_timestamp(col("pickup_datetime"), "M/d/yyyy H:mm"))
).withColumn(
    "time_of_day",
    when((col("pickup_hour") >= 6) & (col("pickup_hour") < 12), "Morning")
    .when((col("pickup_hour") >= 12) & (col("pickup_hour") < 18), "Afternoon") 
    .when((col("pickup_hour") >= 18) & (col("pickup_hour") < 22), "Evening")
    .otherwise("Night")
)

print(f"Final enriched dataset: {trips_enriched.count():,} rows")

# Show sample of enriched data
print("Sample enriched data:")
trips_enriched.select(
    "pickup_zone_name", "pickup_borough", "dropoff_zone_name", "dropoff_borough",
    "time_of_day", "trip_duration_minutes", "total_trip_cost"
).show(5, truncate=False)

# Cache the final dataset
trips_enriched.cache()
print(f"\nFinal dataset cached: {trips_enriched.count():,} rows")

Final enriched dataset: 1,032,120 rows
Sample enriched data:
+-------------------------+--------------+---------------------+---------------+-----------+---------------------+------------------+
|pickup_zone_name         |pickup_borough|dropoff_zone_name    |dropoff_borough|time_of_day|trip_duration_minutes|total_trip_cost   |
+-------------------------+--------------+---------------------+---------------+-----------+---------------------+------------------+
|Garment District         |Manhattan     |Jackson Heights      |Queens         |Morning    |37.0                 |31.8              |
|Gramercy                 |Manhattan     |UN/Turtle Bay South  |Manhattan      |Evening    |6.0                  |8.760000000000002 |
|West Chelsea/Hudson Yards|Manhattan     |East Chelsea         |Manhattan      |Morning    |9.0                  |8.3               |
|Yorkville East           |Manhattan     |Upper West Side South|Manhattan      |Morning    |10.0                 |12.96             |
|

In [13]:
print("Query 1: Most Common Payment Type per Time of Day")

# Calculate payment type distribution by time of day
from pyspark.sql.window import Window

payment_by_time = trips_enriched.groupBy("time_of_day", "payment_type").count()

# Find the most common payment type for each time of day
window_spec = Window.partitionBy("time_of_day").orderBy(col("count").desc())

most_common_payment = payment_by_time.withColumn(
    "rank", row_number().over(window_spec)
).filter(col("rank") == 1).drop("rank")

print("\nMost common payment type by time of day:")
most_common_payment.orderBy(
    when(col("time_of_day") == "Morning", 1)
    .when(col("time_of_day") == "Afternoon", 2)
    .when(col("time_of_day") == "Evening", 3)
    .otherwise(4)
).show()

print("\nDetailed payment type breakdown:")
payment_by_time.orderBy("time_of_day", col("count").desc()).show(20)


Query 1: Most Common Payment Type per Time of Day

Most common payment type by time of day:
+-----------+------------+------+
|time_of_day|payment_type| count|
+-----------+------------+------+
|    Morning|           1|180026|
|  Afternoon|           1|220329|
|    Evening|           1|177414|
|      Night|           1|140280|
+-----------+------------+------+


Detailed payment type breakdown:
+-----------+------------+------+
|time_of_day|payment_type| count|
+-----------+------------+------+
|  Afternoon|           1|220329|
|  Afternoon|           2|107889|
|  Afternoon|           3|  1358|
|  Afternoon|           4|   387|
|    Evening|           1|177414|
|    Evening|           2| 67434|
|    Evening|           3|   921|
|    Evening|           4|   232|
|    Morning|           1|180026|
|    Morning|           2| 74609|
|    Morning|           3|   841|
|    Morning|           4|   266|
|      Night|           1|140280|
|      Night|           2| 58742|
|      Night|          

In [14]:
print("=== Query 2: Top Boroughs by Revenue and Trip Volume ===")

# Calculate revenue and trip stats by pickup borough
borough_stats = trips_enriched.filter(col("pickup_borough").isNotNull()).groupBy("pickup_borough").agg(
    sum("total_trip_cost").alias("total_revenue"),
    count("*").alias("trip_count"),
    avg("total_trip_cost").alias("avg_trip_cost"),
    avg("trip_distance").alias("avg_distance")
).orderBy(col("total_revenue").desc())

print("Top boroughs by total revenue:")
borough_stats.show()

# Compare revenue vs volume
print("\nRevenue vs Trip Volume comparison:")
borough_stats.select(
    "pickup_borough",
    "total_revenue", 
    "trip_count",
    round(col("total_revenue") / col("trip_count"), 2).alias("revenue_per_trip")
).orderBy(col("revenue_per_trip").desc()).show()


=== Query 2: Top Boroughs by Revenue and Trip Volume ===
Top boroughs by total revenue:
+--------------+-------------------+----------+------------------+------------------+
|pickup_borough|      total_revenue|trip_count|     avg_trip_cost|      avg_distance|
+--------------+-------------------+----------+------------------+------------------+
|     Manhattan|1.334234415000055E7|    937649|14.229572206657876|2.3458535550083246|
|        Queens|  2912391.309999994|     64934|44.851561739612436|11.370727353928606|
|      Brooklyn| 237696.29999999973|     12901|18.424641500658844| 3.851692892023874|
|         Bronx| 29942.459999999992|      1159|25.834736842105258| 6.169240724762723|
|           EWR| 2737.4100000000008|        32| 85.54406250000002|3.7971874999999993|
| Staten Island| 1633.1199999999994|        21| 77.76761904761902| 9.676666666666662|
+--------------+-------------------+----------+------------------+------------------+


Revenue vs Trip Volume comparison:
+--------------

In [15]:
# Manhattan dominates in volume but has the lowest revenue per trip, while airports (EWR) have the highest revenue per trip

In [16]:
print("=== Query 3: Average Tip Amount by Passenger Count ===")

# Calculate average tip by passenger count
tip_by_passengers = trips_enriched.groupBy("passenger_count").agg(
    avg("tip_amount").alias("avg_tip"),
    count("*").alias("trip_count"),
    avg("total_trip_cost").alias("avg_total_cost"),
    (avg("tip_amount") / avg("fare_amount") * 100).alias("avg_tip_percentage")
).orderBy("passenger_count")

print("Average tip amount by passenger count:")
tip_by_passengers.show()

# Calculate tip percentage categories
print("\nTip behavior analysis:")
trips_enriched.withColumn(
    "tip_category",
    when(col("tip_amount") == 0, "No Tip")
    .when(col("tip_amount") <= 2, "Low Tip ($0-2)")
    .when(col("tip_amount") <= 5, "Medium Tip ($2-5)")
    .when(col("tip_amount") <= 10, "High Tip ($5-10)")
    .otherwise("Very High Tip ($10+)")
).groupBy("passenger_count", "tip_category").count().orderBy("passenger_count", col("count").desc()).show(30)



=== Query 3: Average Tip Amount by Passenger Count ===
Average tip amount by passenger count:
+---------------+------------------+----------+------------------+------------------+
|passenger_count|           avg_tip|trip_count|    avg_total_cost|avg_tip_percentage|
+---------------+------------------+----------+------------------+------------------+
|              1|1.8675428056757348|    740556|16.113377853937227|14.602201491460127|
|              2| 1.858311846712745|    152498|16.756763891985393|13.905391751239183|
|              3|1.8326605356937573|     43495| 16.72456604207383|13.721446556603581|
|              4|1.7026074233039077|     20503| 16.95160074135491|12.463376387607507|
|              5|1.8789114835082938|     46963|16.384286353086505|14.443669638974391|
|              6|1.8622374835426814|     28103| 16.17576272995764|14.475947854517083|
|              7|              0.73|         2| 8.780000000000001| 9.542483660130719|
+---------------+------------------+----------

In [17]:
print("=== Query 4: Best 5 Pickup Zones for Drivers ===")

# Find top 5 zones by total revenue (best for drivers)
best_zones = trips_enriched.filter(col("pickup_zone_name").isNotNull()).groupBy("pickup_zone_name", "pickup_borough").agg(
    count("*").alias("total_trips"),
    sum("total_trip_cost").alias("total_revenue")
).orderBy(col("total_revenue").desc()).limit(5)

print("Best 5 pickup zones by total revenue:")
best_zones.show(truncate=False)

# Find their busiest time of day
print("\nBusiest time of day for each top zone:")
best_zone_names = [row.pickup_zone_name for row in best_zones.collect()]

# Filter trips for top zones only
top_zones_trips = trips_enriched.filter(col("pickup_zone_name").isin(best_zone_names))

# Group by zone and time of day, count trips
zone_time_counts = top_zones_trips.groupBy("pickup_zone_name", "time_of_day").count()

# Add ranking - rank times by trip count for each zone
window_spec = Window.partitionBy("pickup_zone_name").orderBy(col("count").desc())
zone_time_ranked = zone_time_counts.withColumn("rank", row_number().over(window_spec))

# Keep only the busiest time (rank = 1) for each zone
busiest_times = zone_time_ranked.filter(col("rank") == 1)

#Select and show results
busiest_times.select(
    "pickup_zone_name", 
    "time_of_day", 
    col("count").alias("peak_trips")
).show(truncate=False)


=== Query 4: Best 5 Pickup Zones for Drivers ===
Best 5 pickup zones by total revenue:
+-------------------------+--------------+-----------+------------------+
|pickup_zone_name         |pickup_borough|total_trips|total_revenue     |
+-------------------------+--------------+-----------+------------------+
|JFK Airport              |Queens        |25339      |1449101.2200000025|
|LaGuardia Airport        |Queens        |28757      |1225029.5300000007|
|Midtown Center           |Manhattan     |39763      |592042.2600000004 |
|Times Sq/Theatre District|Manhattan     |35027      |569141.5000000002 |
|Midtown East             |Manhattan     |36390      |536255.1300000002 |
+-------------------------+--------------+-----------+------------------+


Busiest time of day for each top zone:
+-------------------------+-----------+----------+
|pickup_zone_name         |time_of_day|peak_trips|
+-------------------------+-----------+----------+
|LaGuardia Airport        |Afternoon  |10547     |
|M

In [18]:
print("=== Query 5: Top 5 Longest Trips ===")

# Find longest trips by duration
longest_trips = trips_enriched.filter(
    col("trip_duration_minutes").isNotNull() & 
    col("pickup_zone_name").isNotNull() & 
    col("dropoff_zone_name").isNotNull()
).select(
    round(col("trip_duration_minutes"), 1).alias("duration_minutes"),
    round(col("fare_amount"), 2).alias("fare"),
    "pickup_zone_name",
    "dropoff_zone_name", 
    "pickup_borough",
    "dropoff_borough",
    "payment_type",
    round(col("trip_distance"), 2).alias("distance_miles")
).orderBy(col("trip_duration_minutes").desc()).limit(5)

print("Top 5 longest trips by duration:")
longest_trips.show(truncate=False)


=== Query 5: Top 5 Longest Trips ===
Top 5 longest trips by duration:
+----------------+----+---------------------+-----------------------------+--------------+---------------+------------+--------------+
|duration_minutes|fare|pickup_zone_name     |dropoff_zone_name            |pickup_borough|dropoff_borough|payment_type|distance_miles|
+----------------+----+---------------------+-----------------------------+--------------+---------------+------------+--------------+
|1440.0          |15.5|Upper East Side North|Gramercy                     |Manhattan     |Manhattan      |1           |3.34          |
|1440.0          |7.0 |Lenox Hill East      |Upper East Side North        |Manhattan     |Manhattan      |1           |1.06          |
|1440.0          |12.0|Lenox Hill East      |Gramercy                     |Manhattan     |Manhattan      |1           |2.46          |
|1440.0          |12.0|Battery Park City    |Meatpacking/West Village West|Manhattan     |Manhattan      |1           |1

In [19]:
print("=== Query 5: Top 5 Longest Realistic Trips ===")

# Find longest realistic trips (less than 4 hours = 240 minutes)
longest_realistic_trips = trips_enriched.filter(
    (col("trip_duration_minutes") > 0) & 
    (col("trip_duration_minutes") < 240) &  # Less than 4 hours
    col("pickup_zone_name").isNotNull() & 
    col("dropoff_zone_name").isNotNull()
).select(
    round(col("trip_duration_minutes"), 1).alias("duration_minutes"),
    round(col("fare_amount"), 2).alias("fare"),
    "pickup_zone_name",
    "dropoff_zone_name", 
    "pickup_borough",
    "dropoff_borough",
    "payment_type",
    round(col("trip_distance"), 2).alias("distance_miles"),
    round(col("total_trip_cost"), 2).alias("total_cost")
).orderBy(col("trip_duration_minutes").desc()).limit(5)

print("Top 5 longest realistic trips (under 4 hours):")
longest_realistic_trips.show(truncate=False)


=== Query 5: Top 5 Longest Realistic Trips ===
Top 5 longest realistic trips (under 4 hours):
+----------------+----+-------------------------+-------------------------+--------------+---------------+------------+--------------+----------+
|duration_minutes|fare|pickup_zone_name         |dropoff_zone_name        |pickup_borough|dropoff_borough|payment_type|distance_miles|total_cost|
+----------------+----+-------------------------+-------------------------+--------------+---------------+------------+--------------+----------+
|239.0           |23.0|LaGuardia Airport        |Midtown Center           |Queens        |Manhattan      |1           |6.48          |29.16     |
|235.0           |28.5|LaGuardia Airport        |Times Sq/Theatre District|Queens        |Manhattan      |1           |9.4           |42.67     |
|235.0           |52.0|JFK Airport              |Garment District         |Queens        |Manhattan      |2           |17.4          |58.56     |
|234.0           |70.0|West Ch

In [20]:
print("=== Query 6: Most Frequent Inter-Borough Pickup → Dropoff Routes ===")

# Find most frequent inter-borough routes
inter_borough_routes = trips_enriched.filter(
    col("pickup_borough").isNotNull() & 
    col("dropoff_borough").isNotNull() &
    (col("pickup_borough") != col("dropoff_borough")) 
).groupBy("pickup_borough", "dropoff_borough").agg(
    count("*").alias("trip_count"),
    avg("total_trip_cost").alias("avg_cost"),
    avg("trip_distance").alias("avg_distance"),
    avg("trip_duration_minutes").alias("avg_duration")
).orderBy(col("trip_count").desc())

print("Top inter-borough routes:")
inter_borough_routes.select(
    concat(col("pickup_borough"), lit(" to "), col("dropoff_borough")).alias("route"),
    "trip_count",
    round(col("avg_cost"), 2).alias("avg_cost"),
    round(col("avg_distance"), 2).alias("avg_distance"),
    round(col("avg_duration"), 1).alias("avg_duration_min")
).show(10, truncate=False)

=== Query 6: Most Frequent Inter-Borough Pickup → Dropoff Routes ===
Top inter-borough routes:
+---------------------+----------+--------+------------+----------------+
|route                |trip_count|avg_cost|avg_distance|avg_duration_min|
+---------------------+----------+--------+------------+----------------+
|Queens to Manhattan  |37710     |52.3    |13.08       |43.3            |
|Manhattan to Queens  |31953     |42.36   |10.39       |38.2            |
|Manhattan to Brooklyn|26824     |28.4    |6.32        |31.4            |
|Queens to Brooklyn   |8875      |46.3    |13.14       |41.3            |
|Manhattan to Bronx   |4809      |31.79   |8.71        |34.0            |
|Brooklyn to Manhattan|3823      |25.12   |5.64        |27.2            |
|Manhattan to EWR     |1866      |96.51   |17.53       |50.1            |
|Queens to Bronx      |1063      |54.42   |15.48       |41.9            |
|Brooklyn to Queens   |803       |33.65   |9.11        |33.3            |
|Bronx to Manhatt

In [21]:
# Check for null values in trips dataset
print("=== NULL VALUES ANALYSIS - TRIPS ===")
null_counts_trips = []
for column in trips_df.columns:
    null_count = trips_df.filter(col(column).isNull()).count()
    null_counts_trips.append((column, null_count))
    print(f"{column}: {null_count:,} nulls")

# Check for zero values in numeric columns
print("\n=== ZERO VALUES ANALYSIS - TRIPS ===")
numeric_columns = ['passenger_count', 'trip_distance', 'fare_amount', 'tip_amount']
for column in numeric_columns:
    zero_count = trips_df.filter(col(column) == 0).count()
    print(f"{column}: {zero_count:,} zeros")

=== NULL VALUES ANALYSIS - TRIPS ===
vendor_id: 0 nulls
pickup_datetime: 0 nulls
dropoff_datetime: 0 nulls
passenger_count: 0 nulls
trip_distance: 0 nulls
rate_code: 0 nulls
store_and_fwd_flag: 0 nulls
payment_type: 0 nulls
fare_amount: 0 nulls
extra: 0 nulls
mta_tax: 0 nulls
tip_amount: 0 nulls
tolls_amount: 0 nulls
imp_surcharge: 0 nulls
pickup_location_id: 0 nulls
dropoff_location_id: 0 nulls

=== ZERO VALUES ANALYSIS - TRIPS ===
passenger_count: 9,408 zeros
trip_distance: 7,143 zeros
fare_amount: 280 zeros
tip_amount: 356,125 zeros


In [22]:
# Check for null values in zones dataset
print("=== NULL VALUES ANALYSIS - ZONES ===")
null_counts_zones = []
for column in zones_df.columns:
    null_count = zones_df.filter(col(column).isNull()).count()
    null_counts_zones.append((column, null_count))
    print(f"{column}: {null_count:,} nulls")

=== NULL VALUES ANALYSIS - ZONES ===
zone_id: 0 nulls
zone_name: 0 nulls
borough: 0 nulls
zone_geom: 0 nulls


In [23]:
print("=== DATA CLEANING - REMOVING INVALID RECORDS ===")

print(f"Original dataset: {trips_df.count():,} rows")

# Remove trips with invalid passenger counts (0 or unrealistic numbers)
trips_cleaned = trips_df.filter(
    (col("passenger_count") > 0) & (col("passenger_count") <= 8)
)
print(f"After removing invalid passenger_count (0 or >8): {trips_cleaned.count():,} rows")

# Remove trips with zero distance but positive fare (likely data errors)
trips_cleaned = trips_cleaned.filter(
    ~((col("trip_distance") == 0) & (col("fare_amount") > 0))
)
print(f"After removing zero distance with positive fare: {trips_cleaned.count():,} rows")

# Remove trips with zero or negative fare amounts (data errors)
trips_cleaned = trips_cleaned.filter(col("fare_amount") > 0)
print(f"After removing zero/negative fares: {trips_cleaned.count():,} rows")

# Remove extreme outliers (very long trips or very high fares)
trips_cleaned = trips_cleaned.filter(
    (col("total_trip_cost") <= 500) & # Costs over $500 are suspicious
    (col("trip_duration_minutes") > 0) & (col("trip_duration_minutes") <= 1440)  # 0-24 hours
)
print(f"After removing extreme outliers: {trips_cleaned.count():,} rows")

# Calculate data reduction
original_count = trips_final.count()
cleaned_count = trips_cleaned.count()
reduction_pct = (original_count - cleaned_count) / original_count * 100

print(f"\nData cleaning summary:")
print(f"Original: {original_count:,} rows")
print(f"Cleaned: {cleaned_count:,} rows")
print(f"Removed: {original_count - cleaned_count:,} rows ({reduction_pct:.1f}%)")

=== DATA CLEANING - REMOVING INVALID RECORDS ===
Original dataset: 1,048,575 rows
After removing invalid passenger_count (0 or >8): 1,039,165 rows
After removing zero distance with positive fare: 1,032,448 rows
After removing zero/negative fares: 1,031,507 rows


AnalysisException: cannot resolve '`total_trip_cost`' given input columns: [dropoff_datetime, dropoff_location_id, extra, fare_amount, imp_surcharge, mta_tax, passenger_count, payment_type, pickup_datetime, pickup_location_id, rate_code, store_and_fwd_flag, tip_amount, tolls_amount, trip_distance, vendor_id];
'Filter ((('total_trip_cost <= 500) AND ('trip_duration_minutes > 0)) AND ('trip_duration_minutes <= 1440))
+- Filter (fare_amount#24 > cast(0 as double))
   +- Filter NOT ((trip_distance#20 = cast(0 as double)) AND (fare_amount#24 > cast(0 as double)))
      +- Filter ((passenger_count#19 > 0) AND (passenger_count#19 <= 8))
         +- Relation[vendor_id#16,pickup_datetime#17,dropoff_datetime#18,passenger_count#19,trip_distance#20,rate_code#21,store_and_fwd_flag#22,payment_type#23,fare_amount#24,extra#25,mta_tax#26,tip_amount#27,tolls_amount#28,imp_surcharge#29,pickup_location_id#30,dropoff_location_id#31] csv


In [None]:
print("=== DATA CLEANING - REMOVING INVALID RECORDS ===")

print(f"Original dataset: {trips_final.count():,} rows")

# Remove trips with invalid passenger counts (0 or unrealistic numbers)
trips_cleaned = trips_final.filter(
    (col("passenger_count") > 0) & (col("passenger_count") <= 8)
)
print(f"After removing invalid passenger_count (0 or >8): {trips_cleaned.count():,} rows")

# Remove trips with zero distance but positive fare (likely data errors)
trips_cleaned = trips_cleaned.filter(
    ~((col("trip_distance") == 0) & (col("fare_amount") > 0))
)
print(f"After removing zero distance with positive fare: {trips_cleaned.count():,} rows")

# Remove trips with zero or negative fare amounts (data errors)
trips_cleaned = trips_cleaned.filter(col("fare_amount") > 0)
print(f"After removing zero/negative fares: {trips_cleaned.count():,} rows")

# Remove extreme outliers (very long trips or very high fares)
trips_cleaned = trips_cleaned.filter(
    (col("trip_distance") <= 100) &  # Trips over 100 miles are suspicious
    (col("total_trip_cost") <= 500) & # Costs over $500 are suspicious
    (col("trip_duration_minutes") > 0) & (col("trip_duration_minutes") <= 1440)  # 0-24 hours
)
print(f"After removing extreme outliers: {trips_cleaned.count():,} rows")

# Calculate data reduction
original_count = trips_final.count()
cleaned_count = trips_cleaned.count()
reduction_pct = (original_count - cleaned_count) / original_count * 100

print(f"\nData cleaning summary:")
print(f"Original: {original_count:,} rows")
print(f"Cleaned: {cleaned_count:,} rows")
print(f"Removed: {original_count - cleaned_count:,} rows ({reduction_pct:.1f}%)")

In [None]:
# Check for duplicates in trips dataset
print("=== DUPLICATE ANALYSIS ===")

# Check for exact duplicates
duplicate_count = trips_clean.count() - trips_clean.distinct().count()
print(f"Exact duplicates in trips: {duplicate_count:,}")

# Remove exact duplicates
if duplicate_count > 0:
    trips_clean = trips_clean.distinct()
    print(f"Rows after removing exact duplicates: {trips_clean.count():,}")

# Check for potential duplicate trips (same pickup/dropoff time and location)
potential_duplicates = trips_clean.groupBy(
    "pickup_datetime", "dropoff_datetime", 
    "pickup_location_id", "dropoff_location_id"
).count().filter(col("count") > 1)

print(f"Potential duplicate trips: {potential_duplicates.count():,}")

In [None]:
# Analyze column relevance
print("=== COLUMN RELEVANCE ANALYSIS ===")

trips_focused = trips_clean.select(
    "vendor_id",
    "pickup_datetime", 
    "dropoff_datetime",
    "passenger_count",
    "trip_distance", 
    "pickup_location_id",
    "dropoff_location_id",
    "rate_code",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax", 
    "tip_amount",
    "tolls_amount",
    "imp_surcharge"
)

print(f"Focused dataset columns: {len(trips_focused.columns)}")
print(f"Focused dataset rows: {trips_focused.count():,}")

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

print("=== CREATING TRIP DURATION COLUMN ===")
# Convert pickup and dropoff columns to proper timestamp
trips_fixed = trips_df.withColumn(
    "pickup_ts", to_timestamp(col("pickup_datetime"), "M/d/yyyy H:mm")
).withColumn(
    "dropoff_ts", to_timestamp(col("dropoff_datetime"), "M/d/yyyy H:mm")
)

# create trip duration in minutes
trips_with_duration = trips_fixed.withColumn(
    "trip_duration_minutes",
    (unix_timestamp(col("dropoff_ts")) - unix_timestamp(col("pickup_ts"))) / 60
)

trips_with_duration.select("pickup_datetime", "dropoff_datetime", "trip_duration_minutes").show(5)
trips_with_duration.describe("trip_duration_minutes").show()

# Check for invalid durations (negative or extremely long)
invalid_durations = trips_with_duration.filter(
    (col("trip_duration_minutes") < 0) | (col("trip_duration_minutes") > 1440)  # More than 24 hours
).count()

print(f"Invalid trip durations (negative or >24 hours): {invalid_durations:,}")


In [None]:
from pyspark.sql.functions import col, lit, when

print("=== CREATING TOTAL TRIP COST COLUMN ===")

# Create total trip cost column (without coalesce)
trips_with_cost = trips_with_duration.withColumn(
    "total_trip_cost",
    col("fare_amount") + 
    col("extra") + 
    col("mta_tax") + 
    col("tip_amount") + 
    col("tolls_amount") +
    col("imp_surcharge")
)

# Show sample of cost breakdown
print("Sample cost breakdown:")
trips_with_cost.select(
    "fare_amount", "extra", "mta_tax", "tip_amount", 
    "tolls_amount", "imp_surcharge", "total_trip_cost"
).show(5)

# Check cost distribution
print("Cost ranges distribution:")
trips_with_cost.withColumn(
    "cost_range",
    when(col("total_trip_cost") <= 10, "$0-10")
    .when(col("total_trip_cost") <= 20, "$10-20")
    .when(col("total_trip_cost") <= 30, "$20-30")
    .when(col("total_trip_cost") <= 50, "$30-50")
    .when(col("total_trip_cost") <= 100, "$50-100")
    .otherwise("$100+")
).groupBy("cost_range").count().orderBy("cost_range").show()



In [None]:
from pyspark.sql.functions import col

print("=== JOINING TRIPS WITH ZONE DATA (DEDUPED) ===")

# Step 1: Deduplicate zone data
zones_df_dedup = zones_df.dropDuplicates(["zone_id"])
print("Zone data sample (after dedup):")
zones_df_dedup.show(5)
print(f"Total zones after dedup: {zones_df_dedup.count()}")

# Step 2: Prepare pickup zones with renamed columns
pickup_zones = zones_df_dedup.select(
    col("zone_id").alias("pickup_zone_id"),
    col("zone_name").alias("pickup_zone_name"),
    col("borough").alias("pickup_borough")
)

# Step 3: Join trips with pickup zone information
trips_with_pickup_zones = trips_with_cost.join(
    pickup_zones,
    trips_with_cost.pickup_location_id == pickup_zones.pickup_zone_id,
    "left"
).drop("pickup_zone_id")  # Remove duplicate zone_id column

print("After pickup zone join:")
print(f"Rows: {trips_with_pickup_zones.count():,}")

# Step 4: Prepare dropoff zones with renamed columns
dropoff_zones = zones_df_dedup.select(
    col("zone_id").alias("dropoff_zone_id"),
    col("zone_name").alias("dropoff_zone_name"),
    col("borough").alias("dropoff_borough")
)

# Step 5: Join with dropoff zone information
trips_enriched = trips_with_pickup_zones.join(
    dropoff_zones,
    trips_with_pickup_zones.dropoff_location_id == dropoff_zones.dropoff_zone_id,
    "left"
).drop("dropoff_zone_id")  # Remove duplicate zone_id column

print("After dropoff zone join:")
print(f"Rows: {trips_enriched.count():,}")

# Step 6: Show sample of enriched data
print("Sample enriched data:")
trips_enriched.select(
    "pickup_location_id", "pickup_zone_name", "pickup_borough",
    "dropoff_location_id", "dropoff_zone_name", "dropoff_borough",
    "fare_amount", "total_trip_cost"
).show(5, truncate=False)

# Step 7: Check for unmatched zones
unmatched_pickup = trips_enriched.filter(col("pickup_zone_name").isNull()).count()
unmatched_dropoff = trips_enriched.filter(col("dropoff_zone_name").isNull()).count()

print(f"Unmatched pickup zones: {unmatched_pickup:,}")
print(f"Unmatched dropoff zones: {unmatched_dropoff:,}")

print("✅ Zone data joined successfully (with dedup)")


In [None]:
print("=== CHECKING ORIGINAL DATETIME DATA ===")

# Let's go back to the original trips_df to see the raw datetime format
print("Original pickup_datetime values:")
trips_df.select("pickup_datetime").show(10, truncate=False)

print("Original dropoff_datetime values:")
trips_df.select("dropoff_datetime").show(10, truncate=False)

print("Original data types:")
for col_name, data_type in trips_df.dtypes:
    if 'datetime' in col_name:
        print(f"{col_name}: {data_type}")

In [None]:
print("=== STARTING FRESH WITH ORIGINAL DATA ===")

# Let's work with the original trips_df and add hour extraction
trips_with_hour = trips_df.withColumn(
    "pickup_hour_str",
    regexp_extract(col("pickup_datetime"), r"(\d{1,2}):(\d{2})", 1)
).withColumn(
    "pickup_hour",
    col("pickup_hour_str").cast("int")
)

print("Sample hour extraction from original data:")
trips_with_hour.select("pickup_datetime", "pickup_hour_str", "pickup_hour").show(10)

# Check how many valid hours we got
valid_hours = trips_with_hour.filter(
    (col("pickup_hour") >= 0) & (col("pickup_hour") <= 23)
).count()

total_rows = trips_with_hour.count()
print(f"Valid hours extracted: {valid_hours:,} out of {total_rows:,}")

In [None]:
print("=== CREATING TIME OF DAY COLUMN ===")

# Add time of day based on pickup hour
trips_with_time_of_day = trips_with_hour.withColumn(
    "time_of_day",
    when((col("pickup_hour") >= 6) & (col("pickup_hour") < 12), "Morning")
    .when((col("pickup_hour") >= 12) & (col("pickup_hour") < 18), "Afternoon") 
    .when((col("pickup_hour") >= 18) & (col("pickup_hour") < 22), "Evening")
    .otherwise("Night")
)

# Show time of day distribution
print("Time of day distribution:")
trips_with_time_of_day.groupBy("time_of_day").count().orderBy(
    when(col("time_of_day") == "Morning", 1)
    .when(col("time_of_day") == "Afternoon", 2)
    .when(col("time_of_day") == "Evening", 3)
    .otherwise(4)
).show()

# Show hourly breakdown
print("Hourly pickup distribution:")
trips_with_time_of_day.groupBy("pickup_hour", "time_of_day").count().orderBy("pickup_hour").show(24)

In [None]:
print("=== ADDING TRIP DURATION AND TOTAL COST ===")

# Add trip duration in minutes
trips_enhanced = trips_with_time_of_day.withColumn(
    "trip_duration_minutes",
    (unix_timestamp(to_timestamp(col("dropoff_datetime"), "M/d/yyyy H:mm")) - 
     unix_timestamp(to_timestamp(col("pickup_datetime"), "M/d/yyyy H:mm"))) / 60
).withColumn(
    "total_trip_cost",
    col("fare_amount") + 
    coalesce(col("extra"), lit(0)) + 
    coalesce(col("mta_tax"), lit(0)) + 
    coalesce(col("tip_amount"), lit(0)) + 
    coalesce(col("tolls_amount"), lit(0)) +
    coalesce(col("imp_surcharge"), lit(0))
)

# Show sample with new columns
print("Sample with duration and cost:")
trips_enhanced.select(
    "pickup_datetime", "dropoff_datetime", 
    "trip_duration_minutes", "fare_amount", "total_trip_cost"
).show(5)

# Check for invalid durations
invalid_durations = trips_enhanced.filter(
    (col("trip_duration_minutes") < 0) | 
    (col("trip_duration_minutes").isNull())
).count()

print(f"Invalid trip durations: {invalid_durations:,}")

In [None]:
print("=== JOINING WITH ZONE DATA ===")

# Join with pickup zones
trips_with_zones = trips_enhanced.join(
    zones_df.select(
        col("zone_id").alias("pickup_zone_id"),
        col("zone_name").alias("pickup_zone_name"),
        col("borough").alias("pickup_borough")
    ),
    trips_enhanced.pickup_location_id == col("pickup_zone_id"),
    "left"
).drop("pickup_zone_id")

# Join with dropoff zones
trips_final = trips_with_zones.join(
    zones_df.select(
        col("zone_id").alias("dropoff_zone_id"),
        col("zone_name").alias("dropoff_zone_name"),
        col("borough").alias("dropoff_borough")
    ),
    trips_with_zones.dropoff_location_id == col("dropoff_zone_id"),
    "left"
).drop("dropoff_zone_id")

print(f"Final dataset rows: {trips_final.count():,}")

# Check zone matching success
matched_pickup = trips_final.filter(col("pickup_zone_name").isNotNull()).count()
matched_dropoff = trips_final.filter(col("dropoff_zone_name").isNotNull()).count()

print(f"Matched pickup zones: {matched_pickup:,}")
print(f"Matched dropoff zones: {matched_dropoff:,}")

# Show sample with zone info
print("Sample with zone information:")
trips_final.select(
    "pickup_zone_name", "pickup_borough",
    "dropoff_zone_name", "dropoff_borough", 
    "time_of_day", "total_trip_cost"
).show(5, truncate=False)