In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re
import os

In [0]:
# Pipeline configuration: catalog/schema names and the volume folders derived from them.
catalog = "nyc_taxi"
schema = "nyc_schema"

# Both folders live under the same /Volumes/<catalog>/<schema>/ root.
raw_folder = "/Volumes/{}/{}/raw_zone/".format(catalog, schema)
bronze_path = "/Volumes/{}/{}/bronze_zone".format(catalog, schema)

In [0]:
# Expected schema of a raw trip file, declared as (column name, Spark type) pairs.
# Every column is nullable; the order below is the order of the resulting StructType.
_expected_fields = [
    ("VendorID", IntegerType()),
    ("lpep_pickup_datetime", TimestampType()),
    ("lpep_dropoff_datetime", TimestampType()),
    ("store_and_fwd_flag", StringType()),
    ("RatecodeID", IntegerType()),
    ("PULocationID", IntegerType()),
    ("DOLocationID", IntegerType()),
    ("passenger_count", IntegerType()),
    ("trip_distance", DoubleType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("ehail_fee", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
    ("payment_type", IntegerType()),
    ("trip_type", IntegerType()),
    ("congestion_surcharge", DoubleType()),
]

expected_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _expected_fields]
)


In [0]:
# Entries in the raw folder whose name ends with ".parquet"; everything else is ignored.
files = list(
    filter(
        lambda entry: entry.name.endswith(".parquet"),
        dbutils.fs.ls(raw_folder),
    )
)

In [0]:
# Ingest every raw parquet file into the bronze Delta table, one partition per file.
#
# For each file the loop:
#   1. derives `file_date` from the file name,
#   2. reports column-name drift against `expected_schema` (names only, not types),
#   3. skips the file if any essential column is missing,
#   4. logs per-column null counts and the number of fully duplicated rows,
#   5. adds traceability columns and writes the rows to the bronze table.

# Loop-invariant lookups, built once instead of once per file.
expected_cols = [field.name for field in expected_schema]
essential_columns = [
    'VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime',
    'RatecodeID', 'PULocationID', 'DOLocationID',
    'passenger_count', 'trip_distance', 'fare_amount', 'payment_type'
]
# Derived from the config cell so the table always lives in the same catalog/schema
# as the volumes; with the current config this is "nyc_taxi.nyc_schema.bronze".
bronze_table = f"{catalog}.{schema}.bronze"

for file in files:
    file_path = file.path
    file_name = os.path.basename(file_path)
    # Whatever remains after stripping the fixed prefix and extension is used as the
    # partition value (presumably "YYYY-MM" for green_tripdata files; not validated here).
    file_date = file_name.replace("green_tripdata_", "").replace(".parquet", "")
    print(f"\n--- Processing file: {file_name} ---")
    df_raw = spark.read.parquet(file_path)

    # Schema validation: informational only, compares column names in both directions.
    actual_cols = df_raw.columns
    missing_cols = set(expected_cols) - set(actual_cols)
    unexpected_cols = set(actual_cols) - set(expected_cols)

    print("Missing Columns:", missing_cols)
    print("Unexpected Columns:", unexpected_cols)

    # Essential columns check: a file lacking any of these is skipped, not written.
    missing_essentials = [name for name in essential_columns if name not in actual_cols]
    if missing_essentials:
        print("Missing essential columns:", missing_essentials)
        continue
    print("All essential columns present.")

    # Null count logging (optional in production).
    # `sum`, `when` and `col` are the Spark SQL functions from the star import,
    # so this is a single aggregation producing one row with a count per column.
    null_counts = df_raw.select([
        sum(when(col(c).isNull(), 1).otherwise(0)).alias(f"{c}_null_count")
        for c in actual_cols
    ])
    print(f"\n Null Count Summary for {file_name}:")
    null_counts.show(truncate=False)

    # File traceability columns: partition key, originating file path, load date.
    df_bronze = df_raw.withColumn("file_date", lit(file_date)) \
                      .withColumn("source_file", col("_metadata.file_path")) \
                      .withColumn("ingestion_date", current_date())

    # Duplicate logging: rows identical across every column, traceability columns included.
    total_count = df_bronze.count()
    distinct_count = df_bronze.dropDuplicates().count()
    print(f"Total Rows: {total_count}, Distinct Rows: {distinct_count}, Duplicate Rows: {total_count - distinct_count}")

    # Write to bronze zone (partitioned by file_date).
    # A plain overwrite here would replace the entire table on every iteration, leaving
    # only the last file's rows. `replaceWhere` limits the overwrite to this file's
    # partition, so files accumulate and re-running a file replaces only its own data.
    df_bronze.write.format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", f"file_date = '{file_date}'") \
        .partitionBy("file_date") \
        .saveAsTable(bronze_table)

    print(f"Successfully written {file_name} to Bronze layer.\n")