### Load data from the Bronze layer to build the Silver layer

In [0]:
# Read the bronze-layer taxi table; every transform below starts from this frame.
df = spark.table("workspace.raw_data_taxi.bronze_taxi_data")

### Normalize column names and create the pipeline metadata (checkpoint) table

In [0]:
# Lower-case every column name so later cells can refer to e.g. "vendorid"
# regardless of the casing used in the bronze table.
df = df.toDF(*map(str.lower, df.columns))

In [0]:
from pyspark.sql.functions import current_timestamp
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

CHECKPOINT_TABLE = "workspace.raw_data_taxi.pipeline_metadata_checkpoint"

# Seed the fact_trips checkpoint only when none exists yet. Overwriting it
# unconditionally reset the watermark to 2000-01-01 on every run, so the
# incremental filter later in the notebook always selected every row.
has_fact_trips_checkpoint = spark.catalog.tableExists(CHECKPOINT_TABLE) and bool(
    spark.read.table(CHECKPOINT_TABLE)
    .filter("table_name = 'fact_trips'")
    .head(1)
)

if not has_fact_trips_checkpoint:
    # Very old initial checkpoint so the first run loads everything.
    metadata_df = spark.createDataFrame(
        [("fact_trips", "2000-01-01T00:00:00")],
        ["table_name", "last_processed_ts"]
    ).withColumn("updated_at", current_timestamp())

    # Append creates the table when it is missing and otherwise keeps rows
    # already stored for other table_name values.
    metadata_df.write.mode("append").format("delta").saveAsTable(CHECKPOINT_TABLE)

### Cast numeric columns and remove nulls, negative values, and zeros

In [0]:
from pyspark.sql.functions import col

# Target types for the numeric columns the quality rules below depend on.
numeric_casts = {
    "passenger_count": "int",
    "trip_distance": "double",
    "fare_amount": "double",
    "total_amount": "double",
}

df_casted = df
for column_name, spark_type in numeric_casts.items():
    df_casted = df_casted.withColumn(column_name, col(column_name).cast(spark_type))

# A row survives only if every rule holds; a NULL in any rule drops the row.
required_present = (
    col("passenger_count").isNotNull()
    & col("trip_distance").isNotNull()
    & col("fare_amount").isNotNull()
    & col("total_amount").isNotNull()
    & col("tpep_pickup_datetime").isNotNull()
    & col("tpep_dropoff_datetime").isNotNull()
)
positive_trip = (col("passenger_count") > 0) & (col("trip_distance") > 0.0)
non_negative_amounts = (col("fare_amount") >= 0.0) & (col("total_amount") >= 0.0)
ends_after_start = col("tpep_pickup_datetime") < col("tpep_dropoff_datetime")

df_good = df_casted.filter(
    required_present & positive_trip & non_negative_amounts & ends_after_start
)



### Add derived columns (trip duration, trip date, pickup hour)

In [0]:
# `round` is imported under an alias: a bare `round` import would shadow
# Python's builtin for every later cell in the notebook.
from pyspark.sql.functions import unix_timestamp, to_date, hour, round as spark_round, to_timestamp

PICKUP_DROPOFF_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"

df_ready = (
    df_good
    .withColumn(
        "tpep_pickup_datetime",
        to_timestamp(col("tpep_pickup_datetime"), PICKUP_DROPOFF_FORMAT),
    )
    .withColumn(
        "tpep_dropoff_datetime",
        to_timestamp(col("tpep_dropoff_datetime"), PICKUP_DROPOFF_FORMAT),
    )
    # Minutes between pickup and dropoff, rounded to 2 decimals. unix_timestamp
    # works in whole seconds, so sub-second parts are dropped.
    .withColumn(
        "trip_duration_minutes",
        spark_round(
            (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 60,
            2,
        ),
    )
    .withColumn("trip_date", to_date(col("tpep_pickup_datetime")))
    .withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))
)


### Create a `trip_id` surrogate key

In [0]:
from pyspark.sql.functions import coalesce, concat_ws, lit, sha2

# Fields that identify a trip; their order fixes the string that is hashed.
TRIP_KEY_COLUMNS = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "vendorid",
    "pulocationid",
    "dolocationid",
]

# concat_ws silently skips NULL inputs, so (NULL, 5, 7) and (5, NULL, 7) would
# both become "5||7" and share a trip_id. Replacing NULL with a sentinel keeps
# every field in its own slot. Rows without NULL keys hash exactly as before,
# because concat_ws casts non-string inputs to string the same way.
df_ready = df_ready.withColumn(
    "trip_id",
    sha2(
        concat_ws(
            "||",
            *[coalesce(col(c).cast("string"), lit("<NULL>")) for c in TRIP_KEY_COLUMNS]
        ),
        256
    )
)


### Remove duplicate `trip_id` rows

In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Keep exactly one row per trip_id. trip_id hashes the pickup and dropoff
# timestamps, so (barring hash collisions) every row in a partition already
# has the same pickup time; ordering by it alone left the surviving row
# arbitrary. Ordering by all remaining columns makes the choice deterministic.
tie_break_order = [col(c).asc_nulls_last() for c in df_ready.columns if c != "trip_id"]
windowSpec = Window.partitionBy("trip_id").orderBy(*tie_break_order)

fact_trips_dedup = (
    df_ready
    .withColumn("rn", row_number().over(windowSpec))
    .filter(col("rn") == 1)
    .drop("rn")
)

### Normalizing the payment_type column

In [0]:
from pyspark.sql.functions import col, when

# Human-readable payment type. Codes follow the NYC TLC yellow-taxi trip record
# data dictionary (this dataset's tpep_* / pulocationid columns). Codes 3 and 4
# were previously labelled "UPI" / "Debit Card", which that dictionary does not define.
fact_trips_dedup = fact_trips_dedup.withColumn(
    "payment_type_normalized",
    when(col("payment_type") == 1, "Credit Card")
    .when(col("payment_type") == 2, "Cash")
    .when(col("payment_type") == 3, "No Charge")
    .when(col("payment_type") == 4, "Dispute")
    .when(col("payment_type") == 5, "Unknown")
    .when(col("payment_type") == 6, "Voided Trip")
    .otherwise("Other")  # any other code, including NULL
)

In [0]:
# Inspect the column list after the derived columns and trip_id were added.
print(df_ready.schema.names)

['vendorid', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'ratecodeid', 'store_and_fwd_flag', 'pulocationid', 'dolocationid', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'trip_duration_minutes', 'trip_date', 'pickup_hour', 'trip_id']


### Rearranging the columns

In [0]:
# Final silver-layer column order: keys and derived fields first, then the
# raw trip attributes, payment info, and monetary columns.
key_cols = ["trip_id", "vendorid"]
derived_cols = ["trip_date", "pickup_hour", "trip_duration_minutes"]
timestamp_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
trip_cols = [
    "passenger_count", "trip_distance", "ratecodeid", "store_and_fwd_flag",
    "pulocationid", "dolocationid",
]
payment_cols = ["payment_type", "payment_type_normalized"]
amount_cols = [
    "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount", "congestion_surcharge",
]

new_order = key_cols + derived_cols + timestamp_cols + trip_cols + payment_cols + amount_cols

df_reordered = fact_trips_dedup.select(new_order)
df_reordered.display()

trip_id,vendorid,trip_date,pickup_hour,trip_duration_minutes,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,payment_type_normalized,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
008c503251cfd929305b21e4698fca64b3b59ed966e72fadb62e26d2266a9f46,2,2021-01-01,0,18.87,2021-01-01T00:31:40.000Z,2021-01-01T00:50:32.000Z,1,3.6,1,N,48,79,1,Credit Card,15.0,0.5,0.5,4.7,0.0,0.3,23.5,2.5
00abdead617377b6d2d2aa53c7c86c38bfdc4def6093131e6d066a655d5b7e10,2,2021-01-01,0,7.15,2021-01-01T00:46:36.000Z,2021-01-01T00:53:45.000Z,2,1.21,1,N,255,80,1,Credit Card,7.0,0.5,0.5,2.49,0.0,0.3,10.79,0.0
00e73106c67c2bac2d125abf1a45ef039257636c5295eb48d4885512820f3a35,1,2021-01-01,0,14.43,2021-01-01T00:12:48.000Z,2021-01-01T00:27:14.000Z,1,4.1,1,N,249,142,1,Credit Card,14.5,3.0,0.5,5.45,0.0,0.3,23.75,2.5
00f7629efe20dd3684449e4d278ba7e558b6aa11e4369a45866e93c72ac22f11,2,2021-01-01,0,12.47,2021-01-01T00:37:20.000Z,2021-01-01T00:49:48.000Z,1,1.99,1,N,238,43,1,Credit Card,10.0,0.5,0.5,2.76,0.0,0.3,16.56,2.5
0171352ea7422396adbf923b69700fbbeb7f6b98cd544ef8a5193acc71aadcf3,2,2021-01-01,1,12.48,2021-01-01T01:00:55.000Z,2021-01-01T01:13:24.000Z,2,3.79,1,N,140,7,1,Credit Card,13.0,0.5,0.5,5.04,0.0,0.3,21.84,2.5
0211e2d4bf05ede820868d170c8a14e01ad235b263a93eb24b201702663ec933,2,2021-01-01,0,7.25,2021-01-01T00:25:33.000Z,2021-01-01T00:32:48.000Z,2,1.96,1,N,263,238,2,Cash,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5
02244aed8ff0ac97fdf91872425216aeb251212f88426c1c5d7f6240fdf19559,2,2021-01-01,0,3.73,2021-01-01T00:03:01.000Z,2021-01-01T00:06:45.000Z,1,0.74,1,N,236,141,1,Credit Card,5.0,0.5,0.5,0.0,0.0,0.3,8.8,2.5
029b4d924846f8df8369207b8c9b3c17e90b4b721c5830060e42845aa5803d16,2,2021-01-01,0,15.65,2021-01-01T00:52:58.000Z,2021-01-01T01:08:37.000Z,1,4.17,1,N,48,263,1,Credit Card,15.0,0.5,0.5,0.0,0.0,0.3,18.8,2.5
02e8545cc8245a7ad0da72ea1a3089f3c76aab6a17f647c5b0050e1321dc8fbd,1,2021-01-01,0,6.82,2021-01-01T00:06:58.000Z,2021-01-01T00:13:47.000Z,1,1.2,1,N,166,41,2,Cash,7.0,0.5,0.5,0.0,0.0,0.3,8.3,0.0
034d885644be9a2988af32f4b958e4092db7b0f6b490dac39bf3ab4fcb80830d,2,2021-01-01,0,11.87,2021-01-01T00:05:10.000Z,2021-01-01T00:17:02.000Z,1,3.06,1,N,163,236,1,Credit Card,11.5,0.5,0.5,1.0,0.0,0.3,16.3,2.5


In [0]:
# Confirm the reordered column list.
print(df_reordered.schema.names)

['trip_id', 'vendorid', 'trip_date', 'pickup_hour', 'trip_duration_minutes', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'ratecodeid', 'store_and_fwd_flag', 'pulocationid', 'dolocationid', 'payment_type', 'payment_type_normalized', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge']


### Read the checkpoint and select new (incremental) rows

In [0]:
from pyspark.sql.functions import col, lit, to_timestamp

meta_rows = (
    spark.read.table("workspace.raw_data_taxi.pipeline_metadata_checkpoint")
    .filter(col("table_name") == "fact_trips")
    .head(1)
)
# The fact_trips checkpoint row, or None when the table has no such row
# (`.collect()[0]` raised IndexError in that case).
meta = meta_rows[0] if meta_rows else None
last_processed = meta["last_processed_ts"] if meta is not None else None

if last_processed is None or last_processed == "None":
    # No usable watermark. "None" is what str(None) stores after a run whose
    # increment was empty; comparing against it selects nothing (or errors
    # under ANSI mode), which would stall the pipeline. Process everything.
    fact_trips_increment = df_reordered
else:
    # Strictly after the stored watermark (the max pickup time of a previous
    # increment). The explicit to_timestamp avoids relying on implicit casts.
    # NOTE(review): late-arriving rows with an older pickup time are never selected.
    fact_trips_increment = df_reordered.filter(
        col("tpep_pickup_datetime") > to_timestamp(lit(last_processed))
    )

#### Remove any duplicate `trip_id` rows from the increment

In [0]:
# Defensive: keep a single row per trip_id in the increment.
fact_trips_increment = fact_trips_increment.drop_duplicates(subset=["trip_id"])

### Write the Silver Delta table

In [0]:
SILVER_TABLE = "workspace.raw_data_taxi.silver_taxi_data"

# Previously this overwrote the table with the full df_reordered, so
# fact_trips_increment was computed but never used even though the checkpoint
# was advanced from it.
if not spark.catalog.tableExists(SILVER_TABLE):
    # First run: create the table from the full cleaned dataset.
    df_reordered.write.mode("overwrite").format("delta").saveAsTable(SILVER_TABLE)
else:
    # Later runs: upsert only the incremental rows, keyed on trip_id, so a
    # re-run updates existing trips instead of duplicating them.
    # NOTE(review): unlike a full overwrite, rows removed from bronze are kept here.
    fact_trips_increment.createOrReplaceTempView("fact_trips_increment_src")
    spark.sql(f"""
        MERGE INTO {SILVER_TABLE} AS tgt
        USING fact_trips_increment_src AS src
        ON tgt.trip_id = src.trip_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

In [0]:
# Re-check the column list of the frame that feeds the silver table.
print(list(df_reordered.columns))

['trip_id', 'vendorid', 'trip_date', 'pickup_hour', 'trip_duration_minutes', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'ratecodeid', 'store_and_fwd_flag', 'pulocationid', 'dolocationid', 'payment_type', 'payment_type_normalized', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge']


In [0]:
%sql
-- Inspect the contents of the silver table.
SELECT * FROM workspace.raw_data_taxi.silver_taxi_data

trip_id,vendorid,trip_date,pickup_hour,trip_duration_minutes,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,payment_type_normalized,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
008c503251cfd929305b21e4698fca64b3b59ed966e72fadb62e26d2266a9f46,2,2021-01-01,0,18.87,2021-01-01T00:31:40.000Z,2021-01-01T00:50:32.000Z,1,3.6,1,N,48,79,1,Credit Card,15.0,0.5,0.5,4.7,0.0,0.3,23.5,2.5
00abdead617377b6d2d2aa53c7c86c38bfdc4def6093131e6d066a655d5b7e10,2,2021-01-01,0,7.15,2021-01-01T00:46:36.000Z,2021-01-01T00:53:45.000Z,2,1.21,1,N,255,80,1,Credit Card,7.0,0.5,0.5,2.49,0.0,0.3,10.79,0.0
00e73106c67c2bac2d125abf1a45ef039257636c5295eb48d4885512820f3a35,1,2021-01-01,0,14.43,2021-01-01T00:12:48.000Z,2021-01-01T00:27:14.000Z,1,4.1,1,N,249,142,1,Credit Card,14.5,3.0,0.5,5.45,0.0,0.3,23.75,2.5
00f7629efe20dd3684449e4d278ba7e558b6aa11e4369a45866e93c72ac22f11,2,2021-01-01,0,12.47,2021-01-01T00:37:20.000Z,2021-01-01T00:49:48.000Z,1,1.99,1,N,238,43,1,Credit Card,10.0,0.5,0.5,2.76,0.0,0.3,16.56,2.5
0171352ea7422396adbf923b69700fbbeb7f6b98cd544ef8a5193acc71aadcf3,2,2021-01-01,1,12.48,2021-01-01T01:00:55.000Z,2021-01-01T01:13:24.000Z,2,3.79,1,N,140,7,1,Credit Card,13.0,0.5,0.5,5.04,0.0,0.3,21.84,2.5
0211e2d4bf05ede820868d170c8a14e01ad235b263a93eb24b201702663ec933,2,2021-01-01,0,7.25,2021-01-01T00:25:33.000Z,2021-01-01T00:32:48.000Z,2,1.96,1,N,263,238,2,Cash,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5
02244aed8ff0ac97fdf91872425216aeb251212f88426c1c5d7f6240fdf19559,2,2021-01-01,0,3.73,2021-01-01T00:03:01.000Z,2021-01-01T00:06:45.000Z,1,0.74,1,N,236,141,1,Credit Card,5.0,0.5,0.5,0.0,0.0,0.3,8.8,2.5
029b4d924846f8df8369207b8c9b3c17e90b4b721c5830060e42845aa5803d16,2,2021-01-01,0,15.65,2021-01-01T00:52:58.000Z,2021-01-01T01:08:37.000Z,1,4.17,1,N,48,263,1,Credit Card,15.0,0.5,0.5,0.0,0.0,0.3,18.8,2.5
02e8545cc8245a7ad0da72ea1a3089f3c76aab6a17f647c5b0050e1321dc8fbd,1,2021-01-01,0,6.82,2021-01-01T00:06:58.000Z,2021-01-01T00:13:47.000Z,1,1.2,1,N,166,41,2,Cash,7.0,0.5,0.5,0.0,0.0,0.3,8.3,0.0
034d885644be9a2988af32f4b958e4092db7b0f6b490dac39bf3ab4fcb80830d,2,2021-01-01,0,11.87,2021-01-01T00:05:10.000Z,2021-01-01T00:17:02.000Z,1,3.06,1,N,163,236,1,Credit Card,11.5,0.5,0.5,1.0,0.0,0.3,16.3,2.5


In [0]:
CHECKPOINT_TABLE = "workspace.raw_data_taxi.pipeline_metadata_checkpoint"

# New watermark = latest pickup time in this increment. It is computed as a
# Spark string ("yyyy-MM-dd HH:mm:ss[.fff]") rather than collected as a Python
# datetime and str()-ed, which would round-trip through the driver's local timezone.
latest_ts = fact_trips_increment.selectExpr(
    "CAST(max(tpep_pickup_datetime) AS STRING)"
).collect()[0][0]

if latest_ts is None:
    # Empty increment: keep the existing checkpoint. Writing str(None) would
    # store "None", and filtering against that would match no rows on later runs.
    print("No new rows for fact_trips; checkpoint left unchanged.")
else:
    update_df = spark.createDataFrame(
        [("fact_trips", latest_ts)],
        ["table_name", "last_processed_ts"]
    ).withColumn("updated_at", current_timestamp())

    # replaceWhere limits the overwrite to the fact_trips row, so checkpoints
    # stored for other table_name values are not deleted.
    (
        update_df.write
        .mode("overwrite")
        .format("delta")
        .option("replaceWhere", "table_name = 'fact_trips'")
        .saveAsTable(CHECKPOINT_TABLE)
    )

In [0]:
%sql
-- Inspect the stored pipeline checkpoint(s).
SELECT * FROM workspace.raw_data_taxi.pipeline_metadata_checkpoint

table_name,last_processed_ts,updated_at
fact_trips,2021-01-01 01:00:55,2026-02-02T05:49:23.125Z
