In [0]:
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.functions import explode, col
 
# Ingest NDJSON smart-meter readings into a Bronze Delta table with Auto Loader,
# flatten the nested `location` / `readings` fields, register the table, and
# append one audit row per run (success or failure) to Pipeline_Run_Audit_Delta.

# -------------------------------
# Step 1: Capture Start Time
# -------------------------------
start_time = datetime.now()
# Assume failure until the ingestion body runs to completion, so a run that is
# interrupted by a non-Exception (e.g. KeyboardInterrupt) is not audited as a
# success.
status = "Failed"
error_message = ""
# Initialised up front so the audit step in `finally` can still reference it
# even if reading the widget itself fails.
file_name = None

# Defined before the try so the audit step can use it on every code path.
storage_root = "abfss://sedpcontainer@sedpstorageaccount.dfs.core.windows.net"

# Explicit audit schema: inference cannot type a None FileName, and the
# ErrorMessage column is added to the existing audit table via mergeSchema.
AUDIT_SCHEMA = (
    "SourceType STRING, FileName STRING, Status STRING, "
    "StartTime TIMESTAMP, EndTime TIMESTAMP, ErrorMessage STRING"
)

# -------------------------------
# Step 2: Try Block for Ingestion
# -------------------------------
try:
    # Retrieve Parameters
    file_name = dbutils.widgets.get("p_file_name")
    # Text before the first '.', e.g. "meters.json" -> "meters".
    base_name = file_name.split('.')[0]
    table_name = f"bronze_{base_name}"

    # Define Paths
    # NOTE(review): Auto Loader watches the whole raw/ directory, not only
    # `file_name`; the parameter only selects the table, checkpoint and schema
    # locations. Confirm this is intended (otherwise consider pathGlobFilter).
    input_path = f"{storage_root}/raw"
    checkpoint_path = f"{storage_root}/_checkpoint/{base_name}"
    output_path = f"{storage_root}/Bronze/{base_name}_delta"
    schema_path = f"{storage_root}/schema_location/{base_name}"

    # Read Stream with Auto Loader
    df = (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.schemaLocation", schema_path)
            .option("cloudFiles.schemaHints", "meter_id STRING, location STRUCT<city: STRING, region: STRING>, readings ARRAY<STRUCT<kWh: DOUBLE, timestamp: STRING>>")
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .load(input_path)
    )

    # Flatten Nested JSON: one output row per element of `readings`.
    df_flat = df.select(
        col("meter_id"),
        col("location.city").alias("city"),
        col("location.region").alias("region"),
        explode("readings").alias("reading")
    ).select(
        "meter_id", "city", "region",
        col("reading.timestamp").alias("timestamp"),
        col("reading.kWh").alias("kWh")
    )

    # Write to Bronze Delta Table; availableNow processes the current backlog
    # and then stops, so awaitTermination() returns once it is done.
    query = (
        df_flat.writeStream
            .format("delta")
            .outputMode("append")
            .trigger(availableNow=True)
            .option("mergeSchema", "true")
            .option("checkpointLocation", checkpoint_path)
            .start(output_path)
    )
    query.awaitTermination()

    # Register Delta Table (re-pointed at output_path on every run)
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    spark.sql(f"CREATE TABLE {table_name} USING DELTA LOCATION '{output_path}'")
    spark.sql(f"SELECT * FROM {table_name}").show()

    # Only reached when every step above completed.
    status = "Success"

except Exception as e:
    status = "Failed"
    error_message = str(e)
    print(f"Ingestion failed: {error_message}")
    raise

# -------------------------------
# Step 3: Audit Logging
# -------------------------------
finally:
    end_time = datetime.now()
    audit_log_path = f"{storage_root}/audit_logs/Pipeline_Run_Audit_Delta"

    try:
        audit_row = [Row(
            SourceType="NDJSON",
            FileName=file_name,
            Status=status,
            StartTime=start_time,
            EndTime=end_time,
            ErrorMessage=error_message,
        )]
        audit_df = spark.createDataFrame(audit_row, schema=AUDIT_SCHEMA)

        # Write audit log to Delta (mergeSchema admits the ErrorMessage column)
        (
            audit_df.write
                .format("delta")
                .mode("append")
                .option("mergeSchema", "true")
                .save(audit_log_path)
        )

        # Register audit table (if not already registered)
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS Pipeline_Run_Audit_Delta
            USING DELTA
            LOCATION '{audit_log_path}'
        """)

        # Optional: Preview audit log
        spark.sql("SELECT * FROM Pipeline_Run_Audit_Delta").show()
    except Exception as audit_error:
        print(f"Audit logging failed: {audit_error}")
        # On a failed run, swallowing the audit error lets the original
        # ingestion exception keep propagating instead of being replaced by it.
        if status == "Success":
            raise

+--------+------------+------+--------------------+----+
|meter_id|        city|region|           timestamp| kWh|
+--------+------------+------+--------------------+----+
| MT00001|  Manchester|    UK|2025-11-13T08:00:00Z|2.42|
| MT00001|  Manchester|    UK|2025-11-13T08:15:00Z|1.74|
| MT00001|  Manchester|    UK|2025-11-13T08:30:00Z|2.55|
| MT00001|  Manchester|    UK|2025-11-13T08:45:00Z|1.81|
| MT00002|Buenos Aires| LATAM|2025-11-13T08:00:00Z|2.76|
| MT00002|Buenos Aires| LATAM|2025-11-13T08:15:00Z|2.53|
| MT00002|Buenos Aires| LATAM|2025-11-13T08:30:00Z|1.91|
| MT00002|Buenos Aires| LATAM|2025-11-13T08:45:00Z|1.76|
| MT00003|       Paris|    EU|2025-11-13T08:00:00Z|2.98|
| MT00003|       Paris|    EU|2025-11-13T08:15:00Z|2.81|
| MT00003|       Paris|    EU|2025-11-13T08:30:00Z|1.68|
| MT00003|       Paris|    EU|2025-11-13T08:45:00Z|2.26|
| MT00004|   São Paulo| LATAM|2025-11-13T08:00:00Z|1.64|
| MT00004|   São Paulo| LATAM|2025-11-13T08:15:00Z|2.43|
| MT00004|   S達o Paulo| LATAM|2