In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [80]:
class HMM:
    def __init__(self, output_states, hidden_states, i_vector=None, t_matrix=None, e_matrix=None):
        # The set of possible emissions
        self.output_set = output_states
        # The number of possible emissions
        self.n_emissions = len(output_states)
        # Observable-to-index map
        self.observable_to_index_map = {o: i for i, o in enumerate(self.output_set)}
        # Index-to-observable map
        self.index_to_observable_map = {i: o for o, i in self.observable_to_index_map.items()}
        # The set of hidden states
        self.hidden_set = hidden_states
        # The number of hidden states
        self.n_states = len(hidden_states)
        # Observable-to-index map
        self.hidden_to_index_map = {o: i for i, o in enumerate(self.hidden_set)}
        # Index-to-observable map
        self.index_to_hidden_map = {i: o for o, i in self.hidden_to_index_map.items()}
        # The probability of starting in a particular state
        if i_vector is None:
            self.i_vector = np.ones(self.n_states, 1) / self.n_states
        else:
            assert i_vector.shape == (self.n_states,)
            self.i_vector = i_vector
        self.i_vector = self.i_vector
        # The transition probabilities
        # T[i, j] = probability of going from i to j
        if t_matrix is None:
            self.t_matrix = self.ones((self.n_states, self.n_states)) / self.n_states ** 2
        else:
            assert t_matrix.shape == (self.n_states, self.n_states)
            self.t_matrix = t_matrix
        # The emission probabilities
        # E[i, o] = probability of emitting o from i
        if e_matrix is None:
            self.e_matrix = self.ones((self.n_states, self.n_emissions)) / (self.n_states * self.n_emissions)
        else:
            assert e_matrix.shape == (self.n_states, self.n_emissions)
            self.e_matrix = e_matrix
    
    def observable_to_index(self, X):
        # Convert observables to indices
        return [self.observable_to_index_map[x] for x in X]

    def index_to_observable(self, X):
        # Convert indices to observables
        return [self.index_to_observable_map[x] for x in X]

    def hidden_to_index(self, X):
        # Convert hiddens to indices
        return [self.hidden_to_index_map[x] for x in X]

    def index_to_hidden(self, X):
        # Convert indices to hiddens
        return [self.index_to_hidden_map[x] for x in X]

    def observable_sequence_emission_probability(self, O):
        # Find the probability of generating the observable sequence O given the model
        # parameters. We will use the Forward algorithm for this.

        # Convert to indices
        X = self.observable_to_index(O)

        # Probabilities at first timestep
        alpha = self.i_vector * self.e_matrix[:, X[0]]
        # Iteration
        for i in range(1, len(X)):
            alpha = (alpha @ self.t_matrix) * self.e_matrix[:, X[i]]
        return np.sum(alpha)
    
    def most_likely_hidden_sequence(self, O):
        # Find the most likely hidden sequence H that generated the observable sequence O.
        # We will use the Viterbi algorithm for this.

        # Convert to indices
        X = self.observable_to_index(O)

        # Probabilities at first timestep
        alpha = self.i_vector * self.e_matrix[:, X[0]].T
        # Backtrace
        beta = -np.ones((len(X), self.n_states), dtype=np.int32)
        # Iteration
        for i in range(1, len(X)):
            nxt = alpha * self.t_matrix.T
            alpha = nxt.max(axis=1) * self.e_matrix[:, X[i]]
            beta[i, :] = nxt.argmax(axis=1)
        
        # Termination
        i, j = alpha.argmax(), -1
        H = []
        while i >= 0:
            H.append(i)
            i = beta[j, i]
            j -= 1
        H.reverse()

        return self.index_to_hidden(H)

    def single_sample_train(self, O):
        # Find the maximum likelihood estimate for the model parameters that could generate
        # the observable sequence O. We will use the Baum-Welch algorithm for this.
        # The Baum-Welch algorithm does not guarantee a global maximum will be found.
        alpha = np.zeros((self.n_states, self.n_emissions))
        beta = np.zeros((self.n_states, self.n_emissions))
        gamma = np.zeros((self.n_states, self.n_emissions))
        xi = np.zeros((self.n_states, self.n_states, self.n_emissions))

        # Convert to indices
        X = self.observable_to_index(O)
        # Calculate the forward probabilities
        alpha[:, 0] = self.i_vector * self.e_matrix[:, X[0]]
        for i in range(1, len(X)):
            alpha[:, i] = (alpha[:, i - 1] @ self.t_matrix) * self.e_matrix[:, X[i]]
        # Calculate the backward probabilities
        beta[:, -1] = np.ones(self.n_states)
        for i in range(-2, -len(X)-1, -1):
            beta[:, i] = self.t_matrix @ (self.e_matrix[:, X[i + 1]] * beta[:, i + 1])
        # Calculate gamma and xi
        gamma = alpha * beta / np.sum(alpha * beta)
        for i in range(len(X) - 1):
            xi[:, :, i] = ((alpha[:, i] * self.t_matrix).T * (self.e_matrix[:, X[i + 1]] * beta[:, i + 1])).T
        xi /= np.sum(alpha * beta)
        # Calculate the updated values
        self.i_vector = gamma[:, 0]
        self.t_matrix = np.sum(xi[:, :, :len(X) - 1], axis=2) / np.sum(gamma[:, :len(X) - 1], axis=1)
        for k in range(self.n_emissions):
            for j in range(self.n_states):
                self.e_matrix[j, k] = np.sum(np.where(np.array(X) == k, gamma[j, :], 0.0)) / np.sum(gamma[j, :])


In [109]:
hidden = ['Healthy', 'Fever']
output = ['Normal', 'Cold', 'Dizzy']
init = np.array([0.6, 0.4])
transition = np.array([
    [0.7, 0.3],
    [0.4, 0.6],
])
emission = np.array([
    [0.5, 0.4, 0.1],
    [0.1, 0.3, 0.6],
])
hmm = HMM(output, hidden, init, transition, emission)
hmm.most_likely_hidden_sequence(['Normal', 'Cold', 'Dizzy'])

['Healthy', 'Healthy', 'Fever']

In [122]:
hmm.single_sample_train(['Normal', 'Cold', 'Dizzy'])
hmm.e_matrix

array([[6.37840370e-163, 1.00000000e+000, 6.25599121e-165],
       [3.33333333e-001, 3.33333333e-001, 3.33333333e-001]])