In [None]:
!pip install git+https://github.com/hmmlearn/hmmlearn

In [None]:
import numpy as np
from hmmlearn import hmm
import matplotlib.pyplot as plt

### Observable and Hidden States
Hidden Markov Models (HMMs) are useful when analyzing time series. 

To define an HMM, we must define the following:

* **observable states** - measurable data which serves as an input to the HMM. You could think of it as something analogous to features of datapoints.
* **hidden states** - information we want to infer from a series of observations. The idea is that hidden states and observations are somehow correlated.

Moreover, the assumption is that hidden states satisfy the **Markov property** - the next state in the series depends only on the current state, not on any of the states before it.

### What constitutes an HMM?
As for the HMMs themselves, a trained model possesses the following information:

* probability distribution of the initial hidden state in the series - $I$
* probabilities of transitions between hidden states - $T$
* emission probabilities - given a particular hidden state, how likely are we to make a particular observation? - $E$

# Ice cream!

In this notebook we'll play with a classic ice cream example (it's July after all, y'all!)

In [None]:
# hidden states - is the day cold or hot?
hidden_states = {'cold': 0,
                 'hot': 1
                }
# observable states - how many ice creams have been eaten on that day?
observable_states = [0, 1, 2]

HMMs have three primary use cases:

## Likelihood - the forward algorithm

In this case, we're working with a pretrained HMM. Here that means we set all the essential parameters manually.
Given this pretrained model, we can calculate the probability of a given sequence of observations occurring.

In [None]:
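# CategoricalHMM: one discrete symbol per timestep (hmmlearn >= 0.2.8; older releases name this class MultinomialHMM)
model = hmm.CategoricalHMM(n_components=len(hidden_states), verbose=True)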

# initial probability vector of length n_hidden_states
# intuition - how probable is it that the first day is cold/hot?
model.startprob_ = np.array([0.5, 0.5])

# transition matrix - likelihood of transitioning between hidden states
# shape = n_hidden_states x n_hidden_states
# intuition - given that the current day is cold/hot, how likely is it that the next day will be cold/hot?
model.transmat_ = np.array([
  [0.7, 0.3],
  [0.4, 0.6]
])

# emission matrix - likelihood of particular observation given each hidden state
# shape = n_hidden_states x n_observable_states
# intuition - given that the current day is cold/hot how likely are we to eat 0/1/2 ice creams?
model.emissionprob_ = np.array([
  [0.7, 0.3, 0.0],
  [0.0, 0.4, 0.6]
])
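
Together, these three sets of parameters describe how the model generates data: draw a first hidden state from $I$, emit an observation according to $E$, move to the next hidden state according to $T$, and repeat. Here is a minimal sketch of that process in plain Python, using the same numbers as above. The function name `sample_hmm` and the fixed seed are assumptions made for illustration and are not part of `hmmlearn`.

```python
import random

# same parameters as the model above, as plain lists
I = [0.5, 0.5]                           # initial distribution: cold, hot
T = [[0.7, 0.3], [0.4, 0.6]]             # T[i][j] = P(next state j | current state i)
E = [[0.7, 0.3, 0.0], [0.0, 0.4, 0.6]]   # E[i][k] = P(k ice creams | state i)
state_names = ['cold', 'hot']


def sample_hmm(length, I, T, E, rng):
    """Draw hidden states and observations from the HMM (illustrative helper)."""
    hidden, observed = [], []
    state = rng.choices(range(len(I)), weights=I)[0]
    for _ in range(length):
        hidden.append(state)
        # emit an observation that depends only on the current hidden state
        observed.append(rng.choices(range(len(E[state])), weights=E[state])[0])
        # move to the next hidden state, which depends only on the current one
        state = rng.choices(range(len(T[state])), weights=T[state])[0]
    return hidden, observed


rng = random.Random(0)  # fixed seed so reruns give the same sequence
hidden, observed = sample_hmm(7, I, T, E, rng)
print([(state_names[h], o) for h, o in zip(hidden, observed)])
```

Because of the zeros in the emission matrix, a cold day never shows 2 ice creams and a hot day never shows 0, whatever sequence gets drawn.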

The probability is calculated using the *forward algorithm*. Its main steps are:

We know the probability distribution of hidden states in the initial timestep. For every next timestep:

If in the $(n-1)^{th}$ timestep the probabilities of particular hidden states are:

$$
H^1_{n-1} \\
H^2_{n-1}\\
H^3_{n-1}
$$

Then the likelihoods of hidden states in the $n^{th}$ timestep are:

$$
H^1_n = H^1_{n-1} \cdot T_{1 | 1} + H^2_{n-1} \cdot T_{1 | 2} + H^3_{n-1} \cdot T_{1 | 3}
$$

...and analogously for $H^2_n$ and $H^3_n$.

Then, we can calculate the likelihood of making a particular observation in the $n^{th}$ timestep:

$$
O^1_n = H^1_{n} \cdot E_{1 | 1} + H^2_{n} \cdot E_{1 | 2} + H^3_{n} \cdot E_{1 | 3}
$$

... and analogously for $O^2_n$.

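Finally, we combine the observations we've actually made into a single likelihood. The observations are not independent of each other, so we can't just multiply the per-timestep values $O_n$: each observation changes what we believe about the hidden state. Instead, after every step we multiply each $H^i_n$ by the emission probability of the observation actually made, so that $H^i_n$ becomes the joint probability of being in state $i$ and having seen all the observations so far. The likelihood of the whole sequence is the sum of these values at the final timestep.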
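
To make the recursion concrete, here is a minimal sketch of the forward algorithm in plain Python, with the ice cream parameters written out as lists. The name `forward_likelihood` is an assumption made for illustration. `alpha[s]` holds the joint probability of being in hidden state `s` and having seen every observation so far.

```python
# same parameters as the model above, as plain lists
I = [0.5, 0.5]                           # initial distribution: cold, hot
T = [[0.7, 0.3], [0.4, 0.6]]             # T[i][j] = P(next state j | current state i)
E = [[0.7, 0.3, 0.0], [0.0, 0.4, 0.6]]   # E[i][k] = P(k ice creams | state i)


def forward_likelihood(obs, I, T, E):
    """Probability of the observation sequence under the HMM (illustrative helper)."""
    n_states = len(I)
    # first timestep: initial distribution weighted by the first emission
    alpha = [I[s] * E[s][obs[0]] for s in range(n_states)]
    for o in obs[1:]:
        # propagate through the transitions, then weight by the emission actually seen
        alpha = [
            E[s][o] * sum(alpha[h] * T[h][s] for h in range(n_states))
            for s in range(n_states)
        ]
    # sum over the hidden states the sequence could end in
    return sum(alpha)


likelihood = forward_likelihood([0, 0, 2], I, T, E)
print(likelihood)  # approximately 0.03087
```

For the sequence `[0, 0, 2]` only one hidden path has nonzero probability (cold, cold, hot), so the result is just $0.5 \cdot 0.7 \cdot 0.7 \cdot 0.7 \cdot 0.3 \cdot 0.6 \approx 0.03087$.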

In [None]:
observations = np.atleast_2d([0, 0, 2]).T
# score() returns the log-likelihood, so exponentiate to get the probability
np.exp(model.score(observations))

## Most likely sequence - Viterbi algorithm

Given a sequence of observations, we want to find the most likely sequence of hidden states that fits the observations. This is called a *Viterbi Path* and is obtained using recursion:

The probability of the most likely sequence of hidden states of length $t$ that ends in hidden state $s_t$ and produces the observations up to $o_t$ is obtained with:

$$
V^{s_t}_t = \max_{h \in H} (E_{o_t | s_t} \cdot T_{s_t | h} \cdot V^h_{t-1} ) \\
V^{s_1}_1 = E_{o_1 | s_1} \cdot I_{s_1}
$$


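The task is to find, for the final observation $o_t$, the hidden state $s_t$ that maximizes $V^{s_t}_t$. Computing it requires the values for $t-1$, $t-2$, and so on. If we also remember which previous state $h$ achieved each maximum, we can trace the whole path backwards from the final state.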
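
Here is a minimal sketch of that recursion in plain Python, with the ice cream parameters written out as lists. The name `viterbi` and the backpointer bookkeeping are assumptions made for illustration; this is not how `hmmlearn` implements `decode` internally.

```python
# same parameters as the model above, as plain lists
I = [0.5, 0.5]                           # initial distribution: cold, hot
T = [[0.7, 0.3], [0.4, 0.6]]             # T[i][j] = P(next state j | current state i)
E = [[0.7, 0.3, 0.0], [0.0, 0.4, 0.6]]   # E[i][k] = P(k ice creams | state i)
state_names = ['cold', 'hot']


def viterbi(obs, I, T, E):
    """Most likely hidden-state path and its probability (illustrative helper)."""
    n_states = len(I)
    # base case: V_1^s = E(o_1 | s) * I_s
    V = [I[s] * E[s][obs[0]] for s in range(n_states)]
    backpointers = []
    for o in obs[1:]:
        # for each state s, remember which previous state h maximizes V[h] * T[h][s]
        best_prev = [
            max(range(n_states), key=lambda h: V[h] * T[h][s])
            for s in range(n_states)
        ]
        V = [E[s][o] * V[best_prev[s]] * T[best_prev[s]][s] for s in range(n_states)]
        backpointers.append(best_prev)
    # pick the best final state, then follow the backpointers to the start
    last = max(range(n_states), key=lambda s: V[s])
    path = [last]
    for best_prev in reversed(backpointers):
        path.append(best_prev[path[-1]])
    path.reverse()
    return V[last], path


prob, path = viterbi([0, 0, 2], I, T, E)
print(prob, [state_names[s] for s in path])
```

For `[0, 0, 2]` the path comes out as cold, cold, hot: eating no ice cream is impossible on a hot day and eating two is impossible on a cold one.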



In [None]:
observations = np.atleast_2d([0, 0, 2]).T

logprob, states = model.decode(observations)
[list(hidden_states.keys())[s] for s in states]

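## Training an HMM

This is the most difficult problem and there is no analytical way to solve it. It is solved with iterative algorithms instead, for example gradient-based methods or the Baum-Welch algorithm (a form of expectation-maximization, which is what `fit` in `hmmlearn` runs for up to `n_iter` iterations). Some approaches are discussed here: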

https://www.ece.ucsb.edu/Faculty/Rabiner/ece259/Reprints/tutorial%20on%20hmm%20and%20applications.pdf
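
One widely used iterative method is Baum-Welch, an expectation-maximization procedure. Below is a minimal sketch of a single update step for one observation sequence, in plain Python. The function names, the initial guess and the toy sequence are assumptions made for illustration. The sketch skips the scaling that real implementations use, so it is only suitable for short sequences.

```python
def forward_backward(obs, I, T, E):
    """Unscaled forward and backward tables plus the sequence likelihood."""
    n, S = len(obs), len(I)
    alpha = [[0.0] * S for _ in range(n)]
    beta = [[1.0] * S for _ in range(n)]
    for s in range(S):
        alpha[0][s] = I[s] * E[s][obs[0]]
    for t in range(1, n):
        for s in range(S):
            alpha[t][s] = E[s][obs[t]] * sum(alpha[t - 1][h] * T[h][s] for h in range(S))
    for t in range(n - 2, -1, -1):
        for s in range(S):
            beta[t][s] = sum(T[s][h] * E[h][obs[t + 1]] * beta[t + 1][h] for h in range(S))
    return alpha, beta, sum(alpha[-1])


def baum_welch_step(obs, I, T, E):
    """One EM update; also returns the likelihood under the parameters passed in."""
    n, S, K = len(obs), len(I), len(E[0])
    alpha, beta, lik = forward_backward(obs, I, T, E)
    # gamma[t][s]: probability of being in state s at time t, given all observations
    gamma = [[alpha[t][s] * beta[t][s] / lik for s in range(S)] for t in range(n)]
    # xi_sum[i][j]: expected number of i -> j transitions
    xi_sum = [
        [
            sum(alpha[t][i] * T[i][j] * E[j][obs[t + 1]] * beta[t + 1][j]
                for t in range(n - 1)) / lik
            for j in range(S)
        ]
        for i in range(S)
    ]
    new_I = gamma[0]
    new_T = [[xi_sum[i][j] / sum(xi_sum[i]) for j in range(S)] for i in range(S)]
    new_E = [
        [
            sum(gamma[t][s] for t in range(n) if obs[t] == k)
            / sum(gamma[t][s] for t in range(n))
            for k in range(K)
        ]
        for s in range(S)
    ]
    return new_I, new_T, new_E, lik


# toy sequence and an arbitrary, strictly positive initial guess
obs = [0, 0, 1, 2, 2, 1, 0, 0, 1, 2, 2, 2, 1, 0]
I = [0.6, 0.4]
T = [[0.6, 0.4], [0.3, 0.7]]
E = [[0.5, 0.3, 0.2], [0.2, 0.3, 0.5]]

likelihoods = []
for _ in range(6):
    I, T, E, lik = baum_welch_step(obs, I, T, E)
    likelihoods.append(lik)
print(likelihoods)
```

Each step should leave the likelihood of the training sequence unchanged or higher, but the procedure only finds a local optimum, so the result depends on the initial guess.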

We'll focus on an example in the code:


First some helper functions for data generation:

In [None]:
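def categorical_distribution(center, distr_range, n_categories):
    """Triangular distribution over n_categories, peaked at `center`.

    The probability falls off linearly and is zero for categories
    more than `distr_range` steps away from the peak.
    """
    assert center in range(n_categories)
    assert distr_range < n_categories
    # build the triangle in the middle of a double-width buffer
    broad_result = np.zeros(2 * n_categories)
    distr_range += 1
    for i in range(distr_range):
        broad_result[n_categories + i] = broad_result[n_categories - i] = distr_range - i

    # slice out the window that puts the peak at index `center`
    left = n_categories - center
    right = 2 * n_categories - center
    result = broad_result[left:right]
    return result / result.sum()  # normalize so the probabilities sum to 1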

categorical_distribution(1, 2, 5)

To generate data, let's first define the probabilities of transitions between consecutive observations. The training sequences will come from a simple Markov chain over the observable states.

In [None]:
obs_trans_probs = np.array(
    [categorical_distribution(i, 2, len(observable_states)) for i in observable_states]
)

obs_initial_probs = np.ones(len(observable_states)) / len(observable_states)
print('initial observations probabilities')
print(obs_initial_probs)
print()
print('transitions between observations probabilities')
print(obs_trans_probs)

Having those utilities, we can generate sequences of observations:

In [None]:
def generate_sequence(length, init_probs=obs_initial_probs, trans_probs=obs_trans_probs, n_categories=len(observable_states)):
    result = []
    result.append(np.random.choice(n_categories, p=init_probs))
    for i in range(length - 1):
        # the next observation depends only on the previous one
        result.append(np.random.choice(n_categories, p=trans_probs[result[i]]))

    # retry until every category appears at least once in the sequence
    presence = [i in result for i in range(n_categories)]
    return result if all(presence) else generate_sequence(length, init_probs, trans_probs, n_categories)

generate_sequence(10)

In [None]:
observations_train = np.array([generate_sequence(10) for _ in range(1000)])
observations_test = np.array([generate_sequence(10) for _ in range(10)])
observations_test

Then, all that's left is to train the model:

In [None]:
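# CategoricalHMM (hmmlearn >= 0.2.8) models one discrete symbol per timestep
model = hmm.CategoricalHMM(n_components=len(hidden_states), n_iter=100)
# fit() expects all sequences stacked into one column, plus the length of each sequence
lengths = [len(seq) for seq in observations_train]
model = model.fit(observations_train.reshape(-1, 1), lengths)

# overwrite the learned initial distribution with a uniform one
model.startprob_ = np.array([0.5, 0.5])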
print('startprob')
print(model.startprob_)
print('transmat')
print(model.transmat_)
print('emissions')
print(model.emissionprob_)

In [None]:
for example in observations_test:
    logprob, decoded = model.decode(example.reshape(-1,1))
    states = [list(hidden_states.keys())[d] for d in decoded]
    print([z for z in zip(example, states)])
