# Hidden Markov Model: HMM #

**Def:** An HMM is a stochastic generator of sequences characterised by:

* N states

* {$a_{k,j}$} : Transition Probabilities --> $a_{k,j} = P(\pi(i) = j|\pi(i-1)=k)$

* {$a_{start,k}$} : Initial prob ---> $a_{0,k} = P(\pi(1)=k)$

* {$a_{k,end}$} : Final Probabilities ----> $a_{k,end} = P(\pi(i)=end|\pi(i-1)=k)$

* An alphabet $C$ with $M$ characters

* {$e_k(c)$} : Emission probabilities for each state ---> $e_k(c) = P(s^i = c | \pi(i) = k)$ 

where s : sequence , $\pi$ : path through the states

Constraints: 

$\sum_k a_{0,k} = 1$

$a_{0,k} + \sum_j a_{k,j} \forall k$

$\sum_{c \in C} e_k(c)= 1 $

**Joint prob sequence and path** ---> $ P(s,\pi|M) = P(s|\pi,M) P(\pi,M)$ 

$P(s|\pi,M) = \prod_{i=1}^T e_{\pi(i)}(s^i) $

$P(\pi|M) = a_{0,\pi(1)} \prod_{i=2}^T a_{\pi(i-1) \pi(i)} a_{\pi(T),end}$ 

----> $P(s,\pi|M) = P(s|\pi,M) P(\pi|M) = a_{\pi(T),end} \prod_{i=1}^T e_{\pi(i)}(s^i) a_{\pi(i-1),\pi(i)}$



**3 MAIN PROBLEM** 

1) Computing probability of a sequence given the model ---> $P(S|M)$ :  **FORWARD ALGORITHM & BACKWARD ALGORITHM**

2) Finding the most probable path given the sequence and the model ---> $\pi* = argmax_{\pi} P(\pi|s,M)$ : **VITERBI**

3) Training a model: optimize the transion and emission probabilities : **BAUM-WELCH**


**Oss**

1) $P(s|M) = \sum_{\pi} P(s,\pi|M)$ and we know $P(s,\pi|M)$ --> summing over all the path is infeasible 

2) $\pi* = argmax_{\pi} P(\pi|s,M)$ but $P(\pi|s,M) = \frac{P(\pi,s|M)}{P(s|M)}$ so $P(s|M) \not\propto \pi$ ---> $\pi* = argmax_{\pi} P(\pi,s|M)$



**Solution** 

1) *Forward ALgorithm*: we evaluate $P(s|M)$ in an intellingent way using a good factorization 
   for each state $K$ and each position $i$ in the sequence compute : $F_k(i) = P(s^1..s^i,\pi(i)=k|M)$
    
   Initialization: $F_{BEGIN}(0) = 1$ and $F_{BEGIN,k}(0) = 0$   $\forall k $
    
   Recurrence: $F_l(i+1) = P(s^1....s^{i+1},\pi(i+1) = l) = \sum_k P(s^1...s^i,\pi(i)=k) a_{k,l} e_l(s^{i+1}) =                    \sum_k F_k(i) a_{k,l} e_l(s^{i+1})$
    
   Termination: $P(s) = P(s^1....s^T,\pi(T+1) = end) = \sum_k F_k(T) a_{k,end}$
   
   *Backward Algorithm*: $B_k(i) = P(s^{i+1}...s^T|\pi(i)=k) $
   
   Initialization: $B_k(T) = P(\pi(T+1)=end|\pi(T) = k) =a_{k,end}$
   
   Recurrence $B_l(i-1) = P(s^i...s^T|\pi(i-1)= l) = \sum_k P(s^{i+1} ... s^T |\pi(i) = k )a_{l,k} e_k(s^i) = \sum_k B_k(i) e_k(s^i) a_{l,k}$
   
   Termination: $P(S) = P(s^1..s^T|\pi(0) = BEGIN ) =  \sum_k P(s^2 ... s^T |\pi(1) = k )a_{0,k} e_k(s^1) = \sum_k B_k(1) a_{0,k} e_k(s^1) $
   
2) *Viterbi* : $V_k(i)$ is the probability of the most probable path for generating the subsequence $s^1...s^i$      ending in the state $k$ at iteration $i$

   Initialisation: $V_{BEGIN}(0) = 1 $ and $ V_i(0) = 0$  $ \forall i \neq BEGIN$
   
   Recurrence: $V_l(i+1) = e_l(s^{i+1}) Max_k ( v_k(i) a_{k,l}) $  and  $ ptr_i(l) = argamax_k(V_{k-1}(i) a_{k,l})$
   
   Termination: $P(s,\pi*) = Max_k(V_k(T) a_{k,0})$   and     $\pi*(T) = argmax_k(V_k(T) a_{k0})$
               
   Traceback : $\pi*(i-1) = ptr_i(\pi*(i))$ 
   
   

   
3) *Baum Welch* : 

Initialization: start with random parameters 
    
Compute Forward and Backward matrices on the known sequences
    
Compute $A_{k,l}$ and $E_k(c)$ where:
    
$A_{k,l} \sum_i P(\pi_i = k ,\pi_{i+1} = l |s) = \frac{\sum_i F_k(i) a_{k,l} e_l(s^{i+1}) B_l(i+1)}{P(s)}$
    
$E_k(c) = \sum_{s^i = c} P(s^i = c , \pi= k|s) = \frac{\sum_{s^i = c} F_k(i) B_k(i)}{P(s)}$

Update: 

$a_{k,l} = \frac{A_{k,l}}{\sum_i^N A_{k,i}}$

$e_k(c) = \frac{E_k(c)}{\sum_{d \in C} E_k(d)}$

Termination : Has $P(s|M)$ incremented?

Yes :Recompute Forward and Backward 

No : End

# HMM building, learning and predicting #

Create a HMM using fair and loaded dices. Create your model to predict an unknown model (the provided one).
The provided model is:
$$
Grammar: \\
Begin \rightarrow f1 | l1 \\
l1 \rightarrow l2 \\
l2 \rightarrow l2 | l3 | f1 \\
l3 \rightarrow l3 | f1 |end \\ 
f1 \rightarrow f1 | l1 |end \\
\\
labels: \\ 
l1, l2, l3 \rightarrow loaded-dice (L)\\ 
f1 \rightarrow fair-die (F)
$$

1. Write your onw HMM model and use the provided training and the testing sequences (created with with the **base_HMM **)
2. Implement Forward and Backward algorithms and train the model using Expectation Maximization: <br>
   2.1 Train your model using state labels (Fair/Loaded) and observed sequence (Heads and Tails) <br>
   2.2 Train your model using only the observed sequence (Heads/Tails) **[This is optional]** <br>
3. Implement a Viterbi and Posterior decoding and compare the predicted labels with the observed labels (Testing), evaluate the Coen's k-score, and other measure of accuracies.

In [103]:
%matplotlib inline
import numpy as np

class base_HMM:
    def __init__(self, states, state_labels, symbols, a=[], e =[]):
        ''' HMM(states, symbols, a=None, e = None)
        we assume that first name is begin and last name is end state
        we assume to order the state names and the symbols so that a and e are numpy matrices
        '''
        self.states = states
        self.n_states = len(states) #len states è la lunghezza degli stati con Begin e End
        self.labels = state_labels
        self.n_lab = len(state_labels)
        self.symbols = symbols
        self.n_symb = len(symbols)
        self.a = a
        self.e = e
        if self.a != []:
            self.allowed_tr = np.where(self.a >0,1.0,0)  
        else:
            self.allowed_tr =  []
            
    def string_2_vec(self, s, alph):
        vector = [[0 if char != letter else 1 for char in alph] for letter in s]
        return np.array(vector)
    


    def init_prob(self, mode="random",epsilon =10**(-10)):
        ''' init_prob(mode="random"/ "uniform" '''
        if mode == "random":
            a = np.random.uniform(1,100,(self.n_states,self.n_states))
            e = np.random.uniform(1,100,(self.n_states,self.n_symb))
        else:
            a = np.ones((self.n_states,self.n_states))
            e = np.ones((self.n_states,self.n_symb))
        if self.allowed_tr != []:
            a = a * self.allowed_tr
        a[:,0] = 0 # no transition to begnin
        a[0][-1] = 0 # no trnasition begin to end
        a[-1] = 0 # no transitions from end to other states
        self.a = a / (a.sum(axis=1)[:,np.newaxis] + epsilon) # normalize
        e[0] = 0 # no emissions in begin
        e[-1] = 0 # no emissions in end
        self.e = e / (e.sum(axis=1)[:,np.newaxis]+ epsilon) # normalize
        
        
        
        
        
        
    def forward(self, x_seq, y_seq=None):
        x = self.string_2_vec(x_seq, self.symbols)
        y = y_seq 
        N = len(x)

        F = np.zeros((N+2,self.n_states),dtype=np.double)        
        # forward initialization
        F[0][0] = 1.0
        #print(F)
        # forward loop
        for i in range(1,N+1): # 
            for s in range(self.n_states): # 
                #print(s,i)
                if y_seq == None or self.labels[s] == y[i-1]: #Masking 
                    
                    for t in range(self.n_states):
                        F[i][s] += self.a[t,s]*F[i-1][t]
                        
                    F[i][s] *= np.dot(self.e[s],x[i-1]) # moltiplico emissione per simbolo, simbolo di fatto è un vettore di zeri e 1
                    
        s = self.n_states -1  # end state
        i = N + 1
        for t in range(self.n_states):
            F[i][s] += self.a[t,s]*F[i-1][t]
        return F
    
    
    
    
    def backward(self, x_seq, y_seq=None):
        x = self.string_2_vec(x_seq, self.symbols)
        y = y_seq 
        N = len(x)
        B = np.zeros((N+2,self.n_states),dtype=np.double)        
        # Backward initialization
        B[-1][-1]=1.0

        for t in range(self.n_states):
            if y_seq == None or self.labels[t] == y[N-1]:
                
                B[N][t] = self.a[t,self.n_states-1] 
        # backward loop
        for i in range(N-1,0,-1): # 
            for s in range(1,self.n_states): # 
                if y_seq == None or self.labels[s] == y[i-1]: #Masking
        
                    for t in range(self.n_states):
                        B[i][s] += self.a[s,t]*B[i+1][t]*np.dot(self.e[t],x[i])
                        
                    
        s = 0  # end state
        i = 0
        for t in range(self.n_states):
            B[i][s] += self.a[s,t]*B[i+1][t]*np.dot(self.e[t],x[0])
        return B
    
#################################################################################################################    
    
    def viterbi(self,x_seq):
        hidden_gramatical_pi_str=[]
        labels_pi_str=[]
        x = self.string_2_vec(x_seq, self.symbols)
        N=len(x)
        v = np.zeros((N+2,self.n_states))
        ptr = np.zeros((self.n_states,N+1))
        pi_star = np.zeros(N)
        #inizializzazione 
        v[0][0]=1.0
        max_v = 0
        argmax =0
        max_index = 0
        #Ricorrenza
        for i in range(1,N+1): # 
            for s in range(self.n_states):
                max_index = 0
                max_v = 0
                argmax =0
                #calcolo max
                for t in range(self.n_states):
                    temp_v = max_v
                    max_v = v[i-1][t]*self.a[t,s]
                    if max_v > temp_v:
                        argmax = max_v
                        max_index = t
                v[i][s] = np.dot(self.e[s],x[i-1])*argmax
                ptr[s][i] = max_index
        
        maxx = 0.  
        max_ind = 0
        P=0.
        mazz = 0.
        #Termination---> Final state = self.n_states-1
        for j in range(self.n_states):
            temp = maxx
            maxx = v[N][j]*self.a[j,self.n_states-1]
            #print("deb1",maxx)#maxx -> probabilità di ogni sequenza
            if maxx > temp:
                mazz = maxx
                max_ind = j
        v[N+1][self.n_states-1] = mazz #mazz -> probabilità sequenza più probabile
        #print('deb2',mazz)
        pi_star[N-1] = max_ind
        hidden_gramatical_pi_str.append(self.states[max_ind])
        labels_pi_str.append(self.labels[max_ind])
        for i in range(N-1,0,-1):
            pi_star[i-1] = ptr[int(pi_star[i])][i+1]
            hidden_gramatical_pi_str.insert(0,self.states[int(pi_star[i-1])])
            labels_pi_str.insert(0,self.labels[int(pi_star[i-1])])
        return hidden_gramatical_pi_str,labels_pi_str  #pi_star
    
    
################################################################################################################## 
    
    
    
    def posterior_decoding(self,x_seq,y=None):
        
        posterior_index = []
        labels_posterior_str=[]
        hidden_gramatical_posterior_str = []
        N=len(x_seq)
        F = self.forward(x_seq,y)  
        B = self.backward(x_seq,y)
        
        for i in range(1,N+1):
            idx = []
            idx = [F[i][k] * B[i][k] /F[-1][-1] for k in range(self.n_states)]
            max_ind=np.argmax(idx)
            posterior_index.append(max_ind)
            hidden_gramatical_posterior_str.append(self.states[max_ind])
            labels_posterior_str.append(self.labels[max_ind])
            
        return hidden_gramatical_posterior_str,labels_posterior_str
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    def baumwelch_Nseq(self,X_train,y_seq=None):
        n_iter=10000
        temp_p =0.
        # inizializzo randomicamente a,e 
        self.init_prob( mode="random",epsilon =10**(-10))
        
        for iteration in range(n_iter):
            #Inizializzazione A,E
            A = np.zeros((self.n_states,self.n_states),dtype=np.double)
            E = np.zeros((self.n_states,self.n_symb),dtype=np.double)
            A_totseq = np.zeros((self.n_states,self.n_states),dtype=np.double)
            E_totseq = np.zeros((self.n_states,self.n_symb),dtype=np.double)
            P_s = 0.

            for seq in range(len(X_train)):
                x_seq = X_train[seq]
                if y_seq==None:
                    y = y_seq
                else:
                    y = y_seq[seq]
                x = self.string_2_vec(x_seq, self.symbols)
                N = len(x)
                #F , B per ogni sequenza 
                F = self.forward(x_seq,y) #sequence 
                B = self.backward(x_seq,y) #sequence 

            #Expected number of transitions A per sequence 
                for k in range(1,self.n_states):
                    num =0.
                    for t in range(N+1):
                        num += F[t][0]*self.a[0,k]*B[t+1][k] #no emissione 
                    A[0][k] = num/F[-1][-1]
                        
                for i in range(1,self.n_states):
                    for k in range(1,self.n_states):
                        num=0.
                        for t in range(N):
                            num += F[t][i] * self.a[i,k] * np.dot(self.e[k],x[t]) * B[t+1][k]
            
                        A[i][k] = num/(F[-1][-1])
            
                for k in range(1,self.n_states):
                    num = 0.
                    for t in range(N+1):
                        num += F[t][k]*self.a[k,-1]*B[t+1][-1] #no emissione
                    A[k][-1] = num/F[-1][-1]
            
            #Expected number of emission per sequence 
                for k in range(1,self.n_states):
                    for c in range(self.n_symb):
                        num = 0.
                        for t in range(1,N+1):
                            if x[t-1][c]==1:
                                num += F[t][k]*B[t][k]
                        E[k][c] = num/F[-1][-1]
            
            #Sommo su su tutte le sequenze A e E 
                for j in range(self.n_states):
                    for t in range(self.n_states):
                        A_totseq[j][t] += A[j][t]
                        
                for j in range(self.n_states):
                    for c in range(self.n_symb):
                        E_totseq[j][c] += E[j][c]
                             
                P_s += F[-1][-1]
                
            
            A = A_totseq
            E = E_totseq
            P_s = P_s/len(X_train)

            #Ricostruisco prob_tranisizion e emission
            for k in range(1,self.n_states-1):
                for l in range(1,self.n_states):
                    den = 0.
                    for i in range(1,self.n_states):
                        #print("k",k,"i",i)
                        den += A[k][i]

                    self.a[k][l]=A[k][l]/den

            for k in range(1,self.n_states-1):
                for c in range(self.n_symb):
                    den =0.
                    for d in range(self.n_symb):
                        den += E[k][d]

                    self.e[k][c] = E[k][c]/den
                    
            #media delle probilità di P(S|M) per ogni sequenza come condizione di uscita 
            if P_s<=temp_p:
                #print(temp_p,P_s)
                break 
            else:
                temp_p = P_s


        return self.a,self.e,iteration
    
    



######################################################################################################################    
    
    
    def random_path(self, min_len=10):
        hp, hlab, op = [], [], []
        hp.append(self.states[0])
        hlab.append(self.labels[0])
        op=[None]
        cs = 0
        i = 0
        while i< min_len:
            new_state = np.random.choice(range(1,self.n_states),1,p=self.a[cs][1:])[0]
            if self.states[new_state] != self.states[-1]:
                cs = new_state
                hp.append(self.states[cs])
                hlab.append(self.labels[cs])
                symb = np.random.choice(range(self.n_symb),1,p=self.e[cs])[0]
                op.append(self.symbols[symb])
                i +=1
        while self.states[new_state] != self.states[-1]:
            new_state = np.random.choice(range(1,self.n_states),1,p=self.a[cs][1:])[0]
            if self.states[new_state] != self.states[-1]:
                cs = new_state
                hp.append(self.states[cs])
                hlab.append(self.labels[cs])
                symb = np.random.choice(range(self.n_symb),1,p=self.e[cs])[0]
                op.append(self.symbols[symb])
        return hp, hlab, op

    


**Esempio appunti**

In [128]:

transitions=np.array([ [0, 0.2, 0.8, 0],
                       [0, 0.7, 0.2, 0.1],
                       [0, 0.1, 0.8, 0.1],
                       [0, 0,   0,   0],
                     ])

#create a model
#EMISSIONS A-C-G-T
emissions = np.array([ [0, 0, 0, 0], [0.1, 0.4, 0.4, 0.1], [0.25, 0.25, 0.25, 0.25], [0, 0, 0, 0] ])
hmm = base_HMM(["begin","Y","N","end"],["-","Y","N","-"], ["A","C", "G", "T"], a=transitions, e=emissions )    

    



  if self.a != []:


**Viterbi Decoding**

In [129]:
hmm.viterbi("AGCGCGTAATCTG")[0]

['N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N']

**Posterior Decoding**

In [130]:
hmm.posterior_decoding("AGCGCGTAATCTG")[0]

['N', 'N', 'N', 'Y', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N']

**Esempio compito**

In [107]:
transitions = np.array([[0., 0.30,   0,   0, 0.7,      0.],
                           [0.,    0, 1.0,   0,   0,       0.],
                           [0.,    0, 0.05, 0.90,  0.05,   0.],
                           [0.,    0.,   0, 0.6, 0.3,      0.1], 
                           [0.,    0.1,   0., 0, 0.8,      0.1],
                           [0.,    0,   0,   0,   0,       0]],dtype=np.double)
        
    
    #create a model         #il begin non emette nulla                              end non emette nulla 
emissions = np.array([ [0, 0], [0.2, 0.8], [0.2, 0.8], [0.2, 0.8], [0.5, 0.5], [0, 0] ])
hmm = base_HMM(["begin","l1","l2","l3","f1","end"],["-","L","L","L","F","-"], ["H","T"], a=transitions, e=emissions )
    

  if self.a != []:


In [18]:
def mkset(N,hmm, ml):
    X, y =[], []
    for n in range(N):
        states, state_labels, observed = hmm.random_path(ml)
        X.append("".join(observed[1:])) 
        y.append("".join(state_labels[1:]))
    return X,y

Ntr = 100  # set this eg. 100 or 1000
Nts = 100  # set this eg. 100
min_seq_len = 50 # set this 10 or 50 or 100
X_train, y_train = mkset(Ntr,hmm, min_seq_len) 
X_test, y_test = mkset(Nts,hmm, min_seq_len) 

**Training Supervised**

In [19]:
hmm.baumwelch_Nseq(X_train,y_train)

  if self.allowed_tr != []:


(array([[0.00000000e+00, 5.59657104e-01, 0.00000000e+00, 0.00000000e+00,
         4.40342896e-01, 0.00000000e+00],
        [0.00000000e+00, 0.00000000e+00, 1.00000000e+00, 0.00000000e+00,
         0.00000000e+00, 0.00000000e+00],
        [0.00000000e+00, 0.00000000e+00, 6.42145670e-01, 2.93897928e-01,
         6.39564023e-02, 0.00000000e+00],
        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.54014174e-13,
         9.45711463e-01, 5.42885373e-02],
        [0.00000000e+00, 1.10208228e-01, 0.00000000e+00, 0.00000000e+00,
         8.69730828e-01, 2.00609446e-02],
        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
         0.00000000e+00, 0.00000000e+00]]),
 array([[0.        , 0.        ],
        [0.18046709, 0.81953291],
        [0.22383721, 0.77616279],
        [0.17938585, 0.82061415],
        [0.49492128, 0.50507872],
        [0.        , 0.        ]]),
 661)

**Training Unsupervised**

In [20]:
hmm.baumwelch_Nseq(X_train)

  if self.allowed_tr != []:


(array([[0.00000000e+00, 4.45953724e-01, 0.00000000e+00, 0.00000000e+00,
         5.54046276e-01, 0.00000000e+00],
        [0.00000000e+00, 0.00000000e+00, 1.00000000e+00, 0.00000000e+00,
         0.00000000e+00, 0.00000000e+00],
        [0.00000000e+00, 0.00000000e+00, 7.16646895e-01, 1.37780131e-01,
         1.45572974e-01, 0.00000000e+00],
        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 9.65810675e-01,
         1.13974279e-03, 3.30495818e-02],
        [0.00000000e+00, 8.05863731e-02, 0.00000000e+00, 0.00000000e+00,
         9.19413627e-01, 1.75565022e-90],
        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
         0.00000000e+00, 0.00000000e+00]]),
 array([[0.        , 0.        ],
        [0.11845897, 0.88154103],
        [0.11018183, 0.88981817],
        [0.4118994 , 0.5881006 ],
        [0.4917217 , 0.5082783 ],
        [0.        , 0.        ]]),
 2506)

**Cohen Score**

In [126]:
vit_cohen=[]
post_cohen=[]
for i in range(len(X_train)):
    viterbi_path=hmm.viterbi(X_train[i])[1]
    posterior_path = hmm.posterior_decoding(X_train[i])[1]
    true_sequence = y_train[i]
               
    cohen_viterbi=[]
    cohen_posterior=[]
    cohen_true=[]
    for i in viterbi_path:
        if i =='L':
            cohen_viterbi.append(0)
        else:
            cohen_viterbi.append(1)

    for i in posterior_path:
        if i =='L':
            cohen_posterior.append(0)
        else:
            cohen_posterior.append(1)
        

    for i in true_sequence:
        if i =='L':
            cohen_true.append(0)
        else:
            cohen_true.append(1)
               
    vit_cohen.append(cohen_kappa_score(cohen_viterbi,cohen_true))
    post_cohen.append(cohen_kappa_score(cohen_posterior,cohen_true))
               
print('Viterbi Cohen Score : ',np.mean(vit_cohen),'       Posterior Cohen Score : ',np.mean(post_cohen))

Viterbi Cohen Score :  0.07644063483642555        Posterior Cohen Score :  0.22729770681767048


**oss**  Il posterior ha un Cohen score più alto però non preserva la grammatica!