# Dang Thanh Vu - 197796

In [1]:
import numpy as np
import scipy.io
from scipy.stats import norm
def TrainDOHMMBaumWelch_Single_Seq_variables(pi_init, A, B, O):
    M, N = B.shape[:]
    T = len(O)
    
    #alpha
    alpha = np.zeros((T, N))
    alpha[0] = B[O[0]].T*pi_init
    c = np.zeros(T)
    c[0] = 1./np.sum(alpha[0])
    alpha[0] = c[0]*alpha[0]
    
    for t in range(T - 1):
        for j in range(N):
            alpha[t + 1, j] = B[O[t + 1], j]*np.sum(alpha[t]*A[:, j].T)
        c[t+1] = 1./np.sum(alpha[t+1])
        alpha[t+1] = c[t+1]*alpha[t+1]
    P = -np.sum(np.log10(c))
    
    beta = np.zeros((T, N))
    beta[-1, :] = 1
    beta[-1] = beta[-1]*c[-1]
    for t in np.arange(T-2, -1, -1):
        for i in range(N):
            beta[t, i] = np.sum(A[i]*B[O[t + 1]]*beta[t+1])
        beta[t] = c[t]*beta[t]
    
    #ksi
    ksi = np.zeros((T - 1, N, N))
    for t in range(T-1):
        denum = 0
        for i in range(N):
            for j in range(N):
                denum = denum + alpha[t, i]*A[i, j]*B[O[t+1], j]*beta[t+1, j]
        for i in range(N):
            for j in range(N):
                ksi[t, i, j] = (alpha[t, i]*A[i, j]*B[O[t+1], j]*beta[t+1, j])/denum
    #gamma
    gamma = np.zeros((T - 1, N))
    for t in range(T - 1):
        for i in range(N):
            gamma[t, i] = 0
            for j in range(N):
                gamma[t, i] = gamma[t, i] + ksi[t, i, j]
    return alpha, beta, c, P, ksi, gamma

def TrainDOHMMBaumWelch_Single_Seq_reest_variables(M,N,T,O,gamma,ksi):
    Adenum = np.zeros(N)
    Anum = np.zeros((N, N))
    for i in range(N):
        for j in range(N):
            Adenum[i] = 0
            Anum[i, j] = 0
            for t in range(T-1):
                Adenum[i] = Adenum[i] + gamma[t, i]
                Anum[i, j] = Anum[i, j] + ksi[t, i, j]
    
    Bdenum = np.zeros(N)
    Bnum = np.zeros((M, N))
    for j in range(N):
        for k in range(M):
            Bdenum[j] = 0
            Bnum[k,j] = 0
            for t in range(T-1):
                Bdenum[j] = Bdenum[j] + gamma[t, j]
                if(O[t] == k):
                    Bnum[k, j] = Bnum[k, j] + gamma[t, j]
    return Anum, Adenum, Bnum, Bdenum
            
def MultSeqTrainDoHMMBWsc(pi_init, A, B, NumericData, maxEpoch):
    #M: state of data, N: state, K: number of data, T: data length
    M, N = B.shape[:]
    K = len(NumericData)
    epoch  = 1
    curProb = -10000
    AllProb = []
    
    alpha = []; beta = []; c = []; P = []; ksi = []; gamma = []; Anum = []; Adenum = []; Bnum = []; Bdenum = []
    
    while epoch <= maxEpoch:
        for k in range(K):
            O = NumericData[k]
            T = len(O)
            alpha_, beta_, c_, P_, ksi_, gamma_ = TrainDOHMMBaumWelch_Single_Seq_variables(pi_init, A, B, O)
            Anum_, Adenum_, Bnum_, Bdenum_ =TrainDOHMMBaumWelch_Single_Seq_reest_variables(M,N,T,O,gamma_,ksi_)
            alpha.append(alpha_); beta.append(beta_); P.append(P_); ksi.append(ksi_); gamma.append(gamma_)
            Anum.append(Anum_); Adenum.append(Adenum_); Bnum.append(Bnum_); Bdenum.append(Bdenum_)
        
        pi_init_hat = np.zeros(N)
        for i in range(N):
            pi_init_hat[i] = 0
            for k in range(K):
                pi_init_hat[i] = pi_init_hat[i] + gamma[k][0, i]
        pi_init_hat = pi_init_hat/np.sum(pi_init_hat)
        
        A_hat_num = np.zeros((N, N))
        A_hat_denum = np.zeros((N, N))
        A_hat = np.zeros((N, N))
        for i in range(N):
            for j in range(N):
                A_hat_num[i, j] = 0
                A_hat_denum[i, j] = 0
                for k in range(K):
                    A_hat_num[i, j] = A_hat_num[i, j] + (1./P[k])*Anum[k][i, j]
                    A_hat_denum[i, j] = A_hat_denum[i, j] + (1./P[k])*Adenum[k][i]
                A_hat[i, j] = A_hat_num[i, j]/A_hat_denum[i, j]
    
        
        B_hat_num = np.zeros((M, N))
        B_hat_denum = np.zeros((M, N))
        B_hat = np.zeros((N, N))
        for i in range(M):
            for j in range(N):
                B_hat_num[i, j] = 0
                B_hat_denum[i, j] = 0
                for k in range(K):
                    B_hat_num[i, j] = B_hat_num[i, j] + (1/P[k])*Bnum[k][i, j]
                    B_hat_denum[i, j] = B_hat_denum[i, j] + (1/P[k])*Bdenum[k][j]
                B_hat[i, j] = B_hat_num[i, j]/B_hat_denum[i, j]
            
        sumP = np.sum(P)
        if(sumP <= curProb):
            break
        else:
            AllProb.append(sumP)
            curProb = sumP
        
        pi_init=pi_init_hat
        A=A_hat
        B=B_hat
        epoch=epoch+1
        
        

    return pi_init, A, B
        

In [2]:
def BWDoHMMst(pi_init,A,B,O):#Forward
    T = len(O)
    M, N = B.shape[:]
    
    alpha = np.zeros((N, T))
    alpha[:, 0] = pi_init*B[O[0]].T
    for t in range(1, T):
        for i in range(N):
            alpha[i, t] = np.sum((alpha[:, t-1]*A[:,i]))*(B[O[t], i])
    
    return np.sum(alpha[:, -1])

def VitDoHMMst(pi_init,A,B,O):
    T = len(O)
    M, N = B.shape[:]
    
    alpha = np.zeros((N, T))
    alpha[:, 0] = pi_init*B[O[0]].T
    Pred = np.zeros((N, T))
    
    for t in range(1, T):
        for i in range(N):
            temp = alpha[:, t-1]*A[:, i]*B[O[t], i]
            alpha[i, t] = np.max(temp)
            temp = alpha[:, t-1]*A[:, i]
            Pred[i, t] = np.argmax(temp)
    
    path = np.zeros(T)
    p = np.max(alpha[:, -1])
    path[-1] = np.argmax(Pred[:, -1])
    for t in np.arange(T - 2, -1, -1):
        path[t] = Pred[int(path[t + 1]), t + 1]
    return p, path

def VitCoHMMst(pi_init,A,B,O):
    T = len(O)
    N, N = A.shape[:]
    
    alpha = np.zeros((N, T))
    for i in range(N):
        alpha[i, 0] = pi_init[i]*norm(B[0,i], B[1, i]).pdf(O[0])
    Pred = np.zeros((N, T))
    
    for t in range(1, T):
        for i in range(N):
            temp = alpha[:, t-1]*A[:, i]*norm(B[0,i], B[1, i]).pdf(O[t])
            alpha[i, t] = np.max(temp)
            temp = alpha[:, t-1]*A[:, i]
            Pred[i, t] = np.argmax(temp)
    
    path = np.zeros(T)
    p = np.max(alpha[:, -1])
    path[-1] = np.argmax(Pred[:, -1])
    for t in np.arange(T - 2, -1, -1):
        path[t] = Pred[int(path[t + 1]), t + 1]
    return p, path
    

        

In [3]:
def MultSeqTrainDoHMMVIT(pi_init, A, B, NumericData, maxEpoch):
    L = len(NumericData)
    M, N = B.shape[:]
    print(pi_init)
    epoch=0
    AllProb=[]
    MatchingProb = np.zeros(L)
    while epoch <= maxEpoch:
        Atemp = np.zeros((N,N))
        Btemp = np.zeros((M, N))
        pitemp = np.zeros(N)
        
        for i in range(L):
            O = NumericData[i]
            p, Path = VitDoHMMst(pi_init, A, B, O)
            MatchingProb[i] = p
            Path = Path.astype(int)
            for k in range(len(Path) - 1):
                Atemp[Path[k],Path[k+1]]=Atemp[Path[k],Path[k+1]]+1#count
            for k in range(len(Path)):
                Btemp[O[k], Path[k]] = Btemp[O[k], Path[k]] + 1#count
            pitemp[Path[0]] = pitemp[Path[0]] + 1#count
        AllProb.append(np.sum(MatchingProb))
        if (epoch > 1) and (AllProb[epoch] <= AllProb[epoch - 1]):
            break
        pi_init = pitemp/np.sum(pitemp)
        N, N = A.shape[:]
        for kkk in range(N):
            Atemp[kkk] = Atemp[kkk]/np.sum(Atemp[kkk])
            Btemp[:, kkk] = Btemp[:, kkk]/np.sum(Btemp[:, kkk])
        A = Atemp
        B = Btemp
        epoch = epoch + 1
    return pi_init, A, B
            
                

In [4]:
data = scipy.io.loadmat("DOHMMTrainingData.mat")
TrainingData = data["TrainingData"][0]
Train = []
for data in TrainingData:
    s = data[0]
    num_data = []
    for c in s:
        if c == 'H':
            num_data.append(0)
        else:
            num_data.append(1)
    Train.append(num_data)
            

# 6.3.1

In [5]:
pi_init = np.array([0.5, 0.5]).T
A_init = np.array([[0.6, 0.4], [0.01, 0.99]])
B_init = np.array([[0.6, 0.01], [0.4, 0.99]])
pi, A, B = MultSeqTrainDoHMMBWsc(pi_init, A_init, B_init, Train[:70], 1000)
print("pi ", pi)
print("A ", A)
print("B ", B)
print("small change in inital value -> large change in the result")

pi  [0.93384223 0.06615777]
A  [[0.96002916 0.03997084]
 [0.30940782 0.69059218]]
B  [[0.69097144 0.06529574]
 [0.30902856 0.93470426]]
small change in inital value -> large change in the result


# 6.3.2

In [6]:
for i in range(70, 100):
    p, path = VitDoHMMst(pi_init,A,B,Train[i])
    print("sample ",i, " th with Viterbi score ", p)
    print(path)

sample  70  th with Viterbi score  1.2344324713269179e-13
[1. 1. 1. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0.]
sample  71  th with Viterbi score  1.0998207096553392e-15
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0.]
sample  72  th with Viterbi score  2.332690921222666e-15
[1. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0.]
sample  73  th with Viterbi score  1.134831876228945e-14
[1. 1. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0.]
sample  74  th with Viterbi score  1.0998207096553398e-15
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0