In [230]:
import numpy as np
import pandas as pd
#from scipy.misc import logsumexp

# Posterior

In [221]:
class HMM_Algorithm(object):
    
    def __init__(self, transition, emission, obs ):
        
        self.trans = transition
        self.emiss = emission
        self.obs = obs
        
        self.alpha_set = pd.DataFrame(list(self.emiss.T[obs[0]]), index = trans.index, columns=[0] )
        self.beta_set  = pd.DataFrame(np.zeros(len(trans)), index = trans.index, columns=[len(obs)-1] )
        self.joint_prob = pd.DataFrame(np.zeros(len(trans)) , index = trans.index, columns=[len(obs)-1] )
        self.total_prob_of_data = None
        
        ### expected count
        self.expected_emiss = pd.DataFrame( -float('inf'), index = self.emiss.index ,columns = self.emiss.columns)
        self.expected_trans = pd.DataFrame( -float('inf'), index = self.trans.index ,columns = self.trans.columns)
    
    def forward(self):
        
        for i in xrange(1,len(obs)) :            
            # log probability (tag,tag)
            log_prob = ((self.trans + self.alpha_set[i-1].T).T + self.emiss.T[obs[i]].T).T 
            self.alpha_set[i] = np.log(np.exp(log_prob.T - log_prob.max(1)).T.sum(1)) + log_prob.max(1)
            
        ### update joint probability
        self.joint_prob[len(obs)-1] += self.alpha_set[len(obs)-1]
        
        ### update total_prob_of_data
        self.total_prob_of_data = np.log(np.exp(self.joint_prob).sum(0)[len(obs)-1])
        
        ### update expected emiss
        log_expected_count_combine = pd.concat([self.expected_emiss.T[self.obs[-1]], self.joint_prob[len(obs)-1]],axis=1).T
        log_count_max = log_expected_count_combine.max(0)
        # logsumexp
        self.expected_emiss.T[obs[-1]] = (
                                          np.log(np.exp(log_expected_count_combine - log_count_max).sum(0)) 
                                          + log_count_max
                                         )
        
        ### update expected trans
        log_expected_count = ( 
                              np.array([self.alpha_set[len(obs)-2],]).T + [self.beta_set[len(obs)-1],] # alpha*beta
                              + self.trans.T # trans prob
                              + self.emiss.T[obs[-1]] # emiss prob
                             ).T
        log_count_max = pd.concat(
                                  [pd.DataFrame(
                                                pd.concat([self.expected_trans[h],log_expected_count[h]],axis = 1).max(1)
                                                , columns=[h]
                                               ) 
                                   for h in self.trans.columns
                                  ]
                                  , axis = 1
                                 )
        self.expected_trans = np.log( np.exp(self.expected_trans - log_count_max) 
                                     +np.exp(log_expected_count - log_count_max )) + log_count_max
        
        
        
    def backward(self):
        i = len(self.obs)-2
        while i >= 0 :     
            # log probability (tag,tag)
            log_prob = (self.trans.T + self.beta_set[i+1].T + self.emiss.T[self.obs[i+1]].T).T
            self.beta_set[i] = np.log(np.exp(log_prob - log_prob.max(0)).sum(0)) + log_prob.max(0)
            
            ### update joint probability
            self.joint_prob[i] = self.alpha_set[i] + self.beta_set[i] - self.total_prob_of_data
            
            ### update expected emiss
            log_expected_count = pd.concat([self.expected_emiss.T[self.obs[i]], self.joint_prob[i]],axis=1).T
            log_count_max = log_expected_count.max(0)       
            # logsumexp
            self.expected_emiss.T[obs[i]] = (
                                              np.log(np.exp(log_expected_count - log_count_max).sum(0)) 
                                              + log_count_max
                                             )

            ### update expected trans
            if i > 0:
                log_expected_count = ( 
                                      np.array([self.alpha_set[i-1],]).T + [self.beta_set[i],] # alpha*beta
                                      + self.trans.T # trans prob
                                      + self.emiss.T[obs[i]] # emiss prob
                                     ).T
                log_count_max = pd.concat(
                                          [pd.DataFrame(
                                                        pd.concat([self.expected_trans[h],log_expected_count[h]],axis = 1).max(1)
                                                        , columns=[h]
                                                       ) 
                                           for h in self.trans.columns
                                          ]
                                          , axis = 1
                                         )
                self.expected_trans = np.log( np.exp(self.expected_trans - log_count_max) 
                                             +np.exp(log_expected_count - log_count_max )) + log_count_max
            
            
            i -= 1
    
        
    def E_step(self):
        self.forward()
        self.backward()
        
        
    def M_step(self):
        # new emission probability
        total = (np.log(np.exp(self.joint_prob.T - self.joint_prob.max(1)).sum(0)) + self.joint_prob.max(1)).T
        self.new_emiss = self.expected_emiss - total
        # new transition probability
        total = np.log(np.exp(self.expected_trans - self.expected_trans.max(0)).sum(0)) + self.expected_trans.max(0)
        self.new_trans = self.expected_trans - total
        

In [222]:
trans = np.log(pd.DataFrame([[0.7,0.3],[0.3,0.7]], index = ["H","C"], columns=["H","C"] ))
emiss = np.log(pd.DataFrame([[0.7,0.1],[0.2,0.2],[0.1,0.7]], index = [1,2,3], columns=["H","C"] ))
obs = [2,1,2,3]

In [229]:
test = HMM_Algorithm( trans,emiss,obs)
test.E_step()
test.M_step()

# Viterbi