In [None]:
#!/usr/bin/env python
# coding: utf-8

# ============================================================
# nb_gold_fact_transactions_v1_0_final — Gold Fact: Transactions
#
# Contract-first + data-driven execution (banking-ready)
# - No runtime args / widgets
# - Reads execution context from gold_log_steps (latest RUNNING for this notebook)
# - Option A: keep error_code; derive is_chip_used; keep is_success
# - Dedup keep_latest by ingestion_ts
# - Strict dimensional conformance:
#     ORPHAN_DIM_USER, ORPHAN_DIM_CARD, ORPHAN_DIM_MCC, ORPHAN_DIM_DATE
#   Non-conforming rows excluded from Gold and logged as anomalies (append-only)
# - Idempotent rebuild (TRUNCATE + APPEND) partitioned by txn_month
# - Returns deterministic metrics to dispatcher via exit_payload
# ============================================================

# The `%run` command in the next cell is not a standard IPython magic command. It is designed for use within Fabric notebooks only.

In [None]:
%run ./nb_gold_utils

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import json

# -----------------------------
# 0) Data-driven execution context
# -----------------------------
# Name under which this notebook's RUNNING step is looked up, and the
# step-log table it is looked up in.
THIS_NOTEBOOK = "nb_gold_fact_transactions"
LOG_STEPS_TABLE = "gold_log_steps"

def _json_load_safe(s: str) -> dict:
    try:
        return json.loads(s) if s else {}
    except Exception:
        return {}

def read_ctx_from_steps() -> dict:
    """Return the execution context of this notebook's latest RUNNING step.

    Reads ``LOG_STEPS_TABLE``, keeps the rows whose ``notebook_name`` equals
    ``THIS_NOTEBOOK`` and whose ``status`` is ``"RUNNING"``, takes the one with
    the greatest ``start_ts`` and extracts ``ctx`` from its ``payload_json``.

    Returns:
        The ``ctx`` dict stored in the step payload.

    Raises:
        ValueError: if no RUNNING step exists, the payload carries no
            non-empty ``ctx`` dict, ``ctx.notebook_name`` differs from
            ``THIS_NOTEBOOK``, or any of ``gold_run_id`` / ``step_exec_id`` /
            ``entity_code`` is blank.
    """
    is_my_running_step = (
        (F.col("notebook_name") == THIS_NOTEBOOK) & (F.col("status") == "RUNNING")
    )
    latest = (
        spark.table(LOG_STEPS_TABLE)
        .filter(is_my_running_step)
        .orderBy(F.col("start_ts").desc())
        .limit(1)
        .collect()
    )
    if not latest:
        raise ValueError(
            f"[{THIS_NOTEBOOK}] No RUNNING step found in {LOG_STEPS_TABLE}. "
            "Dispatcher must write RUNNING ctx before execution."
        )

    step_ctx = _json_load_safe(latest[0]["payload_json"]).get("ctx", {})
    if not (isinstance(step_ctx, dict) and step_ctx):
        raise ValueError(f"[{THIS_NOTEBOOK}] RUNNING step payload_json has no ctx.")

    def _clean(key):
        # Stringified, whitespace-stripped view of a ctx entry ("" when absent).
        return str(step_ctx.get(key, "")).strip()

    # Hard validations: the ctx must belong to this notebook and carry all ids.
    if _clean("notebook_name") != THIS_NOTEBOOK:
        raise ValueError(f"[{THIS_NOTEBOOK}] ctx.notebook_name mismatch: {step_ctx.get('notebook_name')}")
    for required in ("gold_run_id", "step_exec_id", "entity_code"):
        if not _clean(required):
            raise ValueError(f"[{THIS_NOTEBOOK}] ctx.{required} missing")

    return step_ctx

# Resolve the execution context once and unpack the values used by later cells.
ctx = read_ctx_from_steps()

gold_run_id = normalize_run_id(ctx["gold_run_id"])
step_exec_id, entity_code = ctx["step_exec_id"], ctx["entity_code"]
# Optional ctx entries fall back to an empty string.
load_mode, as_of_date = ctx.get("load_mode", ""), ctx.get("as_of_date", "")

# Echo the resolved context; labels are left-padded to a common width.
for _label, _value in (
    ("gold_run_id", gold_run_id),
    ("step_exec_id", step_exec_id),
    ("entity_code", entity_code),
    ("load_mode", load_mode),
    ("as_of_date", as_of_date),
):
    print(f"{_label:<13} = {_value}")

In [None]:
# -----------------------------
# 1) Tables / constants
# -----------------------------
# Target (Gold) and source (Silver) tables.
GOLD_TABLE = "gold_fact_transactions"
SILVER_TABLE = "silver_transactions"

# Dimension tables the fact is checked against.
DIM_DATE = "gold_dim_date"
DIM_MCC = "gold_dim_mcc"
DIM_CARD = "gold_dim_card"
DIM_USER = "gold_dim_user"

# Aliases used when labelling assertions and anomaly records.
SOURCE_TABLE, ENTITY = SILVER_TABLE, GOLD_TABLE

In [None]:
# -----------------------------
# 2) Read Silver (input sanity — minimal required)
# -----------------------------
# Columns the Silver table must expose; the same list is the projection
# carried into the rest of the notebook.
INPUT_REQUIRED = [
    "transaction_id",
    "txn_ts",
    "txn_date",
    "txn_month",
    "client_id",
    "card_id",
    "merchant_id",
    "mcc_code",
    "amount",
    "use_chip",
    "merchant_city",
    "merchant_state",
    "zip",
    "error_code",
    "is_success",
    "source_file",
    "ingestion_date",
    "ingestion_ts",
    "record_hash",
]

df_silver = spark.table(SILVER_TABLE)
assert_required_columns(df_silver, INPUT_REQUIRED, ctx=f"{SILVER_TABLE} (input)")

# Keep only the required columns, in the declared order, and record the
# input row count for the exit metrics.
df_work = df_silver.select(*INPUT_REQUIRED)
row_in = df_work.count()

In [None]:
# -----------------------------
# 3) Derivations (Gold business-friendly) — Option A
# -----------------------------
# use_chip: cast to string and trim; NULL stays NULL.
_use_chip_src = F.col("use_chip")
_use_chip_clean = (
    F.when(_use_chip_src.isNull(), F.lit(None).cast("string"))
    .otherwise(F.trim(_use_chip_src.cast("string")))
)
df_work = df_work.withColumn("use_chip", _use_chip_clean)

# is_chip_used: NULL when use_chip is NULL, True when the upper-cased value
# contains "CHIP", False otherwise. Evaluated against the cleaned use_chip.
_is_chip_used = (
    F.when(F.col("use_chip").isNull(), F.lit(None).cast("boolean"))
    .when(F.upper(F.col("use_chip")).contains("CHIP"), F.lit(True))
    .otherwise(F.lit(False))
)
df_work = df_work.withColumn("is_chip_used", _is_chip_used)

In [None]:
# -----------------------------
# 4) Dedup keep_latest (transaction_id by ingestion_ts)
# -----------------------------
# The latest ingestion wins. record_hash and source_file act as tie-breakers:
# ordering by ingestion_ts alone lets row_number() pick an arbitrary survivor
# when several rows of a transaction share the same timestamp, and df_dedup is
# re-evaluated by several later actions (counts, joins, the final write), so
# the survivor could differ between evaluations and between runs.
# NOTE(review): rows tying on all three columns are assumed interchangeable
# (same record_hash from the same source_file) — confirm.
w = Window.partitionBy("transaction_id").orderBy(
    F.col("ingestion_ts").desc(),
    F.col("record_hash").desc(),
    F.col("source_file").desc(),
)

# Keep exactly one row (rank 1) per transaction_id.
df_dedup = (
    df_work
    .withColumn("_rn", F.row_number().over(w))
    .filter(F.col("_rn") == 1)
    .drop("_rn")
)

# Metrics: rows kept after dedup and rows dropped by it.
row_after_dedup = df_dedup.count()
dedup_dropped = row_in - row_after_dedup

# Banking-grade uniqueness post-dedup
assert_unique_key(df_dedup, ["transaction_id"], ctx=f"{ENTITY} (post-dedup)")

In [None]:
# -----------------------------
# 5) Load dimensions (keys only)
# -----------------------------
# Each dimension is reduced to its distinct key column, explicitly cast.
df_users = (
    spark.table(DIM_USER)
    .select(F.col("client_id").cast("BIGINT").alias("client_id"))
    .distinct()
)
df_cards = (
    spark.table(DIM_CARD)
    .select(F.col("card_id").cast("BIGINT").alias("card_id"))
    .distinct()
)
df_mcc = (
    spark.table(DIM_MCC)
    .select(F.col("mcc_code").cast("STRING").alias("mcc_code"))
    .distinct()
)

# gold_dim_date business key is date_value; it is exposed as txn_date so the
# date conformance check can use the same key name on both sides.
df_dates = (
    spark.table(DIM_DATE)
    .select(F.col("date_value").cast("DATE").alias("txn_date"))
    .distinct()
)

In [None]:
# -----------------------------
# 6) Conformance + anomalies (append-only, audit-ready)
#    Strict rule: exclude non-conforming rows from Gold fact.
# -----------------------------
# Arguments shared by every conformance check.
_audit_kwargs = dict(entity=ENTITY, gold_run_id=gold_run_id, source_table=SOURCE_TABLE)

# The checks are chained: each one receives the rows kept by the previous one
# and returns (kept rows, anomaly rows).
df_ok_1, anom_user = split_orphans_left_anti(
    fact_df=df_dedup,
    dim_df=df_users,
    fact_key="client_id",
    dim_key="client_id",
    rule_id="GOLD.TXN.CONF.001",
    anom_type="ORPHAN_DIM_USER",
    severity="HIGH",
    natural_key_cols_for_event=["transaction_id", "client_id"],
    **_audit_kwargs,
)

df_ok_2, anom_card = split_orphans_left_anti(
    fact_df=df_ok_1,
    dim_df=df_cards,
    fact_key="card_id",
    dim_key="card_id",
    rule_id="GOLD.TXN.CONF.002",
    anom_type="ORPHAN_DIM_CARD",
    severity="HIGH",
    natural_key_cols_for_event=["transaction_id", "card_id"],
    **_audit_kwargs,
)

df_ok_3, anom_mcc = split_orphans_left_anti(
    fact_df=df_ok_2,
    dim_df=df_mcc,
    fact_key="mcc_code",
    dim_key="mcc_code",
    rule_id="GOLD.TXN.CONF.003",
    anom_type="ORPHAN_DIM_MCC",
    severity="MEDIUM",
    natural_key_cols_for_event=["transaction_id", "mcc_code"],
    **_audit_kwargs,
)

df_conformed, anom_date = split_orphans_left_anti(
    fact_df=df_ok_3,
    dim_df=df_dates,
    fact_key="txn_date",
    dim_key="txn_date",
    rule_id="GOLD.TXN.CONF.004",
    anom_type="ORPHAN_DIM_DATE",
    severity="HIGH",
    natural_key_cols_for_event=["transaction_id", "txn_date"],
    **_audit_kwargs,
)

# Stack the four anomaly sets by column name.
anom_all = anom_user
for _extra in (anom_card, anom_mcc, anom_date):
    anom_all = anom_all.unionByName(_extra)

# Count once; the count gates the anomaly writes and is reported on exit.
anom_count = anom_all.count()
if anom_count > 0:
    write_anomaly_events(anom_all, table_name="gold_anomaly_event")
    write_anomaly_kpis(
        anom_all,
        gold_run_id=gold_run_id,
        entity=ENTITY,
        table_name="gold_anomaly_kpi",
        sample_limit=10,
    )

# Rows surviving all four checks, and rows rejected since dedup.
row_conformed = df_conformed.count()
row_rejected = row_after_dedup - row_conformed

print(
    f"row_in={row_in}, row_after_dedup={row_after_dedup}, "
    f"row_conformed={row_conformed}, row_rejected={row_rejected}, "
    f"anom_count={anom_count}"
)

In [None]:
# -----------------------------
# 7) Add Gold technical columns + minor standardizations
# -----------------------------
# error_code: NULL is replaced by 0, then the column is cast to int.
_error_code_std = F.coalesce(F.col("error_code"), F.lit(0)).cast("int")
df_conformed = df_conformed.withColumn("error_code", _error_code_std)

# Stamp every row with the run id and the load timestamp.
df_gold_raw = df_conformed.withColumn("gold_run_id", F.lit(gold_run_id))
df_gold_raw = df_gold_raw.withColumn("gold_load_ts", F.current_timestamp())

In [None]:
# -----------------------------
# 8) Load Gold contract (YAML) + canonical projection + assertions
# -----------------------------
# Contract file: Files/governance/gold/gold_fact_transactions.yaml
contract = load_gold_contract(GOLD_TABLE)

# Project the raw Gold frame onto the contract.
df_final = project_to_gold_contract(df_gold_raw, contract)

# Every contract check is switched on for the final frame.
_enforcement = {
    "enforce_types": True,
    "enforce_not_null": True,
    "enforce_unique": True,
}
apply_gold_contract_assertions(
    df=df_final,
    contract=contract,
    ctx=f"{ENTITY} (final)",
    **_enforcement,
)

In [None]:
# -----------------------------
# 9) Idempotent rebuild (TRUNCATE + APPEND, partitioned)
# -----------------------------
# The partition column (txn_month) must exist in df_final and in the contract.
_partition_cols = ["txn_month"]
rebuild_gold_table(df_final, table_name=GOLD_TABLE, partition_cols=_partition_cols)

# Output metrics, computed from df_final: total rows and distinct txn_month values.
row_out = df_final.count()
_months = df_final.select("txn_month").distinct()
partition_count = _months.count()

print(
    f"Loaded {GOLD_TABLE} successfully. "
    f"row_out={row_out}, partition_count={partition_count}"
)

In [None]:
# -----------------------------
# 10) Exit payload (consumed by dispatcher)
# -----------------------------
def exit_payload(**kwargs):
    """Exit the notebook with a JSON payload.

    The payload starts as ``{"status": "SUCCESS"}`` and is updated with
    ``kwargs``, so a caller-supplied ``status`` replaces the default. It is
    serialised with ``ensure_ascii=False`` and passed to
    ``mssparkutils.notebook.exit``.

    Args:
        **kwargs: Fields to include in the payload.
    """
    result = {"status": "SUCCESS"}
    result.update(kwargs)
    serialized = json.dumps(result, ensure_ascii=False)
    mssparkutils.notebook.exit(serialized)
    
# Identifiers and run metrics returned through the exit payload.
_exit_fields = {
    "status": "SUCCESS",
    "gold_run_id": gold_run_id,
    "step_exec_id": step_exec_id,
    "entity_code": entity_code,
    "entity": ENTITY,
    "target_table": GOLD_TABLE,
    "row_in": row_in,
    "row_out": row_out,
    "row_rejected": row_rejected,
    "dedup_dropped": dedup_dropped,
    "partition_count": partition_count,
    "anom_count": anom_count,
}
exit_payload(**_exit_fields)
