# Hidden Markov Models (HMM) — Scikit-Learn Style


**Goals**
- Understand HMMs and the three classic problems (Forward, Viterbi, EM)
- Explore a discrete dataset and a continuous (Gaussian) dataset
- Train and decode using a scikit-learn–compatible API (`hmmlearn`) if available
- Fall back to a pure-NumPy discrete HMM implementation otherwise


In [None]:

import numpy as np, matplotlib.pyplot as plt
np.random.seed(42)
try:
    from hmmlearn.hmm import MultinomialHMM, GaussianHMM
    HMMLEARN_AVAILABLE = True
except Exception as e:
    HMMLEARN_AVAILABLE = False
    print("hmmlearn not available:", e)


## Generate Synthetic Datasets (Discrete & Gaussian)

In [None]:

def sample_discrete_hmm(T=600):
    pi = np.array([0.5, 0.3, 0.2])
    A  = np.array([[0.85, 0.10, 0.05],
                   [0.10, 0.80, 0.10],
                   [0.05, 0.15, 0.80]])
    B  = np.array([
        [0.30, 0.25, 0.20, 0.10, 0.05, 0.04, 0.03, 0.03],
        [0.10, 0.10, 0.15, 0.25, 0.20, 0.05, 0.10, 0.05],
        [0.05, 0.05, 0.05, 0.10, 0.15, 0.25, 0.20, 0.15],
    ])
    n_states, n_obs = A.shape[0], B.shape[1]
    states = np.zeros(T, dtype=int)
    obs    = np.zeros(T, dtype=int)
    states[0] = np.random.choice(n_states, p=pi)
    obs[0]    = np.random.choice(n_obs, p=B[states[0]])
    for t in range(1, T):
        states[t] = np.random.choice(n_states, p=A[states[t-1]])
        obs[t]    = np.random.choice(n_obs, p=B[states[t]])
    return obs, states, pi, A, B

def sample_gaussian_hmm(T=600, d=4):
    pi = np.array([0.4, 0.4, 0.2])
    A  = np.array([[0.90, 0.08, 0.02],
                   [0.08, 0.88, 0.04],
                   [0.05, 0.10, 0.85]])
    mus = np.array([np.zeros(d), np.ones(d), np.concatenate([np.full(d//2,-2.0), np.full(d-d//2, 2.0)])])
    covs= np.stack([np.eye(d)*0.4, np.eye(d)*0.5, np.eye(d)*0.7])
    states = np.zeros(T, dtype=int)
    X = np.zeros((T,d))
    states[0] = np.random.choice(3, p=pi)
    X[0] = np.random.multivariate_normal(mus[states[0]], covs[states[0]])
    for t in range(1,T):
        states[t] = np.random.choice(3, p=A[states[t-1]])
        X[t] = np.random.multivariate_normal(mus[states[t]], covs[states[t]])
    return X, states, pi, A, mus, covs

disc_obs, disc_states_true, disc_pi, disc_A, disc_B = sample_discrete_hmm()
cont_X, cont_states_true, cont_pi, cont_A, cont_mus, cont_covs = sample_gaussian_hmm()
print("Discrete obs sample:", disc_obs[:12])


## Quick EDA (Discrete)

In [None]:

def heatmap(M, title):
    plt.figure(figsize=(5,4))
    plt.imshow(M, aspect='auto')
    plt.title(title); plt.colorbar(); plt.tight_layout(); plt.show()

heatmap(disc_A, "True Transition Matrix A")
heatmap(disc_B, "True Emission Matrix B")

counts = np.bincount(disc_obs, minlength=disc_B.shape[1])
plt.figure()
plt.bar(np.arange(disc_B.shape[1]), counts/counts.sum())
plt.title("Observation frequencies"); plt.xlabel("obs id"); plt.ylabel("freq"); plt.show()


## Pure NumPy Discrete HMM (Forward, Backward, Viterbi, EM)

In [None]:

def normalize_rows(M):
    M = M.copy().astype(float)
    row_sums = M.sum(axis=1, keepdims=True)
    row_sums[row_sums==0] = 1.0
    return M / row_sums

class DiscreteHMMNumpy:
    def __init__(self, n_components, n_observations, max_iter=80, tol=1e-4, seed=0):
        rng = np.random.default_rng(seed)
        self.n_components = n_components
        self.n_observations = n_observations
        self.max_iter = max_iter
        self.tol = tol
        self.startprob_ = normalize_rows(rng.random((1,n_components)))[0]
        self.transmat_  = normalize_rows(rng.random((n_components,n_components)))
        self.emissionprob_ = normalize_rows(rng.random((n_components,n_observations)))

    def _forward(self, obs):
        T, N = len(obs), self.n_components
        alpha = np.zeros((T,N)); scale = np.zeros(T)
        alpha[0] = self.startprob_ * self.emissionprob_[:, obs[0]]
        scale[0] = alpha[0].sum(); alpha[0] /= scale[0] + 1e-12
        for t in range(1,T):
            alpha[t] = (alpha[t-1] @ self.transmat_) * self.emissionprob_[:, obs[t]]
            scale[t] = alpha[t].sum(); alpha[t] /= scale[t] + 1e-12
        return alpha, scale, float(np.sum(np.log(scale + 1e-12)))

    def _backward(self, obs, scale):
        T, N = len(obs), self.n_components
        beta = np.zeros((T,N)); beta[-1] = 1.0 / (scale[-1] + 1e-12)
        for t in range(T-2, -1, -1):
            beta[t] = (self.transmat_ @ (self.emissionprob_[:, obs[t+1]] * beta[t+1]))
            beta[t] /= (scale[t] + 1e-12)
        return beta

    def score(self, obs):
        _, _, logp = self._forward(obs)
        return logp

    def predict(self, obs):
        T, N = len(obs), self.n_components
        logA = np.log(self.transmat_ + 1e-12)
        logB = np.log(self.emissionprob_ + 1e-12)
        logpi= np.log(self.startprob_ + 1e-12)
        delta = np.zeros((T,N)); psi = np.zeros((T,N), dtype=int)
        delta[0] = logpi + logB[:, obs[0]]
        for t in range(1,T):
            for j in range(N):
                vals = delta[t-1] + logA[:,j]
                psi[t,j] = int(np.argmax(vals))
                delta[t,j] = np.max(vals) + logB[j, obs[t]]
        states = np.zeros(T, dtype=int)
        states[-1] = int(np.argmax(delta[-1]))
        for t in range(T-2,-1,-1):
            states[t] = psi[t+1, states[t+1]]
        return states

    def fit(self, obs):
        T = len(obs); last = -1e18
        for _ in range(self.max_iter):
            alpha, scale, logp = self._forward(obs)
            beta = self._backward(obs, scale)
            gamma = alpha * beta
            gamma /= gamma.sum(axis=1, keepdims=True)
            xi = np.zeros_like(self.transmat_)
            for t in range(T-1):
                numer = (alpha[t][:,None] * self.transmat_ * self.emissionprob_[:, obs[t+1]][None,:] * beta[t+1][None,:])
                denom = numer.sum()
                if denom > 0: xi += numer / (denom + 1e-12)
            self.startprob_ = gamma[0] / (gamma[0].sum() + 1e-12)
            self.transmat_  = normalize_rows(xi)
            newB = np.zeros_like(self.emissionprob_)
            for k in range(self.n_observations):
                mask = (obs == k)
                if mask.any(): newB[:,k] = gamma[mask].sum(axis=0)
            self.emissionprob_ = normalize_rows(newB)
            if logp - last < self.tol: break
            last = logp
        return self


## Train & Decode (Discrete)

In [None]:

T = len(disc_obs); split = int(0.7*T)
train_obs = disc_obs[:split]; test_obs = disc_obs[split:]
test_states_true = disc_states_true[split:]
K = 3; M = 8

if HMMLEARN_AVAILABLE:
    model = MultinomialHMM(n_components=K, n_iter=100, tol=1e-3, init_params="ste")
    model.n_features = M
    model.fit(train_obs.reshape(-1,1))
    logp_train = model.score(train_obs.reshape(-1,1))
    logp_test  = model.score(test_obs.reshape(-1,1))
    pred = model.predict(test_obs.reshape(-1,1))
else:
    model = DiscreteHMMNumpy(n_components=K, n_observations=M, max_iter=80, tol=1e-4, seed=7)
    model.fit(train_obs)
    logp_train = model.score(train_obs)
    logp_test  = model.score(test_obs)
    pred = model.predict(test_obs)

print("Train log-likelihood:", round(logp_train,2))
print("Test  log-likelihood:", round(logp_test,2))

# permutation-invariant accuracy
def align_acc(true_s, pred_s):
    K = len(np.unique(true_s))
    C = np.zeros((K,K), dtype=int)
    for t,p in zip(true_s, pred_s): C[t,p]+=1
    import itertools
    best=0
    for perm in itertools.permutations(range(K)):
        score = sum(C[i,perm[i]] for i in range(K))
        best = max(best, score)
    return best/len(pred_s)

print("Viterbi accuracy (permutation-invariant):", round(align_acc(test_states_true, pred)*100,1), "%")


## Gaussian HMM (Continuous)

In [None]:

from sklearn.decomposition import PCA
X2 = PCA(n_components=2).fit_transform(cont_X)
plt.figure(); plt.scatter(X2[:,0], X2[:,1], s=8); plt.title("Continuous features (PCA 2D)"); plt.show()

if HMMLEARN_AVAILABLE:
    ghmm = GaussianHMM(n_components=3, covariance_type="full", n_iter=100, tol=1e-3)
    ghmm.fit(cont_X)
    logp = ghmm.score(cont_X)
    pred_g = ghmm.predict(cont_X)
    print("GaussianHMM log-likelihood:", round(logp,2))
else:
    print("hmmlearn not available -> skipping GaussianHMM fit.")


## Exercises


1) Change the emission matrix to make each state emit unique symbols; decode again.  
2) Add Dirichlet pseudocounts in the M-step and inspect stability on short sequences.  
3) Try K in {2,3,4,5}, compute BIC on the test split, and pick the best K.  
4) For the Gaussian case, compare diag vs full covariances if `hmmlearn` is available.
