In [0]:
# Display the active SparkSession. It is provided by the notebook environment
# (never created in this notebook), so this doubles as a check that the
# notebook is attached to a running cluster.
spark


In [0]:
# Bronze layer: the raw taxi records exactly as registered in the metastore.
df_bronze = spark.read.table("taxi_data")

# Quick look at the raw schema and a handful of sample rows.
df_bronze.printSchema()
df_bronze.show(n=5)


In [0]:
from pyspark.sql.functions import coalesce, col, lit, to_timestamp, when

# Silver layer: typed and filtered trips derived from the bronze table.
# Rules applied:
#   * pickup/dropoff are parsed with to_timestamp (presumably strings in bronze --
#     TODO confirm). With ANSI mode off, unparseable values become null. Such rows
#     are dropped, as are rows whose dropoff precedes pickup; both would otherwise
#     leak into duration/date-based gold metrics as nulls or negative durations.
#   * trips with a non-positive (or null) distance or fare are dropped
#     (null > 0 evaluates to null, which filter() treats as false).
#   * a missing passenger_count defaults to 1.
df_silver = (
    df_bronze
    .withColumn("pickup_datetime", to_timestamp(col("pickup_datetime")))
    .withColumn("dropoff_datetime", to_timestamp(col("dropoff_datetime")))
    .filter(col("pickup_datetime").isNotNull() & col("dropoff_datetime").isNotNull())
    .filter(col("dropoff_datetime") >= col("pickup_datetime"))
    .filter(col("trip_distance") > 0)
    .filter(col("fare_amount") > 0)
    # coalesce is equivalent to when(isNull, 1).otherwise(value)
    .withColumn("passenger_count", coalesce(col("passenger_count"), lit(1)))
)

df_silver.show(5)
df_silver.printSchema()


In [0]:
# Persist the cleaned trips as the Delta table `silver_taxi`,
# replacing whatever a previous run wrote there.
df_silver.write.saveAsTable("silver_taxi", format="delta", mode="overwrite")


In [0]:
# Read the persisted silver table back to confirm the write landed.
# Only a cell's last expression is auto-displayed, so the row count must be
# printed explicitly; otherwise the (full-scan) count is computed and discarded.
silver_taxi_table = spark.table("silver_taxi")
print(f"silver_taxi row count: {silver_taxi_table.count():,}")
silver_taxi_table.describe().show()


In [0]:
from pyspark.sql.functions import count, avg, sum as _sum

# Gold layer (overall): one summary row computed across all silver trips.
gold_metric_exprs = [
    count("*").alias("total_trips"),
    avg("fare_amount").alias("avg_fare"),
    avg("trip_distance").alias("avg_trip_distance"),
    _sum("total_amount").alias("total_revenue"),
]
df_gold = df_silver.agg(*gold_metric_exprs)

df_gold.show()


In [0]:
# Persist the single-row overall summary as the Delta table `gold_taxi_metrics`,
# replacing the previous run's output.
df_gold.write.saveAsTable("gold_taxi_metrics", format="delta", mode="overwrite")


In [0]:
%sql
-- Inspect the overall metrics row written by the gold aggregation cell.
SELECT total_trips, avg_fare, avg_trip_distance, total_revenue
FROM gold_taxi_metrics;


In [0]:
from pyspark.sql.functions import col, count, when

# Data quality checks on the silver DataFrame.
# Each rule is a condition that flags a bad row; the reported value is the number
# of rows for which the condition is true.
# NOTE: the distance/fare rules use `<= 0`, so zero values are counted too,
# despite the "negative_*" key names (keys kept unchanged for consumers).
dq_rules = {
    "null_pickup_time": col("pickup_datetime").isNull(),
    "null_dropoff_time": col("dropoff_datetime").isNull(),
    "negative_distance": col("trip_distance") <= 0,
    "negative_fare": col("fare_amount") <= 0,
    "invalid_passenger_count": col("passenger_count") <= 0,
}

# Evaluate all rules in ONE aggregation pass, rather than one `.filter().count()`
# Spark job per rule, each re-scanning (and recomputing) df_silver.
# when(cond, 1) is null unless cond is true, and count() skips nulls, so this
# matches filter(cond).count() exactly -- including 0 on an empty DataFrame.
dq_row = df_silver.agg(
    *[count(when(cond, 1)).alias(name) for name, cond in dq_rules.items()]
).first()

dq_checks = {name: dq_row[name] for name in dq_rules}

dq_checks


In [0]:
from pyspark.sql.functions import (
    unix_timestamp, date_format, avg, sum as _sum, count
)

# Per-trip derived fields for the daily rollup:
#   trip_duration_minutes -- dropoff minus pickup, in minutes (unix_timestamp has
#                            whole-second resolution, so sub-second parts are dropped)
#   pickup_date           -- pickup day as a "yyyy-MM-dd" string, formatted in the
#                            Spark session time zone
# NOTE(review): a trip whose dropoff precedes pickup yields a negative duration and
# drags the daily average down unless such rows are filtered upstream.
df_time_metrics = (
    df_silver
    .withColumn(
        "trip_duration_minutes",
        (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60
    )
    .withColumn("pickup_date", date_format("pickup_datetime", "yyyy-MM-dd"))
)

# Gold layer (daily): one row per pickup day. groupBy output order is arbitrary,
# so sort by day to make the preview (and the stored row order) chronological.
df_daily_gold = (
    df_time_metrics
    .groupBy("pickup_date")
    .agg(
        count("*").alias("total_trips"),
        avg("trip_duration_minutes").alias("avg_trip_duration_min"),
        _sum("total_amount").alias("daily_revenue")
    )
    .orderBy("pickup_date")
)

# show() previews at most 20 rows by default.
df_daily_gold.show()


In [0]:
# Persist the per-day rollup as the Delta table `gold_taxi_daily_metrics`,
# replacing the previous run's output.
df_daily_gold.write.saveAsTable("gold_taxi_daily_metrics", format="delta", mode="overwrite")
