In [None]:
# 02_analysis.ipynb 
# Imports and basic setup for analysis
"""
02_analysis.ipynb builds on top of that pipeline and focuses on evaluation and interpretation: 
    regime stability across seeds and samples, macroeconomic alignment, and simple ROC-style analysis 
    for downturn detection.
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pathlib import Path
from itertools import combinations
from sklearn.metrics import roc_curve, auc

from hmm_utils import (     # from mini-project style HMM
    baum_welch_train,      
    viterbi_decode,        
    forward_pass,          
    backward_pass,        
    compute_posteriors,   
    count_num_params,
    compute_aic_bic,
)

plt.rcParams["figure.figsize"] = (10, 4)
plt.rcParams["axes.grid"] = True

DATA_PATH = Path("data") / "fhfa_hpi_raw.csv"   # adjust to your actual file name

In [None]:
# Load and preprocess HPI data (same idea as in final_pipeline)

def load_and_preprocess_hpi(filepath):
    """
    Load FHFA HPI data and build a clean time series with log returns.

    Feel free to copy the exact version from final_pipeline.ipynb.
    Here we assume:
      - there is a 'date' column
      - there is an 'hpi' column with the index level
    """
    df_raw = pd.read_csv(filepath)

    df = df_raw.copy()

    # parse date if present
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
        df = df.sort_values("date")
    else:
        df = df.sort_index()

    if "hpi" not in df.columns:
        raise ValueError("Please rename your HPI level column to 'hpi' or update this loader.")

    df["log_hpi"] = np.log(df["hpi"])
    df["ret"] = df["log_hpi"].diff()

    df = df.dropna(subset=["ret"]).reset_index(drop=True)
    return df


df = load_and_preprocess_hpi(DATA_PATH)
df.head()

In [None]:
# Wrapper to train HMM for a single K and random seed

def train_hmm_with_seed(observations, K, max_iters=100, n_restarts=3, seed=0):
    """
    Train a Gaussian HMM with K states for a given random seed.

    This is basically the same pattern as in final_pipeline.
    It uses baum_welch_train and viterbi_decode from the mini-project style code.
    """
    T = len(observations)

    best_params, loglik_trace = baum_welch_train(
        observations,
        K,
        max_iters=max_iters,
        tol=1e-6,
        n_restarts=n_restarts,
        seed=seed,
    )

    final_loglik = loglik_trace[-1]
    num_params = count_num_params(K)
    aic, bic = compute_aic_bic(final_loglik, num_params, T)
    viterbi_path = viterbi_decode(observations, best_params)

    result = {
        "K": K,
        "params": best_params,
        "loglik_trace": loglik_trace,
        "final_loglik": final_loglik,
        "AIC": aic,
        "BIC": bic,
        "viterbi_path": viterbi_path,
        "seed": seed,
    }
    return result

In [None]:
# Regime stability across multiple random seeds for a fixed K (for example, K = 3)

observations = df["ret"].values

K_target = 3
seeds = [0, 1, 2, 3, 4]   # you can extend this if you want

results_multi_seed = []

for s in seeds:
    print(f"Training HMM with K={K_target}, seed={s}...")
    res = train_hmm_with_seed(observations, K_target, max_iters=100, n_restarts=2, seed=s)
    results_multi_seed.append(res)

print("Done training multiple seeds.")

In [None]:
# Compute pairwise similarity of Viterbi paths (Hamming-based agreement)

def path_agreement(path1, path2):
    """
    Simple Hamming-style agreement between two integer paths.

    Note: This does not try to fix label switching.
    For this course project, a rough measure is fine.
    """
    assert len(path1) == len(path2)
    return np.mean(path1 == path2)


n_runs = len(results_multi_seed)
agreement_matrix = np.zeros((n_runs, n_runs))

for i, j in combinations(range(n_runs), 2):
    p_i = results_multi_seed[i]["viterbi_path"]
    p_j = results_multi_seed[j]["viterbi_path"]
    score = path_agreement(p_i, p_j)
    agreement_matrix[i, j] = score
    agreement_matrix[j, i] = score

np.fill_diagonal(agreement_matrix, 1.0)

agreement_matrix

In [None]:
# Plot a heatmap of path agreement across seeds

import seaborn as sns  # if you prefer, you can use plain matplotlib, but seaborn is nice for heatmaps

labels = [f"seed={r['seed']}" for r in results_multi_seed]

plt.figure(figsize=(6, 5))
sns.heatmap(agreement_matrix, annot=True, fmt=".2f",
            xticklabels=labels, yticklabels=labels,
            vmin=0.0, vmax=1.0, cmap="viridis")
plt.title(f"Viterbi path agreement (K={K_target})")
plt.tight_layout()
plt.show()

In [None]:
# Look at means and variances of each state across seeds

all_means = []
all_vars = []

for res in results_multi_seed:
    params = res["params"]
    all_means.append(params["means"])
    all_vars.append(params["vars"])

all_means = np.array(all_means)  # shape: (n_runs, K)
all_vars = np.array(all_vars)

print("State means across seeds:\n", all_means)
print("\nState variances across seeds:\n", all_vars)

In [None]:
# boxplots for means and variances

fig, axes = plt.subplots(1, 2, figsize=(10, 4))

axes[0].boxplot(all_means, labels=[f"state {k}" for k in range(K_target)])
axes[0].set_title("Means per state across seeds")

axes[1].boxplot(all_vars, labels=[f"state {k}" for k in range(K_target)])
axes[1].set_title("Variances per state across seeds")

plt.tight_layout()
plt.show()

In [None]:
# Macro alignment: define some rough macro episodes and inspect regime usage

# Choose one representative run for macro analysis
best_res = results_multi_seed[0]   # you can also pick the best BIC one
path = best_res["viterbi_path"]

df_macro = df.copy()
df_macro["regime"] = path  # 0..K-1

# Example macro windows (you should tune these dates to real episodes)
macro_windows = [
    ("Pre-crisis",  "2000-01-01", "2006-12-31"),
    ("Crisis",      "2007-01-01", "2011-12-31"),
    ("Post-crisis", "2012-01-01", "2019-12-31"),
    ("Recent",      "2020-01-01", "2025-12-31"),
]

summary_rows = []

for name, start, end in macro_windows:
    mask = (df_macro["date"] >= start) & (df_macro["date"] <= end)
    sub = df_macro[mask]
    if len(sub) == 0:
        continue

    counts = sub["regime"].value_counts(normalize=True).sort_index()
    row = {"Period": name, "T": len(sub)}
    for k in range(K_target):
        row[f"regime_{k}_share"] = counts.get(k, 0.0)
    summary_rows.append(row)

df_macro_summary = pd.DataFrame(summary_rows)
df_macro_summary

In [None]:
# Plot the returns with regime bands and mark macro windows for visual alignment

state_colors = ["tab:blue", "tab:orange", "tab:green", "tab:red"]

fig, ax = plt.subplots(figsize=(10, 4))

ax.plot(df_macro["date"], df_macro["ret"], label="log-return")
ax.set_ylabel("log-return")
ax.set_title(f"HMM regimes (K={K_target}) with macro windows")

# regime bands
for t in range(len(df_macro)):
    ax.axvspan(
        df_macro["date"].iloc[t],
        df_macro["date"].iloc[t] + pd.Timedelta("1D"),
        color=state_colors[df_macro["regime"].iloc[t]],
        alpha=0.06,
    )

# vertical lines for macro window boundaries
for name, start, end in macro_windows:
    ax.axvline(pd.to_datetime(start), color="k", linestyle="--", alpha=0.4)
    ax.axvline(pd.to_datetime(end), color="k", linestyle="--", alpha=0.2)
    ax.text(pd.to_datetime(start), ax.get_ylim()[1],
            name, rotation=90, verticalalignment="bottom", fontsize=8)

plt.tight_layout()
plt.show()

In [None]:
# Construct a simple downturn proxy label

df_down = df_macro.copy()

# rolling 3-month cumulative return
df_down["ret_3m"] = df_down["ret"].rolling(window=3).sum()

# downturn label: 1 if 3-month sum < 0, else 0
df_down["downturn_label"] = (df_down["ret_3m"] < 0).astype(int)

df_down[["date", "ret", "ret_3m", "downturn_label"]].head(10)

In [None]:
# Get smoothed posterior gamma_t(k) and build a "downturn score" -1

# Use the same params as best_res
params = best_res["params"]

# compute posteriors (gamma_t(k))
log_alpha, loglik = forward_pass(observations, params)
log_beta = backward_pass(observations, params)
gamma = compute_posteriors(log_alpha, log_beta)   # shape (T, K)

gamma[:5]

In [None]:
# Pick the "down-like" regime as the state with smallest mean -2

means = params["means"]
down_state = np.argmin(means)
down_state

In [None]:
# Build a simple downturn score from smoothed posterior -3

downturn_score = gamma[:, down_state]   # 1D array of probabilities

df_down["downturn_score"] = downturn_score
df_down[["date", "downturn_label", "downturn_score"]].head()

In [None]:
# ROC curve and AUC for the downturn label vs HMM downturn_score

# Drop early rows where ret_3m is NaN
mask_valid = ~df_down["ret_3m"].isna()
y_true = df_down.loc[mask_valid, "downturn_label"].values
y_score = df_down.loc[mask_valid, "downturn_score"].values

fpr, tpr, thresholds = roc_curve(y_true, y_score)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, label=f"ROC curve (AUC = {roc_auc:.3f})")
plt.plot([0, 1], [0, 1], "k--", label="random")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC for downturn detection using HMM regime probability")
plt.legend()
plt.tight_layout()
plt.show()

print(f"AUC for downturn detection: {roc_auc:.3f}")

In [None]:
# Simple "early warning" check: shift the score by 1 period into the past

lead = 1  # you can also try 2 or 3
df_down[f"downturn_score_lead{lead}"] = df_down["downturn_score"].shift(lead)

mask_valid_lead = (~df_down["ret_3m"].isna()) & (~df_down[f"downturn_score_lead{lead}"].isna())
y_true_lead = df_down.loc[mask_valid_lead, "downturn_label"].values
y_score_lead = df_down.loc[mask_valid_lead, f"downturn_score_lead{lead}"].values

fpr_lead, tpr_lead, _ = roc_curve(y_true_lead, y_score_lead)
roc_auc_lead = auc(fpr_lead, tpr_lead)

plt.figure()
plt.plot(fpr, tpr, label=f"Same-period (AUC={roc_auc:.3f})")
plt.plot(fpr_lead, tpr_lead, label=f"Lead {lead} step (AUC={roc_auc_lead:.3f})")
plt.plot([0, 1], [0, 1], "k--", label="random")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC comparison: same-period vs early warning")
plt.legend()
plt.tight_layout()
plt.show()