# NYC Yellow Taxi - Bronze Layer Ingestion Pipeline

## Overview

This notebook implements the **Bronze Layer** of the Medallion Architecture for NYC Yellow Taxi trip data.

### Pipeline Characteristics

| Attribute        | Value                                          |
| ---------------- | ---------------------------------------------- |
| **Source**       | NYC TLC Yellow Taxi Parquet files (2009-2025)  |
| **Target**       | Delta Lake Bronze Table                        |
| **Processing**   | Batch ingestion with schema evolution handling |
| **Partitioning** | By `year` (derived from `pickup_datetime`)     |

### Schema Evolution Timeline

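- **2009 (V1)**: `vendor_name`, `Trip_Pickup_DateTime` and lat/lon coordinates (`Start_Lon`, `End_Lat`, etc.)
- **2010 (V2)**: `vendor_id`, `pickup_datetime` and lat/lon coordinates (`pickup_longitude`, `dropoff_latitude`, etc.)
- **2011+ (V3)**: `VendorID`, `tpep_pickup_datetime` and LocationIDs (`PULocationID`, `DOLocationID`)
- **2015+**: Added `improvement_surcharge`
- **2019+**: Added `congestion_surcharge`
- **2025+**: Added `Airport_fee`, `cbd_congestion_fee`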
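Each file belongs to exactly one layout, decided by its year. A minimal sketch of that classification, assuming files follow the TLC naming pattern `yellow_tripdata_YYYY-MM.parquet` (the helper `schema_version_for` is hypothetical, not part of the pipeline):

```python
import re
from pathlib import Path

# Assumed TLC naming convention: yellow_tripdata_YYYY-MM.parquet
_FILENAME_RE = re.compile(r"_(\d{4})-\d{2}\.parquet$")


def schema_version_for(path: str) -> str:
    """Return "V1", "V2" or "V3" from the year embedded in the file name."""
    # Look only at the file name, so years in parent directories are ignored
    match = _FILENAME_RE.search(Path(path).name)
    if match is None:
        raise ValueError(f"Unexpected file name: {path}")
    year = int(match.group(1))
    if year == 2009:
        return "V1"
    if year == 2010:
        return "V2"
    return "V3"  # 2011 onward


print(schema_version_for("/data/backup_2010/yellow_tripdata_2009-01.parquet"))  # V1
```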

### Bronze Layer Principles

1. **Raw data preservation** - All columns cast to STRING
2. **Schema-on-read** - No type enforcement at ingestion
3. **Full lineage** - Source file tracking via `source_file` column
4. **Idempotent** - Each run deletes and rebuilds the target table, so re-runs over the same source files produce the same result
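Principles 1 and 3 can be illustrated without Spark. The sketch below (plain Python; `to_bronze_record` is a hypothetical helper) projects one raw record onto a fixed column list, stringifies every present value, fills absent columns with `None`, and stamps the source file. It mirrors what the Spark `select` in the Schema Registry does conceptually, not how Spark formats values:

```python
from typing import Any, Dict, List, Optional


def to_bronze_record(
    raw: Dict[str, Any],
    mapping: Dict[str, str],
    columns: List[str],
    source_file: str,
) -> Dict[str, Optional[str]]:
    """Project a raw record onto the Bronze column list, all values as strings."""
    # Invert source->target so each target column can find its source
    target_to_source = {tgt: src for src, tgt in mapping.items()}
    record: Dict[str, Optional[str]] = {}
    for target in columns:
        if target == "source_file":
            record[target] = source_file  # lineage
            continue
        source = target_to_source.get(target)
        value = raw.get(source) if source is not None else None
        # Keep missing values as None, stringify everything else
        record[target] = None if value is None else str(value)
    return record


row = to_bronze_record(
    {"Passenger_Count": 1, "Fare_Amt": 8.9},
    {"Passenger_Count": "passenger_count", "Fare_Amt": "fare_amount"},
    ["passenger_count", "fare_amount", "congestion_surcharge", "source_file"],
    "yellow_tripdata_2009-01.parquet",
)
print(row)
```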

---


## 1. Configuration & Setup


In [None]:
"""
Configuration and environment setup for Bronze layer ingestion.
"""
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
import logging
import os
import sys


# ============================================================================
# PIPELINE CONFIGURATION
# ============================================================================
@dataclass
class PipelineConfig:
    """Centralized configuration for the Bronze ingestion pipeline."""

    # Pipeline metadata
    pipeline_name: str = "nyc_yellow_taxi_bronze_ingestion"
    pipeline_version: str = "1.3.0"  # Updated for AMD EPYC optimization
    environment: str = field(default_factory=lambda: os.getenv("ENV", "development"))

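    # Spark configuration - AMD EPYC 7742 (64 cores) + 128GB RAM
    # Reserve 8 cores for Linux GUI + OS, use 56 for Spark.
    # Local mode (the default when no master is set) runs every task inside the
    # driver JVM, so driver_memory is the effective heap; spark.executor.*
    # settings only take effect when running against a cluster.
    app_name: str = "NYCTaxiBronze"
    driver_memory: str = "80g"    # Effective heap in local mode (does the heavy lifting)
    executor_memory: str = "80g"  # Used only on a cluster
    executor_cores: int = 56      # 64 - 8 reserved for OS/GUI
    parallelism: int = 56         # Match available cores
    shuffle_partitions: int = 112 # 2x cores for shuffle operations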
    
    # Batch processing - process files in chunks to avoid OOM
    batch_size: int = 20  # Process 20 files at a time

    # Data paths (will be set from settings)
    source_dir: Optional[Path] = None
    target_dir: Optional[Path] = None

    # Processing options
    write_mode: str = "overwrite"  # overwrite, append
    partition_by: List[str] = field(default_factory=lambda: ["year"])
    enable_data_quality_checks: bool = True

    # Schema version detection by year
    # V1: 2009 (vendor_name, Trip_Pickup_DateTime)
    # V2: 2010 (vendor_id, pickup_datetime) 
    # V3: 2011+ (VendorID, tpep_pickup_datetime)
    v1_years: List[str] = field(default_factory=lambda: ["2009"])
    v2_years: List[str] = field(default_factory=lambda: ["2010"])
    # All other years are V3 (new format)

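    def __post_init__(self):
        """Validate configuration after initialization."""
        # Raise explicit errors: assert statements are skipped under `python -O`
        if self.write_mode not in ("overwrite", "append"):
            raise ValueError(f"Invalid write_mode: {self.write_mode}")
        for name in ("driver_memory", "executor_memory"):
            if not getattr(self, name).endswith("g"):
                raise ValueError(
                    f"{name} should be specified in gigabytes (e.g., '8g')"
                )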


# ============================================================================
# LOGGING SETUP
# ============================================================================
def setup_logging(config: PipelineConfig) -> logging.Logger:
    """Configure logging for the pipeline."""
    logger = logging.getLogger(config.pipeline_name)
    logger.setLevel(logging.INFO)

    # Clear existing handlers
    logger.handlers = []

    # Console handler with formatting
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    return logger


# Initialize configuration and logger
config = PipelineConfig()
logger = setup_logging(config)

logger.info("=" * 60)
logger.info(f"Pipeline: {config.pipeline_name} v{config.pipeline_version}")
logger.info(f"Environment: {config.environment}")
logger.info(f"Started at: {datetime.now().isoformat()}")
logger.info(f"Memory: Driver={config.driver_memory}, Executor={config.executor_memory}")
logger.info(f"CPU: {config.executor_cores}/64 cores (8 reserved for OS/GUI)")
logger.info(f"Batch size: {config.batch_size} files per batch")
logger.info("Schema versions: V1 (2009), V2 (2010), V3 (2011+)")
logger.info("=" * 60)

2025-11-29 14:40:20 | INFO     | Pipeline: nyc_yellow_taxi_bronze_ingestion v1.2.0
2025-11-29 14:40:20 | INFO     | Environment: development
2025-11-29 14:40:20 | INFO     | Started at: 2025-11-29T14:40:20.832415
2025-11-29 14:40:20 | INFO     | Memory: Driver=64g, Executor=64g
2025-11-29 14:40:20 | INFO     | Batch size: 20 files per batch
2025-11-29 14:40:20 | INFO     | Schema versions: V1 (2009), V2 (2010), V3 (2011+)
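Before restarting the kernel with new memory settings, it can help to check that the JVM footprint fits the machine. A rough sketch, assuming local mode (one driver JVM that also runs the tasks, so no separate executor heap) and counting only the on-heap and off-heap pools; JVM overhead and the Python process are ignored, and `parse_gb` / `fits_in_ram` are hypothetical helpers:

```python
def parse_gb(size: str) -> int:
    """Parse a Spark size string such as '16g' into whole gigabytes."""
    size = size.strip().lower()
    if not size.endswith("g") or not size[:-1].isdigit():
        raise ValueError(f"Expected a size like '16g', got {size!r}")
    return int(size[:-1])


def fits_in_ram(driver_memory: str, off_heap: str, total_ram_gb: int, os_reserve_gb: int) -> bool:
    """True if driver heap + off-heap pool still leaves os_reserve_gb free."""
    used = parse_gb(driver_memory) + parse_gb(off_heap)
    return used + os_reserve_gb <= total_ram_gb


# For example, an 80g heap plus the 16g off-heap pool on a 128 GB host
print(fits_in_ram("80g", "16g", total_ram_gb=128, os_reserve_gb=16))  # True
```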


In [None]:
"""
Spark Session initialization with Delta Lake support.

⚠️ IMPORTANT: You MUST restart the kernel for memory/CPU changes to take effect!
   Kernel → Restart Kernel, then run all cells from the beginning.

Optimized for: AMD EPYC 7742 (64 cores) + 128GB RAM
"""
from delta.pip_utils import configure_spark_with_delta_pip
import pyspark.sql


def create_spark_session(config: PipelineConfig) -> pyspark.sql.SparkSession:
    """
    Create and configure Spark session with Delta Lake support.

    Args:
        config: Pipeline configuration object

    Returns:
        Configured SparkSession
    """
    logger.info("Initializing Spark session...")

    builder = (
        pyspark.sql.SparkSession.builder.appName(config.app_name)
        # Local mode: one JVM with N task threads. Without an explicit master,
        # local[*] would also schedule work on the cores reserved for the OS.
        .master(f"local[{config.executor_cores}]")
        # Delta Lake extensions
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        # Memory configuration (in local mode only spark.driver.memory sizes the heap)
        .config("spark.driver.memory", config.driver_memory)
        .config("spark.executor.memory", config.executor_memory)
        .config("spark.driver.maxResultSize", "4g")
        .config("spark.memory.fraction", "0.8")
        .config("spark.memory.storageFraction", "0.2")
        # Off-heap memory for extra headroom (allocated outside the JVM heap)
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "16g")
        # CPU/Core configuration (spark.executor.cores only takes effect on a cluster)
        .config("spark.executor.cores", str(config.executor_cores))
        .config("spark.default.parallelism", str(config.parallelism))
        .config("spark.sql.shuffle.partitions", str(config.shuffle_partitions))
        # Performance tuning
        .config("spark.sql.parquet.mergeSchema", "false")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.adaptive.skewJoin.enabled", "true")
        # Delta optimizations
        .config("spark.databricks.delta.optimizeWrite.enabled", "true")
        .config("spark.databricks.delta.autoCompact.enabled", "true")
        # File handling
        .config("spark.sql.files.maxPartitionBytes", "128m")
    )

    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    logger.info(f"Spark version: {spark.version}")
    logger.info(f"Driver memory: {config.driver_memory}")
    logger.info(f"Executor memory: {config.executor_memory}")
    logger.info(f"Executor cores: {config.executor_cores}")
    logger.info(f"Default parallelism: {config.parallelism}")
    logger.info(f"Shuffle partitions: {config.shuffle_partitions}")

    return spark


# Create Spark session
spark = create_spark_session(config)

2025-11-29 14:40:21 | INFO     | Initializing Spark session...


:: loading settings :: url = jar:file:/home/administrator/Desktop/datascience/github/nyc-taxi-eta/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/administrator/.ivy2.5.2/cache
The jars for the packages stored in: /home/administrator/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f54775f7-b2c0-46fb-b231-9c7f10d4ba46;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central

2025-11-29 14:40:26 | INFO     | Spark version: 4.0.0
2025-11-29 14:40:26 | INFO     | Configured driver memory: 64g
2025-11-29 14:40:26 | INFO     | Configured executor memory: 64g
2025-11-29 14:40:26 | INFO     | Off-heap memory: 16g
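The core arithmetic in `PipelineConfig` (reserve 8 of 64 cores, default parallelism equal to the usable cores, shuffle partitions at 2x) can be captured in one small function. A sketch; `size_local_spark` is a hypothetical name, and the `local[N]` master string assumes local mode, where N is the number of task threads:

```python
from typing import Dict


def size_local_spark(total_cores: int, reserved_cores: int, shuffle_factor: int = 2) -> Dict[str, str]:
    """Derive local-mode Spark parallelism settings from the host's core count."""
    usable = total_cores - reserved_cores
    if usable < 1:
        raise ValueError("At least one core must be left for Spark")
    return {
        "spark.master": f"local[{usable}]",  # one task thread per usable core
        "spark.default.parallelism": str(usable),
        "spark.sql.shuffle.partitions": str(usable * shuffle_factor),
    }


print(size_local_spark(64, 8))
```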


## 2. Schema Registry

Define versioned schemas for handling schema evolution across different data vintages.
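Each mapping is keyed by source column and valued by Bronze column, so building the `SELECT` list means reading it in reverse: for every Bronze column, which source column (if any) feeds it. A plain-Python sketch of that resolution step (hypothetical `resolve_sources`), which also matches source names case-insensitively as a defensive assumption in case a file's casing differs from the mapping:

```python
from typing import Dict, List, Optional


def resolve_sources(
    mapping: Dict[str, str], available: List[str], targets: List[str]
) -> Dict[str, Optional[str]]:
    """Map each target column to the source column present in the file, or None."""
    # Index available columns by lowercase name for case-insensitive matching
    by_lower = {c.lower(): c for c in available}
    target_to_source = {tgt: src for src, tgt in mapping.items()}
    resolved: Dict[str, Optional[str]] = {}
    for target in targets:
        source = target_to_source.get(target)
        resolved[target] = by_lower.get(source.lower()) if source else None
    return resolved


print(resolve_sources(
    {"VendorID": "vendor_id", "Airport_fee": "airport_fee"},
    ["VendorID", "airport_fee"],
    ["vendor_id", "airport_fee", "cbd_congestion_fee"],
))
```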


In [3]:
"""
Schema Registry for NYC Yellow Taxi data.

Handles schema evolution from 2009 to present day.

IMPORTANT: There are THREE distinct schema versions:
- 2009: Old V1 format (vendor_name, Trip_Pickup_DateTime, Start_Lon, etc.)
- 2010: Old V2 format (vendor_id, pickup_datetime, pickup_longitude, etc.)  
- 2011+: New format (VendorID, tpep_pickup_datetime, PULocationID, etc.)
"""
from dataclasses import dataclass
from typing import Dict, List, Tuple
from pyspark.sql import Column, DataFrame
from pyspark.sql.functions import col, lit, input_file_name


class SchemaRegistry:
    """
    Registry for managing schema versions and mappings.

    This class handles the schema evolution of NYC TLC Yellow Taxi data
    which changed multiple times between 2009 and 2011.
    """

    # Unified Bronze layer column order
    BRONZE_COLUMNS: List[str] = [
        "vendor_id",
        "pickup_datetime",
        "dropoff_datetime",
        "passenger_count",
        "trip_distance",
        "pickup_longitude",
        "pickup_latitude",
        "dropoff_longitude",
        "dropoff_latitude",
        "rate_code",
        "store_and_fwd_flag",
        "payment_type",
        "fare_amount",
        "extra",
        "mta_tax",
        "tip_amount",
        "tolls_amount",
        "total_amount",
        "pulocationid",
        "dolocationid",
        "improvement_surcharge",
        "congestion_surcharge",
        "airport_fee",
        "cbd_congestion_fee",
        "source_file",
    ]

    # =========================================================================
    # SCHEMA V1: 2009 (vendor_name, Trip_Pickup_DateTime, Start_Lon)
    # =========================================================================
    V1_2009_MAPPING: Dict[str, str] = {
        "vendor_name": "vendor_id",
        "Trip_Pickup_DateTime": "pickup_datetime",
        "Trip_Dropoff_DateTime": "dropoff_datetime",
        "Passenger_Count": "passenger_count",
        "Trip_Distance": "trip_distance",
        "Start_Lon": "pickup_longitude",
        "Start_Lat": "pickup_latitude",
        "End_Lon": "dropoff_longitude",
        "End_Lat": "dropoff_latitude",
        "Rate_Code": "rate_code",
        "store_and_forward": "store_and_fwd_flag",
        "Payment_Type": "payment_type",
        "Fare_Amt": "fare_amount",
        "surcharge": "extra",
        "mta_tax": "mta_tax",
        "Tip_Amt": "tip_amount",
        "Tolls_Amt": "tolls_amount",
        "Total_Amt": "total_amount",
    }

    # =========================================================================
    # SCHEMA V2: 2010 (vendor_id, pickup_datetime, pickup_longitude)
    # =========================================================================
    V2_2010_MAPPING: Dict[str, str] = {
        "vendor_id": "vendor_id",
        "pickup_datetime": "pickup_datetime",
        "dropoff_datetime": "dropoff_datetime",
        "passenger_count": "passenger_count",
        "trip_distance": "trip_distance",
        "pickup_longitude": "pickup_longitude",
        "pickup_latitude": "pickup_latitude",
        "dropoff_longitude": "dropoff_longitude",
        "dropoff_latitude": "dropoff_latitude",
        "rate_code": "rate_code",
        "store_and_fwd_flag": "store_and_fwd_flag",
        "payment_type": "payment_type",
        "fare_amount": "fare_amount",
        "surcharge": "extra",
        "mta_tax": "mta_tax",
        "tip_amount": "tip_amount",
        "tolls_amount": "tolls_amount",
        "total_amount": "total_amount",
    }

    # =========================================================================
    # SCHEMA V3: 2011+ (VendorID, tpep_pickup_datetime, PULocationID)
    # =========================================================================
    V3_NEW_MAPPING: Dict[str, str] = {
        "VendorID": "vendor_id",
        "tpep_pickup_datetime": "pickup_datetime",
        "tpep_dropoff_datetime": "dropoff_datetime",
        "passenger_count": "passenger_count",
        "trip_distance": "trip_distance",
        "RatecodeID": "rate_code",
        "store_and_fwd_flag": "store_and_fwd_flag",
        "payment_type": "payment_type",
        "fare_amount": "fare_amount",
        "extra": "extra",
        "mta_tax": "mta_tax",
        "tip_amount": "tip_amount",
        "tolls_amount": "tolls_amount",
        "total_amount": "total_amount",
        "PULocationID": "pulocationid",
        "DOLocationID": "dolocationid",
        "improvement_surcharge": "improvement_surcharge",
        "congestion_surcharge": "congestion_surcharge",
        "Airport_fee": "airport_fee",
        "cbd_congestion_fee": "cbd_congestion_fee",
    }

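    @classmethod
    def _build_select_exprs(cls, mapping: Dict[str, str], available_cols: List[str]) -> List[Column]:
        """Build SELECT expressions that project a source layout onto BRONZE_COLUMNS."""
        # Invert source->target once instead of scanning the mapping per column
        target_to_source = {tgt: src for src, tgt in mapping.items()}
        # Match source names case-insensitively, in case a file's casing differs
        available_by_lower = {c.lower(): c for c in available_cols}

        exprs = []
        for target_col in cls.BRONZE_COLUMNS:
            if target_col == "source_file":
                # Lineage: path of the Parquet file each row came from
                exprs.append(input_file_name().alias(target_col))
                continue
            source_col = target_to_source.get(target_col)
            actual_col = available_by_lower.get(source_col.lower()) if source_col else None
            if actual_col is not None:
                # Cast to string to absorb type drift between files
                exprs.append(col(actual_col).cast("string").alias(target_col))
            else:
                # Column absent in this vintage: emit a typed NULL so schemas line up
                exprs.append(lit(None).cast("string").alias(target_col))
        return exprs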

    @classmethod
    def transform_v1_2009(cls, df: DataFrame) -> DataFrame:
        """Transform 2009 V1 format data to Bronze schema."""
        exprs = cls._build_select_exprs(cls.V1_2009_MAPPING, df.columns)
        return df.select(*exprs)

    @classmethod
    def transform_v2_2010(cls, df: DataFrame) -> DataFrame:
        """Transform 2010 V2 format data to Bronze schema."""
        exprs = cls._build_select_exprs(cls.V2_2010_MAPPING, df.columns)
        return df.select(*exprs)

    @classmethod
    def transform_v3_new(cls, df: DataFrame) -> DataFrame:
        """Transform 2011+ new format data to Bronze schema."""
        exprs = cls._build_select_exprs(cls.V3_NEW_MAPPING, df.columns)
        return df.select(*exprs)


# Initialize schema registry
schema_registry = SchemaRegistry()
logger.info(f"Schema Registry initialized with {len(SchemaRegistry.BRONZE_COLUMNS)} target columns")
logger.info("Supports 3 schema versions: V1 (2009), V2 (2010), V3 (2011+)")

2025-11-29 14:40:26 | INFO     | Schema Registry initialized with 25 target columns
2025-11-29 14:40:26 | INFO     | Supports 3 schema versions: V1 (2009), V2 (2010), V3 (2011+)
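A cheap guard against typos when editing the mappings is to check that every mapped target is a known Bronze column and that no two source columns feed the same target. A sketch with a hypothetical `validate_mapping`, shown on a toy mapping; passing `SchemaRegistry.V1_2009_MAPPING` (or V2/V3) with `SchemaRegistry.BRONZE_COLUMNS` should return an empty list for the three mappings above:

```python
from collections import Counter
from typing import Dict, List


def validate_mapping(mapping: Dict[str, str], bronze_columns: List[str]) -> List[str]:
    """Return a list of problems; an empty list means the mapping looks consistent."""
    problems = []
    known = set(bronze_columns)
    for src, tgt in mapping.items():
        if tgt not in known:
            problems.append(f"{src} -> {tgt}: unknown Bronze column")
    # Two sources mapped to one target would make the reverse lookup ambiguous
    for tgt, n in Counter(mapping.values()).items():
        if n > 1:
            problems.append(f"{tgt}: mapped from {n} source columns")
    return problems


print(validate_mapping({"Fare_Amt": "fare_amount", "Tip_Amt": "tip_amt"}, ["fare_amount", "tip_amount"]))
```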


## 3. Data Quality Framework

Lightweight data quality checks used to validate the Bronze table after it is written.
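The null check treats both `NULL` and the empty string as missing, which matters because every Bronze column is a string. The same rule in plain Python (hypothetical `null_fraction`), handy for reasoning about thresholds such as the default `max_null_pct=0.5`:

```python
from typing import Iterable, Optional


def null_fraction(values: Iterable[Optional[str]]) -> float:
    """Fraction of values that are None or empty strings (0.0 for no values)."""
    total = 0
    missing = 0
    for v in values:
        total += 1
        if v is None or v == "":
            missing += 1
    return missing / total if total else 0.0


print(null_fraction(["1", None, "", "2"]))  # 0.5, which passes a 0.5 threshold
```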


In [4]:
"""
Data Quality Framework for Bronze layer validation.
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from datetime import datetime
import logging
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, sum as spark_sum, when


@dataclass
class DataQualityResult:
    """Results from data quality checks."""

    check_name: str
    passed: bool
    details: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

    def __str__(self) -> str:
        status = "✅ PASSED" if self.passed else "❌ FAILED"
        return f"{status} | {self.check_name}: {self.details}"


class DataQualityChecker:
    """
    Enterprise-grade data quality checker for Spark DataFrames.
    """

    def __init__(self, logger: logging.Logger):
        self.logger = logger
        self.results: List[DataQualityResult] = []

    def check_not_empty(self, df: DataFrame, name: str) -> DataQualityResult:
        """Check that DataFrame is not empty."""
        # Use limit(1) to avoid full scan
        is_empty = df.limit(1).count() == 0
        result = DataQualityResult(
            check_name=f"not_empty_{name}",
            passed=not is_empty,
            details={"is_empty": is_empty},
        )
        self.results.append(result)
        self.logger.info(str(result))
        return result

    def check_required_columns(
        self, df: DataFrame, required_cols: List[str], name: str
    ) -> DataQualityResult:
        """Check that required columns exist."""
        missing = [c for c in required_cols if c not in df.columns]
        result = DataQualityResult(
            check_name=f"required_columns_{name}",
            passed=len(missing) == 0,
            details={"missing_columns": missing, "total_columns": len(df.columns)},
        )
        self.results.append(result)
        self.logger.info(str(result))
        return result

    def check_null_percentage(
        self, df: DataFrame, column: str, max_null_pct: float = 0.5, name: str = ""
    ) -> DataQualityResult:
        """Check that null percentage is within threshold (using sample for performance)."""
        # Sample for performance on large datasets
        sample_df = df.sample(fraction=0.01, seed=42) if df.count() > 100000 else df

        total = sample_df.count()
        if total == 0:
            null_pct = 0.0
        else:
            nulls = sample_df.filter(col(column).isNull() | (col(column) == "")).count()
            null_pct = nulls / total

        result = DataQualityResult(
            check_name=f"null_check_{column}_{name}",
            passed=null_pct <= max_null_pct,
            details={
                "null_percentage": round(null_pct * 100, 2),
                "threshold": max_null_pct * 100,
            },
        )
        self.results.append(result)
        self.logger.info(str(result))
        return result

    def get_summary(self) -> Dict[str, Any]:
        """Get summary of all quality checks."""
        passed = sum(1 for r in self.results if r.passed)
        failed = len(self.results) - passed
        return {
            "total_checks": len(self.results),
            "passed": passed,
            "failed": failed,
            "pass_rate": round(passed / len(self.results) * 100, 2)
            if self.results
            else 0,
        }

    def all_passed(self) -> bool:
        """Check if all quality checks passed."""
        return all(r.passed for r in self.results)


# Initialize data quality checker
dq_checker = DataQualityChecker(logger)
logger.info("Data Quality Framework initialized")

2025-11-29 14:40:26 | INFO     | Data Quality Framework initialized
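The `check_*` methods only record and log results; nothing stops a run whose checks failed. One way to turn the results into a hard gate is sketched below with a stand-in result type (`Result`, `failed_checks` and `enforce` are hypothetical; in the notebook, `dq_checker.results` would play the role of the list):

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Result:
    """Stand-in for DataQualityResult with only the fields the gate needs."""
    check_name: str
    passed: bool


def failed_checks(results: List[Result]) -> List[str]:
    """Names of the checks that did not pass, in run order."""
    return [r.check_name for r in results if not r.passed]


def enforce(results: List[Result]) -> None:
    """Raise RuntimeError if any quality check failed."""
    failed = failed_checks(results)
    if failed:
        raise RuntimeError(f"Data quality checks failed: {', '.join(failed)}")


enforce([Result("not_empty_bronze_output", True)])  # all passed: returns None
```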


## 4. Data Ingestion Pipeline

Core ETL logic with error handling, metrics collection, and processing orchestration.
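Each schema version is written in chunks of `batch_size` files, using the caller's write mode only for the first chunk and `append` afterwards. The same plan in plain Python (hypothetical `plan_batches`); with `batch_size = 20`, the 178 V3 files reported by `discover_files` produce 9 batches (eight of 20 and one of 18):

```python
from typing import List, Tuple


def plan_batches(files: List[str], batch_size: int, first_mode: str) -> List[Tuple[str, List[str]]]:
    """Split files into (write_mode, batch) pairs; only the first batch uses first_mode."""
    plan = []
    for i in range(0, len(files), batch_size):
        mode = first_mode if i == 0 else "append"
        plan.append((mode, files[i:i + batch_size]))
    return plan


files = [f"yellow_tripdata_2009-{m:02d}.parquet" for m in range(1, 13)]
for mode, batch in plan_batches(files, batch_size=5, first_mode="overwrite"):
    print(mode, len(batch))
```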


In [None]:
"""
Bronze Layer Ingestion Pipeline - Core ETL Logic

Handles THREE schema versions with BATCH PROCESSING to avoid OOM:
- V1 (2009): vendor_name, Trip_Pickup_DateTime, Start_Lon
- V2 (2010): vendor_id, pickup_datetime, pickup_longitude
- V3 (2011+): VendorID, tpep_pickup_datetime, PULocationID

Strategy: write each batch of files directly to Delta (the first batch overwrites,
later batches append) rather than unioning all vintages in memory.
"""
from nyc_taxi_eta.configs.settings import YELLOW_TAXI_DIR, ROOT_DIR
from pyspark.sql.functions import year, to_timestamp
import gc
import shutil


@dataclass
class PipelineMetrics:
    """Metrics collected during pipeline execution."""

    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
    v1_files: int = 0
    v2_files: int = 0
    v3_files: int = 0
    total_records_processed: int = 0
    processing_duration_seconds: float = 0.0
    status: str = "running"
    error_message: Optional[str] = None

    def complete(self, status: str = "success", error: Optional[str] = None):
        """Mark pipeline as complete."""
        self.end_time = datetime.now()
        self.status = status
        self.error_message = error
        self.processing_duration_seconds = (
            self.end_time - self.start_time
        ).total_seconds()


class BronzeIngestionPipeline:
    """
    Enterprise-grade Bronze layer ingestion pipeline with BATCH PROCESSING.

    Handles:
    - THREE schema versions across different data vintages
    - Batch processing to avoid memory issues
    - Error handling (failures are logged, recorded in metrics, and re-raised)
    - Data quality validation
    - Metrics collection and logging
    """

    def __init__(
        self,
        spark: pyspark.sql.SparkSession,
        config: PipelineConfig,
        schema_registry: SchemaRegistry,
        dq_checker: DataQualityChecker,
        logger: logging.Logger,
    ):
        self.spark = spark
        self.config = config
        self.schema_registry = schema_registry
        self.dq_checker = dq_checker
        self.logger = logger
        self.metrics = PipelineMetrics()

        # Set paths from settings
        self.config.source_dir = YELLOW_TAXI_DIR
        self.config.target_dir = ROOT_DIR / "data" / "bronze" / "yellow_taxi"

    def discover_files(self) -> Tuple[List[str], List[str], List[str]]:
        """Discover and classify source files by schema version."""
        self.logger.info(f"Discovering files in: {self.config.source_dir}")

        all_files = sorted(
            [
                str(self.config.source_dir / f)
                for f in os.listdir(self.config.source_dir)
                if f.endswith(".parquet")
            ]
        )

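        # Classify by the year in the file name (yellow_tripdata_YYYY-MM.parquet)
        # rather than a substring match on the full path, which could also hit
        # directory names that happen to contain "2009" or "2010"
        def file_year(path: str) -> str:
            return Path(path).stem.rsplit("_", 1)[-1][:4]

        v1_files = [f for f in all_files if file_year(f) in self.config.v1_years]
        v2_files = [f for f in all_files if file_year(f) in self.config.v2_years]
        legacy = set(v1_files) | set(v2_files)
        v3_files = [f for f in all_files if f not in legacy]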

        self.metrics.v1_files = len(v1_files)
        self.metrics.v2_files = len(v2_files)
        self.metrics.v3_files = len(v3_files)

        self.logger.info(f"Found {len(v1_files)} V1 files (2009)")
        self.logger.info(f"Found {len(v2_files)} V2 files (2010)")
        self.logger.info(f"Found {len(v3_files)} V3 files (2011+)")

        return v1_files, v2_files, v3_files

    def _process_and_write_batch(
        self, 
        files: List[str], 
        transform_fn, 
        batch_name: str,
        write_mode: str
    ) -> None:
        """Process a batch of files and write directly to Delta."""
        if not files:
            return
            
        self.logger.info(f"Processing {len(files)} files for {batch_name}...")
        
        # Read files WITHOUT merging schema - each file read separately then unioned
        # This avoids Spark trying to reconcile types across files
        dfs = []
        for f in files:
            df = self.spark.read.parquet(f)
            df_transformed = transform_fn(df)
            dfs.append(df_transformed)
        
        # Union all transformed DataFrames (all have same STRING schema now)
        df_bronze = dfs[0]
        for df in dfs[1:]:
            df_bronze = df_bronze.unionByName(df)
        
        # Add partition column
        df_bronze = df_bronze.withColumn(
            "year", year(to_timestamp(col("pickup_datetime")))
        )
        
        # NOTE: No data quality filtering at Bronze layer - preserve raw data as-is
        # Invalid years (e.g., 2001, 2084) will be filtered at Silver layer
        
        # Write to Delta; mergeSchema tolerates any column drift between batches
        (
            df_bronze.write.format("delta")
            .mode(write_mode)
            .partitionBy(*self.config.partition_by)
            .option("mergeSchema", "true")
            .save(str(self.config.target_dir))
        )
        
        self.logger.info(f"{batch_name} written to Delta")
        
        # Clear cache and trigger GC
        del dfs, df_bronze
        self.spark.catalog.clearCache()
        gc.collect()

    def _process_in_batches(
        self, files: List[str], transform_fn, version: str, era: str, write_mode: str
    ) -> None:
        """Process one schema version in chunks of config.batch_size files."""
        if not files:
            return

        self.logger.info(f"Processing {version} ({era}) format files...")
        batch_size = self.config.batch_size
        total_batches = (len(files) + batch_size - 1) // batch_size  # ceiling division

        for i in range(0, len(files), batch_size):
            batch = files[i:i + batch_size]
            batch_num = i // batch_size + 1
            self.logger.info(f"{version} batch {batch_num}/{total_batches}: {len(batch)} files")

            # First batch uses the requested mode, later batches append to it
            mode = write_mode if i == 0 else "append"
            self._process_and_write_batch(
                batch, transform_fn, f"{version}_batch_{batch_num}", mode
            )

        self.logger.info(f"{version} format processing complete")

    def process_v1_format(self, files: List[str], write_mode: str = "overwrite") -> None:
        """Process V1 (2009) format files in batches."""
        self._process_in_batches(
            files, self.schema_registry.transform_v1_2009, "V1", "2009", write_mode
        )

    def process_v2_format(self, files: List[str], write_mode: str = "append") -> None:
        """Process V2 (2010) format files in batches."""
        self._process_in_batches(
            files, self.schema_registry.transform_v2_2010, "V2", "2010", write_mode
        )

    def process_v3_format(self, files: List[str], write_mode: str = "append") -> None:
        """Process V3 (2011+) format files in batches."""
        self._process_in_batches(
            files, self.schema_registry.transform_v3_new, "V3", "2011+", write_mode
        )

    def cleanup_target(self) -> None:
        """Remove existing Delta table to start fresh."""
        target = self.config.target_dir
        if target.exists():
            self.logger.info(f"Removing existing Delta table: {target}")
            shutil.rmtree(target)

    def run(self) -> PipelineMetrics:
        """Execute the full ingestion pipeline with batch processing."""
        self.logger.info("=" * 60)
        self.logger.info("STARTING BRONZE INGESTION PIPELINE (BATCH MODE)")
        self.logger.info("=" * 60)

        try:
            # Step 0: Clean up existing table for fresh start
            self.cleanup_target()

            # Step 1: Discover files
            v1_files, v2_files, v3_files = self.discover_files()

            # Step 2: Process each schema version and write directly to Delta
            # V1 starts fresh (overwrite), subsequent append
            self.process_v1_format(v1_files, write_mode="overwrite")
            self.process_v2_format(v2_files, write_mode="append")
            self.process_v3_format(v3_files, write_mode="append")

            # Step 3: Validate output
            self.logger.info("Validating Delta table...")
            df_check = self.spark.read.format("delta").load(str(self.config.target_dir))
            self.dq_checker.check_not_empty(df_check, "bronze_output")
            self.dq_checker.check_required_columns(
                df_check, SchemaRegistry.BRONZE_COLUMNS + ["year"], "bronze_output"
            )

            # Complete metrics
            self.metrics.complete(status="success")
            self.logger.info("Pipeline completed successfully!")

        except Exception as e:
            self.logger.error(f"Pipeline failed: {str(e)}")
            self.metrics.complete(status="failed", error=str(e))
            raise

        return self.metrics


# Initialize pipeline
pipeline = BronzeIngestionPipeline(
    spark=spark,
    config=config,
    schema_registry=schema_registry,
    dq_checker=dq_checker,
    logger=logger,
)

logger.info("Pipeline initialized and ready to run (batch mode enabled)")

2025-11-29 14:40:26 | INFO     | Pipeline initialized and ready to run (batch mode enabled)
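The `year` partition column is derived by parsing `pickup_datetime`, which is still a raw string at Bronze. The sketch below is a plain-Python analogue of a parse-or-null policy (hypothetical `partition_year`; the format string is an assumption based on values such as `2009-01-04 02:52:00`, and Python's parser is stricter than Spark's). On the Spark side, `try_to_timestamp` (Spark 3.5+) returns NULL for unparseable input even with ANSI mode on, which is the default in Spark 4.0; rows with a NULL `year` typically land under a `year=__HIVE_DEFAULT_PARTITION__` directory.

```python
from datetime import datetime
from typing import Optional


def partition_year(raw: Optional[str], fmt: str = "%Y-%m-%d %H:%M:%S") -> Optional[int]:
    """Return the year of a timestamp string, or None if it cannot be parsed."""
    if not raw:
        return None
    try:
        return datetime.strptime(raw, fmt).year
    except ValueError:
        return None


print([partition_year(v) for v in ["2009-01-04 02:52:00", "not a date", None]])  # [2009, None, None]
```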


## 5. Pipeline Execution

Execute the pipeline with optional preview mode for testing.


In [6]:
"""
Preview Mode - Test the pipeline without writing data.
Run this cell to validate transformations before full execution.
"""
from pyspark.sql.functions import col

# Preview: Discover files
v1_files, v2_files, v3_files = pipeline.discover_files()

# Preview V1 (2009) format transformation - single file only
if v1_files:
    logger.info("Preview: V1 (2009) format sample")
    df_v1 = spark.read.parquet(v1_files[0])
    df_v1_bronze = schema_registry.transform_v1_2009(df_v1)
    df_v1_bronze.limit(3).show(truncate=30)
    del df_v1, df_v1_bronze

# Preview V2 (2010) format transformation - single file only
if v2_files:
    logger.info("Preview: V2 (2010) format sample")
    df_v2 = spark.read.parquet(v2_files[0])
    df_v2_bronze = schema_registry.transform_v2_2010(df_v2)
    df_v2_bronze.limit(3).show(truncate=30)
    del df_v2, df_v2_bronze

# Preview V3 (2011+) format transformation - single file only
if v3_files:
    logger.info("Preview: V3 (2011+) format sample")
    df_v3 = spark.read.parquet(v3_files[0])
    df_v3_bronze = schema_registry.transform_v3_new(df_v3)
    df_v3_bronze.limit(3).show(truncate=30)
    del df_v3, df_v3_bronze

# Clear any cached data
spark.catalog.clearCache()
logger.info("Preview complete - ready for full pipeline execution")

2025-11-29 14:40:26 | INFO     | Discovering files in: /home/administrator/Desktop/datascience/github/nyc-taxi-eta/data/landing/trips_data/yellow_taxi
2025-11-29 14:40:26 | INFO     | Found 12 V1 files (2009)
2025-11-29 14:40:26 | INFO     | Found 12 V2 files (2010)
2025-11-29 14:40:26 | INFO     | Found 178 V3 files (2011+)
2025-11-29 14:40:26 | INFO     | Preview: V1 (2009) format sample




+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+-----------------+----------------+---------+------------------+------------+-----------+-----+-------+----------+------------+------------+------------+------------+---------------------+--------------------+-----------+------------------+------------------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|pulocationid|dolocationid|improvement_surcharge|congestion_surcharge|airport_fee|cbd_congestion_fee|                   source_file|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+-----------------+----------------+---------+------------------+------------+-----------+-----+-------+----------+


+---------+-------------------+-------------------+---------------+-------------+------------------+---------------+------------------+----------------+---------+------------------+------------+-----------+-----+-------+----------+------------+------------+------------+------------+---------------------+--------------------+-----------+------------------+------------------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|pickup_latitude| dropoff_longitude|dropoff_latitude|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|pulocationid|dolocationid|improvement_surcharge|congestion_surcharge|airport_fee|cbd_congestion_fee|                   source_file|
+---------+-------------------+-------------------+---------------+-------------+------------------+---------------+------------------+----------------+---------+------------------+------------+-----------+-----+-------+--
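The preview cell starts with `pipeline.discover_files()`. This method splits the landing directory into three format groups: 12 V1 files (2009), 12 V2 files (2010), and 178 V3 files (2011+). It is defined earlier in the notebook. The sketch below shows a minimal version of year-based grouping. It assumes TLC-style names such as `yellow_tripdata_2009-01.parquet`. Both the naming pattern and the `group_by_format` helper are assumptions, not read from this pipeline.

```python
import re
from typing import Dict, List, Sequence

# Assumed TLC-style naming, e.g. yellow_tripdata_2009-01.parquet
_YEAR_RE = re.compile(r"_(\d{4})-\d{2}\.parquet$")


def group_by_format(paths: Sequence[str]) -> Dict[str, List[str]]:
    """Hypothetical helper: split files into V1 (2009), V2 (2010), V3 (2011+)."""
    groups: Dict[str, List[str]] = {"v1": [], "v2": [], "v3": []}
    for path in sorted(paths):
        match = _YEAR_RE.search(path)
        if match is None:
            continue  # skip files that do not follow the naming convention
        year = int(match.group(1))
        if year == 2009:
            groups["v1"].append(path)
        elif year == 2010:
            groups["v2"].append(path)
        elif year >= 2011:
            groups["v3"].append(path)
    return groups


files = [f"yellow_tripdata_{y}-{m:02d}.parquet" for y in (2009, 2010, 2011) for m in range(1, 13)]
groups = group_by_format(files)
print({k: len(v) for k, v in groups.items()})  # {'v1': 12, 'v2': 12, 'v3': 12}
```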

In [7]:
"""
FULL PIPELINE EXECUTION

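⚠️ WARNING: This deletes the existing Bronze Delta table, then processes ALL
   landing files and rewrites the table. Runtime depends on data volume; the
   run recorded below (202 files) took about 38 minutes (2274 seconds).

Run only when ready for a full production load; use the preview cell above
to validate the transformations first.
"""
# Execute the full pipeline; returns the metrics summarized in Section 6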
metrics = pipeline.run()

2025-11-29 14:40:32 | INFO     | STARTING BRONZE INGESTION PIPELINE (BATCH MODE)
2025-11-29 14:40:32 | INFO     | Removing existing Delta table: /home/administrator/Desktop/datascience/github/nyc-taxi-eta/data/bronze/yellow_taxi
2025-11-29 14:40:34 | INFO     | Discovering files in: /home/administrator/Desktop/datascience/github/nyc-taxi-eta/data/landing/trips_data/yellow_taxi
2025-11-29 14:40:34 | INFO     | Found 12 V1 files (2009)
2025-11-29 14:40:34 | INFO     | Found 12 V2 files (2010)
2025-11-29 14:40:34 | INFO     | Found 178 V3 files (2011+)
2025-11-29 14:40:34 | INFO     | Processing V1 (2009) format files...
2025-11-29 14:40:34 | INFO     | V1 batch 1/1: 12 files
2025-11-29 14:40:34 | INFO     | Processing 12 files for V1_batch_1...
2025-11-29 14:40:3

25/11/29 14:40:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

2025-11-29 14:44:57 | INFO     | V1_batch_1 written to Delta
2025-11-29 14:44:57 | INFO     | V1 format processing complete
2025-11-29 14:44:57 | INFO     | Processing V2 (2010) format files...
2025-11-29 14:44:57 | INFO     | V2 batch 1/1: 12 files
2025-11-29 14:44:57 | INFO     | Processing 12 files for V2_batch_1...

2025-11-29 14:49:23 | INFO     | V2_batch_1 written to Delta
2025-11-29 14:49:23 | INFO     | V2 format processing complete
2025-11-29 14:49:23 | INFO     | Processing V3 (2011+) format files...
2025-11-29 14:49:23 | INFO     | V3 batch 1/9: 20 files
2025-11-29 14:49:23 | INFO     | Processing 20 files for V3_batch_1...

2025-11-29 14:54:43 | INFO     | V3_batch_1 written to Delta
2025-11-29 14:54:44 | INFO     | V3 batch 2/9: 20 files
2025-11-29 14:54:44 | INFO     | Processing 20 files for V3_batch_2...

2025-11-29 15:00:03 | INFO     | V3_batch_2 written to Delta
2025-11-29 15:00:03 | INFO     | V3 batch 3/9: 20 files
2025-11-29 15:00:03 | INFO     | Processing 20 files for V3_batch_3...

2025-11-29 15:04:31 | INFO     | V3_batch_3 written to Delta
2025-11-29 15:04:32 | INFO     | V3 batch 4/9: 20 files
2025-11-29 15:04:32 | INFO     | Processing 20 files for V3_batch_4...

2025-11-29 15:08:16 | INFO     | V3_batch_4 written to Delta
2025-11-29 15:08:16 | INFO     | V3 batch 5/9: 20 files
2025-11-29 15:08:16 | INFO     | Processing 20 files for V3_batch_5...

2025-11-29 15:11:26 | INFO     | V3_batch_5 written to Delta
2025-11-29 15:11:26 | INFO     | V3 batch 6/9: 20 files
2025-11-29 15:11:26 | INFO     | Processing 20 files for V3_batch_6...

2025-11-29 15:13:33 | INFO     | V3_batch_6 written to Delta
2025-11-29 15:13:34 | INFO     | V3 batch 7/9: 20 files
2025-11-29 15:13:34 | INFO     | Processing 20 files for V3_batch_7...

2025-11-29 15:15:00 | INFO     | V3_batch_7 written to Delta
2025-11-29 15:15:00 | INFO     | V3 batch 8/9: 20 files
2025-11-29 15:15:00 | INFO     | Processing 20 files for V3_batch_8...

2025-11-29 15:16:41 | INFO     | V3_batch_8 written to Delta
2025-11-29 15:16:41 | INFO     | V3 batch 9/9: 18 files
2025-11-29 15:16:41 | INFO     | Processing 18 files for V3_batch_9...

2025-11-29 15:18:20 | INFO     | V3_batch_9 written to Delta
2025-11-29 15:18:20 | INFO     | V3 format processing complete
2025-11-29 15:18:20 | INFO     | Validating Delta table...
2025-11-29 15:18:20 | INFO     | ✅ PASSED | not_empty_bronze_output: {'is_empty': False}
2025-11-29 15:18:20 | INFO     | ✅ PASSED | required_columns_bronze_output: {'missing_columns': [], 'total_columns': 26}
2025-11-29 15:18:20 | INFO     | Pipeline completed successfully!
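The V3 log lines show the 178 files processed in batches of 20: nine batches, the last with 18 files. The 12 V1 and 12 V2 files each fit in a single batch. The batching code lives in the pipeline class defined earlier. The sketch below reproduces the chunking arithmetic. It assumes a fixed batch size of 20, inferred from the log rather than read from the config, and uses a hypothetical `make_batches` helper.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def make_batches(items: Sequence[T], batch_size: int) -> List[List[T]]:
    """Hypothetical helper: consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [list(items[i:i + batch_size]) for i in range(0, len(items), batch_size)]


v3_files = [f"file_{i:03d}.parquet" for i in range(178)]
batches = make_batches(v3_files, batch_size=20)
for n, batch in enumerate(batches, start=1):
    print(f"V3 batch {n}/{len(batches)}: {len(batch)} files")
# The last line printed is: V3 batch 9/9: 18 files
```

Appending one batch at a time limits each Spark job to a bounded number of input files. The cost is one Delta commit per batch.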


## 6. Pipeline Results & Monitoring

Review execution metrics and data quality results, then read the Delta table back to validate the output.


In [8]:
"""
Pipeline Execution Summary and Metrics Report
"""
# Pipeline Metrics Summary
logger.info("=" * 60)
logger.info("PIPELINE EXECUTION SUMMARY")
logger.info("=" * 60)

# Box rows are "║ " + 21-char label + 39-char value + " ║" = 64 characters,
# the same width as the border lines. Composite values are formatted first
# so they are padded as a whole and the right border stays aligned.
duration = f"{metrics.processing_duration_seconds:.2f} seconds"
total_files = metrics.v1_files + metrics.v2_files + metrics.v3_files

print(f"""
╔══════════════════════════════════════════════════════════════╗
║                    PIPELINE METRICS                          ║
╠══════════════════════════════════════════════════════════════╣
║ Status:              {metrics.status.upper():<39} ║
║ Start Time:          {str(metrics.start_time):<39} ║
║ End Time:            {str(metrics.end_time):<39} ║
║ Duration:            {duration:<39} ║
╠══════════════════════════════════════════════════════════════╣
║                    FILE STATISTICS                           ║
╠══════════════════════════════════════════════════════════════╣
║ V1 Files (2009):     {metrics.v1_files:<39} ║
║ V2 Files (2010):     {metrics.v2_files:<39} ║
║ V3 Files (2011+):    {metrics.v3_files:<39} ║
║ Total Files:         {total_files:<39} ║
╚══════════════════════════════════════════════════════════════╝
""")

# Data Quality Summary
dq_summary = dq_checker.get_summary()
pass_rate = f"{dq_summary['pass_rate']:.1f}%"
print(f"""
╔══════════════════════════════════════════════════════════════╗
║                  DATA QUALITY SUMMARY                        ║
╠══════════════════════════════════════════════════════════════╣
║ Total Checks:        {dq_summary["total_checks"]:<39} ║
║ Passed:              {dq_summary["passed"]:<39} ║
║ Failed:              {dq_summary["failed"]:<39} ║
║ Pass Rate:           {pass_rate:<39} ║
╚══════════════════════════════════════════════════════════════╝
""")

2025-11-29 15:18:20 | INFO     | PIPELINE EXECUTION SUMMARY

╔══════════════════════════════════════════════════════════════╗
║                    PIPELINE METRICS                          ║
╠══════════════════════════════════════════════════════════════╣
║ Status:              SUCCESS                                  ║
║ Start Time:          2025-11-29 14:40:26.492860               ║
║ End Time:            2025-11-29 15:18:20.645689               ║
║ Duration:            2274.15 seconds                               ║
╠══════════════════════════════════════════════════════════════╣
║                    FILE STATISTICS                           ║
╠══════════════════════════════════════════════════════════════╣
║ V1 Files (2009):     12                                       ║
║ V2 Files (2010):     12                                       ║
║ V3 Files (2011+):    178                                      ║
║ Total Files:         202                                      ║
╚═══════════════
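The summary reports 2274.15 seconds end to end. The timestamps in the execution log make it possible to split that total by format. The sketch below uses the first and last log lines of each format phase, copied from the log at one-second resolution. `PHASES` and `elapsed_seconds` are illustrative names.

```python
from datetime import datetime

LOG_TS = "%Y-%m-%d %H:%M:%S"

# (files, first log line of the phase, "... format processing complete" line)
PHASES = {
    "V1": (12, "2025-11-29 14:40:34", "2025-11-29 14:44:57"),
    "V2": (12, "2025-11-29 14:44:57", "2025-11-29 14:49:23"),
    "V3": (178, "2025-11-29 14:49:23", "2025-11-29 15:18:20"),
}


def elapsed_seconds(start: str, end: str) -> float:
    """Seconds between two log timestamps (second resolution)."""
    delta = datetime.strptime(end, LOG_TS) - datetime.strptime(start, LOG_TS)
    return delta.total_seconds()


for fmt, (n_files, start, end) in PHASES.items():
    secs = elapsed_seconds(start, end)
    print(f"{fmt}: {secs:.0f} s for {n_files} files ({secs / n_files:.1f} s/file)")
# V1: 263 s for 12 files (21.9 s/file)
# V2: 266 s for 12 files (22.2 s/file)
# V3: 1737 s for 178 files (9.8 s/file)
```

In this run the 2009 and 2010 files took more than twice as long per file as the 2011+ files. The three phases add up to 2266 of the reported 2274 seconds.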

In [9]:
"""
Validate Output: Read back from Delta table and verify.
"""
# Read the Delta table from the location the pipeline wrote to
BRONZE_DIR = config.target_dir

df_bronze_output = spark.read.format("delta").load(str(BRONZE_DIR))

logger.info("Bronze Table Schema:")
df_bronze_output.printSchema()

logger.info("Bronze Table Partitions (Year):")
df_bronze_output.select("year").distinct().orderBy("year").show(50, truncate=False)

logger.info("Sample Data from Bronze Table:")
df_bronze_output.limit(5).show(truncate=25)

2025-11-29 15:18:20 | INFO     | Bronze Table Schema:
root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- pulocationid: string (nullable = true)
 |-- dolocationid: string (nullable = true)
 |-- improvement_surcharge: string (nullable = tr



+----+
|year|
+----+
|2001|
|2002|
|2003|
|2004|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
|2022|
|2023|
|2024|
|2025|
|2026|
|2028|
|2029|
|2031|
|2032|
|2033|
|2037|
|2038|
|2041|
|2042|
|2053|
|2058|
|2066|
|2070|
|2084|
|2088|
|2090|
|2098|
+----+
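The partition listing contains 41 distinct years, from 2001 to 2098, although the overview gives the source range as 2009-2025. `year` is presumably derived from the pickup timestamp rather than the file name. If so, the extra values most likely come from malformed timestamps in the raw data, which the Bronze layer keeps as-is by design. The sketch below separates expected from suspect partition years. It assumes the 2009-2025 range from the overview, and `split_partition_years` is a hypothetical helper.

```python
from typing import Iterable, List, Tuple


def split_partition_years(
    years: Iterable[int], first: int = 2009, last: int = 2025
) -> Tuple[List[int], List[int]]:
    """Hypothetical helper: return (in_range, out_of_range), each sorted."""
    in_range: List[int] = []
    out_of_range: List[int] = []
    for year in sorted(set(years)):
        (in_range if first <= year <= last else out_of_range).append(year)
    return in_range, out_of_range


# Partition values copied from the listing above
observed = [2001, 2002, 2003, 2004, 2007, 2008, *range(2009, 2026),
            2026, 2028, 2029, 2031, 2032, 2033, 2037, 2038, 2041, 2042,
            2053, 2058, 2066, 2070, 2084, 2088, 2090, 2098]
ok, suspect = split_partition_years(observed)
print(len(ok), len(suspect))  # 17 24
```

Against the real table, the input could come from `[int(row["year"]) for row in df_bronze_output.select("year").distinct().collect()]`. The `int()` call covers the case where the partition column is stored as a string. Counting rows per suspect year with `groupBy("year").count()` would show whether these years are isolated records or a systematic issue to handle in the Silver layer.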

2025-11-29 15:18:22 | INFO     | Sample Data from Bronze Table:



+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+-----------------+----------------+---------+------------------+------------+-----------+-----+-------+----------+------------+------------+------------+------------+---------------------+--------------------+-----------+------------------+-------------------------+----+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|pulocationid|dolocationid|improvement_surcharge|congestion_surcharge|airport_fee|cbd_congestion_fee|              source_file|year|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+-----------------+----------------+---------+------------------+------------+-----------+-----+-------+----------+

In [10]:
"""
Cleanup: Stop Spark session and release resources.
"""
logger.info("=" * 60)
logger.info("PIPELINE COMPLETE")
logger.info(f"Finished at: {datetime.now().isoformat()}")
logger.info("=" * 60)

# Stop the Spark session to release resources (skip this cell to keep exploring the table)
spark.stop()
logger.info("Spark session stopped")

2025-11-29 15:18:22 | INFO     | PIPELINE COMPLETE
2025-11-29 15:18:22 | INFO     | Finished at: 2025-11-29T15:18:22.625834
2025-11-29 15:18:28 | INFO     | Spark session stopped
