In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, LongType

# Explicit schema for the yellow-taxi trip-record Parquet files read by Auto Loader.
# Passing a schema up front avoids schema inference; every field is nullable (True).
# NOTE(review): the TLC data dictionary spells the rate-code column "RatecodeID";
# this relies on Spark's default case-insensitive column resolution — confirm.
yellow_taxi_schema = StructType([
    StructField("VendorID", IntegerType(), True),                # Vendor code
    StructField("tpep_pickup_datetime", TimestampType(), True),  # Pickup timestamp
    StructField("tpep_dropoff_datetime", TimestampType(), True), # Dropoff timestamp
    StructField("passenger_count", LongType(), True),            # Number of passengers
    StructField("trip_distance", DoubleType(), True),            # Distance in miles
    StructField("RateCodeID", LongType(), True),                 # Rate code
    StructField("store_and_fwd_flag", StringType(), True),       # Y/N
    StructField("PULocationID", IntegerType(), True),            # Pickup location ID (TLC taxi zone)
    StructField("DOLocationID", IntegerType(), True),            # Dropoff location ID (TLC taxi zone)
    StructField("payment_type", LongType(), True),               # Payment type code
    StructField("fare_amount", DoubleType(), True),              # Meter fare
    StructField("extra", DoubleType(), True),                    # Extra charges
    StructField("mta_tax", DoubleType(), True),                  # MTA tax
    StructField("tip_amount", DoubleType(), True),               # Tip
    StructField("tolls_amount", DoubleType(), True),             # Tolls
    StructField("improvement_surcharge", DoubleType(), True),    # Improvement surcharge
    StructField("total_amount", DoubleType(), True),             # Total fare charged
    StructField("congestion_surcharge", DoubleType(), True),     # Congestion surcharge
    StructField("Airport_fee", DoubleType(), True),              # Airport fee
    StructField("cbd_congestion_fee", DoubleType(), True),       # CBD congestion fee
])

In [0]:
import dlt

In [0]:

@dlt.table(
    name="nyc_taxi.default.bronze_data",
    comment="Bronze Table",
    table_properties={"quality": "bronze"}
)
def bronze_table():
    """Incrementally ingest raw yellow-taxi Parquet files with Auto Loader.

    Files under the source volume are read as a stream (``cloudFiles``
    format, Parquet payload) using the explicit ``yellow_taxi_schema``
    rather than an inferred one.

    Returns:
        A streaming DataFrame that backs the bronze table.
    """
    source_dir = "/Volumes/nyc_taxi/nyc_taxi/data/"
    autoloader = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .schema(yellow_taxi_schema)
    )
    return autoloader.load(source_dir)

In [0]:
@dlt.table(
    name="nyc_taxi.default.silver_data",
    comment="Silver Table without null ids",
    table_properties={"quality": "silver"}
)
def silver_table():
    """Stream bronze trips, keeping only rows that pass basic quality checks.

    A row is kept when all of the following hold:
      * ``VendorID`` and both pickup/dropoff timestamps are non-null;
      * ``payment_type`` is null or one of the codes 0-6;
      * both timestamps fall in calendar year 2025;
      * the pickup timestamp is not after the dropoff timestamp.

    Returns:
        A streaming DataFrame that backs the silver table.
    """
    quality_predicates = [
        "VendorID IS NOT NULL",
        "tpep_pickup_datetime IS NOT NULL",
        "tpep_dropoff_datetime IS NOT NULL",
        "(payment_type IS NULL OR payment_type IN (0, 1, 2, 3, 4, 5, 6))",
        "year(tpep_pickup_datetime) = 2025",
        "year(tpep_dropoff_datetime) = 2025",
        # Compare the full timestamps: a year()-level comparison is always true
        # here because both years are already pinned to 2025, so it would never
        # drop trips whose dropoff precedes their pickup.
        "tpep_pickup_datetime <= tpep_dropoff_datetime",
    ]
    return (
        spark.readStream.table("nyc_taxi.default.bronze_data")
        .where(" AND ".join(quality_predicates))
    )

In [0]:
# Materialized View
import dlt
from pyspark.sql import functions as F

@dlt.table(
    name="nyc_taxi.default.daily_revenue",
    comment="Materialized view for daily revenue analytics",
    table_properties={"quality": "gold"}
)
def daily_revenue():
    """Aggregate silver trips into per-vendor, per-pickup-day revenue metrics.

    The silver table is read as a batch source (not a stream), so this dataset
    is computed as a materialized view. Rows are grouped by ``VendorID`` and
    the calendar date of ``tpep_pickup_datetime``.

    Returns:
        A DataFrame with columns ``VendorID``, ``pickup_date``,
        ``total_revenue``, ``avg_trip_distance``, ``trip_count`` and
        ``total_tip_amount``.
    """
    # Batch read via spark.read.table, mirroring the spark.readStream.table
    # call the silver table uses for its upstream dependency.
    silver_trips = spark.read.table("nyc_taxi.default.silver_data")
    return (
        silver_trips
        .groupBy("VendorID", F.to_date("tpep_pickup_datetime").alias("pickup_date"))
        .agg(
            F.sum("total_amount").alias("total_revenue"),
            F.avg("trip_distance").alias("avg_trip_distance"),
            F.count("*").alias("trip_count"),
            F.sum("tip_amount").alias("total_tip_amount"),
        )
    )
