In [3]:
import numpy as np
import pandas as pd
from collections import Counter

1. generate data (gaussian)

2. two cases, one easy to classify, one hard to classify.

3. forward, backward.

In [175]:
class HMM():
    def __init__(self,X_train,Y_train):
        self.make_codebook(X_train,Y_train)
        
        self.X_train = self.encode_X(X_train)
        self.Y_train = self.encode_Y(Y_train) 
        
        
    def make_codebook(self,X_train,Y_train):
        
        self.X_dic_encode = {char:i for i,char in enumerate(np.unique(X_train.ravel()))}
        self.X_dic_decode = {self.X_dic_encode[char]:char for char in self.X_dic_encode}
        
        self.Y_dic_encode = {char:i for i,char in enumerate(np.unique(Y_train.ravel()))}
        self.Y_dic_decode = {self.Y_dic_encode[char]:char for char in self.Y_dic_encode}
    
    def encode_X(self,X):
        return np.array([[self.X_dic_encode[X[i][j]] for j in range(X.shape[1])] for i in range(X.shape[0])])
        
    def encode_Y(self,Y):
        return np.array([[self.Y_dic_encode[Y[i][j]] for j in range(Y.shape[1])] for i in range(Y.shape[0])])
    
    def decode_X(self,X):
        return np.array([[self.X_dic_decode[X[i][j]] for j in range(X.shape[1])] for i in range(X.shape[0])])
    
    def decode_Y(self,Y):
        return np.array([[self.Y_dic_dncode[Y[i][j]] for j in range(Y.shape[1])] for i in range(Y.shape[0])])
    
    def ic(self,Y_train):
        cnt = Counter([temp[0] for temp in Y_train])
        return np.array([cnt[i] for i in range(len(self.Y_dic_decode))])
    
    def fc(self,Y_train):
        cnt = np.zeros((len(self.Y_dic_decode),len(self.Y_dic_decode)))
        for i in range(Y_train.shape[0]):
            cnt[Y_train[i][-1]][Y_train[i][-2]]+=1
        return cnt
    
    def tc(self,Y_train):
        cnt = np.zeros((len(self.Y_dic_decode),len(self.Y_dic_decode)))
        for i in range(Y_train.shape[0]):
            for j in range(Y_train.shape[1]-2):
                cnt[Y_train[i][j+1]][Y_train[i][j]]+=1
        return cnt
    
    def sc(self,X_train,Y_train):
        cnt = np.zeros((len(self.X_dic_decode),len(self.Y_dic_decode)))
        for i in range(X_train.shape[0]):
            for j in range(X_train.shape[1]):
                cnt[X_train[i][j]][Y_train[i][j]]+=1
        return cnt
    
    def collect_counts(self):
        self.initial_counts = self.ic(self.Y_train)
        self.final_counts = self.fc(self.Y_train)
        self.transition_counts = self.tc(self.Y_train)
        self.state_counts = self.sc(self.X_train,self.Y_train)
    
    def sanity_check(self):
        if np.sum(self.initial_counts)==self.X_train.shape[0]:
            print(1)
        if np.sum(self.final_counts) == self.X_train.shape[0]:
            print(2)
        if np.sum(self.transition_counts) == self.X_train.shape[0]*(self.X_train.shape[1]-2):
            print(3)
        if np.sum(self.state_counts) == self.X_train.shape[0]*self.X_train.shape[1]:
            print(4)
    
    def fit(self):
        self.init_probs = (self.initial_counts/self.initial_counts.sum())
        self.transition_probs = (self.transition_counts/self.transition_counts.sum(0))
        self.final_probs = (self.final_counts/self.final_counts.sum(0))
        self.observation_probs = (self.state_counts/self.state_counts.sum(0))
    
    def build_potentials(self,sequence):
        s = self.encode_X(sequence)[0]
        node_potentials = np.zeros((len(self.Y_dic_decode),s.shape[0]))
        for i in range(len(self.Y_dic_decode)):
            for j in range(s.shape[0]):
                if not j:
                    node_potentials[i,j] = self.init_probs[i]*self.observation_probs[s[j],i]
                else:
                    node_potentials[i,j] = self.observation_probs[s[j],i]
        
        edge_potentials = np.zeros((len(self.Y_dic_decode),len(self.Y_dic_decode),s.shape[0]-1))
        for cur_state in range(len(self.Y_dic_decode)):
            for next_state in range(len(self.Y_dic_decode)):
                for i in range(s.shape[0]-1):
                    if i == s.shape[0]-2:
                        edge_potentials[cur_state,next_state,i] = self.final_probs[next_state,cur_state]
                    else:
                        edge_potentials[cur_state,next_state,i] = self.transition_probs[next_state,cur_state]
        return node_potentials,edge_potentials
    
    def _forward_backward(self,node_potentials,edge_potentials):
        H,N = node_potentials.shape
        forward = np.zeros([H,N])
        backward = np.ones([H,N])
        forward[:,0] = node_potentials[:,0]
        ##forward
        for pos in range(1,N):
            for cur_state in range(H):
                forward_v = forward[:,pos-1]
                trans_v = edge_potentials[:,cur_state,pos-1]
                forward[cur_state,pos] = (forward_v@trans_v)*node_potentials[cur_state,pos]

        ##backward
        for pos in range(N-2,-1,-1):
            for cur_state in range(H):
                backward_v = backward[:,pos+1]*node_potentials[:,pos+1]
                trans_v = edge_potentials[cur_state,:,pos]
                backward[cur_state,pos] = trans_v@backward_v

        return forward,backward
    
    def forward_backward(self,sequence):
        node_potentials,edge_potentials = self.build_potentials(sequence)
        return self._forward_backward(node_potentials,edge_potentials)
    
    def get_node_posteriors(self,sequence):
        forward,backward = self.forward_backward(sequence)
        p = forward[:,0]@backward[:,0]
        H,N = forward.shape
        posteriors = forward*backward/p
        return posteriors
    
    def _viterbi(node_potentials,edge_potentials):
        H,N = node_potentials.shape
        forward = np.zeros((H,N))
        beststate = np.zeros((H,N),dtype="int")

        #forward
        for state in range(H):
            forward[state,0] = node_potentials[state,0]
        for pos in range(1,N):
            for state in range(H):
                forward_v = forward[:,pos-1]
                trans_v = edge_potentials[:,state,pos-1]
                forward[state,pos] = np.max(forward_v*trans_v)*node_potentials[state,pos]
                beststate[state,pos] = np.argmax(forward_v*trans_v)
        #backward
        y = []
        y.append(np.argmax(forward[:,N-1]))
        for pos in range(N-1,0,-1):
            y.append(beststate[y[-1],pos])
        return y[::-1]
    
    def viterbi_decode(self,sequence):
        node_potentials,edge_potentials = self.build_potentials(sequence)
        return _viterbi(node_potentials,edge_potentials)
        
        
                
        

In [176]:
X_train = np.array([["w","w","s","c"],["w","w","s","c"],["w","s","s","c"]])
Y_train = np.array([["r","s","s","s"],["r","r","r","s"],["s","s","s","s"]])

In [177]:
hmm = HMM(X_train,Y_train)

In [178]:
hmm.collect_counts()

In [179]:
hmm.sanity_check()

1
2
3
4


In [180]:
hmm.fit()

In [181]:
hmm.init_probs[0]

0.6666666666666666

In [182]:
hmm.observation_probs[1,1]

0.375

In [183]:
hmm.transition_probs

array([[0.66666667, 0.        ],
       [0.33333333, 1.        ]])

In [184]:
hmm.final_probs

array([[0., 0.],
       [1., 1.]])

In [185]:
node_potentials,edge_potentials = hmm.build_potentials(X_train[0][None,:])

In [158]:
def forward_backward(node_potentials,edge_potentials):
    H,N = node_potentials.shape
    forward = np.zeros([H,N])
    backward = np.ones([H,N])
    forward[:,0] = node_potentials[:,0]
    ##forward
    for pos in range(1,N):
        for cur_state in range(H):
            forward_v = forward[:,pos-1]
            trans_v = edge_potentials[:,cur_state,pos-1]
            forward[cur_state,pos] = (forward_v@trans_v)*node_potentials[cur_state,pos]
    
    ##backward
    for pos in range(N-2,-1,-1):
        for cur_state in range(H):
            backward_v = backward[:,pos+1]*node_potentials[:,pos+1]
            trans_v = edge_potentials[cur_state,:,pos]
            backward[cur_state,pos] = trans_v@backward_v
    
    return forward,backward
            
            

In [159]:
forward,backward = forward_backward(node_potentials,edge_potentials)

In [160]:
forward

array([[0.5       , 0.25      , 0.04166667, 0.        ],
       [0.08333333, 0.0625    , 0.0546875 , 0.03613281]])

In [161]:
backward

array([[0.06640625, 0.109375  , 0.375     , 1.        ],
       [0.03515625, 0.140625  , 0.375     , 1.        ]])

In [162]:
def sanity_check_forward_backward(forward,backward):
    for i in range(forward.shape[1]):
        print(forward[:,i]@backward[:,i])

In [163]:
sanity_check_forward_backward(forward,backward)

0.0361328125
0.0361328125
0.0361328125
0.0361328125


In [187]:
p = hmm.get_node_posteriors(X_train[0][None,:])

In [189]:
np.argmax(p,axis=0)

array([0, 0, 1, 1], dtype=int64)

In [222]:
def viterbi(node_potentials,edge_potentials):
    H,N = node_potentials.shape
    forward = np.zeros((H,N))
    beststate = np.zeros((H,N),dtype="int")
    
    #forward
    for state in range(H):
        forward[state,0] = node_potentials[state,0]
    for pos in range(1,N):
        for state in range(H):
            forward_v = forward[:,pos-1]
            trans_v = edge_potentials[:,state,pos-1]
            forward[state,pos] = np.max(forward_v*trans_v)*node_potentials[state,pos]
            beststate[state,pos] = np.argmax(forward_v*trans_v)
    #backward
    y = []
    y.append(np.argmax(forward[:,N-1]))
    for pos in range(N-1,0,-1):
        y.append(beststate[y[-1],pos])
    return y[::-1]
        

In [223]:
edge_potentials

array([[[0.66666667, 0.66666667, 0.        ],
        [0.33333333, 0.33333333, 1.        ]],

       [[0.        , 0.        , 0.        ],
        [1.        , 1.        , 1.        ]]])

In [224]:
viterbi(node_potentials,edge_potentials)

[0, 0, 0, 1]

In [221]:
np.argmax(node_potentials[:,0])==np.array([0])

array([ True])

In [206]:
hmm.Y_dic_decode

{0: 'r', 1: 's'}