# Exam 1 - Baum Welch

---
## Imports

In [None]:
import numpy as np
import json

In [None]:
class HMM(object):
    """Main class for HMM objects
    
    Class for holding HMM parameters and to allow for implementation of
    functions associated with HMMs
    
    Private Attributes:
        _alphabet (set): The alphabet of emissions
        _hidden_states (set): Hidden states in the model
        _transitions (dict(dict)): A dictionary of transition probabilities
        _emissions (dict(dict)): A dictionary of emission probabilities
        _initial (dict): A dictionary of initial state probabilities

    """

    __all__ = ['viterbi', 'forward', 'backward', 'forward_backward']

    def __init__(self, alphabet, hidden_states, A=None, E=None, B=None, seed=None):
        self._alphabet = set(alphabet)
        self._hidden_states = set(hidden_states)
        self._transitions = A
        self._emissions = E
        self._initial = B
        self._seed = seed
        if(self._transitions == None):
            self._initialize_random(self._alphabet, self._hidden_states, self._seed)
            
    def __str__(self):
        out_text = [f'Alphabet: {self._alphabet}',
                    f'Hidden States: {self._hidden_states}',
                    f'Initial Probabilities: {json.dumps(self._initial, sort_keys = True, indent=4)}',
                    f'Transition Probabilities: {json.dumps(self._transitions, sort_keys = True, indent=4)}',
                    f'Emission Probabilities: {json.dumps(self._emissions, sort_keys = True, indent=4)}']
        return '\n'.join(out_text)
    
    @classmethod
    def __dir__(cls):
        return cls.__all__
    
    def _emit(self, cur_state, symbol):
        return self._emissions[cur_state][symbol]
    
    def _transition(self, cur_state, next_state):
        return self._transitions[cur_state][next_state]
    
    def _init(self, cur_state):
        return self._initial[cur_state]

    def _states(self):
        for k in self._hidden_states:
            yield k
    
    
    def _get_alphabet(self):
        for sigma in self._alphabet:
            yield sigma
            
    def _initialize_random(self, alphabet, states, seed):
        alphabet = list(set(alphabet))
        alphabet.sort()
        states = list(set(states))
        states.sort()
        self._alphabet = alphabet
        self._hidden_states = states

        #Initialize empty matrices A and E with pseudocounts
        A = {}
        E = {}
        I = {}
        np.random.seed(seed=seed)
        I_rand = np.random.dirichlet(np.ones(len(self._hidden_states)))
        for i, state in enumerate(self._states()):
            E[state] = {}
            A[state] = {}
            I[state] = I_rand[i]
            E_rand = np.random.dirichlet(np.ones(len(self._alphabet)))
            A_rand = np.random.dirichlet(np.ones(len(self._hidden_states)))
            for j, sigma in enumerate(self._get_alphabet()):
                E[state][sigma] = E_rand[j]
            for j, next_state in enumerate(self._states()):
                A[state][next_state] = A_rand[j]
                
        self._transitions = A
        self._emissions = E
        self._initial = I
        return
    
    def viterbi(self, sequence):
        """ The viterbi algorithm for decoding a string using a HMM

        Args:
            sequence (list): a list of valid emissions from the HMM

        Returns:
            result (list): optimal path through HMM given the model parameters
                           using the Viterbi algorithm
        
        Pseudocode for Viterbi:
            Initialization (𝑖=0): 𝑣𝑘(𝑖)=𝑒𝑘(𝜎)𝑏𝑘.
            Recursion (𝑖=1…𝑇): 𝑣𝑙(𝑖)=𝑒𝑙(𝑥𝑖) max𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙); 
                                ptr𝑖(𝑙)= argmax𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙).
            Termination: 𝑃(𝑥,𝜋∗)= max𝑘(𝑣𝑘(𝑙)𝑎𝑘0); 
                             𝜋∗𝑙= argmax𝑘(𝑣𝑘(𝑙)𝑎𝑘0).
            Traceback: (𝑖=𝑇…1): 𝜋∗𝑖−1= ptr𝑖(𝜋∗𝑖).
        """

        # Initialization (𝑖=0): 𝑣𝑘(𝑖)=𝑒𝑘(𝜎)𝑏𝑘.
        # Initialize trellis and traceback matrices
        # trellis will hold the vi data as defined by Durbin et al.
        # and trackback will hold back pointers
        trellis = {} # This only needs to keep the previous column probabilities
        traceback = [] # This will need to hold all of the traceback data so will be an array of dicts()
        for state in self._states():
            trellis[state] = np.log10(self._init(state)) + np.log10(self._emit(state, sequence[0])) # b * e(0) for all k
            
        # Next we do the recursion step:
        # Recursion (𝑖=1…𝑇): 𝑣𝑙(𝑖)=𝑒𝑙(𝑥𝑖) max𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙); 
        #                 ptr𝑖(𝑙)= argmax𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙).
        for t in range(1, len(sequence)):  # For each position in the sequence
            trellis_next = {}
            traceback_next = {}

            for next_state in self._states():    # Calculate maxk and argmaxk
                k={}
                for cur_state in self._states():
                    k[cur_state] = trellis[cur_state] + np.log10(self._transition(cur_state, next_state)) # k(t-1) * a
                argmaxk = max(k, key=k.get)
                trellis_next[next_state] =  np.log10(self._emit(next_state, sequence[t])) + k[argmaxk] # k * e(t)
                traceback_next[next_state] = argmaxk
                
            #Overwrite trellis 
            trellis = trellis_next
            #Keep trackback pointer matrix
            traceback.append(traceback_next)
            
        # Termination: 𝑃(𝑥,𝜋∗)= max𝑘(𝑣𝑘(𝑙)𝑎𝑘0); 
        #                  𝜋∗𝑙= argmax𝑘(𝑣𝑘(𝑙)𝑎𝑘0).
        max_final_state = max(trellis, key=trellis.get)
        max_final_prob = trellis[max_final_state]
                
        # Traceback: (𝑖=𝑇…1): 𝜋∗𝑖−1= ptr𝑖(𝜋∗𝑖).
        result = [max_final_state]
        for t in reversed(range(len(sequence)-1)):
            result.append(traceback[t][max_final_state])
            max_final_state = traceback[t][max_final_state]

        return result[::-1]

    def forward(self, sequence):
        """ The forward algorithm for calculating probability of sequence given HMM

        Args:
            sequence (list): a list of valid emissions from the HMM

        Returns:
            result (float, list of dicts): P(x) and the f matrix as a list
        
        Pseudocode for Forward:
            Initialization (𝑖=0): 𝑓𝑘(0)=𝑒𝑘(𝜎0)𝑏𝑘.
            Recursion (𝑖=1…𝑇): 𝑓𝑙(𝑖)=𝑒𝑙(𝜎𝑖)∑𝑘(𝑓𝑘(𝑖−1)𝑎𝑘𝑙)
            Termination: 𝑃(𝑥)=∑𝑘𝑓𝑘(𝑇)
        """

        # Initialization (𝑖=0): 𝑓𝑘(0)=𝑒𝑘(𝜎0)𝑏𝑘.
        # Initialize f
        f = [] # For this algorithm it is helpful to keep this entire matrix
        f.append({})
        for state in self._states():
            f[-1][state] = self._init(state) * self._emit(state, sequence[0]) # b * e(0) for all k

        # Next we do the recursion step:
        # Recursion (𝑖=1…𝑇): 𝑓𝑙(𝑖)=𝑒𝑙(𝜎𝑖)∑𝑘(𝑓𝑘(𝑖−1)𝑎𝑘𝑙) 
        for i in range(1, len(sequence)):  # For each position in the sequence
            f.append({})
            for next_state in self._states(): # For each state
                f[-1][next_state] = 0
                for cur_state in self._states():
                    f[-1][next_state] += f[i-1][cur_state] * self._transition(cur_state, next_state) # sum of f(i-1) * a
                f[-1][next_state] =  self._emit(next_state, sequence[i]) * f[-1][next_state] # f * e(i)
        
        # Termination: 𝑃(𝑥)=∑𝑘𝑓𝑘(𝑇)
        Px = 0
        for state in self._states():
            Px += f[-1][state]
            
        return Px, f

    def backward(self, sequence):
        """ The backward algorithm for calculating probability of sequence given HMM

        Args:
            sequence (list): a list of valid emissions from the HMM

        Returns:
            result (float, list of dicts): P(x) and the b matrix as a list
        
        Pseudocode for Backward:
            Initialization (𝑖=T): 𝑟𝑘(𝑇)=1.
            Recursion (𝑖=𝑇−1…1): 𝑟𝑘(𝑖)=∑𝑙𝑟𝑙(𝑖+1)𝑎𝑘𝑙𝑒𝑙(𝜎𝑖+1)
            Termination: 𝑃(𝑥)=∑𝑙𝑟𝑘(1)𝑒𝑙(𝜎1)𝑏𝑙
        """

        # Initialization (𝑖=T): 𝑟𝑘(𝑇)=1.
        # Initialize r
        r = [] # For this algorithm it is helpful to keep this entire matrix
        r.insert(0, {})
        for state in self._states():
            r[0][state] = 1 # 1 for all k

        # Next we do the recursion step:
        # Recursion (𝑖=T-1…1): 𝑟𝑘(𝑖)=∑𝑙𝑟𝑙(𝑖+1)𝑎𝑘𝑙𝑒𝑙(𝜎𝑖+1)
        for i in range(len(sequence)-1, 0, -1):  # For each position in the sequence in reverse
            r.insert(0, {}) # append a new item at the beginning
            for prev_state in self._states(): # For each state
                r[0][prev_state] = 0
                for next_state in self._states():
                    r[0][prev_state] += r[1][next_state] * self._transition(prev_state, next_state) * self._emit(next_state, sequence[i])

        # Termination: 𝑃(𝑥)=∑𝑙𝑟𝑘(1)𝑒𝑙(𝜎1)𝑏𝑙
        Px = 0
        for state in self._states():
            Px += r[0][state] * self._init(state) * self._emit(state, sequence[0])
                        
        return Px, r
    
    def forward_backward(self, sequence):
        """ The forward-backward algorithm for calculating marginal posteriors given HMM

        Args:
            sequence (list): a list of valid emissions from the HMM

        Returns:
            posterior (list of dicts): all posteriors as a list
        
        Pseudocode for Forward-Backward:
            Calculate f[] as forward algorithm
            Calculate r[] as backward algorithm
            for all i in sequence
                for all states
                    posterior[i][state] = f[i][state] * r[i][state] / Px
        """        
        #Calculate forward and backward matrices
        f_Px, f_matrix = self.forward(sequence)
        r_Px, r_matrix = self.backward(sequence)
    
        posterior = []
        for i in range(0, len(sequence)):  # For each position in the sequence
            posterior.append({})
            for state in self._states(): # For each state
                posterior[i][state] = f_matrix[i][state] * r_matrix[i][state] / f_Px
                
        return posterior
    
    def baum_welch(self, sequences, pseudocount=1e-100):

        #Initialize empty matrices A and E with pseudocounts
        A = {}
        E = {}
        I = {}
        for state in self._states():
            E[state] = {}
            A[state] = {}
            I[state] = pseudocount
            for sigma in self._get_alphabet():
                E[state][sigma] = pseudocount
            for next_state in self._states():
                A[state][next_state] = pseudocount
        
        outer_denom = 0
        for j, x in enumerate(sequences):
            #Calculate forward and backward matrices
            f_Px, f_matrix = self.forward(list(x))
            r_Px, r_matrix = self.backward(list(x))
            
            #track outer sum:
            outer_denom += 1/f_Px
  
            #Calculate E and I inner sums (I is 0th position)
            for i, emission in enumerate(list(x)):
                if i == 0: #Calculate I inner sums
                    for state in self._states():
                        I[state] += f_matrix[0][state] * r_matrix[0][state] # this also works
#                        I[state] += self._init(state) * self._emit(state, emission) * r_matrix[0][state]
                for state in self._states():
                    E[state][emission] += f_matrix[i][state] * r_matrix[i][state]


            #Calculate A inner sums
            for i in range(len(x)-1):
                emission = x[i+1]
                for state in self._states():
                    for next_state in self._states():
                        A[state][next_state] += f_matrix[i][state] * self._transition(state, next_state) * self._emit(next_state, emission) * r_matrix[i+1][next_state]                                        
                    
        #Scale by all probablities
        for state in self._states():
            I[state] = I[state] * outer_denom
            for sigma in self._get_alphabet():
                E[state][sigma] = E[state][sigma] * outer_denom
            for next_state in self._states():
                A[state][next_state] = A[state][next_state] * outer_denom
        
        # Update model parameters
        for state in self._states():
            self._initial[state] = I[state] / sum(I.values())
            for sigma in self._get_alphabet():
                self._emissions[state][sigma] = E[state][sigma] / sum(E[state].values())
            for next_state in self._states():
                self._transitions[state][next_state] = A[state][next_state] / sum(A[state].values())
                
        return


        

In [None]:
# This section of code will initialize your HMM with parameters as defined in the lecture slides
# for the identification of CpG Islands.
# All of this should be able to run whether or not you implement the Viterbi function!

hidden_states = ('I', 'G') # CpG Island or Genome
alphabet = ('A', 'C', 'G', 'T') # DNA Alphabet

model = HMM(alphabet, hidden_states, seed=70)

sequence = "ACGCGATCATACTATATTAGCTAAATAGATACGCGCGCGCGCGCGATATATATATATAGCTAATGATCGATTACCCCCCCCCCCAATTA"

print(model)

In [None]:
b =  {
    "G": 0.43969207905818775,
    "I": 0.5603079209418123
}
t = {
    "G": {
        "G": 0.29312969335213396,
        "I": 0.706870306647866
    },
    "I": {
        "G": 0.2246297393682648,
        "I": 0.7753702606317352
    }
}
e = {
    "G": {
        "A": 0.33376003851215813,
        "C": 0.2956074592103019,
        "G": 0.22138501650809111,
        "T": 0.1492474857694488
    },
    "I": {
        "A": 0.2327107586834082,
        "C": 0.6239439268290141,
        "G": 0.10091908426096824,
        "T": 0.04242623022660947
    }
}
model = HMM(alphabet, hidden_states, A=t, E=e, B=b)
print(model)

In [None]:
last_Px = 0
for i in range(100):
    # Check for convergence
    model.baum_welch([sequence])
    f_Px, f_matrix = model.forward(list(sequence))
    if last_Px == f_Px:
        break
    last_Px = f_Px
    print(f_Px)
print(model)

In [None]:
print(sequence)
print (''.join(model.viterbi(list(sequence))))
