# In [0]:
# ==============================================================================
# DATABRICKS NOTEBOOK: 01_Ingest_Static_Bronze
# ==============================================================================
# DESCRIPTION:
#   Generic ingestion utility to load raw CSV data from S3 into the Bronze Layer
#   of the Delta Lakehouse. This script implements the "External Table" pattern,
#   decoupling storage (S3) from metadata (Unity Catalog).
#
# USAGE:
#   Designed to be triggered via Databricks Workflows or Airflow.
#   Requires the following parameters to be passed at runtime.
#
# PARAMETERS:
#   - s3_source_path: Full S3 URI of the source CSV file (e.g., s3://bucket/raw/file.csv)
#   - s3_target_path: Target S3 URI where Delta files will be stored.
#   - table_name:     The logical name of the table in the Catalog.
#   - catalog_name:   Target Unity Catalog name (Default: movielens)
#   - schema_name:    Target Schema name (Default: bronze)
# ==============================================================================

from pyspark.sql.utils import AnalysisException

# -------------------------------------------------------------------------
# 1. CONFIGURATION & WIDGET DEFINITION
# -------------------------------------------------------------------------
# Define input widgets to accept parameters from the Orchestrator (Airflow/Jobs).
# Arguments are (widget name, default value, label). The paths and table name
# default to "" so that a parameter the orchestrator forgot to pass is caught
# by the guardrails in section 2 instead of silently using a stale value.
dbutils.widgets.text("s3_source_path", "", "Source S3 URI (CSV)")
dbutils.widgets.text("s3_target_path", "", "Target S3 URI (Delta Location)")
dbutils.widgets.text("table_name", "", "Target Table Name")
dbutils.widgets.text("catalog_name", "movielens", "Catalog")
dbutils.widgets.text("schema_name", "bronze", "Schema")

# -------------------------------------------------------------------------
# 2. PARAMETER RETRIEVAL & VALIDATION
# -------------------------------------------------------------------------
# Fetch values from the runtime context
s3_source_path = dbutils.widgets.get("s3_source_path")
s3_target_path = dbutils.widgets.get("s3_target_path")
table_name = dbutils.widgets.get("table_name")
catalog_name = dbutils.widgets.get("catalog_name")
schema_name = dbutils.widgets.get("schema_name")

# Guardrails: every part of the three-level name must be present.
# Checking here (before any I/O) prevents the job from overwriting the Delta
# files on S3 and only then failing at the CREATE TABLE step because the
# table name was never supplied.
for _param_name, _param_value in (
    ("catalog_name", catalog_name),
    ("schema_name", schema_name),
    ("table_name", table_name),
):
    if not _param_value.strip():
        raise ValueError(
            f"CONFIGURATION ERROR: Parameter '{_param_name}' must not be empty."
        )

# Construct the fully qualified table name (Three-Level Namespace)
full_table_name = f"{catalog_name}.{schema_name}.{table_name}"

# Guardrails: Ensure strict adherence to architectural standards (S3 Only).
# An empty path (the widget default) also fails these checks.
if not s3_source_path.startswith("s3://"):
    raise ValueError(f"CONFIGURATION ERROR: Invalid Source Path '{s3_source_path}'. Must be a valid s3:// URI.")

if not s3_target_path.startswith("s3://"):
    raise ValueError(f"CONFIGURATION ERROR: Invalid Target Path '{s3_target_path}'. Must be a valid s3:// URI.")

print(f"[START] Job initialized for table: {full_table_name}")
print(f"Source: {s3_source_path}")
print(f"Target: {s3_target_path}")

# -------------------------------------------------------------------------
# 3. INGESTION LOGIC (PHYSICAL STORAGE)
# -------------------------------------------------------------------------
def ingest_bronze_layer():
    """
    Read the raw CSV from S3 and persist it to S3 in Delta format.

    Takes no arguments: it uses the notebook's ``spark`` session and the
    module-level ``s3_source_path``, ``s3_target_path`` and ``table_name``
    values resolved from the job parameters. Returns nothing.

    Architecture Decision:
    - Read mode 'FAILFAST': a malformed CSV makes the read/write raise, so the
      pipeline halts instead of ingesting corrupt rows.
    - Write mode 'overwrite': this handles static reference data, so each run
      replaces the dataset at the target path instead of appending to it.

    No schema or ``inferSchema`` option is set, so Spark's default column
    typing for headered CSV applies.

    Raises:
        Exception: any error raised by the read or the write is logged and
            re-raised unchanged so the orchestrator marks the task as failed.
    """
    try:
        print("⏳ Reading raw data from S3...")

        # Read Source: strict CSV parsing
        df = (
            spark.read
                 .format("csv")
                 .option("header", "true")
                 # Fail immediately if a row doesn't match the header structure
                 .option("mode", "FAILFAST")
                 .load(s3_source_path)
        )

        print("Persisting data to Delta Lake (Storage)...")

        # Write to Storage: only the files are written here; registering the
        # location as a catalog table is a separate step outside this function.
        (
            df.write
              .format("delta")
              .mode("overwrite")
              .save(s3_target_path)
        )

        print(f"[SUCCESS] Delta Parquet files written to: {s3_target_path}")

    except Exception:
        print(f"[ERROR] Ingestion failed for {table_name}.")
        # Bare re-raise keeps the original traceback intact and ensures the
        # Airflow/Databricks Job marks the task as FAILED.
        raise

# Execute the ingestion function
ingest_bronze_layer()

# -------------------------------------------------------------------------
# 4. METADATA REGISTRATION (UNITY CATALOG)
# -------------------------------------------------------------------------
# Register the S3 location as a Table in the Metastore.
# This separates the "Compute" (Spark) from the "Definition" (SQL).
print("Registering table in Unity Catalog...")

# The identifier parts and the path come from job parameters and are spliced
# into SQL text, so they are quoted/escaped rather than interpolated raw.
# Each name part is backtick-delimited (embedded backticks doubled) so that
# names containing special characters stay a single identifier.
quoted_table_name = ".".join(
    "`" + part.replace("`", "``") + "`"
    for part in (catalog_name, schema_name, table_name)
)
# Backslash-escape backslashes and single quotes so the path cannot terminate
# the string literal early.
# NOTE(review): assumes the default parser setting
# (spark.sql.parser.escapedStringLiterals = false) - confirm for this workspace.
location_literal = s3_target_path.replace("\\", "\\\\").replace("'", "\\'")

try:
    # IF NOT EXISTS makes re-runs safe: when the table is already registered
    # the statement is a no-op and the success message below is still printed.
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {quoted_table_name}
        USING DELTA
        LOCATION '{location_literal}'
    """)
    print(f"[SUCCESS] External table registered: {full_table_name}")

except AnalysisException:
    print("[ERROR] Metastore registration failed.")
    # Bare re-raise preserves the original traceback for the job run output.
    raise