# HMM building, learning and predicting #

Create a HMM using fair and loaded dices. Create your model to predict an unknown model (the provided one).
The provided model is:
$$
Grammar: \\
Begin \rightarrow f1 | l1 \\
l1 \rightarrow l2 \\
l2 \rightarrow l2 | l3 | f1 \\
l3 \rightarrow l3 | f1 |end \\ 
f1 \rightarrow f1 | l1 |end \\
\\
labels: \\ 
l1, l2, l3 \rightarrow loaded-dice (L)\\ 
f1 \rightarrow fair-die (F)
$$

1. Write your onw HMM model and use the provided training and the testing sequences (created with with the **base_HMM **)
2. Implement Forward and Backward algorithms and train the model using Expectation Maximization: <br>
   2.1 Train your model using state labels (Fair/Loaded) and observed sequence (Heads and Tails) <br>
   2.2 Train your model using only the observed sequence (Heads/Tails) **[This is optional]** <br>
3. Implement a Viterbi and Posterior decoding and compare the predicted labels with the observed labels (Testing), evaluate the Coen's k-score, and other measure of accuracies.

# Class base_HMM:
- string_2_vec: returns a two columns (head/tail) matrix with the lenght of the sequence as number of rows. If the i-th sequence element is H/T, the i-th row of the matrix has a 1 in the correspondig column
- forward function: this function generates the forward matrix given x and y string
- backward function: this function generates the backward matrix given x and y string
- viterbi function: returns the hidden path given the emission states (decoding problem)
- A posteriori function
- Baum Welch
                               

In [8]:
%matplotlib inline
import numpy as np

class base_HMM:
    def __init__(self, states, state_labels, symbols, a=[], e =[]):
        ''' HMM(states, symbols, a=None, e = None)
        we assume that first name is begin and last name is end state
        we assume to order the state names and the symbols so that a and e are numpy matrices
        '''
        self.states = states
        self.n_states = len(states)
        self.labels = state_labels
        self.n_lab = len(state_labels)
        self.symbols = symbols
        self.n_symb = len(symbols)
        self.a = a
        self.e = e
        self.eps=0.00001 #for overflow problems (denom / exp / log etc)
        if self.a != []:
            self.allowed_tr = np.where(self.a >0,1.0,0)  
        else:
            self.allowed_tr =  []
            
    def string_2_vec(self, s, alph):
        vector = [[0 if char != letter else 1 for char in alph] for letter in s]
        return np.array(vector)


    def init_prob(self, mode="random",epsilon =10**(-10)):
        ''' init_prob(mode="random"/ "uniform" '''
        if mode == "random":
            a = np.random.uniform(1,100,(self.n_states,self.n_states))
            e = np.random.uniform(1,100,(self.n_states,self.n_symb))
        else:
            a = np.ones((self.n_states,self.n_states))
            e = np.ones((self.n_states,self.n_symb))
        if self.allowed_tr != []:
            a = a * self.allowed_tr
        a[:,0] = 0 # no transition to begnin
        a[0][-1] = 0 # no trnasition begin to end
        a[-1] = 0 # no transitions from end to other states
        self.a = a / (a.sum(axis=1)[:,np.newaxis] + epsilon) # normalize
        e[0] = 0 # no emissions in begin
        e[-1] = 0 # no emissions in end
        self.e = e / (e.sum(axis=1)[:,np.newaxis]+ epsilon) # normalize
        
    def forward(self, x_seq, y_seq):
        x = self.string_2_vec(x_seq, self.symbols)
        y = y_seq 
        N = len(x)
        F = np.zeros((N+2,self.n_states))        
        scale = np.zeros(N+2)
        # forward initialization
        F[0][0] = 1.0
        scale[0] = 1.0
        # forward loop
        for i in range(1,N+1):
            for s in range(self.n_states):
                if self.labels[s] == y[i-1]:
                    #print(s, self.labels[s], y[i-1])
                    for t in range(self.n_states):
                        F[i][s] += self.a[t,s]*F[i-1][t]
                    F[i][s] *= np.dot(self.e[s],x[i-1])
            scale[i] = np.sum(F[i])
            F[i]/=scale[i]
        s = self.n_states -1  # end state
        i = N + 1
        for t in range(self.n_states):
            F[i][s] += self.a[t,s]*F[i-1][t]
        scale[i] = np.sum(F[i])
        F[i]/=scale[i]
        return F

    
    def backward(self, x_seq, y_seq): #faccio la stessa cosa del forward ma parto dalla fine
        x = self.string_2_vec(x_seq, self.symbols)
        y = y_seq 
        N = len(x)
        bkw = np.zeros((N+2,self.n_states))        
        scale = np.zeros(N+2)
        # backward initialization
        bkw[-1][-1] = 1.0
        scale[0] = 1.0
        
        for s in range (self.n_states):
            if self.labels[s] == y[-1]:
                #print(s, self.labels[s], y[-1])
                bkw[-2][s] = self.a[s, -1]
                scale[-2] += self.a[s, -1]
        bkw[-2]/=scale[-2]
        
        for i in range(N-1, 0, -1):
            #print(i)
            for s in range (self.n_states):
                if self.labels[s] == y[i-1]:
                    #print(s, self.labels[s], y[i])
                    for t in range(self.n_states):
                        #print(bkw[i-1][s])
                        bkw[i][s] += self.a[s,t]*bkw[i+1][t]*np.dot(self.e[t],x[i])
                    scale[i] = np.sum(bkw[i])
            bkw[i]/=scale[i]  
   
    #filling begin state
       
        for t in range(self.n_states):
            bkw[0][0] += self.a[0,t]*bkw[i+1][t]*np.dot(self.e[t],x[i])
        
        scale[0] = np.sum(bkw[0])
        bkw[0]/=scale[0]
        
        return bkw
    
   ### ### ### ### ### ### ###
    
#FINDING THE PATH
 
#these two functions return the hidden path: pie_labels
    
    def viterbi(self, x_seq):
        x = self.string_2_vec(x_seq, self.symbols)
        #y = y_seq 
        N = len(x)
        T=N+2
        
        p_tr = np.zeros((N,self.n_states))
        pie_star=np.zeros(N)
        #viterbi initialization
        W = np.zeros((T,self.n_states))
        scale = np.zeros(T)
        W[0][0] = 1.0
        scale[0] = 1.0
        # Viterbi Recurrence
        
        for i in range(0,N):
            for s in range(self.n_states):
                maximum= max([self.a[k,s]*W[i][k] for k in range(self.n_states)])
                W[i+1][s] = maximum*np.dot(self.e[s],x[i])
                #print(W[i+1][s])
                p_tr[i][s] = np.argmax([self.a[k,s]*W[i][k] for k in range(self.n_states)])
            scale[i+1] = np.sum(W[i+1])
            W[i+1]/=scale[i+1] 
        #Viterbi Termination
        q=self.n_states-1
        W[T-1][q] = max([self.a[k,q]*W[T-1][k] for k in range(self.n_states)])
        pie_star[N-1]=np.argmax([self.a[k,q]*W[N][k] for k in range(self.n_states)])
        #Traceback
        for i in range(N-1, 0, -1):
            #print(i)
            pie_star[i-1] = p_tr[i][int(pie_star[i])]
        pie_label=[]
        for r in range(N):
            pie_label.append(self.labels[int(pie_star[r])])
        return W, pie_label
        #return pie_label
    
    def posteriori(self, x_seq, y_seq):
        x = self.string_2_vec(x_seq, self.symbols)
        N = len(x)
        alpha = self.forward(x_seq, y_seq)
        beta = self.backward(x_seq, y_seq)
        pie_star = [np.argmax([alpha[i+1][k] * beta[i+1][k] for k in range(self.n_states)]) for i in range(N)]
        pie_label=[]
        for r in range(N):
            pie_label.append(self.labels[int(pie_star[r])])
        return pie_star, pie_label
        #return pie_label
    
    ### ### ### ### ### ### ### ### 
    
    def baum_welch(self,x_seq, y_seq, n_iter=1):
       
        x = self.string_2_vec(x_seq, self.symbols) 
        xseq = np.zeros((len(x_seq),1))
        #inizializzo a random a (transiz) ed e (emiss) e normalizzo
        self.a = np.random.rand(self.n_states, self.n_states)
        self.e = np.random.rand(self.n_states, self.n_symb)
        for i in range(self.n_states):
            self.a[i]/=np.sum(self.a[i])
            self.e[i]/=np.sum(self.e[i])
    
        T = len(x)  
        N = T+2
        # loop
        csi = np.zeros((self.n_states, self.n_states))
        gamma = np.zeros((self.n_states, self.n_symb))
        a_star = np.zeros((self.n_states, self.n_states))
        b_star = np.zeros((self.n_states, self.n_symb))
        for i in range(n_iter): #n iter è il tempo che passa
            
            #build forward and backward matrix
            Fwd = self.forward(x_seq, y_seq)  
            Bwd = self.backward(x_seq, y_seq)
            
            #compute b star 
            for r in range(self.n_states):   
                for p in range(len(self.symbols)):
                    b_star[r][p] = sum([Fwd[j+1][r]*Bwd[j+1][r] for j in range(T) if x[j,p]==1])      
      
            #compute a star
            for k in range(self.n_states):
                for l in range(self.n_states):
                    a_star[k][l] = sum([Fwd[j][k]*self.a[k][l]*np.dot(self.e[l],x[j])*Bwd[j+1][l] for j in range(T)])
            
            
            
        self.a=a_star
        self.e=b_star
        
        return self.a, self.e
        

    ### ### ### ### ### ### ### ###  
    
    
    def random_path(self, min_len=10):
        hp, hlab, op = [], [], []
        hp.append(self.states[0])
        hlab.append(self.labels[0])
        op=[None]
        cs = 0
        i = 0
        while i< min_len:
            new_state = np.random.choice(range(1,self.n_states),1,p=self.a[cs][1:])[0]
            if self.states[new_state] != self.states[-1]:
                cs = new_state
                hp.append(self.states[cs])
                hlab.append(self.labels[cs])
                symb = np.random.choice(range(self.n_symb),1,p=self.e[cs])[0]
                op.append(self.symbols[symb])
                i +=1
        while self.states[new_state] != self.states[-1]:
            new_state = np.random.choice(range(1,self.n_states),1,p=self.a[cs][1:])[0]
            if self.states[new_state] != self.states[-1]:
                cs = new_state
                hp.append(self.states[cs])
                hlab.append(self.labels[cs])
                symb = np.random.choice(range(self.n_symb),1,p=self.e[cs])[0]
                op.append(self.symbols[symb])
        return hp, hlab, op

    ### ### ### ### ### ### ### ###

    

In [9]:
#create the model
transitions = np.array([[0, 0.30,   0,   0, 0.7,   0],
                       [0,    0, 1.0,   0,   0,   0],
                       [0,    0, 0.05, 0.90,  0.05,   0],
                       [0,    0,   0, 0.6, 0.3, 0.1], 
                       [0,    0.1,   0, 0, 0.8, 0.1],
                       [0,    0,   0,   0,   0,   0]])

emissions = np.array([ [0, 0], [0.2, 0.8], [0.2, 0.8], [0.2, 0.8], [0.5, 0.5], [0, 0] ])

hmm = base_HMM(["begin","l1","l2","l3","f1","end"],
               ["-","L","L","L","F","-"], ["H","T"],
               a=transitions, 
               e=emissions )




In [10]:
#function that creates training or testing set 
def mkset(N,hmm, ml):
    X, y =[], []
    for n in range(N):
        states, state_labels, observed = hmm.random_path(ml)
        X.append("".join(observed[1:-1])) 
        y.append("".join(state_labels[1:-1]))
    return X,y

# Testing class functions #
In this last part, we test class function we some training set that we can generate through mkset function .



In [12]:
Ntrain = 10  # set this eg. 100 or 1000
Ntest = 5   # set this eg. 100
min_seq_len = 4 # set this 10 or 50 or 100
X_train, y_train = mkset(Ntrain,hmm, min_seq_len) 
X_test, y_test = mkset(Ntest,hmm, min_seq_len) 

#print(X_train, y_train)

#print(X_test, y_test)

print(X_train[0])
print(y_train[0])

HHTTTTTHHHTTTHHH
FFFFLLLLFFFFFLLL


# Forward and backward matrices

In [13]:
print(hmm.forward(X_train[0],y_train[0]))

[[1.         0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         1.         0.         0.         0.         0.        ]
 [0.         0.         1.         0.         0.         0.        ]
 [0.         0.         0.05263158 0.94736842 0.         0.        ]
 [0.         0.         0.00425532 0.99574468 0.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         1.         0.        

In [14]:
print(hmm.backward(X_train[0],y_train[0]))

[[1.         0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.50699143 0.30594497 0.1870636  0.         0.        ]
 [0.         0.48999775 0.3158013  0.19420094 0.         0.        ]
 [0.         0.09950249 0.54228856 0.35820896 0.         0.        ]
 [0.         0.         0.14285714 0.85714286 0.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.48780488 0.31707317

# Decoding algorithms 

These algorithms find the most probable state of the hidden variables in the model given the observations.

- Viterbi 
- A posteriori


In [491]:
def accuracy(y_pred, y):
    c=0
    m=len(y_pred)
    for i in range (m):
        if y_pred[i]==y[i]:
            c+=1
    return c/m

In [492]:
Predict=hmm.viterbi(X_train[0])[1]

acc=accuracy(Predict, y_train[0])

print("Hidden path for Viterbi algorithm is " + str(Predict))
print("Accuracy for Viterbi algorithm is " + str(acc))


Hidden path for Viterbi algorithm is ['F', 'F', 'F', 'L', 'L', 'L', 'L', 'L']
Accuracy for Viterbi algorithm is 0.875


In [493]:
hmm.posteriori(X_train[0],y_train[0])

Predict=hmm.posteriori(X_train[0], y_train[0])[1]

acc=accuracy(Predict, y_train[0])

print("Hidden path for a posteriori algorithm is " + str(Predict))
print("Accuracy for a posteriori algorithm is " + str(acc))


Hidden path for a posteriori algorithm is ['F', 'F', 'L', 'L', 'L', 'L', 'L', 'L']
Accuracy for a posteriori algorithm is 1.0


# Baum - Welch algorithm
doesn't work.

The idea is to 
- generate random matrices a and e
- compute Forward and backward
- compute gamma and csi matrices in order to create
- matrices a  and e with a correction
- reiterate this process using last found a and e

In this way given X train and Y_train, we should get matrices of transition and emission.
This doesn't happen in my code: function outcome does not really make sense to me (not normalized for exemple)

In [495]:
hmm.baum_welch("HTTT", "FLLL", n_iter=10)

(array([[0.        , 0.        , 0.        , 0.        , 0.07106037,
         0.        ],
        [0.        , 0.03096929, 0.01347873, 0.02212412, 0.        ,
         0.        ],
        [0.        , 0.01755207, 0.01239235, 0.00426059, 0.        ,
         0.        ],
        [0.        , 0.01852467, 0.01164643, 0.00675649, 0.        ,
         0.        ],
        [0.        , 0.06887662, 0.02940199, 0.01369551, 0.        ,
         0.        ],
        [0.        , 0.        , 0.        , 0.        , 0.        ,
         0.        ]]),
 array([[0.        , 0.        ],
        [0.        , 0.45003242],
        [0.        , 0.23191105],
        [0.        , 0.18347252],
        [1.        , 0.        ],
        [0.        , 0.        ]]))