In [0]:
# Import DLT and PySpark functions
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
@dlt.table(
    name="dev_1899989130012056.nyctaxi_dlt_python.taxi_trips_bronze",
    comment="Raw NYC taxi trip data from CSV files"
)
def taxi_trips_bronze():
    """Stream the 2019 yellow-taxi CSV files into the bronze table.

    Files are read through the ``cloudFiles`` source with a header row and
    inferred column types. Two columns are appended to every row:
    ``ingestion_time`` (the current timestamp) and ``source_file`` (the
    ``_metadata.file_path`` of the originating file).
    """
    reader_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "header": "true",
    }
    source_path = "/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-*.csv.gz"

    raw_trips = (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .load(source_path)
    )

    # Lineage columns: when the row was ingested and which file it came from.
    stamped = raw_trips.withColumn("ingestion_time", current_timestamp())
    return stamped.withColumn("source_file", col("_metadata.file_path"))

In [0]:
@dlt.table(
    name="dev_1899989130012056.nyctaxi_dlt_python.taxi_trips_silver",
    comment="Cleaned taxi trips with data quality rules"
)
@dlt.expect_or_drop("valid_fare", "fare_amount >= 0")
@dlt.expect_or_drop("valid_trip_distance", "trip_distance > 0")
def taxi_trips_silver():
    """Project, label and filter bronze trips into the silver table.

    Streams from ``taxi_trips_bronze``, renames the pickup/dropoff timestamps
    to ``pickup_time``/``dropoff_time``, keeps the fare-related columns, adds
    a readable ``payment_method`` label derived from ``payment_type`` and a
    ``trip_date`` derived from the pickup timestamp, then keeps only trips
    whose pickup year is 2019. The ``expect_or_drop`` rules declared above
    require ``fare_amount >= 0`` and ``trip_distance > 0``.
    """
    # Read from bronze table and clean the data
    df = dlt.readStream("taxi_trips_bronze")

    return (
        df.select(
            col("tpep_pickup_datetime").alias("pickup_time"),
            col("tpep_dropoff_datetime").alias("dropoff_time"),
            "passenger_count",
            "trip_distance",
            "fare_amount",
            "tip_amount",
            "total_amount",
            "payment_type",
            # Any code other than 1-4 (including null) is labelled "Unknown".
            when(col("payment_type") == 1, "Credit card")
            .when(col("payment_type") == 2, "Cash")
            .when(col("payment_type") == 3, "No charge")
            .when(col("payment_type") == 4, "Dispute")
            .otherwise("Unknown").alias("payment_method"),
            to_date("tpep_pickup_datetime").alias("trip_date")
        )
        # Filter on the projected column name: after the select above the
        # pickup timestamp is exposed as "pickup_time", so referencing it
        # directly avoids depending on the analyzer reaching back through
        # the projection for the original "tpep_pickup_datetime" column.
        .filter(year("pickup_time") == 2019)
    )

In [0]:
# Daily summary statistics
@dlt.table(
    name="dev_1899989130012056.nyctaxi_dlt_python.daily_taxi_summary",
    comment="Daily aggregated metrics for taxi trips"
)
def daily_taxi_summary():
    """Aggregate silver trips into one row of metrics per ``trip_date``.

    Each row holds the trip count, summed fares, mean distance, mean and
    maximum tip, and the number of distinct payment methods seen that day.
    """
    per_day_metrics = [
        count("*").alias("total_trips"),
        sum("fare_amount").alias("total_fares"),
        avg("trip_distance").alias("avg_distance"),
        avg("tip_amount").alias("avg_tip"),
        max("tip_amount").alias("max_tip"),
        countDistinct("payment_method").alias("payment_methods_used"),
    ]

    silver_trips = dlt.read("taxi_trips_silver")
    return silver_trips.groupBy("trip_date").agg(*per_day_metrics)

# Payment analysis table
@dlt.table(
    name="dev_1899989130012056.nyctaxi_dlt_python.payment_analysis",
    comment="Analysis of payment methods and tipping patterns"
)
def payment_analysis():
    """Summarise trip volume, fares and tipping per ``payment_method``.

    A per-trip ``tip_percentage`` is computed as ``tip_amount`` divided by
    ``total_amount`` times 100; it is set to 0 whenever ``total_amount`` is
    not greater than zero, which also avoids dividing by zero. The trips are
    then grouped by payment method and aggregated.
    """
    total = col("total_amount")

    # Tip as a share of the total charged amount, guarded against total <= 0.
    tip_share = when(total > 0, (col("tip_amount") / total * 100)).otherwise(0)

    trips_with_tip_share = dlt.read("taxi_trips_silver").withColumn(
        "tip_percentage", tip_share
    )

    per_method_metrics = [
        count("*").alias("trip_count"),
        avg("total_amount").alias("avg_total_fare"),
        avg("tip_amount").alias("avg_tip"),
        avg("tip_percentage").alias("avg_tip_percentage"),
        sum("total_amount").alias("total_revenue"),
    ]

    return trips_with_tip_share.groupBy("payment_method").agg(*per_method_metrics)

# Hourly patterns table
@dlt.table(
    name="dev_1899989130012056.nyctaxi_dlt_python.hourly_patterns",
    comment="Taxi trip patterns by hour of day"
)
def hourly_patterns():
    """Aggregate silver trips by the hour of day of ``pickup_time``.

    Each trip is tagged with ``pickup_hour`` and a ``rush_hour_flag`` that is
    1 for hours 6-9 and 17-19 (inclusive) and 0 otherwise. Trips are grouped
    by ``pickup_hour`` and the result is sorted by that hour.

    NOTE(review): the flag depends only on ``pickup_hour``, which is also the
    grouping key, so ``rush_hour_percentage`` is constant within each group
    (0 or 1) rather than a true percentage.
    """
    pickup_hour = col("pickup_hour")
    in_morning_rush = pickup_hour.between(6, 9)
    in_evening_rush = pickup_hour.between(17, 19)

    tagged_trips = (
        dlt.read("taxi_trips_silver")
        .withColumn("pickup_hour", hour("pickup_time"))
        # Reuses the hour derived on the previous line instead of recomputing it.
        .withColumn(
            "rush_hour_flag",
            when(in_morning_rush | in_evening_rush, 1).otherwise(0),
        )
    )

    per_hour_metrics = [
        count("*").alias("trip_count"),
        avg("trip_distance").alias("avg_distance"),
        avg("total_amount").alias("avg_fare"),
        avg("rush_hour_flag").alias("rush_hour_percentage"),
    ]

    hourly = tagged_trips.groupBy("pickup_hour").agg(*per_hour_metrics)
    return hourly.orderBy("pickup_hour")