# 1. Cài đặt thuật toán tiến trước, thuật toán Viterbi, và thuật toán Baum-Welch.

In [48]:
import numpy as np
import pandas as pd

## 1.1. Thuật toán tiến trước

[https://github.com/adeveloperdiary/HiddenMarkovModel/blob/master/part2/forward.py](https://github.com/adeveloperdiary/HiddenMarkovModel/blob/master/part2/forward.py)

In [49]:
def forward_algorithm(observation, transition_prob, emission_prob, initial_distribution):
    alpha = np.zeros((observation.shape[0], transition_prob.shape[0]))

    alpha[0, :] = initial_distribution * emission_prob[:, observation[0]]

    for i in range(1, observation.shape[0]):
        for j in range(transition_prob.shape[0]):
            alpha[i, j] = alpha[i - 1].dot(transition_prob[:, j]) * emission_prob[j, observation[i]]

    return alpha

## 1.2. Thuật toán Viterbi

[https://github.com/adeveloperdiary/HiddenMarkovModel/blob/master/part4/Viterbi.py](https://github.com/adeveloperdiary/HiddenMarkovModel/blob/master/part4/Viterbi.py)

In [50]:
def viterbi_algorithm(observation, transition_prob, emission_prob, initial_distribution):
    T = observation.shape[0]
    M = transition_prob.shape[0]

    omega = np.zeros((T, M))
    omega[0, :] = np.log(initial_distribution * emission_prob[:, observation[0]])

    prev = np.zeros((T - 1, M))

    for i in range(1, T):
        for j in range(M):
            # the same as forward probability
            probability = omega[i - 1] + np.log(transition_prob[:, j]) + np.log(emission_prob[j, observation[i]])

            # the most probable state given previous state at time i    (1)
            prev[i - 1, j] = np.argmax(probability)

            # the probability of the most probable state                (2)
            omega[i, j] = np.max(probability)
    
    # path array
    path = np.zeros(T)

    # the most probable state at the last time step
    last_state = np.argmax(omega[T - 1, :])
    path[0] = last_state

    backtrack_index = 1

    for i in range(T - 2, -1, -1):
        path[backtrack_index] = prev[i, int(last_state)]
        last_state = prev[i, int(last_state)]
        backtrack_index += 1

    # flip the path array
    path = np.flip(path, axis = 0)

    # return path

    # convert numeric values to actual hidden states
    result = []

    for p in path:
        if p == 0:
            result.append("A")
        else:
            result.append("B")
    
    return result


## 1.3. Thuật toán Baum - Welch

In [51]:
def backward_algorithm(observation, transition_prob, emission_prob):
    beta = np.zeros((observation.shape[0], transition_prob.shape[0]))
    beta[observation.shape[0] - 1] = np.ones(transition_prob.shape[0])

    # loop backwards from t - 2 to 0
    for i in range(observation.shape[0] -2, -1, -1):
        for j in range(transition_prob.shape[0]):
            beta[i, j] = (beta[i + 1] * emission_prob[:, observation[i + 1]]).dot(transition_prob[j, :])
    
    return beta

In [52]:
def baum_welch_algorithm(observation, transition_prob, emission_probability, initial_distribution, n_iter=100):
    M = transition_prob.shape[0]
    T = len(observation)

    for n in range(n_iter):
        alpha = forward_algorithm(observation, transition_prob, emission_probability, initial_distribution)
        beta = backward_algorithm(observation, transition_prob, emission_probability)

        xi = np.zeros((M, M, T - 1))
        for t in range(T - 1):
            denominator = np.dot(np.dot(alpha[t, :].T, transition_prob) * emission_probability[:, observation[t + 1]].T, beta[t + 1, :])
            for i in range(M):
                numerator = alpha[t, i] * transition_prob[i, :] * emission_probability[:, observation[t + 1]].T * beta[t + 1, :].T
                xi[i, :, t] = numerator / denominator

        gamma = np.sum(xi, axis=1)
        transition_prob = np.sum(xi, 2) / np.sum(gamma, axis=1).reshape((-1, 1))

        # Add additional T'th element in gamma
        gamma = np.hstack((gamma, np.sum(xi[:, :, T - 2], axis=0).reshape((-1, 1))))

        K = emission_probability.shape[1]
        denominator = np.sum(gamma, axis=1)
        for l in range(K):
            emission_probability[:, l] = np.sum(gamma[:, observation == l], axis=1)

        emission_probability = np.divide(emission_probability, denominator.reshape((-1, 1)))

    return (transition_prob, emission_probability)

## 1.4 Testing

In [53]:
data = pd.read_csv('data.csv')
obs = data['Visible'].values

# transition probabilities
trans_prob = np.ones((2, 2))
trans_prob = trans_prob / np.sum(trans_prob, axis = 1)

# emission probabilities
emiss_prob = np.array(((1, 3, 5), (2, 4, 6)))
emiss_prob = emiss_prob / np.sum(emiss_prob, axis = 1).reshape((-1, 1))

# equal probabilities for the initial distribution
init_dist = np.array((0.5, 0.5))

trans_prob, emiss_prob = baum_welch_algorithm(obs, trans_prob, emiss_prob, init_dist)

result = viterbi_algorithm(obs, trans_prob, emiss_prob, init_dist)

f = open("result.txt", "w")
for row in result:
    f.write(row + ' ')
f.close()