In [0]:
%run ../config/pipeline_config

In [0]:
# Tasks:
#   - Read the raw CSV from a Unity Catalog Volume
#   - Add audit columns (ingestion_timestamp, source_file, etc.)
#   - Enforce a schema (every column as String in Bronze — a safe landing zone)
#   - Save as a Delta table in Unity Catalog
#   - Partition by ingestion_date for query performance


from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_timestamp, current_date,
    lit, count, when
)
from pyspark.sql.types import *

# Attach to the notebook's existing Spark session, or create one if none exists.
spark = (
    SparkSession.builder
    .getOrCreate()
)

# Read from the Unity Catalog Volume.
# Schema: every column is a String — the safest approach for the Bronze layer.
# The Silver layer casts to proper types after cleaning.

print("\nStep 1: Reading raw CSV from Volume...")

# Every column lands as a nullable String; typing is deferred to Silver.
raw_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "search_id",
        "user_id",
        "timestamp",
        "city",
        "state",
        "city_tier",
        "pickup_lat",
        "pickup_lng",
        "ride_type",
        "status",
        "error_type",
        "device",
        "app_version",
        "session_duration_sec",
        "is_repeat_search",
    )
])

# NOTE(review): with an explicit schema and header=True, Spark's default
# enforceSchema=true applies the schema by column position and ignores the
# header names. A reordered source file would therefore be mislabelled
# silently. Consider enforceSchema=False if upstream column order is not
# guaranteed.
df_raw = spark.read.csv(
    RAW_CSV_PATH,
    schema=raw_schema,
    header=True,
    multiLine=False,
)

# Counting forces a full scan of the source; the figure is reused when
# verifying the Bronze table later on.
raw_count = df_raw.count()
print(f"  Records read from CSV : {raw_count:,}")

# -----------------------------------------------------------------------------
# 5. ADD AUDIT COLUMNS
#    These track WHERE data came from and WHEN it was loaded

print("\nStep 2: Adding audit columns...")

# Audit columns record when the batch was loaded, by which pipeline version,
# and from which source. current_timestamp()/current_date() are evaluated once
# per query, so every row of a single load carries identical values.
# NOTE(review): source_file stores RAW_CSV_PATH verbatim. If that path is a
# directory or glob, per-row provenance would need `_metadata.file_path`.
df_bronze = (
    df_raw
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("ingestion_date",      current_date())
    .withColumn("pipeline_version",    lit(PIPELINE_VERSION))
    .withColumn("source_file",         lit(RAW_CSV_PATH))
    .withColumn("pipeline_layer",      lit("BRONZE"))
)

print("  ✅ ingestion_timestamp → when this batch was loaded")
print("  ✅ ingestion_date      → date partition key")
# Report the configured version instead of a hard-coded literal, so the log
# always matches the value actually written to the table.
print(f"  ✅ pipeline_version    → {PIPELINE_VERSION}")
print("  ✅ source_file         → source CSV path")
print("  ✅ pipeline_layer      → BRONZE")

# -----------------------------------------------------------------------------
# 6. PREVIEW SCHEMA
# -----------------------------------------------------------------------------

print("\nStep 3: Bronze table schema...")
df_bronze.printSchema()

# -----------------------------------------------------------------------------
# 7. SETUP CATALOG AND SCHEMA
# -----------------------------------------------------------------------------

print("\nStep 4: Setting up catalog and schema...")
# Switch the session's current catalog, then ensure the target schema exists
# inside it (IF NOT EXISTS makes this a no-op when it is already there).
spark.sql("USE CATALOG {}".format(CATALOG_NAME))
spark.sql("CREATE SCHEMA IF NOT EXISTS {}".format(SCHEMA_NAME))
print("  ✅ Using: {}.{}".format(CATALOG_NAME, SCHEMA_NAME))

# -----------------------------------------------------------------------------
# 8. WRITE TO DELTA TABLE
#    format("delta")           → Delta format (ACID, time travel, versioning)
#    partitionBy("ingestion_date") → splits by date for fast queries
#    saveAsTable                → registers in Unity Catalog (SQL queryable)
# -----------------------------------------------------------------------------

print(f"\nStep 5: Writing to Delta table → {BRONZE_TABLE_FQN}...")

# saveAsTable registers the result in Unity Catalog. The keyword arguments are
# the same settings the chained writer calls would apply
# (format / mode / partitionBy / option).
# NOTE(review): Delta honours overwriteSchema only for overwrite writes —
# confirm the intended behaviour when WRITE_MODE is "append".
df_bronze.write.saveAsTable(
    BRONZE_TABLE_FQN,
    format="delta",
    mode=WRITE_MODE,
    partitionBy="ingestion_date",
    overwriteSchema="true",
)

print("  ✅ Delta table written successfully!")

# -----------------------------------------------------------------------------
# 9. VERIFY — READ BACK FROM DELTA TABLE
# -----------------------------------------------------------------------------

print("\nStep 6: Verifying Delta table...")

df_verify    = spark.table(BRONZE_TABLE_FQN)
bronze_count = df_verify.count()

# The table may also hold rows from earlier runs when WRITE_MODE appends, so
# compare only this run's batch against the CSV. current_timestamp() is
# constant within the write query, so this batch's rows share the newest
# ingestion_timestamp. In overwrite mode, batch_count equals bronze_count.
# For an empty table latest_ts is None, the filter matches nothing, and
# batch_count is 0.
latest_ts   = df_verify.agg({"ingestion_timestamp": "max"}).first()[0]
batch_count = df_verify.filter(col("ingestion_timestamp") == latest_ts).count()

print(f"  Records in Bronze table : {bronze_count:,}")
print(f"  Records in this batch   : {batch_count:,}")
print(f"  Records from CSV        : {raw_count:,}")
print(f"  Match                   : {'✅ YES' if batch_count == raw_count else '❌ NO - investigate!'}")

# -----------------------------------------------------------------------------
# 10. QUICK STATS
# -----------------------------------------------------------------------------

# Show a handful of rows alongside their audit columns.
print("\n--- BRONZE TABLE SAMPLE (5 rows) ---")
(
    df_verify
    .select("search_id", "city", "status", "error_type",
            "ingestion_timestamp", "pipeline_version")
    .show(n=5, truncate=False)
)

# One output row per ingestion_date value, with its record count.
print("--- PARTITION CHECK (records per ingestion date) ---")
df_verify.groupBy("ingestion_date").count().orderBy("ingestion_date").show()

# when() without otherwise() yields null for non-matching rows and count()
# skips nulls, so each column reports how many of its values are null.
print("--- NULL CHECK (raw nulls expected at Bronze layer) ---")
df_verify.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in ("search_id", "user_id", "city", "state",
                 "pickup_lat", "pickup_lng", "timestamp")
]).show()

# -----------------------------------------------------------------------------
# 11. DELTA TABLE HISTORY (time travel!)
# -----------------------------------------------------------------------------

# Delta returns commits newest-first, so show(5) lists the five most recent.
print("--- DELTA TABLE HISTORY ---")
(
    spark.sql(f"DESCRIBE HISTORY {BRONZE_TABLE_FQN}")
    .select("version", "timestamp", "operation", "operationMetrics")
    .show(5, truncate=False)
)

# Final banner; joining with newlines emits exactly one line per entry.
print("\n".join([
    "\n" + "=" * 60,
    "  Phase 2 Complete! Bronze Layer ready.",
    f"  Table : {BRONZE_TABLE_FQN}",
    f"  Rows  : {bronze_count:,}",
    "  Next  : Run 03_silver_layer.py",
    "=" * 60,
]))