### Problem 25

Viterbi learning

Given: A sequence of emitted symbols x = x1 ... xn in an alphabet A, generated by a k-state HMM with unknown transition and emission probabilities, initial Transition and Emission matrices and a number of iterations i.

Return: A matrix of transition probabilities Transition and a matrix of emission probabilities Emission that maximizes Pr(x, π) over all possible transition and emission matrices and over all hidden paths π.

In [14]:
import numpy as np
class ViterbiLearning():
    """
    Viterbi learning for a hidden Markov model (HMM).

    Starting from the initial transition/emission matrices stored in a text
    file, repeatedly decode the most likely hidden path for the observed
    string and re-estimate both matrices from that path.
    """
    def __init__(self, infile):
        '''
        constructor: saves attributes 
        
        Parameters
        ----------
            infile : file name
                path of the HMM description file parsed by readHMM
                
        '''
        self.file=infile
    
    def readHMM(self):
        """
        read HMM file
        
        The parser relies on fixed line positions (0-based), with k the
        number of states:
            0        number of iterations
            2        observed string
            4        tab-separated alphabet
            6        tab-separated state names
            9..      k transition rows (line 8 is the column header)
            11+k..   k emission rows (line 10+k is the column header)
        The remaining lines are separators/headers and are skipped.
        
        Return
        ----------
        iteration:int
            number of learning iterations to run
        String:list
            the observed string encoded as indices into ``emission``
        state:list
            all the states
        emission:list
            all the emitted symbols (the alphabet)
        transitionProb:ndarray
            k x k transition matrix as floats
        emissionProb:ndarray
            k x len(emission) emission matrix as floats
        """
        with open(self.file) as rawData:
            data=[line.rstrip() for line in rawData]
        iteration=int(data[0]) #the first row is the number of iterations
        string=data[2] #the 3rd row is the observable string
        emission=data[4].split('\t') #emission list
        state=data[6].split('\t') #state list
        nState=len(state)
        #each matrix row starts with its row label, which is dropped by [1:]
        transStart=9
        transRows=[data[r].split('\t')[1:] for r in range(transStart,transStart+nState)]
        emitStart=11+nState
        emitRows=[data[r].split('\t')[1:] for r in range(emitStart,emitStart+nState)]
        transitionProb=np.array(transRows).astype(float)
        emissionProb=np.array(emitRows).astype(float)
        #encode the observed symbols as column indices of the emission matrix
        String=[emission.index(symbol) for symbol in string]
        return iteration,String,state,emission,transitionProb,emissionProb
    
    def Vertibi(self,string,state,emission,transitionProb,emissionProb):
        """
        Find the most likely hidden path for the observed string.
        
        Every state is given the same initial probability 1/len(state).
        Scores are plain probability products (no log transform).
        
        Parameters
        ----------
        string:list
            the observed string as indices into the emission alphabet
        state:list
            all the states
        emission:list
            all the emitted symbols (not used by the computation)
        transitionProb:ndarray
            transition matrix, transitionProb[i, j] = Pr(i -> j)
        emissionProb:ndarray
            emission matrix, emissionProb[i, s] = Pr(state i emits symbol s)
            
        Returns
        ----------
        viterbiScore:float
            probability of the best path together with the observed string
        path:list
            the most likely hidden path as a list of state indices
        """
        nState=len(state)
        nObs=len(string)
        vertibiGraph=np.empty([nState, nObs]) #vertibiGraph[s, t] = best score ending in state s at position t
        vertibiGraph[:,0]=np.multiply(1/nState,np.array(emissionProb[:,string[0]])) #first column
        stateMatrix=np.zeros((nState, nObs-1)).astype(np.int32) #back-pointers for backtracking
        for col in range(1,nObs):
            #candidates[prev, cur] = best score at prev times Pr(prev -> cur)
            candidates=vertibiGraph[:,col-1][:,np.newaxis]*transitionProb
            #argmax keeps the first maximum, so ties go to the lowest state index
            stateMatrix[:,col-1]=np.argmax(candidates,axis=0)
            vertibiGraph[:,col]=np.max(candidates,axis=0)*emissionProb[:,string[col]]
        #----------------------------------------Backtracking--------------------------------------#
        path=np.zeros(nObs).astype(np.int32)
        path[-1]=np.argmax(vertibiGraph[:,-1]) #best final state
        viterbiScore=vertibiGraph[:,-1].max()
        for n in range(nObs-2,-1,-1): #walk the back-pointers from right to left
            path[n]=stateMatrix[path[n+1],n]
        #----------------------------------------Backtracking--------------------------------------#
        #return indices directly: rebuilding them from concatenated state names
        #only works when every state name is a single character
        return viterbiScore,path.tolist()
     
    def estimateEmit(self,string,state,path,emission):
        """
        Estimate emission Matrix from an observed string and a hidden path
        
        Parameters
        ----------
        string:list
            the observed string as indices into the emission alphabet
        state:list
            all the states
        path:list
            hidden path as state indices, aligned with ``string``
        emission:list
            all the emitted symbols 
        
        Return
        ----------
        np.around(emissionMatrix, decimals=3):ndarray
            The emission Matrix, one row per state, rounded to 3 decimals
        """
        emissionMatrix=np.zeros([len(state),len(emission)]) #counts of (state, symbol) pairs
        for stateIdx,symbolIdx in zip(path,string):
            emissionMatrix[stateIdx,symbolIdx]+=1
        for row in emissionMatrix:
            total=row.sum()
            if total!=0: #state was visited: normalise its counts
                row/=total
            else: #state never visited: uniform over the alphabet (row has len(emission) entries)
                row[:]=1/len(emission)
        return np.around(emissionMatrix, decimals=3)
    
    def estimateTrans(self,string,state,path):
        """
        Estimate transition Matrix from a hidden path
        
        Parameters
        ----------
        string:list
            the observed string; only its length is used
        state:list
            all the states
        path:list
            hidden path as state indices
        
        Return
        ----------
        np.around(transMatrix, decimals=3):ndarray
            The transition Matrix, one row per source state, rounded to 3 decimals
        """
        transMatrix=np.zeros([len(state),len(state)]) #counts of consecutive state pairs
        for i in range(len(string)-1):
            transMatrix[path[i],path[i+1]]+=1
        for row in transMatrix:
            total=row.sum()
            if total!=0: #state has outgoing transitions: normalise its counts
                row/=total
            else: #no outgoing transition observed: uniform over the states
                row[:]=1/len(state)
        return np.around(transMatrix, decimals=3)
    
    def viterbiLearning(self):
        """
        Perform viterbi Learning
        
        Decode a path with the initial matrices, then alternate between
        re-estimating the matrices from the path and decoding a new path,
        for the number of iterations given in the input file.
        
        Returns
        ----------
        state:list
            all the states
        emission:list
            all the emissions
        transitionProb:ndarray
            estimated transition matrix
        emissionProb:ndarray
            estimated emission matrix
        """
        iteration,string,state,emission,transitionProb,emissionProb=self.readHMM() #initial information of the hmm
        viterbiScore,path=self.Vertibi(string,state,emission,transitionProb,emissionProb) #initial hidden path
        for _ in range(iteration):
            #re-estimate the parameters from the current path, then decode again
            transitionProb=self.estimateTrans(string,state,path)
            emissionProb=self.estimateEmit(string,state,path,emission)
            viterbiScore,path=self.Vertibi(string,state,emission,transitionProb,emissionProb)
        return state,emission,transitionProb,emissionProb

### Main

In [15]:
def main(infile):
    '''
    Run Viterbi learning on an HMM file and print the learned matrices
    
    The transition matrix is printed first, then a dashed separator line,
    then the emission matrix. Each matrix is tab-separated with a header
    row of column names and a leading row label.
    
    Parameters
        ----------
        infile : str 
            the filename  

        Returns
        -------
        STDOUT
    '''
    learner=ViterbiLearning(infile) #instantiation
    state,emission,transRows,emitRows=learner.viterbiLearning()
    tab='\t'
    print(tab+tab.join(state)) #header: destination states
    for label,row in zip(state,transRows): #one labelled row per source state
        print(label+tab+tab.join(map(str,row)))
    print('--------') #separate two matrices
    print(tab+tab.join(emission)) #header: emitted symbols
    for label,row in zip(state,emitRows): #one labelled row per state
        print(label+tab+tab.join(map(str,row)))

### Run the program here

In [17]:
# Script entry point: run Viterbi learning on the hard-coded input file
# and print the resulting matrices to stdout.
if __name__ == "__main__":
    main('rosalind_ba10i.txt')

	A	B	C
A	0.333	0.333	0.333
B	0.0	0.411	0.589
C	0.0	0.791	0.209
--------
	x	y	z
A	0.333	0.333	0.333
B	0.474	0.526	0.0
C	0.302	0.0	0.698
