# NYC Taxi DLT Pipeline with Autoloader

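This Delta Live Tables (DLT) pipeline implements true streaming ingestion using Databricks Auto Loader:
- **Landing Zone**: Auto Loader incrementally reads new Parquet files from a Unity Catalog volume
- **Bronze Layer**: Cleansed and validated taxi trip data
- **Silver Layer**: Valid trips enriched with distance, fare, and time-of-day attributes
- **Gold Layer**: Hourly aggregated trip metrics for analytics and reporting
- **Data Quality**: DLT expectations that fail the update, drop records, or record warnings for invalid data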

**Data Flow:**
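1. The data export job writes batches of Parquet files to `/Volumes/nyc_trips_dev/bronze/landing_zone` (the export job's output path must match the path `taxi_raw_landing` reads from)
2. Auto Loader detects new files and streams them
3. DLT processes the streaming data through the bronze, silver, and gold layers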

The pipeline is defined in `resources/ghithub_trends_digger.pipeline.yml`.


In [None]:
# Import required libraries
import dlt
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, coalesce, current_timestamp, lit, 
    count, countDistinct, approx_count_distinct, avg, min, max, date_trunc,
    expr, isnotnull
)
from pyspark.sql.types import TimestampType, DoubleType, IntegerType

# Get the Spark session. Inside a DLT pipeline an active session already
# exists; fall back to creating one so the module can also be imported
# outside the pipeline runtime.
spark = SparkSession.getActiveSession()
if spark is None:
    spark = SparkSession.builder.getOrCreate()

# Make bundle-local modules importable. "bundle.sourcePath" is read from the
# Spark/pipeline configuration and falls back to the current directory.
bundle_path = spark.conf.get("bundle.sourcePath", ".")
# Skip empty values and avoid appending a duplicate entry each time this
# cell is re-executed (e.g. interactive notebook re-runs).
if bundle_path and bundle_path not in sys.path:
    sys.path.append(bundle_path)


In [None]:
# =============================================================================
# LANDING ZONE - Raw streaming data from volume files
# =============================================================================

@dlt.table(
    name="taxi_raw_landing",
    comment="Raw NYC taxi data streaming from volume files using Autoloader",
    table_properties={
        "quality": "bronze",
        "layer": "landing"
    }
)
def taxi_raw_landing():
    """
    Stream raw taxi Parquet files from the landing-zone volume with Auto Loader.

    Files already present when the stream starts are ingested
    (``cloudFiles.includeExistingFiles``). After that, new files are picked up
    incrementally, at most 10 files per micro-batch. Two lineage columns are
    appended:
    - ``dlt_load_timestamp``: ``current_timestamp()`` at processing time
    - ``dlt_file_path``: the source file path, from the ``_metadata`` column

    The locations can be overridden through the pipeline configuration, so
    the same code can target other catalogs/environments. The defaults are
    the original dev locations:
    - ``nyc_taxi.landing_zone_path``: default
      ``/Volumes/nyc_trips_dev/bronze/landing_zone``
    - ``nyc_taxi.schema_location``: default ``<landing_zone_path>/_schemas``
    """
    landing_zone_path = spark.conf.get(
        "nyc_taxi.landing_zone_path",
        "/Volumes/nyc_trips_dev/bronze/landing_zone",
    )
    # Derive the default from the landing path so that overriding only the
    # landing path keeps the schema directory alongside it.
    # NOTE(review): DLT documentation states that it manages Auto Loader
    # schema/checkpoint locations itself; confirm whether this explicit
    # option is still required.
    schema_location = spark.conf.get(
        "nyc_taxi.schema_location",
        f"{landing_zone_path}/_schemas",
    )
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", schema_location)
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.maxFilesPerTrigger", "10")  # Control batch size
        .load(landing_zone_path)
        .withColumn("dlt_load_timestamp", current_timestamp())
        .withColumn("dlt_file_path", col("_metadata.file_path"))
    )


In [None]:
# =============================================================================
# BRONZE LAYER - Cleansed and validated streaming data
# =============================================================================

@dlt.table(
    name="taxi_bronze",
    comment="Cleansed and validated NYC taxi trips with data quality checks",
    table_properties={
        "quality": "bronze",
        "layer": "bronze"
    }
)
@dlt.expect_or_fail("valid_pickup_datetime", "tpep_pickup_datetime IS NOT NULL")
@dlt.expect_or_fail("valid_dropoff_datetime", "tpep_dropoff_datetime IS NOT NULL")
@dlt.expect_or_fail("valid_trip_distance", "trip_distance >= 0")
@dlt.expect_or_fail("valid_fare_amount", "fare_amount > 0")
@dlt.expect_or_drop("reasonable_fare", "fare_amount < 1000")  # Drop extreme outliers
@dlt.expect("valid_zip_codes", "pickup_zip > 0 AND dropoff_zip > 0")
def taxi_bronze():
    """
    Type-cast landed taxi records and derive trip-duration fields.

    Expectations declared on this table:
    - fail the update when either datetime is NULL, trip_distance is
      negative, or fare_amount is not positive
    - drop rows whose fare_amount is 1000 or more
    - record (but keep) rows whose pickup/dropoff zip is not positive

    Derived columns:
    - trip_duration_minutes: (dropoff - pickup) in minutes, computed from the
      timestamps cast to epoch seconds
    - is_valid_trip: True when 0 < duration < 600 minutes (10 hours), else False
    """
    # Columns given an explicit type; a cast keeps the source column name.
    typed_columns = [
        col(column_name).cast(target_type)
        for column_name, target_type in (
            ("tpep_pickup_datetime", TimestampType()),
            ("tpep_dropoff_datetime", TimestampType()),
            ("trip_distance", DoubleType()),
            ("fare_amount", DoubleType()),
            ("pickup_zip", IntegerType()),
            ("dropoff_zip", IntegerType()),
        )
    ]
    # Export and lineage metadata carried through unchanged.
    lineage_columns = [
        col(column_name)
        for column_name in (
            "export_timestamp",
            "batch_id",
            "source_system",
            "export_hour",
            "dlt_load_timestamp",
            "dlt_file_path",
        )
    ]

    pickup_epoch = col("tpep_pickup_datetime").cast("long")
    dropoff_epoch = col("tpep_dropoff_datetime").cast("long")
    duration_minutes = col("trip_duration_minutes")
    # Valid window: strictly between 0 and 600 minutes (0-10 hours).
    plausible_duration = (duration_minutes > 0) & (duration_minutes < 600)

    projected = dlt.read_stream("taxi_raw_landing").select(*typed_columns, *lineage_columns)
    with_duration = projected.withColumn(
        "trip_duration_minutes", (dropoff_epoch - pickup_epoch) / 60
    )
    return with_duration.withColumn(
        "is_valid_trip", when(plausible_duration, True).otherwise(False)
    )


In [None]:
# =============================================================================
# SILVER LAYER - Business logic transformations
# =============================================================================

@dlt.table(
    name="silver.taxi_silver_trips",
    comment="Enriched taxi trips with business logic and aggregations",
    table_properties={
        "quality": "silver",
        "layer": "silver"
    }
)
def taxi_silver_trips():
    """
    Silver layer: valid bronze trips enriched with business attributes.

    Only rows whose ``is_valid_trip`` is true are kept (False and NULL rows
    are filtered out). Added columns:
    - distance_category: "short" (trip_distance <= 2), "medium" (<= 10),
      otherwise "long"
    - fare_per_mile: fare_amount / trip_distance, or 0 when trip_distance is
      not > 0 (avoids division by zero)
    - pickup_hour: hour of tpep_pickup_datetime (0-23)
    - pickup_day_of_week: Spark ``dayofweek`` (1 = Sunday ... 7 = Saturday)
    - is_weekend: True for Sunday (1) or Saturday (7), else False
    - time_period: "morning" (6-11), "afternoon" (12-17), "evening" (18-22),
      otherwise "night"
    """
    return (
        dlt.read_stream("taxi_bronze")
        # Filter on the boolean column directly; comparing with `== True` is
        # redundant and gives the same result (NULL rows are dropped either way).
        .filter(col("is_valid_trip"))
        .withColumn("distance_category",
                    when(col("trip_distance") <= 2, "short")
                    .when(col("trip_distance") <= 10, "medium")
                    .otherwise("long"))
        .withColumn("fare_per_mile",
                    when(col("trip_distance") > 0,
                         col("fare_amount") / col("trip_distance"))
                    .otherwise(0))
        .withColumn("pickup_hour", expr("hour(tpep_pickup_datetime)"))
        .withColumn("pickup_day_of_week", expr("dayofweek(tpep_pickup_datetime)"))
        # dayofweek numbers Sunday as 1 and Saturday as 7.
        .withColumn("is_weekend",
                    when(col("pickup_day_of_week").isin([1, 7]), True).otherwise(False))
        .withColumn("time_period",
                    when(col("pickup_hour").between(6, 11), "morning")
                    .when(col("pickup_hour").between(12, 17), "afternoon")
                    .when(col("pickup_hour").between(18, 22), "evening")
                    .otherwise("night"))
    )


In [None]:
# =============================================================================
# GOLD LAYER - Hourly aggregated metrics (watermarked streaming aggregation)
# =============================================================================

@dlt.table(
    name="gold.taxi_gold_hourly_metrics",
    comment="Hourly aggregated taxi trip metrics for analytics and reporting",
    table_properties={
        "quality": "gold",
        "layer": "gold"
    }
)
def taxi_gold_hourly_metrics():
    """
    Gold layer with hourly aggregations for business intelligence.

    Silver trips are grouped by export_hour, the one-hour event-time window
    of tpep_pickup_datetime, time_period and is_weekend. Each group gets trip
    counts, averages and a distance-category breakdown.

    Streaming notes:
    - The hour bucket is built with ``window()`` rather than ``date_trunc()``.
      Spark only uses a watermark to finalize and evict aggregation state when
      the grouping keys contain the watermarked column or a window on it. A
      ``date_trunc`` expression does not qualify, so an append-mode streaming
      aggregation over it is rejected as having no watermark.
    - With a 10-minute watermark, a group's row is emitted once the watermark
      passes the end of its hour window. Trips arriving later than that are
      dropped from the aggregation.
    - ``pickup_hour_window`` is the window start timestamp. Windows are
      aligned to the epoch; this matches ``date_trunc("hour")`` unless the
      session time zone has a non-whole-hour offset.
    - ``approx_count_distinct`` is used because exact distinct aggregations
      are not supported on streaming DataFrames.
    """
    # Local import keeps this cell self-contained.
    from pyspark.sql.functions import window

    aggregated = (
        dlt.read_stream("silver.taxi_silver_trips")
        # Allow up to 10 minutes of late data, measured on pickup time.
        .withWatermark("tpep_pickup_datetime", "10 minutes")
        .groupBy(
            col("export_hour"),
            window(col("tpep_pickup_datetime"), "1 hour").alias("pickup_window"),
            col("time_period"),
            col("is_weekend")
        )
        .agg(
            count("*").alias("total_trips"),
            approx_count_distinct("batch_id").alias("unique_batches"),  # Use approx for streaming
            avg("trip_distance").alias("avg_distance"),
            avg("fare_amount").alias("avg_fare"),
            avg("trip_duration_minutes").alias("avg_duration_minutes"),
            avg("fare_per_mile").alias("avg_fare_per_mile"),
            # Earliest/latest pickup actually seen in the group (not the window bounds).
            min("tpep_pickup_datetime").alias("window_start"),
            max("tpep_pickup_datetime").alias("window_end"),

            # Distance category breakdown
            count(when(col("distance_category") == "short", 1)).alias("short_trips"),
            count(when(col("distance_category") == "medium", 1)).alias("medium_trips"),
            count(when(col("distance_category") == "long", 1)).alias("long_trips"),

            # Quality metrics. The silver source only keeps is_valid_trip rows,
            # so this currently equals total_trips; kept for schema stability.
            count(when(col("is_valid_trip"), 1)).alias("valid_trips")
        )
    )
    # Replace the window struct with its start timestamp under the original
    # column name, keeping the original column order.
    return aggregated.select(*[
        col("pickup_window.start").alias("pickup_hour_window")
        if column_name == "pickup_window"
        else col(column_name)
        for column_name in aggregated.columns
    ])


In [None]:
# This cell has been replaced - see cell above for the corrected gold layer function
