In [19]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Koise — Blue Lantern: Enrichment pipeline
-----------------------------------------
- Reads raw statements in data/raw/
- Normalizes -> [date, description, amount, account]
- Creates merchant_key
- Applies categories via (1) overrides in config/categories.yaml and (2) data/processed/merchant_map.csv
- Optionally classifies unknown merchants with GPT (once) and appends to merchant_map
- Flags non-spend flows and is_necessity
- Writes enriched CSV for Power BI

Run:
  uv run python scripts/enrich_transactions.py
  # or: python scripts/enrich_transactions.py

Requires:
  pip install pandas pyyaml python-dateutil openpyxl   # openpyxl is only needed for .xlsx
  # pdfplumber is optional for PDFs: pip install pdfplumber
"""


'\nKoise — Blue Lantern: Enrichment pipeline\n-----------------------------------------\n- Reads raw statements in data/raw/\n- Normalizes -> [date, description, amount, account]\n- Creates merchant_key\n- Applies categories via (1) overrides in config/categories.yaml and (2) data/processed/merchant_map.csv\n- Optionally classifies unknown merchants with GPT (once) and appends to merchant_map\n- Flags non-spend flows and is_necessity\n- Writes enriched CSV for Power BI\n\nRun:\n  uv run python scripts/enrich_transactions.py\n  # or: python scripts/enrich_transactions.py\n\nRequires:\n  pip install pandas pyyaml python-dateutil openpyxl (for .xlsx)\n  # pdfplumber is optional for PDFs: pip install pdfplumber\n'

In [20]:
import os
# Run from the project root so the relative data/ and config/ folders resolve.
# Set the SPENDING_DASHBOARD_ROOT environment variable to point at a checkout on
# another machine; the default is the original author's local path.
os.chdir(os.environ.get(
    "SPENDING_DASHBOARD_ROOT",
    r"C:\Users\kosis\Downloads\Automation\spending-dashboard",
))

In [21]:
from __future__ import annotations
import os, re, csv, json, glob
from datetime import datetime
from typing import List, Dict, Any

import pandas as pd
import yaml

In [22]:

# ============== SETTINGS ==============
# Project root: two directories above this file when it runs as a script;
# the current working directory when there is no __file__ (e.g. in a notebook).
if "__file__" in globals():
    PROJECT_ROOT = os.path.dirname(os.path.dirname(__file__))
else:
    PROJECT_ROOT = os.getcwd()

# Input / output locations, all relative to PROJECT_ROOT.
_DATA_DIR      = os.path.join(PROJECT_ROOT, "data")
PATH_RAW       = os.path.join(_DATA_DIR, "raw")
PATH_INTERIM   = os.path.join(_DATA_DIR, "interim")
PATH_PROCESSED = os.path.join(_DATA_DIR, "processed")
PATH_DOCS      = os.path.join(PROJECT_ROOT, "docs")
PATH_CONFIG    = os.path.join(PROJECT_ROOT, "config", "categories.yaml")
PATH_MMAP      = os.path.join(PATH_PROCESSED, "merchant_map.csv")

# Set to True to send still-unknown merchants to GPT for classification.
# The OPENAI_API_KEY environment variable must be set before running with GPT on.
USE_GPT = False

# Model name and number of merchants sent per request.
GPT_MODEL = "gpt-4o-mini"  # light, cheap; swap to gpt-4o for extra quality
GPT_BATCH = 20

# ======================================

In [None]:

def ensure_dirs():
    """Create the interim, processed and docs folders; existing folders are left alone."""
    output_dirs = (PATH_INTERIM, PATH_PROCESSED, PATH_DOCS)
    for folder in output_dirs:
        os.makedirs(folder, exist_ok=True)

def read_yaml(path: str) -> Dict[str, Any]:
    """Load the YAML document at ``path`` (read as UTF-8) with ``yaml.safe_load``.

    An empty file parses to ``None``; that case is returned as an empty dict so
    callers can use ``.get(...)`` on the result without a ``None`` check.

    Raises whatever ``open`` or ``yaml.safe_load`` raise (missing file, bad YAML).
    """
    with open(path, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return {} if data is None else data

def try_read_csv_like(path: str) -> pd.DataFrame | None:
    # 1) normal read
    for enc in ("utf-8", "latin1"):
        try:
            return pd.read_csv(path, encoding=enc)
        except Exception:
            pass
    # 2) python engine w/ sep=None
    for enc in ("utf-8", "latin1"):
        try:
            return pd.read_csv(path, sep=None, engine="python", encoding=enc)
        except Exception:
            pass
    # 3) sniff
    try:
        with open(path, "r", encoding="latin1", errors="ignore") as f:
            sample = f.read(4096)
            dialect = csv.Sniffer().sniff(sample)
            f.seek(0)
            return pd.read_csv(f, dialect.delimiter)
    except Exception:
        return None

def try_read_xlsx(path: str) -> pd.DataFrame | None:
    try:
        return pd.read_excel(path, engine="openpyxl")
    except Exception:
        return None
def _parse_money(value):
    """Convert one raw money cell to a float, or ``pd.NA`` when it cannot be parsed.

    ``$`` and ``,`` are stripped; a value containing both ``(`` and ``)``,
    e.g. ``(12.50)``, is negated.
    """
    text = "" if pd.isna(value) else str(value)
    text = text.replace("$", "").replace(",", "").strip()
    negative = "(" in text and ")" in text
    if negative:
        text = text.replace("(", "").replace(")", "")
    try:
        number = float(text)
    except (TypeError, ValueError):
        return pd.NA
    return -number if negative else number


def _money_column(df, col):
    """Parse ``df[col]`` to a float Series with unparseable cells as 0.0; all zeros if ``col`` is None."""
    if col is None:
        return pd.Series(0.0, index=df.index)
    parsed = df[col].map(_parse_money)
    return parsed.map(lambda v: 0.0 if pd.isna(v) else float(v)).astype(float)


def _first_matching(columns, pattern, full=False):
    """Return the first column name matching ``pattern`` case-insensitively, else None."""
    matcher = re.fullmatch if full else re.search
    return next((c for c in columns if matcher(pattern, c, re.I)), None)


def _extract_date(text):
    """Pull the first ``m/d/y``-shaped token (optionally prefixed by 'Date') out of free text."""
    found = re.search(r"(?:Date\s*)?(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})", str(text))
    if found:
        return pd.to_datetime(found.group(1), errors="coerce")
    return pd.NaT


def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalize a raw statement frame to the columns: description, amount, date.

    Handles messy cases:
      - an explicit 'Amount' column
      - 'Amount Debit'/'Amount Credit' pairs and generic 'Debit'/'Credit' pairs
        (amount = credit - debit; either side may be absent)
      - various date column names (Posting/Transaction/etc.)
      - date recovery from Memo/Description text if no date column exists

    Returns a new frame with a fresh 0..n-1 index; the input is not modified.
    ``date`` holds ``datetime.date`` objects (``NaT`` where unparseable) and
    ``amount`` holds floats (``pd.NA`` where unparseable or when no amount
    column could be identified). Missing descriptions become ``""``.
    """
    df = df.copy()
    df.columns = [str(c).strip() for c in df.columns]
    columns = list(df.columns)

    # ---------- Amount ----------
    amounts = None

    # Case A: explicit Amount column (the debit/credit halves are handled below)
    amount_col = next(
        (c for c in columns
         if re.search(r"\bamount\b", c, re.I)
         and c.lower() not in ("amount debit", "amount credit")),
        None,
    )
    if amount_col is not None:
        amounts = df[amount_col].map(_parse_money)
    else:
        # Case B: 'Amount Debit' / 'Amount Credit'
        debit_col = _first_matching(columns, r"amount debit", full=True)
        credit_col = _first_matching(columns, r"amount credit", full=True)
        # Case C: any column with a 'debit' / 'credit' word
        if debit_col is None and credit_col is None:
            debit_col = _first_matching(columns, r"\bdebit\b")
            credit_col = _first_matching(columns, r"\bcredit\b")

        if debit_col is not None or credit_col is not None:
            # A missing side contributes zeros, so single-column files work too.
            # NOTE(review): assumes debit values are stored as positive numbers — confirm
            # against exports that already sign their debit column.
            amounts = _money_column(df, credit_col) - _money_column(df, debit_col)
        else:
            # Last resort: any money-looking column name
            money_like = [c for c in columns if re.search(r"amount|amt|\$\s*", c, re.I)]
            if money_like:
                amounts = df[money_like[0]].map(_parse_money)

    # ---------- Description ----------
    # Prefer a column actually named like a description, so that e.g. a leading
    # 'Details' column does not win over 'Description' just by coming first.
    desc_col = _first_matching(columns, r"description")
    if desc_col is None:
        desc_col = _first_matching(
            columns, r"description|details|transaction description|payee|memo"
        )
    if desc_col is None:
        # fallback: first column whose values average more than 6 characters
        text_cols = [c for c in columns if df[c].astype(str).str.len().mean() > 6]
        desc_col = text_cols[0] if text_cols else columns[0]

    # ---------- Date ----------
    date_col = _first_matching(
        columns, r"(date|transaction date|posting date|posted date|trans date)", full=True
    )
    if date_col is None:
        date_col = _first_matching(columns, r"date")

    # ---------- Build output ----------
    out = pd.DataFrame(index=df.index)
    # Map missing cells to "" before stringifying, otherwise they become "nan"
    out["description"] = (
        df[desc_col].astype(object)
        .map(lambda v: "" if pd.isna(v) else str(v))
        .str.strip()
    )
    out["amount"] = amounts if amounts is not None else pd.NA

    if date_col is not None:
        dates = pd.to_datetime(df[date_col], errors="coerce")
    else:
        # Try to pull a date out of text like "Date 07/19/25" or "07/19/2025":
        # Memo first, then the description for rows still without a date.
        if "Memo" in df.columns:
            memo = df["Memo"]
        else:
            memo = pd.Series([None] * len(df), index=df.index)
        from_memo = pd.to_datetime(memo.apply(_extract_date), errors="coerce")
        from_desc = pd.to_datetime(out["description"].apply(_extract_date), errors="coerce")
        dates = from_memo.fillna(from_desc)

    # Final coercion; format="mixed" is not accepted by older pandas, hence the fallback
    try:
        dates = pd.to_datetime(dates, format="mixed", errors="coerce")
    except Exception:
        dates = pd.to_datetime(dates, errors="coerce")
    out["date"] = dates.dt.date

    return out.reset_index(drop=True)

def merchant_key_from_description(s: str) -> str:
    """Reduce a raw transaction description to an upper-case merchant key.

    The text is upper-cased, then each pattern below is replaced by a space in
    order: wallet/payment-app phrases, digits, every character other than
    A-Z, space, ``&``, ``'`` and ``-``, and finally a list of generic tokens.
    Runs of whitespace are collapsed at the end. A falsy input yields ``""``.
    """
    key = (s or "").upper().strip()

    noise_patterns = (
        # wallet / payment-app phrases
        r"\bAPPLE PAY(MENT)?(?:\s+ENDING\s+IN\s+\d+)?\b",
        r"\bGOOGLE PAY\b",
        r"\bWALLET\b",
        r"\bAPPLE CASH\b",
        # digits, then anything that is not a letter or an allowed separator
        r"\d+",
        r"[^A-Z &'\-]",
        # generic tokens that do not help identify the merchant
        r"\b(ONLINE|USA|US|STORE|CARD|VISA|MC|AMEX|DISCOVER|AUTOMATIC|PAYMENT|THANK|SEE|DETAILS|FULL|BALANCE)\b",
    )
    for pattern in noise_patterns:
        key = re.sub(pattern, " ", key)

    # squeeze spaces
    return re.sub(r"\s+", " ", key).strip()


def load_raw_transactions() -> pd.DataFrame:
    """Read every statement in PATH_RAW, normalize it, and stack the results.

    Each ``.csv`` / ``.xlsx`` / ``.xls`` file (extension match is
    case-insensitive) is parsed, passed through ``normalize_columns`` and
    tagged with an ``account`` column taken from the lower-cased file name
    without its extension. Other files and unparseable or empty files are
    skipped with a printed message. A ``merchant_key`` column is derived from
    ``description`` on the combined frame.

    Files are processed in sorted path order so that the row order of the
    result does not depend on the order the filesystem lists them in.

    Raises:
        RuntimeError: if no file could be read.
    """
    frames: List[pd.DataFrame] = []
    for path in sorted(glob.glob(os.path.join(PATH_RAW, "*"))):
        base = os.path.basename(path)
        lowered = base.lower()
        account = os.path.splitext(base)[0].lower()

        if lowered.endswith(".csv"):
            df = try_read_csv_like(path)
        elif lowered.endswith((".xlsx", ".xls")):
            df = try_read_xlsx(path)
        else:
            print(f"[skip] unsupported (not CSV/XLSX): {base}")
            continue

        if df is None or df.empty:
            print(f"[skip] could not parse: {base}")
            continue

        norm = normalize_columns(df)
        # Warn, but keep the file, when a core column is absent or entirely empty
        missing = [c for c in ["date","description","amount"] if c not in norm.columns or norm[c].isna().all()]
        if missing:
            print(f"[warn] {base} missing/empty columns: {missing} — keeping what we have.")
        norm["account"] = account
        frames.append(norm)

    if not frames:
        raise RuntimeError("No readable files in data/raw/.")
    all_tx = pd.concat(frames, ignore_index=True)
    all_tx["merchant_key"] = all_tx["description"].apply(merchant_key_from_description)
    return all_tx


def load_or_init_merchant_map() -> pd.DataFrame:
    """Load the merchant map CSV at PATH_MMAP, or start an empty one.

    The returned frame always has exactly the canonical columns, in canonical
    order. Canonical columns absent from the file are added filled with
    ``pd.NA``; columns in the file that are not canonical are dropped.
    """
    canonical = [
        "merchant_key",
        "display_name",
        "category",
        "subcategory",
        "tags",
        "confidence",
        "source",
        "first_seen",
        "last_seen",
    ]
    if not os.path.exists(PATH_MMAP):
        return pd.DataFrame(columns=canonical)

    merchant_map = pd.read_csv(PATH_MMAP)
    absent = [name for name in canonical if name not in merchant_map.columns]
    for name in absent:
        merchant_map[name] = pd.NA
    return merchant_map[canonical].copy()


def yaml_contains_category(tx: pd.DataFrame, cfg: dict) -> pd.Series:
    """
    Assign YAML-configured categories by substring match on ``merchant_key``.

    Every entry under ``cfg["mapping"][category]`` (list values only) is turned
    into a pattern with ``merchant_key_from_description``. Patterns are tried
    longest first, and a row keeps the first category that matches it.

    Side effect: writes ``_yaml_category`` and ``_yaml_display`` (the raw YAML
    entry that matched) onto ``tx`` in place. Returns the category Series.
    """
    mapping = cfg.get("mapping", {}) or {}

    # (pattern, category, raw YAML entry, pattern length)
    rules = []
    for category, entries in mapping.items():
        if isinstance(entries, list):
            for entry in entries:
                pattern = merchant_key_from_description(str(entry))
                if not pattern:
                    continue
                rules.append((pattern, category, str(entry), len(pattern)))

    # longest patterns first so 'AMAZON PRIME' beats 'AMAZON'
    ordered_rules = sorted(rules, key=lambda rule: rule[3], reverse=True)

    categories = pd.Series(index=tx.index, dtype="object")
    displays = pd.Series(index=tx.index, dtype="object")

    for pattern, category, raw_entry, _length in ordered_rules:
        hits = tx["merchant_key"].str.contains(re.escape(pattern), na=False)
        # rows already categorised by a longer pattern are left untouched
        unassigned_hits = hits & categories.isna()
        categories.loc[unassigned_hits] = category
        displays.loc[unassigned_hits] = raw_entry

    tx["_yaml_category"] = categories
    tx["_yaml_display"] = displays
    return categories


def apply_rules_and_cache(tx, cfg, mm):
    """Categorise transactions from the YAML rules first, then the merchant_map cache.

    Args:
        tx: transactions with a ``merchant_key`` column. ``yaml_contains_category``
            adds ``_yaml_category`` / ``_yaml_display`` to it in place.
        cfg: parsed categories config, passed through to ``yaml_contains_category``.
        mm: merchant map; matched to ``tx`` by exact ``merchant_key``. Not modified.

    Returns:
        A new frame with one row per row of ``tx`` plus the merchant_map values
        as ``*_mm`` columns and the chosen values as ``*_final`` columns.
        A YAML match wins for category and display name and sets
        ``source_final`` to ``"yaml"``; otherwise the merchant_map values are used.
    """
    # 1) YAML contains (fills tx['_yaml_category'], tx['_yaml_display'])
    _ = yaml_contains_category(tx, cfg)

    # 2) merchant_map exact (cache)
    mm_required = ["merchant_key","category","display_name","subcategory","tags","confidence","source"]
    value_cols = [c for c in mm_required if c != "merchant_key"]
    mm_slim = (
        mm.reindex(columns=mm_required)      # absent columns come back as all-NA
          .dropna(subset=["merchant_key"])
          # one row per key, so the left merge can never multiply transactions;
          # when a key is listed more than once the last entry is used
          .drop_duplicates(subset=["merchant_key"], keep="last")
    )

    merged = tx.merge(mm_slim, on="merchant_key", how="left", suffixes=("","_mm"))

    # pandas only applies a suffix when a column name exists on both sides.
    # When tx has no column of the same name, the merchant_map value arrives
    # unsuffixed, so copy it to the *_mm name the picks below rely on.
    for col in value_cols:
        suffixed = f"{col}_mm"
        if suffixed not in merged.columns:
            merged[suffixed] = merged[col]

    # computed on the merged frame so the mask always lines up with its rows
    has_yaml = merged["_yaml_category"].notna()

    # final picks: YAML wins; else merchant_map
    merged["category_final"] = merged["_yaml_category"].where(has_yaml, merged["category_mm"])
    merged["display_name_final"] = merged["_yaml_display"].where(has_yaml, merged["display_name_mm"])
    merged["subcategory_final"]  = merged["subcategory_mm"]
    merged["tags_final"]         = merged["tags_mm"]
    merged["confidence_final"]   = merged["confidence_mm"]
    merged["source_final"]       = merged["source_mm"].where(~has_yaml, "yaml")

    return merged

def mark_non_spend(enriched: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``enriched`` with a boolean ``is_non_spend_flow`` column.

    A row is flagged when any of these holds:
      - it was already flagged in an existing ``is_non_spend_flow`` column
        (the column is optional; missing values count as not flagged)
      - its ``category_final`` is one of the non-spend categories
      - its ``description`` contains a payment/transfer phrase (case-insensitive)

    Requires ``category_final`` and ``description`` columns. The input frame
    is not modified.
    """
    enriched = enriched.copy()
    non_spend_cats = {
        "credit_card_payment", "transfer", "loan_payment",
        "income", "refund", "internal_move"
    }

    if "is_non_spend_flow" in enriched.columns:
        already_flagged = enriched["is_non_spend_flow"].fillna(False).astype(bool)
    else:
        # first time through: nothing has been flagged yet
        already_flagged = pd.Series(False, index=enriched.index)

    in_non_spend_category = enriched["category_final"].isin(non_spend_cats)

    # (?:...) keeps the alternation non-capturing, which avoids the pandas
    # "pattern has match groups" warning without changing what matches
    pay_phrases = enriched["description"].str.contains(
        r"AUTOMATIC PAYMENT|DIRECTPAY|PAYMENT\s*(?:THANK|YOU)|FULL BALANCE|ACCTVERIFY|TRANSFER",
        case=False, na=False
    )

    enriched["is_non_spend_flow"] = already_flagged | in_non_spend_category | pay_phrases
    return enriched


# -------- GPT section (optional) --------
def gpt_classify_merchants(merchant_rows: List[Dict[str,Any]], category_set: List[str]) -> List[Dict[str,Any]]:
    """
    Classify merchants with the OpenAI chat API, GPT_BATCH merchants per request.

    merchant_rows: [{"merchant_key": "...", "samples": ["raw desc 1", "raw desc 2", ...]}]
        Only the first three samples of each row are sent.
    category_set: the category names the model is told to choose from.

    Returns one dict per response line that parsed as JSON, each tagged with
    source="gpt". Lines that fail to parse are skipped. Returns [] without
    making any call when USE_GPT is False.
    """
    # No external calls when the feature switch is off
    if not USE_GPT:
        return []

    import os
    from openai import OpenAI
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

    SYSTEM = (
        "You are a meticulous financial transaction classifier. "
        "Use only the provided CATEGORY_SET. Return JSONL, one object per line. "
        "Fields: merchant_key, display_name, category, subcategory, tags (comma list), confidence (0-1)."
    )

    classified: List[Dict[str,Any]] = []
    for start in range(0, len(merchant_rows), GPT_BATCH):
        # one prompt line per merchant in this batch
        prompt_lines = [
            'merchant_key="{}", raw="{}"'.format(
                row["merchant_key"], "; ".join(row.get("samples", [])[:3])
            )
            for row in merchant_rows[start:start + GPT_BATCH]
        ]
        USER = "CATEGORY_SET = " + json.dumps(category_set) + "\nClassify:\n" + "\n".join(prompt_lines)

        resp = client.chat.completions.create(
            model=GPT_MODEL,
            temperature=0,
            messages=[
                {"role":"system","content":SYSTEM},
                {"role":"user","content":USER},
            ],
        )
        reply = resp.choices[0].message.content.strip()

        # The reply is expected to be JSONL; parse line by line, best effort
        for raw_line in reply.splitlines():
            candidate = raw_line.strip()
            if candidate:
                try:
                    record = json.loads(candidate)
                    record["source"] = "gpt"
                    classified.append(record)
                except Exception:
                    # skip lines that are not usable JSON objects
                    continue
    return classified

In [24]:

# --------------- MAIN ---------------
def main():
    """Run the enrichment pipeline end to end.

    Steps: create output folders, load the config and merchant map, read and
    normalize the raw statements, apply YAML rules and the merchant_map cache,
    optionally classify unknown merchants with GPT and fold the results into
    the merchant map, compute flags, then write the merchant map, the enriched
    transactions CSV and (when anything is still uncategorised) a review file.
    """
    from datetime import timezone  # local import: only needed for the UTC date stamp

    ensure_dirs()

    # 1) Load config + merchant_map (an empty config file is treated as {})
    cfg = read_yaml(PATH_CONFIG) or {}
    mm = load_or_init_merchant_map()

    # 2) Read and normalize all raw statements
    tx = load_raw_transactions()
    normalized_path = os.path.join(PATH_INTERIM, "all_transactions_normalized.csv")
    tx.to_csv(normalized_path, index=False)

    # 3) Apply rules + cache
    applied = apply_rules_and_cache(tx, cfg, mm)

    # 4) Build unknown merchant list (for GPT or review)
    unknown = applied[applied["category_final"].isna()].copy()

    # Up to three raw descriptions per merchant_key give GPT some context
    samples = (unknown
               .groupby("merchant_key")["description"]
               .apply(lambda s: list(pd.Series(s).dropna().astype(str).head(3)))
               .reset_index()
               .rename(columns={"description":"samples"}))
    to_classify = samples.to_dict(orient="records")

    # 5) Optional GPT classify (returns [] when USE_GPT is off)
    category_set = list({*cfg.get("necessities", []),
                         *cfg.get("discretionary", []),
                         *cfg.get("non_spend_flows", []),
                         *list((cfg.get("mapping") or {}).keys())})
    gpt_results = gpt_classify_merchants(to_classify, category_set)

    # 6) Update merchant_map with GPT results
    if gpt_results:
        gpt_df = pd.DataFrame(gpt_results)
        # timezone-aware replacement for the deprecated datetime.utcnow()
        now = datetime.now(timezone.utc).date().isoformat()
        gpt_df["first_seen"] = now
        gpt_df["last_seen"]  = now

        # merge/update existing records
        mm_existing = mm.set_index("merchant_key")
        for _, row in gpt_df.iterrows():
            mk = row["merchant_key"]
            if mk in mm_existing.index:
                # known merchant: refresh its fields but keep first_seen
                for col in ["display_name","category","subcategory","tags","confidence","source","last_seen"]:
                    mm_existing.at[mk, col] = row.get(col, mm_existing.at[mk, col])
            else:
                mm_existing.loc[mk] = row
        mm = mm_existing.reset_index()

        # Re-apply rules + cache after GPT
        applied = apply_rules_and_cache(tx, cfg, mm)

    # 7) Flags
    # NOTE(review): compute_flags is not defined in the visible part of this file —
    # confirm it is defined before main() runs, otherwise this raises NameError.
    applied = compute_flags(applied, cfg)

    # 8) Output: merchant_map (updated), transactions_enriched
    mm.to_csv(PATH_MMAP, index=False)

    enriched_cols = [
        "date","account","description","merchant_key",
        "display_name_final","category_final","subcategory_final","tags_final","confidence_final","source_final",
        "amount","is_necessity","is_non_spend_flow"
    ]
    # any column the earlier steps did not produce is written as empty
    for c in enriched_cols:
        if c not in applied.columns:
            applied[c] = pd.NA
    enriched = applied[enriched_cols].copy()
    enriched = enriched.sort_values(["date","account"], na_position="last")

    enriched_path = os.path.join(PATH_PROCESSED, "transactions_enriched.csv")
    enriched.to_csv(enriched_path, index=False)

    # 9) Review queue for any still-unknown items
    review_path = os.path.join(PATH_DOCS, "review_unknowns.csv")
    still_unknown = enriched[enriched["category_final"].isna()].copy()
    if not still_unknown.empty:
        still_unknown.to_csv(review_path, index=False)

    print("✅ Enrichment complete.")
    print(f" - Normalized: {normalized_path}")
    print(f" - Merchant map: {PATH_MMAP}")
    print(f" - Enriched: {enriched_path}")
    if not still_unknown.empty:
        print(f" - Review unknowns: {review_path}")

if __name__ == "__main__":
    main()


[skip] unsupported (not CSV/XLSX): apple_credit_1m.pdf
[warn] chase_checking_6m.CSV missing/empty columns: ['date', 'amount'] — keeping what we have.
[skip] unsupported (not CSV/XLSX): petal_credit_1m.pdf
[skip] could not parse: ssscu_checking_6m.CSV
[skip] could not parse: ssscu_credit_6m.CSV
✅ Enrichment complete.
 - Normalized: C:\Users\kosis\Downloads\Automation\spending-dashboard\data\interim\all_transactions_normalized.csv
 - Merchant map: C:\Users\kosis\Downloads\Automation\spending-dashboard\data\processed\merchant_map.csv
 - Enriched: C:\Users\kosis\Downloads\Automation\spending-dashboard\data\processed\transactions_enriched.csv
 - Review unknowns: C:\Users\kosis\Downloads\Automation\spending-dashboard\docs\review_unknowns.csv


  out["date"] = pd.to_datetime(df[date_col], errors="coerce")
  all_tx = pd.concat(frames, ignore_index=True)
