## imports

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime

## paths (run the 00_setup notebook)

In [0]:
%run ./00_setup

## load bronze ingestion

In [0]:
# Read the Delta table stored at bronze_write_path.
# NOTE(review): bronze_write_path is not defined in this notebook — presumably
# it is provided by the %run of ./00_setup; confirm.
df_bronze = spark.read.load(bronze_write_path, format="delta")

# Preview a five-row sample instead of rendering the whole table.
display(df_bronze.limit(5))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,_file_path,ingest_time,ingest_date
1,2022-12-01T00:37:35.000,2022-12-01T00:47:35.000,1.0,2.0,1.0,N,170,237,1,8.5,3.0,0.5,3.1,0.0,0.3,15.4,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2025-12-12
1,2022-12-01T00:34:35.000,2022-12-01T00:55:21.000,0.0,8.4,1.0,N,138,141,2,26.0,4.25,0.5,0.0,0.0,0.3,31.05,2.5,1.25,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2025-12-12
1,2022-12-01T00:33:26.000,2022-12-01T00:37:34.000,1.0,0.8,1.0,N,140,140,1,5.0,3.0,0.5,1.76,0.0,0.3,10.56,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2025-12-12
1,2022-12-01T00:45:51.000,2022-12-01T00:53:16.000,1.0,3.0,1.0,N,141,79,3,10.0,3.0,0.5,0.0,0.0,0.3,13.8,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2025-12-12
2,2022-12-01T00:49:49.000,2022-12-01T00:54:13.000,1.0,0.76,1.0,N,261,231,1,5.0,0.5,0.5,1.76,0.0,0.3,10.56,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2025-12-12


## casting and derived columns

In [0]:
# Build the cleaned frame from bronze: re-type the timestamp and numeric
# columns, derive trip-level features, and remove the superseded columns.
df_clean = (
    df_bronze
    # Cast pickup/dropoff to timestamp under new trip_* names; the tpep_*
    # source columns are removed by the single drop at the end of the chain.
    .withColumn('trip_pickup_datetime', col('tpep_pickup_datetime').cast('timestamp'))
    .withColumn('trip_dropoff_datetime', col('tpep_dropoff_datetime').cast('timestamp'))
    # Trip duration in minutes: difference of the two epoch-second values / 60.
    .withColumn(
        'trip_duration',
        (unix_timestamp('trip_dropoff_datetime') - unix_timestamp('trip_pickup_datetime')) / 60,
    )
    # Calendar date and hour-of-day of the pickup.
    .withColumn('pickup_date', to_date('trip_pickup_datetime'))
    .withColumn('pickup_hour', hour('trip_pickup_datetime'))
    # Re-type in place; these two columns keep their original position.
    .withColumn('total_amount', col('total_amount').cast('Double'))
    .withColumn('passenger_count', col('passenger_count').cast('Integer'))
    # Drop the raw timestamp columns and ingest_date in one call.
    .drop('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'ingest_date')
)


## filtering

In [0]:
# Keep only rows that pass every check below. A row is removed when any
# condition is false or evaluates to NULL.
df_clean = df_clean.where(
    # Both trip timestamps must be present.
    col("trip_pickup_datetime").isNotNull()
    & col("trip_dropoff_datetime").isNotNull()
    # Duration and distance must be strictly positive.
    & (col("trip_duration") > 0)
    & (col("trip_distance") > 0)
    # Zero fares are kept; negative fares are removed.
    & (col("fare_amount") >= 0)
)

# Preview a five-row sample of the filtered frame.
display(df_clean.limit(5))

VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,_file_path,ingest_time,trip_pickup_datetime,trip_dropoff_datetime,trip_duration,pickup_date,pickup_hour
1,1,2.0,1.0,N,170,237,1,8.5,3.0,0.5,3.1,0.0,0.3,15.4,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:37:35.000Z,2022-12-01T00:47:35.000Z,10.0,2022-12-01,0
1,0,8.4,1.0,N,138,141,2,26.0,4.25,0.5,0.0,0.0,0.3,31.05,2.5,1.25,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:34:35.000Z,2022-12-01T00:55:21.000Z,20.766666666666666,2022-12-01,0
1,1,0.8,1.0,N,140,140,1,5.0,3.0,0.5,1.76,0.0,0.3,10.56,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:33:26.000Z,2022-12-01T00:37:34.000Z,4.133333333333334,2022-12-01,0
1,1,3.0,1.0,N,141,79,3,10.0,3.0,0.5,0.0,0.0,0.3,13.8,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:45:51.000Z,2022-12-01T00:53:16.000Z,7.416666666666667,2022-12-01,0
2,1,0.76,1.0,N,261,231,1,5.0,0.5,0.5,1.76,0.0,0.3,10.56,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:49:49.000Z,2022-12-01T00:54:13.000Z,4.4,2022-12-01,0


## load taxi zone lookup

In [0]:
# Read the taxi zone lookup CSV (first line is the header) and project it to
# the join key plus the two descriptive columns, renamed with a pickup prefix.
zone = (
    spark.read
    .option('header', True)
    .csv('/Volumes/workspace/sathish/trip/zone/taxi_zone_lookup.csv')
    # locationID is cast to int and exposed as PULocationID.
    .selectExpr(
        "cast(locationID as int) as PULocationID",
        "Borough as pickup_borough",
        "Zone as pickup_zone_name",
    )
)

# Preview a five-row sample of the lookup.
display(zone.limit(5))

PULocationID,pickup_borough,pickup_zone_name
1,EWR,Newark Airport
2,Queens,Jamaica Bay
3,Bronx,Allerton/Pelham Gardens
4,Manhattan,Alphabet City
5,Staten Island,Arden Heights


## join zone and cleaned df

In [0]:
# Attach pickup_borough / pickup_zone_name to each trip by PULocationID.
# A left join keeps every row of df_clean; trips without a matching zone
# get NULL in the two zone columns.
df_join_zone = df_clean.join(zone, on="PULocationID", how="left")

# Preview a five-row sample of the joined frame.
display(df_join_zone.limit(5))

PULocationID,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,_file_path,ingest_time,trip_pickup_datetime,trip_dropoff_datetime,trip_duration,pickup_date,pickup_hour,pickup_borough,pickup_zone_name
170,1,1,2.0,1.0,N,237,1,8.5,3.0,0.5,3.1,0.0,0.3,15.4,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:37:35.000Z,2022-12-01T00:47:35.000Z,10.0,2022-12-01,0,Manhattan,Murray Hill
138,1,0,8.4,1.0,N,141,2,26.0,4.25,0.5,0.0,0.0,0.3,31.05,2.5,1.25,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:34:35.000Z,2022-12-01T00:55:21.000Z,20.766666666666666,2022-12-01,0,Queens,LaGuardia Airport
140,1,1,0.8,1.0,N,140,1,5.0,3.0,0.5,1.76,0.0,0.3,10.56,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:33:26.000Z,2022-12-01T00:37:34.000Z,4.133333333333334,2022-12-01,0,Manhattan,Lenox Hill East
141,1,1,3.0,1.0,N,79,3,10.0,3.0,0.5,0.0,0.0,0.3,13.8,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:45:51.000Z,2022-12-01T00:53:16.000Z,7.416666666666667,2022-12-01,0,Manhattan,Lenox Hill West
261,2,1,0.76,1.0,N,231,1,5.0,0.5,0.5,1.76,0.0,0.3,10.56,2.5,0.0,dbfs:/Volumes/workspace/sathish/trip/input/yellow_tripdata_2022-12.parquet,2025-12-12T09:47:52.447Z,2022-12-01T00:49:49.000Z,2022-12-01T00:54:13.000Z,4.4,2022-12-01,0,Manhattan,World Trade Center


## write silver

In [0]:
# Write the joined frame as a Delta table at silver_write_path, partitioned by
# pickup_date. Overwrite mode replaces the existing data, and overwriteSchema
# lets the stored schema be replaced as well.
# NOTE(review): silver_write_path is not defined in this notebook — presumably
# it is provided by the %run of ./00_setup; confirm.
(
    df_join_zone.write
    .format("delta")
    .mode("overwrite")
    .option('overwriteSchema', True)
    .partitionBy("pickup_date")
    .save(silver_write_path)
)

