# Viterbi

---

## Before Class
In class today we will be implementing the Viterbi algorithm to identify the most likely path through states given model parameters.
Prior to class, please do the following:
1. Review slides on Hidden Markov models in detail
* Focus on how to conceptually translate the algorithm to code
* Understand what argmax versus max means
* How does one implement a max function? argmax?
* Take a look at what arithmetic underflow is.

---
## Learning Objectives

1. Conceptually understand Hidden Markov Models
* Implement a basic HMM
* Implement the Viterbi algorithm

---
## Background

In the last class we described Markov chains. Here we expand this idea to the concept of a hidden state variable along with observed emissions from the model. We will be using the example of CpG islands from the lecture slides. I have provided the class structure of a simple HMM below. All parameters to this model must be provided as inputs, so essentially this is a class containing the parameters described below:

We define a categorical Hidden Markov Model as $M = (\Sigma, Q, \Theta)$ with the following parameters:

$\Sigma$ : Finite alphabet of symbols (eg. A, C, G, T)

$Q$ : Finite discrete hidden states

$\Theta$: set of probabilities containing: $A$ as transition probabilites $a_{kl}$ for all $k,l \in Q$ and $E$ as emission probabilities $e_k(\sigma)$ for all $k \in Q$ and $\sigma \in \Sigma$ and $B$ as starting probabilities $b_k$ for all $k \in Q$.

We also define a number of $T$ emissions as $y_t = 1 \dots T$ that are drawn from $\Sigma$ and hidden states as $\pi_t = 1 \dots T$ that are drawn from $Q$.

The goal today will be to estimate $\pi^*$, the most probable path through the hidden states $Q$ when a HMM $M$ is provided.

We will be following the definition described in the slides as described below:


---
## Imports

In [5]:
import numpy as np

---
## Viterbi algorithm

To estimate $\pi^*$, the most probable path through the hidden states, we will use the Viterbi algorithm, which is a dynamic programming exercise. Calculate viterbi algorithm $v()$ as:

Initialization ($i = 0$): $v_{k}(i) = e_{k}(\sigma)b_{k}$.

Recursion ($i = 1 \dots T$): 
$v_{l}(i) = e_{l}(x_{i})$ max$_{k}(v_{k}(i-1)a_{kl})$;  ptr$_{i}(l) = $ argmax$_{k}(v_{k}(i-1)a_{kl})$.

Termination: $P(x, \pi^{*}) =$ max$_{k}(v_{k}(l)a_{k0})$; $\pi^{*}_{l} = $ argmax$_{k}(v_{k}(l)a_{k0})$.

Traceback: ($i = T\dots1$): $\pi^{*}_{i-1} = $ ptr$_{i}(\pi^{*}_{i})$.

A few implementation notes:
1. Break the code up into each of the above phases of the algorithm!
2. You will probably want to move all of your probabilities into log space so that you don't get underflow errors!

In [6]:
class HMM(object):
    """Main class for HMM objects
    
    Class for holding HMM parameters and to allow for implementation of
    functions associated with HMMs
    
    Private Attributes:
        _alphabet (set): The alphabet of emissions
        _hidden_states (set): Hidden states in the model
        _transitions (dict(dict)): A dictionary of transition probabilities
        _emissions (dict(dict)): A dictionary of emission probabilities
        _initial (dict): A dictionary of initial state probabilities

    """

    def __init__(self, alphabet, hidden_states, A=None, E=None, B=None):
        self._alphabet = set(alphabet)
        self._hidden_states = set(hidden_states)
        self._transitions = A
        self._emissions = E
        self._initial = B
        
    def _emit(self, cur_state, symbol):
        return self._emissions[cur_state][symbol]
    
    def _transition(self, cur_state, next_state):
        return self._transitions[cur_state][next_state]
    
    def _init(self, cur_state):
        return self._initial[cur_state]

    def _states(self):
        for k in self._hidden_states: #will want to use these for states. 
            yield k
        
    def viterbi(self, sequence):
        """ The viterbi algorithm for decoding a string using a HMM

        Args:
            sequence (list): a list of valid emissions from the HMM

        Returns:
            result (list): optimal path through HMM given the model parameters
                           using the Viterbi algorithm
        
        Pseudocode for Viterbi:
            Initialization (𝑖=0): 𝑣𝑘(𝑖)=𝑒𝑘(𝜎)𝑏𝑘.
            Recursion (𝑖=1…𝑇): 𝑣𝑙(𝑖)=𝑒𝑙(𝑥𝑖) max𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙); 
            
            for each observation, calclate all probabilties that you came from. 
            take the max of the value, and keep track with ptr(trace back array), which state you chose that was the max, 
            keep track of which five you came from. 
            
            pick the largest value, that's the optimal state, then trace back through. 
            
                                ptr𝑖(𝑙)= argmax𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙).
            Termination: 𝑃(𝑥,𝜋∗)= max𝑘(𝑣𝑘(𝑙)𝑎𝑘0); 
                             𝜋∗𝑙= argmax𝑘(𝑣𝑘(𝑙)𝑎𝑘0).
            Traceback: (𝑖=𝑇…1): 𝜋∗𝑖−1= ptr𝑖(𝜋∗𝑖).
        """
        
        grid = {}
        backward_pointer = []
        
        for state in self._states(): #island or genome
            grid[state] = np.log10(self._init(state)) + np.log10(self._emit(state, sequence[0])) # b * e(0) for all k
        
        for pos in range(1, len(sequence)): #need to exclude the first element since that is already done. 
            grid_next = {}
            backward_pointer_next = {}
            
            for next_state in self._states():
                k = {}
                for cur_state in self._states():
                    k[cur_state] = grid[cur_state] + np.log10(self._transition(cur_state, next_state))
                max_value = max(k, key = k.get)
                print(max_value)
                grid_next[next_state] = np.log10(self._emit(next_state, sequence[pos])) + k[max_value]
                backward_pointer_next[next_state] = max_value
            
            grid = grid_next 
            backward_pointer.append(backward_pointer_next)
            
        
        max_final_state = max(grid, key = grid.get) #find the max but only return the key 
        max_final_prob = grid[max_final_state] #use the key for that max to get the value and set that equal to this variable. 
        
        # result = [max_final_state]
        # print(result)
        
        result = [max_final_state]
        for i in reversed(range(len(sequence) - 1)): #you have to subtract one to remain within the index range. 
            result.append(backward_pointer[i][max_final_state]) #for every position give me the max final state, which we have calculated above. 
            # print(backward_pointer[i][max_final_state])
                  
                  
            # max_final_state = traceback[t][max_final_state]
            # print(max_final_state)
            
        return result[::-1]
        
        
        
        
        
        

In [7]:
# # this is my work that was not working for some reason. 

# # move everything to log space to avoid underflow errors.
#         # Initialization (𝑖=0): 𝑣𝑘(𝑖)=𝑒𝑘(𝜎)𝑏𝑘.
#         grid = {} #will hold the vi data that's in the matrix in the PPT. So it holds all the probabilites at a given state. 
#         #make this a dictionary "KEY" = "PROB"
#         #will only keep the previous column because those values are what are needed to calculate the next column. So we only need the previous column. 
        
#         backward_pointer = [] #a list of dictionaries. we need to keep track of all the traceback data. a list for every position and a dictionary for every 
#         #state, will need all the traceback data. 
#         #initialize the grid
#         #Initialization (𝑖=0): 𝑣𝑘(𝑖)=𝑒𝑘(𝜎)𝑏𝑘.
#         #whatever bases emitted in first position x prob of going into that at the beginning of the state. 
        
#         #so I need to go through each state. 
#         for state in self._states(): #for every hidden state either island or genome. 
#             #we need to multiply the emission state by the beginning probabilities
#             grid[state] = np.log10(self.init(state)) + np.log10(self._emit(state, sequence[0])) #b * e(0) for all k.  
#             #we want the first nc in the sequence. 
            
#         # print(grid)
        

#         #Recursion (𝑖=1…𝑇): 𝑣𝑙(𝑖)=𝑒𝑙(𝑥𝑖) max𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙); loop through all the states. 
#         #ptr 𝑖(𝑙)=  argmax 𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙) completing the rest of the PPT and calcualting the v position and keeping track of the pointer. 
#         for pos in range(1, len(sequence)): #want to exclude the first position since it's already calculated. 
#             grid_next = {}
#             backward_pointer_next = {}
#             #vl is next item in the grid = emission at next pos times max of the prob of one of the previous states times the transition from that state.
            
#             #emission of the next position x max of probability of one of the previous states AFTER calculating both states x transition of that state 
#             #for every possible next state 
#             for next_state in self._states(): #genome or CpG island, for every possible next state. 
#                 #next state relies on the max probabilties of each current state. So we need a for loop to calculate the maximum for each state
#                 k = {}  #this will contain the max
#                 #calcualte the two values for each current and take the max
#                 for cur_state in self._states(): #going to take the max from these two. 
#                     #now going to fill in the empty dictionary to get the values to take the max. 
#                     #here we're adding the log space of the value from the first column in the grid to the transition state. 
                    
#                     #𝑣𝑙(𝑖)=𝑒𝑙(𝑥𝑖) max𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙)
#                     #this is just filling in the empty k dictionary 
#                     k[cur_state] = grid[cur_state] + np.log10(self._transition(cur_state, next_state))
               
               
#                 #this is the equation to find the max in k, and ton only return the key and not the value. 
#                 value_max = max(k, key = k.get)
#                 #now we have the max value so we need to multiply it by the emission from the next state. 
#                 #now we want to grab the maximum value from k. 
#                 grid_next[next_state] = k[value_max] + np.log10(self._emit(next_state, sequence[pos])
                
#                 #ptr 𝑖(𝑙)=  argmax 𝑘(𝑣𝑘(𝑖−1)𝑎𝑘𝑙) keep track of the pointer. It's just the argmax. 
#                 backward_pointer_next[next_state] = 
                
#                 #Overwrite the exisiting grid values so that it is always the most previous values. 
#                 grid = grid_next 
#                 backward_pointer.append(backward_pointer_next) #appending the dictionary. 
            
        
        
        
#         #Termination: 𝑃(𝑥,𝜋∗)= max𝑘(𝑣𝑘(𝑙)𝑎𝑘0); 
                             
#         # 𝜋∗𝑙= argmax𝑘(𝑣𝑘(𝑙)𝑎𝑘0).  
#         #we've alredy calculated it since the loop continues through to the very end of the sequence. 
#         #so the maximum of the final two states. 
        
#         #so first set up the max equation. Give us the name of the final maximum state. 
#         max_final_state = max(grid, key = grid.get) #gives us the final state
#         max_final_prob = grid[max_final_state] #gives the prob 
        
#         # print(grid)
#         # print(backward_pointer)
            
#         #Traceback: (𝑖=𝑇…1): 𝜋∗𝑖−1= ptr𝑖(𝜋∗𝑖).
#         #which state did the max_final_state come from? 
#         result = [max_final_state] #putting the last element already into a list. 
#         for pos in reversed(range(len(sequence) -1)): #excluding the last element. 
#             result.append(backward_pointer[pos][max_final_state])
#             # max_final_state = backward_pointer[pos][max_final_state]
        
        
#         return result[::-1]
            

In [8]:
# This section of code will initialize your HMM with parameters as defined in the lecture slides
# for the identification of CpG Islands.
# All of this should be able to run whether or not you implement the Viterbi function!

hidden_states = ('I', 'G') # CpG Island or Genome
alphabet = ('A', 'C', 'G', 'T') # DNA Alphabet

# These are the initial probabilities as defined in the lecture slides
initial_probabilities = {
    'I' : 0.1,
    'G' : 0.9
}

# These are the probabilities of transitioning from outer state to inner state
#  as defined in the lecture slides
transition_probabilities = {
    'I': { 'I' : 0.6, 'G' : 0.4 },
    'G': { 'I' : 0.1, 'G' : 0.9 }
}

# These are the probabilites of each state emmitting each alphabet character
emission_probabilities = {
    'I': { 'A' : 0.1, 'C' : 0.4, 'G' : 0.4, 'T' : 0.1 },
    'G': { 'A' : 0.4, 'C' : 0.1, 'G' : 0.1, 'T' : 0.4 }
}

# Build the model
model = HMM(alphabet, hidden_states, transition_probabilities, emission_probabilities, initial_probabilities)

In [9]:
# Exact example from slides
sequence = "ACGCGATC"
print(sequence)
print (''.join(model.viterbi(list(sequence))))

# A slightly more complex example
sequence = "ACGCGATCATACTATATTAGCTAAATAGATACGCGCGCGCGCGCGATATATATATATAGCTAATGATCGATTACCCCCCCCCCCAATTA"
print(sequence)
print (''.join(model.viterbi(list(sequence))))

ACGCGATC
G
G
G
I
G
I
I
I
I
I
G
I
G
G
GGGIIGGG
ACGCGATCATACTATATTAGCTAAATAGATACGCGCGCGCGCGCGATATATATATATAGCTAATGATCGATTACCCCCCCCCCCAATTA
G
G
G
I
G
I
I
I
I
I
G
I
G
G
G
I
G
G
G
G
G
G
G
I
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
I
G
I
G
I
G
G
G
G
G
G
G
G
G
G
G
I
G
G
G
G
G
G
G
I
G
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
G
I
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
G
I
G
I
G
I
G
G
G
G
G
G
G
I
G
G
G
G
G
I
G
I
G
I
G
G
G
G
G
G
G
I
G
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
I
G
I
G
G
G
G
G
G
GGGIIGGGGGGGGGGGGGGGGGGGGGGGGGGGGIIIIIIIIIIIIGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGIIIIIIIIIGGGGG


In [70]:
hidden_states = ('Ai', 'Ci', 'Gi', 'Ti', 'Ag', 'Cg', 'Gg', 'Tg')
alphabet = ('A', 'C', 'G', 'T')

initial_probabilities = {
    'Ai' : 0.125,
    'Ci' : 0.125,
    'Gi' : 0.125,
    'Ti' : 0.125,
    'Ag' : 0.125,
    'Cg' : 0.125,
    'Gg' : 0.125,
    'Tg' : 0.125
}

transition_probabilities = {
    'Ai': { 'Ai' : 0.2, 'Ci' : 0.36, 'Gi' : 0.2, 'Ti' : 0.2, 'Ag' : 0.01, 'Cg' : 0.01, 'Gg' : 0.01, 'Tg' : 0.01 },
    'Ci': { 'Ai' : 0.1, 'Ci' : 0.1, 'Gi' : 0.66, 'Ti' : 0.1, 'Ag' : 0.01, 'Cg' : 0.01, 'Gg' : 0.01, 'Tg' : 0.01 },
    'Gi': { 'Ai' : 0.1, 'Ci' : 0.39, 'Gi' : 0.1, 'Ti' : 0.1, 'Ag' : 0.1, 'Cg' : 0.01, 'Gg' : 0.1, 'Tg' : 0.1 },
    'Ti': { 'Ai' : 0.2, 'Ci' : 0.36, 'Gi' : 0.2, 'Ti' : 0.2, 'Ag' : 0.01, 'Cg' : 0.01, 'Gg' : 0.01, 'Tg' : 0.01 },
    'Ag': { 'Ai' : 0.01, 'Ci' : 0.1, 'Gi' : 0.01, 'Ti' : 0.01, 'Ag' : 0.2175, 'Cg' : 0.2175, 'Gg' : 0.2175, 'Tg' : 0.2175 },
    'Cg': { 'Ai' : 0.01, 'Ci' : 0.1, 'Gi' : 0.01, 'Ti' : 0.01, 'Ag' : 0.2175, 'Cg' : 0.2175, 'Gg' : 0.2175, 'Tg' : 0.2175 },
    'Gg': { 'Ai' : 0.01, 'Ci' : 0.1, 'Gi' : 0.01, 'Ti' : 0.01, 'Ag' : 0.2175, 'Cg' : 0.2175, 'Gg' : 0.2175, 'Tg' : 0.2175 },
    'Tg': { 'Ai' : 0.01, 'Ci' : 0.1, 'Gi' : 0.01, 'Ti' : 0.01, 'Ag' : 0.2175, 'Cg' : 0.2175, 'Gg' : 0.2175, 'Tg' : 0.2175 }
}

emission_probabilities = {
    'Ai': { 'A' : 1, 'C' : 0.001, 'G' : 0.001, 'T' : 0.001 },
    'Ci': { 'A' : 0.001, 'C' : 1, 'G' : 0.001, 'T' : 0.001 },
    'Gi': { 'A' : 0.001, 'C' : 0.001, 'G' : 1, 'T' : 0.001 },
    'Ti': { 'A' : 0.001, 'C' : 0.001, 'G' : 0.001, 'T' : 1 },
    'Ag': { 'A' : 1, 'C' : 0.001, 'G' : 0.001, 'T' : 0.001 },
    'Cg': { 'A' : 0.001, 'C' : 1, 'G' : 0.001, 'T' : 0.001 },
    'Gg': { 'A' : 0.001, 'C' : 0.001, 'G' : 1, 'T' : 0.001 },
    'Tg': { 'A' : 0.001, 'C' :0.0010, 'G' : 0.001, 'T' : 1 }
}

model = HMM(alphabet, hidden_states, transition_probabilities, emission_probabilities, initial_probabilities)

In [71]:
sequence = "ACGCGATCATACTATATTAGCTAAATAGATACGCGCGCGCGCGCGATATATATATATAGCTAATGATCGATTACCCCCCCCCCCAATTA"

print(sequence)

result = ''.join(model.viterbi(sequence))
result = result.replace("A", "")
result = result.replace("C", "")
result = result.replace("G", "")
result = result.replace("T", "")
result = result.replace("i", "I")

print(result)

ACGCGATCATACTATATTAGCTAAATAGATACGCGCGCGCGCGCGATATATATATATAGCTAATGATCGATTACCCCCCCCCCCAATTA
ggIgIgggggggggggggggggggggggggggggIgIIIIIIIIIgggggggggggggggggggggggggggggggggggggggggggg
