### ENSIAS, Mohamed V University
# <center> Hidden Markov Models : Project  </center> 
### <center> Branch 2IA - second year</center> 
### <center>Group: Mohammed NECHBA, and Mohamed MOUHAJIR</center> 
### <center>Prof. Naoum Mohamed</center> 

In [23]:
import numpy as np
from sympy import *

## 1. Programmer l algorithme de Baum welch ainsi que ses fonctions intermédiaires

>### Algorithme Forward

The $\alpha$ equation is: 
$$\alpha_t(s) = P(o_1,o_2,...,o_t, S_t = s) $$

For t=1, the $\alpha$ equation is: 
 $$\alpha_1(s) = P(o_1, S_1 = s) = P(o_1|S_1=s)P(S_1=s) = q_{s,o_1}b_0(s)$$

For $t=2,...,T$, the $\alpha$ equation is: 
$$\alpha_t(s) = q_{s,o_t}\sum_{s'\in S} P_{s',s}\alpha_{t-1}(s')$$

In [24]:
def forward(sequence, transition_matrix, emission_matrix, observation, start=None):
    num_states = transition_matrix.shape[0]
    T = len(observation)
    alpha = np.zeros((num_states, T))
    
    if start.all()!=None:
        alpha[:, 0] = start
    else:
        alpha[:, 0] = emission_matrix[:, observation[0]]
    for t in range(1, T):
        for j in range(num_states):
            alpha[j, t] = emission_matrix[j, observation[t]] * np.sum(transition_matrix[:, j] * alpha[:, t-1])
    return alpha

>>#### Exemple

![image.png](attachment:image.png)

In [25]:
sequence = [0,1,2,3,4]
transition_matrix = np.array([[0.75,0.25,0,0,0],
                             [0.25,0.5,0.25,0,0],
                             [0,0.25,0.5,0.25,0],
                             [0,0,0.25,0.5,0.25],
                             [0,0,0,0.25,0.75]])
emission_matrix = np.array([[0,0,0,1,0],
                           [0,0,1,0,0],
                           [0,0,1,0,0],
                            [0,0,1,0,0],
                           [0,0,0,1,0]])
start =np.array([0,0,1,0,0])

![image.png](attachment:image.png)

In [26]:
observation=[2,2,2]
arr = forward(sequence, transition_matrix, emission_matrix, observation, start)
Matrix(arr)

Matrix([
[  0,    0,     0],
[  0, 0.25,  0.25],
[1.0,  0.5, 0.375],
[  0, 0.25,  0.25],
[  0,    0,     0]])

In [27]:
observation=[2,2,3]
arr = forward(sequence, transition_matrix, emission_matrix, observation, start)
Matrix(arr)

Matrix([
[  0,    0, 0.0625],
[  0, 0.25,      0],
[1.0,  0.5,      0],
[  0, 0.25,      0],
[  0,    0, 0.0625]])

![image.png](attachment:image.png)

In [28]:
observation=[2,2,3,2,3,2,2,2,3,3]
arr = forward(sequence, transition_matrix, emission_matrix, observation, start)
Matrix(arr)

Matrix([
[  0,    0, 0.0625,        0, 0.00390625,            0,             0,               0, 9.1552734375e-5, 6.866455078125e-5],
[  0, 0.25,      0, 0.015625,          0, 0.0009765625, 0.00048828125, 0.0003662109375,               0,                 0],
[1.0,  0.5,      0,        0,          0,            0, 0.00048828125,   0.00048828125,               0,                 0],
[  0, 0.25,      0, 0.015625,          0, 0.0009765625, 0.00048828125, 0.0003662109375,               0,                 0],
[  0,    0, 0.0625,        0, 0.00390625,            0,             0,               0, 9.1552734375e-5, 6.866455078125e-5]])

>### Algorithme Backward

The $\beta$ equation is: 
$$ \beta_t(s) = P(o_{t+1}...o_T| S_t = s) $$

 $$\beta_T(s) = 1$$

For $t=T-1,...,1$, the $\beta$ equation is: 
$$\beta_t(s) = \sum_{s'\in S} P_{s,s'}q_{s',o_{t+1}}\beta_{t+1}(s')$$

In [29]:
def backward(sequence, transition_matrix, emission_matrix, observation):
    num_states = transition_matrix.shape[0]
    T = len(observation)
    beta = np.zeros((num_states, T))
    beta[:, T-1] = 1
    for t in reversed(range(T-1)):
        for j in range(num_states):
            beta[j, t] = np.sum(transition_matrix[j, :] * emission_matrix[:, observation[t+1]] * beta[:, t+1])
    return beta

>>#### Exemple

![image.png](attachment:image.png)

In [30]:
observation=[2,2,3,2,3,3]
arr = backward(sequence, transition_matrix, emission_matrix, observation)
Matrix(arr)

Matrix([
[0.0029296875, 0.03515625, 0.046875, 0.5625, 0.75, 1.0],
[ 0.005859375, 0.01171875,  0.09375, 0.1875, 0.25, 1.0],
[ 0.005859375,          0,  0.09375,      0,    0, 1.0],
[ 0.005859375, 0.01171875,  0.09375, 0.1875, 0.25, 1.0],
[0.0029296875, 0.03515625, 0.046875, 0.5625, 0.75, 1.0]])

>### Algorithme forward-backward

* Calculez tous les $\alpha_t(s)$, en utilisant l'algorithme Forward
* Calculez tous les $\beta_t(s)$, en utilisant l'algorithme Backward
* Pour tout $s \in S$ et $t \in {1,. . . T}$:
$$ P(S_1=s|o_1,...,o_T) = \frac{P(o_1,...o_t,S_t=s)P(o_{t+1},...o_T|S_t=s)}{P(o_1,...,o_T)}=\frac{\alpha_t(s)\beta_t(s)}{\sum_{s'\in S}\alpha_T(s)}$$

In [31]:
def forward_backward(sequence, transition_matrix, emission_matrix, observation, start = None):
    # Initialisation des paramètres
    num_states = transition_matrix.shape[0]
    T = len(observation)
    
    # Etape de forward
    forward_probs = forward(sequence, transition_matrix, emission_matrix, observation, start)
    # Etape de backward
    backward_probs = backward(sequence, transition_matrix, emission_matrix, observation)
    
    observation_probability = np.multiply(forward_probs, backward_probs)/ np.sum(forward_probs[:, T-1])
    
    return observation_probability
    

>>#### Exemple

![image.png](attachment:image.png)

In [32]:
observation = [2,2,3,2,3,3]
arr = forward_backward(sequence, transition_matrix, emission_matrix, observation, start)
Matrix(arr)

Matrix([
[  0,   0, 0.5,   0, 0.5, 0.5],
[  0, 0.5,   0, 0.5,   0,   0],
[1.0,   0,   0,   0,   0,   0],
[  0, 0.5,   0, 0.5,   0,   0],
[  0,   0, 0.5,   0, 0.5, 0.5]])

>### Algorithme de Baum welch

**1.** Initialiser les matrices $T, Q$ et $\pi_i$, en normalisant les lignes pour que chaque ligne somme à 1.

**2.** Répéter pour un nombre prédéfini d'itérations:

   * <span style="color:red">Pour chaque séquence d'observation $O$ dans l'ensemble d'entraînement:</span>
   
       **i.** Calculer $\alpha$ en utilisant l'algorithme de forward.
       
       **ii.** Calculer $\beta$ en utilisant l'algorithme de backward.
       
       **iii.** Calculer les variables $\gamma_{t}(i,j)$ et $\xi_{t}(i,j)$ pour chaque état $i$ et $j$, et pour chaque instant $t$ de la séquence d'observation $O$ en utilisant les équations suivantes:

  $$\gamma_{t}(i) = \frac{\alpha_{t}(i)\beta_{t}(i)}{P(O|\lambda)}$$
  
  $$\xi_{t}(i,j) = \frac{\alpha_{t}(i)T_{ij}Q_{j}(O_{t+1})\beta_{t+1}(j)}{P(O|\lambda)}$$
   
       **iv.** Mettre à jour les matrices $T, Q$ et $\pi_i$ à partir des variables $\gamma$ et $\xi$ en utilisant les équations suivantes:
        $$T_{ij} = \frac{ \sum_{t=1}^{T-1} \xi_{t}(i,j)}{\sum_{t=1}^{T-1} \sum_{m}  \xi_{t}(i,m)}$$

        $$Q_{jk} = \frac{ \sum_{t=1}^{T} \gamma_{t}(j) \delta_{o_t,k}}{ \sum_{t=1}^{T} \gamma_{t}(j)}$$

        $$\pi_{i} = \frac{ \gamma_{1}(i)}{\sum_{O} 1}$$

In [33]:
def initialize_random_matrices(num_states, num_symbols):
    # Transition matrix
    transition_matrix = np.random.rand(num_states, num_states)
    transition_matrix /= np.sum(transition_matrix, axis=1, keepdims=True)

    # Emission matrix
    emission_matrix = np.random.rand(num_states, num_symbols)
    emission_matrix /= np.sum(emission_matrix, axis=1, keepdims=True)

    # Initial state probabilities
    pi = np.random.rand(num_states)
    pi /= np.sum(pi)

    return transition_matrix, emission_matrix, pi

In [190]:
def baum_welch(training_set, num_states, num_symbols, num_iterations):
    # Initialisation des paramètres
    transition_matrix, emission_matrix, pi = initialize_random_matrices(num_states, num_symbols)
    sequence = list(range(num_states))
    
    # Boucle d'apprentissage
    for iteration in range(num_iterations):
         for observation in training_set:
            # Initialisation
            T = len(observation)
            
            # Forward algorithm
            alpha = forward(sequence, transition_matrix, emission_matrix, observation, start = pi)
            
            # Backward algorithm
            beta = backward(sequence, transition_matrix, emission_matrix, observation)
            
            # Compute the element-wise product between the alpha and beta values.
            prod = alpha*beta
            
            # Compute gamma
            gamma = prod/np.sum(prod,axis=0, keepdims=True)
            
            # Compute xi
            xi = np.zeros((T-1, num_states, num_states))
            for t in range(T-1):
                s = np.sum(alpha * beta, axis=0, keepdims=True)[0][t]
                if s:
                    for i in range(num_states):
                        for j in range(num_states):
                                xi[t, i, j] = (alpha[i,t] * transition_matrix[i,j] *emission_matrix[j, observation[t+1]] * beta[j,t+1]) /s 

            # Update transition matrix
            for i in range(num_states):
                for j in range(num_states):
                    transition_matrix[i, j] = np.sum(xi[:, i, j])/np.sum(xi[:, i, :])
            
            # Update emission matrix
            for j in range(num_states):
                for k in range(T):
                    mask = [observation[k]==e for e in observation] 
                    #print( mask)
                    emission_matrix[j, k] = np.sum(gamma[j,:] * mask) / np.sum(gamma[j,:])
            
            # Update initial state probabilities
            pi = np.sum(gamma[0, :]) / len(training_set)
    return transition_matrix, emission_matrix, pi

In [159]:
# Example DNA sequences
training_set = [
    [1, 1, 2, 3, 4, 2, 1, 3, 4],  # Sequence 1
    [3, 2, 1, 4, 3, 2, 1, 4, 3]   # Sequence 2
]

num_states = 4  # Number of hidden states (e.g., functional elements in DNA)
num_symbols = 9  # Number of observed symbols (e.g., A, C, G, T)

num_iterations = 100  # Number of iterations for the Baum-Welch algorithm

# Apply the Baum-Welch algorithm
transition_matrix, emission_matrix, pi = baum_welch(training_set, num_states, num_symbols, num_iterations)

# Print the learned parameters
print("Learned Transition Matrix:")
print(transition_matrix)
print("Learned Emission Matrix:")
print(emission_matrix)
print("Learned Initial State Probabilities:")
print(pi)

Learned Transition Matrix:
[[3.17752398e-001 5.28664428e-042 6.82247602e-001 3.84457880e-109]
 [5.54366283e-006 0.00000000e+000 9.99994456e-001 0.00000000e+000]
 [4.44099872e-001 5.55900128e-001 8.76694382e-066 2.90661713e-283]
 [1.00000000e+000 0.00000000e+000 1.66158267e-184 0.00000000e+000]]
Learned Emission Matrix:
[[7.21479749e-002 4.83459052e-001 1.23219305e-001 3.21173668e-001
  7.21479749e-002 4.83459052e-001 1.23219305e-001 3.21173668e-001
  7.21479749e-002]
 [1.59782981e-003 3.62458186e-001 9.68438211e-008 6.35943888e-001
  1.59782981e-003 3.62458186e-001 9.68438211e-008 6.35943888e-001
  1.59782981e-003]
 [5.92875943e-001 9.96325439e-004 4.06125463e-001 2.26933665e-006
  5.92875943e-001 9.96325439e-004 4.06125463e-001 2.26933665e-006
  5.92875943e-001]
 [1.00000000e+000 6.87887599e-320 2.84504326e-108 3.86223592e-217
  1.00000000e+000 6.87887599e-320 2.84504326e-108 3.86223592e-217
  1.00000000e+000]]
Learned Initial State Probabilities:
1.4248325289601063



**1.** Initialiser les matrices de transition et d'émission aléatoirement, en normalisant les lignes pour que chaque ligne somme à 1.

**2.** Répéter pour un nombre prédéfini d'itérations:
   * <span style="color:red">Initialiser les variables:</span>\
      **i.** $N$ : le nombre de séquences d'observations dans l'ensemble d'entraînement
      
      **ii.** $\gamma_{t}(i)$ : la probabilité d'être dans l'état $i$ à l'instant $t$, sachant l'ensemble des observations et le modèle $\lambda = (T, Q)$, où $T$ est la matrice de transition et $Q$ est la matrice d'émission.\
      $$ \gamma_{t}(i) = \frac{P(X_t=i, O| \lambda)}{P(O|\lambda)}= \frac{\alpha_t(i)\beta_t(i)}{\sum_i \alpha_t(i)\beta_t(i)} $$
      
      **iii.** $\xi_{t}(i,j)$ : la probabilité d'être dans l'état $i$ à l'instant $t$ et dans l'état $j$ à l'instant $t+1$, sachant l'ensemble des observations et le modèle $\lambda = (T, Q)$, où $T$ est la matrice de transition et $Q$ est la matrice d'émission.
      $$\xi_{t}(i,j)=P(X_t=i,X_{t+1}=j|O,\lambda)=\frac{P(X_t=i,X_{t+1}=j,O|\lambda)}{P(O|\lambda)}=\frac{\alpha_t(i)POIDS_t(i,j) \beta_{t+1}(j)}{\sum_i \alpha_{t+1}(i)\beta_{t+1}(i)}$$
      **iv.** $P(O|\lambda)$ : la probabilité totale de l'ensemble des observations donné le modèle $\lambda = (T, Q)$.\
      **v.** $T_{ij}$ : le nombre de transitions de l'état $i$ à l'état $j$.\
      **vi.** $Q_{jk}$ : le nombre de fois que l'état $j$ a émis le symbole $k$.\
      **vii.** $\pi_i$ : la probabilité d'être dans l'état initial $i$.
   * <span style="color:red">Pour chaque séquence d'observation $O$ dans l'ensemble d'entraînement:</span>\
        **i.** Calculer la probabilité de l'observation donné le modèle courant $\lambda$ en utilisant l'algorithme de forward et backward: $P(O|\lambda) = \sum_{i=1}^{N} \alpha_{t}(i)\beta_t(i)$, où $\alpha_{t}(i)$ est la probabilité de l'observation partielle jusqu'à l'instant $t$ et d'être dans l'état $i$ à l'instant $t$, sachant le modèle $\lambda$, et $\beta_t(i)$ est la probabilité de l'observation partielle de l'instant $t+1$ jusqu'à l'instant $T+1$, sachant le modèle $\lambda$,.
        
        **ii.** Calculer les variables $\gamma_{t}(i,j)$ et $\xi_{t}(i,j)$ pour chaque état $i$ et $j$, et pour chaque instant $t$ de la séquence d'observation $O$ en utilisant les équations suivantes:

  $$\gamma_{t}(i) = \frac{\alpha_{t}(i)\beta_{t}(i)}{P(O|\lambda)}$$
  
  $$\xi_{t}(i,j) = \frac{\alpha_{t}(i)T_{ij}Q_{j}(O_{t+1})\beta_{t+1}(j)}{P(O|\lambda)}$$
   * <span style="color:red">Mettre à jour les matrices de transition et d'émission à partir des variables $\gamma$ et $\xi$ calculées pour chaque séquence d'observation $O$:</span>

        **i.** $$T_{ij} = \frac{ \sum_{t=1}^{T-1} \xi_{t}(i,j)}{\sum_{t=1}^{T-1} \sum_{m}  \xi_{t}(i,m)}$$

        **ii.** $$Q_{jk} = \frac{ \sum_{t=1}^{T} \gamma_{t}(j) \delta_{o_t,k}}{ \sum_{t=1}^{T} \gamma_{t}(j)}$$

        **iii.** $$\pi_{i} = \frac{ \gamma_{1}(i)}{\sum_{O} 1}$$

In [None]:
def baum_welch(observations, num_states, num_symbols, num_iterations):
    # Initialisation des paramètres
    transition_matrix = np.random.rand(num_states, num_states)
    emission_matrix = np.random.rand(num_states, num_symbols)
    transition_matrix /= np.sum(transition_matrix, axis=1, keepdims=True)
    emission_matrix /= np.sum(emission_matrix, axis=1, keepdims=True)
    
    # Boucle d'apprentissage
    for iteration in range(num_iterations):
        # Initialisation des variables
        num_sequences = len(observations)
        state_posterior = np.zeros((num_sequences, num_states, len(observations[0])))
        observation_probability = np.zeros((num_sequences,))
        transition_count = np.zeros((num_states, num_states))
        emission_count = np.zeros((num_states, num_symbols))
        state_count = np.zeros((num_states,))
        
        # Etape de rétro-propagation
        for i in range(num_sequences):
            sequence = observations[i]
            forward_probs = forward(sequence, transition_matrix, emission_matrix)
            backward_probs = backward(sequence, transition_matrix, emission_matrix)
            state_posterior[i] = np.multiply(forward_probs, backward_probs)
            observation_probability[i] = np.sum(state_posterior[i][0])
            state_posterior[i] /= observation_probability[i]
            
        # Etape de mise à jour des paramètres
        for i in range(num_sequences):
            sequence = observations[i]
            T = len(sequence)
            for t in range(T - 1):
                transition_count += np.outer(state_posterior[i][:,t], state_posterior[i][:,t+1])
            emission_count += np.dot(state_posterior[i], sequence.T)
            state_count += state_posterior[i][:,0]
            
        transition_matrix = transition_count / np.sum(transition_count, axis=1, keepdims=True)
        emission_matrix = emission_count / np.sum(emission_count, axis=1, keepdims=True)
        pi = state_count / np.sum(state_count)
    
    # Retourner le modèle HMM entraîné
    return transition_matrix, emission_matrix, pi