My implementation of Rabiner's article "A tutorial on Hidden Markov Models and Selected Applications in Speech Recognition"



Solutions to the three problems of Hidden Markov Models:
* Likelihood **evaluation** of a sequence of observations given a fully specified HMM model => Forward-Backward algorithm
* Most **"correct" state** sequence determination problem, given a sequence of observations and a fully specified model => Viterbi algorithm
* **Model training** problem. Specifying the "best" model parameters given a set of observed signals => Baum-Welch algorithm

In [1]:
import numpy as np

In [2]:
def initialize_random_model(N, M):
    """
    Parameters:
        N (int): number of unique states
        M (int): number of unique observation signals (the signal is a categorical variable)
        
    Returns:
        A (numpy.array N x N): state transition matrix
        B (numpy.array N x M): state-observation emission probability matrix
        pi (numpy.array N): state initialization probabilities
    """
    
    # Transition matrix
    A = np.random.random(size = (N, N))
    A /= A.sum(axis=1).reshape((N, 1))

    # Observation emission matrix
    B = np.random.random(size = (N, M))
    B /= B.sum(axis=1).reshape((N, 1))

    # Initial transition probabilities
    pi = np.random.random(size = (N,))
    pi /= pi.sum()
    
    return A, B, pi

def simulate(model, T):
    """
    Parameters:
        model (tuple or list) with elements (in order):
            A (numpy.array N x N): state transition matrix
            B (numpy.array N x M): state-observation emission probability matrix
            pi (numpy.array N): state initialization probabilities
        T: sequence length
        
    Returns:
        Q: state sequence simulation
        O: observation sequence simulation
    """
    
    A, B, pi = model

    Q = []
    O = []
    
    # Initialization
    q = np.random.choice(np.arange(N), p=pi)
    Q.append(q)
    O.append(np.random.choice(np.arange(M), p=B[q]))

    while len(O) < T:
        q = np.random.choice(np.arange(N), p=A[q])
        Q.append(q)
        O.append(np.random.choice(np.arange(M), p=B[q]))
        
    return Q, O

## Problem #1: Likelihood Evaluation

In [4]:
def forward(O, model):
    """
    Implements the forward piece of the Forward-Backward algorithm
    
    Parameters:
        A, B, pi = model
        O (list): observation sequence of length T
    
    Returns:
        np.array (N x T)
    """
    
    A, B, pi = model

    N, _ = B.shape
    T = len(O)
    
    # Keep track within matrix
    forw = np.zeros((N, T))
    forw[:,0] = B[:,O[0]] * pi # Initial step

    # Induction
    for t in np.arange(1, T):

        forw[:,t] = A.T.dot(forw[:,t-1]) * B[:,O[t]]
        
    return forw

def likelihood(O, model):
    """
    Computes the likelihood of an observation sequence given the model
    """
    return forward(O, model)[:,-1].sum()

In [18]:
# Model initialization
N = 4
M = 3
T = 10
model = initialize_random_model(N, M)

# Simulate a random sequence
Q, O = simulate(model, T)

print(O)

[2, 1, 2, 2, 0, 1, 0, 1, 2, 2]


In [19]:
forward_matrix = forward(O, model)

print(forward_matrix)

[[5.95223439e-02 4.56452364e-02 1.15566492e-02 5.32089224e-03
  1.17636598e-03 3.82518234e-04 6.71270902e-05 2.81288816e-05
  8.76442383e-06 4.30961141e-06]
 [2.17730493e-01 5.43944193e-02 2.86217312e-02 1.12055208e-02
  9.09792677e-05 4.75565303e-04 5.74507346e-06 3.26645920e-05
  2.11619694e-05 8.98756824e-06]
 [4.42338580e-02 4.40383686e-02 1.51091919e-02 5.28884243e-03
  9.21576299e-04 9.15464987e-04 1.00921680e-04 7.38808816e-05
  1.75868438e-05 5.19921345e-06]
 [1.02527183e-01 3.78549928e-02 1.66437558e-02 6.69353497e-03
  2.53785243e-03 3.66881829e-04 1.81087478e-04 2.72965877e-05
  1.42518912e-05 5.64139175e-06]]


In [20]:
# Likelihood estimate
print(likelihood(O, model))

2.4137784848116605e-05


## Problem #2: Most "correct" state sequence determination

In [27]:
def viterbi(O, model):
    """
    Implements the Viterbi algorithm
    
    Parameters:
        A, B, pi = model
        O (list): observation sequence of length T
    
    Returns:
        list of length T: most likely state sequence
    """
    
    A, B, pi = model
    N, _ = B.shape
    T = len(O)
    
    # Initialization
    ds = np.zeros((N, T))
    phis = np.zeros((N, T))
    ds[:,0] = B[:,O[0]] * pi

    for t in np.arange(1, T):
        delta = A.T * ds[:,t-1]
        ds[:,t] = np.max(delta, axis=1) * B[:,O[t]]
        phis[:,t] = np.argmax(delta, axis=1)

    qs = []
    s = np.argmax(ds[:,-1])
    qs.append(s)

    # Backtracking
    for i in np.arange(1, T):
        s = phis[int(s), -i]
        qs.append(int(s))

    # Reverse the list
    return qs[-1::-1]

def state_correctness(Q, O, model):
    """Evaluate likelihood of state seq Q given observation sequence O and model"""
    
    A, B, pi = model
    
    # Initialization
    lik = pi[Q[0]] * B[Q[0], O[0]]
    
    for qp, q, o in zip(Q[:-1], Q[1:], O[1:]):
        lik *= A[qp, q] * B[q, o]
        
    return lik

In [22]:
# True state sequence 
print(Q)

[0, 2, 2, 1, 0, 2, 2, 2, 2, 3]


In [23]:
# Estimate state sequence
Qhat = viterbi(O, model)

print(Qhat)

[1, 0, 1, 1, 3, 2, 2, 2, 2, 2]


In [31]:
# Likelihood of the true sequence
print(state_correctness(Q, O, model))
# Likelihood of the estimated sequence
print(state_correctness(Qhat, O, model))

3.011425723009036e-10
1.7856632367010275e-08


## Problem #3: Model training

In [52]:
def backward(O, model):
    """
    Implements the backward piece of the Forward-Backward algorithm
    
    Parameters:
        A, B, pi = model
        O (list): observation sequence of length T
    
    Returns:
        np.array (N x T)
    """
    
    A, B, pi = model
    N, _ = B.shape
    T = len(O)
    
    backw = np.zeros((N, T))
    backw[:,-1] = 1

    # Induction
    for t in np.arange(T-2, -1, -1):
        backw[:,t] = A.dot(backw[:,t+1]*B[:,O[t+1]])
        
    return backw

def build_eps(model_t, O):
    """
    Epsilon tensor computation (N, N, T)
    """

    A_t, B_t, pi_t = model_t
    N, M  = B_t.shape
    T = len(O)

    # Forward
    forw = forward(O, model_t)

    # Backward procedure
    backw = backward(O, model_t)

    # Compute epsilon values as a 3D tensor
    eps = np.zeros((N, N, T-1))

    for t in range(T-1):
        for i in range(N):
            for j in range(N):
                eps[i, j, t] = forw[i, t] * A_t[i, j] * B_t[j, O[t+1]] * backw[j, t+1]

    # Standardize
    eps /= eps.sum(axis=(0, 1))
    
    return eps

In [32]:
# Data generation
n = 5000
min_obs_size = 5
max_obs_size = 12

data = []

for _ in range(n):
    Q, O = simulate(model, T=np.random.randint(min_obs_size, max_obs_size))
    data.append(O)

In [33]:
# Random initialization of model parameters

A_o, B_o, pi_o = initialize_random_model(N, M)
model_o = (A_o, B_o, pi_o)

In [35]:
# Better initializing for B

# Compute the most likely state sequence based on the original data
Qs = [viterbi(O, model_o) for O in data]

B_n = np.zeros((N, M))
for O, Q in zip(data, Qs):
    for o, q in zip(O, Q):
        B_n[q, o] += 1

B_n = (B_n.T / B_n.sum(axis=1)).T
B_n = np.maximum(B_n, 0.06)
B_n = (B_n.T / B_n.sum(axis=1)).T

In [38]:
# Random
print(B_o)

[[0.40365756 0.36125001 0.23509244]
 [0.41377374 0.17820069 0.40802557]
 [0.54276851 0.18252476 0.27470673]
 [0.3693256  0.22808768 0.40258671]]


In [39]:
# Better
print(B_n)

[[0.14226962 0.77024955 0.08748083]
 [0.13955184 0.27260674 0.58784142]
 [0.39424676 0.40056535 0.20518789]
 [0.05568258 0.05568258 0.88863484]]


In [44]:
# Final initalized parameters
A_i = A_o.copy()
B_i = B_n.copy() # Use the new initalized value
pi_i = pi_o.copy()

model_i = (A_i, B_i, pi_i)

# Initial log-likelihood of the data
initial_loglik = np.sum([np.log(likelihood(O, model_i)) for O in data])
print(initial_loglik)

-41770.73016975706


In [53]:
# Training iterations
iterations = 30

for _ in range(iterations):

    # Build all eps
    all_eps = [build_eps(model_i, O) for O in data]
    
    # Pi update
    pi_i = np.vstack([eps[:,:,0].sum(axis=1) for eps in all_eps]).sum(axis=0) 
    pi_i /= pi_i.sum()
    
    # Transition matrix update
    nb_transitions_from_state_i = np.vstack([eps.sum(axis=(1, 2)) for eps in all_eps]).sum(axis=0)

    nb_transitions = np.zeros((N, N)) 
    for eps in all_eps:
        nb_transitions += eps.sum(axis=2)

    A_i = (nb_transitions.T / nb_transitions_from_state_i).T
    
    # Emission matrix update
    B_i = np.zeros((N, M))
    for O, eps in zip(data, all_eps):
        for k in np.arange(M):
            B_i[:,k] += eps[:,:,np.array(O[:-1]) == k].sum(axis=(1, 2))
    B_i = (B_i.T / nb_transitions_from_state_i).T
    
    # Compilation and evaluation of the updated model
    model_i = (A_i, B_i, pi_i)
    new_loglik = np.sum([np.log(likelihood(O, model_i)) for O in data])
    print(new_loglik)

-41342.09633499189
-41320.27346469552
-41304.149009329296
-41291.950530236965
-41282.51889705635
-41275.087261240995
-41269.13436569442
-41264.297300908365
-41260.31786623348
-41257.00864055539
-41254.23101292127
-41251.88066624989
-41249.87780809753
-41248.160482576895
-41246.67991652871
-41245.397230229966
-41244.281077283595
-41243.30592641644
-41242.45079293767
-41241.69828944669
-41241.033906164885
-41240.44545852837
-41239.922658109426
-41239.45677556924
-41239.040373078955
-41238.667089776274
-41238.33146815471
-41238.02881237636
-41237.75507174025
-41237.50674416621
