In [0]:
import dlt
from pyspark.sql.functions import *

In [0]:
@dlt.table(
    name="silver.default.silvertripsclean",
    comment="Silver: cleaned trips with zone enrichment and quality rules",
    table_properties={"quality": "silver"},
)
@dlt.expect_or_drop("valid_distance", "tripDistance >= 0")
@dlt.expect_or_drop("valid_fare", "fareAmount >= 0 AND totalAmount >= 0")
@dlt.expect_or_drop(
    "valid_timestamps",
    "pickupDatetime IS NOT NULL AND dropoffDatetime IS NOT NULL AND dropoffDatetime > pickupDatetime",
)
@dlt.expect_or_drop("valid_pu_location", "puLocationId IS NOT NULL")
@dlt.expect_or_drop("valid_do_location", "doLocationId IS NOT NULL")
def silvertripsclean():
    """Build the silver trips table: bronze trips enriched with zone details.

    The zone lookup is projected twice, once with ``pu``-prefixed column
    names and once with ``do``-prefixed names, and each projection is
    left-joined onto the trips by the matching location id. Because the
    joins are left joins, trips with no matching zone are kept and their
    zone columns are null.

    The ``expect_or_drop`` decorators above declare the quality rules for
    the returned rows: non-negative distance and amounts, non-null pickup and
    dropoff timestamps with dropoff strictly after pickup, and non-null
    pickup and dropoff location ids.

    Returns:
        The trips DataFrame with ``puBorough``/``puZone``/``puServiceZone``
        and ``doBorough``/``doZone``/``doServiceZone`` columns appended.
    """
    # NOTE(review): both inputs are read via dlt.read with three-part names;
    # confirm these datasets are resolvable from this pipeline.
    zone_lookup = dlt.read("bronze.default.taxi_zone_lookup")
    trip_rows = dlt.read("bronze.default.yellow_taxi_tripdata_2025")

    # Rename the lookup key to the trip-side key so each join can be expressed
    # on a single shared column name (which also avoids duplicate key columns).
    pickup_columns = [
        col("locationId").alias("puLocationId"),
        col("borough").alias("puBorough"),
        col("zone").alias("puZone"),
        col("serviceZone").alias("puServiceZone"),
    ]
    dropoff_columns = [
        col("locationId").alias("doLocationId"),
        col("borough").alias("doBorough"),
        col("zone").alias("doZone"),
        col("serviceZone").alias("doServiceZone"),
    ]

    pickup_zones = zone_lookup.select(*pickup_columns)
    dropoff_zones = zone_lookup.select(*dropoff_columns)

    with_pickup = trip_rows.join(pickup_zones, on="puLocationId", how="left")
    return with_pickup.join(dropoff_zones, on="doLocationId", how="left")