In [17]:
import numpy as np

In [18]:
class HHM():
    def __init__(self, obs):
        # number of states
        self.num_states = 3

        # number of observations
        self.num_obs = 3

        # probability of initial states
        self.initial_prob = np.array([1/3, 1/3, 1/3])

        # array of state transition probabilities
        # e.g. 0.3 at row 1 col 2 means than the probability of 
        # transitioning from state 2 to state 3 is 0.3
        self.state_trans = np.array([[0.2, 0.3, 0.5],
                                     [0.2, 0.7, 0.1],
                                     [0.4, 0.2, 0.4]])

        # array of observation probabilities
        # e.g. 0.1 at row 1 col 2 means than the probability of 
        # observing state 1 when the actual state is 2 is 0.1
        # i.e. self.obs_probs[i, j] = p (z_i , x_j)
        self.obs_probs = np.array([[0.6, 0.1, 0.2],
                                   [0.1, 0.7, 0.3],
                                   [0.3, 0.2, 0.5]])

        # the actual observation sequence
        self.obs = obs

        # array of alphas
        self.alphas = np.zeros([self.num_obs, self.num_states])

        # array of betas
        self.betas = np.zeros([self.num_obs, self.num_states])

        # array of deltas
        self.deltas = np.zeros([self.num_obs, self.num_states])

        # array of pres for Viterbi algorithm
        self.pres = np.zeros([self.num_obs, self.num_states])

    #### FILTERING ####

    # get alphas of time t, need to be called in order
    def get_alpha(self, t):
        if t == 0:
            for i in range(self.num_states):
                self.alphas[0, i] = self.obs_probs[self.obs[0], i] * self.initial_prob[i]
        elif t < self.num_obs:
            for i in range(self.num_states):
                self.alphas[t, i] = self.obs_probs[self.obs[t], i]\
                    * np.dot(self.state_trans[:, t-1], self.alphas[t-1])
        else:
            print(f"Error: there are only {self.num_obs} observations")

    # get all alphas
    def get_alphas(self):
        for t in range(self.num_obs):
            self.get_alpha(t)

    # get results of filtering
    def filter(self):
        self.get_alphas()
        filter_res = np.zeros([self.num_obs, self.num_states])
        for t in range(self.num_obs):
            for state in range(self.num_states):
                filter_res[t, state] = self.alphas[t, state] / np.sum(self.alphas[t])
        return filter_res
        
    # get MAP estimate with filtering
    def MAP_filter(self):
        filter_res = self.filter()
        res = np.argmax(filter_res, axis=-1) + 1
        return res
    
    #### SMOOTHING ####
    
    # get beta of time t, need to be called in backward order
    def get_beta(self, t):
        if t == self.num_obs - 1:
            for i in range(self.num_states):
                self.betas[t, i] = 1.0 #self.obs_probs[self.obs[t], i] * 1.0
        elif t >= 0:
            for i in range(self.num_states):
                for x in range(self.num_states):
                    self.betas[t, i] += self.obs_probs[self.obs[t+1], x]\
                    * self.state_trans[x, i] * self.betas[t+1, x]
        else:
            print("Error: observation index cannot be negative")

    # get all betas
    def get_betas(self):
        for t in range(self.num_obs - 1, -1, -1):
            self.get_beta(t)

    # get results of smoothing
    def smoothing(self):
        self.get_alphas()
        self.get_betas()
        smoothing_res = np.zeros([self.num_obs, self.num_states])
        for t in range(self.num_obs):
            for state in range(self.num_states):
                smoothing_res[t, state] = self.alphas[t, state] * self.betas[t, state] / np.dot(self.alphas[t], self.betas[t])
        return smoothing_res
        
    # get MAP estimate with smoothing
    def MAP_smoothing(self):
        smoothing_res = self.smoothing()
        res = np.argmax(smoothing_res, axis=-1) + 1
        return res

    #### VITERBI ####

    def get_delta_and_pre(self, t):
        if t == 0:
            for i in range(self.num_states):
                self.deltas[0, i] = self.obs_probs[self.obs[0], i] * self.initial_prob[i]
                self.pres[0, i] = None
        elif t < self.num_obs:
            for i in range(self.num_states):
                self.deltas[t, i] = self.obs_probs[self.obs[t], i]\
                    * np.max(self.state_trans[:, i] * self.deltas[t-1])
                self.pres[t, i] = np.argmax(self.state_trans[:, i] * self.deltas[t-1])
        else:
            print(f"Error: there are only {self.num_obs} observations")

    # get all deltas and pres
    def get_deltas_and_pres(self):
        for t in range(self.num_obs):
            self.get_delta_and_pre(t)

    # get results of Viterbi algorithm
    def Viterbi(self):
        res = np.zeros(self.num_obs)
        self.get_deltas_and_pres()

        # get most likely terminal state
        res[-1] = np.argmax(self.deltas[-1])

        # backtracking
        for t in range(self.num_obs - 1 - 1, -1, -1):
            res[t] = self.pres[t + 1, int(res[t + 1])]
        return res + 1


In [22]:
hhm = HHM(np.array([2, 3, 1]) - 1)
print(hhm.filter())
print(hhm.MAP_filter())

[[0.09090909 0.63636364 0.27272727]
 [0.3        0.2        0.5       ]
 [0.66666667 0.11111111 0.22222222]]
[2 3 1]


In [24]:
print(hhm.smoothing())
print(hhm.MAP_smoothing())

[[0.0894691  0.60557006 0.30496084]
 [0.20689655 0.18181818 0.61128527]
 [0.66666667 0.11111111 0.22222222]]
[2 3 1]


In [25]:
print(hhm.Viterbi())

[3. 3. 1.]
