In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Bronze layer: land the raw taxi CSV files (plus an ingest timestamp) in a Delta table.
RAW_DATA_PATH = "/Volumes/workspace/default/taxi_data/raw"
DATABASE_NAME = "taxi_etl_db"

# Column layout of the raw CSV files, in file order; every field is declared nullable.
# Supplying the schema explicitly means Spark does not infer column types.
TAXI_COLUMNS = [
    ("trip_id", StringType()),
    ("vendor_id", StringType()),
    ("pickup_datetime", TimestampType()),
    ("dropoff_datetime", TimestampType()),
    ("passenger_count", IntegerType()),
    ("trip_distance", DoubleType()),
    ("fare_amount", DoubleType()),
    ("total_amount", DoubleType()),
    ("payment_type", StringType()),
]
taxi_schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in TAXI_COLUMNS]
)

# Create the working database if needed and make it current, so later unqualified
# table names (saveAsTable / spark.table) resolve inside it.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
spark.catalog.setCurrentDatabase(DATABASE_NAME)

raw_reader = (
    spark.read
        .schema(taxi_schema)
        .options(header="true")   # first line of each file holds column names
        .format("csv")
)
bronze_df = raw_reader.load(RAW_DATA_PATH).withColumn("ingest_time", F.current_timestamp())

bronze_row_count = bronze_df.count()
print("Bronze rows:", bronze_row_count)

# Full refresh of the Bronze table on every run.
bronze_df.write.format("delta").mode("overwrite").saveAsTable("taxi_bronze")

bronze_preview = spark.table("taxi_bronze").limit(5)
display(bronze_preview)



Bronze rows: 500


trip_id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,fare_amount,total_amount,payment_type,ingest_time
trip_1,V2,2024-01-01T14:54:00.000Z,2024-01-01T15:31:00.000Z,4,12.02,26.71,28.54,CARD,2025-11-24T20:16:03.018Z
trip_2,V2,2024-01-01T13:53:00.000Z,2024-01-01T13:59:00.000Z,2,7.69,19.1,24.69,CASH,2025-11-24T20:16:03.018Z
trip_3,V1,2024-01-01T11:11:00.000Z,2024-01-01T11:37:00.000Z,1,10.11,24.01,30.95,CARD,2025-11-24T20:16:03.018Z
trip_4,V2,2024-01-01T15:13:00.000Z,2024-01-01T15:27:00.000Z,3,9.87,24.09,25.76,CASH,2025-11-24T20:16:03.018Z
trip_5,V1,2024-01-01T11:01:00.000Z,2024-01-01T11:10:00.000Z,1,8.9,21.24,27.54,CARD,2025-11-24T20:16:03.018Z


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# make sure we're in the right DB
spark.catalog.setCurrentDatabase("taxi_etl_db")

bronze_df = spark.table("taxi_bronze")

# Quality rule, defined once so the reject count and the Silver filter cannot drift
# apart: a trip is valid only when trip_distance is strictly positive.
# `NULL > 0` evaluates to NULL in Spark; coalesce() maps it to False so rows with a
# missing distance are counted as invalid (the filter below drops them either way).
valid_distance = F.coalesce(F.col("trip_distance") > 0, F.lit(False))

# How many invalid rows? (non-positive OR missing distance)
invalid_count = bronze_df.filter(~valid_distance).count()
total_count = bronze_df.count()
print(f"Invalid rows with non-positive or null trip_distance: {invalid_count} / {total_count}")

# Cleaned Silver table
silver_clean_df = (
    bronze_df
        .filter(valid_distance)   # quality rule
        .withColumn(
            "ride_duration_min",
            # timestamp -> double yields seconds; the difference / 60 gives minutes
            (F.col("dropoff_datetime").cast("double") - F.col("pickup_datetime").cast("double")) / 60.0
        )
        .withColumn(
            # NOTE(review): the name assumes trip_distance is in km — confirm source units.
            "fare_per_km",
            # Redundant after the filter above, but keeps the division guarded against
            # zero/NULL distance if the quality rule is ever relaxed.
            F.when(F.col("trip_distance") > 0,
                   F.col("fare_amount") / F.col("trip_distance"))
             .otherwise(F.lit(None).cast("double"))
        )
)

silver_clean_df.write.mode("overwrite").format("delta").saveAsTable("taxi_silver_clean")

silver_count = silver_clean_df.count()
print("Silver clean rows:", silver_count)

# Every Bronze row is either rejected or kept by the same predicate, never both.
assert silver_count + invalid_count == total_count, "quality split does not cover all Bronze rows"

display(spark.table("taxi_silver_clean").limit(5))


Invalid rows with non-positive trip_distance: 33 / 500
Silver clean rows: 467


trip_id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,fare_amount,total_amount,payment_type,ingest_time,ride_duration_min,fare_per_km
trip_1,V2,2024-01-01T14:54:00.000Z,2024-01-01T15:31:00.000Z,4,12.02,26.71,28.54,CARD,2025-11-24T20:16:03.018Z,37.0,2.222129783693844
trip_2,V2,2024-01-01T13:53:00.000Z,2024-01-01T13:59:00.000Z,2,7.69,19.1,24.69,CASH,2025-11-24T20:16:03.018Z,6.0,2.4837451235370613
trip_3,V1,2024-01-01T11:11:00.000Z,2024-01-01T11:37:00.000Z,1,10.11,24.01,30.95,CARD,2025-11-24T20:16:03.018Z,26.0,2.3748763600395653
trip_4,V2,2024-01-01T15:13:00.000Z,2024-01-01T15:27:00.000Z,3,9.87,24.09,25.76,CASH,2025-11-24T20:16:03.018Z,14.0,2.440729483282675
trip_5,V1,2024-01-01T11:01:00.000Z,2024-01-01T11:10:00.000Z,1,8.9,21.24,27.54,CARD,2025-11-24T20:16:03.018Z,9.0,2.386516853932584


In [0]:
# Flag rides whose fare looks implausible relative to the distance travelled.
silver_clean_df = spark.table("taxi_silver_clean")

# Heuristic thresholds (units are whatever fare_amount / trip_distance are stored in).
MAX_FARE_PER_KM = 10.0          # fare_per_km above this is suspicious
SHORT_TRIP_MAX_DISTANCE = 1.0   # trips shorter than this count as "short"
SHORT_TRIP_HIGH_FARE = 25.0     # a short trip is suspicious when fare_amount exceeds this


def _as_flag(condition):
    """Return a Spark boolean expression as a strict two-valued flag.

    Spark comparisons involving NULL evaluate to NULL, which would leave a flag
    neither true nor false (and silently excluded by both `= true` and `= false`
    filters). Coalescing to False makes every flag either True or False.
    """
    return F.coalesce(condition, F.lit(False))


suspicious_df = (
    silver_clean_df
        .withColumn(
            "suspicious_fare_per_km",
            _as_flag(F.col("fare_per_km") > MAX_FARE_PER_KM)
        )
        .withColumn(
            "short_trip_high_fare",
            _as_flag(
                (F.col("trip_distance") < SHORT_TRIP_MAX_DISTANCE)
                & (F.col("fare_amount") > SHORT_TRIP_HIGH_FARE)
            )
        )
        .withColumn(
            # Both inputs are already non-NULL, so the OR is two-valued as well.
            "is_suspicious_ride",
            F.col("suspicious_fare_per_km") | F.col("short_trip_high_fare")
        )
)

suspicious_df.write.mode("overwrite").format("delta").saveAsTable("taxi_silver_suspicious_rides")

print("Suspicious table rows:", suspicious_df.count())
display(
    spark.table("taxi_silver_suspicious_rides")
         .filter("is_suspicious_ride = true")
         .limit(10)
)


Suspicious table rows: 467


trip_id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,fare_amount,total_amount,payment_type,ingest_time,ride_duration_min,fare_per_km,suspicious_fare_per_km,short_trip_high_fare,is_suspicious_ride
trip_34,V2,2024-01-01T14:10:00.000Z,2024-01-01T14:42:00.000Z,2,0.36,4.81,5.54,CASH,2025-11-24T20:16:03.018Z,32.0,13.36111111111111,True,False,True
trip_69,V1,2024-01-01T14:06:00.000Z,2024-01-01T14:36:00.000Z,3,0.18,4.18,4.93,CASH,2025-11-24T20:16:03.018Z,30.0,23.22222222222222,True,False,True
trip_152,V2,2024-01-01T15:03:00.000Z,2024-01-01T15:40:00.000Z,3,0.29,5.02,6.2,CASH,2025-11-24T20:16:03.018Z,37.0,17.310344827586206,True,False,True
trip_191,V2,2024-01-01T16:23:00.000Z,2024-01-01T16:48:00.000Z,4,0.25,4.39,5.27,CARD,2025-11-24T20:16:03.018Z,25.0,17.56,True,False,True
trip_207,V2,2024-01-01T13:28:00.000Z,2024-01-01T14:06:00.000Z,3,0.37,6.41,6.71,CASH,2025-11-24T20:16:03.018Z,38.0,17.324324324324326,True,False,True
trip_222,V1,2024-01-01T15:20:00.000Z,2024-01-01T15:56:00.000Z,1,0.31,5.23,6.16,CASH,2025-11-24T20:16:03.018Z,36.0,16.870967741935484,True,False,True
trip_229,V2,2024-01-01T13:20:00.000Z,2024-01-01T13:36:00.000Z,2,0.28,4.31,4.54,CASH,2025-11-24T20:16:03.018Z,16.0,15.39285714285714,True,False,True
trip_246,V2,2024-01-01T17:02:00.000Z,2024-01-01T17:29:00.000Z,4,0.16,5.09,5.92,CARD,2025-11-24T20:16:03.018Z,27.0,31.8125,True,False,True
trip_362,V1,2024-01-01T17:29:00.000Z,2024-01-01T17:47:00.000Z,2,0.1,4.66,5.98,CASH,2025-11-24T20:16:03.018Z,18.0,46.6,True,False,True
trip_373,V1,2024-01-01T13:21:00.000Z,2024-01-01T13:44:00.000Z,2,0.24,4.63,4.8,CARD,2025-11-24T20:16:03.018Z,23.0,19.291666666666668,True,False,True


In [0]:
# Silver weekly roll-up: trip count, revenue and averages per pickup week.
silver_clean_df = spark.table("taxi_silver_clean")

# Grouping key: pickup time truncated to the start of its week by Spark's
# date_trunc("week", ...), exposed as the `week_start` column.
pickup_week = F.date_trunc("week", F.col("pickup_datetime")).alias("week_start")

# One output column per metric, in the order they appear in the table.
weekly_metrics = [
    F.count("*").alias("total_trips"),
    F.sum("total_amount").alias("total_revenue"),
    F.avg("fare_amount").alias("avg_fare_amount"),
    F.avg("trip_distance").alias("avg_trip_distance"),
]

weekly_df = silver_clean_df.groupBy(pickup_week).agg(*weekly_metrics)

weekly_df.write.format("delta").mode("overwrite").saveAsTable("taxi_silver_weekly_aggregates")

weekly_row_count = weekly_df.count()
print("Weekly agg rows:", weekly_row_count)
display(spark.table("taxi_silver_weekly_aggregates").sort("week_start"))


Weekly agg rows: 1


week_start,total_trips,total_revenue,avg_fare_amount,avg_trip_distance
2024-01-01T00:00:00.000Z,467,10230.610000000006,19.07023554603854,7.578543897216274


In [0]:
# Gold table: the N trips with the highest fare_amount.
silver_clean_df = spark.table("taxi_silver_clean")

TOP_N_FARES = 3  # keep in sync with the "top3" in the target table name

gold_df = (
    silver_clean_df
        .select(
            "trip_id",
            "vendor_id",
            "pickup_datetime",
            "dropoff_datetime",
            "passenger_count",
            "trip_distance",
            "fare_amount",
            "total_amount",
            "payment_type"
        )
        # Without a secondary key, rows tied on fare_amount at the cut-off are picked
        # arbitrarily and the Gold table can change between runs on identical input.
        # trip_id makes the choice deterministic (NOTE(review): assumes trip_id is
        # unique — confirm). NULL fares sort last, the same as a plain desc().
        .orderBy(F.col("fare_amount").desc_nulls_last(), F.col("trip_id").asc())
        .limit(TOP_N_FARES)
)

gold_df.write.mode("overwrite").format("delta").saveAsTable("taxi_gold_top3_highest_fares")

print("Gold rows:", gold_df.count())
display(spark.table("taxi_gold_top3_highest_fares"))


Gold rows: 3


trip_id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,fare_amount,total_amount,payment_type
trip_421,V2,2024-01-01T10:45:00.000Z,2024-01-01T11:17:00.000Z,2,14.88,35.74,44.87,CASH
trip_453,V2,2024-01-01T14:50:00.000Z,2024-01-01T15:30:00.000Z,2,14.41,34.73,39.31,CARD
trip_253,V2,2024-01-01T13:42:00.000Z,2024-01-01T13:57:00.000Z,4,14.6,34.57,39.52,CASH
