In [None]:
import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent))

import data.data_source as data_source
import time
import math
import uuid
from datetime import date
from dateutil.relativedelta import relativedelta
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import env

import pandas as pd
import mlflow

# ─── CONFIG ───────────────────────────────────────────────────────────────────
# One MLflow experiment per environment; `env` is imported from the project config module.
experiment_name = f"Populate Tsy Inventory [{env}]"
# Runs when the cell/module is executed: selects the experiment used by the runs started below.
mlflow.set_experiment(experiment_name)

BATCH_SIZE   = 1_000                # rows per INSERT statement built in upsert_inventory
MAX_WORKERS  = 8                    # default thread-pool size for main()
ARTIFACT_DIR = "artifacts/results"  # CSV output directory, joined onto the mount root in run_snapshot

# ─── HELPERS ──────────────────────────────────────────────────────────────────
def find_mount_root(start: Path, target: str = "mnt") -> Path:
    """Return the closest directory named *target* at or above *start*.

    *start* is resolved to an absolute path first, then the path itself and
    each of its ancestors (nearest first) are checked by name.

    Args:
        start: Path to begin the upward search from.
        target: Directory name to look for.

    Returns:
        The first matching path encountered while walking towards the root.

    Raises:
        FileNotFoundError: If no component up to the filesystem root is named
            *target*.
    """
    origin = start.resolve()
    for candidate in (origin, *origin.parents):
        if candidate.name == target:
            return candidate
    raise FileNotFoundError(f"‘{target}’ not found in {start}")

def sanitize_quantity(val):
    """Convert a raw quantity to a float scaled by 0.01 and rounded to 2 decimals.

    Args:
        val: Any value; it is passed to ``float()``.

    Returns:
        ``round(float(val) * 0.01, 2)``, or ``0.0`` when *val* cannot be
        converted or converts to NaN / ±infinity.
    """
    # Only the conversion can fail; keep the try body to that single call and
    # catch just the errors float() raises, so KeyboardInterrupt / SystemExit
    # are no longer swallowed the way a bare ``except:`` swallowed them.
    try:
        q = float(val)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    if not math.isfinite(q):
        return 0.0
    return round(q * 0.01, 2)

def quote(val):
    """Render *val* as a SQL literal: ``NULL`` or a single-quoted string.

    Args:
        val: Value to embed in a SQL statement.

    Returns:
        ``"NULL"`` for missing scalars (``None``, NaN, ``pd.NaT``, ``pd.NA``),
        for infinite floats, and for values whose text form is empty or reads
        ``nan`` / ``null`` (case-insensitive, surrounding whitespace ignored).
        Otherwise ``str(val)`` wrapped in single quotes with embedded single
        quotes doubled.
    """
    # One missing-value test for every scalar sentinel. pd.NA used to fall
    # through to str() and was emitted as the literal '<NA>'. The is_scalar
    # guard keeps pd.isna from returning an array for list-like input.
    if pd.api.types.is_scalar(val) and pd.isna(val):
        return "NULL"
    # Infinity is not a missing value for pandas, so it is checked separately.
    if isinstance(val, float) and math.isinf(val):
        return "NULL"

    s = str(val)
    if s.strip().lower() in {"nan", "null", ""}:
        return "NULL"

    # escape single quotes by doubling them
    escaped = s.replace("'", "''")
    return "'{}'".format(escaped)

def chunks(seq, size):
    """Lazily yield consecutive slices of *seq*, each at most *size* items long.

    Args:
        seq: A sliceable sequence that supports ``len()``.
        size: Step between slice starts, and the maximum slice length.

    Yields:
        ``seq[k:k + size]`` for ``k = 0, size, 2*size, ...``; the final slice
        may be shorter.
    """
    offsets = range(0, len(seq), size)
    yield from (seq[lo:lo + size] for lo in offsets)

# ─── DATA & DB ────────────────────────────────────────────────────────────────
def fetch_active_bonds(ds, snapshot_date: date):
    """Fetch auction rows for securities outstanding on *snapshot_date*.

    Selects every column of ``treasury_auction_results`` for Notes, Bills and
    Bonds issued strictly before and maturing strictly after the snapshot
    date.

    Args:
        ds: Data source exposing ``query(sql)``; the result must provide
            ``to_pandas()``.
        snapshot_date: Date interpolated into both date filters.

    Returns:
        A list with one dict per result row (pandas ``records`` orientation).
    """
    statement = f"""
      SELECT *
        FROM treasury_auction_results
       WHERE security_type IN ('Note','Bill','Bond')
         AND issue_date   < '{snapshot_date}'
         AND maturity_date> '{snapshot_date}'
    """
    result = ds.query(statement)
    frame = result.to_pandas()
    return frame.to_dict(orient="records")

def upsert_inventory(ds, rows, snapshot_date: date):
    """Write *rows* to ``bond_inventory``, updating rows that already exist.

    Rows are sent in batches of ``BATCH_SIZE`` as multi-row ``INSERT``
    statements. On a conflict on ``(inventory_date, cusip)`` every other
    column is overwritten with the incoming value.

    Args:
        ds: Data source exposing ``query(sql)``.
        rows: List of dicts keyed by column name; a missing key is written as
            ``NULL``.
        snapshot_date: Accepted for call-site symmetry; not used in the body.

    Returns:
        None. When *rows* is empty a ``rows_inserted`` metric of 0 is logged
        and nothing is sent to the database.
    """
    if not rows:
        mlflow.log_metric("rows_inserted", 0)
        return

    conflict_key = ("inventory_date", "cusip")
    columns = [
        "inventory_date", "cusip", "quantity", "security_type", "security_term",
        "issue_date", "maturity_date", "int_rate", "int_payment_frequency",
        "series", "price_per100", "auction_date",
    ]
    column_sql = ", ".join(columns)
    assignments = [f"{name}=EXCLUDED.{name}" for name in columns if name not in conflict_key]
    update_sql = ", ".join(assignments)

    # The statement skeleton is identical for every batch, so it is defined once.
    statement_template = """
        INSERT INTO bond_inventory ({col_list})
        VALUES
          {batch_values}
        ON CONFLICT (inventory_date, cusip) DO UPDATE SET
          {set_list};
        """

    for batch in chunks(rows, BATCH_SIZE):
        tuples = []
        for record in batch:
            literals = [quote(record.get(name)) for name in columns]
            tuples.append("(" + ", ".join(literals) + ")")
        statement = statement_template.format(
            col_list=column_sql,
            batch_values=",\n  ".join(tuples),
            set_list=update_sql,
        )
        ds.query(statement)

# ─── WORKER ───────────────────────────────────────────────────────────────────
def _build_inventory(raw, snapshot_date: date):
    """Turn auction records into inventory rows, keeping the first row per CUSIP.

    A record is kept only when its derived quantity is positive and its
    ``cusip`` is truthy.
    """
    passthrough = (
        "security_type", "security_term", "issue_date", "maturity_date",
        "int_rate", "int_payment_frequency", "series", "price_per100",
        "auction_date",
    )
    by_key = {}
    for r in raw:
        # comp_accepted is scaled by sanitize_quantity, divided by 1,000,000,
        # rounded to an integer and multiplied by 5,000.
        # NOTE(review): the units behind these constants are not documented here — confirm.
        q = round(sanitize_quantity(r.get("comp_accepted")) / 1_000_000) * 5_000
        if q <= 0 or not r.get("cusip"):
            continue
        row = {
            "inventory_date": snapshot_date,
            "cusip": r["cusip"],
            "quantity": q,
            **{col: r[col] for col in passthrough},
        }
        # setdefault keeps the first row seen for each (inventory_date, cusip).
        by_key.setdefault((snapshot_date, r["cusip"]), row)
    return list(by_key.values())


def run_snapshot(snapshot_date: date):
    """Build, store and archive the bond inventory for one snapshot date.

    Steps, all inside one MLflow run named ``Inventory <date>``:
      1. fetch the securities outstanding on *snapshot_date*;
      2. derive one inventory row per CUSIP;
      3. upsert the rows into the database;
      4. write them to ``bond_inventory_<date>.csv`` under the mount root and
         log that file as an MLflow artifact.

    Metrics logged: ``rows_fetched``, ``rows_inserted``, ``run_duration_sec``
    (fetch through upsert; the CSV export is not included).

    Args:
        snapshot_date: The inventory date to populate.

    Raises:
        FileNotFoundError: If no mount root is found above the current
            working directory.
    """
    # Give each thread its own DS/client
    ds = data_source.get_data_source()
    # NOTE(review): nested=True only links this run to the backfill run if MLflow
    # treats that run as active in this worker thread — confirm in the UI.
    with mlflow.start_run(run_name=f"Inventory {snapshot_date}", nested=True):
        mlflow.set_tag("snapshot_date", snapshot_date.isoformat())
        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments during a long backfill.
        start = time.perf_counter()

        raw = fetch_active_bonds(ds, snapshot_date)
        mlflow.log_metric("rows_fetched", len(raw))

        dedup = _build_inventory(raw, snapshot_date)

        upsert_inventory(ds, dedup, snapshot_date)
        elapsed = time.perf_counter() - start
        mlflow.log_metric("rows_inserted", len(dedup))
        mlflow.log_metric("run_duration_sec", elapsed)

        # write out CSV and log artifact
        df = pd.DataFrame(dedup)
        mnt = find_mount_root(Path.cwd())
        out = mnt / ARTIFACT_DIR / f"bond_inventory_{snapshot_date}.csv"
        out.parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(out, index=False)
        mlflow.log_artifact(str(out), artifact_path="bond_inventory")

# ─── ENTRYPOINT ──────────────────────────────────────────────────────────────
def main(days: int, max_workers: int = MAX_WORKERS):
    """Backfill daily inventory snapshots for the last *days* days.

    Snapshot dates run from today backwards (today, yesterday, ...). Each date
    is handed to ``run_snapshot`` on a thread pool, all inside one outer
    MLflow run named ``Backfill Inventory``. A snapshot that raises is
    reported on stdout and does not stop the remaining ones.

    Args:
        days: Number of consecutive daily snapshots to process.
        max_workers: Size of the thread pool.
    """
    anchor = date.today()
    snapshot_dates = [anchor - relativedelta(days=offset) for offset in range(days)]

    # one outer run for the whole backfill
    with mlflow.start_run(run_name="Backfill Inventory", nested=False):
        mlflow.log_param("days", days)
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # Map each future back to its date so failures can be attributed.
            pending = {}
            for snap in snapshot_dates:
                pending[pool.submit(run_snapshot, snap)] = snap

            for finished in as_completed(pending):
                snap = pending[finished]
                try:
                    finished.result()
                except Exception as err:
                    print(f"⚠️ Snapshot {snap} failed:", err)

# Script entry point: only runs when executed directly, not when imported.
if __name__ == "__main__":
    # 365*30 = 10,950 daily snapshots, counted back from today (roughly 30 years).
    main(days=365*30)


🏃 View run Inventory 2024-12-12 at: http://127.0.0.1:8768/#/experiments/1463/runs/98fb6f3ff44c47d69bd29aab3df185f9
🧪 View experiment at: http://127.0.0.1:8768/#/experiments/1463
getting data source for sandbox
🏃 View run Inventory 2024-12-11 at: http://127.0.0.1:8768/#/experiments/1463/runs/271995564e014fbb8b86ee706443e2ef
🧪 View experiment at: http://127.0.0.1:8768/#/experiments/1463
getting data source for sandbox
🏃 View run Inventory 2024-12-10 at: http://127.0.0.1:8768/#/experiments/1463/runs/11b5c41760e44326a5933ff24af6d211
🧪 View experiment at: http://127.0.0.1:8768/#/experiments/1463
getting data source for sandbox
getting data source for sandbox
getting data source for sandbox
getting data source for sandbox
getting data source for sandbox
getting data source for sandbox
getting data source for sandbox
getting data source for sandbox
getting data source for sandbox
🏃 View run Inventory 2024-12-09 at: http://127.0.0.1:8768/#/experiments/1463/runs/616aea7aa88d480493cb436d8f2408d9