In [27]:
from hmmlearn import hmm
import numpy as np

In [28]:
seed = 93
np.random.seed(seed)

In [29]:
states = ["left_lane", "right_lane"]
observations = ["low_speed", "high_speed"]

In [30]:
discrete_model = hmm.MultinomialHMM(n_components=2,
                                    algorithm='viterbi',  # Decoder algorithm.
                                    # algorithm='map'  # todo: what does MaP?
                                    random_state=seed,
                                    n_iter=10,
                                    tol=0.01  # EM Convergence threshold (gain in log-likelihood)
                                   )

# Q1 - How to easily estimate the parameters of our HMM?
Here are the results of the estimation procedure based on **counting from the labelled data**. It constitutes the **MLE**.

In [31]:
discrete_model.startprob_ = np.array(
    [1/3, 2/3]
)
discrete_model.transmat_ = np.array([
    [0.6, 0.4],  # P(state_t+1|state_t=state_0)
    [0.2, 0.8]]  # P(state_t+1|state_t=state_1)
)
discrete_model.emissionprob_ = np.array(
    [[0.4, 0.6],  # P(obs|state_0)
     [0.8, 0.2]]  # P(obs|state_1)
)

### Sampling
Generate random samples from the HMM model.

In [32]:
# use the parameters to generates samples
# using a large sample size to estimate the stationary state distributions: 1/3 vs 2/3
# (n_samples, n_features)
Obss, States = discrete_model.sample(100000)
States = States.flatten()
Obss = Obss.flatten()
from collections import Counter
print(Counter(States))
print(Counter(Obss))

Counter({1: 66134, 0: 33866})
Counter({0: 66520, 1: 33480})


### Stationary distribution from the transition matrix
The **stationary distribution** is a **left eigenvector** (as opposed to the usual right eigenvectors) of the **transition matrix**.

In [33]:
# Compute the stationary distribution of states.
eigvals, eigvecs = np.linalg.eig(discrete_model.transmat_.T)
eigvec = np.real_if_close(eigvecs[:, np.argmax(eigvals)])
print(eigvecs)
print(eigvals)
print(eigvec)
# normalisation
stat_distr = eigvec / eigvec.sum()
print(stat_distr)

# The stationary distribution is proportional to the left-eigenvector
# associated with the largest eigenvalue (i.e., 1) of the transition matrix
x1 = np.asarray([-0.70710678, 0.70710678 ])
res1 = np.dot(discrete_model.transmat_.T, x1)
print(res1)
print(1/0.28284271)

x2 = np.asarray([-0.4472136, -0.89442719])  # invariant
res2 = np.dot(discrete_model.transmat_.T, x2)
print(res2)

[[-0.70710678 -0.4472136 ]
 [ 0.70710678 -0.89442719]]
[0.4 1. ]
[-0.4472136  -0.89442719]
[0.33333333 0.66666667]
[-0.28284271  0.28284271]
3.5355339368654755
[-0.4472136  -0.89442719]


In [34]:
0.4472136 /0.70710678

0.6324555394589767

todo: Compute a alpha table 

# Q2 - Given a single speed observation, what is the probability for the car to be in each of the two lanes?

In the [readme](README.md), the **posterior conditional probabilities** were computed using Bayes' rule. For instance:
- p(`left_lane`|`low_speed`) = p(`low_speed`|`left_lane`) * p(`left_speed`) / p(`low_speed`) 

**Posterior probabilities** for each state can be computed with `predict_proba()` and corresponding most likely states with `predict()`:
- P(`left lane`  | `high speed`) = `1/3` `*` `0.6` / `1/3` = `0.6`
- P(`left lane`  | `low speed`)  = `1/3` `*` `0.4` / `2/3` = `0.2`
- P(`right lane` | `high speed`) = `2/3` `*` `0.2` / `1/3` = `0.4`
- P(`right lane` | `low speed`)  = `2/3` `*` `0.8` / `2/3` = `0.8`

In [62]:
for i in range(len(observations)):
    p = discrete_model.predict_proba(np.array([[i]]))
    most_likely = discrete_model.predict(np.array([[i]]))
    print("p(state|'{}') = {} => most likely state is '{}'".format(
        observations[i], p, states[int(most_likely[0])]))

p(state|'low_speed') = [[0.2 0.8]] => most likely state is 'right_lane'
p(state|'high_speed') = [[0.6 0.4]] => most likely state is 'left_lane'


A second approach is to compute the **join probabilities**, as used in the **Viterbi algorithm**:
- p(`left_lane`,`low_speed`) = p(`low_speed`|`left_lane`) * p(`left_speed`)
- This is **alpha(`right_lane`, `t=1`)** for observation [`low_speed`]

Join probabilities:
- P(`left lane`  , `high speed`) = `1/3` `*` `0.6` = `3/15`  # *highest alpha for* `obs` = [`high_speed`] 
- P(`right lane` , `high speed`) = `2/3` `*` `0.2` = `2/15`
- P(`left lane`  , `low speed`)  = `1/3` `*` `0.4` = `2/15`
- P(`right lane` , `low speed`)  = `2/3` `*` `0.8` = `8/15`  # *highest alpha for* `obs` = [`low_speed`] 

In [63]:
# The Viterbi decoding algorithm uses the `alpha*` values. And the first `alpha*` are `alpha` values.
# Hence, the decoding method should return the state with the **highest joint probability**.
obs_0 = np.array([[0]]).T
obs_1 = np.array([[1]]).T
# Log probability of the maximum likelihood path through the HMM
logprob_0, state_0 = discrete_model.decode(obs_0)  # 8/15 = alpha[`low_speed`](`right_lane`)
logprob_1, state_1 = discrete_model.decode(obs_1)  # 2/5 = alpha[`high_speed`](`left_lane`)

print("{} -> {}".format(states[int(state_0)], observations[int(obs_0)]))
print("prob = {}\nlog_prob = {}".format(np.exp(logprob_0), logprob_0))
print("---")
print("{} -> {}".format(states[int(state_1)], observations[int(obs_1)]))
print("prob = {}\nlog_prob = {}".format(np.exp(logprob_1), logprob_1))

right_lane -> low_speed
prob = 0.5333333333333333
log_prob = -0.6286086594223741
---
left_lane -> high_speed
prob = 0.19999999999999998
log_prob = -1.6094379124341005


# Q3 - What is the probability to observe a particular sequence of speed measurements?

The **marginal probabilities of an observation sequence** can be found by:
- **The sum over any column of product `alpha` \* `beta`**.
- In particular, it is the **sum over the first column of the `alpha` values**:
- In `hmmlearn`, this is given by the `score()` method.

#### For a single observation
- P(`low speed`) = P(`left lane`, `low speed`) + P(`right lane`, `low speed`) = `2/15` + `8/15` = `2/3`
- P(`high speed`) = P(`left lane`, `high speed`) + P(`right lane`, `high speed`) = `3/15` + `2/15` = `1/3`

In [53]:
# For a single observation
# Compute the log probability under the model.
for i in range(len(observations)):
    p = np.exp(discrete_model.score(np.array([[i]])))
    print("p({}) = {} ".format(observations[i], p))

print("---")
# Compute the log probability under the model and compute posteriors.
for i in range(len(observations)):
    p = (discrete_model.score_samples(np.array([[i]])))
    print("p({}) = {}\n   posterior p(state|{}) = {}".format(observations[i], np.exp(p[0]), observations[i],  p[1]))

p(low_speed) = 0.6666666666666667 
p(high_speed) = 0.33333333333333337 
---
p(low_speed) = 0.6666666666666667
   posterior p(state|low_speed) = [[0.2 0.8]]
p(high_speed) = 0.33333333333333337
   posterior p(state|high_speed) = [[0.6 0.4]]


In [37]:
# For the example used: [`low_speed`, `high_speed`, `low_speed`]
obs_sequence = np.array([[0, 1, 0]]).T
p = np.exp(discrete_model.score(obs_sequence))
print("p({}) = {} ".format([observations[i] for i in obs_sequence.T[0]], p))

p(['low_speed', 'high_speed', 'low_speed']) = 0.13184 


# Q4 - Given a sequence of speed observations, what is the most likely current lane?

In [38]:
# Filtering can be obtained by normalizing `alpha` values.

# Q5 - Given a sequence of speed observations, what is the most likely underlying lane sequence?

### Decoding
The **inferred optimal hidden states** can be obtained by calling `decode()` method.

In [13]:
obs_sequence = np.array([[0, 1, 0]]).T
# Find most likely state sequence corresponding to obs_sequence.
logprob, state_sequence = discrete_model.decode(obs_sequence)

# Log probability of the produced state sequence
for o, s in zip(obs_sequence.T[0], state_sequence):
    print("{} -> {}".format(states[int(s)], observations[int(o)]))
print("\nprob = {}\nlog_prob = {}".format(np.exp(logprob), logprob))

right_lane -> low_speed
right_lane -> high_speed
right_lane -> low_speed

prob = 0.05461333333333335
log_prob = -2.9074772257991035


# Q6 - How to estimate the parameters of our HMM when no state annotations are present in the training data?

### Unsupervised Learning
**The HMM parameters** are estimated based on some observation data using the `fit()` method. It implements the **Baum-Welch algorithm**.

#### Random unbalanced sampling

In [25]:
# The `fit()` method is not "const". Save here the original parameters for comparison.
old_transmat = discrete_model.transmat_
old_emissionprob = discrete_model.emissionprob_
old_startprob = discrete_model.startprob_

In [26]:
# On purpose, one could generate an unbalanced sampling.
biased_sampling = np.random.choice([0, 1], size=(100,), p=[1./10, 9./10])
print("biased_sampling = \n{}".format(biased_sampling))

obs_sequence = np.array([biased_sampling]).T
new_model = discrete_model.fit(obs_sequence)

# Due to random sampling, two consecutive observations are independent. Hence, an uniform transition matrix.
# The unbalance is captured by the emission model.
print("---\nold_transmat = \n{}".format(old_transmat))
print("transmat = \n{}".format(new_model.transmat_))
print("---\nold_emissionprob = \n{}".format(old_emissionprob))
print("emissionprob = \n{}".format(new_model.emissionprob_))
print("---\nold_startprob = \n{}".format(old_startprob))
print("startprob = \n{}".format(new_model.startprob_))

biased_sampling = 
[1 1 1 0 1 0 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 0 1 0 0 1 1 1 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 0 1 1 0]
---
old_transmat = 
[[0.54907509 0.45092491]
 [0.55092127 0.44907873]]
transmat = 
[[0.48366555 0.51633445]
 [0.48380939 0.51619061]]
---
old_emissionprob = 
[[0.07984999 0.92015001]
 [0.12465517 0.87534483]]
emissionprob = 
[[0.13336734 0.86663266]
 [0.10748579 0.89251421]]
---
old_startprob = 
[0.58684749 0.41315251]
startprob = 
[0.46159954 0.53840046]


#### Todo: test
After training, it would be interesting to **assess the new model** by **submitting an observation drawn from the same distribution**.

#### Todo: monitor_
Monitor object used to check the convergence of EM.

In [65]:
def test_converged_by_logprob():
    m = ConvergenceMonitor(tol=1e-3, n_iter=10, verbose=False)
    for logprob in [-0.03, -0.02, -0.01]:
        m.report(logprob)
        assert not m.converged

    m.report(-0.0101)
    assert m.converged

In [67]:
from hmmlearn.base import ConvergenceMonitor
from hmmlearn import hmm
class ThresholdMonitor(ConvergenceMonitor):
    @property
    def converged(self):
        return (self.iter == self.n_iter or
                self.history[-1] >= self.tol)
model = hmm.GaussianHMM(n_components=2, tol=5, verbose=True)
model.monitor_ = ThresholdMonitor(model.monitor_.tol,
                                  model.monitor_.n_iter,
                                  model.monitor_.verbose)

AttributeError: 'GaussianHMM' object has no attribute 'monitor_'