In [1]:
"""
HMM
"""

'\nHMM\n'

In [9]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt


In [245]:
def random_pick(somelist,probes):
    val =np.random.uniform(0,1);
    N =len(probes)
    cum =0.0
    for i in range(N):
        cum +=probes[i]
        if(cum>=val):
            return somelist[i]

In [246]:
Q =np.array([0,1,2,3],dtype=int)
V =np.array([0,1],dtype=int)
pi =np.array([0.25,0.25,0.25,0.25],dtype=float)
T =10
A =np.array([[0,1.0,0,0],[0.4,0,0.6,0],[0,0.4,0,0.6],[0,0,0.5,0.5]],dtype=float)
B =np.array([[0.5,0.5],[0.3,0.7],[0.6,0.4],[0.8,0.2]],dtype=float)

In [280]:
class HMM(object):
    def __init__(self,Q=None,V=None,pi=None,A=None,B=None):
        self.Q =Q
        self.V =V
        self.pi =pi
        self.A=A
        self.B=B
    
    def generate_observation(self,N):
        states =np.zeros((N,),dtype=int)
        obs    =np.zeros((N,),dtype=int)
        
        states[0] =random_pick(self.Q,self.pi)
        for i in range(N):
            obs[i] =random_pick(V,self.B[states[i],:])
            if i<N-1:
                states[i+1] =random_pick(Q,self.A[states[i],:])
        return [states,obs]
    
    def probes_forward(self,observations):
        N =len(observations)
        alpha =np.zeros((N,len(self.Q)),dtype=float)
        
        for i in range(len(self.pi)):
            alpha[0,i] =self.pi[i]*self.B[i,observations[0]]
        
        for i in range(len(observations)-1):
            for j in range(len(self.Q)):
                alpha[i+1,j]=self.B[j,observations[i+1]]*np.dot(alpha[i,:],self.A[:,j])
        
        probes =np.sum(alpha[N-1,:])
  
       
        return [alpha,probes]
    
    def probes_backward(self,observations):
        N =len(observations)
        beta =np.zeros((N,len(self.Q)),dtype=float)
        
        for i in range(len(self.Q)):
            beta[N-1,i] =1
        
        for i in range(len(observations)-1):
            for j in range(len(self.Q)):
                temp =np.multiply(self.A[j,:],self.B[:,observations[N-1-i]])
                beta[N-2-i,j]=np.dot(temp,beta[N-1-i,:])
        

        
        temp =np.multiply(self.pi,self.B[:,observations[0]])
        
        probes =np.dot(temp,beta[0,:])
        
        return [beta,probes]
    
    def fit(self,observations):
        """
        Baum-Welch
        """
        N =len(observations)
        alpha =np.zeros((N,len(self.Q)),dtype=float)
        beta  =np.zeros((N,len(self.Q)),dtype=float)
        gamma =np.zeros((N,len(self.Q)),dtype=float)
        epsilon =np.zeros((N,len(self.Q),len(self.Q)),dtype=float)
        
        maxpass =1000
        

        
        
        
        #cannot be flat
            
        A =np.random.rand(len(self.Q),len(self.Q)) 
        B =np.random.rand(len(self.Q),len(self.V))
        pi =np.random.rand(len(self.Q))
        
        A =A/(np.sum(A,axis=1))[:,np.newaxis]
        B =B/(np.sum(B,axis=1))[:,np.newaxis]
        pi =pi /np.sum(pi)
        #print('A ',A)
        #print('B ',B)
        #print('pi ',pi)
        
        states =np.concatenate((A,np.concatenate((B,pi[:,np.newaxis]),axis=1)),axis=1)
        last_states =np.zeros(states.shape,dtype=float)
        
        
        times =0
        
        eps =1e-8
        
                    
        self.A =A
        self.B =B
        self.pi =pi
        
        while(times<maxpass):

            if(np.linalg.norm(states-last_states)<eps):
                break
            times +=1
            last_states =states
            [alpha,_] =self.probes_forward(observations)
            [beta,_] =self.probes_backward(observations)
            
            
            gamma =np.multiply(alpha,beta)
            gamma =gamma/(np.sum(gamma,axis=1)[:,np.newaxis])
            
            for t in range(N-1):
                for i in range(len(self.Q)):
                    for j in range(len(self.Q)):
                        epsilon[t,i,j] =alpha[t,i]*self.A[i,j]*self.B[j,observations[t+1]]*beta[t+1,j]
                epsilon[t,:,:]  =epsilon[t,:,:]/np.sum(epsilon[t,:,:])
            
            #print('gamma ',gamma)
            #print('epsilon ',epsilon)
            
            for i in range(len(self.Q)):
                for j in range(len(self.Q)):
                    self.A[i,j] =np.sum(epsilon[:-1,i,j]) /(np.sum(gamma[:-1,i]))
            
            for j in range(len(self.Q)):
                for k in range(len(self.V)):
                    self.B[j,k] = np.dot(gamma[:,j],observations==self.V[k])/np.sum(gamma[:,j])
            
            self.pi =gamma[0,:]
           
            
            states =np.concatenate((self.A,np.concatenate((self.B,self.pi[:,np.newaxis]),axis=1)),axis=1)
        
        
        
    def decoding(self,observations):
        """
        Viterbi
        """
        
        N =len(observations)
        theta =np.zeros((N,len(self.Q)),dtype=float)
        parent =np.zeros((N,len(self.Q)),dtype=float)
        
        for i in range(len(self.Q)):
            theta[0,i] =self.pi[i]*self.B[i,observations[i]]
            parent[0,i]=-1
        max_index =-1
        max_val =-999
        for t in range(N-1):
            for i in range(len(self.Q)):
                max_index =-1
                max_val =-1
                for j in range(len(self.Q)):
                    if(max_val<(theta[t,j]*self.A[j,i]*self.B[i,observations[t+1]])):
                        max_val =theta[t,j]*self.A[j,i]*self.B[i,observations[t+1]]
                        max_index =j
                theta[t+1,i]=max_val
                parent[t+1,i] =max_index
        states =np.zeros((N,),dtype=int)
        states[N-1] =np.argmax(theta[N-1,:])
        for i in range(N-1):
            states[N-2-i] =parent[N-1-i,states[N-1-i]]
        
        #print('theta ',theta)
        #print('parents ',parent)
        return states

            
                
            

            
            
            
            
            
            
            
        
    
    
        
              
            
            
                
            
        
    
        

In [281]:
hmm =HMM(Q=Q,V=V,pi=pi,A=A,B=B)
print('A ',hmm.A)
print('B ',hmm.B)
print('pi ',hmm.pi)
res =hmm.generate_observation(20)
print('states \r\n ',res[0])
print('observations \r\n ',res[1])
observations =res[1]
states      =res[0]

N =40
p =20
data =np.zeros((N,p),dtype=int)
for i in range(N):
    data[i,:] =hmm.generate_observation(20)[1]

A  [[0.  1.  0.  0. ]
 [0.4 0.  0.6 0. ]
 [0.  0.4 0.  0.6]
 [0.  0.  0.5 0.5]]
B  [[0.5 0.5]
 [0.3 0.7]
 [0.6 0.4]
 [0.8 0.2]]
pi  [0.25 0.25 0.25 0.25]
states 
  [0 1 2 1 2 3 2 1 0 1 0 1 0 1 2 1 2 3 2 3]
observations 
  [0 1 1 1 0 0 1 1 1 0 1 1 1 1 1 1 0 0 1 0]


In [282]:

hmm.probes_forward(observations)

[array([[1.25000000e-01, 7.50000000e-02, 1.50000000e-01, 2.00000000e-01],
        [1.50000000e-02, 1.29500000e-01, 5.80000000e-02, 3.80000000e-02],
        [2.59000000e-02, 2.67400000e-02, 3.86800000e-02, 1.07600000e-02],
        [5.34800000e-03, 2.89604000e-02, 8.56960000e-03, 5.71760000e-03],
        [5.79208000e-03, 2.63275200e-03, 1.21410240e-02, 6.40044800e-03],
        [5.26550400e-04, 3.19454688e-03, 2.86792512e-03, 8.38787072e-03],
        [6.38909376e-04, 1.17160431e-03, 2.44426540e-03, 1.18293809e-03],
        [2.34320863e-04, 1.13163087e-03, 5.17772653e-04, 4.11605656e-04],
        [2.26326175e-04, 3.09000947e-04, 3.53912541e-04, 1.03293284e-04],
        [6.18001893e-05, 1.10367357e-04, 1.42228326e-04, 2.11195333e-04],
        [2.20734715e-05, 8.30840638e-05, 6.87272324e-05, 3.81869324e-05],
        [1.66168128e-05, 3.46950551e-05, 2.75775618e-05, 1.20659611e-05],
        [6.93901102e-06, 1.93534862e-05, 1.07400055e-05, 4.51590353e-06],
        [3.87069725e-06, 7.86450924e-0

In [283]:
hmm.probes_backward(observations)

[array([[7.26500447e-07, 8.40728455e-07, 3.70565666e-07, 3.36446134e-07],
        [2.58479291e-06, 1.03785778e-06, 1.34904114e-06, 6.66379057e-07],
        [2.76663446e-06, 3.69256130e-06, 2.01887871e-06, 2.62603315e-06],
        [7.26773216e-06, 3.95233494e-06, 9.32922860e-06, 7.60187435e-06],
        [6.26110731e-06, 2.42257739e-05, 7.50031522e-06, 1.33794495e-05],
        [6.58315341e-05, 2.08703577e-05, 3.07207418e-05, 1.04080673e-05],
        [6.38440256e-05, 9.40450487e-05, 3.37564691e-05, 3.65677346e-05],
        [2.91913131e-04, 9.12057509e-05, 1.48593428e-04, 6.84904905e-05],
        [2.04226688e-04, 4.17018758e-04, 2.09835056e-04, 2.65234794e-04],
        [1.13440693e-03, 6.80755626e-04, 5.28159368e-04, 2.66967460e-04],
        [2.17395635e-03, 1.62058133e-03, 1.02485148e-03, 6.19971639e-04],
        [5.15942147e-03, 3.10565194e-03, 2.45290430e-03, 1.29390780e-03],
        [9.71049139e-03, 7.37060211e-03, 4.84814024e-03, 3.24279754e-03],
        [2.22159437e-02, 1.38721306e-0

In [284]:

#hmm.fit(observations)

In [285]:
print('A ',hmm.A)
print('B ',hmm.B)
print('pi ',hmm.pi)

A  [[0.  1.  0.  0. ]
 [0.4 0.  0.6 0. ]
 [0.  0.4 0.  0.6]
 [0.  0.  0.5 0.5]]
B  [[0.5 0.5]
 [0.3 0.7]
 [0.6 0.4]
 [0.8 0.2]]
pi  [0.25 0.25 0.25 0.25]


In [286]:
hmm.decoding(observations)

theta  [[1.25000000e-01 1.75000000e-01 1.00000000e-01 5.00000000e-02]
 [3.50000000e-02 8.75000000e-02 4.20000000e-02 1.20000000e-02]
 [1.75000000e-02 2.45000000e-02 2.10000000e-02 5.04000000e-03]
 [4.90000000e-03 1.22500000e-02 5.88000000e-03 2.52000000e-03]
 [2.45000000e-03 1.47000000e-03 4.41000000e-03 2.82240000e-03]
 [2.94000000e-04 7.35000000e-04 8.46720000e-04 2.11680000e-03]
 [1.47000000e-04 2.37081600e-04 4.23360000e-04 2.11680000e-04]
 [4.74163200e-05 1.18540800e-04 5.68995840e-05 5.08032000e-05]
 [2.37081600e-05 3.31914240e-05 2.84497920e-05 6.82795008e-06]
 [6.63828480e-06 7.11244800e-06 1.19489126e-05 1.36559002e-05]
 [1.42248960e-06 4.64679936e-06 2.73118003e-06 1.43386952e-06]
 [9.29359872e-07 9.95742720e-07 1.11523185e-06 3.27741604e-07]
 [1.99148544e-07 6.50551910e-07 2.38978253e-07 1.33827822e-07]
 [1.30110382e-07 1.39403981e-07 1.56132458e-07 2.86773903e-08]
 [2.78807962e-08 9.10772675e-08 3.34569554e-08 1.87358950e-08]
 [1.82154535e-08 1.95165573e-08 2.18585442e-08 4

array([1, 0, 1, 2, 3, 2, 1, 0, 1, 0, 1, 0, 1, 0, 1, 2, 3, 3, 2, 3])

In [238]:
test =np.array([1,2,3,4],dtype=float)
test[:-1]

array([1., 2., 3.])