<h1 align="center">Advanced Statistical Machine Learning & Pattern Recognition - CO495</h1>
<h2 align="center">Coursework 2 - Hidden Markov Models </h2>
<h3 align="center">Ilaria Manco </h3> 

In [1]:
import numpy as np
from collections import namedtuple
from scipy.stats import norm

## Helper functions and classes

In [2]:
def normalize(A, dim=None, precision=1e-9):
    """This function is adapted from Kevin Murphy's code for Machine Learning: a Probabilistic Perspective.

    Make the entries of a (multidimensional) array sum to 1
    A, z = normalize(A) normalize the whole array, where z is the normalizing constant
    A, z = normalize(A, dim)
    If dim is specified, we normalize the specified dimension only.
    dim=0 means each column sums to one
    dim=1 means each row sums to one


    Set any zeros to one before dividing.
    This is valid, since s=0 iff all A(i)=0, so
    we will get 0/1=0

    Adapted from https://github.com/probml/pmtk3"""
    
    if dim is not None and dim > 1:
        raise ValueError("Normalize doesn't support more than two dimensions.")
    
    z = A.sum(dim)
    # If z is a scalar, z.shape is an empty tuple and evaluates to False
    if z.shape:
        z[np.abs(z) < precision] = 1
    elif np.abs(z) < precision:
        return 0, 1
    
    if dim == 1:
        return np.transpose(A.T / z), z
    else:
        return A / z, z

The initial values are provided as namedtuples (initialization.A is the initial value for A)

In [3]:
InitGaussian = namedtuple('InitGaussian', ['A', 'Means', 'Variances', 'pi'])
InitMultinomial = namedtuple('InitMultinomial', ['A', 'B', 'pi'])

## Filtering and Smoothing

### Observation model

The core of EM estimation on HMM operates on vectors of probabilities, so the main difference between EM for Gaussian HMM and multinomial HMM is the computation of the observation probabilities and which parameters to estimate.

The two functions below compute the probabilities of the data for a given observation model.

In [4]:
def computeSmallB_Gaussian(Y, Means, Variances, Nhidden, T):
    """Compute the probabilities for the data points Y for a Gaussian observation model 
        with parameters Means and Variances.
        
        Input parameters:
            - Y: the data
            - Means: vector of the current estimates of the means
            - Variances: vector of the current estimates of the variances
            - Nhidden: number of hidden states
            - T: length of the sequence
        Output:
            - b: vector of observation probabilities
    """
    
    b = np.zeros((Nhidden, T))
    
    for i in range(Nhidden):
        pdf = norm.pdf(Y, Means[i], np.sqrt(Variances[i]))
        b[i] = pdf
                   
    return normalize(b, dim = 0)[0]

In [5]:
def computeSmallB_Discrete(Y, B):
    """Compute the probabilities for the data points Y for a multinomial observation model 
        with observation matrix B
        
        Input parameters:
            - Y: the data
            - B: matrix of observation probabilities
        Output:
            - b: vector of observation probabilities
    """
    k = len(B)
    N = len(Y)
    
    b = np.zeros((k, N))

    for i in range(k):
        for n in range(N):
            b[i][n] = B[i][int(Y[n]-1)]
    
    return normalize(b, dim = 0)[0]

### Smoothing and filtering: Estimation step

The E step involves smoothing and filtering.

In [6]:
def BackwardFiltering(A, b, N, T):
    """Perform backward filtering.
        Input parameters:
            - A: estimated transition matrix (between states)
            - b: estimated observation probabilities (local evidence vector)
            - N: number of hidden states
            - T: length of the sequence
        Output:
            - beta: filtered probabilities
    """
    beta = np.ones((N, T))
    
    for t in range(T-1, -1, -1):
        beta[:, t-1] = np.dot(A,beta[:,t]*b[:,t])
        
    #normalise
    beta = beta/np.sum(beta, axis = 0)
        
    return beta

In [7]:
def ForwardFiltering(A, b, pi, N, T):
    """Filtering using the forward algorithm (Section 17.4.2 of K. Murphy's book)
    Input:
      - A: estimated transition matrix
      - b: estimated observation probabilities (local evidence vector)
      - pi: initial state distribution pi(j) = p(z_1 = j)
    Output:
      - Filtered belief state at time t: alpha = p(z_t|x_1:t)
      - log p(x_1:T)
      - Z: normalization constant"""
    
    alpha = np.ones((N, T))
    Z = np.ones(T)
    
    alpha[:,0], Z[0] = normalize(np.multiply(b[:,0], pi.T))
    
    for t in range(1, T):
        alpha[:,t], Z[t] = normalize(np.multiply(b[:,t], np.dot(A.T,alpha[:,t-1])))
    
    logProb = np.sum(np.log(Z))
    
    return alpha, logProb, Z

In [8]:
def ForwardBackwardSmoothing(A, b, pi, N, T):
    """Smoothing using the forward-backward algorithm.
    Input:
      - A: estimated transition matrix
      - b: local evidence vector (observation probabilities)
      - pi: initial distribution of states
      - N: number of hidden states
      - T: length of the sequence
    Output:
      - alpha: filtered belief state as defined in ForwardFiltering
      - beta: conditional likelihood of future evidence as defined in BackwardFiltering
      - gamma: gamma_t(j) proportional to alpha_t(j) * beta_t(j)
      - lp: log probability defined in ForwardFiltering
      - Z: constant defined in ForwardFiltering"""
    
    alpha, logProb, Z = ForwardFiltering(A, b, pi, N, T) 
    beta = BackwardFiltering(A, b, N, T)
    
    gamma = normalize(alpha*beta, dim =0)[0]
    
    return alpha, beta, gamma, logProb, Z

In [9]:
def SmoothedMarginals(A, b, alpha, beta, T, Nhidden):
    "Two-sliced smoothed marginals p(z_t = i, z_t+1 = j | x_1:T)"
    
    marginal = np.zeros((Nhidden, Nhidden, T-1));
    
    for t in range(T-1):
        
        product = b[:, t+1] * beta[:, t+1]
        product = np.reshape(product, (1,2))

        marginal[:, :, t] = normalize(A * np.outer(alpha[:, t],  product.T), dim=1)[0]

    return marginal

## EM estimation

Below is the implementation of the EM algorithm.

#### Initialisation
Here, only the initialisation values provided are used. 

However, other ways to initialise the parameters include:
* randomly choose a set of parameters and run the algorithm multiple times to identify the best set
* ignore the Markov dependencies and use other mixture model estimation techniques such as K-Means or EM
* use Viterbi training after reaching near convergence with the forwards-backwards training

#### Convergence
A convergence criterion is applied as follows: at each step, the difference between the logProb obtained from the previous iteration and the current one is computed to check whether this is smaller than $\epsilon = 10^6$. If this is the case, convergence is reached and the udpated parameters are returned. 

If not, the iterations continue until they reach the upper bound fixed by the *Niter* argument.


#### Performance
The performance of the algorithm is evaluated using Viterbi decoding (implemented below).

### Gaussian observation model

In [10]:
def EM_estimate_gaussian(Y, Nhidden, Niter, epsilon, init):
    
    # Dimensions of the data
    N, T = Y.shape
    
    # Initialization
    A = init.A
    Means = init.Means
    Variances = init.Variances;
    pi = init.pi
    
    ###############################################
    # EM algorithm
    
    i = 0
        
    logProbSum = [0, 1]

    while i<Niter and (abs(logProbSum[i+1] - logProbSum[i]) > epsilon): 
        means_num, den, var_num = np.zeros((Nhidden,1)), np.zeros((Nhidden,1)), np.zeros((Nhidden,1))
        pi_new, A_new = np.zeros(np.shape(init.pi)), np.zeros(np.shape(init.A))
        
        logProbList = []
        
        for n in range(N):
            #select sequence
            Y_s = Y[n]
            
            b = computeSmallB_Gaussian(Y_s, Means, Variances, Nhidden, T)
            alpha, beta, gamma, logProb, Z =  ForwardBackwardSmoothing(A, b, pi, Nhidden, T)
            marginal = SmoothedMarginals(A, b, alpha, beta, T, Nhidden)
                        
            means_num +=  np.reshape(np.sum(np.multiply(gamma, Y_s), axis = 1), (Nhidden,1))
            den += np.reshape(np.sum(gamma, axis = 1), (Nhidden,1))
            var_num += np.reshape(np.sum(np.multiply(gamma, (Y_s - Means)*(Y_s - Means)), axis = 1), (Nhidden,1))
            
            pi_new += np.reshape(gamma[:,0], (Nhidden, 1))
            A_new += np.sum(marginal, axis = 2)
            
            logProbList.append(logProb)

        logProbSum.append(sum(logProbList))
        
        Means = means_num/den
        Variances = var_num/den
        A = normalize(A_new, dim =1)[0]
        pi = normalize(pi_new)[0] 

        i += 1    
    
    print("Convergence (Gaussian): ",abs(logProbSum[i+1] - logProbSum[i]) < epsilon)    
    return A, Means, Variances, pi

### Multinomial observation model

In [11]:
def one_hot_encoding(Y):
    N = len(np.unique(Y))
    T = len(Y)
    X = np.zeros((T, N))
    
    for t in range(T):
        X[t][int(Y[t])-1] = 1
    
    return X

In [12]:
def EM_estimate_multinomial(Y, Nhidden, Niter, epsilon, init):
    
    # Dimensions of the data
    N, T = Y.shape
    
    # Initialization
    A = init.A
    B = init.B
    pi = init.pi
    
    ###############################################
    # EM algorithm
    
    i = 0
    
    logProbSum = [0, 1]
    
    while (i<Niter) and (abs(logProbSum[i+1] - logProbSum[i]) > epsilon): 
        A_new, B_new, pi_new = np.zeros(np.shape(init.A)), np.zeros(np.shape(init.B)), np.zeros(np.shape(init.pi))
        den = np.zeros((Nhidden, 1))
    
        logProbList = []

        for n in range(N):
            #select the sequence
            X = one_hot_encoding(Y[n])
            
            b = computeSmallB_Discrete(Y[n], B)
            alpha, beta, gamma, logProb, Z = ForwardBackwardSmoothing(A, b, pi, Nhidden, T)
            marginal = SmoothedMarginals(A, b, alpha, beta, T, Nhidden)

            B_new += np.dot(gamma, X)
            den += np.reshape(np.sum(gamma, axis = 1), (Nhidden,1))
            
            pi_new += np.reshape(gamma[:,0], (Nhidden, 1))
            A_new += np.sum(marginal, axis = 2)

            logProbList.append(logProb)

        logProbSum.append(sum(logProbList))
        B = normalize(B_new/den, dim = 1)[0]       
        A = normalize(A_new, dim = 1)[0]
        pi = normalize(pi_new)[0]

        i += 1
    
    print("Convergence (multinomial): ",abs(logProbSum[i+1] - logProbSum[i]) < epsilon)
    return A, B, pi

## Viterbi decoding

In [13]:
def ViterbiDecode(Y, Nhidden, outModel, init):
    
    if outModel == 'gauss':
        A, Mu, Sigma, Pi = EM_estimate_gaussian(Y, Nhidden, 100, 1e-6, init)
        smallB = lambda X : computeSmallB_Gaussian(X, Mu, Sigma, Nhidden, len(X))
    elif outModel == 'multinomial':
        A, B, Pi = EM_estimate_multinomial(Y, Nhidden, 100, 1e-6, init)
        smallB = lambda X : computeSmallB_Discrete(X, B)
    else:
        raise ValueError('Invalid observation model: must be either "gauss" or "multinomial"')
    
    # Implement Viterbi decoding here.
    N, T = np.shape(Y)
    
    S = np.ones((N, T))

    for n in range(N):
        b = smallB(Y[n])
        
        delta = np.ones((Nhidden, T))
        psi = np.ones((Nhidden, T))
        
        delta[:,0] = np.log2((Pi.T * b[:,0]))
        
        alpha, beta, gamma, logProb, Z = ForwardBackwardSmoothing(A, b, Pi, Nhidden, T)
        marginals = SmoothedMarginals(A, b, alpha, beta, T, Nhidden)
        
        for t in range(1, T):
            for j in range(Nhidden):
                delta[j, t], psi[j, t] = max(delta[:, t-1] + np.log2(A[:,j])), np.argmax(delta[:, t-1] + np.log2(A[:,j])) 
                delta[j, t] = delta[j, t] + np.log2(b[j,t])
                
        #traceback
        S[n][T-1] = np.argmax(delta[:,T-1])
        for t in range(T-2, -1, -1):
            S[n][t] = psi[int(S[n][t+1])][t+1]
                                                                        
    return S+1

## Demo code

In [14]:
with np.load('init_gaussian.npz') as f:
    init_g = InitGaussian(f['arr_0'], f['arr_1'], f['arr_2'], f['arr_3'])
    
with np.load('init_multinomial.npz') as f:
    init_m = InitMultinomial(f['arr_0'], f['arr_1'], f['arr_2'])
    
with np.load('data_gaussian.npz') as f:
    Y_c, S_c = f['arr_0'], f['arr_1']

with np.load('data_multinomial.npz') as f:
    Y_d, S_d = f['arr_0'], f['arr_1']
    
A_g, Means_g, Variances_g, Pi_g = EM_estimate_gaussian(Y_c, 2, 100, 1e-6, init_g)
A_m, B_m, Pi_m = EM_estimate_multinomial(Y_d, 2, 100, 1e-6, init_m)

S_g = ViterbiDecode(Y_c, 2, 'gauss', init_g)
S_m = ViterbiDecode(Y_d, 2, 'multinomial', init_m)

print('*** Viterbi decoding accuracy (Gaussian): {}'.format( (S_c == S_g).sum() / S_c.size ))
print('*** Viterbi decoding accuracy (Multinomial): {}'.format( (S_d == S_m).sum() / S_d.size ))

Convergence (Gaussian):  True
Convergence (multinomial):  False
Convergence (Gaussian):  True
Convergence (multinomial):  False
*** Viterbi decoding accuracy (Gaussian): 0.9995
*** Viterbi decoding accuracy (Multinomial): 0.5975


In [15]:
print('A =', A_g)
print('Means=', Means_g)
print('Variances=', Variances_g)
print('pi=', Pi_g)

A = [[ 0.39067931  0.60932069]
 [ 0.38883957  0.61116043]]
Means= [[ 0.09215062]
 [ 5.00272057]]
Variances= [[ 0.40331952]
 [ 0.82207155]]
pi= [[ 0.56011078]
 [ 0.43988922]]


In [16]:
print('A =', A_m)
print('B=', B_m)
print('pi=', Pi_m)

A = [[ 0.38662667  0.61337333]
 [ 0.38662667  0.61337333]]
B= [[ 0.11980829  0.26147544  0.04246297  0.03442291  0.10120943  0.44062095]
 [ 0.13510802  0.04664145  0.18213438  0.17774353  0.14615633  0.3122163 ]]
pi= [[ 0.33914967]
 [ 0.66085033]]


As convergence is not reached within 100 iterations for the multinomial case, the lower performance compared to the Gaussian model is expected.