# 01 — Prepare Data

**Objective:** Build the three foundational Unity Catalog tables that the rest of the pipeline depends on.

| Section | Output Table | Description |
|---------|-------------|-------------|
| **A** | `ground_truth_normalized` | Cleaned, normalized Master Fee Table (431 codes) |
| **B.0** | `client_column_mappings` | AI-generated mapping from client columns to canonical schema |
| **B** | `transaction_code_catalog` | Unique standardized codes from raw data with descriptions and volume |
| **C** | `transaction_code_catalog` (updated) | Adds a `layer` column to the catalog (1 = Obvious / 2 = Ambiguous / 3 = Unknown) |

**Runs on:** Databricks Runtime 15.4 LTS or above.

In [0]:
# ── Configuration ─────────────────────────────────────────────────
# Unity Catalog location and the serving endpoint used for ai_query().
CATALOG_NAME = "ciq-bp_dummy-dev"
SCHEMA_NAME = "default"
MODEL_NAME = "databricks-claude-opus-4-6"

# Input files, relative to the notebook's working directory.
GT_PATH = "../data/bank-plus-data/source-of-truth/Master Fee Table(Master).csv"
RAW_NON_POS_PATH = "../data/bank-plus-data/raw/CheckingIQ_NON_POS_Daily_012626_rerun.csv"
RAW_POS_PATH = "../data/bank-plus-data/raw/CheckingIQ_POS_Daily_012626_rerun.csv"

# Fully qualified output tables; catalog and schema are back-quoted because
# the catalog name contains a hyphen.
_UC_NAMESPACE = f"`{CATALOG_NAME}`.`{SCHEMA_NAME}`"
GT_TABLE = f"{_UC_NAMESPACE}.ground_truth_normalized"
CATALOG_TABLE = f"{_UC_NAMESPACE}.transaction_code_catalog"
MAPPINGS_TABLE = f"{_UC_NAMESPACE}.client_column_mappings"

# Generic canonical schema — universal column names for ALL clients.
# Keys are the canonical column names; values are the descriptions that are
# shown to the model when mapping client columns.
CANONICAL_SCHEMA = dict(
    transaction_code="Numeric transaction code (primary key for categorization)",
    description_1="Primary transaction description text",
    description_2="Secondary description / memo line",
    amount="Transaction amount",
    transaction_date="Date the transaction occurred",
    posting_date="Date the transaction was posted",
    account_number="Account identifier",
    account_status="Account status code",
    internal_account="Internal/alternate account number",
    transaction_desc="Transaction type description (may be empty)",
)

# Echo the effective configuration; labels are padded to a 16-character column.
for _label, _value in (
    ("Catalog:", CATALOG_NAME),
    ("Schema:", SCHEMA_NAME),
    ("GT table:", GT_TABLE),
    ("Catalog table:", CATALOG_TABLE),
    ("Mappings table:", MAPPINGS_TABLE),
):
    print(f"{_label:<16}{_value}")

In [0]:
# ── Validation: check that input files exist ─────────────────────
import os

# Every input the notebook reads; the first one that is missing aborts the run.
required_inputs = (
    ("Master Fee Table", GT_PATH),
    ("NON_POS raw data", RAW_NON_POS_PATH),
    ("POS raw data", RAW_POS_PATH),
)

for label, path in required_inputs:
    if not os.path.exists(path):
        raise FileNotFoundError(f"Missing {label}: {path}")
    print(f"  OK  {label}: {path}")

print("\nAll input files found.")

---
## Section A — Ground Truth Normalization

Read the Master Fee Table, clean it, normalize casing inconsistencies, and save to Unity Catalog.

In [0]:
import pandas as pd
import numpy as np

# Read the Master Fee Table exactly as exported.
df_gt_raw = pd.read_csv(GT_PATH, encoding="latin-1")

# Header cells may carry padding; trim them so later lookups by name work.
df_gt_raw.columns = df_gt_raw.columns.map(lambda name: name.strip())

n_raw_rows = len(df_gt_raw)
print(f"Raw rows loaded: {n_raw_rows}")
print(f"Columns: {df_gt_raw.columns.tolist()}")
df_gt_raw.head(5)

In [0]:
df_gt = df_gt_raw.copy()

# Trim every text column. astype(str) also turns missing cells into the
# literal string "nan", which the filters below rely on.
text_cols = [c for c in df_gt.columns if df_gt[c].dtype == object]
for col in text_cols:
    df_gt[col] = df_gt[col].astype(str).str.strip()

code_col = "External Transaction Code"

# Step 1: keep rows that carry some value in the code column. Section headers
# (e.g. "ATM activities"), generic descriptions without a code, and trailing
# blank rows are removed by this filter or by step 3.
code_text = df_gt[code_col].astype(str).str.strip()
has_code = df_gt[code_col].notna() & ~code_text.isin(["", "nan"])
df_gt = df_gt.loc[has_code].copy()
df_gt[code_col] = df_gt[code_col].astype(str).str.strip()

# Step 2: drop header-leak rows (the header row repeated mid-file at the
# Fee Items boundary).
is_header_leak = (
    df_gt["Scoring Category 1"].astype(str).str.strip() == "Scoring Category 1"
)
df_gt = df_gt.loc[~is_header_leak].copy()

# Step 3: keep only rows whose code consists solely of digits.
is_integer_code = df_gt[code_col].str.match(r"^\d+$")
df_gt = df_gt.loc[is_integer_code].copy()

n_unique_codes = df_gt[code_col].nunique()
print(f"After cleaning: {len(df_gt)} rows, {n_unique_codes} unique codes")

In [0]:
# ── Normalization maps ────────────────────────────────────────────
# Fix casing inconsistencies found in the spreadsheet.

# Scoring Category 1: collapse the two casings of "Fee item" into one.
# Entries that map a value to itself are no-ops that list the accepted spelling.
L1_NORM = {
    "Fee Item":  "Fee item",
    "Fee item":  "Fee item",
    "Non-fee item": "Non-fee item",
}

# Scoring Category 2: fixes a stray space ("NSF /OD") and casing variants;
# the remaining entries map a value to itself.
L2_NORM = {
    "NSF /OD":            "NSF/OD",
    "NSF/OD":             "NSF/OD",
    "Money Movement":     "Money movement",
    "Money movement":     "Money movement",
    "Account Operations": "Account operations",
    "Account operations": "Account operations",
    "All others":         "All others",
    "Service Charges":    "Service Charges",
    "Interchange":        "Interchange",
    "Miscellaneous":      "Miscellaneous",
    "Unclassified":       "Unclassified",
}

# Scoring Category 3: "N/A" and the string "nan" are mapped to None (missing);
# two casing variants are normalized.
L3_NORM = {
    "N/A": None,
    "nan": None,
    "Money Movement":     "Money movement",
    "Account Operations": "Account operations",
}

# Scoring Category 4: only the placeholders are mapped, both to None (missing).
L4_NORM = {
    "N/A": None,
    "nan": None,
}


def _apply_map(series, norm_map):
    """Map values through a normalization dict, keeping unmapped values as-is."""
    mapped = series.map(norm_map)
    # Where the map returned a value (including explicit None), use it.
    # Where the key was not in the map, keep the original.
    has_mapping = series.isin(norm_map.keys())
    return mapped.where(has_mapping, series)


# Normalize each hierarchy level with its own lookup table.
for level_col, level_map in (
    ("Scoring Category 1", L1_NORM),
    ("Scoring Category 2", L2_NORM),
    ("Scoring Category 3", L3_NORM),
    ("Scoring Category 4", L4_NORM),
):
    df_gt[level_col] = _apply_map(df_gt[level_col], level_map)

# Drop rows where L1 ended up as None (shouldn't happen after cleaning, but safety)
has_l1 = df_gt["Scoring Category 1"].notna()
df_gt = df_gt.loc[has_l1].copy()

# Print the distinct surviving values per level for a quick visual check.
print("After normalization:")
for tag, level_col in (
    ("L1", "Scoring Category 1"),
    ("L2", "Scoring Category 2"),
    ("L3", "Scoring Category 3"),
):
    print(f"  {tag} values: {sorted(df_gt[level_col].dropna().unique())}")
l4_vals = df_gt["Scoring Category 4"].dropna().unique()
l4_present = [v for v in l4_vals if v is not None]
print(f"  L4 values: {sorted(l4_present)}")

In [0]:
# ── Rename to canonical column names ──────────────────────────────
# Spreadsheet heading -> ground-truth column name used by the rest of the notebook.
gt_rename = {
    "External Transaction Code": "transaction_code",
    "External Transaction Description": "gt_desc",
    "Scoring Category 1": "gt_L1",
    "Scoring Category 2": "gt_L2",
    "Scoring Category 3": "gt_L3",
    "Scoring Category 4": "gt_L4",
    "Credit / Debit": "gt_credit_debit",
}

# Keep only the renamed columns, in the order declared above.
gt_cols = list(gt_rename.values())
df_gt = df_gt.rename(columns=gt_rename)[gt_cols].copy()

# Replace remaining string 'None' / 'nan' artifacts
df_gt = df_gt.replace({"None": None, "nan": None})

n_gt_codes = df_gt["transaction_code"].nunique()
print(f"Ground truth ready: {len(df_gt)} rows, {n_gt_codes} unique codes")
df_gt.head(10)

In [0]:
# ── Save ground truth to Unity Catalog ────────────────────────────
try:
    # `spark` only exists inside a Databricks session; referencing it
    # elsewhere raises NameError, which selects the local fallback below.
    sdf_gt = spark.createDataFrame(df_gt)
    gt_writer = sdf_gt.write.mode("overwrite")
    gt_writer.saveAsTable(GT_TABLE)
    print(f"Saved {len(df_gt)} rows to {GT_TABLE}")
except NameError:
    print("Spark session not found — skipping UC write (run in Databricks).")
    print(f"DataFrame ready with {len(df_gt)} rows.")

In [0]:
import csv

def load_raw_csv(path, label=""):
    """
    Robust CSV loader for BankPlus raw files.

    Handles unquoted commas in EFHDS1/EFHDS2 by parsing from both ends:
      - Front 6 columns (ACCTNO → AMT) are always safe
      - Tail columns (Account# → end) are always safe
      - Middle = EFHDS1 + EFHDS2, may contain extra commas

    Also filters out NUL bytes (0x00) and a leading UTF-8 byte-order mark
    before parsing.

    Args:
        path: Path of the CSV file to read.
        label: Prefix used in the printed summary.

    Returns:
        A DataFrame of strings with the ORIGINAL client column names.
        Column renaming to canonical names is handled separately in Section B.

    Raises:
        ValueError: If the file contains no header row.
    """
    n_front = 6   # columns before the free-text pair
    n_middle = 2  # EFHDS1 + EFHDS2

    with open(path, "rb") as f:
        raw_bytes = f.read()

    text = raw_bytes.replace(b"\x00", b"").decode("utf-8", errors="replace")
    # A byte-order mark would otherwise become part of the first column name
    # (str.strip() does not remove it), breaking later lookups by name.
    if text.startswith("\ufeff"):
        text = text[1:]

    reader = csv.reader(text.splitlines())
    try:
        header = [c.strip() for c in next(reader)]
    except StopIteration:
        raise ValueError(f"{path} is empty: no header row found") from None

    n_expected = len(header)
    n_tail = n_expected - n_front - n_middle
    # With fewer than n_front + n_middle columns there is no free-text pair
    # to re-join, so over-wide rows cannot be repaired.
    can_repair = n_tail >= 0

    rows = []
    n_fixed = 0
    n_skipped = 0

    for fields in reader:
        n = len(fields)

        if n == n_expected:
            rows.append(fields)

        elif n > n_expected and can_repair:
            # Surplus fields come from commas inside the description text:
            # everything between the safe front and tail is EFHDS1 + EFHDS2,
            # and the last middle field is taken as EFHDS2.
            split_at = n - n_tail
            middle = fields[n_front:split_at]
            efhds1 = ",".join(middle[:-1])
            efhds2 = middle[-1]

            rows.append(fields[:n_front] + [efhds1, efhds2] + fields[split_at:])
            n_fixed += 1

        elif n:
            # Too few fields (or an unrepairable layout): the row cannot be
            # aligned to the header. Count it so the loss is visible.
            n_skipped += 1
        # n == 0 is a blank line; ignore it.

    df = pd.DataFrame(rows, columns=header)

    total = len(df)
    print(f"{label} rows: {total:,}")
    print(f"  → Rows with commas in EFHDS1 (fixed): {n_fixed:,}")
    if n_skipped:
        print(f"  → WARNING: rows with unexpected field count (skipped): {n_skipped:,}")
    print(f"Columns: {list(df.columns)}")

    return df

---
## Section B.0 — AI Column Mapping

Identify the mapping from client-specific column names to the generic canonical schema
using `ai_query()`. This allows the pipeline to handle different core banking systems.

In [0]:
import csv
import re
import json

def _infer_profile_dtype(vals):
    """Return a coarse, human-readable type label for a list of sample strings."""
    if not vals:
        # all() over an empty list is True, so without this guard an empty
        # column would be reported as "small integers".
        return "empty (no non-blank sample values)"

    # Date-shaped: three digit groups joined by "-" in exactly 10 characters.
    # Requiring the digit groups keeps hyphenated text and negative amounts
    # from being reported as dates.
    if any(len(v) == 10 and re.fullmatch(r"\d+-\d+-\d+", v) for v in vals):
        return "dates (YYYY-MM-DD)"

    if all(v.replace(".", "").replace("-", "").isdigit() for v in vals):
        if any("." in v for v in vals):
            return "decimal numbers"
        if all(len(v) <= 3 for v in vals):
            return "small integers"
        return "integers"

    return "text"


def get_column_profiles(path, n_rows=20):
    """
    Load a sample of raw client data and infer basic column types for the AI prompt.

    Args:
        path: Raw client CSV, read with ``load_raw_csv``.
        n_rows: Number of leading rows to profile.

    Returns:
        A tuple ``(profiles_text, columns)``: the per-column profile lines
        joined with newlines, and the original client column names — no
        renaming is applied.
    """
    df_sample = load_raw_csv(path, label="Profiling").head(n_rows)

    profiles = []
    for col in df_sample.columns:
        # Up to five non-blank, trimmed values represent the column.
        vals = df_sample[col].dropna().astype(str).str.strip()
        vals = vals[vals != ""].head(5).tolist()

        dtype = _infer_profile_dtype(vals)

        profiles.append(f"  - Column: \"{col}\"\n    Data type: {dtype}\n    Sample values: {vals}")
    return "\n".join(profiles), list(df_sample.columns)

# Profile the NON_POS export; its column names and sample values are what the
# model is asked to map.
print("Building column profiles from NON_POS data (raw client column names)...")
profiles_text, client_cols = get_column_profiles(RAW_NON_POS_PATH)
print(f"\nClient columns to map: {client_cols}")

# One "  - name: description" line per canonical column.
canonical_schema_text = "\n".join(f"  - {col}: {desc}" for col, desc in CANONICAL_SCHEMA.items())

# Prompt text for the column-mapping request. The two f-string fields inject
# the canonical schema and the client column profiles built above.
mapping_prompt = f"""You are a data engineering expert specializing in US banking core systems.

A financial institution has provided a transaction data export. Their column names
are abbreviations from their core banking system (e.g., Jack Henry SilverLake,
Fiserv, FIS). Your task is to map each column to our canonical schema.

### Canonical Schema (target — map TO these names)
{canonical_schema_text}

### Client Data Profile (source — map FROM these columns)
{profiles_text}

### Key Banking Domain Knowledge
- Core systems use abbreviations: TRANCD = transaction code, EFHDS = extended
  field header/description, AMT = amount, ACCTNO = account number, etc.
- \"EFHDS1\" and \"EFHDS2\" are Jack Henry's names for description lines 1 and 2.
- Transaction files often have TWO date columns: a transaction date and a posting
  date. They may look similar but serve different purposes.
- \"status\" is typically a small integer (1, 2, 3) indicating account status.
- \"description\" (if mostly empty) is a legacy field for transaction type description.
- There may be TWO account number columns: an external-facing one and an internal one.

### Few-Shot Example (different bank, similar task)
A Fiserv bank had these columns: TxnCode, Desc1, Desc2, TxnAmt, TxnDate,
PostDate, AcctNum, AcctStat, IntAcct, TxnDesc.

Correct mapping:
  TxnCode  → transaction_code
  Desc1    → description_1
  Desc2    → description_2
  TxnAmt   → amount
  TxnDate  → transaction_date
  PostDate → posting_date
  AcctNum  → account_number
  AcctStat → account_status
  IntAcct  → internal_account
  TxnDesc  → transaction_desc

### Instructions
1. MATCH BY DATA VALUES, not just column names. Look at the sample values to
   determine what each column actually contains.
2. Map each client column to exactly one canonical column, or null if no match.
3. Each canonical column can only be used once.
4. Think step by step:
   - Which column has small integers (like 183, 163)? → transaction_code
   - Which column has dollar amounts (like 258.20)? → amount
   - Which column has text descriptions? → description_1 (primary), description_2 (secondary)
   - Which columns have dates? → the one named for transaction date, the other for posting date
   - Which column has alphanumeric account IDs? → account_number
   - Which column has numeric-only account numbers? → internal_account
   - Which column has single-digit status codes? → account_status
   - Which column is mostly empty text? → transaction_desc

Return ONLY a JSON object. Keys = client column names, values = canonical column names (or null).
"""
print("Prompt prepared.")

In [0]:
def sanitize(c):
    """Replace every character outside [a-zA-Z0-9_.-] with '_' so the column
    name can be used as a property key in the response JSON schema."""
    return re.sub(r'[^a-zA-Z0-9_.-]', '_', c)


# NOTE(review): quotes are escaped by doubling them. Confirm the workspace's SQL
# dialect reads '' inside a literal as an escaped quote, and that backslashes
# in sample values cannot alter the literal.
escaped_prompt = mapping_prompt.replace("'", "''")

san_to_orig = {sanitize(c): c for c in client_cols}
if len(san_to_orig) != len(client_cols):
    # Two client columns collapsing to the same sanitized key would silently
    # drop one of them from the schema and from the reverse lookup.
    raise ValueError(
        f"Client column names are not unique after sanitizing: {client_cols}"
    )

props = {sanitize(c): {"type": ["string", "null"]} for c in client_cols}
schema = json.dumps({"type": "json_schema", "json_schema": {"name": "mapping", "schema": {"type": "object", "properties": props}, "strict": True}})

query = f"SELECT ai_query('{MODEL_NAME}', '{escaped_prompt}', responseFormat => '{schema}') as res"
try:
    print("Calling ai_query...")
    res_raw = spark.sql(query).collect()[0]["res"]
    ai_mapping_san = json.loads(res_raw)

    # Accept only entries that point from a known client column to a canonical
    # column not used yet; everything else is reported and ignored so a stray
    # model answer cannot break the renames in Section B.
    CLIENT_RENAME_MAP = {}
    for san_col, canonical in ai_mapping_san.items():
        if not canonical:
            continue  # model found no match for this column
        if san_col not in san_to_orig:
            print(f"  WARNING: ignoring unknown column in response: {san_col!r}")
            continue
        if canonical not in CANONICAL_SCHEMA:
            print(f"  WARNING: ignoring non-canonical target {canonical!r} for {san_col!r}")
            continue
        if canonical in CLIENT_RENAME_MAP.values():
            print(f"  WARNING: {canonical!r} already assigned; ignoring {san_col!r}")
            continue
        CLIENT_RENAME_MAP[san_to_orig[san_col]] = canonical
    print(f"Mapping derived for {len(CLIENT_RENAME_MAP)} columns.")
except NameError:
    # `spark` is undefined outside Databricks.
    print("Spark not found — using hardcoded Bank Plus mapping.")
    CLIENT_RENAME_MAP = {"TRANCD": "transaction_code", "EFHDS1": "description_1", "EFHDS2": "description_2", "AMT": "amount", "TRDATE": "transaction_date", "PostingDate": "posting_date", "ACCTNO": "account_number", "status": "account_status", "Account#": "internal_account", "description": "transaction_desc"}

# Section B reads these canonical columns by name; fail here with a clear
# message rather than with a KeyError further down.
missing_required = [
    name
    for name in ("transaction_code", "description_1", "transaction_desc")
    if name not in CLIENT_RENAME_MAP.values()
]
if missing_required:
    raise ValueError(f"Column mapping is missing required canonical columns: {missing_required}")

print("Final Rename Map:")
for k, v in CLIENT_RENAME_MAP.items():
    print(f"  {k:<15} -> {v}")

In [0]:
from datetime import datetime
# Take the timestamp once so every row written by this run carries the same
# value; computing it per row can yield slightly different timestamps.
run_timestamp = datetime.utcnow().isoformat()
mapping_df = pd.DataFrame(
    [
        {"client_column": k, "canonical_column": v, "run_timestamp": run_timestamp}
        for k, v in CLIENT_RENAME_MAP.items()
    ]
)
try:
    spark.createDataFrame(mapping_df).write.mode("overwrite").saveAsTable(MAPPINGS_TABLE)
    print(f"Saved mapping to {MAPPINGS_TABLE}")
except NameError:
    # `spark` is undefined outside Databricks.
    print("Skipping UC write.")

---
## Section B — Transaction Code Catalog

Build a catalog of unique transaction codes from the raw NON_POS and POS daily files.
For each code, capture one sample description and the total transaction volume.

In [0]:
# Load the NON_POS export and switch to canonical column names using the
# AI-derived mapping from Section B.0.
df_non_pos = load_raw_csv(RAW_NON_POS_PATH, label="NON_POS").rename(
    columns=CLIENT_RENAME_MAP
)

# Store codes as trimmed text.
non_pos_codes = df_non_pos["transaction_code"].astype(str).str.strip()
df_non_pos["transaction_code"] = non_pos_codes

print(f"\nColumns after applying CLIENT_RENAME_MAP: {list(df_non_pos.columns)}")
print(f"NON_POS unique codes: {non_pos_codes.nunique()}")

In [0]:
# Display the number of NON_POS rows loaded.
len(df_non_pos)

In [0]:
# Load the POS export and switch to canonical column names using the
# AI-derived mapping from Section B.0.
df_pos = load_raw_csv(RAW_POS_PATH, label="POS").rename(columns=CLIENT_RENAME_MAP)

# Store codes as trimmed text.
pos_codes = df_pos["transaction_code"].astype(str).str.strip()
df_pos["transaction_code"] = pos_codes

print(f"\nColumns after applying CLIENT_RENAME_MAP: {list(df_pos.columns)}")
print(f"POS unique codes: {pos_codes.nunique()}")

In [0]:
# ── Build NON_POS catalog entries ─────────────────────────────────
# description_1 is the primary description field (mapped from EFHDS1 for Bank Plus)
# "first" skips missing values but not blank strings, so blank descriptions
# are turned into missing values first. Each code then gets its first
# NON-BLANK description whenever one exists.
non_pos_desc = df_non_pos["description_1"].astype(str).str.strip()

non_pos_catalog = (
    df_non_pos
    .assign(description_1=non_pos_desc.where(non_pos_desc != ""))
    .groupby("transaction_code")
    .agg(
        description_1=("description_1", "first"),
        volume=("transaction_code", "size"),
    )
    .reset_index()
)
non_pos_catalog["source_file"] = "NON_POS"
# Codes whose descriptions are all blank keep an empty string.
non_pos_catalog["description_1"] = (
    non_pos_catalog["description_1"].fillna("").astype(str).str.strip()
)

print(f"NON_POS catalog: {len(non_pos_catalog)} unique codes")
non_pos_catalog.sort_values("volume", ascending=False).head(10)

In [0]:
# ── Build POS catalog entries ─────────────────────────────────────
# transaction_desc is the populated description column for POS
# "first" skips missing values but not blank strings, so blank descriptions
# are turned into missing values first. Each code then gets its first
# NON-BLANK description whenever one exists.
pos_desc = df_pos["transaction_desc"].astype(str).str.strip()

pos_catalog = (
    df_pos
    .assign(transaction_desc=pos_desc.where(pos_desc != ""))
    .groupby("transaction_code")
    .agg(
        description_1=("transaction_desc", "first"),
        volume=("transaction_code", "size"),
    )
    .reset_index()
)
pos_catalog["source_file"] = "POS"
# Codes whose descriptions are all blank keep an empty string.
pos_catalog["description_1"] = (
    pos_catalog["description_1"].fillna("").astype(str).str.strip()
)

print(f"POS catalog: {len(pos_catalog)} unique codes")
pos_catalog.sort_values("volume", ascending=False).head(10)

In [0]:
# ── Combine into a single catalog ─────────────────────────────────
# Stack the two per-file catalogs; a code present in both files keeps one row
# per source file.
catalog_parts = [non_pos_catalog, pos_catalog]
df_catalog = pd.concat(catalog_parts, ignore_index=True)
df_catalog["transaction_code"] = df_catalog["transaction_code"].astype(str)

total_volume = df_catalog["volume"].sum()
print(f"Combined catalog: {len(df_catalog)} codes")
print(f"Total transaction volume: {total_volume:,}")
df_catalog.sort_values("volume", ascending=False)

---
## Section C — Layer Assignment

Assign each catalog code to a test layer based on how it appears in the ground truth:

| Layer | Name | Rule |
|-------|------|------|
| 1 | Obvious | Exactly 1 unique (L1,L2,L3,L4) mapping in GT |
| 2 | Ambiguous | 2+ distinct mappings in GT |
| 3 | Unknown | transaction_code absent from GT entirely |

In [0]:
# Count distinct mappings per transaction_code in ground truth
# Count distinct (L1, L2, L3, L4) mappings per transaction_code in ground truth.
# De-duplicating (code, levels) rows once and then counting rows per code
# gives the same numbers as de-duplicating inside every group, without a
# Python-level callback per group.
gt_level_cols = ["gt_L1", "gt_L2", "gt_L3", "gt_L4"]
gt_mapping_counts = (
    df_gt[["transaction_code", *gt_level_cols]]
    .drop_duplicates()
    .groupby("transaction_code")
    .size()
    .reset_index(name="n_mappings")
)

multi_codes  = set(gt_mapping_counts.loc[gt_mapping_counts["n_mappings"] > 1, "transaction_code"])
single_codes = set(gt_mapping_counts.loc[gt_mapping_counts["n_mappings"] == 1, "transaction_code"])
all_gt_codes = set(df_gt["transaction_code"].unique())

print(f"GT codes with 1 mapping (Layer 1):  {len(single_codes)}")
print(f"GT codes with 2+ mappings (Layer 2): {len(multi_codes)}")
print(f"Multi-mapping codes: {sorted(multi_codes)}")

In [0]:
def assign_layer(trancd):
    """Return the test layer for a transaction code.

    3 (Unknown) when the code is not in ``all_gt_codes``; 2 (Ambiguous) when
    it is in ``multi_codes``; otherwise 1 (Obvious).
    """
    if trancd in all_gt_codes:
        return 2 if trancd in multi_codes else 1
    return 3


df_catalog["layer"] = df_catalog["transaction_code"].apply(assign_layer)

# ── Summary ───────────────────────────────────────────────────────
# One row per layer: number of distinct codes and summed transaction volume.
layer_summary = df_catalog.groupby("layer", as_index=False).agg(
    codes=("transaction_code", "nunique"),
    total_volume=("volume", "sum"),
)

# Share of overall volume per layer, as a percentage with one decimal.
volume_share = layer_summary["total_volume"] / layer_summary["total_volume"].sum()
layer_summary["pct_volume"] = (volume_share * 100).round(1)

layer_names = {1: "Obvious", 2: "Ambiguous", 3: "Unknown"}
layer_summary["name"] = layer_summary["layer"].map(layer_names)

summary_cols = ["layer", "name", "codes", "total_volume", "pct_volume"]
print("Layer assignment summary:")
print(layer_summary[summary_cols].to_string(index=False))
print(f"\nTotal codes: {len(df_catalog)}")

In [0]:
# ── Show codes per layer ──────────────────────────────────────────
# List every catalog row grouped by layer, highest volume first.
banner = "=" * 60
for layer_num, layer_name in layer_names.items():
    in_layer = df_catalog["layer"] == layer_num
    layer_df = df_catalog.loc[in_layer].sort_values("volume", ascending=False)

    print(f"\n{banner}")
    print(f"Layer {layer_num} — {layer_name} ({len(layer_df)} codes)")
    print(banner)

    for _, row in layer_df.iterrows():
        code = row["transaction_code"]
        vol = row["volume"]
        src = row["source_file"]
        desc = str(row["description_1"])[:50]  # descriptions are cut to 50 chars
        print(f"  transaction_code={code:>5} | vol={vol:>7,} | {src:<7} | {desc}")

In [0]:
# ── Save catalog (with layers) to Unity Catalog ───────────────────
try:
    # The table stores the code as an integer.
    # NOTE(review): astype(int) raises ValueError for any non-numeric code, and
    # that error is not handled here — confirm raw codes are always numeric.
    # NOTE(review): df_gt keeps transaction_code as text; confirm that
    # consumers joining the two tables expect the differing types.
    int_codes = df_catalog["transaction_code"].astype(int)
    df_catalog_to_save = df_catalog.assign(transaction_code=int_codes)

    sdf_catalog = spark.createDataFrame(df_catalog_to_save)
    catalog_writer = sdf_catalog.write.mode("overwrite").option("overwriteSchema", "true")
    catalog_writer.saveAsTable(CATALOG_TABLE)
    print(f"Saved {len(df_catalog)} rows to {CATALOG_TABLE}")
except NameError:
    print("Spark session not found — skipping UC write (run in Databricks).")
    print(f"DataFrame ready with {len(df_catalog)} rows.")

---
## Validation

Verify that all Unity Catalog tables were created successfully.

In [0]:
# ── Validate UC tables ────────────────────────────────────────────
try:
    # Check every table this notebook writes, including the column mappings.
    for table_name, expected_label in [
        (GT_TABLE, "ground_truth_normalized"),
        (MAPPINGS_TABLE, "client_column_mappings"),
        (CATALOG_TABLE, "transaction_code_catalog"),
    ]:
        count = spark.sql(f"SELECT COUNT(*) as cnt FROM {table_name}").collect()[0]["cnt"]
        print(f"  OK  {expected_label}: {count} rows")

    # Verify the catalog has a layer column. Raised explicitly (same exception
    # type as before) so the check still runs when assertions are disabled.
    catalog_cols = [f.name for f in spark.table(CATALOG_TABLE).schema.fields]
    if "layer" not in catalog_cols:
        raise AssertionError("Missing 'layer' column in catalog table")
    print(f"  OK  catalog has 'layer' column")

    print("\nAll validations passed.")
except NameError:
    # `spark` is undefined outside Databricks; report the local frames instead.
    print("Spark session not found — skipping UC validation (run in Databricks).")
    print("Local DataFrames are ready:")
    print(f"  df_gt:      {len(df_gt)} rows, {df_gt['transaction_code'].nunique()} unique codes")
    print(f"  df_catalog: {len(df_catalog)} rows, columns: {list(df_catalog.columns)}")