In [1]:
import numpy as np

In [2]:
def construct_diag(Y, stateX):
    diag = np.identity(len(Y))
    return diag * Y[:, stateX]

def normalize(vector):
    return vector / np.sum(vector, axis = 0)

In [3]:
class HMM():
    def __init__(self, T, B, obs, pi):
        """
        discrete state HMM. 
        assume that the states are given as integers, i.e. state1 = 0, state2 = 1 etc.
        --> use states as indices
        
        T specifying the transition matrix T of the hidden markov chain (x-variables), 
        B specifying the transition matrix B from hidden state to observations (emission matrix)
        obs specifying a sequence of observations
        
        """
        self.T = T # transition matrix
        self.B = B #  emission matrix
        self.o = obs # sequence of integers
        self.pi = pi # probability vector, containing the probs of the initial (hidden) state
        
    def alpha(self, t):
        """
        forward-pass
        """
        ############
        # wikipedia style: see 
        # https://en.wikipedia.org/wiki/Forward%E2%80%93backward_algorithm#RussellNorvig10
        if t > -1: ##>0
            return normalize(construct_diag(self.B, self.o[t]) @ self.T.T @ self.alpha(t - 1))
        else:
            return normalize(construct_diag(self.B, self.o[0]) @ self.T.T @ self.pi)
            

    def beta(self, t):
        """
        backward-pass
        """
        ############
        # wikipedia style: see 
        # https://en.wikipedia.org/wiki/Forward%E2%80%93backward_algorithm#RussellNorvig10
        T = len(self.o) 
        if t < T:
            return normalize(self.T @ construct_diag(self.B, self.o[t]) @ self.beta(t+1))
        else:
            return np.ones(len(self.B[:, 0]))

    def forward_backward(self):
        probs = np.zeros((len(self.o), len(self.T)))
        for j in range(-1, len(probs)):
            probs[j] = self.alpha(j) * self.beta(j+1) / np.sum(self.alpha(j) * self.beta(j+1))
        return probs, np.argmax(probs, axis = 1)
     

    def viterbi(self):
        T = len(self.o)
        K = self.T.shape[0]
        T1 = np.zeros((K, T))
        T2 = np.zeros((K, T)) # holds the backpointers
        T1[:, 0] = self.pi*self.B[:,self.o[0]] #initialization
        for t in range(1, T):
            s_temp = np.zeros((K, K))
            for j in range(K):
                s_temp[j]  = T1[:, t-1] * self.T[:,j] * self.B[j, self.o[t]]
                # compute the temporary rows from which the maximum is chosen, i.e. from which previous state
                # the according probability was computed
            T1[:, t] = np.max(s_temp, axis = 1) 
            # pick the max. prob out of all possible prev. states (for all states simultanously) 
            T2[:, t] = np.argmax(s_temp, axis = 1) 
            #from which state the according max. prob. was computed (for all states simult.)
        
        # do the backtracing
        states = np.zeros(T, dtype = np.int32)
        states[-1] = np.argmax(T1[:, -1])
        for t in range(T-2, -1, -1): # exclude T-1 (last index)
            states[t] = T2[states[t+1], t+1]
        return states

## Application: Chimpanzee

In [4]:
A = np.ones((2, 2)) * 0.5
B = np.array([[0.5, 0.4, 0.1], [0.4, 0.1, 0.5]])
o = [0,1,1,0,2,0,2,2,2,0,2,2,2,2,2,0,0,1,1,2]

hmm = HMM(A, B, o, np.array([0.5, 0.5]))
probs, states = hmm.forward_backward()
print('Fwd/Bwd encoded state sequence: ', states)
print('Viterbi encoded state sequence: ', hmm.viterbi())

Fwd/Bwd encoded state sequence:  [0 0 0 0 1 0 1 1 1 0 1 1 1 1 1 0 0 0 0 1]
Viterbi encoded state sequence:  [0 0 0 0 1 0 1 1 1 0 1 1 1 1 1 0 0 0 0 1]


Forward-backward and Viterbi algorithm yield the same sequence of hidden states (due to symmetry of transition matrix?). They do not if the transition matrix is not symmetric:

In [5]:
# again the chimpanzee with another transition matrix
A = np.array([[0.99, 0.01], [0.3, 0.7]])
B = np.array([[0.5, 0.4, 0.1], [0.4, 0.1, 0.5]])
o = [2,1,1,0,2,0,2,2,2,0,2,2,2,2,2,0,0,1,1,2]

hmm = HMM(A, B, o, np.array([0.5, 0.5]))
probs, states = hmm.forward_backward()
print('Fwd/Bwd encoded state sequence: ', states)
print('Viterbi encoded state sequence: ', hmm.viterbi())
#print('via backpointer so very nice  : ', hmm.viterbi_bp())

Fwd/Bwd encoded state sequence:  [1 1 0 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0]
Viterbi encoded state sequence:  [1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0]


#### The wiki example

see https://en.wikipedia.org/wiki/Forward%E2%80%93backward_algorithm

In [7]:
A = np.array([[0.7, 0.3], [0.3, 0.7]])
B = np.array([[0.9, 0.1], [0.2, 0.8]])
o = [0, 0, 1, 0, 0]

#for obs in o:
#    print(construct_diag(B, obs))
pi = np.array([-0.90277778,  3.95833333])
hmm_wiki = HMM(A, B, o, pi)
print('F/B: ', hmm_wiki.forward_backward()[1])
print('Vit: ', hmm_wiki.viterbi())
print(hmm_wiki.forward_backward()[0])

F/B:  [0 0 1 0 0]
Vit:  [1 1 1 0 0]
[[ 0.86733889  0.13266111]
 [ 0.82041905  0.17958095]
 [ 0.30748358  0.69251642]
 [ 0.82041905  0.17958095]
 [ 0.86733889  0.13266111]]
