###### **Parameters**

In [None]:
# Input values here
source_layer: str = ""            # e.g. 'bronze', 'silver', 'gold'
source_system: str = ""           # e.g. 'lcvista', 'intacct'
source_table_name: str = ""       # logical name, e.g. 'staff'
target_table_name: str = ""       # e.g. 'lcvista.staff'
primary_keys_json: str = ""       # JSON array string, e.g. '["id"]'
watermark_column: str = ""        # optional, e.g. 'modified'
max_watermark_cuttoff: str = ""   # optional ISO UTC, e.g. '2023-12-01T00:00:00Z'
schema_json: str = ""             # optional StructType JSON string
transform_mode: str = "PASS"      # PASS | FLATTEN | CUSTOM
transform_notebook: str = ""      # optional if transform_mode='CUSTOM'

# Set to False to skip the final display
preview: bool = True


###### **Dynamic Path Resolution**

In [None]:
ctx = notebookutils.runtime.context
wsid = ctx.get("currentWorkspaceId")

if not wsid:
    raise ValueError("Missing `currentWorkspaceId` from notebook runtime context.")

try:
    ops_artifact = notebookutils.lakehouse.get(name="Ops", workspaceId=wsid)
except Exception as e:
    raise RuntimeError(
        f"Could not resolve `Ops` lakehouse in this workspace ({wsid=}). Error: {repr(e)}"
    ) from e

if ops_artifact is None:
    raise ValueError("Ops artifact is None")

props = ops_artifact.get("properties") or {}
abfs = props.get("abfsPath")
if not abfs:
    raise ValueError("Ops artifact missing `properties.abfsPath`")

# Delta table path (schema-enabled lakehouse): Tables/<schema>/<table>
table_specs_path = f"{abfs}/Tables/ops/table_specs"

In [None]:
# Imports
import json
from datetime import datetime, timezone

from delta.tables import DeltaTable
from pyspark.sql import functions as F, types as T


def _table_specs_schema() -> T.StructType:
    return T.StructType(
        [
            T.StructField("source_layer", T.StringType(), nullable=False),
            T.StructField("source_system", T.StringType(), nullable=False),
            T.StructField("source_table_name", T.StringType(), nullable=False),
            T.StructField("target_table_name", T.StringType(), nullable=False),
            T.StructField("primary_keys_json", T.StringType(), nullable=False),
            T.StructField("watermark_column", T.StringType(), nullable=True),
            T.StructField("max_watermark_cuttoff", T.TimestampType(), nullable=True),
            T.StructField("schema_json", T.StringType(), nullable=True),
            T.StructField("transform_mode", T.StringType(), nullable=False),
            T.StructField("transform_notebook", T.StringType(), nullable=True),
        ]
    )


def _parse_iso_utc_ts(value: str | None) -> datetime | None:
    """Parse ISO UTC timestamps like '2023-12-01T00:00:00Z' (or with fractional seconds).
    Returns a naive datetime in UTC (Spark-friendly), or None if blank.
    """
    s = (value or "").strip()
    if not s:
        return None
    # Accept trailing 'Z' shorthand for UTC
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"
    try:
        dt = datetime.fromisoformat(s)
    except ValueError as e:
        raise ValueError(
            f"Invalid ISO timestamp for max_watermark_cuttoff: {value!r}. "
            "Expected e.g. '2023-12-01T00:00:00Z'"
        ) from e

    if dt.tzinfo is None:
        # Assume naive input is already UTC
        return dt
    return dt.astimezone(timezone.utc).replace(tzinfo=None)


def _normalize_json_array_of_strings(value: str) -> str:
    """Validate and normalize PK JSON like '["id", "line_no"]'."""
    s = (value or "").strip()
    if not s:
        raise ValueError("primary_keys_json is required (non-empty JSON array string).")
    try:
        arr = json.loads(s)
    except json.JSONDecodeError as e:
        raise ValueError(f"primary_keys_json is not valid JSON: {value!r}") from e
    if not isinstance(arr, list) or not arr or not all(isinstance(x, str) and x.strip() for x in arr):
        raise ValueError(
            "primary_keys_json must be a non-empty JSON array of non-empty strings, "
            "e.g. ['id'] or ['id','line_no']"
        )
    return json.dumps([x.strip() for x in arr])


def _req(name: str, value: str) -> str:
    v = (value or "").strip()
    if not v:
        raise ValueError(f"{name} is required")
    return v


# Required values
source_layer = _req("source_layer", source_layer)
source_system = _req("source_system", source_system)
source_table_name = _req("source_table_name", source_table_name)
target_table_name = _req("target_table_name", target_table_name)
primary_keys_json = _normalize_json_array_of_strings(primary_keys_json)

# Optional values
watermark_column = (watermark_column or "").strip() or None
max_watermark_dt = _parse_iso_utc_ts(max_watermark_cuttoff)
schema_json = (schema_json or "").strip() or None
transform_mode = (transform_mode or "").strip().upper() or "PASS"
transform_notebook = (transform_notebook or "").strip() or None

valid_modes = {"PASS", "FLATTEN", "CUSTOM"}
if transform_mode not in valid_modes:
    raise ValueError(f"transform_mode must be one of {sorted(valid_modes)}")
if transform_mode == "CUSTOM" and not transform_notebook:
    raise ValueError("transform_notebook is required when transform_mode='CUSTOM'")


# Single-row source DF (the new/updated spec row)
src_df = spark.createDataFrame(
    [
        {
            "source_layer": source_layer,
            "source_system": source_system,
            "source_table_name": source_table_name,
            "target_table_name": target_table_name,
            "primary_keys_json": primary_keys_json,
            "watermark_column": watermark_column,
            "max_watermark_cuttoff": max_watermark_dt,
            "schema_json": schema_json,
            "transform_mode": transform_mode,
            "transform_notebook": transform_notebook,
        }
    ],
    schema=_table_specs_schema(),
)

display(src_df)  # preview row to be merged


###### **Merge Data**

In [None]:
# Target delta table at ABFSS path
tgt = DeltaTable.forPath(spark, table_specs_path)

merge_cond = (
    "t.source_layer = s.source_layer "
    "AND t.source_system = s.source_system "
    "AND t.source_table_name = s.source_table_name"
)

(
    tgt.alias("t")
       .merge(src_df.alias("s"), merge_cond)
       .whenMatchedUpdate(set={
            "target_table_name": "s.target_table_name",
            "primary_keys_json": "s.primary_keys_json",
            "watermark_column": "s.watermark_column",
            "max_watermark_cuttoff": "s.max_watermark_cuttoff",
            "schema_json": "s.schema_json",
            "transform_mode": "s.transform_mode",
            "transform_notebook": "s.transform_notebook",
            "_updated_ts": "current_timestamp()",
        })
       .whenNotMatchedInsert(values={
            "source_layer": "s.source_layer",
            "source_system": "s.source_system",
            "source_table_name": "s.source_table_name",
            "target_table_name": "s.target_table_name",
            "primary_keys_json": "s.primary_keys_json",
            "watermark_column": "s.watermark_column",
            "max_watermark_cuttoff": "s.max_watermark_cuttoff",
            "schema_json": "s.schema_json",
            "transform_mode": "s.transform_mode",
            "transform_notebook": "s.transform_notebook",
            "_created_ts": "current_timestamp()",
            "_updated_ts": "current_timestamp()",
        })
       .execute()
)


###### **Preview Table**

In [None]:
if not preview:
    notebookutils.notebook.exit("")

df = spark.read.format("delta").load(table_specs_path)
display(df)
