In [19]:
#!/usr/bin/env python3
import sys
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta

import numpy as np
import pandas as pd
import mlflow

import data.data_source as data_source
from config import env
from utils.artifact_saver import get_artifact_path

# ─── CONFIG ─────────────────────────────────────────────────────────────────

mlflow.set_experiment(f"Populate IR Cone Metrics [{env}]")

CURVE_TYPE     = "US Treasury Par"
TENORS         = [1/12, 0.125, 2/12, 0.25, 4/12, 0.5, 1, 2, 3, 5, 7, 10, 20, 30]
FIT_YEARS      = 1
BACKDATE_DAYS  = 365
BATCH_SIZE     = 500

ds = data_source.get_data_source()

# ─── COLUMN LISTS ───────────────────────────────────────────────────────────

COLUMNS = [
  "curve_type","model_type","curve_date","days_forward","tenor_num","tenor_str",
  "forecast_p01","forecast_p05","forecast_p10","forecast_p50",
  "forecast_p90","forecast_p95","forecast_p99",
  "realized_date","realized_rate",
  "forecast_error","absolute_error","relative_error",
  "percentile_rank","inside_cone",
  "n_obs_fit","total_variance","trace_covariance"
]
UPSERT_COLS = COLUMNS[5:]  # everything after tenor_str

# ─── UTILITY ─────────────────────────────────────────────────────────────────

def percentile_rank(real: float, pct_map: dict[int,float]) -> float:
    xs, ys = zip(*sorted(pct_map.items()))
    return float(np.interp(real, ys, xs))

# ─── POPULATOR ───────────────────────────────────────────────────────────────

def populate(days: int) -> list[tuple]:
    today     = date.today()
    end_date  = today - timedelta(days=1)
    asof_dates = [end_date - timedelta(days=d) for d in range(1, days+1)]
    min_asof   = min(asof_dates)
    max_asof   = max(asof_dates)

    print('# 1) BULK FETCH HISTORY')
    hist_start = max(min_asof - relativedelta(years=FIT_YEARS), date(2010,1,1))
    tstr       = ",".join(map(str, TENORS))
    hist_q = f"""
      SELECT curve_date, tenor_num, rate
        FROM rate_curves
       WHERE curve_type = '{CURVE_TYPE}'
         AND curve_date BETWEEN '{hist_start}' AND '{end_date}'
         AND tenor_num IN ({tstr})
    """
    hist_df = ds.query(hist_q).to_pandas()
    hist_df['curve_date'] = pd.to_datetime(hist_df['curve_date'])
    hist_pivot = hist_df.pivot(
        index='curve_date', columns='tenor_num', values='rate'
    )

    print('# 2) BULK FETCH CONES')
    cones_q = f"""
      SELECT curve_date, model_type, days_forward, tenor_num, tenor_str, cone_type, rate
        FROM rate_cones
       WHERE curve_type = '{CURVE_TYPE}'
         AND curve_date BETWEEN '{min_asof}' AND '{max_asof}'
         AND tenor_num IN ({tstr})
    """
    cones_raw = ds.query(cones_q).to_pandas()
    cones_raw['curve_date'] = pd.to_datetime(cones_raw['curve_date'])
    cones_df = cones_raw.pivot_table(
        index=['curve_date','model_type','days_forward','tenor_num','tenor_str'],
        columns='cone_type', values='rate'
    ).reset_index()

    print('# 3) BULK FETCH REALIZED RATES + FORWARD-FILL')
    cones_df['realized_date'] = cones_df['curve_date'] + pd.to_timedelta(cones_df['days_forward'], unit='d')
    real_dates = cones_df['realized_date'].drop_duplicates()
    dates_sql  = ",".join(f"'{d}'" for d in real_dates)
    real_q = f"""
      SELECT curve_date AS realized_date, tenor_num, rate
        FROM rate_curves
       WHERE curve_type = '{CURVE_TYPE}'
         AND curve_date IN ({dates_sql})
         AND tenor_num IN ({tstr})
    """
    real_df = ds.query(real_q).to_pandas()
    real_df['realized_date'] = pd.to_datetime(real_df['realized_date'])

    # build complete grid and forward-fill missing
    idx = pd.MultiIndex.from_product(
        [real_dates, TENORS], names=['realized_date','tenor_num']
    )
    real_complete = real_df.set_index(['realized_date','tenor_num'])['rate'].reindex(idx)
    real_filled = real_complete.groupby(level='tenor_num').ffill().reset_index(name='rate')
    real_map = {
        (row['realized_date'], row['tenor_num']): row['rate']
        for _, row in real_filled.iterrows()
    }

    print('# 4) COMPUTE & ASSEMBLE ROWS')
    rows = []
    for _, r in cones_df.iterrows():
        asof  = r['curve_date']
        start = max(
            asof - relativedelta(years=FIT_YEARS),
            pd.Timestamp('2010-01-01')
        )
        hist_slice = hist_pivot.loc[start:asof]

        cov_mat   = hist_slice.cov().values
        n_obs     = len(hist_slice)
        total_var = float(cov_mat.sum())
        trace_var = float(np.trace(cov_mat))

        pct = { pct: r.get(f'{pct}%') for pct in [1,5,10,50,90,95,99] }
        f50   = pct[50]
        dfwd  = int(r['days_forward'])
        tnum  = r['tenor_num']
        real  = real_map.get((r['realized_date'], tnum))
        err   = (f50 - real) if real is not None else None

        rows.append((
            CURVE_TYPE, r['model_type'], asof, dfwd, tnum, r['tenor_str'],
            pct[1], pct[5], pct[10], pct[50], pct[90], pct[95], pct[99],
            r['realized_date'], real,
            err,
            abs(err) if err is not None else None,
            (err/real) if (err is not None and real) else None,
            percentile_rank(real, pct) if real is not None else None,
            bool(real is not None and pct[5] <= real <= pct[95]),
            n_obs, total_var, trace_var
        ))

    print('# 5) BATCH UPSERT')
    set_clause = ",\n    ".join(f"{c} = EXCLUDED.{c}" for c in UPSERT_COLS)
    for i in range(0, len(rows), BATCH_SIZE):
        batch = rows[i:i+BATCH_SIZE]
        vals_list = []
        for row in batch:
            cells = []
            for v in row:
                if pd.isna(v):
                    cells.append("NULL")
                elif isinstance(v, (str, date, pd.Timestamp)):
                    cells.append(f"'{v}'")
                elif isinstance(v, bool):
                    cells.append("TRUE" if v else "FALSE")
                else:
                    cells.append(str(v))
            vals_list.append(f"({','.join(cells)})")
        vals = ",\n".join(vals_list)

        sql = f"""
            INSERT INTO rate_cone_diagnostics ({','.join(COLUMNS)})
            VALUES
            {vals}
            ON CONFLICT (model_type, curve_date, days_forward, tenor_num) DO UPDATE
            SET
            {set_clause}
            """
        ds.query(sql)

    return rows

# ─── ENTRYPOINT ──────────────────────────────────────────────────────────────

if __name__ == '__main__':
    days = BACKDATE_DAYS
    with mlflow.start_run():
        mlflow.log_param("backdate_days", days)
        rows  = populate(days)
        count = len(rows)
        mlflow.log_metric("rows_upserted", count)

        # artifact
        df = pd.DataFrame(rows, columns=COLUMNS)
        fp = get_artifact_path("rate_cone_diagnostics_backfill.csv")
        df.to_csv(fp, index=False)
        mlflow.log_artifact(fp, artifact_path="rate_cone_diagnostics")

    print(f"✅ Done: upserted {count} rows into rate_cone_diagnostics.")

getting data source for sandbox
# 1) BULK FETCH HISTORY
# 2) BULK FETCH CONES
# 3) BULK FETCH REALIZED RATES + FORWARD-FILL
# 4) COMPUTE & ASSEMBLE ROWS
# 5) BATCH UPSERT
🏃 View run aged-pug-709 at: http://127.0.0.1:8768/#/experiments/1524/runs/91eef4f5cbe044598bb042b4386732c9
🧪 View experiment at: http://127.0.0.1:8768/#/experiments/1524
✅ Done: upserted 35555 rows into rate_cone_diagnostics.
