# Import the necessary libaries

In [120]:
import numpy as np

# Define the given parameters

In [121]:
A = np.array([
    [0.6, 0.05, 0.1, 0.25],
    [0.1, 0.05, 0.7, 0.15],
    [0.05, 0.7, 0.1, 0.15],
    [0.1, 0.1, 0.6, 0.2]
]) #Transition matrix between states
B = np.array([
    [0.8, 0.2, 0.1, 0.5],
    [0.1, 0.7, 0.7, 0.3],
    [0.1, 0.1, 0.2, 0.2]
]).T #We transpose B to be consistent with Dr.Kaminski's lectures
pi = np.array([0.3, 0.1, 0.4, 0.2]) #The prior distribution over the ponds
T = 1000 #Our L

# Implementation
For each algorithm, we implement two variations. One variation is without taking the $\log$. It is implemented with matrix multiplication, when possible, for efficiency. The second variation is by taking the $\log$ of the variables. We have found the function **np.logaddexp.reduce** to be very useful.

In [122]:
class HMM:
    def __init__(self, T, A, B, pi):
        self.T = T
        self.A = A
        self.B = B
        self.pi = pi
        self.log_B = np.log(self.B)
        self.log_A = np.log(self.A)
        self.log_pi = np.log(pi)
        self.N = A.shape[0] # N is the number of possible states
        self.M = B.shape[1] # M is the number of possible observations

    def SampleFishFromPond(self, pond):
        fish = np.random.choice(self.M, p = self.B[pond])
        return fish


    def CatchNextFish(self, curr_pond):
        new_pond = np.random.choice(self.N, p = self.A[curr_pond])
        fish = self.SampleFishFromPond(new_pond)
        return new_pond, fish

    def SimulateSystem(self):
        ponds = np.empty(self.T, dtype = np.int32)
        fishes = np.empty(self.T, dtype = np.int32)
        ponds[0] = np.random.choice(4, p = self.pi)
        fishes[0] = self.SampleFishFromPond(ponds[0])
        for i in range(1, self.T):
            new_pond, new_fish = self.CatchNextFish(ponds[i - 1])
            ponds[i] = new_pond
            fishes[i] = new_fish
        self.ponds = ponds
        self.fishes = fishes
        return ponds, fishes
    
    def Forward(self):
        # Remember that the forward algorithm only has accsses to the fishes
        alpha = np.empty((self.T, self.N))
        alpha[0] = pi * B[:, self.fishes[0]]
        for t in range(1, self.T):
            alpha[t] = np.dot(self.A.T, alpha[t - 1]) * B[:, self.fishes[t]]
        return np.sum(alpha[-1])
    
    def Forward2(self):
        log_alpha = np.empty((self.T, self.N))
        log_alpha[0] = self.log_pi + self.log_B[:, self.fishes[0]]
        for t in range(1, self.T):
            for j in range(self.N):
                log_alpha[t, j] = np.logaddexp.reduce(log_alpha[t - 1] + self.log_A[:, j]) + self.log_B[j, self.fishes[t]]
        log_prob = np.logaddexp.reduce(log_alpha[-1])
        return np.exp(log_prob)
    
    def Backward(self):
        beta = np.ones((self.T, self.N))
        for t in range(self.T - 2, -1, -1):
            beta[t] = np.dot(self.A, beta[t + 1] * B[:, self.fishes[t + 1]]) 
        return np.sum(beta[0] * B[:, self.fishes[0]] * self.pi)
    
    def Backward2(self):
        log_beta = np.zeros((self.T, self.N))
        for t in range(self.T - 2, -1, -1):
            for i in range(self.N):
                log_beta[t, i] = np.logaddexp.reduce(self.log_A[i] + self.log_B[:, self.fishes[t + 1]] + log_beta[t + 1])
        log_prob = np.logaddexp.reduce(log_beta[0] + self.log_B[:, self.fishes[0]] + self.log_pi)
        return np.exp(log_prob)
    
    def Viterbi(self):
        psi = np.zeros((self.T, self.N), dtype = np.int32)
        delta = np.empty_like(psi, dtype = np.float32)
        delta[0] = self.pi * self.B[:, self.fishes[0]]
        for t in range(1, self.T):
            A_T_times_delta = self.A.T * delta[t - 1] # Hadamar product between matrix and array
            delta[t] = np.max(A_T_times_delta, axis = 1) * self.B[:, self.fishes[t]]
            psi[t] = np.argmax(A_T_times_delta, axis = 1)
        P = np.max(delta[-1])
        q = np.empty(self.T, dtype = np.int32)
        q[-1] = np.argmax(psi[-1])
        for t in range(self.T - 2, -1, -1):
            q[t] = psi[t + 1, q[t + 1]] 
        return P, q
    
    def Viterbi2(self):
        psi = np.zeros((self.T, self.N), dtype = np.int32)
        log_delta = np.empty_like(psi, dtype = np.float32)
        log_delta[0] = self.log_pi + self.log_B[:, self.fishes[0]]
        for t in range(1, self.T):
            A_T_times_delta = self.log_A.T + log_delta[t - 1] # Hadamard product between matrix and array
            log_delta[t] = np.max(A_T_times_delta, axis = 1) + self.log_B[:, self.fishes[t]]
            psi[t] = np.argmax(A_T_times_delta, axis = 1)
        log_P = np.max(log_delta[-1])
        P = np.exp(log_P)
        q = np.empty(self.T, dtype = np.int32)
        q[-1] = np.argmax(psi[-1])
        for t in range(self.T - 2, -1, -1):
            q[t] = psi[t + 1, q[t + 1]] 
        return P, q
    
    def CompareAgainstFishSequence(self, q):
        matches = np.sum(q == self.ponds)
        accuracy = matches / self.T
        print("Number of matches: ", matches)
        print("Accuracy of the algorithm: ", accuracy)

In [123]:
def TestSystem():
    hmm = HMM(T, A, B, pi)
    ponds, fishes = hmm.SimulateSystem()
    print("Probability produced by forward procedure: ", hmm.Forward())
    print("Probability produced by forward procedure (log variation): ", hmm.Forward2())
    print("Probability produced by backward procedure: ", hmm.Backward())
    print("Probability produced by backward procedure (log variation): ", hmm.Backward2())
    P1, q1 = hmm.Viterbi()
    P2, q2 = hmm.Viterbi2()
    print("Probability produced by Viterbi: ", P1)
    print("Probability produced by Viterbi (log variation): ", P2)
    print("Comparing the actual sequence of states with the ones estimated by Viterbi:")
    hmm.CompareAgainstFishSequence(q1)
    print("Comparing the actual sequence of states with the ones estimated by Viterbi (log variation):")
    hmm.CompareAgainstFishSequence(q2)

# Simulation
It is evident that the Viterbi algorithm performs much better when taking the $\log$.

In [124]:
TestSystem()

Probability produced by forward procedure:  0.0
Probability produced by forward procedure (log variation):  0.0
Probability produced by backward procedure:  0.0
Probability produced by backward procedure (log variation):  0.0
Probability produced by Viterbi:  0.0
Probability produced by Viterbi (log variation):  0.0
Comparing the actual sequence of states with the ones estimated by Viterbi:
Number of matches:  187
Accuracy of the algorithm:  0.187
Comparing the actual sequence of states with the ones estimated by Viterbi (log variation):
Number of matches:  442
Accuracy of the algorithm:  0.442


To compare the differences between forward and backward, we take $L = 700$. It seems that there is no major difference between the backward and forward procedures, even when taking log, as the two variations still return the same non-zero probability. 

In [125]:
T = 700
TestSystem()

Probability produced by forward procedure:  1.368338147224397e-287
Probability produced by forward procedure (log variation):  1.3683381472198267e-287
Probability produced by backward procedure:  1.3683381472243932e-287
Probability produced by backward procedure (log variation):  1.3683381472223156e-287
Probability produced by Viterbi:  0.0
Probability produced by Viterbi (log variation):  0.0
Comparing the actual sequence of states with the ones estimated by Viterbi:
Number of matches:  129
Accuracy of the algorithm:  0.18428571428571427
Comparing the actual sequence of states with the ones estimated by Viterbi (log variation):
Number of matches:  348
Accuracy of the algorithm:  0.49714285714285716
