In [None]:
# 1. Read each *staged* local file (from `pending_ingest_queue`) and detect the institution id column.
# 2. Extract the unique institution IDs and emit per-institution work items.

# Constraints:
# - NO SFTP connection
# - NO API calls
# - NO volume writes

# Input table:
# - `staging_sst_01.default.pending_ingest_queue`

# Output table:
# - `staging_sst_01.default.institution_ingest_plan`
# - Columns: `file_fingerprint`, `file_name`, `local_path`, `institution_id`, `inst_col`, `file_size`, `file_modified_time`, `planned_at`


In [0]:
%pip install pandas python-box pyyaml paramiko
%restart_python

In [0]:
import os
import re
import yaml
from box import Box
from datetime import datetime, timezone

from pyspark.sql import functions as F
from pyspark.sql import types as T
from databricks.connect import DatabricksSession

from helper import CustomLogger, ensure_plan_table, extract_institution_ids

# Probe for a pre-existing `dbutils` name by referencing it bare. If the name is
# undefined (NameError), bind a MagicMock in its place so that later attribute
# access such as `dbutils.notebook.exit(...)` does not raise.
# NOTE(review): presumably `dbutils` is supplied by the Databricks notebook
# runtime and is absent when running elsewhere — confirm.
# NOTE(review): on the mocked path `dbutils.notebook.exit(...)` simply returns,
# so the early-exit guards in later cells fall through and execution continues —
# confirm that is acceptable for the non-notebook use case.
try:
    dbutils  # noqa: F821
except NameError:
    from unittest.mock import MagicMock

    dbutils = MagicMock()
# Obtain (or reuse) the session through the DatabricksSession builder.
spark = DatabricksSession.builder.getOrCreate()


In [0]:
# One logger instance, shared by every cell that follows.
logger = CustomLogger()

# Config (kept consistent with prior notebooks): parsed with the safe YAML
# loader and wrapped in a Box. Nothing in this notebook reads `_cfg` afterwards.
with open("gcp_config.yaml", "rb") as f:
    _cfg = Box(yaml.safe_load(f))

# Table locations are spelled catalog.schema.table.
CATALOG, DEFAULT_SCHEMA = "staging_sst_01", "default"

QUEUE_TABLE = ".".join((CATALOG, DEFAULT_SCHEMA, "pending_ingest_queue"))
PLAN_TABLE = ".".join((CATALOG, DEFAULT_SCHEMA, "institution_ingest_plan"))

logger.info("Loaded config and initialized logger.")


In [0]:
# moved to helper.py: ensure_plan_table


In [0]:
# moved to helper.py: normalize_col


In [0]:
# Hard-coded column renames carried over unchanged from the current script.
# Keys are the run-together spellings; values are the snake_case replacements.
# They are split by theme only for readability and merged back in the same order.
_GATEWAY_RENAMES = {
    "attemptedgatewaymathyear1": "attempted_gateway_math_year_1",
    "attemptedgatewayenglishyear1": "attempted_gateway_english_year_1",
    "completedgatewaymathyear1": "completed_gateway_math_year_1",
    "completedgatewayenglishyear1": "completed_gateway_english_year_1",
    "gatewaymathgradey1": "gateway_math_grade_y_1",
    "gatewayenglishgradey1": "gateway_english_grade_y_1",
}
_DEV_RENAMES = {
    "attempteddevmathy1": "attempted_dev_math_y_1",
    "attempteddevenglishy1": "attempted_dev_english_y_1",
    "completeddevmathy1": "completed_dev_math_y_1",
    "completeddevenglishy1": "completed_dev_english_y_1",
}
RENAMES = {**_GATEWAY_RENAMES, **_DEV_RENAMES}

# Two look-aheads: the text must contain "institution" and "id" somewhere,
# in either order, case-insensitively.
INST_COL_PATTERN = re.compile(r"(?=.*institution)(?=.*id)", flags=re.IGNORECASE)

# moved to helper.py: detect_institution_column


In [0]:
# moved to helper.py: extract_institution_ids


In [0]:
# Helper call on the plan table before anything reads it
# (presumably creates it when absent — see helper.py).
ensure_plan_table(spark, PLAN_TABLE)

# The queue table is Script 1's output; if it is not there, there is nothing to plan.
queue_table_found = spark.catalog.tableExists(QUEUE_TABLE)
if not queue_table_found:
    logger.info(f"Queue table {QUEUE_TABLE} not found. Exiting (no-op).")
    dbutils.notebook.exit("NO_QUEUE_TABLE")

queue_df = spark.read.table(QUEUE_TABLE)

# Emptiness probe: count at most one row instead of the whole table.
queue_is_empty = queue_df.limit(1).count() == 0
if queue_is_empty:
    logger.info("pending_ingest_queue is empty. Exiting (no-op).")
    dbutils.notebook.exit("NO_QUEUED_FILES")


In [0]:
# Skip files whose fingerprint already appears in the plan table, so plans are
# not regenerated for files that were expanded on an earlier run.
if spark.catalog.tableExists(PLAN_TABLE):
    existing_fp = spark.table(PLAN_TABLE).select("file_fingerprint").distinct()
    # left_anti keeps only the queue rows with no matching fingerprint in the plan.
    queue_df = queue_df.join(existing_fp, on="file_fingerprint", how="left_anti")
else:
    existing_fp = None

nothing_left_to_expand = queue_df.limit(1).count() == 0
if nothing_left_to_expand:
    logger.info(
        "All queued files have already been expanded into institution work items. Exiting (no-op)."
    )
    dbutils.notebook.exit("NO_NEW_EXPANSION_WORK")


In [0]:
# Bring the remaining queue rows to the driver. The staged path column
# `local_tmp_path` is exposed under the name `local_path` from here on.
queue_columns = [
    "file_fingerprint",
    "file_name",
    F.col("local_tmp_path").alias("local_path"),
    "file_size",
    "file_modified_time",
]
queued_files = queue_df.select(*queue_columns).collect()

logger.info(
    f"Expanding {len(queued_files)} staged file(s) into per-institution work items..."
)

# work_items: one dict per (file, institution id) pair.
# missing_files: (fingerprint, name, path) for staged files not found on disk.
work_items, missing_files = [], []

for queued_row in queued_files:
    fp, file_name, local_path = (
        queued_row[key] for key in ("file_fingerprint", "file_name", "local_path")
    )

    # A blank path, or one that no longer exists, is only recorded here;
    # the decision to fail is taken after the loop.
    if not (local_path and os.path.exists(local_path)):
        missing_files.append((fp, file_name, local_path))
        continue

    try:
        inst_col, inst_ids = extract_institution_ids(
            local_path, renames=RENAMES, inst_col_pattern=INST_COL_PATTERN
        )

        if inst_col is None:
            logger.warning(
                f"No institution id column found for file={file_name} fp={fp}. Skipping this file."
            )
        elif not inst_ids:
            logger.warning(
                f"Institution column found but no IDs present for file={file_name} fp={fp}. Skipping."
            )
        else:
            # One UTC timestamp per file, shared by all of that file's work items.
            planned_at = datetime.now(timezone.utc)
            work_items.extend(
                {
                    "file_fingerprint": fp,
                    "file_name": file_name,
                    "local_path": local_path,
                    "institution_id": inst_id,
                    "inst_col": inst_col,
                    "file_size": queued_row["file_size"],
                    "file_modified_time": queued_row["file_modified_time"],
                    "planned_at": planned_at,
                }
                for inst_id in inst_ids
            )

            logger.info(
                f"file={file_name} fp={fp}: found {len(inst_ids)} institution id(s) using column '{inst_col}'"
            )

    except Exception as e:
        logger.exception(f"Failed expanding file={file_name} fp={fp}: {e}")
        # No manifest is written in this notebook; re-raise so the workflow surfaces the failure.
        raise


In [0]:
# Final step: validate what the expansion loop collected, then upsert the work
# items into the plan table and report the row count as the notebook exit value.

if missing_files:
    # This usually indicates the cluster changed or /tmp was cleared.
    # Fail fast so the workflow stops (downstream cannot proceed without the staged files).
    msg = (
        "Some staged files are missing on disk (likely /tmp cleared or different cluster). "
        + "; ".join([f"fp={fp} file={fn} path={lp}" for fp, fn, lp in missing_files])
    )
    logger.error(msg)
    raise FileNotFoundError(msg)

if not work_items:
    logger.info("No work items generated from staged files. Exiting (no-op).")
    dbutils.notebook.exit("NO_WORK_ITEMS")

# Collapse rows that share the merge key before they reach the MERGE.
# Duplicate source keys make a Delta MERGE either fail (several source rows
# matching one target row) or insert the same key more than once, which would
# break the "unique per (file_fingerprint, institution_id)" guarantee below.
# The last occurrence of each key wins; dict ordering keeps first-seen order.
# NOTE(review): duplicates would arise if a fingerprint is queued more than
# once or the extraction helper repeats an id — neither is ruled out here.
plan_rows_by_key = {
    (item["file_fingerprint"], item["institution_id"]): item for item in work_items
}
plan_rows = list(plan_rows_by_key.values())

duplicate_count = len(work_items) - len(plan_rows)
if duplicate_count:
    logger.warning(
        f"Dropped {duplicate_count} duplicate work item(s) sharing "
        "(file_fingerprint, institution_id) before merge."
    )

# Explicit schema: only file_size and file_modified_time are nullable.
schema = T.StructType(
    [
        T.StructField("file_fingerprint", T.StringType(), False),
        T.StructField("file_name", T.StringType(), False),
        T.StructField("local_path", T.StringType(), False),
        T.StructField("institution_id", T.StringType(), False),
        T.StructField("inst_col", T.StringType(), False),
        T.StructField("file_size", T.LongType(), True),
        T.StructField("file_modified_time", T.TimestampType(), True),
        T.StructField("planned_at", T.TimestampType(), False),
    ]
)

df_plan = spark.createDataFrame(plan_rows, schema=schema)
df_plan.createOrReplaceTempView("incoming_plan_rows")

# Idempotent upsert: unique per (file_fingerprint, institution_id)
spark.sql(
    f"""
    MERGE INTO {PLAN_TABLE} AS t
    USING incoming_plan_rows AS s
    ON  t.file_fingerprint = s.file_fingerprint
    AND t.institution_id   = s.institution_id
    WHEN MATCHED THEN UPDATE SET
      t.file_name          = s.file_name,
      t.local_path         = s.local_path,
      t.inst_col           = s.inst_col,
      t.file_size          = s.file_size,
      t.file_modified_time = s.file_modified_time,
      t.planned_at         = s.planned_at
    WHEN NOT MATCHED THEN INSERT *
    """
)

# The DataFrame was built from this local list, so its length is the row count;
# no additional Spark job is needed to obtain it.
count_out = len(plan_rows)
logger.info(f"Wrote/updated {count_out} institution work item(s) into {PLAN_TABLE}.")
dbutils.notebook.exit(f"WORK_ITEMS={count_out}")
