# Grade Transition Matrices

Build 1-year (four-quarter-ahead) grade transition matrices from anonymized data: one overall matrix plus one per `Adj Strategy` value.
Outputs are written as CSV files to `model_fits/outputs/transitions/`.

In [None]:
from pathlib import Path
import os
import numpy as np
import pandas as pd

In [None]:
INPUT_PATH = "anonymized.csv"
OUT_DIR = "model_fits/outputs/transitions"

# Fall back to a recursive search below the working directory when the input
# file is not directly reachable at INPUT_PATH.
if not Path(INPUT_PATH).exists():
    # Path.glob yields matches in arbitrary, filesystem-dependent order, so the
    # choice is made deterministic: regular files only, shallowest path first,
    # ties broken alphabetically.
    candidates = sorted(
        (p for p in Path.cwd().glob("**/anonymized.csv") if p.is_file()),
        key=lambda p: (len(p.parts), str(p)),
    )
    if not candidates:
        raise FileNotFoundError("anonymized.csv not found. Set INPUT_PATH to the full path.")
    if len(candidates) > 1:
        print(f"Found {len(candidates)} copies of anonymized.csv; using the shallowest one.")
    INPUT_PATH = str(candidates[0])

print("Using INPUT_PATH:", INPUT_PATH)
# Make sure the output directory exists before any matrix is written.
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

In [None]:
# Recognised grade labels, in output order. Rows with any other grade are
# filtered out before transitions are computed, and every transition matrix is
# reindexed to this order on both its rows (grade at t) and its columns
# (grade four quarters later).
GRADE_ORDER = ["A", "B", "C", "D"]

def _norm_key(s: str) -> str:
    """Return a canonical lookup key for a column name.

    Case is ignored, underscores count as spaces, and leading, trailing and
    repeated whitespace collapses to single spaces.
    """
    lowered = s.strip().lower()
    words = lowered.replace("_", " ").split()
    return " ".join(words)


def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* with known column-name variants renamed to canonical names.

    Column names are matched through ``_norm_key``, so differences in case,
    underscores and extra whitespace are ignored. Columns that match none of
    the aliases below keep their original name.
    """
    # Normalised key -> the column name as it actually appears in df.
    actual_by_key = {_norm_key(col): col for col in df.columns}

    # (accepted spelling, canonical name), applied in this order.
    aliases = (
        ("Adj strategy", "Adj Strategy"),
        ("Adj Strategy", "Adj Strategy"),
        ("Quarter of Transaction Date", "Quarter"),
        ("Year of Transaction Date", "Year"),
        ("FundID", "FundID"),
        ("Grade", "Grade"),
        ("Current Grade", "Grade_Current"),
        ("CurrentGrade", "Grade_Current"),
        ("Grade Current", "Grade_Current"),
        ("Grade_Current", "Grade_Current"),
    )

    rename = {}
    for alias, canonical in aliases:
        # Fall back to the alias itself when no column matches; renaming a
        # column that does not exist is a no-op.
        source = actual_by_key.get(_norm_key(alias), alias)
        rename[source] = canonical
    return df.rename(columns=rename)


def parse_quarter(q) -> float:
    """Convert a quarter value such as ``3``, ``"3"`` or ``"Q3"`` to a float.

    Missing values, and text that is not numeric once an optional leading
    "Q" is removed, yield NaN. Numeric inputs are returned as floats as-is;
    no range check is applied.
    """
    if pd.isna(q):
        return np.nan

    numeric_types = (int, np.integer, float, np.floating)
    if isinstance(q, numeric_types):
        return float(q)

    text = str(q).strip().upper()
    number_part = text[1:] if text.startswith("Q") else text
    try:
        value = float(number_part)
    except ValueError:
        # float() on a string signals unparseable text with ValueError only.
        return np.nan
    return value


def add_quarter_end(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with a ``quarter_end`` timestamp column.

    ``Quarter`` is parsed with ``parse_quarter`` and ``Year`` is coerced to
    numeric; both columns are overwritten with the parsed values. Rows whose
    year is missing or infinite, or whose quarter is not one of 1-4, get
    ``NaT`` in ``quarter_end`` instead of aborting the whole run.

    Raises:
        KeyError: if *df* has no ``Quarter`` or ``Year`` column.
    """
    df = df.copy()
    df["Quarter"] = df["Quarter"].apply(parse_quarter)
    df["Year"] = pd.to_numeric(df["Year"], errors="coerce")

    # Create the column up front so it exists with a datetime dtype even when
    # no row qualifies; otherwise later ``.dt`` access on it would fail.
    if "quarter_end" not in df.columns:
        df["quarter_end"] = pd.Series(pd.NaT, index=df.index, dtype="datetime64[ns]")

    year_ok = df["Year"].notna() & ~df["Year"].isin([np.inf, -np.inf])
    # Only 1-4 are valid quarters; anything else would make period
    # construction raise for the entire frame.
    quarter_ok = df["Quarter"].isin([1, 2, 3, 4])
    m = year_ok & quarter_ok
    if not m.any():
        return df

    years = df.loc[m, "Year"].astype(int).to_numpy()
    quarters = df.loc[m, "Quarter"].astype(int).to_numpy()
    # Building a PeriodIndex from year=/quarter= keywords in the constructor
    # is deprecated since pandas 2.2; prefer from_fields where it exists and
    # keep the constructor as a fallback for older pandas.
    if hasattr(pd.PeriodIndex, "from_fields"):
        periods = pd.PeriodIndex.from_fields(year=years, quarter=quarters, freq="Q")
    else:
        periods = pd.PeriodIndex(year=years, quarter=quarters, freq="Q")
    # The values are assigned positionally to the rows selected by the mask.
    df.loc[m, "quarter_end"] = periods.to_timestamp("Q")
    return df


def apply_current_grade(df: pd.DataFrame, context: str = "") -> pd.DataFrame:
    """Return a copy of *df* whose ``Grade`` holds the current grade per row.

    If a ``Grade_Current`` column exists it is copied into ``Grade`` as-is.
    Otherwise, when ``Grade``, ``FundID`` and ``quarter_end`` are all present,
    blank-like grades are treated as missing, rows are sorted by fund and
    quarter, and each fund's last known grade is carried forward into both
    ``Grade_Current`` and ``Grade``. If neither applies the copy is returned
    unchanged.

    A non-empty *context* is only used to label the progress message printed.
    """
    result = df.copy()

    if "Grade_Current" in result.columns:
        result["Grade"] = result["Grade_Current"]
        if context:
            print(f"Using Grade_Current for {context}.")
        return result

    needed = ("Grade", "FundID", "quarter_end")
    if any(col not in result.columns for col in needed):
        return result

    # String forms that missing values take after the cast to str below.
    blank_tokens = ["", "nan", "None", "NaN", "<NA>"]
    result["Grade"] = result["Grade"].astype(str).str.strip()
    is_blank = result["Grade"].isin(blank_tokens)
    result.loc[is_blank, "Grade"] = np.nan

    # Forward fill follows row order, so order each fund chronologically first.
    result = result.sort_values(["FundID", "quarter_end"])
    carried = result.groupby("FundID")["Grade"].ffill()
    result["Grade_Current"] = carried
    result["Grade"] = result["Grade_Current"]
    if context:
        print(f"Computed Grade_Current (forward fill) for {context}.")
    return result


def transition_matrix(df: pd.DataFrame, out_path: str) -> None:
    """Write the row-normalised grade transition matrix of *df* to *out_path*.

    *df* must have ``Grade`` (starting grade) and ``Grade_next`` (later grade)
    columns. The CSV has one row and one column per entry of ``GRADE_ORDER``;
    each row holds the share of its starting grade that ended in each grade,
    and a starting grade with no observations is written as all zeros.
    """
    pair_counts = df.groupby(["Grade", "Grade_next"]).size()
    counts = pair_counts.unstack(fill_value=0)
    counts = counts.reindex(index=GRADE_ORDER, columns=GRADE_ORDER, fill_value=0)

    # Swap zero totals for NaN so the division yields NaN rather than a
    # divide-by-zero result; those cells are then reported as 0.0.
    row_totals = counts.sum(axis=1)
    safe_totals = row_totals.replace(0, np.nan)
    probs = counts.div(safe_totals, axis=0).fillna(0.0)
    probs.to_csv(out_path)


def slug(s: str) -> str:
    """Turn *s* into a lowercase, filename-friendly token.

    The value is converted with ``str``; every non-alphanumeric character
    becomes an underscore, and underscores at either end are stripped.
    """
    pieces = []
    for ch in str(s):
        if ch.isalnum():
            pieces.append(ch.lower())
        else:
            pieces.append("_")
    return "".join(pieces).strip("_")

In [None]:
# --- Load + compute ---

df = pd.read_csv(INPUT_PATH, engine="python")
df = normalize_columns(df)
df = add_quarter_end(df)
df = apply_current_grade(df, context="transitions")
# Keep only rows that have a fund, a quarter and one of the recognised grades.
df = df.dropna(subset=["FundID", "Grade", "quarter_end"])
df["Grade"] = df["Grade"].astype(str).str.strip()
df = df[df["Grade"].isin(GRADE_ORDER)]

# "Adj Strategy" is optional: without it only the overall matrix is written
# (selecting it unconditionally would raise KeyError on such inputs).
has_strategy = "Adj Strategy" in df.columns
base_cols = ["FundID", "quarter_end", "Grade"]
if has_strategy:
    base_cols.append("Adj Strategy")

# Map grade at t and t+4 quarters
base = df[base_cols].drop_duplicates()
base["next_qe"] = (base["quarter_end"].dt.to_period("Q") + 4).dt.to_timestamp("Q")
# De-duplicate the lookup side on its own columns: a fund listed under several
# strategies in the same quarter would otherwise appear several times here and
# multiply the matched transitions in the merge below.
next_df = (
    base[["FundID", "quarter_end", "Grade"]]
    .drop_duplicates()
    .rename(columns={
        "quarter_end": "next_qe",
        "Grade": "Grade_next",
    })
)
# Left-merge, then drop the rows with no observation four quarters later.
merged = base.merge(next_df, on=["FundID", "next_qe"], how="left").dropna(subset=["Grade_next"])

out_all = Path(OUT_DIR) / "grade_transition_1y_all.csv"
transition_matrix(merged, str(out_all))

if has_strategy:
    # dropna=False keeps rows without a strategy as their own group.
    # NOTE(review): file names come from slug(strat), so a strategy whose slug
    # is "all" would overwrite the overall file, and two strategies with the
    # same slug would overwrite each other -- confirm this cannot occur.
    for strat, g in merged.groupby("Adj Strategy", dropna=False):
        out = Path(OUT_DIR) / f"grade_transition_1y_{slug(strat)}.csv"
        transition_matrix(g, str(out))

print("Wrote transitions to:", OUT_DIR)