In [1]:
# !/usr/bin/env python3
import sys
import itertools
import time



########################################################################
# File:problem26.py
#  executable: problem26.py
#  purpose: Implement Baum-Welch Learning
#  stderr: errors and status
#  stdout:
#
# Author: Arushi Mithal
#
# Notes:  1. To run the program from command line terminals:
#          Unix/Windows: python  problem26.py < input.txt > output.out
#
# Laptop, where test were running, specs:
#        Windows 10-64bit. Processor i-5 5200U CPU @2.20GHz 2.20 GHz
#        Internal RAM  4.00 GB
########################################################################
class BaumWelch:
    """
         Used to implement Baum-Welch Learning

          From the input file, store number of iterations, string x, observable states, hidden states, transition
          matrix, and emission matrix of an HMM. Train the data by first finding the forward and backward path
          probabilities with the input data, then calculating the node responsibility matrix and edge responsibility
          matrix using soft decoding algorithm, and with the help of these, re-estimate the transition and emission
          matrices. Repeat this process as many times as the number of iterations provided in the input, then print
          the final transition and emission matrices produced.

          use Commandline: python problem26.py < input.txt > output.out
        """

    def __init__(self, iterations, emittedString, alphabets, states, transitionDict, emissionDict):
        self.iterations = iterations
        self.observedString = emittedString
        self.possibleObservableStates = alphabets
        self.possibleHiddenStates = states
        self.transitionMatrix = transitionDict
        self.emissionMatrix = emissionDict
        self.initialProbabilities = {}
        self.outcomeLikelihood = 0
        self.forwardPathProbabilities = []
        self.initialBackwardProbabilities = {}
        self.backwardPathProbabilities = []
        self.eStepResult = {}

    def forward(self):
        """ Calculate and make a list of forward path probabilities for all hidden states at every time step"""
        pathProbabilities = []
        for i in range(len(self.possibleHiddenStates)):  # calculate initial probabilities for forward path
            self.initialProbabilities[self.possibleHiddenStates[i]] = 1 / len(self.possibleHiddenStates) * \
                                                                      self.emissionMatrix[
                                                                          self.observedString[0] +
                                                                          self.possibleHiddenStates[i]]
        pathProbabilities.append(self.initialProbabilities)
        for t in range(1, len(self.observedString)):  # iterate over observed string starting from index 1
            newPathProbabilities = {}
            for hiddenState in self.possibleHiddenStates:  # iterate over the hidden states
                tempProbabilityDict = {}
                for key, value in pathProbabilities[
                    t - 1].items():  # for every key-value pair in the path probability at index before the t
                    tempProbabilityDict[key + hiddenState] = value * self.transitionMatrix[key + hiddenState] * \
                                                             self.emissionMatrix[
                                                                 self.observedString[t] + hiddenState]
                    # calculate the probability of that particular transition
                newPathProbabilities[hiddenState] = sum(tempProbabilityDict.values())  # add all the path probabilites
                # for every iteration of key-value pair in the path probability before the current index
            pathProbabilities.append(newPathProbabilities)
        for values in pathProbabilities[-1].values():
            self.outcomeLikelihood += values  # calculate P(x) by adding the sum of values of forward path
            # probabilities for all hidden states at sink node
        return pathProbabilities

    def backward(self):
        """ Calculate and make a list of backward path probabilities for all hidden states at every time step"""
        self.backwardPathProbabilities = []
        reversedObservedString = self.observedString[::-1]  # first reverse the string
        for i in range(len(self.possibleHiddenStates)):  # the initial probabilities for each hidden state are set to 1
            self.initialBackwardProbabilities[self.possibleHiddenStates[i]] = 1
        self.backwardPathProbabilities.append(self.initialBackwardProbabilities)
        for t in range(len(reversedObservedString) - 1):  # iterate over observed string in reverse
            newPathProbabilities = {}
            for hiddenState in self.possibleHiddenStates:  # iterate over hidden states
                tempProbabilityDict = {}
                for key, value in self.backwardPathProbabilities[
                    t].items():  # iterate over path probabilities dict pairs
                    tempProbabilityDict[key + hiddenState] = value * self.transitionMatrix[hiddenState + key] * \
                                                             self.emissionMatrix[
                                                                 reversedObservedString[t] + key]
                # calculate probabilties of transition values for each hidden state and add them
                newPathProbabilities[hiddenState] = sum(tempProbabilityDict.values())
            self.backwardPathProbabilities.append(newPathProbabilities)  # append that to path probabilities
        self.backwardPathProbabilities = self.backwardPathProbabilities[::-1]  # reverse that list
        return self.backwardPathProbabilities

    def edgeResponsibilityMatrix(self):
        """ used to calculate edge responsibility matrix using soft decoding problem for an edge"""
        for transition in self.transitionMatrix:  # iterate over the transition matrix
            self.eStepResult[transition] = []  # and use the keys as keys for result dict
        for i in range(len(self.forwardPathProbabilities) - 1):  # iterate over forward path probabilities
            for j in self.eStepResult:  # iterate over every key and value in forward path probability at index i and
                # calculate soft Decoding probability
                k = j[:1]
                l = j[1:]
                value = (self.forwardPathProbabilities[i][k] * self.backwardPathProbabilities[i + 1][l] *
                         self.transitionMatrix[k + l] * self.emissionMatrix[
                             self.observedString[i + 1] + l]) / self.outcomeLikelihood
                self.eStepResult[k + l].append(value)
        newTransitionMatrix = {}
        for i in self.eStepResult:
            newTransitionMatrix[i] = sum(self.eStepResult[i])  # for each key, add all values
        return newTransitionMatrix

    def nodeResponsibilityMatrix(self):
        """ Used to calculate node responsibility matrix using soft decoding problem for a node """
        result = {}  #initialise dict called result
        for emission in self.emissionMatrix:  # iterate over the emission matrix and use the keys as keys for result
            result[emission] = []
        for i in range(len(self.forwardPathProbabilities)):  # iterate over forward path probabilities
            for key, value in self.forwardPathProbabilities[i].items():  # iterate over every key and value in forward
                # path probability at index i and calculate soft Decoding probabilitiy
                result[self.observedString[i] + key].append(
                    value * self.backwardPathProbabilities[i][key] / self.outcomeLikelihood)
        newEmissionMatrix = {}
        for key, value in result.items():  # for each key, add all values
            newEmissionMatrix[key] = sum(value)
        return newEmissionMatrix

    def parameterEstimator(self):
        """ Used to estimate the parameters of transition matrix and emission matrix"""
        emissionMatrix = self.nodeResponsibilityMatrix()
        transitionMatrix = self.edgeResponsibilityMatrix()
        #print("Transition Matrix")
        #print(transitionMatrix)
        transitionEstimator = {}  # initialise transition estimator
        total = {}
        for i in self.possibleHiddenStates:  # iterate over hidden states
            total[i] = 0  # hidden state used as key, value set to 0 in total dict
        for key, transition in transitionMatrix.items():  # iterate over transition matrix
            total[key[:1]] += transition  # change value in total dict by adding transition probability values
        for key, value in transitionMatrix.items():  # iterate over transition matrix and calculate transition estimator
            transitionEstimator[key] = value / total[key[:1]]
        emissionEstimator = {}  # initialise emission estimator
        total1 = {}
        for i in self.possibleHiddenStates:  # iterate over hidden states
            total1[i] = 0  # hidden state used as key, value set to 0 in total dict
        
        for key, value in emissionMatrix.items():  # iterate over emission Matrix
            total1[key[1:]] += value  # change value in total dict by adding emission probability values

        for key, value in emissionMatrix.items():  # iterate over emission matrix and calculate emission estimator
            emissionEstimator[key] = value / total1[key[1:]]
        #print("inside, length", emissionEstimator)
        return transitionEstimator, emissionEstimator

    def training(self):
        for i in range(self.iterations):  # iterate
            self.forwardPathProbabilities = self.forward()  # find new forward path probabilities
            
            self.backwardPathProbabilities = self.backward()  # find new backward path probabilities
            self.transitionMatrix, self.emissionMatrix = self.parameterEstimator()  # new transition and emission matrices
        return self.transitionMatrix, self.emissionMatrix  # return the final transition and emission matrices

    def print(self):
        """   print the results in Rosalind Format     """
        transitionList = [self.possibleHiddenStates]
        for i in range(0, len(self.possibleHiddenStates)):
            tempList = [self.possibleHiddenStates[i]]
            for j in range(0, len(self.possibleHiddenStates)):
                tempList.append('{:.3f}'.format(
                    round(self.transitionMatrix[self.possibleHiddenStates[i] + self.possibleHiddenStates[j]], 3)))
            transitionList.append(tempList)     
        print("\t" + "\t".join(transitionList[0]))
        for i in range(1, len(transitionList)):
            print("\t".join(transitionList[i]))
        print("--------")

        emissionList = [self.possibleObservableStates]
        for i in range(0, len(self.possibleHiddenStates)):
            tempList = [self.possibleHiddenStates[i]]
            for j in range(0, len(self.possibleObservableStates)):
                tempList.append(
                    '{:.3f}'.format(
                        round(self.emissionMatrix[self.possibleObservableStates[j] + self.possibleHiddenStates[i]], 3)))
            emissionList.append(tempList)
        print("\t" + "\t".join(emissionList[0]))
        for i in range(1, len(emissionList)):
            print("\t".join(emissionList[i]))
        

def parse():
    """ Used to parse the input file data """
    data = []
    
    filename1= "rosalind_ba10k_762_9_dataset.txt"
    output_file=sys.argv[2]
    #print("first")
    with open(filename1, "r") as file:
    #with open("test.txt", "r") as file:
        for files in file:
            data.append(files.rstrip())
    #print(data)
    #iterations = 1
    iterations = int(data[0])
    emittedString = data[2]
    alphabets = data[4].split()
    states = data[6].split()
    table = data[8:]
    transition = []
    emission = []
    s = '--------'
    tables = [list(y) for x, y in itertools.groupby(table, lambda z: z == s) if not x]
    tempList = [t.strip('\t') for t in tables[0]]
    for t in tempList:
        tList = t.split('\t')
        transition.append(tList)
    transitionDict = {}
    for i in range(0, len(states)):
        for j in range(1, len(states) + 1):
            transitionDict[transition[0][i] + transition[j][0]] = float(transition[i + 1][j])
    tempList = [t.strip('\t') for t in tables[1]]
    for t in tempList:
        tList = t.split('\t')
        emission.append(tList)
    emissionDict = {}
    for i in range(0, len(alphabets)):
        for j in range(1, len(states) + 1):
            emissionDict[emission[0][i] + emission[j][0]] = float(emission[j][i + 1])
    # print(iterations, "\n", emittedString, "\n", alphabets, "\n", states, "\n", transitionDict, "\n", emissionDict)
    return iterations, emittedString, alphabets, states, transitionDict, emissionDict


def main():
    iterations, emittedString, alphabets, states, transitionDict, emissionDict = parse()
    start_time = time.time()
    v1 = BaumWelch(iterations, emittedString, alphabets, states, transitionDict, emissionDict)
    v1.training()
    print(" %s seconds " % (time.time() - start_time))

    v1.print()


if __name__ == '__main__':
    main()


 0.5382184982299805 seconds 
	A	B	C	D
A	0.000	0.087	0.232	0.680
B	0.566	0.434	0.000	0.000
C	0.442	0.124	0.140	0.294
D	0.001	0.401	0.548	0.050
--------
	x	y	z
A	0.000	0.067	0.933
B	0.663	0.000	0.337
C	0.000	0.975	0.025
D	0.989	0.000	0.011
