In [0]:
from __future__ import annotations
import json
from typing import Any, Mapping
from pyspark.sql import functions as F

from dbx_utils.logging import getLogger

# Module-level logger obtained from the project's logging helper.
logger = getLogger(__name__)

# --------------------------------------------------------------------
# Widgets / Inputs
# --------------------------------------------------------------------
# These widgets follow the same contract as the "3. download api" notebook:
# - endpoint_payload: JSON for a single element of the For Each array
#   (keys: id, endpoint, params, job_settings)
# - run_folder: base folder created in step 1,
#   e.g. "/Volumes/cat/schema/vol/20251210123456"
# Both are declared as text widgets with an empty default value.
for _widget_name in ("endpoint_payload", "run_folder"):
    dbutils.widgets.text(_widget_name, "")

In [0]:
# Read both widget values; each one is mandatory for this notebook.
endpoint_payload_raw = dbutils.widgets.get("endpoint_payload")
run_folder = dbutils.widgets.get("run_folder")

# Fail fast, in declaration order, on the first empty value.
for _required_value, _missing_message in (
    (endpoint_payload_raw, "Widget 'endpoint_payload' is required (JSON string)."),
    (run_folder, "Widget 'run_folder' is required (base volume path for this run)."),
):
    if not _required_value:
        raise ValueError(_missing_message)


# --------------------------------------------------------------------
# Helper: resolve target table from job_settings
# --------------------------------------------------------------------
def _resolve_target_table(job_settings: Mapping[str, Any]) -> str:
    """
    Resolve the fully-qualified target table name from job_settings.

    Supported patterns (all optional, but at least one must work):

    1) Fully-qualified:
        job_settings["target_table"] = "catalog.schema.table"

    2) Split fields:
        job_settings["target_catalog"] / job_settings["catalog"]
        job_settings["target_schema"]  / job_settings["schema"]
        job_settings["target_table"]   / job_settings["table"]

    Raises ValueError if a valid full name cannot be constructed.
    """
    # 1) Direct fully-qualified target_table
    tt = job_settings.get("target_table")
    if isinstance(tt, str) and "." in tt:
        # Assume user provided "catalog.schema.table" or "schema.table"
        return tt

    # 2) Split fields with fallbacks
    catalog = job_settings.get("target_catalog") or job_settings.get("catalog")
    schema = job_settings.get("target_schema") or job_settings.get("schema")
    table = (
        job_settings.get("target_table")
        or job_settings.get("table")
    )

    if catalog and schema and table:
        return f"{catalog}.{schema}.{table}"

    raise ValueError(
        "Could not resolve target Delta table from job_settings. "
        "Expected either 'target_table' with a qualified name, or "
        "a combination of target_catalog/catalog, target_schema/schema, "
        "and target_table/table."
    )


# --------------------------------------------------------------------
# Parse payload
# --------------------------------------------------------------------
try:
    payload: Mapping[str, Any] = json.loads(endpoint_payload_raw)
except json.JSONDecodeError as exc:
    raise ValueError(
        f"Failed to parse 'endpoint_payload' as JSON: {endpoint_payload_raw!r}"
    ) from exc

# json.loads accepts any JSON value. A list (e.g. the whole For Each array
# passed by mistake) or a scalar would otherwise fail on `.get` below with an
# opaque AttributeError, so reject it here with an explicit message.
if not isinstance(payload, Mapping):
    raise ValueError(
        "Widget 'endpoint_payload' must be a JSON object, got "
        f"{type(payload).__name__}: {endpoint_payload_raw!r}"
    )

endpoint_id = payload.get("id")
endpoint = payload.get("endpoint")
params = payload.get("params") or {}
job_settings = payload.get("job_settings") or {}

if endpoint_id is None:
    raise ValueError("Endpoint payload is missing required field 'id'.")
if not endpoint:
    raise ValueError("Endpoint payload is missing required field 'endpoint'.")
# job_settings is read with .get() when resolving the target table and the
# download folder, so it must be a JSON object as well.
if not isinstance(job_settings, Mapping):
    raise ValueError(
        "Endpoint payload field 'job_settings' must be a JSON object, got "
        f"{type(job_settings).__name__}."
    )

logger.info("Starting load-to-Delta for endpoint id=%s", endpoint_id)
logger.info("Endpoint URL: %s", endpoint)
logger.info("Run folder: %s", run_folder)

# Target Delta table from job_settings
target_table = _resolve_target_table(job_settings)
logger.info("Resolved target Delta table: %s", target_table)

# --------------------------------------------------------------------
# Determine folder that contains the downloaded JSON files
# --------------------------------------------------------------------
# Must match the logic used in "3. download api": this endpoint's files live
# in <run_folder>/<output_subfolder>, where output_subfolder defaults to the
# endpoint id.
output_subfolder = job_settings.get("output_subfolder") or str(endpoint_id)
download_folder = f"{run_folder}/{output_subfolder}"

logger.info(
    "Using download folder for endpoint id=%s: %s",
    endpoint_id,
    download_folder,
)

# Fields shared by both early-exit results emitted from this cell.
_skip_result_base = {
    "endpoint_id": endpoint_id,
    "endpoint": endpoint,
    "download_folder": download_folder,
    "target_table": target_table,
}

# Quick check: does the folder exist and contain anything?
try:
    files = dbutils.fs.ls(download_folder)
except Exception as exc:
    # Deliberate best-effort: any listing failure skips this endpoint instead
    # of failing the job. The cause is logged so permission or configuration
    # errors can be told apart from a genuinely missing folder.
    logger.warning(
        "Download folder %s does not exist or is not accessible (%s: %s). Skipping.",
        download_folder,
        type(exc).__name__,
        exc,
    )
    dbutils.notebook.exit(
        json.dumps({**_skip_result_base, "status": "skipped_missing_folder"})
    )

if not files:
    logger.warning(
        "Download folder %s is empty. Nothing to load into %s.",
        download_folder,
        target_table,
    )
    dbutils.notebook.exit(
        json.dumps({**_skip_result_base, "status": "skipped_empty_folder"})
    )

# --------------------------------------------------------------------
# Read all JSON files as RAW TEXT (one row per file)
# --------------------------------------------------------------------
# We *intentionally* do NOT try to infer JSON schema here.
# We read each file as a single string so the table schema is stable:
#   - raw_json: string
#   - source_file: string
#   - endpoint_id: integer (int or long; see NOTE(review) below)
#   - endpoint: string
#   - params_json: string
#   - job_settings_json: string
#   - ingested_at: timestamp
logger.info(
    "Reading downloaded JSON files as raw text from %s",
    download_folder,
)

# source_file comes from the hidden `_metadata` column exposed by Spark file
# sources. F.input_file_name() is deprecated on recent Databricks runtimes and
# is not supported in some Unity Catalog access modes (run_folder is expected
# to be a /Volumes path); `_metadata.file_path` is the recommended replacement.
# It is selected directly on the reader's output, and the resulting column
# order (raw_json, source_file, ...) is the same as before.
df_raw = (
    spark.read.format("text")
    .option("wholetext", "true")  # one row per file
    .load(download_folder)
    .select(
        F.col("value").alias("raw_json"),
        F.col("_metadata.file_path").alias("source_file"),
    )
)

# NOTE(review): F.lit() on a Python int infers IntegerType when the value fits
# in 32 bits and LongType otherwise, so the endpoint_id column type is set by
# whichever id first creates the table. Casting to "long" would make it
# stable, but might no longer match a table already created with an int
# column - confirm the table's current type before changing this.
df_raw = (
    df_raw
    .withColumn("endpoint_id", F.lit(int(endpoint_id)))
    .withColumn("endpoint", F.lit(str(endpoint)))
    .withColumn("params_json", F.lit(json.dumps(params)))
    .withColumn("job_settings_json", F.lit(json.dumps(job_settings)))
    .withColumn("ingested_at", F.current_timestamp())
)

# count() runs as a separate Spark job from the write further down.
row_count = df_raw.count()
logger.info(
    "Prepared %d rows for loading into Delta table %s from folder %s",
    row_count,
    target_table,
    download_folder,
)

if row_count == 0:
    logger.warning(
        "No rows produced from %s after reading as text. Skipping write.",
        download_folder,
    )
    dbutils.notebook.exit(
        json.dumps(
            {
                "endpoint_id": endpoint_id,
                "endpoint": endpoint,
                "download_folder": download_folder,
                "target_table": target_table,
                "status": "skipped_no_rows",
            }
        )
    )

# --------------------------------------------------------------------
# Write into Delta table (APPEND; fixed set of raw-text and metadata columns)
# --------------------------------------------------------------------
logger.info(
    "Writing %d rows into Delta table %s (mode=append)",
    row_count,
    target_table,
)

# Append mode creates the table on the first write and appends afterwards.
# NOTE(review): appends are not deduplicated, so re-running this notebook for
# the same run_folder writes the same files again - confirm that is intended.
delta_writer = df_raw.write.format("delta").mode("append")
delta_writer.saveAsTable(target_table)

logger.info(
    "Completed load-to-Delta for endpoint id=%s into table %s.",
    endpoint_id,
    target_table,
)

# Summary returned to the calling job via the notebook exit value.
result_payload = dict(
    endpoint_id=endpoint_id,
    endpoint=endpoint,
    download_folder=download_folder,
    target_table=target_table,
    rows_written=row_count,
    status="success",
)

dbutils.notebook.exit(json.dumps(result_payload))
