In [0]:
import dlt
from pyspark.sql.functions import *

# Define the silver table using DLT
@dlt.table(
    name="silver_nyctaxi",
    comment="Silver table containing cleaned NYC taxi data from bronze layer",
    table_properties={
        "quality": "silver",
        "delta.tuneFileSizesForRewrites": "true",
        # Enable the timestampNtz table feature on this table.
        "delta.feature.timestampNtz": "supported",
    },
)
def create_silver_nyctaxi():
    """Build the silver NYC taxi dataset from the ``bronze_nyctaxi`` table.

    Rows are kept only when both ``trip_distance`` and ``fare_amount`` are
    strictly greater than zero; every surviving row is then stamped with a
    ``processed_timestamp`` column taken from ``current_timestamp()``.

    Returns:
        The filtered DataFrame with the added ``processed_timestamp`` column.
    """
    source_rows = spark.read.format("delta").table("bronze_nyctaxi")

    # Data quality predicates: non-positive distances/fares are treated as invalid.
    has_positive_distance = col("trip_distance") > 0
    has_positive_fare = col("fare_amount") > 0

    valid_rows = source_rows.filter(has_positive_distance).filter(has_positive_fare)

    # Record when this row was processed into the silver layer.
    return valid_rows.withColumn("processed_timestamp", current_timestamp())

In [0]:
import dlt
from pyspark.sql.functions import *

# Define the gold table using DLT
@dlt.table(
    name="gold_nyctaxi_summary",
    comment="Gold table with aggregated NYC taxi metrics by VendorID",
    table_properties={
        "quality": "gold",
        "delta.tuneFileSizesForRewrites": "true"
    }
)
def create_gold_nyctaxi_summary():
    """Aggregate the ``silver_nyctaxi`` table into per-vendor summary metrics.

    Groups the silver rows by ``VendorID`` and computes:

    * ``total_amount_sum``    - sum of ``total_amount``
    * ``tolls_amount_sum``    - sum of ``tolls_amount``
    * ``airport_fee_sum``     - sum of ``Airport_fee``
    * ``avg_passenger_count`` - average of ``passenger_count``

    Returns:
        A DataFrame with one row per distinct ``VendorID`` and the four
        aggregate columns listed above.
    """
    # Namespaced, function-scope import: the aggregates below must be the
    # Spark column functions, not Python's builtin ``sum``. Qualifying them
    # removes the dependency on a wildcard import shadowing the builtin.
    from pyspark.sql import functions as F

    # Read the silver table through DLT so the pipeline tracks the dependency.
    silver_df = dlt.read("silver_nyctaxi")

    return silver_df.groupBy("VendorID").agg(
        F.sum("total_amount").alias("total_amount_sum"),
        F.sum("tolls_amount").alias("tolls_amount_sum"),
        F.sum("Airport_fee").alias("airport_fee_sum"),
        F.avg("passenger_count").alias("avg_passenger_count"),
    )