# Hidden Markov Model - Uncovering Hidden Variables
Let's take our `HiddenMarkovChain` class to the next level and supplement it with more methods.
The methods will help us to discover the most probable sequence of hidden variables behind the observation sequence.

To save time, we have moved the previously derived classes to `definitions.py` and `base.py` files.

In [1]:
%pylab inline
import pandas as pd
import numpy as np
from itertools import product

from definitions import ProbabilityMatrix, ProbabilityVector
from base import HiddenMarkovChain
from chains import HiddenMarkovChain_Simulation

Populating the interactive namespace from numpy and matplotlib


## Expanding the class
In the last notebook, we have defined $\vec{\alpha}$ to be the probability of partial observation of the sequence up to time $t$.

$$
\vec{\alpha}_t = [\vec{\alpha}_{t-1} \cdot \mathbf{A}] \star (\vec{b})^T(\mathcal{O}_t)
$$

Now, let's define the "opposite" probability.
Namely, the probability of observing the sequence from $T-1$ down to $t$.

For $t = 0, 1, ..., T-1$ and $i = 0, 1, ..., N-1$, we define:

$$
\beta_t(i) = p(\mathcal{O}_{t+1}, \mathcal{O}_{t+2}, ..., \mathcal{O}_{T-1}|x_t = 1_i, \lambda)
$$

As before, we can calulate $\beta_t(i)$ recursively:

$$
\beta_{T-1}(i) = 1 \quad \text{for} \quad i = 0, 1, ..., N-1
$$

Then for $t \ne T-1$

$$
\beta_t(i) = \sum_{j=0}^{N-1} a_{i,j}b_j(\mathcal{O}_{t+1})\beta_{t+1}(j)
$$

which in vectorized form, will be:

$$
\vec{\beta}_t = [\mathbf{A} \cdot \vec{b}(\mathcal{O}_{t+1})] \star \vec{\beta}_{t+1}
$$

Finally, we also define a new quantity $\gamma$ to indicate the state $q_i$ at time $t$, for which the probability (calculated forwards and backwards) is the maximum:

$$
\gamma_t(i) = \frac{\alpha_t(i)\beta_t(i)}{p(\mathcal{O}|\lambda)}
$$

Consequently, for any step $t = 0, 1, ..., T-1$, the state of the maximum likelihood can be found using:

$$
q_i^{(t)} = \arg \max_i (\vec{\alpha}_t \star \vec{\beta}_t)
$$

In [2]:
class HiddenMarkovChain_Uncover(HiddenMarkovChain_Simulation):
    def _alphas(self, observations: list) -> np.ndarray:
        alphas = np.zeros((len(observations), len(self.states)))
        alphas[0, :] = self.pi.values * self.E[observations[0]].T
        for t in range(1, len(observations)):
            alphas[t, :] = (alphas[t - 1, :].reshape(1, -1) @ self.T.values) \
                         * self.E[observations[t]].T
        return alphas
    
    def _betas(self, observations: list) -> np.ndarray:
        betas = np.zeros((len(observations), len(self.states)))
        betas[-1, :] = 1
        for t in range(len(observations) - 2, -1, -1):
            betas[t, :] = (self.T.values @ (self.E[observations[t + 1]] \
                        * betas[t + 1, :].reshape(-1, 1))).reshape(1, -1)
        return betas
    
    def uncover(self, observations: list) -> list:
        alphas = self._alphas(observations)
        betas = self._betas(observations)
        maxargs = (alphas * betas).argmax(axis=1)
        return list(map(lambda x: self.states[x], maxargs))

## Validation

To validate, let's generate some observable sequence $\mathcal{O}$.
For that, we can use our model's `.run` method.
Then, we will use the `.uncover` method to find the most likely latent variable sequence.

In [3]:
np.random.seed(42)

a1 = ProbabilityVector({'1H': 0.7, '2C': 0.3})                                                                
a2 = ProbabilityVector({'1H': 0.4, '2C': 0.6})                                                                
                                                                                                               
b1 = ProbabilityVector({'1S': 0.1, '2M': 0.4, '3L': 0.5})                                                     
b2 = ProbabilityVector({'1S': 0.7, '2M': 0.2, '3L': 0.1})                                                     
                                                                                                                
A = ProbabilityMatrix({'1H': a1, '2C': a2})                                                                   
B = ProbabilityMatrix({'1H': b1, '2C': b2})                                                                   
pi = ProbabilityVector({'1H': 0.6, '2C': 0.4})

hmc = HiddenMarkovChain_Uncover(A, B, pi)

observed_sequence, latent_sequence = hmc.run(5)

print("Observed sequence:")
print(observed_sequence)
print("Latent sequence:")
print(latent_sequence)

Observed sequence:
['3L', '2M', '1S', '3L', '3L', '3L']
Latent sequence:
['1H', '2C', '1H', '1H', '2C', '1H']


In [4]:
uncovered_sequence = hmc.uncover(observed_sequence)
print(uncovered_sequence)

['1H', '1H', '2C', '1H', '1H', '1H']


As we can see, the most likely latent state chain (according to our algorithm) is not the same as the one that actually caused the observations.

However, this is to be expected.
Afterall, each observation sequence can only be manifested with certain probability, based on the latent sequence.
The code below, evaluates the likelihood of different latent sequences resulting in our observation sequence.

In [5]:
all_possible_states = {'1H', '2C'}
chain_length = 6  # any int > 0
all_states_chains = list(product(*(all_possible_states,) * chain_length))

df = pd.DataFrame(all_states_chains)
dfp = pd.DataFrame()

for i in range(chain_length):
    dfp['p' + str(i)] = df.apply(lambda x: hmc.E.df.loc[x[i], observed_sequence[i]], axis=1)

scores = dfp.sum(axis=1).sort_values(ascending=False)
df = df.iloc[scores.index]
df['score'] = scores

In [6]:
df.head(10).reset_index()

Unnamed: 0,index,0,1,2,3,4,5,score
0,55,1H,1H,2C,1H,1H,1H,3.1
1,39,1H,2C,2C,1H,1H,1H,2.9
2,23,2C,1H,2C,1H,1H,1H,2.7
3,53,1H,1H,2C,1H,2C,1H,2.7
4,54,1H,1H,2C,1H,1H,2C,2.7
5,51,1H,1H,2C,2C,1H,1H,2.7
6,63,1H,1H,1H,1H,1H,1H,2.5
7,7,2C,2C,2C,1H,1H,1H,2.5
8,37,1H,2C,2C,1H,2C,1H,2.5
9,35,1H,2C,2C,2C,1H,1H,2.5


The result above shows the sorted table of the latent sequences, given the observation sequence.
The actual latent sequence (the one that caused the observations) places itself on the 35th position (we counted index from zero).

In [7]:
dfc = df.copy().reset_index()
for i in range(chain_length):
    dfc = dfc[dfc[i] == latent_sequence[i]]
    
dfc

Unnamed: 0,index,0,1,2,3,4,5,score
34,45,1H,2C,1H,1H,2C,1H,1.9


## Conclusion
In this notebook, we have shown the implementation of how to find the most likely hidden (latent) state sequence given the observation sequence.