In [0]:
from pyspark.sql.functions import *
from datetime import datetime
from delta.tables import DeltaTable

In [0]:
# Load the bronze-layer table that the rest of this notebook cleans and enriches.
df_bronze = spark.table("nyc_taxi.bronze.data")

In [0]:
# Start the cleaned frame from bronze without the ehail_fee column.
# NOTE(review): presumably ehail_fee carries no usable data -- confirm against the bronze schema.
columns_to_drop = ["ehail_fee"]
df_cleaned = df_bronze.drop(*columns_to_drop)

In [0]:
# Filter bad records: a row is kept only if it satisfies every rule below.
# A comparison against NULL evaluates to NULL, which filter() treats as "drop the row",
# so trip_distance and fare_amount are implicitly required to be non-null as well.
is_valid_trip = (
    (col("trip_distance") > 0)
    & col("passenger_count").isNotNull()
    & (col("passenger_count") > 0)
    & col("lpep_pickup_datetime").isNotNull()
    & (col("fare_amount") >= 0)
    & col("trip_type").isNotNull()
)
df_cleaned = df_cleaned.filter(is_valid_trip)

In [0]:
# Deduplication: rows sharing all of these columns are treated as the same trip.
# No ordering is applied first, so which of the duplicate rows survives is not controlled here.
# dedup_cols is reused by the surrogate-key cell further down.
dedup_cols = [
    "VendorID",
    "lpep_pickup_datetime",
    "trip_distance",
    "PULocationID",
    "DOLocationID",
]
df_cleaned = df_cleaned.dropDuplicates(dedup_cols)

In [0]:
# Enrichments: derived trip metrics and pickup calendar fields.
# `round` below is pyspark.sql.functions.round (brought in by the star import),
# not the Python builtin.
pickup_epoch = unix_timestamp("lpep_pickup_datetime")
dropoff_epoch = unix_timestamp("lpep_dropoff_datetime")

# Trip duration in minutes, rounded to 2 decimals.
df_cleaned = df_cleaned.withColumn(
    "trip_duration_minutes",
    round((dropoff_epoch - pickup_epoch) / 60, 2),
)

# Average speed in miles per hour, computed from the rounded duration.
# Guarded: a zero duration would divide by zero (an error when ANSI mode is on),
# and a negative duration (drop-off before pickup) would yield a meaningless
# negative speed. Both cases produce NULL instead.
df_cleaned = df_cleaned.withColumn(
    "trip_speed_mph",
    when(
        col("trip_duration_minutes") > 0,
        round(col("trip_distance") / (col("trip_duration_minutes") / 60), 2),
    ),
)

# Calendar attributes of the pickup timestamp.
df_cleaned = (
    df_cleaned
    .withColumn("pickup_hour", hour("lpep_pickup_datetime"))
    .withColumn("pickup_dayofweek", dayofweek("lpep_pickup_datetime"))
    .withColumn("pickup_date", to_date("lpep_pickup_datetime"))
)


In [0]:
# Surrogate key for a trip: MD5 of the dedup columns, each cast to string and
# joined with "_". Must run before the renaming cell, since dedup_cols holds the
# original (pre-rename) column names.
# NOTE(review): concat_ws skips NULL inputs, so two rows that differ only in which
# key column is NULL could produce the same trip_id -- confirm the key columns are
# never NULL at this point.
key_parts = [col(name).cast("string") for name in dedup_cols]
df_cleaned = df_cleaned.withColumn("trip_id", md5(concat_ws("_", *key_parts)))

In [0]:
# Column renaming for readability: source name -> silver-layer name.
# Renames are applied one by one in the order listed.
readable_names = {
    "lpep_pickup_datetime": "pickup_datetime",
    "lpep_dropoff_datetime": "dropoff_datetime",
    "RatecodeID": "rate_code_id",
    "PULocationID": "pickup_location_id",
    "DOLocationID": "dropoff_location_id",
    "VendorID": "vendor_id",
}
for source_name, silver_name in readable_names.items():
    df_cleaned = df_cleaned.withColumnRenamed(source_name, silver_name)

In [0]:
# Merge logic for incremental load into the silver table.
#
# The initial write partitions by pickup_year / pickup_month, so both columns are
# derived from pickup_date here, before either branch runs. Without them the first
# load fails because the partition columns do not exist in the frame, and later
# merges would be missing columns that the target table has.
SILVER_TABLE = "nyc_taxi.silver.data"

df_silver = (
    df_cleaned
    .withColumn("pickup_year", year("pickup_date"))
    .withColumn("pickup_month", month("pickup_date"))
)

if spark.catalog.tableExists(SILVER_TABLE):
    # Table already exists: upsert on trip_id. Matching rows are overwritten with
    # the incoming values; unmatched rows are inserted.
    delta_cleaned = DeltaTable.forName(spark, SILVER_TABLE)
    (
        delta_cleaned.alias("target")
        .merge(df_silver.alias("source"), "target.trip_id = source.trip_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the table as a Delta table partitioned by year and month.
    (
        df_silver.write.format("delta")
        .mode("overwrite")
        .partitionBy("pickup_year", "pickup_month")
        .saveAsTable(SILVER_TABLE)
    )