In [194]:
import numpy as np

class HMM():

    def __init__(self,initial_prob, transmission_prob, emission_prob):
        '''
        Note that this implementation assumes that n, m, and T are small
        enough not to require underflow mitigation.

        Required Inputs:
        - transmission_prob: an (n+2) x (n+2) numpy array, initial, where n is
        the number of hidden states
        - emission_prob: an (n x m) 2-D numpy array, where m is the number of
        possible observations

        Optional Input:
        - obs: a list of observation labels, in the same order as their
        occurence within the emission probability matrix; otherwise, will assume
        that the emission probabilities are in alpha-numerical order.
        '''
        self.initial_prob=initial_prob
        self.transmission_prob = transmission_prob
        self.emission_prob = emission_prob
        self.n = self.emission_prob.shape[0] #n hidden states
        self.m = self.emission_prob.shape[1] #m possible obs
        self.observations = None
        self.psi = []
        self.emiss_ref = {}
        self.state_probs = []
    def forward(self,sequence,length=None):
        if length==None:
            length=len(sequence)
        alpha=np.zeros((self.n,length))
        #initial
        for i in range(self.n):
            alpha[i,0]=self.initial_prob[i]*self.emission_prob[i,int(sequence[0])]
        for j in range(self.n):
            for i in range(1,length):
                alpha_sum=0
                for k in range(self.n):
                    alpha_sum+=alpha[k,i-1]*self.transmission_prob[k,j]
                alpha[j,i]=self.emission_prob[j,int(sequence[i])]*alpha_sum
        return alpha
    def backward(self,sequence,length=None):
        if length==None:
            length=len(sequence)
        beta=np.zeros((self.n,length))
        for i in range(self.n):
            beta[i,length-1]=1
        for i in range(length-2,-1,-1):
            for j in range(self.n):
                beta_sum=0
                for k in range(self.n):
                    beta_sum+=beta[k,i+1]*self.transmission_prob[j,k]*self.emission_prob[k,int(sequence[i+1])]
                beta[j,i]=beta_sum
        return beta

    def mse(self,a,b):
        return np.abs((a-b)).mean()

    def gamma(self,sequence,length=None):
        if length==None:
            length=len(sequence)
        alpha=self.forward(sequence)
        beta=self.backward(sequence)
        alpha_beta=alpha*beta
        alpha_beta_sum=np.sum(alpha_beta,axis=0,keepdims=True)


        #init gamma test
        # print("gamma test")
        # for i in range(self.n):
        #     for t in range(len(sequence)):
        #         gam=alpha[i,t]*beta[i,t]
        #         gam_sum=0
        #         for j in range(self.n):
        #             gam_sum+=alpha[j,t]*beta[j,t]
        #         gam=gam/gam_sum
        #         print((alpha_beta/alpha_beta_sum)[i,t],gam)

        return alpha_beta/alpha_beta_sum
    def shi(self,sequence):
        length=len(sequence)
        trans_expanded=np.expand_dims(self.transmission_prob, axis=2)
        trans_repeated=trans_expanded.repeat(length,axis=2)

        alpha=self.forward(sequence)
        beta=self.backward(sequence)

        alpha_expanded=np.expand_dims(alpha, axis=1)
        alpha_repeated=alpha_expanded.repeat(self.n,axis=1)
        alpha_trans= trans_repeated * alpha_repeated
        new_b=np.zeros((self.n,length))
        for i,ch in enumerate(sequence[1:]):
            for j in range(self.n):
                new_b[j,i]=self.emission_prob[j,int(ch)]
        new_b[:,length-1]=1

        beta_emit=new_b*np.roll(beta,-1,axis=1)

        shii=alpha_trans*beta_emit
        raw_shi=shii
        shii=shii/shii.sum(axis=(0,1))

        #initial shi test
        # print("shi test")
        # for i in range(self.n):
        #     for j in range(self.n):
        #         for t in range(len(sequence)):
        #             if t+1==len(sequence):
        #                 tmp=beta[j,(t+1)%len(sequence)]
        #             else:
        #                 tmp=beta[j,t+1]*self.emission_prob[j,int(sequence[t+1])]
        #             sh=alpha[i,t]*self.transmission_prob[i,j]*tmp
        #             sh_sum=0
        #             for k in range(self.n):
        #                 for w in range(self.n):
        #                     if t+1==len(sequence):
        #                         tmp=beta[w,(t+1)%len(sequence)]
        #                     else:
        #                         tmp=beta[w,t+1]*self.emission_prob[w,int(sequence[t+1])]
        #                     sh_sum+=alpha[k,t]*self.transmission_prob[k,w]*tmp
        #             sh=sh/sh_sum
        #             print(shii[i,j,t],sh)

        return shii

    def obs_encode(self,sequences):
        obs=np.zeros((len(sequences),self.m,len(sequences[0])))
        for i in range(len(sequences)):
            for j in range(self.m):
                for k in range(len(sequences[0])):
                    if int(sequences[i][k])==j:
                        obs[i,j,k]=1
        self.obs=obs
    def update(self,sequences):
        gamma_list=[]
        sheee_list=[]
        old_init_prob=self.initial_prob
        old_trans_prob=self.transmission_prob
        old_emit_prob=self.emission_prob
        for sequence in sequences:
            gamma=self.gamma(sequence)
            sheee=self.shi(sequence)
            gamma_list.append(gamma)
            sheee_list.append(sheee)
        gamma=np.stack(gamma_list, axis=0)
        sheee=np.stack(sheee_list,axis=0)
        #print(self.initial_prob.shape,gamma[:,:,1].mean(axis=0).shape)

        #initial prob update
        self.initial_prob=gamma[:,:,0].mean(axis=0)

        #transition prob update
        self.transmission_prob=sheee[:,:,:,:-1].sum(axis=(0,3))/gamma[:,:,:-1].sum(axis=(0,2)).reshape(-1,1)

        #rebuild observations as matrices obs[i,j,k]=1 if
        obs=self.obs
        obs_reshaped=np.transpose(obs,(1,0,2)).reshape((obs.shape[1],-1))
        gamma_reshaped=np.transpose(gamma,(1,0,2)).reshape((gamma.shape[1],-1))
        #print(permuted_gamma)
        #print(obs.shape,permuted_gamma.shape)
        new_emits=np.dot(gamma_reshaped,obs_reshaped.T)/gamma.sum(axis=(0,2)).reshape(-1,1)
        self.emission_prob=new_emits
        init_error=self.mse(old_init_prob,self.initial_prob)
        trans_error=self.mse(old_trans_prob,self.transmission_prob)
        emit_error=self.mse(old_emit_prob,self.emission_prob)

        print(f"=============")
        print("Updated")
        print(f"init error={init_error}")
        print(f"trans error={trans_error}")
        print(f"emit_error={emit_error}")

        ###################test
        # print("initial state update test")
        # for i in range(self.n):
        #     gam_sum=0
        #     for j in range(len(sequences)):
        #         gam_sum+=gamma[j,i,1]
        #     new_pi=gam_sum/len(sequences)
        #     print(new_pi,self.initial_prob[i])

        # print("transion update test")
        # for i in range(self.n):
        #     for j in range(self.n):
        #         trans_sum=0
        #         for ip in range(len(sequences)):
        #             for t in range(len(sequences[0])-1):
        #                 trans_sum+=sheee[ip,i,j,t]
        #         gam_sum=0
        #         for ip in range(len(sequences)):
        #             for t in range(len(sequences[0])-1):
        #                 gam_sum+=gamma[ip,i,t]
        #         new_trans= trans_sum/gam_sum
        #         print(self.transmission_prob[i,j],new_trans)
        # print("emition update test")
        # for i in range(self.n):
        #     for j in range(self.m):
        #         emits_sum=0
        #         for ip in range(len(sequences)):
        #             for t in range(len(sequences[0])):
        #                 if int(sequences[ip][t])==j:
        #                     emits_sum+=gamma[ip,i,t]
        #         gam_sum=0
        #         for ip in range(len(sequences)):
        #             for t in range(len(sequences[0])):
        #                 gam_sum+=gamma[ip,i,t]
        #         new_emit=emits_sum/gam_sum
        #         print(self.emission_prob[i,j],new_emit)
    def viterbi(self,sequence):
        length=len(sequence)
        T1=np.zeros((self.n,length))
        T2=(np.zeros((self.n,length))-1).astype(int)
        for i in range(self.n):
            T1[i,0]=self.initial_prob[i]*self.emission_prob[i,int(sequence[0])]
            T2[i,0]=0
        for t in range(1,length):
            for i in range(self.n):
                T_list=[]
                for k in range(self.n):
                    T_list.append(T1[k,t-1]*self.transmission_prob[k,i]*self.emission_prob[i,int(sequence[t])])
                T1[i,t]=max(T_list)
                T2[i,t]=T_list.index(T1[i,t])
        highest_prob=np.max(T1[:,-1])
        best_states=[]
        best_last_state=np.argmax(T1[:,-1])
        for t in range(len(sequence)-1,-1,-1):
            best_states.insert(0,best_last_state)
            best_last_state=T2[best_last_state,t]
        print("================================")
        print(f"The prob of observing sequence:\n\"{sequence}\"\nis: {highest_prob}")
        tmp=""
        for state in best_states:
            tmp+=f"{str(state)}"
        print(tmp+" End")


# trans=np.array([[0.25,0.45,0.30],[0.10,0.35,0.35],[0.33,0.50,0.17]])
# emis=np.array([[0.45,0.55],[0.30,0.70],[0.10,0.90]])
# init=np.array([0.5,0.5,0.5])

# # trans=np.array([[0.25,0.75],[0.10,0.90]])
# # emis=np.array([[0.45,0.55],[0.30,0.70]])
# # init=np.array([1,1])
# input=["1010011","1011111","1011110"]
# #input=["1010011"]
# hmm=HMM(init,trans,emis)
# alpha=hmm.forward("1010011")
# beta=hmm.backward("1010011")
# hmm.obs_encode(input)
# # for i in range(10):
# #     hmm.update(input)
# hmm.viterbi("11101001")
# #hmm.shi("1001").shape

In [195]:
sequences=[]
with open("haplotype.txt","r") as f:
    for line in f.readlines():
        sequences.append(line[:-1])


In [196]:
np.random.seed(42)
n=2
m=2
trans_prob=np.ones((n,n))
trans_prob=trans_prob/trans_prob.sum(axis=1,keepdims=True)

emit_prob=np.ones((n,m))
emit_prob=emit_prob/emit_prob.sum(axis=1,keepdims=True)

init_prob=np.ones(n)/n

hmm=HMM(init_prob,trans_prob,emit_prob)
hmm.obs_encode(sequences)
    

In [197]:
iterations=20
for _ in range(iterations):
    hmm.update(sequences)

Updated
init error=0.0
trans error=0.0
emit_error=0.2857913294100459
Updated
init error=0.005555540278562421
trans error=0.0003082874840105859
emit_error=0.01693239413992014
Updated
init error=0.014827617971270024
trans error=0.0022083210516223167
emit_error=0.024136630614197387
Updated
init error=0.02691154951786265
trans error=0.007533480204691884
emit_error=0.025336414790714766
Updated
init error=0.0372181700527191
trans error=0.016495533634322032
emit_error=0.019396069770761127
Updated
init error=0.04091536187447517
trans error=0.02515342848682224
emit_error=0.010958875999590903
Updated
init error=0.037609041699428414
trans error=0.029332257723103897
emit_error=0.005773032678914915
Updated
init error=0.031060751109103613
trans error=0.029241898679870537
emit_error=0.0029415294229547936
Updated
init error=0.024401026858105912
trans error=0.02696217998577974
emit_error=0.0015384684081940114
Updated
init error=0.018685292853360597
trans error=0.024778214713444185
emit_error=0.00076310

In [198]:
print(hmm.initial_prob)
print(hmm.transmission_prob)
print(hmm.emission_prob)


[0.7332871 0.2667129]
[[0.81230229 0.18769771]
 [0.11756712 0.88243288]]
[[9.99995965e-01 4.03532609e-06]
 [7.62649041e-01 2.37350959e-01]]


In [199]:
obs=[]
with open("observations.txt","r") as f:
    for line in f.readlines():
        obs.append(line[:-1])

In [200]:
for sequence in obs:
    hmm.viterbi(sequence)

The prob of observing sequence:
"00000000001000010000100101001001100000001000001110000000000000001000001000010000000000000000000"
is: 7.414478741364792e-24
00000000001111111111111111111111111111111111111111111111111111111111111111110000000000000000000 End
The prob of observing sequence:
"01110001001011010111100011001000101110100101011101000110111100100000011000110000000000001000100"
is: 2.347228761789966e-37
11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111 End
The prob of observing sequence:
"00001000011000010000100101001001100001001000001111000000000001001001001010000000000000000000100"
is: 1.442330069859313e-28
00001111111111111111111111111111111111111111111111111111111111111111111110000000000000000000111 End
The prob of observing sequence:
"00000000001000010000100101000001110000010000000010000000000000000000000000000000000000000000000"
is: 4.0823123945111406e-19
000000000011111111111111111111111111111111111111100000000000000000000000000

In [6]:
import numpy as np

class HMM_DIP():

    def __init__(self,initial_prob, transmission_prob, emission_prob,sequences):
        '''
        Note that this implementation assumes that n, m, and T are small
        enough not to require underflow mitigation.

        Required Inputs:
        - transmission_prob: an (n+2) x (n+2) numpy array, initial, where n is
        the number of hidden states
        - emission_prob: an (n x m) 2-D numpy array, where m is the number of
        possible observations

        Optional Input:
        - obs: a list of observation labels, in the same order as their
        occurence within the emission probability matrix; otherwise, will assume
        that the emission probabilities are in alpha-numerical order.
        '''
        self.initial_prob=initial_prob
        self.transmission_prob = transmission_prob
        self.emission_prob = emission_prob
        self.n = emission_prob.shape[0] #n hidden states
        self.m = emission_prob.shape[1] #m possible obs
        self.observations = None
        self.psi = []
        self.emiss_ref = {}
        self.state_probs = []
        self.sequences=sequences
        self.encodes_sequences(sequences)
        self.obs_encode(sequences)
        self.make_initials()
        self.make_transition()
        self.make_emmissions()
    def make_emmissions(self):
        B = np.zeros((self.n, self.n, 3))
        for i in range(self.n):
            for j in range(self.n):
                for k in range(3):
                    if k == 0:
                        B[i,j,k]=self.emission_prob[i, 0] * self.emission_prob[j, 0] 
                    elif k == 1:
                        B[i,j,k]=0.5*self.emission_prob[i, 0] * self.emission_prob[j, 1] + 0.5*self.emission_prob[i, 1] * self.emission_prob[j, 0]
                        #B[i,j,k]=self.emission_prob[i, 0] * self.emission_prob[j, 1] + self.emission_prob[i, 1] * self.emission_prob[j, 0]

                    else:
                        B[i,j,k]=self.emission_prob[i, 1] * self.emission_prob[j, 1]
        self.B=B
    def make_transition(self):
        A=np.zeros((self.n,self.n,self.n,self.n))
        for i in range(self.n):
            for j in range(self.n):
                for z in range(self.n):
                    for k in range(self.n):
                        A[i,j,z,k]=self.transmission_prob[i,z]*self.transmission_prob[j,k]+self.transmission_prob[i,k]*self.transmission_prob[j,z]
                        #A[i,j,z,k]=self.transmission_prob[i,z]*self.transmission_prob[j,k]
        self.A=A
    def make_initials(self):
        pi=np.zeros((self.n,self.n))
        for i in range(self.n):
            for j in range(self.n):
                pi[i,j]=self.initial_prob[i]*self.initial_prob[j]
        self.pi=pi

    def encodes_sequences(self,sequences):
        self.encodes=[]
        for sequence in sequences:
            encoded_seq=self.seq_encode(sequence)
            self.encodes.append(encoded_seq)
    def obs_encode(self,sequences):
        obs=np.zeros((len(sequences),self.n,self.m,len(sequences[0])))
        for r in range(len(sequences)):
            for i in range(self.n):
                for j in range(self.m):
                    for t in range(len(sequences[0])):
                        emit=int(sequences[r][t])
                        if j==0 and emit==0:
                            obs[r,i,j,t]=1
                        elif j==0 and emit==1:
                            obs[r,i,j,t]=0
                        elif j==0 and emit==2:
                            obs[r,i,j,t]=0
                        
                        elif j==1 and emit==0:
                            obs[r,i,j,t]=0
                        elif j==1 and emit==1:
                            obs[r,i,j,t]=0
                        elif j==1 and emit==2:
                            obs[r,i,j,t]=1
        self.obs=obs


    def get_gamma_coef(self):
        encoded_seqs=self.encodes
        B=np.zeros((len(encoded_seqs),self.n,self.n,self.m,len(encoded_seqs[0])))
        for r in range(len(encoded_seqs)):
            for t in range(len(encoded_seqs[0])):
                for i in range(self.n):
                    for j in range(self.n):
                        for e in range(self.m):
                            obs=encoded_seqs[r][t]
                            if e==0 and obs==2:
                                B[r,i,j,e,t]=0
                            elif e==0 and obs==1:
                                B[r,i,j,e,t]=0.5*self.emission_prob[i,0]*self.emission_prob[j,1] + 0.5*self.emission_prob[i,1]*self.emission_prob[j,0]
                            elif e==0 and obs==0:
                                B[r,i,j,e,t]=self.emission_prob[i,0]*self.emission_prob[j,0]

                            elif e==1 and obs==2:
                                B[r,i,j,e,t]=self.emission_prob[i,1]*self.emission_prob[j,1]
                            elif e==1 and obs==1:
                                B[r,i,j,e,t]=0.5*self.emission_prob[i,0]*self.emission_prob[j,1]+0.5*self.emission_prob[i,1]*self.emission_prob[j,0]
                            elif e==1 and obs==0:
                                B[r,i,j,e,t]=0
                            

                        
        return B

    def seq_encode(self,sequence):
        seq_encoded=[]
        for i in range(len(sequence)):
            emit_symbol=None
            if sequence[i]=="0":
                emit_symbol=0
            elif sequence[i]=="1":
                emit_symbol=1
            else:
                emit_symbol=2
            seq_encoded.append(emit_symbol)
        return seq_encoded
    def forward(self,seq_id,length=None):
        if length==None:
            length=len(self.encodes[seq_id])
        alpha=np.zeros((self.n,self.n,length))
        enc_sequence=self.encodes[seq_id]
        #initial
        for i in range(self.n):
            for j in range(self.n):
                emit_symbol=enc_sequence[0]
                init_prob=self.pi[i,j]
                init_emit=self.B[i,j,emit_symbol]
                alpha[i,j,0]=init_prob*init_emit
        for t in range(1,length):
            for i in range(self.n):
                for j in range(self.n):
                    alpha_sum=0
                    emit_symbol=enc_sequence[t]
                    emition_prob=self.B[i,j,emit_symbol]
                    for k1 in range(self.n):
                        for k2 in range(self.n):
                            alpha_trans=alpha[k1,k2,t-1]*self.A[k1,k2,i,j]
                            alpha_sum+=alpha_trans
                    alpha[i,j,t]=alpha_sum*emition_prob
        return alpha
    def backward(self,seq_id,length=None):
        if length==None:
            length=len(self.encodes[seq_id])
        beta=np.zeros((self.n,self.n,length))
        enc_sequence=self.encodes[seq_id]
        #initial
        for i in range(self.n):
            for j in range(self.n):
                beta[i,j,length-1]=1
        for t in range(length-2,-1,-1):
            for i in range(self.n):
                for j in range(self.n):
                    beta_sum=0
                    emit_symbol=enc_sequence[t+1]
                    for k1 in range(self.n):
                        for k2 in range(self.n):
                            beta_trans=beta[k1,k2,t+1]*self.A[i,j,k1,k2]
                            emition_prob=self.B[k1,k2,emit_symbol]
                            beta_sum+=beta_trans*emition_prob
                    beta[i,j,t]=beta_sum
        return beta

    def mse(self,a,b):
        return np.abs((a-b)).sum()
    
    def gamma(self,seq_id,length=None):
        if length==None:
            length=len(self.encodes[seq_id])
        alpha=self.forward(seq_id)
        beta=self.backward(seq_id)

        new_beta=np.zeros((self.n,length))
        for i in range(self.n):
            new_beta[i]=beta[i,:,:].sum(axis=0)
        
        new_alpha=np.zeros((self.n,length))
        for i in range(self.n):
            new_alpha[i]=alpha[i,:,:].sum(axis=0)
        
        alpha_beta=new_alpha*new_beta
        gamma_mat=alpha_beta/alpha_beta.sum(axis=0,keepdims=True)
        #new_gamma=alpha*beta
        #new_gamma=new_gamma/new_gamma.sum(axis=(0,1),keepdims=True)
        #init gamma test
        # print("gamma test")
        # for i in range(self.n):
        #     for t in range(len(sequence)):
        #         gam=0
        #         for j in range(self.n):
        #             gam+=alpha[j,i,t]*beta[j,i,t]
        #         gam_sum=0
        #         for k1 in range(self.n):
        #             for k2 in range(self.n):
        #                 gam_sum+=alpha[k1,k2,t]*beta[k1,k2,t]
        #         gam=gam/gam_sum
        #         print(gamma_mat[i,t],gam)

        return gamma_mat
    def shi(self,seq_id):
        length=len(self.encodes[seq_id])
        trans_expanded=np.expand_dims(self.A, axis=-1)
        trans_repeated=trans_expanded.repeat(length,axis=-1)
        
        alpha=self.forward(seq_id)
        beta=self.backward(seq_id)

        alpha_expanded=np.expand_dims(alpha, axis=(2,3))
        alpha_repeated=alpha_expanded.repeat(self.n,axis=2)
        alpha_repeated=alpha_repeated.repeat(self.n,axis=3)
        alpha_trans= trans_repeated * alpha_repeated


        encoded=self.encodes[seq_id]
        new_b=np.zeros((self.n,self.n,length))
        for t,ch in enumerate(encoded[1:]):
            for i in range(self.n):
                for j in range(self.n):
                    emmit=self.B[i,j,encoded[t+1]]
                    new_b[i,j,t]=emmit
        new_b[:,:,length-1]=1

        beta_emit=new_b*np.roll(beta,-1,axis=2)

        shii=alpha_trans*beta_emit

        new_shii=np.zeros((self.n,self.n,length))
        for i in range(self.n):
            for j in range(self.n):
                new_shii[i,j]=shii[i,:,j,:].sum(axis=(0,1))

        new_shii=new_shii/new_shii.sum(axis=(0,1))

        #initial shi test
        # print("shi test")
        # for i in range(self.n):
        #     for z in range(self.n):
        #         for t in range(len(sequence)):
        #             sh=0
        #             for j in range(self.n):
        #                 for k in range(self.n):
        #                     if t+1==len(sequence):
        #                         tmp=beta[z,k,(t+1)%len(sequence)]
        #                     else:
        #                         emit_prob=0.5*self.emission_prob[z,encoded[t+1][0]]*self.emission_prob[k,encoded[t+1][1]]+0.5*self.emission_prob[z,encoded[t+1][1]]*self.emission_prob[k,encoded[t+1][0]]
        #                         tmp=beta[z,k,t+1]*emit_prob
        #                     sh+=alpha[i,j,t]*self.transmission_prob[i,z]*self.transmission_prob[j,k]*tmp
        #                     sh_sum=0
        #             for ip in range(self.n):
        #                 for jp in range(self.n):
        #                     for zp in range(self.n):
        #                         for kp in range(self.n):
        #                             if t+1==len(sequence):
        #                                 tmp=beta[zp,kp,(t+1)%len(sequence)]
        #                             else:
        #                                 emit_prob=0.5*self.emission_prob[zp,encoded[t+1][0]]*self.emission_prob[kp,encoded[t+1][1]]+0.5*self.emission_prob[zp,encoded[t+1][1]]*self.emission_prob[kp,encoded[t+1][0]]
        #                                 tmp=beta[zp,kp,t+1]*emit_prob
        #                             sh_sum+=alpha[ip,jp,t]*self.transmission_prob[ip,zp]*self.transmission_prob[jp,kp]*tmp
        #             print(shii[i,z,t],sh/sh_sum)

        return new_shii

        
    def update(self):
        gamma_list=[]
        sheee_list=[]
        old_init_prob=self.initial_prob
        old_trans_prob=self.transmission_prob
        old_emit_prob=self.emission_prob
        inputs=self.encodes
        for seq_id in range(len(inputs)):
            gamma=self.gamma(seq_id)
            sheee=self.shi(seq_id)
            gamma_list.append(gamma)
            sheee_list.append(sheee)
        gamma=np.stack(gamma_list, axis=0)
        sheee=np.stack(sheee_list,axis=0)
        #print(self.initial_prob.shape,gamma[:,:,1].mean(axis=0).shape)
        #initial prob update
        self.initial_prob=gamma[:,:,0].mean(axis=0)

        #transition prob update
        self.transmission_prob=sheee[:,:,:,:-1].sum(axis=(0,3))/gamma[:,:,:-1].sum(axis=(0,2)).reshape(-1,1)

        #rebuild observations as matrices obs[i,j,k]=1 if
        obs=self.obs
        gamma_expanded=np.expand_dims(gamma,axis=2).repeat(self.m,axis=2)
        new_emits=(obs*gamma_expanded).sum(axis=(0,-1))
        new_emits=new_emits/gamma.sum(axis=(0,2)).reshape(-1,1)
        self.emission_prob=new_emits


        # for j in range(self.m):
        #     obs=obs[:,:,j,:]
        #     obs_reshaped=np.transpose(obs,(1,0,2)).reshape((obs.shape[1],-1))
        #     gamma_reshaped=np.transpose(gamma,(1,0,2)).reshape((gamma.shape[1],-1))
        #print(permuted_gamma)
        #print(obs.shape,permuted_gamma.shape)
        # gamma_coef=self.get_gamma_coef()
        # em=np.zeros((self.n,self.m))
        # for i in range(self.n):
        #     for e in range(self.m):
        #         tmp=(gamma_coef[:,i,:,e,:]*new_gamma[:,i,:,:]).sum(axis=1)
        #         em[i,e]=tmp.sum(axis=(0,-1))#/gamma[:,i,:].sum(axis=(0,-1))
        # em=em/em.sum(axis=1,keepdims=True)
        # print(em)
        # self.emission_prob=em
        
        
        # raw_emits=np.dot(obs_reshaped,gamma_reshaped.T).T
        # new_emits=raw_emits/raw_emits.sum(axis=1).reshape(-1,1)
        # self.emission_prob=new_emits

        
        #print(self.emission_prob,new_emitssssssss)
        #self.emission_prob=new_emitssssssss


        init_error=self.mse(old_init_prob,self.initial_prob)
        trans_error=self.mse(old_trans_prob,self.transmission_prob)
        emit_error=self.mse(old_emit_prob,self.emission_prob)

        self.make_initials()
        self.make_transition()
        self.make_emmissions()
        print(f"=============")
        print("Updated")
        print(f"init error={init_error}")
        print(f"trans error={trans_error}")
        print(f"emit_error={emit_error}")

        ###################test
        # print("initial state update test")
        # for i in range(self.n):
        #     gam_sum=0
        #     for j in range(len(sequences)):
        #         gam_sum+=gamma[j,i,0]
        #     new_pi=gam_sum/len(sequences)
        #     print(new_pi,self.initial_prob[i])

        # print("transion update test")
        # for i in range(self.n):
        #     for j in range(self.n):
        #         trans_sum=0
        #         for ip in range(len(sequences)):
        #             for t in range(len(sequences[0])-1):
        #                 trans_sum+=sheee[ip,i,j,t]
        #         gam_sum=0
        #         for ip in range(len(sequences)):
        #             for t in range(len(sequences[0])-1):
        #                 gam_sum+=gamma[ip,i,t]
        #         new_trans= trans_sum/gam_sum
        #         print(self.transmission_prob[i,j],new_trans)
        # print("emition update test")
        # for i in range(self.n):
        #     for j in range(self.m):
        #         emits_sum=0
        #         for ip in range(len(sequences)):
        #             for t in range(len(sequences[0])):
        #                 if j==0 and int(sequences[ip][t])==0:
        #                     emits_sum+=gamma[ip,i,t]
        #                 elif j==0 and int(sequences[ip][t])==1:
        #                     emits_sum+=0.5*gamma[ip,i,t]
        #                 elif j==0 and int(sequences[ip][t])==2:
        #                     emits_sum+=0
                        
        #                 elif j==1 and int(sequences[ip][t])==0:
        #                     emits_sum+=0
        #                 elif j==1 and int(sequences[ip][t])==1:
        #                     emits_sum+=0.5*gamma[ip,i,t]
        #                 elif j==1 and int(sequences[ip][t])==2:
        #                     emits_sum+=gamma[ip,i,t]
                        
        #         gam_sum=0
        #         for ip in range(len(sequences)):
        #             for t in range(len(sequences[0])):
        #                 gam_sum+=gamma[ip,i,t]
        #         new_emit=emits_sum/gam_sum
        #         print(self.emission_prob[i,j],new_emits[i,j],new_emit)
    def print_states(self,sequence,best_states,highest_prob):
        print("================================")
        print(f"The prob of observing sequence:\n\"{sequence}\"\nis: {highest_prob}")
        tmp1=""
        tmp2=""
        for state in best_states:
            tmp1+=f"{str(state[0])}"
            tmp2+=f"{str(state[1])}"
        print(tmp1+" End")
        print(tmp2+" End")
    def generate_seq(self,best_states):
        hap1=""
        hap2=""
        for state in best_states:
            state1=int(state[0])
            state2=int(state[1])
            max_emit1=np.argmax(self.emission_prob[state1,:])
            max_emit2=np.argmax(self.emission_prob[state2,:])
            hap1+=str(max_emit1)
            hap2+=str(max_emit2)
        return hap1,hap2
    def viterbi(self,sequence,print_fn):
        length=len(sequence)
        T1=np.zeros((self.n,self.n,length))
        T2=(np.zeros((self.n,self.n,2,length))-1).astype(int)
        for i in range(self.n):
            for j in range(self.n):
                T1[i,j,0]=self.pi[i,j]*self.B[i,j,int(sequence[0])]
                T2[i,j,:,0]=0
        for t in range(1,length):
            for i in range(self.n):
                for j in range(self.n):
                    T_list=[]
                    T2_list=[]
                    for k1 in range(self.n):
                        for k2 in range(self.n):
                            T_list.append(T1[k1,k2,t-1]*self.A[k1,k2,i,j]*self.B[i,j,int(sequence[t])])
                            T2_list.append((k1,k2))
                    
                    T1[i,j,t]=max(T_list)
                    path1,path2=T2_list[T_list.index(T1[i,j,t])]
                    T2[i,j,0,t]=path1
                    T2[i,j,1,t]=path2
        highest_prob=np.max(T1[:,:,-1])
        best_states=[]
        best_last_state=np.argmax(T1[:,:,-1])
        best_last_state=np.unravel_index(best_last_state, T1[:,:,0].shape)
        for t in range(len(sequence)-1,-1,-1):
            best_states.insert(0,best_last_state)
            best_last_state=T2[best_last_state[0],best_last_state[1],:,t]
            best_last_state=(best_last_state[0],best_last_state[1])
        if print_fn:
            self.print_states(sequence,best_states,highest_prob)
        return highest_prob,sequence,best_states,T1


# trans=np.array([[0.25,0.45,0.30],[0.10,0.35,0.35],[0.33,0.50,0.17]])
# emis=np.array([[0.45,0.55],[0.30,0.70],[0.10,0.90]])
# init=np.array([0.5,0.5,0.5])

# trans=np.array([[0.25,0.75],[0.10,0.90]])
# emis=np.array([[0.45,0.55],[0.30,0.70]])
# init=np.array([1,1])
# input=["1010011","1011111","1011110"]
# input=["000000","111111"]
input=["2222220000000002","1112222222220002"]
# hmm=HMM_DIP(init,trans,emis)
# # #hmm.shi(input[0]).shape
# # hmm.obs_encode(input[0])
# # hmm.viterbi(input[0],print_fn=True)

np.random.seed(43)

n=2
m=2
trans_prob=np.random.random((n,n))
trans_prob=trans_prob/trans_prob.sum(axis=1,keepdims=True)

emit_prob=np.random.random((n,m))
emit_prob=emit_prob/emit_prob.sum(axis=1,keepdims=True)
#emit_prob=np.array([[0.7,0.3],[0.2,0.8]])
init_prob=np.ones(n)/n

hmm=HMM_DIP(init_prob,trans_prob,emit_prob,input)
# gam=hmm.gamma(0)
# shii=hmm.shi(0)
hmm.update()
# #hmm.forward(input[0])
# # #a=hmm.gamma(input[0])
# # # alpha=hmm.forward("1010011")
# # # beta=hmm.backward("1010011")
# #hmm.obs_encode(input)
# # # # for i in range(10):
# # hmm.viterbi("11101001")
# # # #hmm.shi("1001").shape
iterations=1
for _ in range(iterations):
    hmm.update()

Updated
init error=0.0728402658508478
trans error=0.23739045324700006
emit_error=0.238486783539736
Updated
init error=0.04307128013385708
trans error=0.12426742968736001
emit_error=0.015417953932716555


In [2]:
print(hmm.initial_prob)
print(hmm.transmission_prob)
print(hmm.emission_prob)


[0.73650618 0.26349382]
[[0.83665732 0.1494031 ]
 [0.2261293  0.79145259]]
[[9.39617507e-53 9.07477126e-01]
 [9.04517230e-01 1.22832467e-08]]


In [7]:
sequences=[]
with open("genotype.txt","r") as f:
    for line in f.readlines():
        sequences.append(line[:-1])

In [8]:
np.random.seed(43)

n=4
m=2
trans_prob=np.random.random((n,n))
trans_prob=trans_prob/trans_prob.sum(axis=1,keepdims=True)

emit_prob=np.random.random((n,m))
emit_prob=emit_prob/emit_prob.sum(axis=1,keepdims=True)
#emit_prob=np.array([[0.7,0.3],[0.2,0.8]])
init_prob=np.ones(n)/n

hmm=HMM_DIP(init_prob,trans_prob,emit_prob,sequences)
print(hmm.initial_prob)
print(hmm.transmission_prob)
print(hmm.emission_prob)
iterations=20
for _ in range(iterations):
    hmm.update()



[0.25 0.25 0.25 0.25]
[[0.10477588 0.55465404 0.12147415 0.21909594]
 [0.13667646 0.35894176 0.27828793 0.22609386]
 [0.01480479 0.37440736 0.20152986 0.40925798]
 [0.18186139 0.04066162 0.61948445 0.15799255]]
[[0.56163827 0.43836173]
 [0.08333922 0.91666078]
 [0.4663465  0.5336535 ]
 [0.28762368 0.71237632]]
Updated
init error=0.36860358002580457
trans error=1.6415199685904576
emit_error=3.2573885579568884
Updated
init error=0.08648592026096878
trans error=0.4284674006644124
emit_error=0.026466464706486073
Updated
init error=0.06150133370358049
trans error=0.2149145867936486
emit_error=0.01601004395598144
Updated
init error=0.047144807368236424
trans error=0.10959785400837606
emit_error=0.01670607508378054
Updated
init error=0.03863049575491892
trans error=0.05703608120012657
emit_error=0.01840525078843818
Updated
init error=0.0349113920382379
trans error=0.0307068095648369
emit_error=0.019512409197851452


KeyboardInterrupt: 

In [9]:
print(hmm.initial_prob)
print(hmm.transmission_prob)
print(hmm.emission_prob)


[0.4497392  0.00334711 0.35465835 0.19225534]
[[0.11949983 0.14881872 0.39907414 0.33267124]
 [0.11038928 0.18626893 0.37852492 0.32464547]
 [0.11672067 0.14827097 0.39961648 0.33543511]
 [0.11857655 0.14919158 0.40319177 0.32904433]]
[[0.78657251 0.04637864]
 [0.342974   0.30866924]
 [0.74799975 0.06718216]
 [0.69202055 0.10009896]]


In [10]:
def switch_err(str1,ref):
    error=0
    for i in range(len(str1)):
        if str1[i]=="1":
            if str1[i]!=ref[i]:
                error+=1
    return error/len(str1)

In [15]:
obs=[]
with open("gen_hap.txt","r") as f:
    for line in f.readlines():
        obs.append(line[:-1])

ref_list=[]
for i in range(0,len(obs),4):
    hap1=obs[i]
    hap2=obs[i+1]
    gen=obs[i+2]
    ref_list.append((hap1,hap2,gen))
mean_eror=0
#ref_list=ref_list[:3]
# ref_list=[("00000000000000010000000001000101100000000010000010000000000010000000000000000100000000000000010",
#            "01110001001010010111100011011011101110100101011101001000000000000100011000011000000000001000100",
#            "01110001001010020111100012011112201110100111011111001000000010000100011000011100000000001000110")]
ref_list=list(set(ref_list))[:3]
count=len(ref_list)
for i in range(len(ref_list)):
    hap1,hap2,sequence=ref_list[i]
    #prob1,seq1=hmm.viterbi(sequence)
    
    _,ref,best_states,_=hmm.viterbi(sequence,print_fn=False)
    gen_hap1,gen_hap2=hmm.generate_seq(best_states)
    print(ref)
    print(gen_hap1)
    print(gen_hap2)
    a=switch_err(hap1,gen_hap1)
    b=switch_err(hap2,gen_hap2)

    c=switch_err(hap1,gen_hap2)
    d=switch_err(hap2,gen_hap1)


    if a+b<c+d:
        mean_eror+=a+b
    else:
        mean_eror+=c+d
print(mean_eror/count)


01111001102010020111200112012012201110101101012212000000000001001100012000022000100010012110201
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
00000000001100020000100102001102200000001000001120000000000010001000001000010100000000000000010
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
00000000000000100000000002000001100000000000000010000000000000000000000000000000010000000000000
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
0.3333333333333333
