# ML Experiment 7: Hidden Markov Model 
**Aim : Implement HMM to predict the sequential data.**

The three fundamental problems of HMMs: 

1. **Likelihood:** Compute the likelihood of a given observation sequence.
2. **Decoding:** Determine the most likely hidden state sequence for a given observation sequence.
3. **Learning:** Learn the HMM parameters given an observation sequence.

In [1]:
import numpy as np


class HiddenMarkovModel:

    """A Hidden Markov Model (HMM).

    Attributes
    ----------
    states : array_like or numpy ndarray
        List of states.

    observations : array_like or numpy ndarray
        Observations space array.

    tp : array_like or numpy ndarray
        Transition probability matrix which stores probability of
        moving from state i (row) to state j (col).

    ep : array_like or numpy ndarray
        Emission probability matrix which stores probability of
        seeing observation o (col) from state s (row).

    pi : array_like or numpy ndarray
        Initial state probabilities array.

    """

    def __init__(self, states, observations, tp, ep, pi):

        self.states = np.array(states)
        self.observations = np.array(observations)
        self.num_states = self.states.shape[0]
        self.num_observations = self.observations.shape[0]
        self.tp = np.array(tp)
        self.ep = np.array(ep)
        self.pi = np.array(pi)

    def likelihood(self, obs):
        """Compute the likelihood of an observation sequence.

        Parameters
        ----------
        obs : array_like or numpy ndarray
            Sequence of observations.

        Returns
        -------
        prob : float
            Probability likelihood for observation sequence.

        """

        prob, _ = self.likelihood_forward(obs)
        return prob

    def likelihood_forward(self, obs):
        """Compute observation likelihood using the forward algorithm.

        Parameters
        ----------
        obs : array_like or numpy ndarray
            Sequence of observations of size T.

        Returns
        -------
        prob : float
            Probability likelihood for observation sequence.

        alpha : numpy ndarray
            Forward probability matrix of shape (num_states x T).

        """

        T = len(obs)
        alpha = np.zeros((self.num_states, T))

        # initialization
        o_0 = self._get_observation_idx(obs[0])
        alpha[:, 0] = self.pi * self.ep[:, o_0]

        # recursion
        for t in range(1, T):
            o_t = self._get_observation_idx(obs[t])
            alpha[:, t] = alpha[:, t-1].dot(self.tp) * self.ep[:, o_t]

        # termination
        prob = alpha[:, T-1].sum()

        return prob, alpha

    def likelihood_backward(self, obs):
        """Compute observation likelihood using the backward algorithm.

        Parameters
        ----------
        obs : array_like or numpy ndarray
            Sequence of observations of size T.

        Returns
        -------
        prob : float
            Probability likelihood for observation sequence.

        beta : numpy ndarray
            Backward probability matrix of shape (num_states x T).

        """

        T = len(obs)
        beta = np.zeros((self.num_states, T))

        # initialization
        beta[:, T-1] = 1

        # recursion
        for t in range(T-2, -1, -1):
            o_t1 = self._get_observation_idx(obs[t+1])
            beta[:, t] = self.tp.dot(self.ep[:, o_t1] * beta[:, t+1])

        # termination
        o_0 = self._get_observation_idx(obs[0])
        prob = self.pi.dot(self.ep[:, o_0] * beta[:, 0])

        return prob, beta

    def decode(self, obs):
        """Determine the best hidden sequence using the Viterbi algorithm.

        Parameters
        ----------
        obs : array_like or numpy ndarray
            Sequence of observations of size T.

        Returns
        -------
        path : numpy ndarray
            Sequence of states of size T.

        prob : float
            Probability likelihood for observation sequence along path.

        """

        T = len(obs)
        delta = np.zeros((self.num_states, T))

        # initialization
        o_0 = self._get_observation_idx(obs[0])
        delta[:, 0] = self.pi * self.ep[:, o_0]

        # recursion
        for t in range(1, T):
            o_t = self._get_observation_idx(obs[t])
            delta_prev = delta[:, t-1].reshape(-1, 1)
            delta[:, t] = (delta_prev * self.tp).max(axis=0) * self.ep[:, o_t]

        # termination
        path = self.states[delta.argmax(axis=0)]
        prob = delta[:, T-1].max()

        return path, prob

    def learn(self, obs, iterations=1):
        """Learn parameters from an observation sequence using Baum-Welch.

        Parameters
        ----------
        obs : array_like or numpy ndarray
            Sequence of observations of size T.

        iterations : int, optional
            Number of Expectation-Maximization (EM) iterations.
            Defaults to 1.

        """

        for _ in range(iterations):
            T = len(obs)

            # expectation step
            likelihood, alpha = self.likelihood_forward(obs)
            _, beta = self.likelihood_backward(obs)
            gamma = alpha * beta / (alpha * beta).sum(axis=0)
            xi = np.zeros((self.num_states, self.num_states, T-1))
            for t in range(T-1):
                o_t1 = self._get_observation_idx(obs[t+1])
                for i in range(self.num_states):
                    xi[i, :, t] = alpha[i, t] * self.tp[i, :] \
                                    * self.ep[:, o_t1] * beta[:, t+1]
            xi /= xi.sum(axis=(0, 1))

            # maximization step
            self.pi = gamma[:, 0]
            self.tp = xi.sum(axis=2) / gamma[:, :-1].sum(axis=1).reshape(-1, 1)
            for idx, o in enumerate(self.observations):
                indices = np.argwhere(obs == o).flatten()
                self.ep[:, idx] = gamma[:, indices].sum(axis=1) \
                    / gamma.sum(axis=1)

    def _get_observation_idx(self, o):
        """Get the vocabulary index value of an observation."""
        return np.argwhere(self.observations == o).flatten().item()


### Build the model

In [2]:
# initialize model parameters
state_space = ["hot", "cold"]
observation_space = [1, 2, 3]
initial_probabilities = [0.8, 0.2]
transition_probabilities = [[0.6, 0.4], [0.5, 0.5]]
emission_probabilities = [[0.2, 0.4, 0.4], [0.5, 0.4, 0.1]]

In [3]:
# build model
hmm = HiddenMarkovModel(state_space, observation_space, transition_probabilities, 
                        emission_probabilities, initial_probabilities)

### Problem 1: Likelihood

Given an HMM $\lambda = (A, B, \pi)$ and an observation sequence $O$, determine the likelihood $P(O|\lambda)$.

In [4]:
# calculate likelihood
observation_sequence = [1, 2, 3, 2, 2, 1, 2]
likelihood = hmm.likelihood(observation_sequence)
print("* Observation sequence: {}".format(observation_sequence))
print("* Likelihood: {:.2e}".format(likelihood))

* Observation sequence: [1, 2, 3, 2, 2, 1, 2]
* Likelihood: 5.92e-04


### Problem 2: Decoding

Given an observation sequence $O$ and an HMM $\lambda = (A, B, \pi)$, discover the best hidden state sequence $Q$.

In [5]:
# determine most likely state sequence
path, prob = hmm.decode(observation_sequence)
print("* Observation sequence: {}".format(observation_sequence))
print("* Most likely hidden state path: {}".format(path))
print("* Likelihood for observation sequence along path: {:.2e}".format(prob))

* Observation sequence: [1, 2, 3, 2, 2, 1, 2]
* Most likely hidden state path: ['hot' 'hot' 'hot' 'hot' 'hot' 'cold' 'hot']
* Likelihood for observation sequence along path: 2.12e-05


### Problem 3: Learning

Given an observation sequence $O$ and the set of states in the HMM, learn the HMM parameters $A$, $B$, and $\pi$.

In [6]:
# print original parameters
print("PROBABILITIES BEFORE LEARNING")
print("-----------------------------")
print("* Initial:")
print(hmm.pi)
print("\n* Transition:")
print(hmm.tp)
print("\n* Emission:")
print(hmm.ep)

PROBABILITIES BEFORE LEARNING
-----------------------------
* Initial:
[0.8 0.2]

* Transition:
[[0.6 0.4]
 [0.5 0.5]]

* Emission:
[[0.2 0.4 0.4]
 [0.5 0.4 0.1]]


In [7]:
# learn from observation sequence
hmm.learn(observation_sequence, iterations=1)

In [8]:
# print parameters after learning
print("PROBABILITIES AFTER LEARNING")
print("----------------------------")
print("* Initial:")
print(hmm.pi)
print("\n* Transition:")
print(hmm.tp)
print("\n* Emission:")
print(hmm.ep)

PROBABILITIES AFTER LEARNING
----------------------------
* Initial:
[0.61804435 0.38195565]

* Transition:
[[0.60945272 0.39054728]
 [0.50990073 0.49009927]]

* Emission:
[[0.23642616 0.55648474 0.2070891 ]
 [0.35240033 0.59164734 0.05595234]]
