# NYC Taxi Data Pipeline with Delta Lake (Medallion Architecture)

This notebook implements a **batch data pipeline** using Apache Spark and Delta Lake, structured around the **Medallion Architecture**:

- **Bronze Layer (Raw Ingestion)**  
  - Ingests raw NYC Taxi Parquet data into Delta format.  
  - Preserves the original schema for reproducibility and auditability.  
  - Provides a foundation for downstream transformations.

- **Silver Layer (Cleaned & Enriched)**  
  - Applies business rules and data quality filters (removes invalid trips, handles nulls).  
  - Derives key metrics such as trip duration, tip percentage, fare per mile, and pickup hour.  
  - Adds categorical labels (e.g., payment type) for interpretability.  
  - Produces a curated dataset ready for analytics.

- **Gold Layer (Business KPIs)**  
  - Aggregates Silver data into business‑ready insights:  
    - Hourly trip volume (demand analysis).  
    - Daily revenue (financial KPI).  
    - Tip behavior by payment type (customer behavior).  
    - Fare efficiency by zone (spatial analysis).  
  - Each output is stored as a Delta table; the date-based outputs (hourly trip volume, daily revenue) are partitioned by pickup date.

- **Governance & Maintenance**  
  - Uses **VACUUM** to safely remove obsolete files after the retention period.  
  - Demonstrates **time travel** to query historical versions.  
  - Tracks operations with **DESCRIBE HISTORY** for audit and compliance.
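
The layers above map onto the storage paths used later in this notebook (for example `/project/delta/bronze/trips` and `/project/delta/silver/trips`). As a minimal sketch, assuming a hypothetical helper named `table_path` that is not part of the original notebook, the paths could be derived from a single root so that every cell writes under the same prefix:

```python
import posixpath

# Root directory taken from the paths used in this notebook
DELTA_ROOT = "/project/delta"
LAYERS = ("bronze", "silver", "gold")

def table_path(layer: str, table: str) -> str:
    """Return the storage path of a table in a medallion layer (hypothetical helper)."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer!r}")
    return posixpath.join(DELTA_ROOT, layer, table)

print(table_path("bronze", "trips"))        # /project/delta/bronze/trips
print(table_path("gold", "revenue_daily"))  # /project/delta/gold/revenue_daily
```

Rejecting unknown layer names is a design choice of this sketch: a misspelled layer fails immediately instead of silently creating a table in an unexpected directory.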





# **Environment Setup**

In [97]:
# Initialize SparkSession with Delta Lake extensions (Spark 4.0)
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

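# Stop any session left over from a previous run so the Delta configuration below takes effect.
# (A bare spark.stop() raises NameError on a fresh kernel, where `spark` does not exist yet.)
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()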

builder = (
    SparkSession.builder
    .appName("batch-medallion-delta-ingestion")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.executor.memory", "2g")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


# **Bronze Layer: Ingesting Data**

In [98]:
#  Ingest raw NYC Taxi data and persist as Delta table
df_bronze = (
    spark.read.format("parquet")
    .load("/project/yellow_tripdata_2025-01.parquet")
)

(df_bronze.write
    .format("delta")
    .mode("overwrite")
    .save("/project/delta/bronze/trips"))


                                                                                

In [99]:
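# View the schema of the Bronze DataFrame (`df` is not defined until a later cell).
# The captured output below lists derived columns (e.g. payment_type_label), so it apparently stems from an earlier run.
df_bronze.printSchema()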


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- payment_type_label: string (nullable = false)
 |-- fare_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tip_percentage: double (nullable = true)
 |-- fare_per_mile: double (nullable = true)
 |-- total_fees: double (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = false)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- congestion_surcharge: dou

In [100]:
#Data profiling - row counts
df = spark.read.format("delta").load("/project/delta/bronze/trips")
df.count()


                                                                                

3475226

In [101]:
#Nulls per column
from pyspark.sql import functions as F
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()




+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       0|                   0|                    0|         540149|            0|    540149|            540149|           0|    

                                                                                

In [102]:
#Basic distributions
df.describe().show()




+-------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-------------------+-------------------+
|summary|          VendorID|   passenger_count|    trip_distance|       RatecodeID|store_and_fwd_flag|     PULocationID|      DOLocationID|      payment_type|       fare_amount|             extra|            mta_tax|        tip_amount|       tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|        Airport_fee| cbd_congestion_fee|
+-------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+---------------------+-------

                                                                                

##### **Trip calculation and profiling**

In [103]:
# Validation / Exploration Step: 
# Here we calculate trip duration in minutes using pickup and dropoff timestamps. 
# We use unix_timestamp to safely convert timestamp_ntz fields into epoch seconds. 
# This allows us to subtract times and derive trip duration. 
# The describe() summary provides validation of logical consistency (mean, min, max), 
# helping us detect anomalies such as negative durations or excessively long trips.

from pyspark.sql import functions as F

df = df.withColumn("trip_duration_minutes",
    (F.unix_timestamp("tpep_dropoff_datetime") -
     F.unix_timestamp("tpep_pickup_datetime")) / 60)

df.select("trip_duration_minutes").describe().show()


+-------+---------------------+
|summary|trip_duration_minutes|
+-------+---------------------+
|  count|              3475226|
|   mean|    15.01811561799608|
| stddev|    38.71358219498149|
|    min|  -51472.316666666666|
|    max|    5626.316666666667|
+-------+---------------------+
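
The summary above contains a negative minimum and a very large maximum, both of which point to bad timestamps. The following plain-Python sketch mirrors the arithmetic of the Spark expression (difference in seconds divided by 60) and converts the reported extremes into days. The sample timestamps and the function name are made up for illustration; only the two extreme values are taken from the output above.

```python
from datetime import datetime

def trip_duration_minutes(pickup: datetime, dropoff: datetime) -> float:
    """Duration in minutes: (dropoff - pickup) in seconds, divided by 60."""
    return (dropoff - pickup).total_seconds() / 60

# Illustrative timestamps (not taken from the dataset)
normal = trip_duration_minutes(datetime(2025, 1, 1, 8, 0, 0), datetime(2025, 1, 1, 8, 15, 30))
reversed_trip = trip_duration_minutes(datetime(2025, 1, 1, 9, 0, 0), datetime(2025, 1, 1, 8, 45, 0))
print(normal)         # 15.5
print(reversed_trip)  # -15.0 (dropoff before pickup)

# Extremes reported by describe() above, expressed in days
MINUTES_PER_DAY = 24 * 60
min_days = round(-51472.316666666666 / MINUTES_PER_DAY, 1)
max_days = round(5626.316666666667 / MINUTES_PER_DAY, 1)
print(min_days)  # -35.7
print(max_days)  # 3.9
```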



                                                                                

##### **Hourly pickup distribution**

In [104]:
# Exploration Step: 
# This block extracts the pickup hour from each trip timestamp. 
# Grouping by hour and counting trips reveals demand patterns across the day. 
# It validates timestamp parsing and provides exploratory insight into peak taxi activity. 
# These results can later inform business logic such as surge pricing or driver allocation.
df.withColumn("pickup_hour", F.hour("tpep_pickup_datetime")) \
  .groupBy("pickup_hour") \
  .count() \
  .orderBy("pickup_hour") \
  .show()


+-----------+------+
|pickup_hour| count|
+-----------+------+
|          0| 93417|
|          1| 64484|
|          2| 43929|
|          3| 28492|
|          4| 20033|
|          5| 22551|
|          6| 50026|
|          7|102581|
|          8|141305|
|          9|142877|
|         10|148316|
|         11|160076|
|         12|175432|
|         13|186144|
|         14|202289|
|         15|213694|
|         16|217051|
|         17|253518|
|         18|267951|
|         19|221055|
+-----------+------+
only showing top 20 rows
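
Only hours 0 to 19 are displayed because `show()` prints 20 rows by default. As a small illustration, the displayed counts can be inspected in plain Python to find the busiest and quietest of the visible hours; hours 20 to 23 are missing from the output and are therefore not considered here.

```python
# Hourly pickup counts copied from the output above (hours 0-19 only)
hourly_counts = {
    0: 93417, 1: 64484, 2: 43929, 3: 28492, 4: 20033,
    5: 22551, 6: 50026, 7: 102581, 8: 141305, 9: 142877,
    10: 148316, 11: 160076, 12: 175432, 13: 186144, 14: 202289,
    15: 213694, 16: 217051, 17: 253518, 18: 267951, 19: 221055,
}

busiest_hour = max(hourly_counts, key=hourly_counts.get)
quietest_hour = min(hourly_counts, key=hourly_counts.get)

print(busiest_hour, hourly_counts[busiest_hour])    # 18 267951
print(quietest_hour, hourly_counts[quietest_hour])  # 4 20033
```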


                                                                                

# **Data Transformation and Manipulation with Apache Spark (Silver)**

In [106]:
# Transform and enrich Bronze data (duration, tip %, fare/mile, pickup hour)
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Load Bronze data
df_bronze = spark.read.format("delta").load("/project/delta/bronze/trips")

df_silver = (
    df_bronze
    .withColumn("trip_duration_minutes",
        (F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime")) / 60
    )
    .withColumn("tip_percentage",
        F.when(F.col("fare_amount") > 0, (F.col("tip_amount") / F.col("fare_amount")) * 100)
    )
    .withColumn("fare_per_mile",
        F.when(F.col("trip_distance") > 0, F.col("fare_amount") / F.col("trip_distance"))
    )
    .withColumn("pickup_hour", F.hour("tpep_pickup_datetime"))
)

# Deduplicate the source on the merge keys so each target row matches at most one source row
# (Delta rejects a MERGE in which several source rows match the same target row)
merge_keys = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "PULocationID", "DOLocationID"]
df_silver = df_silver.dropDuplicates(merge_keys)

# Path to Silver table
silver_path = "/project/delta/silver/trips"

# Check for the table explicitly instead of using a bare `except:`,
# which would also hide genuine MERGE failures
if DeltaTable.isDeltaTable(spark, silver_path):
    # Silver already exists: upsert the batch on the composite key
    merge_condition = " AND ".join(f"target.{k} = source.{k}" for k in merge_keys)
    (
        DeltaTable.forPath(spark, silver_path).alias("target")
        .merge(df_silver.alias("source"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: initialize the Silver table
    df_silver.write.format("delta").save(silver_path)

                                                                                

In [107]:


# Silver Layer: Placeholder for handling late-arriving data using MERGE (Delta Lake ready)

from delta.tables import DeltaTable

# Reference Silver Delta table
silver_table = DeltaTable.forPath(spark, "/project/delta/silver/trips")

# Placeholder logic — no late-arriving data loaded yet
# This block is Delta-ready and can be activated once late data is available

print("MERGE logic for late-arriving data is Delta-ready. No late batch loaded in this run.")


MERGE logic for late-arriving data is Delta-ready. No late batch loaded in this run.
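
To make the intended behavior for late-arriving data concrete, here is a plain-Python sketch of upsert semantics (update all columns on a key match, insert otherwise). It is an illustration only, not the Delta Lake implementation: the `upsert` function, the shortened two-column key, and the sample rows are assumptions made for this example.

```python
def upsert(target: dict, source_rows: list, key_columns: tuple) -> dict:
    """Illustrate MERGE with update-all on match and insert-all otherwise."""
    merged = dict(target)
    for row in source_rows:
        key = tuple(row[c] for c in key_columns)
        merged[key] = row  # replaces the row if the key exists, inserts it if not
    return merged

KEYS = ("VendorID", "tpep_pickup_datetime")  # shortened key for the sketch

target = {
    (1, "2025-01-01 08:00:00"): {
        "VendorID": 1, "tpep_pickup_datetime": "2025-01-01 08:00:00", "fare_amount": 10.0,
    },
}
late_batch = [
    # Same key as an existing row: the row is updated
    {"VendorID": 1, "tpep_pickup_datetime": "2025-01-01 08:00:00", "fare_amount": 12.5},
    # New key: the row is inserted
    {"VendorID": 2, "tpep_pickup_datetime": "2025-01-01 09:00:00", "fare_amount": 7.0},
]

result = upsert(target, late_batch, KEYS)
print(len(result))                                        # 2
print(result[(1, "2025-01-01 08:00:00")]["fare_amount"])  # 12.5
```

Running the same batch twice leaves the result unchanged, which is the property that makes a keyed upsert suitable for reprocessing late or repeated batches.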


#### **1) Filtering Data**

In [108]:

# Filtering techniques used in this step, chosen to match the schema and business rules:
# - Basic condition filtering: removes trips with non-positive duration or negative fare amounts.
# - Compound condition filtering: combines several business rules in one predicate, e.g.
#   df.filter((F.col("fare_amount") >= 0) & (F.col("trip_duration_minutes") > 0))
# - Filtering by a list of values: reserved for future use with categorical fields such as RatecodeID or payment_type.
# - Date range filtering: deferred to the Gold layer for time-based slicing.
# Only the rules this pipeline actually needs are applied below.



from pyspark.sql import functions as F

# 1. Remove trips with negative or zero duration
df = df.filter(F.col("trip_duration_minutes") > 0)

# 2. Remove trips with negative fare amounts
df = df.filter(F.col("fare_amount") >= 0)

# 3. Remove trips with null passenger_count
df = df.filter(F.col("passenger_count").isNotNull())

# 4. Remove trips with zero distance
# (ensures only valid trips are retained)
df = df.filter(F.col("trip_distance") > 0)

# Show row count after filtering to validate impact
df.count()



                                                                                

2840016
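
The four filters above can also be read as one compound predicate. The sketch below expresses that predicate in plain Python on made-up rows and then quantifies the impact of filtering using the two row counts reported in this notebook. The function name and sample rows are illustrative assumptions. Unlike Spark, plain Python does not treat comparisons against a missing value as false, so the numeric fields in the sample rows are never `None`.

```python
def is_valid_trip(trip: dict) -> bool:
    """Plain-Python mirror of the four filters, combined into one predicate."""
    return (
        trip["trip_duration_minutes"] > 0
        and trip["fare_amount"] >= 0
        and trip["passenger_count"] is not None
        and trip["trip_distance"] > 0
    )

# Illustrative rows (not taken from the dataset)
trips = [
    {"trip_duration_minutes": 12.0, "fare_amount": 14.5, "passenger_count": 1, "trip_distance": 2.3},
    {"trip_duration_minutes": -5.0, "fare_amount": 9.0, "passenger_count": 1, "trip_distance": 1.1},
    {"trip_duration_minutes": 8.0, "fare_amount": 7.0, "passenger_count": None, "trip_distance": 0.9},
    {"trip_duration_minutes": 6.0, "fare_amount": 0.0, "passenger_count": 2, "trip_distance": 0.0},
]
kept = [t for t in trips if is_valid_trip(t)]
print(len(kept))  # 1

# Impact of filtering, using the row counts reported in this notebook
rows_before, rows_after = 3_475_226, 2_840_016
removed = rows_before - rows_after
removed_pct = round(100 * removed / rows_before, 1)
print(removed)      # 635210
print(removed_pct)  # 18.3
```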

#### **2) Handling Null Values**

In [109]:
# Drop rows with nulls in RatecodeID, PULocationID, DOLocationID, payment_type
df = df.dropna(subset=["RatecodeID", "PULocationID", "DOLocationID", "payment_type"])


In [110]:
# Fill nulls in store_and_fwd_flag with 'N' (default for no forwarding)
df = df.fillna({"store_and_fwd_flag": "N"})


In [111]:
# Fill nulls in optional fee columns with 0
fee_columns = ["congestion_surcharge", "Airport_fee", "cbd_congestion_fee"]
df = df.fillna({col: 0.0 for col in fee_columns})


In [112]:
# Show updated null counts
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()




+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|trip_duration_minutes|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+---------------------+
|       0|                   0|                    0|           

                                                                                

#### **3) Basic Transformations**

In [113]:
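# Tip percentage = (tip_amount / fare_amount) * 100
# Zero fares pass the fare_amount >= 0 filter, so this naive version divides by zero for them;
# the guarded version in cell 122 below overwrites this column.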
df = df.withColumn("tip_percentage", (F.col("tip_amount") / F.col("fare_amount")) * 100)


In [114]:
# Fare per mile = fare_amount / trip_distance (the filter above guarantees trip_distance > 0)
df = df.withColumn("fare_per_mile", F.col("fare_amount") / F.col("trip_distance"))


In [115]:
# Total fees (excluding fare) = extra + mta_tax + tolls_amount + improvement_surcharge + congestion_surcharge + Airport_fee + cbd_congestion_fee
df = df.withColumn("total_fees",
    F.col("extra") +
    F.col("mta_tax") +
    F.col("tolls_amount") +
    F.col("improvement_surcharge") +
    F.col("congestion_surcharge") +
    F.col("Airport_fee") +
    F.col("cbd_congestion_fee")
)


In [116]:
# Only apply if you suspect duplicate trips
df = df.dropDuplicates(["tpep_pickup_datetime", "tpep_dropoff_datetime", "PULocationID", "DOLocationID"])


In [117]:
# Sort by pickup time
df_sorted = df.orderBy("tpep_pickup_datetime")


In [118]:
#Data type normalization
from pyspark.sql.types import IntegerType

# Cast categorical IDs to IntegerType for clarity and efficiency
df = df.withColumn("RatecodeID", F.col("RatecodeID").cast(IntegerType()))
df = df.withColumn("payment_type", F.col("payment_type").cast(IntegerType()))


In [119]:
# Categorical labelling
# Map payment_type codes to human-readable labels,
# which makes aggregations and dashboards easier to interpret.

payment_type_map = {
    1: "Credit Card",
    2: "Cash",
    3: "No Charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided Trip"
}

# Build the CASE WHEN chain from the mapping so codes and labels are defined in one place
label_expr = None
for code, label in payment_type_map.items():
    condition = F.col("payment_type") == code
    label_expr = F.when(condition, label) if label_expr is None else label_expr.when(condition, label)

# Any code outside the mapping is labelled "Other"
df = df.withColumn("payment_type_label", label_expr.otherwise("Other"))


In [120]:
# Checking columns
df.columns


['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee',
 'cbd_congestion_fee',
 'trip_duration_minutes',
 'tip_percentage',
 'fare_per_mile',
 'total_fees',
 'payment_type_label']

In [121]:
# Safe fare per mile: divide fare amount by trip distance, returning null when the distance is not positive.
# (when() without a match already yields null; .otherwise(None) only makes that explicit.)

df = df.withColumn("fare_per_mile",
    F.when(F.col("trip_distance") > 0, F.col("fare_amount") / F.col("trip_distance"))
     .otherwise(None)
)


In [122]:
# Tip percentage relative to fare amount; rows with a zero fare get null instead of a division by zero.
df = df.withColumn("tip_percentage",
    F.when(F.col("fare_amount") > 0, (F.col("tip_amount") / F.col("fare_amount")) * 100)
     .otherwise(None)
)
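
The guarded pattern used for `fare_per_mile` and `tip_percentage` can be summarized in plain Python as follows. This is a sketch of the logic only: `safe_ratio` is a hypothetical helper name, and `None` plays the role of the null that Spark returns when the `when` condition does not match.

```python
from typing import Optional

def safe_ratio(numerator: float, denominator: float, scale: float = 1.0) -> Optional[float]:
    """Return numerator / denominator * scale, or None when the denominator is not positive."""
    if denominator > 0:
        return numerator / denominator * scale
    return None

print(safe_ratio(3.0, 12.0, scale=100))  # 25.0  (tip percentage)
print(safe_ratio(3.0, 0.0, scale=100))   # None  (zero fare)
print(safe_ratio(18.0, 4.5))             # 4.0   (fare per mile)
```

Returning a missing value rather than 0 keeps such rows out of later averages, which matches how Spark's `avg` skips nulls.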


In [123]:
# Add a pickup_hour column by extracting the hour of day from the pickup timestamp, enabling hourly demand analysis.
# F.hour is used for consistency with the other cells, which access functions through the F alias.

df = df.withColumn("pickup_hour", F.hour("tpep_pickup_datetime"))


In [124]:
df = df.select(
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "RatecodeID", "payment_type", "payment_type_label",
    "fare_amount", "tip_amount", "tip_percentage", "fare_per_mile", "total_fees",
    "PULocationID", "DOLocationID", "pickup_hour", "store_and_fwd_flag",
    "extra", "mta_tax", "tolls_amount", "improvement_surcharge",
    "congestion_surcharge", "Airport_fee", "cbd_congestion_fee", "total_amount"
)


In [125]:
#Displays the current list of DataFrame columns to validate schema after transformations and confirm inclusion of derived fields like pickup_hour, fare_per_mile, and tip_percentage.
df.columns   


['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'payment_type',
 'payment_type_label',
 'fare_amount',
 'tip_amount',
 'tip_percentage',
 'fare_per_mile',
 'total_fees',
 'PULocationID',
 'DOLocationID',
 'pickup_hour',
 'store_and_fwd_flag',
 'extra',
 'mta_tax',
 'tolls_amount',
 'improvement_surcharge',
 'congestion_surcharge',
 'Airport_fee',
 'cbd_congestion_fee',
 'total_amount']

In [126]:
df.show(5)



+--------+--------------------+---------------------+---------------+-------------+----------+------------+------------------+-----------+----------+------------------+------------------+------------------+------------+------------+-----------+------------------+-----+-------+------------+---------------------+--------------------+-----------+------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|payment_type|payment_type_label|fare_amount|tip_amount|    tip_percentage|     fare_per_mile|        total_fees|PULocationID|DOLocationID|pickup_hour|store_and_fwd_flag|extra|mta_tax|tolls_amount|improvement_surcharge|congestion_surcharge|Airport_fee|cbd_congestion_fee|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------+------------+------------------+-----------+----------+------------------+------------------+------------------+------------+------------+-----------+

                                                                                

# **Data Management with Delta Lake (Gold)**

In [127]:

# Hourly Trip Volume (pickup_hour aggregation)
df_silver = spark.read.format("delta").load("/project/delta/silver/trips")

df_gold_trip_volume = (
    df_silver.groupBy(F.to_date("tpep_pickup_datetime").alias("pickup_date"), "pickup_hour")
             .agg(F.count("*").alias("trip_count"))
)

(df_gold_trip_volume.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("pickup_date")
    .save("/project/delta/gold/trip_volume_hourly"))





                                                                                

In [128]:
#  Daily Revenue using total_amount
df_gold_revenue = (
    df_silver.groupBy(F.to_date("tpep_pickup_datetime").alias("pickup_date"))
             .agg(F.sum("total_amount").alias("daily_revenue"))
)

(df_gold_revenue.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("pickup_date")
    .save("/project/delta/gold/revenue_daily"))
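
The two aggregations above (trip count per date and hour, revenue per date) follow the same group-and-reduce pattern. As a plain-Python illustration on made-up rows, assuming nothing beyond the standard library:

```python
from collections import defaultdict
from datetime import datetime

# Illustrative rows (not taken from the dataset): (pickup timestamp, total_amount)
trips = [
    (datetime(2025, 1, 1, 8, 5), 20.0),
    (datetime(2025, 1, 1, 8, 40), 15.5),
    (datetime(2025, 1, 1, 17, 10), 30.0),
    (datetime(2025, 1, 2, 9, 0), 12.0),
]

trip_count = defaultdict(int)       # (pickup_date, pickup_hour) -> number of trips
daily_revenue = defaultdict(float)  # pickup_date -> summed total_amount

for pickup, total_amount in trips:
    trip_count[(pickup.date(), pickup.hour)] += 1
    daily_revenue[pickup.date()] += total_amount

jan_1 = datetime(2025, 1, 1).date()
print(trip_count[(jan_1, 8)])  # 2
print(daily_revenue[jan_1])    # 65.5
```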


                                                                                

In [129]:
# Tip Behavior by Payment Type (categorical aggregation)
from pyspark.sql import functions as F

# Reload Silver table
df_silver = spark.read.format("delta").load("/project/delta/silver/trips")

# Add payment_type_label (the persisted Silver table does not carry this column)
df_silver = df_silver.withColumn(
    "payment_type_label",
    F.when(F.col("payment_type") == 1, "Credit Card")
     .when(F.col("payment_type") == 2, "Cash")
     .when(F.col("payment_type") == 3, "No Charge")
     .when(F.col("payment_type") == 4, "Dispute")
     .when(F.col("payment_type") == 5, "Unknown")
     .when(F.col("payment_type") == 6, "Voided Trip")
     .otherwise("Other")
)

# Now perform the aggregation
df_gold_tip_behavior = (
    df_silver.groupBy("payment_type_label")
             .agg(F.avg("tip_percentage").alias("avg_tip_percentage"))
)

(df_gold_tip_behavior.write
    .format("delta")
    .mode("overwrite")
    .save("/project/delta/gold/tip_behavior"))



                                                                                

In [130]:
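# Fare Efficiency by Zone (spatial aggregation)
# Note: this uses `df`, the in-memory cleaned DataFrame, not the persisted Silver table
df_gold_fare_efficiency = (
    df.groupBy("PULocationID", "DOLocationID")
      .agg(F.avg("fare_per_mile").alias("avg_fare_per_mile"))
)

# Write under /project/delta/gold, like the other Gold tables
(df_gold_fare_efficiency
    .write
    .format("delta")
    .mode("overwrite")
    .save("/project/delta/gold/fare_efficiency"))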


                                                                                

# **Delta Lake Maintenance and Governance**

In [131]:
# 1. VACUUM: Remove unreferenced files older than 7 days (168 hours, Delta's default retention)
spark.sql("VACUUM delta.`/project/delta/silver/trips` RETAIN 168 HOURS")

# 2. Time Travel: Query the Silver layer as of a previous version
# (a version stays readable only while VACUUM has not removed its data files)
df_old = spark.read.format("delta").option("versionAsOf", 0).load("/project/delta/silver/trips")
df_old.show(5)

# 3. History: Inspect table versions and operations
spark.sql("DESCRIBE HISTORY delta.`/project/delta/silver/trips`").show(10, truncate=False)


                                                                                

Deleted 0 files and directories in a total of 1 directories.


                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+---------------------+--------------+-----------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|trip_duration_minutes|tip_percentage|    fare_per_mile|pickup_hour|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------------
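
The VACUUM call in the cell above reported 0 deleted files, which is consistent with every file being either still referenced or younger than the 168-hour window. The following simplified sketch shows that eligibility rule in plain Python. It is a conceptual model, not Delta Lake's actual implementation; the function name, its parameters, and the sample dates are assumptions made for illustration.

```python
from datetime import datetime, timedelta

def is_vacuum_candidate(referenced: bool, last_modified: datetime,
                        now: datetime, retain_hours: int = 168) -> bool:
    """A file qualifies for removal only if the current table version no longer
    references it AND it is older than the retention threshold."""
    cutoff = now - timedelta(hours=retain_hours)
    return (not referenced) and last_modified < cutoff

now = datetime(2025, 2, 1, 12, 0)
print(is_vacuum_candidate(False, datetime(2025, 1, 20), now))  # True: unreferenced and older than 7 days
print(is_vacuum_candidate(False, datetime(2025, 1, 30), now))  # False: still inside the retention window
print(is_vacuum_candidate(True, datetime(2025, 1, 1), now))    # False: still referenced by the table
```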

##### **Checking History**

In [132]:
# Spark SQL
spark.sql("DESCRIBE HISTORY delta.`/project/delta/silver/trips`").show(50, truncate=False)




+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+-----------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------