<h1><b>Statistique en Bioinformatique : </b> TME 5 et 6 </h1>
<br>
L’objectif de ce TME est:
<br>
<ul>
<li> implémenter l'algorithme de Viterbi et l'estimation des paramètres (en utilisant le Viterbi training)
pour l'exemple du occasionally dishonest casino.   </li> 
</ul>
<br>
<div class="alert alert-warning" role="alert" style="margin: 10px">
<p><b>Soumission</b></p>
<ul>
<li>Renomer le fichier TME5_6.ipynb pour NomEtudiant1_NomEtudiant2.ipynb </li>
<li>Soumettre via moodle </li>
</div>
</div>

Nom etudiant 1 :
<br>
Nom etudiant 2 :
<br>

<h3>Introduction</h3>
Un casino parfois malhonnête (occasionally dishonest casino) utilise 2 types de pieces : fair et unfair. <br>
La matrice de transition entre les états cachés est:<br>
${\cal S}=\{F,U\}$ (fair, unfair):
$$
p = \left(
\begin{array}{cc}
0.99 & 0.01\\
0.05 & 0.95
\end{array}
\right)\ ,
$$

les probabilités d'émission des symboles 
${\cal O} = \{H,T\}$ (head, tail):
\begin{eqnarray}
e_F(H) =  0.5 &\ \ \ \ &
e_F(T) = 0.5 \nonumber\\
e_U(H) = 0.9 &\ \ \ \ &
e_U(T) = 0.1 \nonumber
\end{eqnarray}

<br> Et la condition initiale $\pi^{(0)} = (0.999,0.001)$ (le jeux commence presque toujours avec le pieces juste (fair).

<b>Exercice 1</b>:
<u>Simulation</u>: Écrire une fonction qui simule $T$ jets de pièces. 
La fonction renverra un tableau à deux colonnes correspondant 
aux valeurs simulées pour les états cachés $X_t$ 
(type de dés utilisée, “F” ou “U”) et aux symboles observées $Y_t$ 
(résultat du jet de dés, “H” ou “T”). On simulera une séquence
de longueur 2000 qu'on gardera pour les applications ultérieures.


In [6]:
import numpy as np
import matplotlib.pyplot as plt

#states
S = { 0:'F',1 :'U'}

#transition probability matrix
Pij = np.array([[0.99,0.01], [0.05,0.95]])

#emision symbols 
O = {0:'H', 1: 'T'}

#emission probability matrix
Ei = np.array([[0.5,0.5], [0.9,0.1]]) #ça aurait dû être Eio

# initial Condition
pi0=np.array([0.999,0.001])

#number of jets
T = 2000

In [7]:
import random

# Fonction qui simule T jets de pieces
def jets(T, pi0, Eij, Pij):
    """
      simulation of occasionally dishonest casino
      input1 T: number of jets
      input2 pi0: initial condition
      input3 Eij: emission probability matrix
      input4 Pij: transition probability matrix
      output1 jetsRes: matrix |2xT| containing simulations
    """

    # Creation du tableau
    jetsRes = np.zeros((T,len(pi0)),dtype=int)

    
    # création d'une matrice pour les observations avec des chiffres
    Traj_ind = []
    
    # création d'une séquence aléatoire de longueur T 
    nb_alea = []
    for i in range (T): 
        alea = random.random()
        nb_alea.append(alea)
#     print("ALEA ====> ", "len=",len(nb_alea),nb_alea)
    
#     normalisation de la matrice
    taille_Pi_0 = len(pi0)
    Pi_0_normal = np.zeros(taille_Pi_0)
    somme = np.sum(pi0)
#     print(somme)

    for i in range (len(pi0)): 
        Pi_0_normal[i]= pi0[i]/somme
#     print("pi0",pi0)
#     print("Pi_0_normal = ",Pi_0_normal)

    # création de la matrice pi' (avec les sommes cumulées croissantes)
    Pi_0_bis = np.zeros(taille_Pi_0)
#     print("Pi_0_bis",Pi_0_bis)
    
    Pi_0_bis[0]= Pi_0_normal[0]
    for i in range (1,taille_Pi_0): 
        Pi_0_bis[i]= Pi_0_normal[i] + Pi_0_bis[i-1]
#     print("Pi_0_bis",Pi_0_bis) 
    
    # normalisation de la matrice Pij 
    for i in range (int(len(Pij))): 
        Somme = np.sum(Pij[i])
        for j in range (int(len (Pij[i]))):
            Pij[i][j]= (Pij[i][j])/Somme   
    
    # création de la matrice P' (avec les sommes cumulées croissantes)
    taille_P = len(Pij)
#     print(taille_P)
    P_bis = np.zeros((taille_P, taille_P))
#     print("P_bis\n",P_bis)
    
    for i in range(taille_P): 
        for j in range(taille_P): 
            if j == 0 : 
                P_bis[i][j]=Pij[i][j]
            else : 
                P_bis[i][j]=Pij[i][j] + P_bis[i][j-1]
#     print("P=\n",Pij)
#     print("P_bis = \n",P_bis)


    # trouver le premier état
    premier = nb_alea[0]
#     print("premier =",premier)
#     print("Pi_0_bis =", Pi_0_bis)
    if premier < Pi_0_bis[0]:
        Traj_ind.append('0')
    
    else: 
        Traj_ind.append('1')
        
    
    # trouver les états suivants
    for i in range(1, T): 
#         print("n=", n, "i=", i)
        if Traj_ind[-1] == "0": 
            indice = 0
        if Traj_ind[-1] == "1": 
            indice = 1
#         print("indice =",indice)

        suivant = nb_alea[i]
#         print("suivant = ",suivant)
#         print("P_bis[indice] =",P_bis[indice])
        if suivant <= P_bis[indice][0]:
            
            Traj_ind.append('0')
        else: 
            
            Traj_ind.append('1')
#     
#     print ('Traj_ind --> ',Traj_ind)
    
   
    Obs_ind = []
    
    
    for i in range(T): 
        if Traj_ind[i]== '0': 
            liste = ["0", "1"]
            e = random.choices(liste, weights = (1,1), k = 1)            
            Obs_ind.append(e)
          
    
        if Traj_ind[i]== '1': 
            liste = ["0", "1"]
            e = random.choices(liste, weights = (9,1), k = 1)
            Obs_ind.append(e)
        
#     print("Obs_ind", Obs_ind)
    Obs_ind = sum(Obs_ind, []) # pour transformer liste de listes en une seule et unique liste 
#     print("UNE seule liste --> Obs_ind", Obs_ind)
    
    for i in range (len(jetsRes)): 
        jetsRes[i][0]= Traj_ind[i]
        jetsRes[i][1]= Obs_ind[i]
    
                
#     print("jetsRes = ",jetsRes)

    
   
    return jetsRes



def printSimulation(resultat):
    for i in resultat : 
        print (S[i[0]], O[i[1]])
        
# T = 10 # pour tester !!
jetsRes = jets(T, pi0, Ei, Pij) 
# print(jetsRes)
printSimulation(jetsRes)


F T
F H
F H
F T
F H
F T
F H
F T
F H
F H
F H
F H
F H
F H
F T
F T
F T
F H
F T
F T
F H
F H
F T
F H
F H
F T
F H
F T
F H
F H
F T
F H
F T
F H
F H
F H
F H
F H
F T
F H
F H
F H
F T
F H
F T
F H
F T
F T
F H
F H
F T
F T
F T
F T
F T
F T
F T
F T
F H
F H
F H
F H
F H
F H
F H
F H
F H
F T
F T
F T
F T
F H
F T
F T
F T
F T
F T
F H
F H
F H
F H
F T
F T
F T
F H
F T
F H
F H
F H
F T
F T
F H
F T
F H
F H
F H
F T
F T
F H
F T
F H
F H
F T
F T
F H
F T
F H
F H
F T
F T
F T
F T
F H
F H
F T
F H
F H
F H
F T
F H
F H
F T
F H
F T
F H
F T
F H
F H
F H
F H
F T
F T
F T
F H
F H
F H
F H
F H
F T
F H
F T
F H
F T
F T
F H
F T
F H
F H
F T
F H
F T
F T
F T
F T
F H
F H
F T
F H
F T
F T
F H
F H
F H
F T
F T
F T
F T
F T
F H
F H
F H
F T
F T
F T
F T
F H
F T
F H
F H
F H
F T
F H
F T
F T
F H
F H
F H
F H
F H
F T
F T
F H
F H
F T
F H
F H
F H
F H
F T
F H
F H
F T
F T
F H
F T
F T
F T
F T
F T
F T
F H
F H
F H
F H
F T
F H
F T
F T
F H
F T
F T
F T
F T
F H
F T
F H
F H
F T
F T
F H
F H
F T
F T
F T
F H
F H
F H
U H
U H
U H
U H
U H
U H
U H
U H
U H
U H
U H
U H
U H


In [8]:
# A = np.zeros((6,2))
# # print(A, len(A))
# i,j = A.shape
# print("i=", i,"j=",j)

# val = 1 
# for j in range (j):
#     for i in range(i):
#         print(i,j)
#         A[i][j] = val 
#         val +=1
# A[5][1]=12.

# print("matrice A")
# print(A)
# print("A[:,1] --> ",A[:,1]) #pour n'imprimer que la colonne de droite
# print("A[:,0] --> ",A[:,0]) #pour n'imprimer que la colonne de gauche

<b>Exercice 2</b>: <u>Algorithme de Viterbi </u>: Écrire une fonction qui permet
de déterminer la séquence $(i^\star_t)_{t=0:T}$ d'états cachés
plus probable, ainsi que sa probabilité. Pour tester votre fonction utiliser le résultat de la 
simulation (2éme colonne) de la question 1. Comparer $(i^\star_t)_{t=0:T}$ avec
les vrais états cachés (1ère colonne de la simulation).


In [9]:
print(jetsRes)
print('---------')
print(Pij)
print('---------')
print(Ei)
print(Ei[1][0])
test = [0, 0, 1, 1, 1, 1, 0, 1, 0, 0]
print(Ei[1][test[0]])
print('---------')
print(pi0)

[[0 1]
 [0 0]
 [0 0]
 ...
 [0 0]
 [0 1]
 [0 0]]
---------
[[0.99 0.01]
 [0.05 0.95]]
---------
[[0.5 0.5]
 [0.9 0.1]]
0.9
0.9
---------
[0.999 0.001]


In [10]:
# Algorithme de Viterbi
import operator

def viterbi(jets, Pij, Ei, pi0, nS, nO, enLog):
    """
    Implement Viterbi algorithm
    input1 jets: matrix |2xT| containing simulations
    input4 Pij: transition probability matrix
    input3 Ei: emission probability matrix
    input4 pi0: initial condition
    input5 nS: number of states
    input6 nO: number of observations
    input7 enLog: bool, when True we apply log to avoid underflow
    output1 i_star: most problable path
    output2 prob: probability associated to the most problable path
    """
    
    obs = jets[:,1]
#     print(obs, type(obs))
#     print(list(obs))

    T = len(obs) #Nombre d'observations (longueur des observations)
    etat = jets[:,0]
    
    i_star = np.zeros((T))
    prob = 1
    path = np.zeros((T))
    
    from_F = pi0[0] * Ei[0][obs[0]]
    from_U = pi0[1] * Ei[1][obs[0]]
    i_star[0] = max(from_F, from_U)
    
    if path[0] == from_F : 
        i_star[0] = 0
    else: 
        i_star[0] = 1
    
        
    for i in range(1,T): 
        from_F = Ei[0][obs[i]] * max (from_F * Pij[0][0], from_U * Pij [1][0])
        from_U = Ei[1][obs[i]] * max (from_F * Pij[0][1], from_U * Pij [1][1])
#         print("VALEUR ===> ", from_F, from_U, "MAX = ", max(from_F, from_U))
        path[i]= max(from_F, from_U)
        
        if path[i] == from_F : 
            i_star[i] = 0
        else: 
            i_star[i] = 1
        
    prob = path[-1]
#     print("etat   = ",list(etat))
#     print("i_star = ", i_star)
    return i_star, prob

def analyseResultats(jets, estimation):    
    """
    Compare expected and obtained paths
    input1 jets: expected path
    input2 estimation: obtained path
    output1 error: percentage error
    """
    l = len(jets)
    faute = 0
    
    print(jets)
    print(estimation)
    
    etat = jets[:,0]
    
    for i in range (l): 
        if etat[i] != estimation [i]: 
            faute += 1
    error = (faute/l)*100
        
    return error

i_est, P_est = viterbi(jetsRes, Pij, Ei, pi0, 2, 2, False)
print("i_est", i_est)
print("P_est", P_est)
error = analyseResultats(jetsRes, i_est)
print('erreur d\'estimation de viterbi:')
print(error,'%')
print('Probabilité estimé:')
print(P_est)

print("=========================================================")
i_est2, P_est2 = viterbi(jetsRes, Pij, Ei, pi0, 2, 2, True)
error2 = analyseResultats(jetsRes, i_est2)
print('erreur d\'estimation de viterbi:')
print(error2,'%')

i_est [1. 0. 0. ... 0. 0. 0.]
P_est 0.0
[[0 1]
 [0 0]
 [0 0]
 ...
 [0 0]
 [0 1]
 [0 0]]
[1. 0. 0. ... 0. 0. 0.]
erreur d'estimation de viterbi:
14.399999999999999 %
Probabilité estimé:
0.0
[[0 1]
 [0 0]
 [0 0]
 ...
 [0 0]
 [0 1]
 [0 0]]
[1. 0. 0. ... 0. 0. 0.]
erreur d'estimation de viterbi:
14.399999999999999 %


### TEST sur les données du TD 

In [11]:
# les données pour le test 

#states
S_TEST = { 0:'H',1 :'C'}

#transition probability matrix
Pij_test = np.array([[0.7,0.3], [0.4,0.6]])

#emision symbols 
O_TEST = {0:'S', 1: 'M'}

#emission probability matrix
Ei_test = np.array([[0.1,0.4, 0.5], [0.7,0.2,0.1]]) #ça aurait dû être Eio


# initial Condition
pi0_test = np.array([0.6,0.4])

T_test = 2


In [12]:
jetsRes_test = jets(T_test, pi0_test, Ei_test, Pij_test) 
print(jetsRes_test)


i_est_TEST, P_est_TEST = viterbi(jetsRes_test, Pij_test, Ei_test, pi0_test, 2, 2, False)
print("i_est_TEST", i_est_TEST)
print("P_est_TEST", P_est_TEST)
error_TEST = analyseResultats(jetsRes_test, i_est_TEST)
print('erreur d\'estimation de viterbi:')
print(error_TEST,'%')
#print('Probabilité estimé:')
#print(p_est)


[[0 1]
 [0 0]]
i_est_TEST [1. 1.]
P_est_TEST 0.033600000000000005
[[0 1]
 [0 0]]
[1. 1.]
erreur d'estimation de viterbi:
100.0 %


In [17]:
import math
def viterbi(allx,Pi,A,B):
    """
    Parameters
    ----------
    allx : array (T,)
        Sequence d'observations.
    Pi: array, (K,)
        Distribution de probabilite initiale
    A : array (K, K)
        Matrice de transition
    B : array (K, M)
        Matrice d'emission matrix

    """
    ## Initialisation
    psi = np.zeros((len(A), len(allx))) # A = N
    psi[:,0]= -1
    delta = np.zeros((len(A), len(allx)))  # initialisation en dimension mais pas en contenu !
    
    
    ## Recursion
    #on parcourt la sequence avec les colonnes de delta avec t
    for t in range(1,len(allx)) :
        #on parcourt les lignes de delta avec j
        for j in range(len(A)) :
            
            #boucle pour calculer max𝑖𝛿𝑡−1(𝑖)+log𝑎𝑖𝑗
            liste = np.zeros(len(A))
            for i in range(len(A)) :
                if A[i,j] != 0 :
                    liste[i]= delta[i,t-1]  + np.log(A[i,j])
                else :
                    liste[i]= delta[i,t-1] + np.log(1e-10)
            if B[j,int(allx[t])] != 0 :
                delta[j,t] = np.max(liste) + np.log(B[j,int(allx[t])])
            else :
                delta[j,t] = np.max(liste) + np.log(1e-10)
            psi[j,t] = np.argmax(liste)
            
    ## Terminaison
    S = np.zeros(len(allx))
    S[len(allx)-1] = np.argmax(delta[:,(len(allx)-1)])

    ## chemin    
    for i in range(len(allx)-2,-1,-1):
        S[i] =psi[int(S[i+1]),i]
    
    
    return  S, psi, delta


In [23]:
obs = jetsRes[:,1]
etat_predits, psi, delta=viterbi(obs,pi0, Pij, Ei)
print(psi[:,0:25])
print(delta[:,0:25])
print(etat_predits[0:50])

[[-1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.]
 [-1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.
   1.  0.  0.  1.  1.  0.  1.]]
[[  0.          -0.70319752  -1.40639503  -2.10959255  -2.81279007
   -3.51598758  -4.2191851   -4.92238261  -5.62558013  -6.32877765
   -7.03197516  -7.73517268  -8.4383702   -9.14156771  -9.84476523
  -10.54796275 -11.25116026 -11.95435778 -12.6575553  -13.36075281
  -14.06395033 -14.76714784 -15.47034536 -16.17354288 -16.87674039]
 [  0.          -0.15665381  -0.31330762  -2.66718601  -2.82383982
   -5.1777182   -5.33437201  -7.6882504   -7.84490421  -8.00155802
   -8.15821183  -8.31486564  -8.47151945  -8.62817326 -10.98205165
  -13.33593004 -15.68980842 -15.84646223 -18.20034062 -19.56531057
  -18.07128351 -18.22793732 -20.58181571 -20.18087606 -20.33752987]]
[-1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0

<b>Exercice 3</b>: <u>Estimation des paramètres</u>
<br>
3.1) Écrire une fonction qu'utilise tous les résultats de la simulation
(états et symboles) pour compter les nombres d'occurrence $N_{ij}$ est $M_{iO}$ définis en cours. Estimer $P_{ij}$ est $E_i(O)$. Attention, pour éviter les probabilités à zéro nous allons utiliser les pseudo-count.

In [13]:
# Estimation de Parametres par contage
def nombresOccurrence(jets, nS, nO):
    """
    Parameter estimation
    input1 jets: matrix |2xT| containing data
    input2 nS: number of states
    input3 nO: number of observations
    output1 Nij: transition probability matrix
    output2 Mio: emission probability matrix
    output3 pi0: initial condition 
    """

    Nij = np.ones((nS,nS)) #pseudo-count = 1
    Mio = np.ones((nS,nO))  #pseudo-count = 1
    pi0 = np.ones((nS))


    
    return Nij, Mio, pi0

Nij, Mio, pi0 = nombresOccurrence(jetsRes, 2, 2)

print('Pij estimé:')
print(Nij)
print('\nEio estimé:')
print(Mio)
print('\npi0 estimé:')
print(pi0)

Pij estimé:
[[1. 1.]
 [1. 1.]]

Eio estimé:
[[1. 1.]
 [1. 1.]]

pi0 estimé:
[1. 1.]


3.2) <u> Viterbi training </u>: Écrire une fonction qui utilise
seulement la séquence $(O_t)_{t=0:T}$ (2emme colonne de la simulation) pour estimer les
paramètres $P_{ij}$ est $Ei(O)$. On s’arrêtera quand les différences entre les logVraissamblance est inférieur à 1e-04. Comparer les résultats de 3.1 et de 3.2 (3.2 avec plusieurs restarts,
et avec initialisation des paramètres aléatoire).


In [14]:
import matplotlib.pyplot as plt

# Initialisation aleatoire de Pij, Eij, pi0
def InititRandom(nS, nO):
    """
    randomly initialisations 
    input1 nS: number of states
    input2 nO: number of observations
    output1 Pij_init: transition probability matrix
    output2 Ei_init: emission probability matrix
    output3 pi0_init: initial condition 
    """
    random.seed(10)


    return Pij_init, Ei_init, pi0_init

# Calcule log Vraissamblance
def logLikelhihood(Aij,Bij,pi0,jets):
    """
    compute Log Likelihood 
    input1 Pij: transition probability matrix
    input2 Ei: emission probability matrix
    input3 pi0: initial condition 
    input4 jets: matrix |2xT| containing data
    """
    etat_seq = jets[:,0]
    obs_seq = jets[:,1]
    T = len(obs_seq) #Nombre d'observations (longueur des observations)
    lLikelihood = np.log(pi0[etat_seq[0]])


    return lLikelihood


def Training(jets, nS, nO):
    """
    Viterbi Training
    input1 jets: matrix |2xT| containing data
    input2 nS: number of states
    input3 nO: number of observations
    output1 Pij_est: transition probability matrix
    output2 Ei_est: emission probability matrix
    output3 pi0_est: initial condition 
    output4 lLikelihood: log Likelihood
    """
    jets_est = np.array(jets)    
    Pij_est, Ei_est, pi0_est = InititRandom(nS, nO)
    
    nIteration = 10000
    criterion = 1e-04
    lLikelihood = np.zeros((nIteration))
   


    return Pij_est, Ei_est, pi0_est, lLikelihood
    
#imprimer les Parametres du Viterbi Training
Pij_est, Ei_est, pi0_est, lLikelihood = Training(jetsRes, 2, 2)
itCount = len(lLikelihood)
print('Le modèle est convergé après '+str(itCount)+' itérations.')
print('\nPij estimée:')
print(Pij_est)
print('\nEij estimée:')
print(Ei_est)

NameError: name 'Pij_init' is not defined

<font color="blue">
Remark:
</font>

3.3) <u>Viterbi training deuxième version</u>. 
<BR>Écrivez une version de 3.2 qui:
- part plusieurs fois (100x) d'une initialisation aléatoire desparamètres de l'HMM,
- utilise Viterbi training pour estimer les paramètres,
- calcul la log-vraisemblance pour les paramètres estimés,
- sauvegarde seulement l'estimation avec la valeur maximale de lalog-vraisemblance.

Qu'est-ce que vous observez?



In [None]:
# Viterbi Training  deuxiemme version

def TrainingV2(jets, nS, nO, nIterat=100):
    """
    Viterbi Training version 2.0
    input1 jets: matrix |2xT| containing data
    input2 nS: number of states
    input3 nO: number of observations
    input4 nIterat: number of iterations
    output1 Pij_best: best transition probability matrix
    output2 Ei_best: best emission probability matrix
    output3 pi0_best: best initial condition 
    output4 lLikelihood_best: best log Likelihood
    """

    Pij_best = []
    Ei_best = []
    pi0_best = []
    lLikelihood_best = -10000
    

    return Pij_best, Ei_best, pi0_best, lLikelihood_best
    

# Imprimer les Parametres du Viterbi Training deuxiemme version
nIterat = 100
Pij_best, Ei_best, pi0_best, lLikelihood_best = TrainingV2(jetsRes, 2, 2, nIterat)

print('Meilleur Pij estimée:');
print(Pij_best)
print('\nMeilleur Eij estimée:')
print(Ei_best)

<font color="blue">
Remark:
</font>