Two State Hidden Markov Model by Timothy Gunn <br> July 26th, 2025 <br>
The goal of this program is to implement a two state Hidden Markov Model from scratch to practice probabilistic modeling and unsupervised learning. Given a sequence of observed emissions, the model uses the forward-backward algorithm and the Baum-Welch algorithm to recover the underlying state transition and emission probabilities without any prior knowledge of these probabilities. Although there are only 2 emission symbols and 2 hidden states in the dataset, the same algorithms still apply to data with more emissions and hidden states.

In [None]:
#Import necessary libraries
!pip install hmmlearn
import numpy as np
from hmmlearn import hmm

In [None]:
#Define the true transition matrix between hidden states
#State 0 stays in 0 with 90% probability, switches to 1 with 10%
#State 1 stays in 1 with 90% probability, switches to 0 with 10%
transmat_true = np.array([
  [0.9, 0.1],
  [0.1, 0.9]
])

#Define the true emission probabilities
#State 0 emits observation 0 with 80% probability and 1 with 20%
#State 1 emits observation 1 with 70% probability and 0 with 30%
emission_probs_true = np.array([
  [0.8, 0.2],  # State 0: 80% chance of 0
  [0.3, 0.7]   # State 1: 70% chance of 1
])

#Define the initial state probabilities (uniform in this case)
startprob_true = np.array([0.5, 0.5])


#Create a Categorical Hidden Markov Model with 2 hidden states
model_true = hmm.CategoricalHMM(
  n_components=2, #Number of hidden states
)

#Manually set the parameters
model_true.startprob_ = startprob_true
model_true.transmat_ = transmat_true
model_true.emissionprob_ = emission_probs_true

#Sample 500 observations and corresponding hidden states from the model
X, Z = model_true.sample(n_samples=500)

#Output shape and a preview of the generated data
print("Generated Data Shape:", X.shape)
print("First 10 Observations (X):", X.flatten()[:10])
print("First 10 Hidden States (Z):", Z[:10])

Generated Data Shape: (500, 1)
First 10 Observations (X): [1 0 1 1 0 0 1 0 0 0]
First 10 Hidden States (Z): [1 1 0 0 0 0 1 0 0 0]


In [None]:
#Setrandom seed
np.random.seed(42)

#Compute forward probabilities (The full probability of the emitted sequence from t=0 to t=t and the probability of being in state Zt)
#Forward Probability = Pr(X1...Xt, Zt)
def forward(observations, a, b, pi):
  t = len(observations) #Number of time steps
  n = a.shape[0] #Number of hidden states
  alpha = np.zeros((t,n)) #Initial alpha matrix (stores the forward probabilities)

  #Initialization step for t=0
  #alpha[0,i] = P(obs[0], state_0=i) = P(state_0=i) * P(obs[0] | state_0=i)
  alpha[0] = pi * b[:, observations[0]]

  #Induction step
  #For each time step
  for i in range(1,t):
    #For each possible current state j
    for j in range(n):
      #alpha[t,j] = P(obs[0:t+1], state_t=j)
      alpha[i,j] = np.sum(alpha[i-1] * a[:, j]) * b[j, observations[i]]

  return alpha

#Compute backward probabilities (The probability of seeing the rest of the sequence Xn given Z_n-1)
def backward(observations, a, b):
  t = len(observations) #Number of time steps
  n = a.shape[0] #Number of hidden states
  beta = np.zeros((t,n)) #Initialize beta matrix (stores backward probabilities)

  #Initialization step for t=t-1 (the last time step)
  #beta[T-1,i] = 1 for all states i (no future observations to consider)
  beta[-1] = np.ones(n)

  #Induction step
  #For each time step (from t-2 to 0)
  for i in range(t - 2, -1, -1):
    #For each possible current state j
    for j in range(n):
      #beta[t,i] = P(obs[t+1:T] | state_t=i)
      beta[i,j] = np.sum(a[j] * b[:, observations[i+1]] * beta[i+1])

  return beta

#Compute posterior probabilities (gamma and xi)
def FindGammaXi(observations,a, b, pi):
  #Number of time steps and hidden states
  t = len(observations)
  n = a.shape[0]

  #Compute forward and backward probabilities
  alpha = forward(observations, a, b, pi)
  beta = backward(observations, a, b)

  #Compute gamma (State posterior probabilities)
  #gamma[t, j] = P(Q_t = j | observations, model)
  gamma = (alpha * beta) / np.sum(alpha * beta, axis=1, keepdims=True)

  #Initialize xi (Joint state-transition probabilities)
  #xi[t, j, k] = P(Q_t = j, Q_t+1 = k | observations, model)
  #This matrix stores the probabilities of being in state j at time t
  #and the probabilities of transitioning to state k at time t+1, given the entire observation sequence
  xi = np.zeros((t-1, n, n))

  #For each time step
  for i in range(t-1):
    #Calculate the denominator d for the xi formula
    #The denominiator represents the probability of the entire observation sequence P(observations|model)
    d = np.sum(alpha[i][:, None] * a * b[:, observations[i+1]] * beta[i+1])
    #Calculate xi for each possible state transition (j to k) at time i
    for j in range(n): #Current state at time i
      for k in range(n): #Next state at time i+1
        xi[i,j,k] = (alpha[i, j] * a[j,k] * b[k, observations[i+1]] * beta[i+1,k]) / d

  return gamma, xi

#Use the gamma and chi values to update a,b, and pi
def updateParameters(observations, gamma, xi):
  n = gamma.shape[1] #Number of hidden states
  m = np.max(observations) + 1 #Number possible observation symbols
  t = len(observations) #Number of time steps

  #Re-Estimate initial state probabilities (pi)
  #The new initial probability for a given state is just the probability of being
  #in that state at the first time step (t=0) given the observations
  pi = gamma[0]

  #Re-Estimate the transition probabilities (a)
  #a_new[j, k] = (Expected number of transitions from state j to state k) / (Expected number of times in state j)
  a = np.sum(xi, axis=0) / np.sum(gamma[:-1], axis=0, keepdims=True).T

  #Re-Estimate emission prbabilities (b)
  #Initialize b matrix with zeros
  b = np.zeros((n,m))

  #Iterate through each possible observation symbol
  for i in range(m):
    #Create a boolean mask that is true when the observation at time t is equal to i
    mask = (observations == i)
    #Sum gamma[t, j] only for those time steps t where observations[t] was i
    #This gives the expected number of times in state j while emitting i
    b[:,i] = np.sum(gamma[mask], axis=0)

  #Sum gamma[t, j] over all time steps t (from 0 to t-1)
  #This gives the total expected number of times in state j
  b /= np.sum(gamma, axis=0, keepdims=True).T

  return a, b, pi

#Implent the Baum-Welch algorithm to learn the parameters
def baum_welch(observations, n, m, iterations=10):
  #Use Dirichlet distribution to initialize transition matrix, emission matrix, and the initial state distribution
  a = np.random.dirichlet(np.ones(n), size=n)
  b = np.random.dirichlet(np.ones(m), size=m)
  pi = np.random.dirichlet(np.ones(n))

  #Perform Expectation-Maximization
  for i in range(iterations):
    #Expectation step
    gamma, xi = FindGammaXi(observations, a, b, pi)
    #Maximization step
    a, b, pi = updateParameters(observations, gamma, xi)

  return a, b, pi

#Implement the Viterbi algorithm to find the most likely sequence of hidden states
def viterbi(a, b, pi):
  #Number of time steps and hidden states
  t = len(observations)
  n = a.shape[0]

  #delta[i, j] = max probability of any path that ends in state j at time i
  delta = np.zeros((t,n))

  #psi[i, j] = the state at time i-1 that led to state j at time i with the highest probability
  psi = np.zeros((t, n), dtype=int)

  #Initilization step
  #Set initial probabilities for each state based on starting probability and emission probability of first observation
  delta[0] = pi * b[:, observations[0]]

  #Recursion step
  for i in range(1,t): #For each time step
    for j in range(n): #For each possible current state
      temp = delta[i-1] * a[:,j] #Probability of each previous state transitioning to j
      psi[i,j] = np.argmax(temp) #Most likely previous state
      delta[i,j] = np.max(temp) * b[j, observations[i]] #max probability path to state j at time i

  #Termination step
  path = np.zeros(t, dtype=int)

  #Start backtracking from the most probable final state
  path[-1] = np.argmax([delta[-1]])

  #Backtracking
  for i in reversed(range(1,t)):
    path[i-1] = psi[i, path[i]] #Backtrack to previous state using psi

  return path



observations = X.flatten()
a, b, pi = baum_welch(observations, n=2, m=2, iterations=1000)
hiddenStates = viterbi(a, b, pi)

print("Estimated transition matrix:\n", a)
print("Actual transition matrix:\n", transmat_true)
print("Estimated emission matrix:\n", b)
print("Actual emission matrix:\n", emission_probs_true)
print("Estimated initial distribution:\n", pi)
print("Actual initial distribution:\n", startprob_true)
print("First 10 predicted hidden states:", hiddenStates[:10])
print("First 10 true hidden States:", Z[:10])





Estimated transition matrix:
 [[0.91792298 0.08207702]
 [0.14673815 0.85326185]]
Actual transition matrix:
 [[0.9 0.1]
 [0.1 0.9]]
Estimated emission matrix:
 [[0.75022477 0.24977523]
 [0.27702502 0.72297498]]
Actual emission matrix:
 [[0.8 0.2]
 [0.3 0.7]]
Estimated initial distribution:
 [0. 1.]
Actual initial distribution:
 [0.5 0.5]
First 10 predicted hidden states: [1 1 1 1 0 0 0 0 0 0]
First 10 true hidden States: [1 1 0 0 0 0 1 0 0 0]
