In [3]:
from pyspark.sql.functions import to_date, col, lit


# Checkpoint row for this notebook in the CDC control table.
control_df = spark.table("cdc_control") \
                  .filter("name = 'nb_etl_silver'")

# Fetch only the column we need; first() returns None when no row matches,
# which lets us fail with a message that names the actual problem instead of
# a bare IndexError (missing row) or a TypeError on the later version
# comparison (NULL checkpoint).
control_row = control_df.select("last_processed_version").first()
if control_row is None or control_row["last_processed_version"] is None:
    raise ValueError(
        "cdc_control has no usable row for 'nb_etl_silver': "
        "the row is missing or last_processed_version is NULL"
    )
last_version = control_row["last_processed_version"]

# Highest version currently recorded in the bronze table's Delta history.
current_version = spark.sql("DESCRIBE HISTORY tbl_bronze_raw")\
                       .selectExpr("max(version) as v")\
                       .first()["v"]

# Incremental load: read the bronze change feed since the last checkpoint,
# split it into orders / items / weather frames, append each to its staging
# table, then advance the checkpoint. On any failure the rows already
# appended for this batch are deleted again and the error is re-raised.
if current_version > last_version:

    # table_changes() bounds are inclusive on both ends. last_version was
    # already processed by the previous run, so start one past it; and pin the
    # upper bound to the version we are about to record in cdc_control so that
    # commits landing while this cell runs are neither half-read now nor
    # re-read next time.
    start_version = int(last_version) + 1
    end_version = int(current_version)

    changes_df = spark.sql(f"""
    select
        TRY_CAST(SUBSTRING(order_id, 7) AS INT) as order_id,
        CAST(TIMESTAMP as DATE) as partition_date,
        timestamp,
        TRY_CAST(SUBSTRING(restaurant_id, 12) AS INT) as restaurant_id,
        TRY_CAST((CASE register_type
            WHEN 'drive'      THEN 1
            WHEN 'kiosk'      THEN 2
            WHEN 'stationary' THEN 3
            ELSE 0
        END) AS INT) AS register_type_id,
        CAST(substring_index(register_id, '_', -1) AS INT) as register_id,
        CAST(substring_index(employee_id, '_', -1) AS INT) AS employee_id,
        TRY_CAST((CASE order_size
            WHEN 'small'      THEN 1
            WHEN 'large'      THEN 2
            ELSE 0
        END) AS INT) AS order_size_id,
        items,
        total_amount,
        processing_time,
        IsWeather
    FROM table_changes('tbl_bronze_raw', {start_version}, {end_version}) tc
    WHERE _change_type = 'insert'
    AND total_amount >= 0
    """)

    # Orders keep every scalar column; the two struct columns go to their
    # own frames below.
    orders_df = changes_df.drop("items", "IsWeather")

    # Flatten the `items` struct: one "<field>_qty" column per struct field,
    # discovered from the schema so new fields need no code change.
    item_struct = changes_df.schema["items"].dataType
    item_names = [f.name for f in item_struct.fields]
    item_cols = [col(f"items.{name}").alias(f"{name}_qty") for name in item_names]
    items_df = changes_df.select(col("order_id"), *item_cols, col("partition_date"))

    # Flatten the `IsWeather` struct the same way, keeping the field names.
    weather_struct = changes_df.schema["IsWeather"].dataType
    weather_names = [f.name for f in weather_struct.fields]
    weather_cols = [col(f"IsWeather.{name}").alias(name) for name in weather_names]
    weather_df = changes_df.select(col("order_id"), *weather_cols, col("partition_date"))

    # The batch is identified by the bronze version it brings the staging
    # tables up to; the rollback below deletes by this value.
    batch_id = end_version

    # add batch_id to all df
    orders_df = orders_df.withColumn("batch_id", lit(batch_id))
    items_df = items_df.withColumn("batch_id", lit(batch_id))
    weather_df = weather_df.withColumn("batch_id", lit(batch_id))

    targets = [
        ("stg_silver_orders", orders_df),
        ("stg_silver_items", items_df),
        ("stg_silver_weather", weather_df),
    ]

    written = []  # staging tables that already received this batch

    try:
        for table_name, batch_df in targets:
            batch_df.write \
                .format("delta") \
                .mode("append") \
                .option("mergeSchema", "false") \
                .saveAsTable(table_name)
            written.append(table_name)

        # Advance the checkpoint only after all three appends succeeded.
        spark.sql(f"""
        UPDATE cdc_control
        SET last_processed_version = {end_version}
        WHERE name = 'nb_etl_silver'
        """)

    except Exception as e:
        print(f"[ERROR] Batch {batch_id} failed, rolling back: {e}")
        # selective rollback only for the tables that were successfully saved
        for t in written:
            try:
                spark.sql(f"DELETE FROM {t} WHERE batch_id = {batch_id}")
            except Exception as rollback_error:
                # Keep going: one failed DELETE must not skip the remaining
                # tables or replace the original error that is re-raised below.
                print(f"[ERROR] Rollback of {t} for batch {batch_id} failed: {rollback_error}")
        raise

else:
    print("No new changes to process.")

StatementMeta(, 7f2df7cd-5ee7-4393-a651-dfca05d9676b, 5, Finished, Available, Finished)