In [0]:
# pyspark
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

# stdlib
from datetime import datetime, timezone
import uuid

# raid (project-specific)
from projects.raid.schema.champindex.schema import (
    DATA_SCHEMA,
    BRONZE_COLS,
    SCHEMA_VERSION,
)

# shared logging / ops
from shared.logging.ingest_logging import write_log_best_effort
from shared.logging.ingest_log_constants import IngestStatus
from shared.logging.ingest_log_context import LogContext
from shared.logging.ingest_log_builders import (
    run_event,
    file_success_events,
)

# -----------------------
# Config
# -----------------------
# Target table and pipeline identity (also fed into LOG_CTX).
TARGET_TABLE_FQN = "raid.bronze_champindex"
PIPELINE_NAME = "01_ingest_bronze"
LAYER = "bronze"

# Per-run identifiers: a random UUID plus the UTC wall-clock time this notebook run began.
RUN_ID = f"{uuid.uuid4()}"
PIPELINE_RUN_TS = datetime.now(tz=timezone.utc)

# Landing volume; files are expected under AccountName=<value>/ sub-directories.
LANDING_BASE = "/Volumes/workspace/raid/champindex"
SOURCE_GLOB = f"{LANDING_BASE}/AccountName=*/champindex_*.csv"

# Auto Loader bookkeeping: schema-tracking directory and streaming checkpoint.
SCHEMA_LOCATION = "/Volumes/workspace/raid/_system/autoloader_schemas/champindex_bronze"
CHECKPOINT_LOCATION = "/Volumes/workspace/raid/_system/checkpoints/01_ingest_bronze/champindex"

# CSV reader options: header row, ';'-separated, quotes escaped by doubling ("" inside
# a quoted field), and PERMISSIVE so malformed rows do not abort the read.
CSV_OPTS = dict(
    header="true",
    sep=";",
    quote='"',
    escape='"',
    mode="PERMISSIVE",
)

# Logging context handed to every ingest-log builder in this notebook.
# base_context holds extra run-level key/values (presumably copied onto each
# log row by the shared logging helpers — confirm in LogContext).
LOG_CTX = LogContext(
    pipeline_name=PIPELINE_NAME,
    layer=LAYER,
    target_table_fqn=TARGET_TABLE_FQN,
    run_id=RUN_ID,
    schema_location=SCHEMA_LOCATION,
    checkpoint_location=CHECKPOINT_LOCATION,
    base_context=dict(
        trigger="availableNow",
        source_glob=SOURCE_GLOB,
    ),
)

# -----------------------
# Helpers (path parsing)
# -----------------------
# Column expressions built from Auto Loader's per-file metadata. Nothing is
# evaluated here; they are attached to the stream when bronze_df is shaped.
source_file_col = F.col("_metadata.file_path")

# Final path segment, i.e. the bare file name.
file_name_col = F.substring_index(source_file_col, "/", -1)

# Value of the Hive-style "AccountName=<value>" directory ("" if the path has none).
AccountName_col = F.regexp_extract(source_file_col, r"/AccountName=([^/]+)/", 1)

# File names look like champindex_<token>_<ddMMyyyy>_<n>.csv; capture the 8-digit date.
ddmmyyyy_col = F.regexp_extract(file_name_col, r"^champindex_[^_]+_(\d{8})_\d+\.csv$", 1)
# try_to_date yields NULL for empty / unparseable values instead of raising.
snapshot_date_col = F.try_to_date(ddmmyyyy_col, "ddMMyyyy")

# -----------------------
# Auto Loader read
# -----------------------
# Load only files matching SOURCE_GLOB (AccountName=*/champindex_*.csv), the same
# pattern recorded as "source_glob" in LOG_CTX. Loading LANDING_BASE instead would
# pick up every file in the landing volume, and each stray file would be parsed
# with DATA_SCHEMA and appended to bronze.
# NOTE(review): this narrows the input path of an existing Auto Loader checkpoint.
# No cloudFiles.useNotifications option is set, so the stream should be in the
# default directory-listing mode, where Databricks allows changing the input path
# (DBR 11.3+). Confirm before deploying against the current checkpoint.
raw_stream = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", SCHEMA_LOCATION)
        .option("cloudFiles.rescuedDataColumn", "_rescued_data")
        .option("ignoreMissingFiles", "true")
        .options(**CSV_OPTS)
        .schema(DATA_SCHEMA)
        .load(SOURCE_GLOB)
)

# Shape bronze once (this DF is what the foreachBatch receives).
# Adds lineage/audit columns derived from the file path and run config, then
# select(*BRONZE_COLS) fixes the final column set and order.
bronze_df = (
    raw_stream
        .withColumn("source_file", source_file_col)
        .withColumn("AccountName", AccountName_col)
        .withColumn("snapshot_ts", F.current_timestamp())
        .withColumn("snapshot_date", snapshot_date_col)
        .withColumn("schema_version", F.lit(SCHEMA_VERSION))
        .withColumn("run_id", F.lit(RUN_ID))   # IMPORTANT: before select(*BRONZE_COLS)
        .select(*BRONZE_COLS)
)

# =====================================================================
# END OF INGESTION / TRANSFORMATION LOGIC
# ---------------------------------------------------------------------
# Below this point:
#   - No schema or parsing changes should be made
#   - DataFrames are considered FINAL for writing
#   - Only execution concerns are handled:
#       * writing to bronze
#       * ingestion logging
#       * error handling
# =====================================================================

def _utc_now_col():
    """Return the current UTC wall-clock time as a literal timestamp Column.

    The time is captured on the driver when this helper is called.
    ``F.current_timestamp()`` cannot be used for this: it is resolved when the
    query that contains it executes. A started/finished Column built from it
    would therefore report the moment the log row is written, and every
    timestamp inside one log event would be identical.
    """
    return F.lit(datetime.now(timezone.utc))


def process_batch(batch_df: DataFrame, batch_id: int) -> None:
    """Append one micro-batch to the bronze table and emit ingest-log events.

    Used as the ``foreachBatch`` sink of the bronze stream.

    Event sequence (each written through ``write_log_best_effort``):
      * empty batch              -> EMPTY, then return (nothing written to bronze)
      * non-empty batch          -> STARTED, then either
          - append raised        -> FAILED, and the exception is re-raised
          - append committed     -> per-file success events, then SUCCESS

    Args:
        batch_df: The micro-batch, already shaped to ``BRONZE_COLS``.
        batch_id: Spark's micro-batch id for this streaming query.

    Raises:
        Exception: whatever the bronze ``saveAsTable`` append raised. It is re-raised
            after the FAILED event so that the streaming query fails.
    """
    batch_no = int(batch_id)
    run_ts = F.lit(PIPELINE_RUN_TS)
    started_ts = _utc_now_col()

    # EMPTY -> log EMPTY and stop. limit(1) keeps the emptiness probe cheap.
    if batch_df.limit(1).count() == 0:
        write_log_best_effort(
            run_event(
                spark, LOG_CTX,
                pipeline_run_ts_col=run_ts,
                batch_id=batch_no,
                status=IngestStatus.EMPTY,
                started_ts_col=started_ts,
                finished_ts_col=_utc_now_col(),
                message="No new data files in this batch",
            )
        )
        return

    # RUN STARTED
    write_log_best_effort(
        run_event(
            spark, LOG_CTX,
            pipeline_run_ts_col=run_ts,
            batch_id=batch_no,
            status=IngestStatus.STARTED,
            started_ts_col=started_ts,
        )
    )

    # Only the bronze append is guarded here, so a FAILED event always means
    # the batch was NOT written.
    try:
        (
            batch_df.write
              .format("delta")
              .mode("append")
              .saveAsTable(TARGET_TABLE_FQN)
        )
    except Exception as e:
        write_log_best_effort(
            run_event(
                spark, LOG_CTX,
                pipeline_run_ts_col=run_ts,
                batch_id=batch_no,
                status=IngestStatus.FAILED,
                started_ts_col=started_ts,
                finished_ts_col=_utc_now_col(),
                message=str(e)[:4000],  # cap message length at 4000 chars
                error_class=e.__class__.__name__,
            )
        )
        raise  # fail the streaming query

    finished_ts = _utc_now_col()

    # The append has committed; what follows is logging only. Raising from here
    # would fail the stream after the data is written. The same batch would then be
    # replayed on restart and appended again, which is why a failure is reported but
    # not propagated.
    # NOTE: batch_df is not persisted, so building the per-file events may
    # recompute the micro-batch from source.
    try:
        # FILE SUCCESS rows
        write_log_best_effort(
            file_success_events(
                batch_df, LOG_CTX,
                pipeline_run_ts_col=run_ts,
                batch_id=batch_no,
                started_ts_col=started_ts,
                finished_ts_col=finished_ts,
                rescued_col="_rescued_data",
            )
        )

        # RUN SUCCESS
        write_log_best_effort(
            run_event(
                spark, LOG_CTX,
                pipeline_run_ts_col=run_ts,
                batch_id=batch_no,
                status=IngestStatus.SUCCESS,
                started_ts_col=started_ts,
                finished_ts_col=finished_ts,
                message="Batch completed successfully",
            )
        )
    except Exception as log_err:
        print(
            f"[{PIPELINE_NAME}] batch {batch_no}: bronze append committed "
            f"but post-write logging failed: {log_err!r}"
        )

# Run the bronze stream. Every micro-batch goes to process_batch. The availableNow
# trigger processes the input available at start and then stops, so
# awaitTermination() returns (or raises if the query failed).
query = (
    bronze_df.writeStream
      .trigger(availableNow=True)
      .option("checkpointLocation", CHECKPOINT_LOCATION)
      .foreachBatch(process_batch)
      .start()
)
query.awaitTermination()


In [0]:
%sql
-- Inspect the ingest log. Unfiltered: returns every row of ops.ingest_log
-- (presumably the table write_log_best_effort appends to — confirm).
SELECT
  *
FROM
  ops.ingest_log