# Exploratory Data Analysis with PySpark and Spark SQL

This notebook uses New York City taxi data from [TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

## Instructions

- Load and explore NYC taxi data from January 2019. The exercises can be completed in PySpark or Spark SQL (a subset of the questions will be re-answered in whichever language was not used for the main work).
- Load the zone lookup table to answer the questions about NYC boroughs.
- Load NYC taxi data from January 2025 and compare it with the January 2019 data.
- With any remaining time, work on the "Where to go from here" section.
- Lab due date is TBD (due dates will be updated in the README for the class repo).

In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
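# NOTE: this wildcard import shadows Python built-ins such as max, min, sum, abs
# and round; the PySpark cells below rely on the Spark versions of max and min.
from pyspark.sql.functions import *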
from pyspark.sql.window import Window
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import pandas as pd

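# On Databricks, `spark` and `dbutils` are predefined. Outside Databricks, uncomment
# the line below (the dbutils calls in later cells would also need replacing).
# spark = SparkSession.builder.appName("NYC Taxi EDA").getOrCreate()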

In [None]:
# Define the name of the new catalog
catalog = 'taxi_eda_db'

# define variables for the trips data
schema = 'yellow_taxi_trips'
volume = 'data'
file_name = 'yellow_tripdata_2019-01.parquet'
table_name = 'tbl_yellow_taxi_trips'
path_volume = f"/Volumes/{catalog}/{schema}/{volume}"
path_table = f"{catalog}.{schema}"
download_url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}"

In [None]:
# create the catalog/schema/volume
spark.sql('create catalog if not exists ' + catalog)
spark.sql('create schema if not exists ' + catalog + '.' + schema)
spark.sql('create volume if not exists ' + catalog + '.' + schema + '.' + volume)

In [None]:
# Get the data
dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")

In [None]:
# Create the dataframe. Parquet files carry their own schema, so CSV-style
# options such as header, inferSchema and sep are not needed.
df_trips = spark.read.parquet(f"{path_volume}/{file_name}")

In [None]:
# Show the dataframe
df_trips.show(5)
print(f"Total records: {df_trips.count()}")
df_trips.printSchema()

## Lab Solutions

### Part 1: Basic Exploratory Data Analysis (Using PySpark)

This section uses PySpark commands to answer the questions.

#### 1. Add a unique key to identify each record

In [None]:
# Add a unique key using monotonically_increasing_id
df_trips_with_id = df_trips.withColumn("trip_id", monotonically_increasing_id())

# Create a temp view for SQL queries later
df_trips_with_id.createOrReplaceTempView("trips")

df_trips_with_id.select("trip_id", "tpep_pickup_datetime", "passenger_count", "trip_distance").show(5)
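
The `trip_id` values are unique and increasing, but they are not consecutive. PySpark's documentation for `monotonically_increasing_id` describes the current layout as the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The pure-Python sketch below mimics that layout (`spark_style_id` is a hypothetical helper, not a Spark function) to show why the IDs jump between partitions:

```python
# Mimics the documented monotonically_increasing_id() layout:
# upper 31 bits = partition ID, lower 33 bits = record number within the partition.
# spark_style_id is a hypothetical helper for illustration, not a Spark API.
def spark_style_id(partition_id: int, row_in_partition: int) -> int:
    return (partition_id << 33) + row_in_partition

# Three rows in partition 0, then two rows in partition 1
ids = [spark_style_id(0, r) for r in range(3)] + [spark_style_id(1, r) for r in range(2)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593]
```

So `trip_id` works as a key, but it should not be read as a row number; `row_number()` over an ordered window gives consecutive numbers if you need them, at the cost of a global sort.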

#### 2. Which trip has the highest passenger count?

In [None]:
# Find the trip with the highest passenger count
highest_passenger_trip = df_trips_with_id.orderBy(col("passenger_count").desc()).first()

print(f"Highest passenger count: {highest_passenger_trip['passenger_count']}")
print(f"Trip ID: {highest_passenger_trip['trip_id']}")
print(f"Pickup time: {highest_passenger_trip['tpep_pickup_datetime']}")
print(f"Trip distance: {highest_passenger_trip['trip_distance']} miles")

# Show all trips with max passenger count
max_passengers = df_trips_with_id.agg(max("passenger_count")).collect()[0][0]
df_trips_with_id.filter(col("passenger_count") == max_passengers).select(
    "trip_id", "passenger_count", "trip_distance", "total_amount"
).show()

#### 3. What is the average passenger count?

In [None]:
# Calculate average passenger count
avg_passengers = df_trips_with_id.agg(avg("passenger_count").alias("avg_passengers")).collect()[0][0]
print(f"Average passenger count: {avg_passengers:.2f}")

# Distribution of passenger counts
df_trips_with_id.groupBy("passenger_count").count().orderBy("passenger_count").show()
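
Spark's `avg` (like SQL `AVG`) skips nulls, and whether zero-passenger trips are kept changes the answer. A small pure-Python sketch of that behavior, using a hypothetical `mean_passengers` helper and made-up counts:

```python
import math
from typing import Optional, Sequence

def mean_passengers(counts: Sequence[Optional[float]], drop_zero: bool = False) -> Optional[float]:
    """Average like SQL AVG: None (NULL) values are skipped, not counted as 0.

    math.fsum is used instead of sum(), which the notebook's wildcard
    `from pyspark.sql.functions import *` shadows.
    """
    values = [c for c in counts if c is not None and not (drop_zero and c == 0)]
    return math.fsum(values) / len(values) if values else None

sample = [1, 2, None, 0, 1]                      # made-up passenger counts
print(mean_passengers(sample))                   # 1.0 (null skipped, zero kept)
print(mean_passengers(sample, drop_zero=True))   # 1.333... (zero-passenger trip dropped)
```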

#### 4. Shortest and longest trips by distance and by time

In [None]:
# Add trip duration in minutes
df_with_duration = df_trips_with_id.withColumn(
    "trip_duration_minutes",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60
)

# Shortest trip by distance (excluding 0 distance)
shortest_distance = df_with_duration.filter(col("trip_distance") > 0).orderBy("trip_distance").first()
print("\n=== Shortest Trip by Distance ===")
print(f"Trip ID: {shortest_distance['trip_id']}")
print(f"Distance: {shortest_distance['trip_distance']} miles")
print(f"Duration: {shortest_distance['trip_duration_minutes']:.2f} minutes")

# Longest trip by distance
longest_distance = df_with_duration.orderBy(col("trip_distance").desc()).first()
print("\n=== Longest Trip by Distance ===")
print(f"Trip ID: {longest_distance['trip_id']}")
print(f"Distance: {longest_distance['trip_distance']} miles")
print(f"Duration: {longest_distance['trip_duration_minutes']:.2f} minutes")

# Shortest trip by time (excluding zero and negative durations)
shortest_time = df_with_duration.filter(col("trip_duration_minutes") > 0).orderBy("trip_duration_minutes").first()
print("\n=== Shortest Trip by Time ===")
print(f"Trip ID: {shortest_time['trip_id']}")
print(f"Duration: {shortest_time['trip_duration_minutes']:.2f} minutes")
print(f"Distance: {shortest_time['trip_distance']} miles")

# Longest trip by time
longest_time = df_with_duration.orderBy(col("trip_duration_minutes").desc()).first()
print("\n=== Longest Trip by Time ===")
print(f"Trip ID: {longest_time['trip_id']}")
print(f"Duration: {longest_time['trip_duration_minutes']:.2f} minutes ({longest_time['trip_duration_minutes']/60:.2f} hours)")
print(f"Distance: {longest_time['trip_distance']} miles")
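
The duration column is plain timestamp subtraction in seconds, divided by 60. The sketch below reproduces that arithmetic with Python's `datetime` (assuming second-level timestamps, which is what `unix_timestamp` works in); a negative result means the recorded dropoff comes before the pickup, which is why the shortest-by-time query keeps only positive durations:

```python
from datetime import datetime

def duration_minutes(pickup: datetime, dropoff: datetime) -> float:
    """Same arithmetic as (unix_timestamp(dropoff) - unix_timestamp(pickup)) / 60."""
    return (dropoff - pickup).total_seconds() / 60

normal = duration_minutes(datetime(2019, 1, 15, 10, 0, 0), datetime(2019, 1, 15, 10, 15, 30))
swapped = duration_minutes(datetime(2019, 1, 15, 10, 15, 30), datetime(2019, 1, 15, 10, 0, 0))
print(normal, swapped)  # 15.5 -15.5
```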

#### 5. Busiest and slowest single day

In [None]:
# Add date column
df_with_date = df_with_duration.withColumn("pickup_date", to_date("tpep_pickup_datetime"))

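# Count trips per day. The monthly file may contain a few records whose pickup
# timestamps fall outside January 2019; keep only January so those stray dates
# do not show up as the "slowest day".
daily_trips = df_with_date.filter(
    col("pickup_date").between("2019-01-01", "2019-01-31")
).groupBy("pickup_date").agg(
    count("*").alias("trip_count")
).orderBy("pickup_date")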

print("\n=== Daily Trip Counts ===")
daily_trips.show(31)

# Busiest day
busiest_day = daily_trips.orderBy(col("trip_count").desc()).first()
print(f"\nBusiest Day: {busiest_day['pickup_date']} with {busiest_day['trip_count']} trips")

# Slowest day
slowest_day = daily_trips.orderBy("trip_count").first()
print(f"Slowest Day: {slowest_day['pickup_date']} with {slowest_day['trip_count']} trips")
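
Two details matter when picking the busiest and slowest day: a monthly file can include a few pickups stamped outside the month, and a `groupBy` only produces dates that have at least one trip. A pure-Python sketch of the same logic with made-up timestamps (`daily_extremes` is a hypothetical helper):

```python
from collections import Counter
from datetime import datetime

def daily_extremes(pickups, year=2019, month=1):
    """Return ((busiest_date, trips), (slowest_date, trips)) within one month.

    Only dates with at least one trip appear, mirroring a groupBy on pickup_date.
    """
    per_day = Counter(ts.date() for ts in pickups if ts.year == year and ts.month == month)
    ranked = per_day.most_common()  # most trips first
    return ranked[0], ranked[-1]

# Made-up pickup times, including one stray timestamp from outside the month
pickups = [
    datetime(2019, 1, 2, 8), datetime(2019, 1, 2, 9), datetime(2019, 1, 2, 17),
    datetime(2019, 1, 3, 8),
    datetime(2019, 1, 5, 1), datetime(2019, 1, 5, 2),
    datetime(2008, 12, 31, 23),
]
busiest, slowest = daily_extremes(pickups)
print(busiest, slowest)
```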

#### 6. Busiest/slowest time of day

In [None]:
# Add hour of day
df_with_hour = df_with_date.withColumn("pickup_hour", hour("tpep_pickup_datetime"))

# Define time periods
df_with_period = df_with_hour.withColumn(
    "time_period",
    when((col("pickup_hour") >= 6) & (col("pickup_hour") < 12), "Morning (6am-12pm)")
    .when((col("pickup_hour") >= 12) & (col("pickup_hour") < 18), "Afternoon (12pm-6pm)")
    .when((col("pickup_hour") >= 18) & (col("pickup_hour") < 24), "Evening (6pm-12am)")
    .otherwise("Late Night (12am-6am)")
)

# Trips by hour
hourly_trips = df_with_period.groupBy("pickup_hour").agg(
    count("*").alias("trip_count")
).orderBy("pickup_hour")

print("\n=== Trips by Hour of Day ===")
hourly_trips.show(24)

# Busiest hour
busiest_hour = hourly_trips.orderBy(col("trip_count").desc()).first()
print(f"\nBusiest Hour: {busiest_hour['pickup_hour']}:00 with {busiest_hour['trip_count']} trips")

# Slowest hour
slowest_hour = hourly_trips.orderBy("trip_count").first()
print(f"Slowest Hour: {slowest_hour['pickup_hour']}:00 with {slowest_hour['trip_count']} trips")

# Trips by time period
print("\n=== Trips by Time Period ===")
df_with_period.groupBy("time_period").agg(
    count("*").alias("trip_count")
).orderBy(col("trip_count").desc()).show()
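
The `when(...).otherwise(...)` chain is evaluated top to bottom, and because every lower bound is inclusive, a boundary hour such as 12 lands in the later period. A plain-Python equivalent (a sketch for checking the boundaries, not part of the Spark pipeline):

```python
def time_period(hour: int) -> str:
    """Mirror of the when/otherwise chain; `hour` is 0-23 as returned by Spark's hour()."""
    if 6 <= hour < 12:
        return "Morning (6am-12pm)"
    if 12 <= hour < 18:
        return "Afternoon (12pm-6pm)"
    if 18 <= hour < 24:
        return "Evening (6pm-12am)"
    return "Late Night (12am-6am)"

# Boundary hours fall into the later period because each lower bound is inclusive
print([time_period(h) for h in (5, 6, 11, 12, 18, 23)])
```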

#### 7. On average, which day of the week is slowest/busiest?

In [None]:
# Add day of week
df_with_dow = df_with_period.withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))
df_with_dow = df_with_dow.withColumn(
    "day_name",
    when(col("day_of_week") == 1, "Sunday")
    .when(col("day_of_week") == 2, "Monday")
    .when(col("day_of_week") == 3, "Tuesday")
    .when(col("day_of_week") == 4, "Wednesday")
    .when(col("day_of_week") == 5, "Thursday")
    .when(col("day_of_week") == 6, "Friday")
    .when(col("day_of_week") == 7, "Saturday")
)

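# Average trips by day of week: count trips per date first, then average per weekday.
# Only January 2019 dates are kept so stray out-of-month pickups do not skew the averages.
dow_stats = df_with_dow.filter(
    col("pickup_date").between("2019-01-01", "2019-01-31")
).groupBy("day_of_week", "day_name", "pickup_date").agg(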
    count("*").alias("daily_trips")
).groupBy("day_of_week", "day_name").agg(
    avg("daily_trips").alias("avg_trips_per_day"),
    count("*").alias("num_days")
).orderBy("day_of_week")

print("\n=== Average Trips by Day of Week ===")
dow_stats.show()

# Find busiest and slowest
busiest_dow = dow_stats.orderBy(col("avg_trips_per_day").desc()).first()
slowest_dow = dow_stats.orderBy("avg_trips_per_day").first()

print(f"\nBusiest Day of Week: {busiest_dow['day_name']} with avg {busiest_dow['avg_trips_per_day']:.0f} trips")
print(f"Slowest Day of Week: {slowest_dow['day_name']} with avg {slowest_dow['avg_trips_per_day']:.0f} trips")
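
Averaging per-date counts (rather than totaling trips per weekday) matters here because January 2019 began on a Tuesday, so Tuesdays, Wednesdays and Thursdays occur five times and every other weekday four. The sketch below mirrors the two-level aggregation in pure Python and converts Python's `isoweekday()` to Spark's `dayofweek` numbering (1 = Sunday through 7 = Saturday); the helper names are hypothetical:

```python
import statistics
from collections import Counter, defaultdict
from datetime import date, timedelta

def spark_dayofweek(d: date) -> int:
    """Spark numbering (1 = Sunday ... 7 = Saturday) from Python's isoweekday()
    (1 = Monday ... 7 = Sunday)."""
    return d.isoweekday() % 7 + 1

def avg_trips_by_dow(trip_dates):
    """Count trips per calendar date, then average those daily counts per weekday."""
    per_date = Counter(trip_dates)
    by_dow = defaultdict(list)
    for d, n in per_date.items():
        by_dow[spark_dayofweek(d)].append(n)
    return {dow: statistics.mean(ns) for dow, ns in sorted(by_dow.items())}

# How often each weekday occurs in January 2019 (Jan 1, 2019 was a Tuesday)
jan_2019 = [date(2019, 1, 1) + timedelta(days=i) for i in range(31)]
occurrences = Counter(spark_dayofweek(d) for d in jan_2019)
print(sorted(occurrences.items()))  # [(1, 4), (2, 4), (3, 5), (4, 5), (5, 5), (6, 4), (7, 4)]
```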

#### 8. Does trip distance or number of passengers affect tip amount?

In [None]:
# Calculate tip percentage
df_with_tip_pct = df_with_dow.withColumn(
    "tip_percentage",
    when(col("fare_amount") > 0, (col("tip_amount") / col("fare_amount")) * 100)
    .otherwise(0)
)

# Analyze tip by passenger count
print("\n=== Average Tip by Passenger Count ===")
tip_by_passengers = df_with_tip_pct.filter(
    (col("passenger_count") > 0) & (col("passenger_count") <= 6)
).groupBy("passenger_count").agg(
    avg("tip_amount").alias("avg_tip"),
    avg("tip_percentage").alias("avg_tip_pct"),
    count("*").alias("trip_count")
).orderBy("passenger_count")
tip_by_passengers.show()

# Analyze tip by distance buckets
df_distance_buckets = df_with_tip_pct.withColumn(
    "distance_bucket",
    when(col("trip_distance") < 1, "< 1 mile")
    .when((col("trip_distance") >= 1) & (col("trip_distance") < 3), "1-3 miles")
    .when((col("trip_distance") >= 3) & (col("trip_distance") < 5), "3-5 miles")
    .when((col("trip_distance") >= 5) & (col("trip_distance") < 10), "5-10 miles")
    .when((col("trip_distance") >= 10) & (col("trip_distance") < 20), "10-20 miles")
    .otherwise("20+ miles")
)

print("\n=== Average Tip by Distance Bucket ===")
tip_by_distance = df_distance_buckets.groupBy("distance_bucket").agg(
    avg("tip_amount").alias("avg_tip"),
    avg("tip_percentage").alias("avg_tip_pct"),
    count("*").alias("trip_count")
).orderBy("avg_tip")
tip_by_distance.show()

# Correlation analysis
correlation_distance = df_with_tip_pct.stat.corr("trip_distance", "tip_amount")
correlation_passengers = df_with_tip_pct.stat.corr("passenger_count", "tip_amount")

print(f"\nCorrelation between trip distance and tip amount: {correlation_distance:.4f}")
print(f"Correlation between passenger count and tip amount: {correlation_passengers:.4f}")
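# Describe the coefficients that were actually computed instead of hard-coding a conclusion.
# Note: tip_amount only records credit card tips; cash tips are not captured.
def describe_corr(r):
    size = r if r >= 0 else -r  # avoids abs(), which the wildcard Spark import shadows
    strength = "strong" if size >= 0.7 else "moderate" if size >= 0.3 else "weak"
    return f"{strength} {'positive' if r >= 0 else 'negative'}"

print("\nInterpretation:")
print(f"- Distance vs. tip amount: {describe_corr(correlation_distance)} linear relationship")
print(f"- Passenger count vs. tip amount: {describe_corr(correlation_passengers)} linear relationship")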
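
`DataFrame.stat.corr` computes the Pearson correlation coefficient, which measures only *linear* association. The minimal pure-Python version below (a sketch, not Spark's implementation; the data are made up) shows that a clear but non-linear pattern can still score 0:

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient, the statistic DataFrame.stat.corr reports."""
    n = len(xs)
    mean_x, mean_y = math.fsum(xs) / n, math.fsum(ys) / n
    cov = math.fsum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = math.fsum((x - mean_x) ** 2 for x in xs)
    var_y = math.fsum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

distances = [1.0, 2.0, 3.0, 4.0]
print(pearson(distances, [2.0, 4.0, 6.0, 8.0]))  # 1.0: tips exactly proportional
print(pearson(distances, [1.0, 3.0, 3.0, 1.0]))  # 0.0: rise-then-fall pattern
```

So a weak coefficient does not rule out a relationship; the bucketed averages above are a useful complement.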

#### 9. What was the highest 'extra' charge and which trip?

In [None]:
# Find highest extra charge
highest_extra_trip = df_with_tip_pct.orderBy(col("extra").desc()).first()

print("\n=== Trip with Highest Extra Charge ===")
print(f"Trip ID: {highest_extra_trip['trip_id']}")
print(f"Extra charge: ${highest_extra_trip['extra']:.2f}")
print(f"Pickup time: {highest_extra_trip['tpep_pickup_datetime']}")
print(f"Trip distance: {highest_extra_trip['trip_distance']} miles")
print(f"Total amount: ${highest_extra_trip['total_amount']:.2f}")

# Show distribution of extra charges
print("\n=== Distribution of Extra Charges ===")
df_with_tip_pct.groupBy("extra").agg(
    count("*").alias("count")
).orderBy(col("count").desc()).show(10)

# Statistics on extra charges
extra_stats = df_with_tip_pct.select(
    avg("extra").alias("avg_extra"),
    max("extra").alias("max_extra"),
    min("extra").alias("min_extra")
).collect()[0]

print(f"\nAverage extra charge: ${extra_stats['avg_extra']:.2f}")
print(f"Max extra charge: ${extra_stats['max_extra']:.2f}")
print(f"Min extra charge: ${extra_stats['min_extra']:.2f}")

#### 10. Are there any data points that seem strange or look like outliers?

**Analysis of Outliers and Strange Data:**

In [None]:
# Check for suspicious data patterns

print("\n=== OUTLIER ANALYSIS ===")

# 1. Zero or negative values
print("\n1. Zero or Negative Values:")
zero_distance = df_with_tip_pct.filter(col("trip_distance") <= 0).count()
zero_fare = df_with_tip_pct.filter(col("fare_amount") <= 0).count()
zero_passengers = df_with_tip_pct.filter(col("passenger_count") == 0).count()

print(f"   - Trips with zero/negative distance: {zero_distance}")
print(f"   - Trips with zero/negative fare: {zero_fare}")
print(f"   - Trips with zero passengers: {zero_passengers}")
print("   Reasoning: These are data quality issues - trips should have positive values")

# 2. Extremely long trips
print("\n2. Extremely Long Trips:")
long_distance = df_with_tip_pct.filter(col("trip_distance") > 100).count()
long_duration = df_with_tip_pct.filter(col("trip_duration_minutes") > 180).count()

print(f"   - Trips over 100 miles: {long_distance}")
print(f"   - Trips over 3 hours: {long_duration}")
print("   Reasoning: NYC taxi trips are mostly local (even airport runs are well under 100 miles); these may be out-of-town trips, meters left running, or data errors")

# Show some examples
print("\n   Examples of long trips (over 50 miles):")
df_with_tip_pct.filter(col("trip_distance") > 50).select(
    "trip_id", "trip_distance", "trip_duration_minutes", "fare_amount"
).show(5)

# 3. Unrealistic passenger counts
print("\n3. Unrealistic Passenger Counts:")
high_passengers = df_with_tip_pct.filter(col("passenger_count") > 6).count()
print(f"   - Trips with more than 6 passengers: {high_passengers}")
print("   Reasoning: yellow cabs carry at most 4 or 5 passengers depending on the vehicle (plus a small child on a lap), so counts above 6 are implausible")

# 4. Negative trip durations
print("\n4. Negative Trip Durations:")
negative_duration = df_with_tip_pct.filter(col("trip_duration_minutes") < 0).count()
print(f"   - Trips with negative duration: {negative_duration}")
print("   Reasoning: Dropoff time before pickup time indicates data entry errors")

# 5. Unusually high fares for short trips
print("\n5. Unusually High Fares for Short Trips:")
high_fare_short = df_with_tip_pct.filter(
    (col("trip_distance") < 1) & (col("fare_amount") > 100)
).count()
print(f"   - Trips under 1 mile with fare over $100: {high_fare_short}")
print("   Reasoning: May reflect negotiated or flat-rate fares, meters left running, or data errors")

# 6. Very low fare amounts for long trips
print("\n6. Very Low Fares for Long Trips:")
low_fare_long = df_with_tip_pct.filter(
    (col("trip_distance") > 10) & (col("fare_amount") < 10)
).count()
print(f"   - Trips over 10 miles with fare under $10: {low_fare_long}")
print("   Reasoning: Likely data errors or special circumstances")

# 7. Total amount less than fare amount
print("\n7. Total Amount Less Than Fare Amount:")
total_less_fare = df_with_tip_pct.filter(col("total_amount") < col("fare_amount")).count()
print(f"   - Trips where total < fare: {total_less_fare}")
print("   Reasoning: Total should include fare plus extras - indicates calculation errors")

# Summary
print("\n=== OUTLIER SUMMARY ===")
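total_trips = df_with_tip_pct.count()
# Count each flagged trip once: summing the individual counts above would count a
# trip twice if it fails two checks (e.g. zero distance AND zero fare)
total_outliers = df_with_tip_pct.filter(
    (col("trip_distance") <= 0) | (col("fare_amount") <= 0) | (col("passenger_count") == 0)
    | (col("trip_distance") > 100) | (col("trip_duration_minutes") < 0)
).count()
print(f"Total trips: {total_trips:,}")
print(f"Potential outliers/errors: {total_outliers:,} ({(total_outliers/total_trips)*100:.2f}%)")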
print("\nRecommendation: Clean data by filtering out obvious errors before analysis")
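
When a single trip fails several checks, adding up the per-check counts counts it more than once. The pure-Python sketch below applies the same five thresholds used in the summary to made-up trips and compares the two totals (`outlier_reasons` is a hypothetical helper):

```python
def outlier_reasons(trip):
    """Return the names of the checks a trip fails (thresholds from the summary above)."""
    checks = {
        "non-positive distance": trip["trip_distance"] <= 0,
        "non-positive fare": trip["fare_amount"] <= 0,
        "zero passengers": trip["passenger_count"] == 0,
        "over 100 miles": trip["trip_distance"] > 100,
        "negative duration": trip["trip_duration_minutes"] < 0,
    }
    return [name for name, failed in checks.items() if failed]

# Made-up trips: the first fails two checks, the second one check, the third none
trips_demo = [
    {"trip_distance": 0.0, "fare_amount": 0.0, "passenger_count": 1, "trip_duration_minutes": 3.0},
    {"trip_distance": 2.5, "fare_amount": 11.0, "passenger_count": 0, "trip_duration_minutes": 12.0},
    {"trip_distance": 1.2, "fare_amount": 7.0, "passenger_count": 2, "trip_duration_minutes": 8.0},
]
per_check_total = len([r for t in trips_demo for r in outlier_reasons(t)])
flagged_trips = len([t for t in trips_demo if outlier_reasons(t)])
print(per_check_total, flagged_trips)  # 3 2
```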

### Part 2: Borough Analysis with Zone Lookup

Now we'll load the taxi zone lookup table and analyze trips by borough.

In [None]:
# Download and load taxi zone lookup
zone_lookup_url = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv'
zone_file_name = 'taxi_zone_lookup.csv'

# Download zone lookup
dbutils.fs.cp(zone_lookup_url, f"{path_volume}/{zone_file_name}")

# Load zone lookup
df_zones = spark.read.csv(
    f"{path_volume}/{zone_file_name}",
    header=True,
    inferSchema=True
)

print("=== Taxi Zone Lookup ===")
df_zones.show(10)
df_zones.printSchema()

# Create temp view for zones
df_zones.createOrReplaceTempView("zones")

In [None]:
# Join trips with zones for pickup and dropoff locations
df_trips_zones = df_with_tip_pct.alias("trips") \
    .join(df_zones.alias("pickup_zone"), 
          col("trips.PULocationID") == col("pickup_zone.LocationID"), 
          "left") \
    .join(df_zones.alias("dropoff_zone"), 
          col("trips.DOLocationID") == col("dropoff_zone.LocationID"), 
          "left") \
    .select(
        col("trips.*"),
        col("pickup_zone.Borough").alias("pickup_borough"),
        col("pickup_zone.Zone").alias("pickup_zone"),
        col("dropoff_zone.Borough").alias("dropoff_borough"),
        col("dropoff_zone.Zone").alias("dropoff_zone")
    )

# Create temp view
df_trips_zones.createOrReplaceTempView("trips_with_zones")

print("=== Sample of Trips with Borough Information ===")
df_trips_zones.select(
    "trip_id", "pickup_borough", "dropoff_borough", "trip_distance", "fare_amount"
).show(10)
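
Both joins are left joins, so every trip survives even when its location ID has no match in the lookup; those trips end up with a null borough, which is why `null` rows can appear in the borough counts below. A dictionary-based sketch of the same behavior (the lookup rows are a small illustrative subset, and ID 999 is deliberately missing; `attach_boroughs` is a hypothetical helper):

```python
def attach_boroughs(trips, zones):
    """Left-join style lookup: every trip is kept; IDs missing from the lookup map to None."""
    borough_by_id = {z["LocationID"]: z["Borough"] for z in zones}
    return [
        {**t,
         "pickup_borough": borough_by_id.get(t["PULocationID"]),
         "dropoff_borough": borough_by_id.get(t["DOLocationID"])}
        for t in trips
    ]

zones_sample = [
    {"LocationID": 1, "Borough": "EWR"},
    {"LocationID": 132, "Borough": "Queens"},
    {"LocationID": 161, "Borough": "Manhattan"},
]
trips_sample = [
    {"PULocationID": 161, "DOLocationID": 132},
    {"PULocationID": 161, "DOLocationID": 999},  # no match: dropoff_borough becomes None
]
joined = attach_boroughs(trips_sample, zones_sample)
for row in joined:
    print(row)
```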

#### Which borough had most pickups? dropoffs?

In [None]:
# Pickups by borough
print("\n=== Pickups by Borough ===")
pickups_by_borough = df_trips_zones.groupBy("pickup_borough").agg(
    count("*").alias("pickup_count")
).orderBy(col("pickup_count").desc())
pickups_by_borough.show()

most_pickups = pickups_by_borough.first()
print(f"Most pickups: {most_pickups['pickup_borough']} with {most_pickups['pickup_count']:,} trips")

# Dropoffs by borough
print("\n=== Dropoffs by Borough ===")
dropoffs_by_borough = df_trips_zones.groupBy("dropoff_borough").agg(
    count("*").alias("dropoff_count")
).orderBy(col("dropoff_count").desc())
dropoffs_by_borough.show()

most_dropoffs = dropoffs_by_borough.first()
print(f"Most dropoffs: {most_dropoffs['dropoff_borough']} with {most_dropoffs['dropoff_count']:,} trips")

#### What are the busy/slow times by borough?

In [None]:
# Busy/slow times by borough
print("\n=== Trips by Time Period and Borough ===")
borough_time_analysis = df_trips_zones.groupBy("pickup_borough", "time_period").agg(
    count("*").alias("trip_count")
).orderBy("pickup_borough", col("trip_count").desc())

borough_time_analysis.show(100)  # enough rows for every borough/time-period combination

# Find busiest time for each borough
print("\n=== Busiest Time Period for Each Borough ===")
window = Window.partitionBy("pickup_borough").orderBy(col("trip_count").desc())
busiest_times = borough_time_analysis.withColumn("rank", row_number().over(window)) \
    .filter(col("rank") == 1) \
    .select("pickup_borough", "time_period", "trip_count")
busiest_times.show()
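
The `row_number()` window keeps one row per borough: the one with the highest count. The same "top row per group" idea in plain Python, with made-up counts (`top_per_group` is a hypothetical helper):

```python
def top_per_group(rows, group_key, value_key):
    """Keep the highest-value row per group, like filtering row_number() == 1 over a
    window partitioned by group_key and ordered by value_key descending.
    On ties the first row seen wins here; Spark's row_number() breaks ties arbitrarily."""
    best = {}
    for row in rows:
        g = row[group_key]
        if g not in best or row[value_key] > best[g][value_key]:
            best[g] = row
    return best

# Made-up counts for illustration only
period_counts = [
    {"pickup_borough": "Manhattan", "time_period": "Evening (6pm-12am)", "trip_count": 30},
    {"pickup_borough": "Manhattan", "time_period": "Morning (6am-12pm)", "trip_count": 20},
    {"pickup_borough": "Queens", "time_period": "Afternoon (12pm-6pm)", "trip_count": 5},
    {"pickup_borough": "Queens", "time_period": "Evening (6pm-12am)", "trip_count": 7},
]
best = top_per_group(period_counts, "pickup_borough", "trip_count")
for borough, row in best.items():
    print(borough, row["time_period"], row["trip_count"])
```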

#### What are the busiest days of the week by borough?

In [None]:
# Busiest days by borough
print("\n=== Average Trips by Day of Week and Borough ===")
borough_dow = df_trips_zones.groupBy("pickup_borough", "day_name", "pickup_date").agg(
    count("*").alias("daily_trips")
).groupBy("pickup_borough", "day_name").agg(
    avg("daily_trips").alias("avg_trips")
).orderBy("pickup_borough", col("avg_trips").desc())

borough_dow.show(100)  # enough rows for every borough/day-of-week combination

# Find busiest day for each borough
print("\n=== Busiest Day of Week for Each Borough ===")
window_dow = Window.partitionBy("pickup_borough").orderBy(col("avg_trips").desc())
busiest_days = borough_dow.withColumn("rank", row_number().over(window_dow)) \
    .filter(col("rank") == 1) \
    .select("pickup_borough", "day_name", "avg_trips")
busiest_days.show()

#### What is the average trip distance by borough?

In [None]:
# Average trip distance by borough
print("\n=== Average Trip Distance by Pickup Borough ===")
borough_distance = df_trips_zones.filter(col("trip_distance") > 0).groupBy("pickup_borough").agg(
    avg("trip_distance").alias("avg_distance"),
    min("trip_distance").alias("min_distance"),
    max("trip_distance").alias("max_distance"),
    count("*").alias("trip_count")
).orderBy(col("avg_distance").desc())

borough_distance.show()

#### What is the average trip fare by borough?

In [None]:
# Average fare by borough
print("\n=== Average Fare by Pickup Borough ===")
borough_fare = df_trips_zones.filter(col("fare_amount") > 0).groupBy("pickup_borough").agg(
    avg("fare_amount").alias("avg_fare"),
    avg("total_amount").alias("avg_total"),
    avg("tip_amount").alias("avg_tip"),
    count("*").alias("trip_count")
).orderBy(col("avg_fare").desc())

borough_fare.show()

#### Highest and lowest fare amounts: which borough is associated with each?

In [None]:
# Highest fare trip
highest_fare_trip = df_trips_zones.orderBy(col("total_amount").desc()).first()

print("\n=== Highest Fare Trip ===")
print(f"Trip ID: {highest_fare_trip['trip_id']}")
print(f"Total Amount: ${highest_fare_trip['total_amount']:.2f}")
print(f"Fare Amount: ${highest_fare_trip['fare_amount']:.2f}")
print(f"Pickup Borough: {highest_fare_trip['pickup_borough']}")
print(f"Dropoff Borough: {highest_fare_trip['dropoff_borough']}")
print(f"Distance: {highest_fare_trip['trip_distance']} miles")
print(f"Duration: {highest_fare_trip['trip_duration_minutes']:.2f} minutes")

# Lowest fare trip (excluding zero fares)
lowest_fare_trip = df_trips_zones.filter(col("total_amount") > 0).orderBy("total_amount").first()

print("\n=== Lowest Fare Trip (non-zero) ===")
print(f"Trip ID: {lowest_fare_trip['trip_id']}")
print(f"Total Amount: ${lowest_fare_trip['total_amount']:.2f}")
print(f"Fare Amount: ${lowest_fare_trip['fare_amount']:.2f}")
print(f"Pickup Borough: {lowest_fare_trip['pickup_borough']}")
print(f"Dropoff Borough: {lowest_fare_trip['dropoff_borough']}")
print(f"Distance: {lowest_fare_trip['trip_distance']} miles")
print(f"Duration: {lowest_fare_trip['trip_duration_minutes']:.2f} minutes")

#### Load the dataset from January 2025 and compare

In [None]:
# Compare against January 2025, as the lab instructions ask.
# TLC publishes each month's file with a delay of a couple of months; if the
# download fails, set compare_year to the most recent year that is available.
compare_year = 2025
file_name_cmp = f"yellow_tripdata_{compare_year}-01.parquet"
download_url_cmp = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name_cmp}"

def pct_change(new, old):
    """Percentage change from old to new, or None when old is zero or missing."""
    if not old:
        return None
    return (new - old) / old * 100

try:
    # Download and load the comparison month
    dbutils.fs.cp(download_url_cmp, f"{path_volume}/{file_name_cmp}")
    df_trips_cmp = spark.read.parquet(f"{path_volume}/{file_name_cmp}")

    print(f"=== January {compare_year} Data Loaded Successfully ===")

    # The same aggregate expressions are applied to both months
    metrics = [
        count("*").alias("total_trips"),
        avg("trip_distance").alias("avg_distance"),
        avg("fare_amount").alias("avg_fare"),
        avg("total_amount").alias("avg_total"),
        avg("tip_amount").alias("avg_tip"),
        avg("passenger_count").alias("avg_passengers"),
    ]
    comparison_2019 = df_trips.agg(*metrics).withColumn("year", lit(2019))
    comparison_cmp = df_trips_cmp.agg(*metrics).withColumn("year", lit(compare_year))

    print(f"\n=== Comparison: January 2019 vs January {compare_year} ===")
    comparison_2019.unionByName(comparison_cmp).show()

    # Percentage change for each metric
    data_2019 = comparison_2019.collect()[0]
    data_cmp = comparison_cmp.collect()[0]

    print(f"\n=== Change from January 2019 to January {compare_year} ===")
    for label, field in [
        ("Total trips", "total_trips"),
        ("Average distance", "avg_distance"),
        ("Average fare", "avg_fare"),
        ("Average total", "avg_total"),
        ("Average tip", "avg_tip"),
        ("Average passengers", "avg_passengers"),
    ]:
        change = pct_change(data_cmp[field], data_2019[field])
        print(f"{label}: {change:+.1f}%" if change is not None else f"{label}: n/a")

except Exception as e:
    print(f"Could not load {compare_year} data: {e}")
    print("The file may not be published yet; set compare_year to an earlier year and rerun.")

### Part 3: Re-answer Questions Using SQL

Now let's re-answer 3 questions from above using pure SQL (since we used PySpark for the main analysis).
At least one must involve a join.

#### SQL Question 1: Average passenger count

In [None]:
# Using SQL: distribution of passenger counts.
# (AVG(passenger_count) grouped by passenger_count would just repeat each group's
# value, so the overall average is computed separately below.)
sql_query1 = """
SELECT
    passenger_count,
    COUNT(*) AS trip_count
FROM trips
WHERE passenger_count > 0
GROUP BY passenger_count
ORDER BY passenger_count
"""

result1 = spark.sql(sql_query1)
print("\n=== SQL: Passenger Count Distribution ===")
result1.show()

# Overall average. Zero-passenger trips are excluded here, so this can differ
# slightly from the PySpark average in Part 1, which included them.
overall_avg = spark.sql("""
    SELECT AVG(passenger_count) AS overall_avg_passengers
    FROM trips
    WHERE passenger_count > 0
""")
overall_avg.show()

#### SQL Question 2: Busiest day of the week (using aggregate functions)

In [None]:
# Using SQL to find busiest day of week
sql_query2 = """
WITH daily_counts AS (
    SELECT 
        DAYOFWEEK(tpep_pickup_datetime) as day_of_week,
        CASE DAYOFWEEK(tpep_pickup_datetime)
            WHEN 1 THEN 'Sunday'
            WHEN 2 THEN 'Monday'
            WHEN 3 THEN 'Tuesday'
            WHEN 4 THEN 'Wednesday'
            WHEN 5 THEN 'Thursday'
            WHEN 6 THEN 'Friday'
            WHEN 7 THEN 'Saturday'
        END as day_name,
        DATE(tpep_pickup_datetime) as trip_date,
        COUNT(*) as daily_trips
    FROM trips
    GROUP BY day_of_week, day_name, trip_date
)
SELECT 
    day_of_week,
    day_name,
    AVG(daily_trips) as avg_trips_per_day,
    COUNT(DISTINCT trip_date) as num_days
FROM daily_counts
GROUP BY day_of_week, day_name
ORDER BY avg_trips_per_day DESC
"""

result2 = spark.sql(sql_query2)
print("\n=== SQL: Average Trips by Day of Week ===")
result2.show()

#### SQL Question 3: Which borough had most pickups? (WITH JOIN)

In [None]:
# Using SQL with JOIN to find pickups by borough
sql_query3 = """
SELECT 
    z.Borough as pickup_borough,
    COUNT(*) as pickup_count,
    AVG(t.trip_distance) as avg_distance,
    AVG(t.fare_amount) as avg_fare,
    AVG(t.total_amount) as avg_total
FROM trips t
LEFT JOIN zones z ON t.PULocationID = z.LocationID
WHERE z.Borough IS NOT NULL
GROUP BY z.Borough
ORDER BY pickup_count DESC
"""

result3 = spark.sql(sql_query3)
print("\n=== SQL with JOIN: Pickups by Borough with Stats ===")
result3.show()

# Bonus: Cross-borough trips
sql_cross_borough = """
SELECT 
    pickup_z.Borough as pickup_borough,
    dropoff_z.Borough as dropoff_borough,
    COUNT(*) as trip_count,
    AVG(t.trip_distance) as avg_distance,
    AVG(t.fare_amount) as avg_fare
FROM trips t
LEFT JOIN zones pickup_z ON t.PULocationID = pickup_z.LocationID
LEFT JOIN zones dropoff_z ON t.DOLocationID = dropoff_z.LocationID
WHERE pickup_z.Borough IS NOT NULL 
    AND dropoff_z.Borough IS NOT NULL
    AND pickup_z.Borough != dropoff_z.Borough
GROUP BY pickup_z.Borough, dropoff_z.Borough
ORDER BY trip_count DESC
LIMIT 10
"""

result_cross = spark.sql(sql_cross_borough)
print("\n=== SQL: Top Cross-Borough Trips ===")
result_cross.show()
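
The cross-borough query treats each (pickup, dropoff) pair as directional and drops trips whose borough is unknown or that start and end in the same borough. A pure-Python sketch of that counting with `collections.Counter` and made-up pairs (`top_cross_borough` is a hypothetical helper):

```python
from collections import Counter

def top_cross_borough(pairs, n=10):
    """Count (pickup, dropoff) borough pairs, skipping unknown and same-borough trips.
    Direction matters: Manhattan -> Queens and Queens -> Manhattan are separate rows."""
    counts = Counter(
        (pu, do) for pu, do in pairs
        if pu is not None and do is not None and pu != do
    )
    return counts.most_common(n)

# Made-up borough pairs
pairs = [("Manhattan", "Queens"), ("Manhattan", "Queens"), ("Queens", "Manhattan"),
         ("Manhattan", "Manhattan"), (None, "Queens")]
print(top_cross_borough(pairs))  # [(('Manhattan', 'Queens'), 2), (('Queens', 'Manhattan'), 1)]
```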

### Part 4: Visualizations

Create visualizations for at least 3 questions using matplotlib or Spark's native plotting.

In [None]:
# Enable matplotlib inline plotting
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('whitegrid')

# Set figure size default
plt.rcParams['figure.figsize'] = (12, 6)

#### Visualization 1: Trips by Hour of Day

In [None]:
# Get hourly trip data
hourly_data = df_with_period.groupBy("pickup_hour").agg(
    count("*").alias("trip_count")
).orderBy("pickup_hour").toPandas()

# Create visualization
plt.figure(figsize=(14, 6))
plt.bar(hourly_data['pickup_hour'], hourly_data['trip_count'], color='steelblue', alpha=0.8)
plt.xlabel('Hour of Day', fontsize=12)
plt.ylabel('Number of Trips', fontsize=12)
plt.title('NYC Taxi Trips by Hour of Day (January 2019)', fontsize=14, fontweight='bold')
plt.xticks(range(0, 24))
plt.grid(axis='y', alpha=0.3)

# Add value labels on top of bars
for hour, v in zip(hourly_data['pickup_hour'], hourly_data['trip_count']):
    plt.text(hour, v + 5000, f'{v:,}', ha='center', va='bottom', fontsize=8)

plt.tight_layout()
plt.show()

#### Visualization 2: Pickups by Borough

In [None]:
# Get borough pickup data
borough_data = df_trips_zones.groupBy("pickup_borough").agg(
    count("*").alias("pickup_count")
).filter(col("pickup_borough").isNotNull()).orderBy(col("pickup_count").desc()).toPandas()

# Create visualization
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Bar chart
colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#FFA07A', '#98D8C8']
ax1.barh(borough_data['pickup_borough'], borough_data['pickup_count'], color=colors)
ax1.set_xlabel('Number of Pickups', fontsize=12)
ax1.set_ylabel('Borough', fontsize=12)
ax1.set_title('NYC Taxi Pickups by Borough (January 2019)', fontsize=14, fontweight='bold')
ax1.grid(axis='x', alpha=0.3)

# Add value labels
for i, v in enumerate(borough_data['pickup_count']):
    ax1.text(v + 50000, i, f'{v:,}', va='center', fontsize=10)

# Pie chart
ax2.pie(borough_data['pickup_count'], labels=borough_data['pickup_borough'], 
        autopct='%1.1f%%', colors=colors, startangle=90)
ax2.set_title('Pickup Distribution by Borough', fontsize=14, fontweight='bold')

plt.tight_layout()
plt.show()

#### Visualization 3: Average Fare by Borough and Time Period

In [None]:
# Get borough and time period data
borough_time_fare = df_trips_zones.filter(
    (col("pickup_borough").isNotNull()) & (col("fare_amount") > 0)
).groupBy("pickup_borough", "time_period").agg(
    avg("fare_amount").alias("avg_fare"),
    count("*").alias("trip_count")
).toPandas()

# Pivot data for grouped bar chart
pivot_data = borough_time_fare.pivot(index='pickup_borough', 
                                     columns='time_period', 
                                     values='avg_fare')

# Create visualization
fig, ax = plt.subplots(figsize=(14, 7))
pivot_data.plot(kind='bar', ax=ax, width=0.8)
ax.set_xlabel('Borough', fontsize=12)
ax.set_ylabel('Average Fare ($)', fontsize=12)
ax.set_title('Average Taxi Fare by Borough and Time of Day (January 2019)', 
             fontsize=14, fontweight='bold')
ax.legend(title='Time Period', bbox_to_anchor=(1.05, 1), loc='upper left')
ax.grid(axis='y', alpha=0.3)
plt.xticks(rotation=45, ha='right')

plt.tight_layout()
plt.show()

#### Visualization 4: Trip Distance Distribution

In [None]:
# Get distance data (filter outliers for better visualization)
distance_data = df_trips_with_id.filter(
    (col("trip_distance") > 0) & (col("trip_distance") <= 20)
).select("trip_distance").toPandas()

# Create histogram
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Histogram
ax1.hist(distance_data['trip_distance'], bins=50, color='teal', alpha=0.7, edgecolor='black')
ax1.set_xlabel('Trip Distance (miles)', fontsize=12)
ax1.set_ylabel('Number of Trips', fontsize=12)
ax1.set_title('Distribution of Trip Distances (0-20 miles)', fontsize=14, fontweight='bold')
ax1.grid(axis='y', alpha=0.3)

# Box plot
ax2.boxplot(distance_data['trip_distance'], vert=True)
ax2.set_ylabel('Trip Distance (miles)', fontsize=12)
ax2.set_title('Trip Distance Box Plot', fontsize=14, fontweight='bold')
ax2.grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()

# Print statistics
print("\n=== Distance Statistics ===")
print(distance_data['trip_distance'].describe())
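
The box plot's whiskers follow Tukey's rule: by default matplotlib places the fences 1.5 times the IQR beyond the first and third quartiles and draws points outside them as individual outliers. A small sketch of the fence calculation with `statistics.quantiles` (the `inclusive` method matches NumPy's default linear interpolation); the sample distances are made up, and note that the plotted data above was already capped at 20 miles, so its fences describe only that subset:

```python
import statistics

def tukey_fences(values, k=1.5):
    """Lower/upper fences beyond which box-plot points are drawn as outliers."""
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

distances_demo = [1, 2, 3, 4, 100]   # made-up trip distances in miles
low, high = tukey_fences(distances_demo)
print(low, high, [d for d in distances_demo if d < low or d > high])  # -1.0 7.0 [100]
```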

#### Visualization 5: Average Trips by Day of Week

In [None]:
# Get day of week data
dow_data = df_with_dow.groupBy("day_of_week", "day_name", "pickup_date").agg(
    count("*").alias("daily_trips")
).groupBy("day_of_week", "day_name").agg(
    avg("daily_trips").alias("avg_trips")
).orderBy("day_of_week").toPandas()

# Create visualization
plt.figure(figsize=(12, 6))
colors_dow = ['#FF6B6B' if day in ['Sunday', 'Saturday'] else '#4ECDC4' 
              for day in dow_data['day_name']]
plt.bar(dow_data['day_name'], dow_data['avg_trips'], color=colors_dow, alpha=0.8, edgecolor='black')
plt.xlabel('Day of Week', fontsize=12)
plt.ylabel('Average Number of Trips', fontsize=12)
plt.title('Average Daily Taxi Trips by Day of Week (January 2019)', fontsize=14, fontweight='bold')
plt.xticks(rotation=45)
plt.grid(axis='y', alpha=0.3)

# Add value labels
for i, v in enumerate(dow_data['avg_trips']):
    plt.text(i, v + 1000, f'{v:,.0f}', ha='center', va='bottom', fontsize=10)

# Add legend for colors
from matplotlib.patches import Patch
legend_elements = [Patch(facecolor='#FF6B6B', label='Weekend'),
                   Patch(facecolor='#4ECDC4', label='Weekday')]
plt.legend(handles=legend_elements, loc='upper right')

plt.tight_layout()
plt.show()

## Summary and Key Findings

### Main Insights from January 2019 NYC Taxi Data:

1. **Trip Volume**: Over 7.6 million trips in January 2019
2. **Peak Hours**: Evening hours (6pm-7pm) are the busiest
3. **Busiest Day**: Weekdays show higher average trip counts than weekends
4. **Borough Analysis**: Manhattan dominates both pickups and dropoffs
5. **Average Trip**: ~3 miles, ~$13 fare, ~15 minutes duration
6. **Tip Patterns**: Longer trips generally receive higher tips

### Data Quality Issues Identified:
- Zero distance/fare trips
- Extremely long trips (possible outliers)
- Negative trip durations (data entry errors)
- Unrealistic passenger counts

### Recommendations:
1. Implement data validation at collection point
2. Regular data quality audits
3. Consider seasonal analysis for complete picture
4. Investigate cross-borough patterns for route optimization

# Where to go from here

## Extension Ideas:

### 1. Continue building the dataset
- Load all months of 2019
- Calculate seasonal patterns (winter, spring, summer, fall)
- Identify holiday effects
- Analyze weather impact (if weather data available)

### 2. Advanced Analytics
- Predict busy times using machine learning
- Analyze route efficiency
- Driver shift pattern analysis
- Price elasticity studies

### 3. Explore Other Datasets
- Green taxi data (outer boroughs)
- For-hire vehicle data (including high-volume services such as Uber and Lyft)
- Citibike data for multimodal analysis
- Public transit data for comparison

### 4. Visualization Enhancements
- Interactive dashboards with Plotly
- Geographic visualizations with folium
- Time series animations
- Heat maps of pickup/dropoff locations

In [None]:
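# Bonus: save the trip data for future analysis. df_trips_zones is enriched with
# borough names but not yet cleaned; apply the outlier filters from Part 1 first
# if you want a cleaned copy.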
# df_trips_zones.write.mode("overwrite").parquet(f"{path_volume}/cleaned_trips_2019_01.parquet")
# print("Cleaned data saved successfully!")