# CO495 ASML Coursework 2 - Hidden Markov Models

In this coursework, you are asked to implement filtering, smoothing, and optionally Viterbi decoding for discrete and continuous valued HMMs. Input data and initialization is provided and should be used for reproducibility.

In [114]:
import numpy as np
from collections import namedtuple
from scipy.stats import norm
from numpy import linalg as LA

The functions below are here to guide you in your implementation of the EM and Viterbi algorithms for Hidden Markov Models. We follow Section 17.4 of _Machine Learning: A Probabilistic Perspective_ by Kevin Murphy (2012).

You should write vectorized modular code to promote re-usability, efficiency, and readability.

Your task is to complete the implementation and to report the results obtained from the provided initialization. You are strongly encouraged to explore different initialization schemes for the algorithms.

## Helper functions and classes

You may use this function in your implementation.

In [115]:
def normalize(A, dim=None, precision=1e-9):
    """This function is taken from Kevin Murphy's code for Machine Learning: a Probabilistic Perspective.

    Make the entries of a (multidimensional) array sum to 1
    A, z = normalize(A) normalize the whole array, where z is the normalizing constant
    A, z = normalize(A, dim)
    If dim is specified, we normalize the specified dimension only.
    dim=0 means each column sums to one
    dim=1 means each row sums to one


    Set any zeros to one before dividing.
    This is valid, since s=0 iff all A(i)=0, so
    we will get 0/1=0

    Adapted from https://github.com/probml/pmtk3"""
    z = A.sum(dim)
    # If z is a scalar, z.shape is an empty tuple and evaluated to False
    if z.shape:
        z[np.abs(z) < precision] = 1
    elif np.abs(z) < precision:
        return 0, 1
    
    return A / z, z

The initial values are provided as namedtuples (initialization.A is the initial value for A)

In [116]:
InitGaussian = namedtuple('InitGaussian', ['A', 'Means', 'Variances', 'pi'])
InitMultinomial = namedtuple('InitMultinomial', ['A', 'B', 'pi'])

In [117]:
#Here we load data to do test seperately.
with np.load('init_gaussian.npz') as f:
    init_g = InitGaussian(f['arr_0'], f['arr_1'], f['arr_2'], f['arr_3'])
    
with np.load('init_multinomial.npz') as f:
    init_m = InitMultinomial(f['arr_0'], f['arr_1'], f['arr_2'])

with np.load('data_gaussian.npz') as f:
    Y_c, S_c = f['arr_0'], f['arr_1']

with np.load('data_multinomial.npz') as f:
    Y_d, S_d = f['arr_0'], f['arr_1']



## Filtering and Smoothing

Break down your implementation according to the functions below. Feel free to create additional ones whenever you see fit, but the general flow of the algorithm should be made apparent

### Observation model

The core of EM estimation on HMM operates on vectors of probabilities, so the main difference between EM for Gaussian HMM and multinomial HMM is the computation of the observation probabilities and which parameters to estimate.

Complete the two functions below to compute the probabilities of the data for a given observation model and use them in the rest of the algorithm. Your filtering and smoothing steps should be model agnostic.

In [118]:
def computeSmallB_Gaussian(Y, Means, Variances, Nhidden, T):
    """Compute the probabilities for the data points Y for a Gaussian observation model 
        with parameters Means and Variances.
        
        Input parameters:
            - Y: the data
            - Means: vector of the current estimates of the means
            - Variances: vector of the current estimates of the variances
            - Nhidden: number of hidden states
            - T: length of the sequence
        Output:
            - b: vector of observation probabilities
    """

    b = np.zeros((Nhidden, T))

    for i in range(T):
        for j in range(Nhidden):
            b[j][i] = norm.pdf(Y[i], loc = Means[j], scale = np.sqrt(Variances[j]))
    return b

In [119]:
def computeSmallB_Discrete(Y, B):
    """Compute the probabilities for the data points Y for a multinomial observation model 
        with observation matrix B
        
        Input parameters:
            - Y: the data
            - B: matrix of observation probabilities
        Output:
            - b: vector of observation probabilities
    """
    
    b = np.zeros((B.shape[0], len(Y)))
    
    for i in range(len(Y)):
        for j in range(B.shape[0]):
            b[j][i] = B[j][Y[i] - 1]
  
    
    return b

### Smoothing and filtering: Estimation step

The E step involves smoothing and filtering. Refer to the course notes and/or to the recommended readings to implement these steps in the functions below.

In [120]:
def BackwardFiltering(A, b, N, T, Z):
    """Perform backward filtering.
        Input parameters:
            - A: estimated transition matrix (between states)
            - b: estimated observation probabilities (local evidence vector)
            - N: number of hidden states
            - T: length of the sequence
        Output:
            - beta: filtered probabilities
    """
    beta = np.zeros((N, T))
    
    beta[:, T - 1] = np.array([1,1])

    
    for t in range(T-2, -1, -1):
        
        inner = beta[:, t + 1] * b[:, t+ 1] 
        outter = np.dot(A, inner.reshape(N, 1))
        
        beta[:, t] = outter.reshape(N)
        
        beta[:, t] = beta[:, t] / Z[t + 1]
        
    return beta

In [121]:
def ForwardFiltering(A, b, pi, N, T):
    """Filtering using the forward algorithm (Section 17.4.2 of K. Murphy's book)
    Input:
      - A: estimated transition matrix
      - b: estimated observation probabilities (local evidence vector)
      - pi: initial state distribution pi(j) = p(z_1 = j)
    Output:
      - Filtered belief state at time t: alpha = p(z_t|x_1:t)
      - log p(x_1:T)
      - Z: normalization constant"""
    
    alpha = np.zeros((N, T))
    pi = pi.reshape(N)
    
    logProb = 0.0
    
    norm, z = normalize(pi * b[:, 0])
    logProb += np.log(z)
    alpha[:, 0] = norm
    Z = np.zeros(T)
    
    Z[0] = z
    
    for t in range(1, T):
        inner = np.dot(A.transpose(), alpha[:, t - 1].reshape(N,1))
        alpha[:, t] = b[:, t] * inner.reshape(N)
        
        norm, z = normalize(alpha[:, t])
        
        logProb += np.log(z)
        
        Z[t] = z
            
        alpha[:, t] = norm

            
    return alpha, logProb, Z

In [122]:
def ForwardBackwardSmoothing(A, b, pi, N, T):
    """Smoothing using the forward-backward algorithm.
    Input:
      - A: estimated transition matrix
      - b: local evidence vector (observation probabilities)
      - pi: initial distribution of states
      - N: number of hidden states
      - T: length of the sequence
    Output:
      - alpha: filtered belief state as defined in ForwardFiltering
      - beta: conditional likelihood of future evidence as defined in BackwardFiltering
      - gamma: gamma_t(j) proportional to alpha_t(j) * beta_t(j)
      - lp: log probability defined in ForwardFiltering
      - Z: constant defined in ForwardFiltering"""
    
    
    alpha, lp, Z = ForwardFiltering(A, b, pi, N, T)
    beta = BackwardFiltering(A, b, N, T, Z)
    gamma = alpha * beta
    return alpha, beta, gamma, lp, Z

Use the output of SmoothedMarginals in the maximization step for A.

In [123]:
def SmoothedMarginals(A, b, alpha, beta, T, Nhidden):
    "Two-sliced smoothed marginals p(z_t = i, z_t+1 = j | x_1:T)"
    
    marginal = np.zeros((Nhidden, Nhidden, T-1));

    for t in range(T-1):
        marginal[:, :, t] = normalize(A * np.dot(alpha[:, t].reshape(Nhidden,1), (b[:, t+1] * beta[:, t+1]).reshape(1,2)))[0]
    return marginal

## EM estimation

Implement the main algorithm in the skeletons below.
How can you measure the performance of your model and choose an appropriate convergence criterion?
_Hint: the logProb returned by the ForwardBackwardSmoothing function can be used_.

In [124]:
def EM_Maximisation_gaussian(Y, gamma, smoothedMarginal, Nhidden, T, N):
    
    norm_term = np.zeros(Nhidden)
    miu_term = np.zeros(Nhidden)
    for l in range(N):
        for t in range(T):
            for k in range(Nhidden):
                norm_term[k] += gamma[l][k][t]
                miu_term[k] += gamma[l][k][t] * Y[l][t]
                
    Means = miu_term / norm_term
    
    sigma_term = np.zeros(Nhidden)
    for l in range(N):
        for t in range(T):
            for k in range(Nhidden):
                sigma_term[k] += gamma[l][k][t] * (Y[l][t] - Means[k])**2
    
    Variances = sigma_term / norm_term
    
    pi_term = np.zeros(Nhidden)
    pi_norm = 0.0
    
    for l in range(N):
        for k in range(Nhidden):
            pi_term[k] += gamma[l][k][0]
        for r in range(Nhidden):
            pi_norm += gamma[l][r][0]
    
    pi = pi_term / pi_norm
    
    trans = np.zeros((Nhidden, Nhidden))
    trans_norm = np.zeros(Nhidden)
    
    for l in range(N):
        for t in range(T - 1):
            for j in range(Nhidden):
                for k in range(Nhidden):
                    trans[j][k] += smoothedMarginal[l][j][k][t]
                for r in range(Nhidden):
                    trans_norm[j] += smoothedMarginal[l][j][r][t]
    for i in range(Nhidden):
        trans[i,:] = trans[i,:] / trans_norm[i] 
    
    return trans, Means, Variances, pi
            
        

### Gaussian observation model

In [125]:
def EM_estimate_gaussian(Y, Nhidden, Niter, epsilon, init):
    
    # Dimensions of the data
    N, T = Y.shape
    
    # Initialization
    
    # Initial transition matrix should be stochastic (rows sum to 1)
    A = init.A
    
    # Initial means and variances of the emission probabilities
    Means = init.Means
    Variances = init.Variances;
    
    # Class prior
    pi = init.pi
    
    

    ###############################################
    # EM algorithm
    
    
    i = 0
    # Initialize convergence criterion here
    
    while i<Niter: # and condition on criterion and precision epsilon
        # Iterate here
        
        gamma = []
        smoothedMarginal = []
        
        for j in range(N):
            b = computeSmallB_Gaussian(Y[j], Means, Variances, Nhidden, T)
            
            alpha, beta, gamma_j, logProb, Z = ForwardBackwardSmoothing(A, b, pi, Nhidden, T)
                       
            gamma.append(gamma_j)
            
            sMar = SmoothedMarginals(A, b, alpha, beta, T, Nhidden)
            
            smoothedMarginal.append(sMar)
            
            
        A_n, Means_n, Variances_n, pi_n = EM_Maximisation_gaussian(Y, gamma, smoothedMarginal, Nhidden, T, N)
        
        if LA.norm(A_n - A) < epsilon or LA.norm(Means_n - Means) < epsilon or  \
                  LA.norm(Variances_n - Variances) < epsilon or LA.norm(pi_n - pi) < epsilon:
            break
        else:
            #print (pi)
            A = A_n
            Means = Means_n
            Variances = Variances_n
            pi = pi_n
            i += 1
            
#     print (i)       
    return A, Means, Variances, pi

The cell below is a test cell for EM estimation of Gaussian

In [126]:
# A_g, Means_g, Variances_g, Pi_g = EM_estimate_gaussian(Y_c, 2, 100, 1e-6, init_g)
# print (A_g)
# print (Means_g)
# print (Variances_g) 
# print (Pi_g)

### Multinomial observation model

In the maximization step for B you will have to compute a quantity involving indicators on the values of Y. One efficient way to do it is to pre-compute a representation of Y using _one-hot encoding_. In MATLAB:

```% X sparse coding
Nv = length(unique(Y));
X = zeros(T, Nv);
for i=1:T
    X(i, Y(i)) = 1;
end
% Maximization: emission matrix
B1 = B1 + gamma * X;```

In [127]:
def EM_Maximisation_multinomial(Y, gamma, smoothedMarginal, Nhidden, T, N):
    Nv = len(np.unique(Y))
    
    emission_term = np.zeros((Nhidden, Nv))
    norm_term = np.zeros(Nhidden)
    for l in range(N):
        norm_term += np.sum(gamma[l], axis = 1)
        
        X = np.zeros((T, Nv))
        for i in range(T):
            X[i, Y[l][i] - 1] = 1
        emission_term += np.dot(gamma[l], X)
    
    for i in range(Nhidden):
        emission_term[i,:] /= norm_term[i]
        
    pi_term = np.zeros(Nhidden)
    pi_norm = 0.0
    
    for l in range(N):
        pi_norm += np.sum(gamma[l][:,0])
        pi_term += gamma[l][:,0]
        
    pi = pi_term / pi_norm
    
    trans = np.zeros((Nhidden, Nhidden))
    trans_norm = np.zeros(Nhidden)
    
    for l in range(N):
        for t in range(T - 1):
            for j in range(Nhidden):
                for k in range(Nhidden):
                    trans[j][k] += smoothedMarginal[l][j][k][t]
                for r in range(Nhidden):
                    trans_norm[j] += smoothedMarginal[l][j][r][t]
    for i in range(Nhidden):
        trans[i,:] = trans[i,:] / trans_norm[i] 
    
    
    return trans, emission_term, pi
    

In [128]:
def EM_estimate_multinomial(Y, Nhidden, Niter, epsilon, init):
    
    # Dimensions of the data
    N, T = Y.shape
    
    # Initialization
    
    # Initial transition matrix should be stochastic (rows sum to 1)
    A = init.A
    
    # Observation matrix B
    B = init.B
    
    # Class prior
    pi = init.pi
    
    #print (A, B, pi)
    
    ###############################################
    # EM algorithm
    
    i = 0
    # Initialize convergence criterion here
    
    while i<Niter: # and condition on criterion and precision epsilon
        # Iterate here
        gamma = []
        smoothedMarginal = []
        for j in range(N):
            b = computeSmallB_Discrete(Y[j], B)
            
            alpha, beta, gamma_j, logProb, Z = ForwardBackwardSmoothing(A, b, pi, Nhidden, T)
            
            gamma.append(gamma_j)
            
            sMar = SmoothedMarginals(A, b, alpha, beta, T, Nhidden)
            
            smoothedMarginal.append(sMar)
            
            
        A_n, B_n, pi_n = EM_Maximisation_multinomial(Y, gamma, smoothedMarginal, Nhidden, T, N)
        
        if LA.norm(A_n - A) < epsilon or LA.norm(B_n - B) < epsilon or LA.norm(pi_n - pi) < epsilon:
            break
        else:
            A = A_n
            B = B_n
            pi = pi_n
            i += 1
#     print (i)
    return A, B, pi

## Viterbi decoding

Viterbi decoding should be performed on the smoothed data and most of the algorithm doesn't depend on the output model. To help you, we identified the steps that are model specific. Implement Viterbi decoding by completing the skeleton below. 'smallB' is a function and should be used in the standard way: smallB(x).

In [130]:
def ViterbiDecode(Y, Nhidden, outModel, init):
    
    if outModel == 'gauss':
        A, Mu, Sigma, Pi = EM_estimate_gaussian(Y, Nhidden, 100, 1e-6, init)
        smallB = lambda X : computeSmallB_Gaussian(X, Mu, Sigma, Nhidden, len(X))
    elif outModel == 'multinomial':
        A, B, Pi = EM_estimate_multinomial(Y, Nhidden, 100, 1e-6, init)
        smallB = lambda X : computeSmallB_Discrete(X, B)
    else:
        raise ValueError('Invalid observation model: must be either "gauss" or "multinomial"')
     # Implement Viterbi decoding here.
    N, T = Y.shape
    S_all = np.zeros((N, Nhidden, T))
    Pi = Pi.reshape(Nhidden)
    
    A_log = np.log2(A.transpose())

    for l in range(N):
        
        b = smallB(Y[l])
        S_all[l][:, 0] = np.log2(Pi) + np.log2(b[:, 0])
        
        for t in range(1, T):
            pre = S_all[l][:, t-1].reshape(1,Nhidden)
            pre = np.append(pre, pre, axis = 0)

            Max = A_log + pre
            Max = np.max(Max, axis = 1)
            
            S_all[l][:, t] = Max + np.log2(b[:, t])   
    
    S = np.zeros((N, T))
    for l in range(N):
        for t in range(T):
            seq = np.where(S_all[l][:,t] == np.max(S_all[l][:,t]))[0] + 1
            S[l][t] = seq

   
    return S

## Demo code

In [133]:
with np.load('init_gaussian.npz') as f:
    init_g = InitGaussian(f['arr_0'], f['arr_1'], f['arr_2'], f['arr_3'])
    
with np.load('init_multinomial.npz') as f:
    init_m = InitMultinomial(f['arr_0'], f['arr_1'], f['arr_2'])

with np.load('data_gaussian.npz') as f:
    Y_c, S_c = f['arr_0'], f['arr_1']

with np.load('data_multinomial.npz') as f:
    Y_d, S_d = f['arr_0'], f['arr_1']
    
A_g, Means_g, Variances_g, Pi_g = EM_estimate_gaussian(Y_c, 2, 100, 1e-6, init_g)
A_m, B_m, Pi_m = EM_estimate_multinomial(Y_d, 2, 100, 1e-6, init_m)

S_g = ViterbiDecode(Y_c, 2, 'gauss', init_g)
S_m = ViterbiDecode(Y_d, 2, 'multinomial', init_m)

print('*** Viterbi decoding accuracy (Gaussian): {}'.format( float((S_c == S_g).sum()) / float(S_c.size )))
print('*** Viterbi decoding accuracy (Multinomial): {}'.format( float((S_d == S_m).sum()) / float(S_d.size) ))

*** Viterbi decoding accuracy (Gaussian): 0.9949
