In [198]:
import os
import sys
import numpy as np
from random import randint

class HMM(object): 
    # base class for different HMM models
    def __init__(self, M, N):
        # model is (A, B, pi) where A = Transition probs(states*states), 
        #B = Emission Probs(states*obs), pi = initial distribution(states)        
        # a model can be initialized to random parameters using a json file 
        #that has a random params model
        # M:number of states of the model
        # N:number of hidden states of the model
        self.A_raw = np.random.random((M, M)) 
        self.row_sums_A = self.A_raw.sum(axis=1)
        self.A = self.A_raw / self.row_sums_A[:, np.newaxis]
        # Get transition probability
        self.B_raw = np.random.random((M, N))
        self.row_sums_B = self.B_raw.sum(axis=1)
        self.B = self.B_raw / self.row_sums_B[:, np.newaxis]
        # Get emission probability
        self.pi_raw = np.random.random((1, M)) 
        self.row_sums_pi = self.pi_raw.sum(axis=1)
        self.pi = self.pi_raw / self.row_sums_pi[:, np.newaxis]
        # Get initial probability
        self.M = M
        self.N = N
    
    def backward(self, obs):
        # This function is for backward algorithm, suppose that A, B, pi 
        #are given, and it calculates a bwk matrix (obs*states).
        # The backward algorithm can be used to calculate the likelihood 
        #of the probability P(Y_{k+1}, ... , Y_n|t_k=C)
        #=sum_q P(Y_{k+2}, ... , Y_n|t_{k+1}=q)P(q|C)P(x_{k+1}|q)
        #The backward probability b is the probability of seeing the observations from 
        #time t + 1 to the end, given that we are in state i at time t
        self.bwk = [[0 for x in range(self.M)] for y in range(len(obs))] 
        #Initalize bwk to be empty matrix T*M
        # Initialize base cases (t == T)
        for y in range(self.M):
            self.bwk[len(obs)-1][y] = 1 
            #self.A[y]["Final"] #self.pi[y] * self.B[y][obs[0]]
        for t in reversed(range(len(obs)-1)):
            for y in range(self.M):
                self.bwk[t][y] = sum((self.bwk[t+1][y1] * self.A[y][y1] * self.B[y1][obs[t+1]]) 
                                    for y1 in range(self.M))
                #beta_k(C)=\sum_q beta_{k+1}(q)P(q|C)P(w_{k+1}|q)
        prob = sum((self.pi[0][y]* self.B[y][obs[0]] * self.bwk[0][y]) for y in range(self.M))

        return prob 
        #This prob is the likelihood of the input obs   
    
    def forward(self, obs):
        # This function is for forward algorithm, suppose that A, B, pi are given, 
        #and it calculates a fwd matrix (obs*states).
        # The forward algorithm can be used to calculate the likelihood of the model
        #P(Y1, ... , Yn)=sum_t(\prod_i P(Y[i]|t[i])P(t[i]|t[i-1])
        self.fwd = [[0 for x in range(self.M)] for y in range(len(obs))]  
        #Initalize fwk to be empty matrix, and finally fwd is N*T
        # Initialize base cases (t == 0)
        for y in range(self.M):
            self.fwd[0][y] = self.pi[0][y] * self.B[y][obs[0]] 
            #alpha_1(q)=p(w1,t1=q)=P(t1=q|t0)*p(w1|t1=q)
        # Run Forward algorithm for t > 0
        for t in range(1, len(obs)):
            #self.fwd.append({})
            for y in range(self.M):
                self.fwd[t][y] = sum((self.fwd[t-1][y0] * self.A[y0][y] * self.B[y][obs[t]]) 
                                     for y0 in range(self.M))
                #alpha_k(q)=\sum_q1 alpha_{k-1}(q1)P(t_k=q|t_{k-1}=q1)P(w_k|t+k=q)
        prob = sum((self.fwd[len(obs) - 1][s]) for s in range(self.M))
        # The likelihood of input equals to the summation of fwd[N][t]
        return prob

    def viterbi(self, obs):
    #the task of determining which sequence of variables is the underlying source 
    #of some sequence of observations is called the decoding task
    #Decoding: Given as input an HMM = (A, B, pi) and a sequence of observations 
    #O = Y_1, ... Y_N, find the most probable sequence of states Q = X_1, ... X_T.
    # Goal: find the best path!
    # argmax_t P(Y1, ... Y_N, X_1, ..., X_T|A, B, pi)
        vit = [[0 for x in range(self.M)] for y in range(len(obs))] 
        # matrix
        path = {} 
        # path
        # Initialize base cases (t == 0)
        for y in range(self.M):
            vit[0][y] = self.pi[0][y] * self.B[y][obs[0]]
            path[y] = [y]
        
        # Run Viterbi for t > 0
        for t in range(1, len(obs)):
            #vit.append({})
            newpath = {}
            for y in range(self.M):
                (prob, state) = max((vit[t-1][y0] * self.A[y0][y] * self.B[y][obs[t]], y0) 
                                    for y0 in range(self.M))
                vit[t][y] = prob
                newpath[y] = path[state] + [y]
            # Don't need to remember the old paths
            path = newpath
        n = 0           
        # if only one element is observed max is sought in the initialization values
        if len(obs)!=1:
            n = t
        (prob, state) = max((vit[n][y], y) for y in range(self.M))
        return (prob, path[state])

    
    
    def forward_backward(self, obs): 
        # returns model given the initial model and observations
        #forward-backward algorithm is a special case of EM algorithm
        #The Baum-Welch algorithm iteratively estimate the counts. 
        #We will start with an estimate for the transition and observation probabilities and 
        #then use these estimated probabilities to derive better and better probabilities. 
        #We get our estimated probabilities by computing the forward probability for 
        #an observation and then dividing that probability mass among all the different 
        #paths that contributed to this forward probability.
        gamma = [[0 for x in range(self.M)] for y in range(len(obs))]
        # this is needed to keep track of finding a state i at a time t for all i and all t
        zi = [[[0 for x in range(self.M)] for y in range(self.M)] for z in range(len(obs))]  
        # this is needed to keep track of finding a state i at a time t and j at a time (t+1) 
        #for all i and all j and all t
        # get alpha and beta tables computes
        p_obs = self.forward(obs)
        self.backward(obs)
        # compute gamma values
        for t in range(len(obs)):
            for y in range(self.M):
                gamma[t][y] = (self.fwd[t][y] * self.bwk[t][y]) / p_obs
                if t == 0:
                    self.pi[0][y] = gamma[t][y]
                #gamma[t][y]=P(q_t=j|Y_1, ..., Y_N,A,B,pi)
                #=P(q_t=j,Y_1, ..., Y_N|A,B,pi)/P(Y_1, ..., Y_N|A,B,pi)
                #=alpha_t(j)beta_t(j)/P(Y_1, ..., Y_N|A,B,pi)
                #compute zi values up to T - 1
                if t == len(obs) - 1:
                    continue
                #zi[t][y] = {}
                for y1 in range(self.M):
                    zi[t][y][y1] = self.fwd[t][y] * self.A[y][y1] * self.B[y1][obs[t + 1]] * self.bwk[t + 1][y1] / p_obs
        #z[t][i][j]=P(q_t=i,q_{t+1}=j|Y_1, ..., Y_N,A,B,pi)
        #=P(q_t=i,q_{t+1}=j,Y_1, ..., Y_N|A,B,pi)/P(Y_1, ..., Y_N|A,B,pi)
        #=alpha_t(i)a_{ij}b_j(O_{t+1})beta_{t+1}(j)/apha_t(X_T)
        return (gamma,zi)
    
    
    def baum_welch(self,obs):
        gamma = [[0 for x in range(self.M)] for y in range(len(obs))]
        # this is needed to keep track of finding a state i at a time t for all i and all t
        zi = [[[0 for x in range(self.M)] for y in range(self.M)] for z in range(len(obs))]  
        # this is needed to keep track of finding a state i at a time t and j at a time (t+1) 
        #for all i and all j and all t


        # now that we have gamma and zi let us re-estimate
        (gamma,zi)=self.forward_backward(obs)
        for y in range(self.M):
            for y1 in range(self.M):
                # we will now compute new a_ij
                #a_{ij)=expected number of transitions from state i to state j/expected number 
                #of transitions from state i
                val = sum([zi[t][y][y1] for t in range(len(obs) - 1)]) #
                val /= sum([gamma[t][y] for t in range(len(obs) - 1)])
                self.A[y][y1] = val
        # re estimate gamma
        for y in range(self.M):
            for k in range(self.N): 
                # for all symbols vk
                val = 0.0
                for t in range(len(obs)):
                    if obs[t] == k :
                        val += gamma[t][y]
                val /= sum([gamma[t][y] for t in range(len(obs))])
                self.B[y][k] = val
                    #b_j(v_k)=expected number of times in state j and observing symbol vk/expected 
                    #number of times in state j
        return

In [199]:
M=randint(0,10)
N=randint(0,10)
hmm=HMM(M,N)
T=randint(0,10)
observations = []
for i in xrange(0,T):
    observations.append(randint(0,N-1))
#observations=[1,0,1,1]
print "M=", M, "N=", N, "Observations = ", observations

M= 4 N= 7 Observations =  [4, 5, 2, 6, 5, 2, 6, 1]


In [200]:
p1=hmm.backward(observations)
print " Fwd Prob = ", p1

 Fwd Prob =  4.91785348585e-07


In [201]:
p2=hmm.forward(observations)
print " Fwd Prob = ", p2

 Fwd Prob =  4.91785348585e-07


In [202]:
prob, hidden_states = hmm.viterbi(observations)
print "Max Probability = ", prob, " Hidden State Sequence = ", hidden_states

Max Probability =  6.46454266625e-09  Hidden State Sequence =  [3, 1, 3, 0, 1, 3, 0, 1]


In [203]:
(gamma,zi)=hmm.forward_backward(observations)
print "zi= ", zi
print "gamma=", gamma

zi=  [[[0.0084594922431747693, 0.046923410368640291, 0.013112457626554604, 0.0024389435020313252], [0.00015655667542833525, 1.1354798197827114e-05, 0.00012819504328365666, 8.7777029051073636e-05], [0.00030255031108179575, 0.00057546464574836139, 0.00063128722745130932, 0.0001722311156064356], [0.21072661792093059, 0.47911996096633963, 0.22260446465710201, 0.014549235869378136]], [[0.057195533926299905, 0.095633752808221589, 0.033976997626168196, 0.03283893278992582], [0.21475509108690968, 0.0046952109106159455, 0.067394769356110745, 0.23978511942528979], [0.067435760343828602, 0.038664741396807818, 0.053926562490019196, 0.076449340323735987], [0.0077425123406283268, 0.0053065294836810856, 0.0031345807332265824, 0.0010645649585309736]], [[0.12073475735359324, 0.16784493495925504, 0.043960331024113411, 0.014588874360704822], [0.099826405560136416, 0.0018146125821096554, 0.019201445149719883, 0.023457771307360476], [0.07183759785498782, 0.034245499056576013, 0.035210325477599516, 0.017139

In [204]:
hmm.baum_welch(observations)
print "The new model parameters after 1 iteration are: "
print "A = ", hmm.A
print "B = ", hmm.B
print "pi = ", hmm.pi

The new model parameters after 1 iteration are: 
A =  [[ 0.23396321  0.57230734  0.12534276  0.06838669]
 [ 0.47504189  0.01643728  0.15174274  0.35677809]
 [ 0.32035989  0.23401991  0.22740174  0.21821847]
 [ 0.35125796  0.4384997   0.19126879  0.01897355]]
B =  [[  0.00000000e+00   1.28400596e-01   2.90807586e-01   0.00000000e+00
    2.38371856e-03   1.85366568e-01   3.93041532e-01]
 [  0.00000000e+00   1.85034690e-01   1.35622846e-01   0.00000000e+00
    1.93178285e-07   3.99946278e-01   2.79395993e-01]
 [  0.00000000e+00   5.58758389e-02   2.75730401e-01   0.00000000e+00
    9.61418837e-04   4.30527309e-01   2.36905033e-01]
 [  0.00000000e+00   9.28020243e-02   3.15747080e-01   0.00000000e+00
    4.75218071e-01   5.71564105e-02   5.90764140e-02]]
pi =  [[  5.76389497e-03   4.49384068e-07   1.12091722e-03   9.93114738e-01]]
