In [0]:
# Import 
from pyspark.sql.functions import *


# --- Configuration ---
# Every input file lives in the same volume directory; change it here only.
DATA_DIR = "/Volumes/nyc_yellow_tripdata/default/filestore"

# Which Yellow Taxi trip files to load: one Parquet file per (year, month).
TRIP_YEAR = 2025
TRIP_MONTHS = range(1, 9)  # months 01-08 (the range end is exclusive)

# List of paths for the large monthly trip data files (Parquet).
TRIP_DATA_PATHS = [
    f"{DATA_DIR}/yellow_tripdata_{TRIP_YEAR}-{month:02d}.parquet"
    for month in TRIP_MONTHS
]

# Path for the Taxi Zone Lookup CSV
LOOKUP_TABLE_PATH = f"{DATA_DIR}/taxi_zone_lookup.csv"


# --- Ingestion: Read and Union Trip Data ---

# The month count comes from the configured path list (one path per month) rather
# than a hard-coded number, so this message stays correct if the month range changes.
print(f"Starting ingestion and union of {len(TRIP_DATA_PATHS)} months of trip data...")

# One `load` call over the whole list of paths reads every monthly file into a single
# DataFrame, i.e. an implicit union of all months.
# NOTE(review): Parquet schema merging is off by default, so the monthly files are
# assumed to share one schema. If they ever disagree on column types, confirm whether
# `.option("mergeSchema", "true")` is needed here.
raw_trips_df = (spark.read
                .format("parquet")
                .load(TRIP_DATA_PATHS)
               )

# Sanity-check the load: total row count, column types, and a small sample.
total_records = raw_trips_df.count()
print(f"Total raw records loaded: {total_records}")

print("\nRaw Data Schema:")
# Print the data types to confirm they were read as expected
raw_trips_df.printSchema()

print("\nRaw Data Sample:")
raw_trips_df.limit(5).display()

In [0]:

print("Starting initial cleaning and filtering...")

# Records with invalid or impossible values would distort every later calculation
# (averages, sums, ML model training), so they are removed up front.
# Note: a Spark comparison against NULL yields NULL, and filtering keeps only rows
# whose condition is TRUE, so rows with a NULL in any checked column are dropped too.

# Distance: a zero distance is an error or a cancelled ride where the meter was not
# reset; such rows would also produce infinite speeds (distance / time).
has_positive_distance = col("trip_distance") > 0

# Fare: anything under $1 suggests a free trip, a severe error, or a test transaction.
has_real_fare = col("fare_amount") >= 1.0

# Passengers: zero passengers is illogical for a completed trip (a data-entry error).
has_passengers = col("passenger_count") > 0

# Payment type: keep only 1 (Credit Card) and 2 (Cash), the most reliable records for
# revenue analysis; 3 (No Charge) and 4 (Dispute) are typically left out of revenue KPIs.
is_card_or_cash = col("payment_type").isin([1, 2])

is_valid_trip = has_positive_distance & has_real_fare & has_passengers & is_card_or_cash

# The 'tpep_' prefix is specific to Yellow Taxi data; generic column names keep the
# rest of the pipeline easier to read and maintain.
cleaned_trips_df = (
    raw_trips_df
    .where(is_valid_trip)
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
)

# Row counts before vs. after cleaning show how much data the rules removed.
initial_records, final_records = raw_trips_df.count(), cleaned_trips_df.count()
print(f"Initial Records: {initial_records}")
print(f"Records after cleaning: {final_records}")
print(f"Records removed: {initial_records - final_records}")

# Show the resulting columns and types
print("\nCleaned Data Schema:")
cleaned_trips_df.printSchema()

In [0]:
print("\nStarting feature engineering...")

# Plausibility thresholds used by the filters below.
MIN_TRIP_DURATION_SECONDS = 60  # trips lasting 60 seconds or less are dropped
MAX_AVG_SPEED_MPH = 60          # trips averaging more than 60 mph are dropped

transformed_df = (cleaned_trips_df
    # TRIP DURATION CALCULATION
    # Feature: 'trip_duration_seconds' = dropoff time minus pickup time, in whole seconds.
    # Needed for time-based analysis (e.g. identifying high-traffic hours) and for the
    # speed calculation below. `unix_timestamp` turns each timestamp into epoch seconds
    # as a native, distributed Spark expression.
    .withColumn(
        "trip_duration_seconds",
        unix_timestamp(col("dropoff_datetime")) - unix_timestamp(col("pickup_datetime"))
    )

    # DURATION FILTER -- deliberately applied BEFORE the speed calculation.
    # A trip lasting 60 seconds or less is physically unlikely in the NYC system, so it
    # is treated as noise. Filtering first also guarantees that the speed formula never
    # sees a zero or negative duration. Dividing by zero yields NULL by default, but
    # raises a DIVIDE_BY_ZERO error when ANSI SQL mode (`spark.sql.ansi.enabled`) is on.
    .filter(col("trip_duration_seconds") > MIN_TRIP_DURATION_SECONDS)

    # AVERAGE SPEED CALCULATION
    # Feature: 'avg_speed_mph' -- useful for analysing congestion and driver efficiency,
    # and for spotting impossible trips.
    # Formula: distance / (duration_in_seconds / 3600 seconds per hour)
    # Assumes trip_distance is in miles, as the column name implies -- TODO confirm
    # against the TLC data dictionary.
    .withColumn(
        "avg_speed_mph",
        col("trip_distance") / (col("trip_duration_seconds") / 3600)
    )

    # SPEED FILTER
    # Average speeds over 60 mph are nearly impossible within city limits; such trips
    # are likely GPS errors or data corruption.
    .filter(col("avg_speed_mph") <= MAX_AVG_SPEED_MPH)

    # SELECT AND ORDER COLUMNS
    # Carry forward only the fields needed later, in a fixed order, to keep the data
    # smaller and give the next phase a clean output schema.
    .select(
        "VendorID", "pickup_datetime", "dropoff_datetime",
        "PULocationID", "DOLocationID", "passenger_count",
        "trip_distance", "fare_amount", "tip_amount",
        "total_amount", "payment_type",
        "trip_duration_seconds", "avg_speed_mph"
    )
)

# Final verification of the total count after all filters
print(f"Final records after feature engineering and final speed filter: {transformed_df.count()}")
transformed_df.limit(5).display()

In [0]:
print("Starting ingestion of the small lookup table...")

# Taxi Zone Lookup CSV: maps a LocationID to its Borough and Zone name; the trip
# data is joined to it later. The first row holds the column names, and
# `inferSchema` makes Spark do an extra pass over the file to derive column types.
csv_reader_options = {
    "header": "true",
    "inferSchema": "true",
}
lookup_df = (
    spark.read
    .format("csv")
    .options(**csv_reader_options)
    .load(LOOKUP_TABLE_PATH)
)

# Inspect the inferred types and a few rows
print("\nLookup Table Schema:")
lookup_df.printSchema()

print("\nLookup Table Sample:")
lookup_df.limit(5).display()

In [0]:

print("\nJoining trip data with the lookup table...")

# Enrich every trip with the name and borough of its pickup zone; the result is the
# final, analysis-ready dataset.

# Shape the lookup side first: the join key plus the two descriptive columns,
# already renamed to their pickup-specific output names.
pickup_zones = lookup_df.select(
    lookup_df["LocationID"],
    lookup_df["Zone"].alias("Pickup_Zone"),
    lookup_df["Borough"].alias("Pickup_Borough"),
)

# Inner join: a trip survives only if its PULocationID matches a known LocationID.
# Afterwards both ID columns are dropped -- the zone name is the more useful field.
joined_df = (
    transformed_df
    .join(
        pickup_zones,
        transformed_df["PULocationID"] == pickup_zones["LocationID"],
        "inner",
    )
    .drop("LocationID")
    .drop("PULocationID")
)

# Display a sample of the joined data 
print(f"Total records after join: {joined_df.count()}")
print("\nJoined Data Sample:")
joined_df.limit(5).display()