# Hidden Markov Models

This notebook demonstrates a basic HMM to model the Eisner task: https://www.cs.jhu.edu/~jason/papers/eisner.tnlp02.pdf

In [3]:
import numpy as np


class HMM():
    def __init__(self, pi, A, B):
        """Initializes an HMM initial state parameters, transition probabilities, and
        observation probabilities.
        
        Parameters
        ----------
        pi : numpy array, shape = [n_states]
             Initial state probabilities.
        A : numpy array, shape = [n_states, n_states]
            Transition matrix.
        B : numpy array, shape = [n_states, n_observations]
            Emission prbability matrix.
        """
        self.pi_ = pi
        self.A_ = A
        self.B_ = B
        self.n_states_ = A.shape[0]
        
    def forward(self, O):
        """Computes the likelihood of a given observation sequence.
        
        Parameters
        ----------
        O : numpy array, shape = [seq_length]
        """
        seq_length = O.shape[0]
        alpha = np.zeros((seq_length, self.n_states_))
        
        # Initialization
        alpha[0] = self.pi_ * self.B_[:, O[0]]
            
        # Recursive step
        for t in range(1, seq_length):
            for s in range(self.n_states_):
                alpha[t, s] = alpha[t-1].dot(self.A_[:, s]) * self.B_[s, O[t]]
                    
        return alpha

    def backward(self, O):
        """Computes the backward pass.
        
        Parameters
        ----------
        O : numpy array, shape = [seq_length]
        """
        seq_length = O.shape[0]
        beta = np.zeros((seq_length, self.n_states_))
        beta[-1] = 1
        
        # Recursive step
        for t in range(seq_length-2, -1, -1):
            for s in range(self.n_states_):
                beta[t, s] = np.sum(beta[t+1] * self.A_[s] * self.B_[:, O[t+1]])
                    
        return beta

    def forward_backward(self, O):
        """Computes the forward-backward algorithm.
        
        Parameters
        ----------
        O : numpy array, shape = [seq_length]
        """
        alpha = self.forward(O)
        beta = self.backward(O)
        xi = np.zeros((len(O), self.n_states_))

        for t in range(len(O)):
            xi[t] = alpha[t] * beta[t] / np.sum(alpha[t] * beta[t])

        return xi
    
    def viterbi(self, O):
        """Computes the most likely state sequence given an observation sequence.
        
        Parameters
        ----------
        O : numpy array, shape = [seq_length]
        """
        seq_length = O.shape[0]
        probs = np.zeros((self.n_states_, seq_length))
        backpointer = np.zeros((self.n_states_, seq_length))
        
        # Initialization
        for s in range(self.n_states_):
            probs[s, 0] = self.pi_[s] * self.B_[s, O[0]]
            backpointer[s, 0] = 0
            
        # Recursive step
        for t in range(1, seq_length):
            for s in range(self.n_states_):
                temp = np.zeros((self.n_states_))
                for sp in range(self.n_states_):
                    temp[sp] = probs[sp, t-1] * self.A_[sp, s] * self.B_[s, O[t]]
                    
                probs[s, t] = np.max(temp)
                backpointer[s, t] = np.argmax(temp)
                
        # Termination step
        best_path_prob = np.max(probs[:, -1])
        best_path_pointer = np.argmax(probs[:, -1])
        
        print(probs)
        
        bestpath = np.zeros((seq_length), dtype=np.int32)
        bestpath[-1] = best_path_pointer
        
        for i in range(seq_length-2, -1, -1):
            print(i)
            bestpath[i] = backpointer[bestpath[i+1], i]
                    
        return bestpath, best_path_prob

In [11]:
pi = np.array([.2, .8])

A = np.array([
    [.5, .5],
    [.4, .6]
])

B = np.array([
    [.5, .4, .1],
    [.2, .4, .4]
])

model = HMM(pi, A, B)
seq = np.array([2, 0, 2])
print(model.forward(seq))

# Compute the expected statistics
xi = model.forward_backward(seq)
gamma = np.sum(xi, axis=1)

print(xi)
print(gamma)

# print(model.viterbi(seq))

[[0.02     0.32    ]
 [0.069    0.0404  ]
 [0.005066 0.023496]]
[[0.06337091 0.93662909]
 [0.6039493  0.3960507 ]
 [0.17736853 0.82263147]]
[1. 1. 1.]


In [14]:
import numpy as np

def forward(obs_seq, A, B, pi):
    N = A.shape[0]
    T = len(obs_seq)
    alpha = np.zeros((T, N))
    alpha[0] = pi * B[:, obs_seq[0]]

    for t in range(1, T):
        for j in range(N):
            alpha[t, j] = alpha[t - 1].dot(A[:, j]) * B[j, obs_seq[t]]

    return alpha

def backward(obs_seq, A, B):
    N = A.shape[0]
    T = len(obs_seq)
    beta = np.zeros((T, N))
    beta[-1] = 1

    for t in range(T - 2, -1, -1):
        for i in range(N):
            beta[t, i] = np.sum(beta[t + 1] * A[i] * B[:, obs_seq[t + 1]])

    return beta

def forward_backward(obs_seq, A, B, pi):
    alpha = forward(obs_seq, A, B, pi)
    beta = backward(obs_seq, A, B)
    xi = np.zeros((len(obs_seq), A.shape[0]))

    for t in range(len(obs_seq)):
        xi[t] = alpha[t] * beta[t] / np.sum(alpha[t] * beta[t])

    return xi

def forward_backward(obs_seq, A, B, pi):
    alpha = forward(obs_seq, A, B, pi)
    beta = backward(obs_seq, A, B)
    T = len(obs_seq)
    N = A.shape[0]
    
    xi = np.zeros((T - 1, N, N))
    gamma = np.zeros((T, N))

    for t in range(T - 1):
        xi[t] = np.outer(alpha[t], beta[t + 1]) * A * B[:, obs_seq[t + 1]] / np.sum(alpha[t] * beta[t])
        gamma[t] = alpha[t] * beta[t] / np.sum(alpha[t] * beta[t])

    gamma[-1] = alpha[-1] * beta[-1] / np.sum(alpha[-1] * beta[-1])

    return xi, gamma

def train_hmm(obs_seq, A, B, pi, max_iter=100, tol=1e-6):
    prev_log_prob = float('-inf')
    
    for _ in range(max_iter):
        # E-step: compute xi and gamma using forward_backward function
        xi, gamma = forward_backward(obs_seq, A, B, pi)
        
        # M-step: update HMM parameters
        pi = gamma[0]
        A = np.sum(xi, axis=0) / np.sum(gamma[:-1], axis=0)[:, np.newaxis]
        B = np.zeros_like(B)
        
        for k in range(B.shape[1]):
            B[:, k] = np.sum(gamma[obs_seq == k], axis=0) / np.sum(gamma, axis=0)
        
        # Check for convergence
        log_prob = np.sum(np.log(np.sum(forward(obs_seq, A, B, pi), axis=1)))
        if np.abs(log_prob - prev_log_prob) < tol:
            break
        
        prev_log_prob = log_prob

    return A, B, pi

# Example usage:
obs_seq = [0, 1, 1, 2, 0, 1, 2, 0, 1, 0, 2]
A = np.array([[0.54, 0.46], [0.49, 0.51]])
B = np.array([[0.16, 0.26, 0.58], [0.25, 0.28, 0.47]])
pi = np.array([0.5, 0.5])

xi, gamma = forward_backward(obs_seq, A, B, pi)
print(gamma)


[[0.38932342 0.61067658]
 [0.49017761 0.50982239]
 [0.49836355 0.50163645]
 [0.561503   0.438497  ]
 [0.40706802 0.59293198]
 [0.49420342 0.50579658]
 [0.56127965 0.43872035]
 [0.40668454 0.59331546]
 [0.48647146 0.51352854]
 [0.4067412  0.5932588 ]
 [0.56245851 0.43754149]]
