# Beaum Welch Algorithm Project

IMRANE OU EL FAQUIR Filière IIA

In [17]:
import numpy as np
import pandas as pd

`` p `` : transition probability

`` q ``: emission probability

`` observ_seq ``: observed sequence

`` b_0 `` Initial state

In [18]:
# b_0 est l'état initial 
b_0 = np.array([0.0, 0.0, 1.0, 0.0, 0.0])
# q c'est la probabilité d'émission
q   = np.array([[ 0, 0, 0, 1, 0],
                [ 0, 0, 1, 0, 0],
                [ 0, 0, 1, 0, 0],
                [ 0, 0, 1, 0, 0],
                [ 0, 0, 0, 1, 0]])
# p c'est la probabilité de transition
p   = np.array([[ 0.75, 0.25, 0.00, 0.00, 0.00],
                [ 0.25, 0.75, 0.25, 0.00, 0.00],
                [ 0.00, 0.25, 0.75, 0.25, 0.00],
                [ 0.00, 0.00, 0.25, 0.75, 0.25],
                [ 0.00, 0.00, 0.00, 0.25, 0.75]])
# observ_seq c'est la séquence d'observation o = (o_1, o_2, ... , o_n)
observ_seq = np.array([ 2, 2, 3, 2, 3, 2, 2, 2, 3, 3])

## Forward Algorithm

In [19]:
def forward(observ_seq, p, q, b_0):
    # Initialisation de  alpha_1(s) = q_(s,o_1) * b_0(s)
    alpha = np.zeros((p.shape[0], observ_seq.shape[0]))
    alpha[:,0] = b_0*q[:,observ_seq[0]]
    # calcul des  alpha_t(s) = q_(s,o_t) sum p(s', s) * alpha_(t-1) (s')
    for t in range(1, observ_seq.shape[0]):
        for s in range(q.shape[0]):
            alpha[s, t] = q[s, observ_seq[t]] * np.dot(p[:, s], alpha[:, t-1])
    return alpha

In [20]:
alpha = forward(observ_seq, p, q, b_0)
pd.DataFrame(alpha)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,0.0,0.0,0.0625,0.0,0.003906,0.0,0.0,0.0,0.000168,0.000126
1,0.0,0.25,0.0,0.015625,0.0,0.000977,0.000732,0.000671,0.0,0.0
2,1.0,0.75,0.0,0.0,0.0,0.0,0.000488,0.000732,0.0,0.0
3,0.0,0.25,0.0,0.015625,0.0,0.000977,0.000732,0.000671,0.0,0.0
4,0.0,0.0,0.0625,0.0,0.003906,0.0,0.0,0.0,0.000168,0.000126


## Backward Algorithm

In [21]:
def backward(observ_seq, p, q):
    # Initialisation de beta_T (s) = 1
    beta = np.zeros((observ_seq.shape[0], p.shape[0]))
    beta[observ_seq.shape[0] - 1] = np.ones((p.shape[0]))
    # calcul des  beta_t(s) = sum p(s, s') *  q(s', o_(t+1)) *beta_(t+1) (s')
    for t in range(observ_seq.shape[0] - 2, -1, -1):
        for s in range(q.shape[0]):
            beta[t, s] = np.dot(beta[t + 1] * q[:, observ_seq[t + 1]], p[s, :])
    return beta

In [22]:
beta = backward(observ_seq, p, q)
pd.DataFrame(beta.T)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,0.000126,0.001511,0.002014,0.02417,0.032227,0.035156,0.046875,0.5625,0.75,1.0
1,0.000378,0.000504,0.006042,0.008057,0.131836,0.128906,0.140625,0.1875,0.25,1.0
2,0.000252,0.0,0.004028,0.0,0.169922,0.140625,0.09375,0.0,0.0,1.0
3,0.000378,0.000504,0.006042,0.008057,0.131836,0.128906,0.140625,0.1875,0.25,1.0
4,0.000126,0.001511,0.002014,0.02417,0.032227,0.035156,0.046875,0.5625,0.75,1.0


## Baum Welch Algorithm

In [136]:
def baum_welch(observ_seq, p, q, b_0, max_iter=100):
    for _ in range(max_iter):
        alpha = forward(observ_seq, p, q, b_0)
        beta = backward(observ_seq, p, q)
        epsilon = np.zeros((p.shape[0], p.shape[0], len(observ_seq) - 1))
        for t in range(len(observ_seq) - 1):
            epsilon_1 = np.dot(np.dot(alpha[:, t].T, p) * q[:, observ_seq[t + 1]].T, beta[t + 1, :])
            for i in range(p.shape[0]):
                epsilon_2 = alpha[i, t] * p[i, :] * q[:, observ_seq[t + 1]].T * beta[t + 1, :].T
                epsilon[i, :, t] = epsilon_2 / epsilon_1
        gamma = np.sum(epsilon, axis=1)
        p = np.sum(epsilon, 2) / np.sum(gamma, axis=1).reshape((-1, 1))
        gamma = np.hstack((gamma, np.sum(epsilon[:, :, len(observ_seq) - 2], axis=0).reshape((-1, 1))))
        for l in range(q.shape[1]):
            q[:, l] = np.sum(gamma[:, observ_seq == l], axis=1)
        q = np.divide(q, np.sum(gamma, axis=1).reshape((-1, 1)))
    return {"Transition":p, "Emission":q}

In [139]:
baum = baum_welch(observ_seq, p, q, b_0, max_iter = 100)

In [140]:
baum['Transition']

array([[0.33333333, 0.66666667, 0.        , 0.        , 0.        ],
       [0.75      , 0.        , 0.25      , 0.        , 0.        ],
       [0.        , 0.5       , 0.        , 0.5       , 0.        ],
       [0.        , 0.        , 0.25      , 0.        , 0.75      ],
       [0.        , 0.        , 0.        , 0.66666667, 0.33333333]])

In [141]:
baum['Emission']

array([[0., 0., 0., 1., 0.],
       [0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0.],
       [0., 0., 0., 1., 0.]])

In [147]:
class BaumWelch():
    def __init__(self, observ_seq, p, q, b_0, max_iter = 100):
        self.observ_seq = observ_seq
        self.p = p
        self.q = q
        self.b_0 = b_0
        self.max_iter = max_iter
    def forward(self):
        # Initialisation de  alpha_1(s) = q_(s,o_1) * b_0(s)
        alpha = np.zeros((self.p.shape[0], self.observ_seq.shape[0]))
        alpha[:,0] = self.b_0*self.q[:,self.observ_seq[0]]
        # calcul des  alpha_t(s) = q_(s,o_t) sum p(s', s) * alpha_(t-1) (s')
        for t in range(1, self.observ_seq.shape[0]):
            for s in range(self.q.shape[0]):
                alpha[s, t] = self.q[s, self.observ_seq[t]] * np.dot(self.p[:, s], alpha[:, t-1])
        return alpha
    def backward(self):
        # Initialisation de beta_T (s) = 1
        beta = np.zeros((self.observ_seq.shape[0], self.p.shape[0]))
        beta[self.observ_seq.shape[0] - 1] = np.ones((self.p.shape[0]))
        # calcul des  beta_t(s) = sum p(s, s') *  q(s', o_(t+1)) *beta_(t+1) (s')
        for t in range(self.observ_seq.shape[0] - 2, -1, -1):
            for s in range(self.q.shape[0]):
                beta[t, s] = np.dot(beta[t + 1] * self.q[:, self.observ_seq[t + 1]], self.p[s, :])
        return beta
    def fit(self):
        for _ in range(self.max_iter):
            alpha = self.forward()
            beta = self.backward()
            epsilon = np.zeros((self.p.shape[0], self.p.shape[0], len(self.observ_seq) - 1))
            for t in range(len(self.observ_seq) - 1):
                epsilon_1 = np.dot(np.dot(alpha[:, t].T, self.p) * self.q[:, self.observ_seq[t + 1]].T, beta[t + 1, :])
                for i in range(self.p.shape[0]):
                    epsilon_2 = alpha[i, t] * self.p[i, :] * self.q[:, self.observ_seq[t + 1]].T * beta[t + 1, :].T
                    epsilon[i, :, t] = epsilon_2 / epsilon_1
            gamma = np.sum(epsilon, axis=1)
            self.p = np.sum(epsilon, 2) / np.sum(gamma, axis=1).reshape((-1, 1))
            gamma = np.hstack((gamma, np.sum(epsilon[:, :, len(observ_seq) - 2], axis=0).reshape((-1, 1))))
            for l in range(self.q.shape[1]):
                self.q[:, l] = np.sum(gamma[:, self.observ_seq == l], axis=1)
            self.q = np.divide(self.q, np.sum(gamma, axis=1).reshape((-1, 1)))
        return {"Transition":self.p, "Emission":self.q}

In [148]:
baumbaum = BaumWelch(observ_seq, p, q, b_0, max_iter = 100)

In [149]:
baumbaum.fit()

{'Transition': array([[0.33333333, 0.66666667, 0.        , 0.        , 0.        ],
        [0.75      , 0.        , 0.25      , 0.        , 0.        ],
        [0.        , 0.5       , 0.        , 0.5       , 0.        ],
        [0.        , 0.        , 0.25      , 0.        , 0.75      ],
        [0.        , 0.        , 0.        , 0.66666667, 0.33333333]]),
 'Emission': array([[0., 0., 0., 1., 0.],
        [0., 0., 1., 0., 0.],
        [0., 0., 1., 0., 0.],
        [0., 0., 1., 0., 0.],
        [0., 0., 0., 1., 0.]])}