In [10]:
from utils.spark_session import get_spark_session
from utils.hadoop_setup import complete_hadoop_setup

# Run the project's Hadoop setup helper. Per the output it prints, it sets
# HADOOP_HOME, adds the Hadoop bin directory to PATH and checks for
# winutils.exe / hadoop.dll.
# NOTE(review): presumably this has to run before the Spark session is created — confirm.
complete_hadoop_setup()

# Create the Spark session through the project helper and bind it to `spark`,
# the name the later cells use.
spark = get_spark_session()

✔ HADOOP_HOME set to:, os.environ['HADOOP_HOME']
✔ Added to PATH: C:\hadoop\bin

 ✔ winutils.exe: True
 ✔ hadoop.dll: True

🎉 Setup complete!


In [11]:
import os
from datetime import datetime
from functools import reduce
from pathlib import Path

from pyspark.sql.functions import (
    current_timestamp,
    current_date,
    input_file_name,
    lit
)


# Project Configuration
# Root of the project checkout. The default is the original developer path; set
# the NYC_TAXI_PROJECT_ROOT environment variable to run the notebook from another
# machine or directory without editing this cell. An empty value falls back to
# the default.
PROJECT_ROOT = Path(
    os.environ.get("NYC_TAXI_PROJECT_ROOT")
    or r"C:\Users\chira\Desktop\data_engineering\PySpark\nyc-taxi-analytics-platform"
)


# Base paths
# Landing holds the raw parquet drops; bronze is the target of the ingestion.
LANDING_BASE_PATH = PROJECT_ROOT / "data" / "landing" / "nyc_taxi"
BRONZE_BASE_PATH = PROJECT_ROOT / "data" / "bronze" / "nyc_taxi"


# Taxi types to process (each must match a folder name under the landing path)
TAXI_TYPES = ["green", "yellow"]


# Batch ID for auditability: fixed once when this cell runs, so every row
# stamped with it during the same run carries the same value.
BATCH_ID = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

# Location of the Delta table that records which source files were ingested.
# Kept as a string because it is passed straight to Spark's load()/save().
PROCESSED_FILES_PATH = str(PROJECT_ROOT / "data" / "_processed_files")

In [12]:
#------ Processed-files tracker-------------#
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Schema of the tracker table: one row per source file that has been ingested.
processed_schema = StructType([
    StructField("file_path", StringType(), False),
    StructField("processed_at", TimestampType(), False)
])

# A Delta table keeps its transaction log in a `_delta_log` sub-directory, so its
# presence tells "tracker exists" apart from "tracker was never created".
# Previously ANY exception while reading (including transient or configuration
# errors) fell through to the branch that deletes the directory, which silently
# wiped the ingestion history. Now an existing table is read with errors left to
# propagate, and the directory is only cleaned up when it is not a Delta table.
processed_files_path_obj = Path(PROCESSED_FILES_PATH)

if (processed_files_path_obj / "_delta_log").is_dir():
    processed_files_df = spark.read.format("delta").load(PROCESSED_FILES_PATH)
    # Set of file paths, used by the ingestion function to skip known files.
    processed_files = {row.file_path for row in processed_files_df.collect()}
    print(f"‚û° Loaded {len(processed_files)} previously processed files")
else:
    print("‚û° Creating new _processed_files tracker table...")
    if processed_files_path_obj.exists():
        # Leftover directory without a Delta log: remove it so save() can create
        # a fresh table at this location.
        import shutil
        shutil.rmtree(processed_files_path_obj)
        print(" ‚úî Cleaned up existing directory")
    spark.createDataFrame([], processed_schema) \
         .write.format("delta") \
         .save(PROCESSED_FILES_PATH)
    processed_files = set()
    print(" ‚úî Tracker table created (empty)")

➡ Loaded 2 previously processed files


## Generic Ingestion function

In [13]:
def ingest_to_bronze(taxi_type: str):
    """
    Ingest one taxi type (green | yellow) from landing to bronze layer.

    Reads the parquet data under ``LANDING_BASE_PATH/<taxi_type>/2025``, drops the
    rows whose source file is already listed in ``processed_files``, adds audit
    columns, appends the result to the Delta table at
    ``BRONZE_BASE_PATH/<taxi_type>`` and finally records the newly ingested source
    files in the tracker table at ``PROCESSED_FILES_PATH``.

    Args:
        taxi_type: Must match the folder name under landing/nyc_taxi/.

    Returns:
        None. The function prints a message and returns early when the landing
        folder, the 2025 folder or non-empty parquet files are missing, when the
        read fails, or when there is nothing new to ingest.

    Side effects:
        Appends to the bronze and tracker Delta tables and adds the newly
        ingested file paths to the module-level ``processed_files`` set.
    """
    landing_path = LANDING_BASE_PATH / taxi_type
    bronze_path = BRONZE_BASE_PATH / taxi_type

    print(f"\n{'='*60}")
    print(f"Processing {taxi_type.upper()} taxi data")
    print(f"\n{'='*60}")

    if not landing_path.exists():
        print(f"‚ùå No landing data found for {taxi_type} at: {landing_path}")
        return

    # Only process 2025 data
    year_2025_path = landing_path / "2025"
    if not year_2025_path.exists():
        print(f"‚ùå No 2025 directory found for {taxi_type}")
        return

    # Discover valid (non-empty) parquet files
    # NOTE(review): this list is only used for the emptiness check and the log
    # lines below; the Spark read still targets the whole 2025 directory, so a
    # zero-byte file sitting next to valid ones is not excluded from the read.
    parquet_files = list(year_2025_path.rglob("*.parquet"))
    valid_files = [f for f in parquet_files if f.stat().st_size > 0]

    if not valid_files:
        print(f"‚ùå No valid parquet files found in: {year_2025_path}")
        return

    # ---------------Read--------------------

    try:
        print(f"‚û° Reading {taxi_type} data for year 2025...")
        print(f"‚û° Found {len(valid_files)} valid parquet file(s)")
        for pf in valid_files:
            file_size_mb = pf.stat().st_size / (1024 * 1024)
            print(f"    - {pf.parent.name}/{pf.name}: {file_size_mb:.2f} MB")

        # Spark is given forward-slash paths; basePath is set to the year folder
        # the read starts from.
        year_path_str = str(year_2025_path).replace("\\", "/")
        df = (
            spark.read
                 .option("basePath", year_path_str)
                 .option("mergeSchema", "true")
                 .parquet(year_path_str)
        )
        print(f"‚úî Loaded {df.count():,} total rows from 2025")
    except Exception as e:
        print(f" ‚ùå Could not read 2025 data: {str(e)[:200]}")
        import traceback
        traceback.print_exc()
        return

    # ----------- Incremental Filter ------------------------
    print(f" ‚û° Checking for already-processed files...")
    df = df.withColumn("_source_file", input_file_name())
    df_new = df.filter(~df["_source_file"].isin(processed_files))
    new_count = df_new.count()

    if new_count == 0:
        print(f" ‚úî No new records to ingest for {taxi_type} - already up to date")
        return

    print(f" ‚û° Found {new_count:,} new records to process...")

    # -----------Add audit metadata -------------------------
    df_bronze = (
        df_new
        .withColumn("_ingestion_date", current_date())
        .withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("BATCH_ID", lit(BATCH_ID))
    )

    # Distinct source files behind the new rows, in the exact form produced by
    # input_file_name() so they match what the incremental filter compares.
    new_file_paths = [
        row["_source_file"]
        for row in df_new.select("_source_file").distinct().collect()
    ]
    files_to_track = len(new_file_paths)

    # ----------------Write to bronze Delta Lake ---------------------
    # The data is written BEFORE the tracker is updated: if this write fails, the
    # files stay unrecorded and are picked up again on the next run instead of
    # being marked as processed without ever reaching bronze.
    bronze_path_str = str(bronze_path).replace("\\", "/")
    df_bronze.write.format("delta") \
             .mode("append") \
             .save(bronze_path_str)

    # ----------------Record the ingested files ---------------------
    new_files_df = (
        df_new.select("_source_file")
              .distinct()
              .withColumnRenamed("_source_file", "file_path")
              .withColumn("processed_at", current_timestamp())
    )
    new_files_df.write.format("delta") \
                .mode("append") \
                .save(PROCESSED_FILES_PATH)

    # Keep the in-memory set in step with the tracker table, so calling this
    # function again in the same kernel session does not ingest the files twice.
    processed_files.update(new_file_paths)

    print(f"\n{'='*60}")
    print(f" ‚úî Bronze ingestion completed for {taxi_type.upper()}")
    print(f"\n{'='*60}")
    print(f"    Total records written : {new_count:,}")
    print(f"    Files tracked : {files_to_track}")
    print(f"    BATCH ID : {BATCH_ID}")
    

In [14]:
#--------Convenience Wrapper--------------

def ingest_green_to_bronze():
    """Run the generic bronze ingestion for the "green" taxi type."""
    ingest_to_bronze(taxi_type="green")


def ingest_yellow_to_bronze():
    """Run the generic bronze ingestion for the "yellow" taxi type."""
    ingest_to_bronze(taxi_type="yellow")




In [15]:
# ----------- Run the bronze ingestion for every taxi type -------------
print("\n" + "#" * 60)
print("Starting NYC Taxi Bronze Layer Ingestion")
print(f"Batch: {BATCH_ID}")
print("#" * 60)

# (label shown in the log, zero-argument callable that performs the ingestion)
ingestion_tasks = [
    ("GREEN", ingest_green_to_bronze),
    ("YELLOW", ingest_yellow_to_bronze),
]

# label -> "SUCCESS" | "FAILED", filled in as each task finishes
results = {}

for label, run_ingestion in ingestion_tasks:
    try:
        run_ingestion()
        results[label] = "SUCCESS"
        print(f"\n ‚úî SUCCESS: {label} taxi data ingestion completed!")
    except Exception as err:
        # One failing taxi type must not stop the remaining ones: record the
        # failure, show the traceback and carry on with the next task.
        results[label] = "FAILED"
        print(f"\n ‚ùå ERROR: {label} taxi ingestion failed")
        print(f"   {str(err)}")
        import traceback
        traceback.print_exc()


# -------- Final summary ----------
print("\n" + "#" * 60)
print("Process complete - Summary")
print("\n" + "#" * 60)

for label, outcome in results.items():
    if outcome == "SUCCESS":
        marker = "‚úî"
    else:
        marker = "‚ùå"
    print(f" {marker} {label}: {outcome}")
print("#" * 60)


############################################################
Starting NYC Taxi Bronze Layer Ingestion
Batch: batch_20260220_142523
############################################################

Processing GREEN taxi data

➡ Reading green data for year 2025...
➡ Found 1 valid parquet file(s)
    - month=09/green_tripdata_2025-09.parquet: 1.15 MB
✔ Loaded 48,893 total rows from 2025
 ➡ Checking for already-processed files...
 ✔ No new records to ingest for green - already up to date

 ✔ SUCCESS: GREEN taxi data ingestion completed!

Processing YELLOW taxi data

➡ Reading yellow data for year 2025...
➡ Found 1 valid parquet file(s)
    - month=09/yellow_tripdata_2025-09.parquet: 69.08 MB
✔ Loaded 4,251,015 total rows from 2025
 ➡ Checking for already-processed files...
 ✔ No new records to ingest for yellow - already up to date

 ✔ SUCCESS: YELLOW taxi data ingestion completed!

############################################################
Process complete - Summary