# example code for training and evaluating hmm models


## define imports and constants


In [None]:
import sys
from pathlib import Path

# get project root for file paths and add project root to python path so imports work from notebooks folder
PROJECT_ROOT = Path("..").resolve()
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from hmm.preprocess import build_sequences_from_csv
from hmm.hmm_model import (
    train_supervised_hmm,
    train_unsupervised_hmm,
    sequence_accuracy,
    map_hidden_states_to_outcomes,
    convert_hidden_to_observed,
    infer_num_symbols,
)
from hmm.hmm_viterbi import viterbi_on_sequences

import numpy as np
import matplotlib.pyplot as plt

# constants
DATA_PATH = str(PROJECT_ROOT / "data" / "allseasons.csv")  # path where game data is stored
HOLDOUT_SEASONS = (2018, 2024)  # test set seasons
NUM_STATES = 2  # 0 = loss, 1 = win (if 2 states, else state to win/loss mapping is done later)
NUM_BINS = 9

# notebook plotting defaults
plt.style.use("seaborn-v0_8")
%matplotlib inline


## load and inspect sequences


In [32]:
# build sequences using specified holdout seasons
(
    train_states,
    train_obs,
    test_states,
    test_obs,
    train_seasons,
    test_seasons,
) = build_sequences_from_csv(
    DATA_PATH,
    holdout_seasons=HOLDOUT_SEASONS,
    num_bins=NUM_BINS,
)

print(f"num train sequences: {len(train_states)}")
print(f"num test sequences: {len(test_states)}")

# compute basic length statistics for sequences
train_lengths = [s.size for s in train_states]
test_lengths = [s.size for s in test_states]

print()
print("train sequence length stats (min / max / mean):",
      np.min(train_lengths), np.max(train_lengths), np.mean(train_lengths))
print("test sequence length stats (min / max / mean):",
      np.min(test_lengths), np.max(test_lengths), np.mean(test_lengths))


num train sequences: 686
num test sequences: 60

train sequence length stats (min / max / mean): 63 84 80.08746355685132
test sequence length stats (min / max / mean): 81 83 81.9


## train supervised hmm


In [33]:
# choose training routine based on requested number of states
num_symbols = infer_num_symbols(train_obs)
unique_train_labels = sorted(
    {int(label) for seq in train_states for label in np.unique(seq)}
)
use_unsupervised = NUM_STATES > len(unique_train_labels)

if use_unsupervised:
    pi, A, B, log_likelihoods = train_unsupervised_hmm(
        obs_sequences=train_obs,
        num_states=NUM_STATES,
        num_symbols=num_symbols,
        smoothing=1e-3,
        max_iters=75,
        tol=1e-3,
        random_state=0,
    )
    train_mode = "unsupervised (baum-welch)"
else:
    pi, A, B = train_supervised_hmm(
        state_sequences=train_states,
        obs_sequences=train_obs,
        num_states=NUM_STATES,
        num_symbols=num_symbols,
    )
    log_likelihoods = None
    train_mode = "supervised mle"

print(f"training mode: {train_mode}")
if log_likelihoods is not None:
    print(
        f"final log-likelihood: {log_likelihoods[-1]:.2f} after {len(log_likelihoods)} iterations"
    )

print("initial state distribution (pi):")
print(pi)
print()

print("transition matrix (A):")
print(A)
print()

print("emission matrix (B):")
print(B)
print()

print("shapes -> pi:", pi.shape, ", A:", A.shape, ", B:", B.shape)


training mode: supervised mle
initial state distribution (pi):
[0.49127907 0.50872093]

transition matrix (A):
[[0.54212292 0.45787708]
 [0.45811682 0.54188318]]

emission matrix (B):
[[0.22567145 0.19593857 0.16373098 0.13989373 0.1109251  0.08446757
  0.05517141 0.02420118]
 [0.02347332 0.05560812 0.08592328 0.11117985 0.13796492 0.16500473
  0.19572021 0.22512555]]

shapes -> pi: (2,) , A: (2, 2) , B: (2, 8)


In [34]:
# map hidden hmm states to win/loss outcomes when using unsupervised training
state_mapping = None
if use_unsupervised:
    train_hidden_paths = viterbi_on_sequences(
        emissions=B,
        initials=pi,
        transitions=A,
        obs_sequences=train_obs,
    )
    state_mapping = map_hidden_states_to_outcomes(
        hidden_sequences=train_hidden_paths,
        true_sequences=train_states,
        num_hidden_states=NUM_STATES,
    )
    mapped_train_paths = convert_hidden_to_observed(train_hidden_paths, state_mapping)
    train_macro_acc = sequence_accuracy(train_states, mapped_train_paths)
    print("hidden state to outcome mapping:", state_mapping)
    print(f"train accuracy after mapping: {train_macro_acc:.3f}")
else:
    print("supervised training uses direct win/loss states, no mapping needed.")



supervised training uses direct win/loss states, no mapping needed.


## run viterbi decoding on test data and compute accuracy


In [36]:
# run viterbi on test observation sequences
raw_pred_paths = viterbi_on_sequences(
    emissions=B,
    initials=pi,
    transitions=A,
    obs_sequences=test_obs,
)
if state_mapping is not None:
    pred_paths = convert_hidden_to_observed(raw_pred_paths, state_mapping)
else:
    pred_paths = raw_pred_paths

# compute overall test accuracy
overall_acc = sequence_accuracy(test_states, pred_paths)
print("overall test accuracy:", overall_acc)

# compute per-sequence accuracies and keep track of seasons and indices
per_seq_accs = []
per_seq_seasons = []
per_seq_indices = []

for idx, (season, y_true, y_pred) in enumerate(
    zip(test_seasons, test_states, pred_paths)
):
    if y_true.size == 0 or y_pred.size == 0:
        continue
    T = min(y_true.size, y_pred.size)
    acc = np.mean(y_true[:T] == y_pred[:T])
    per_seq_accs.append(acc)
    per_seq_seasons.append(season)
    per_seq_indices.append(idx)

per_seq_accs = np.array(per_seq_accs)
per_seq_seasons = np.array(per_seq_seasons)
per_seq_indices = np.array(per_seq_indices, dtype=int)

print("mean per-sequence accuracy:", per_seq_accs.mean())
print("median per-sequence accuracy:", np.median(per_seq_accs))


overall test accuracy: 0.7273097273097273
mean per-sequence accuracy: 0.727310718332529
median per-sequence accuracy: 0.7317073170731707


## visualize per-sequence accuracy distribution


In [None]:
# plot histogram of per-sequence accuracies
fig, ax = plt.subplots(figsize=(6, 4))
ax.hist(per_seq_accs, bins=20, color="steelblue", edgecolor="black", alpha=0.8)
ax.set_xlabel("per-sequence accuracy")
ax.set_ylabel("count")
ax.set_title("distribution of per-sequence accuracies")
plt.tight_layout()
plt.show()


## confusion matrix of predicted vs true state


In [None]:
# build confusion matrix over all test games
num_outcomes = len(unique_train_labels)
confusion = np.zeros((num_outcomes, num_outcomes), dtype=int)

for y_true, y_pred in zip(test_states, pred_paths):
    if y_true.size == 0 or y_pred.size == 0:
        continue
    T = min(y_true.size, y_pred.size)
    for t in range(T):
        confusion[y_true[t], y_pred[t]] += 1

print("confusion matrix (rows = true, cols = predicted):")
print(confusion)

fig, ax = plt.subplots(figsize=(4, 4))
im = ax.imshow(confusion, cmap="Blues")

ax.set_xticks(range(num_outcomes))
ax.set_yticks(range(num_outcomes))
labels = ["loss (0)", "win (1)"][:num_outcomes]
ax.set_xticklabels(labels)
ax.set_yticklabels(labels)
ax.set_xlabel("predicted state")
ax.set_ylabel("true state")
ax.set_title("confusion matrix")

for i in range(num_outcomes):
    for j in range(num_outcomes):
        ax.text(j, i, confusion[i, j], ha="center", va="center", color="black")

fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
plt.tight_layout()
plt.show()


## example season: true vs predicted states


In [None]:
# select an example test sequence to visualize
example_idx = 0

true_seq = test_states[example_idx]
pred_seq = pred_paths[example_idx]

T = min(true_seq.size, pred_seq.size)
x = np.arange(T)

fig, ax = plt.subplots(figsize=(10, 3))
ax.step(x, true_seq[:T], where="post", label="true state", linewidth=2)
ax.step(x, pred_seq[:T] + 0.02, where="post", label="predicted state", linestyle="--")

ax.set_ylim(-0.2, 1.2)
ax.set_yticks([0, 1])
ax.set_yticklabels(["loss", "win"])
ax.set_xlabel("game index in season")
ax.set_title("example test sequence: true vs predicted states")
ax.legend(loc="upper right")
plt.tight_layout()
plt.show()


## best and worst sequences by accuracy


In [None]:
# sort sequences by accuracy and print a few best and worst examples
k = 5  # number of sequences to display in each group

if per_seq_accs.size == 0:
    print("no non-empty test sequences to display")
else:
    sorted_idx = np.argsort(per_seq_accs)
    k = min(k, per_seq_accs.size)

    print("worst sequences:")
    for rank, idx in enumerate(sorted_idx[:k], start=1):
        seq_acc = per_seq_accs[idx]
        season = per_seq_seasons[idx]
        original_index = per_seq_indices[idx]
        seq_len = test_states[original_index].size
        print(
            f"  {rank}. season {season}, index {original_index}, "
            f"length {seq_len}, accuracy {seq_acc:.3f}"
        )

    print()
    print("best sequences:")
    for rank, idx in enumerate(sorted_idx[-k:][::-1], start=1):
        seq_acc = per_seq_accs[idx]
        season = per_seq_seasons[idx]
        original_index = per_seq_indices[idx]
        seq_len = test_states[original_index].size
        print(
            f"  {rank}. season {season}, index {original_index}, "
            f"length {seq_len}, accuracy {seq_acc:.3f}"
        )


## aggregate performance by season


In [None]:
# compute per-season mean accuracy
if per_seq_accs.size == 0:
    print("no per-sequence accuracies available for season aggregation")
else:
    unique_seasons = np.unique(per_seq_seasons)
    season_means = []
    season_counts = []

    print("per-season accuracy summary:")
    for season in unique_seasons:
        mask = per_seq_seasons == season
        season_accs = per_seq_accs[mask]
        mean_acc = season_accs.mean()
        count = season_accs.size
        season_means.append(mean_acc)
        season_counts.append(count)
        print(
            f"  season {season}: mean accuracy {mean_acc:.3f} "
            f"over {count} sequences"
        )

    season_means = np.array(season_means)
    season_counts = np.array(season_counts)


In [None]:
# bar chart of mean accuracy by season
if per_seq_accs.size == 0:
    print("no data to plot for per-season accuracy")
else:
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.bar(unique_seasons, season_means, color="steelblue", edgecolor="black")
    ax.set_xlabel("season start year")
    ax.set_ylabel("mean per-sequence accuracy")
    ax.set_title("hmm performance by season")
    plt.tight_layout()
    plt.show()
