In [0]:
import dlt
from pyspark.sql.functions import *

# Data quality rules keyed by constraint name. Each value is a SQL boolean
# expression; the whole mapping is handed to dlt.expect_all on the silver table.
expectations = dict(
    valid_trip_distance="trip_distance > 0 AND trip_distance < 100",
    valid_fare_amount="fare_amount >= 0",
    valid_datetime="tpep_pickup_datetime < tpep_dropoff_datetime",
    valid_passenger_count="passenger_count > 0 AND passenger_count <= 8",
    valid_locations=(
        "PULocationID BETWEEN 1 AND 263 "
        "AND DOLocationID BETWEEN 1 AND 263"
    ),
)

@dlt.table(
    name="bronze_nyc_taxi",
    comment="Raw NYC taxi data from source files",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def bronze_nyc_taxi():
    """Incrementally ingest the raw yellow-taxi Parquet files with Auto Loader.

    Returns:
        A streaming DataFrame holding the source records plus two lineage
        columns: ``input_file_name`` (the file each row was read from) and
        ``ingestion_timestamp`` (the time the row was processed).
    """
    # The host is a Blob Storage endpoint (``blob.core.windows.net``), which is
    # addressed with the ``wasbs`` scheme. ``abfss`` URIs are defined against
    # the ``dfs.core.windows.net`` endpoint, so combining ``abfss`` with the
    # blob host is a scheme/endpoint mismatch.
    source_path = "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/"

    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        # Auto Loader persists the inferred schema at this location.
        .option("cloudFiles.schemaLocation", "/mnt/delta/schemas/nyc_taxi/bronze")
        .load(source_path)
        .withColumn("input_file_name", input_file_name())
        .withColumn("ingestion_timestamp", current_timestamp())
    )

@dlt.table(
    name="silver_nyc_taxi_clean",
    comment="Cleaned and validated NYC taxi data",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true",
    },
)
@dlt.expect_all(expectations)
def silver_nyc_taxi_clean():
    """Enrich the bronze trip stream with derived per-trip columns.

    Adds four columns to every record read from ``bronze_nyc_taxi``:

    * ``trip_duration_min`` - dropoff minus pickup time, in minutes.
    * ``is_airport_pickup`` - whether ``PULocationID`` is 1, 132 or 138.
    * ``time_of_day_slot`` - "Morning" (05-11h), "Afternoon" (12-16h),
      "Evening" (17-20h) or "Night" (any other pickup hour).
    * ``trip_id`` - SHA-256 of vendor, pickup time and dropoff time
      joined with "||".

    Returns:
        The streaming DataFrame with the derived columns appended.
    """
    # NOTE(review): dlt.expect_all reports violations but keeps the offending
    # rows. If this table is meant to exclude invalid trips,
    # dlt.expect_all_or_drop may be what was intended - confirm.
    pickup_ts = "tpep_pickup_datetime"
    dropoff_ts = "tpep_dropoff_datetime"
    pickup_hour = hour(pickup_ts)

    # unix_timestamp yields seconds, so divide by 60 for minutes.
    duration_minutes = (unix_timestamp(dropoff_ts) - unix_timestamp(pickup_ts)) / 60

    # Pickup zone IDs this pipeline flags as airport pickups.
    airport_pickup = col("PULocationID").isin([1, 132, 138])

    day_part = (
        when(pickup_hour.between(5, 11), "Morning")
        .when(pickup_hour.between(12, 16), "Afternoon")
        .when(pickup_hour.between(17, 20), "Evening")
        .otherwise("Night")
    )

    record_hash = sha2(
        concat_ws("||", col("VendorID"), col(pickup_ts), col(dropoff_ts)),
        256,
    )

    return (
        dlt.read_stream("bronze_nyc_taxi")
        .withColumn("trip_duration_min", duration_minutes)
        .withColumn("is_airport_pickup", airport_pickup)
        .withColumn("time_of_day_slot", day_part)
        .withColumn("trip_id", record_hash)
    )

@dlt.table(
    name="gold_daily_location",
    comment="Daily aggregates by pickup location",
    table_properties={
        "quality": "gold",
        "pipelines.autoOptimize.managed": "true",
    },
)
def gold_daily_location():
    """Aggregate silver trips per pickup date and pickup location.

    Returns:
        A DataFrame with one row per (``report_date``, ``pickup_location_id``)
        carrying the trip count, mean fare, mean trip duration and
        ``tip_percentage``. Despite its name, ``tip_percentage`` is the
        fraction (0 to 1) of trips whose ``tip_amount`` is above zero.
    """
    trips = dlt.read("silver_nyc_taxi_clean")

    # 1 for a tipped trip, 0 otherwise; averaging it gives the tipped share.
    tipped_flag = when(col("tip_amount") > 0, 1).otherwise(0)

    grouping_keys = [
        to_date("tpep_pickup_datetime").alias("report_date"),
        col("PULocationID").alias("pickup_location_id"),
    ]
    metrics = [
        count("*").alias("total_trips"),
        avg("fare_amount").alias("avg_fare_amount"),
        avg("trip_duration_min").alias("avg_trip_duration"),
        avg(tipped_flag).alias("tip_percentage"),
    ]

    return trips.groupBy(*grouping_keys).agg(*metrics)

@dlt.table(
    name="gold_payment_analysis",
    comment="Payment type analysis by time of day",
    table_properties={
        "quality": "gold",
        "pipelines.autoOptimize.managed": "true"
    }
)
def gold_payment_analysis():
    """Aggregate silver trips per payment type and time-of-day slot.

    Returns:
        A DataFrame with one row per (``payment_type``, ``time_of_day_slot``)
        carrying the trip count, mean fare, mean tip and ``tip_percentage``.
        Despite its name, ``tip_percentage`` is the fraction (0 to 1) of
        trips whose ``tip_amount`` is above zero.
    """
    df = dlt.read("silver_nyc_taxi_clean")

    # No orderBy here: the result is written out as a table, where row order
    # is not preserved, so a global sort would only add a shuffle. Consumers
    # that need ordering should sort when they query the table.
    return df.groupBy(
        "payment_type",
        "time_of_day_slot"
    ).agg(
        count("*").alias("trip_count"),
        avg("fare_amount").alias("avg_fare"),
        avg("tip_amount").alias("avg_tip"),
        # Mean of a 0/1 "tipped" indicator gives the share of tipped trips.
        avg(when(col("tip_amount") > 0, 1).otherwise(0)).alias("tip_percentage")
    )