In [17]:
from hmmlearn import hmm
import numpy as np
from collections import Counter

In [18]:
# Fixed seed for reproducibility: it seeds NumPy's legacy global RNG here and
# is also handed to the HMM as its `random_state` when the model is built.
seed = 93
np.random.seed(seed)

In [19]:
# Human-readable labels; the list position is the integer id used by the model.
states = [
    "left_lane",   # hidden state 0
    "right_lane",  # hidden state 1
]
observations = [
    "low_speed",   # observed symbol 0
    "high_speed",  # observed symbol 1
]

In [20]:
# Each time step emits exactly one symbol index, i.e. a *categorical* emission.
# Since hmmlearn 0.2.8 that model is called `CategoricalHMM`, while
# `MultinomialHMM` was re-implemented as a true multinomial over count vectors
# and would misinterpret the symbol indices used in this notebook.
# Older releases only ship `MultinomialHMM` (which is the categorical model
# there), so fall back to it when `CategoricalHMM` is not available.
_DiscreteHMM = getattr(hmm, "CategoricalHMM", hmm.MultinomialHMM)

discrete_model = _DiscreteHMM(
    n_components=2,       # number of hidden states (see `states`)
    algorithm="viterbi",  # decoder algorithm.
    random_state=seed,
    n_iter=10,
    tol=0.01,             # EM convergence threshold (gain in log-likelihood)
)

In [21]:
# Set the model parameters by hand (nothing is learned from data here).
# Index 0 is `left_lane` / `low_speed`, index 1 is `right_lane` / `high_speed`.

# Initial state distribution P(state_0).
discrete_model.startprob_ = np.array([1/3, 2/3])

# Transition matrix: row i is P(state_t+1 | state_t = state_i).
discrete_model.transmat_ = np.array([[0.6, 0.4],
                                     [0.2, 0.8]])

# Emission matrix: row i is P(obs | state_i).
discrete_model.emissionprob_ = np.array([[0.4, 0.6],
                                         [0.8, 0.2]])

In [22]:
# Draw one long trajectory from the model, then tally how often each hidden
# state was visited and how often each symbol was emitted.
sample_obs, sample_states = discrete_model.sample(100000)
for drawn in (sample_states, sample_obs):
    print(Counter(drawn.flatten()))

Counter({1: 66134, 0: 33866})
Counter({0: 66520, 1: 33480})


In [23]:
# Stationary distribution of the hidden-state chain: the eigenvector of the
# transposed transition matrix that belongs to its largest eigenvalue.
eigvals, eigvecs = np.linalg.eig(discrete_model.transmat_.T)
dominant = np.argmax(eigvals)
eigvec = np.real_if_close(eigvecs[:, dominant])

# Rescale the eigenvector so that its entries sum to one.
stat_distr = eigvec / eigvec.sum()
print(stat_distr)

[0.33333333 0.66666667]


In [24]:
# For each possible single observation, show the posterior over the hidden
# state and the state that `predict` selects.
for i, obs_name in enumerate(observations):
    single_obs = np.array([[i]])
    p = discrete_model.predict_proba(single_obs)
    most_likely = discrete_model.predict(single_obs)
    state_name = states[int(most_likely[0])]
    print("p(state|'{}') = {} => most likely state is '{}'".format(obs_name, p, state_name))

p(state|'low_speed') = [[0.2 0.8]] => most likely state is 'right_lane'
p(state|'high_speed') = [[0.6 0.4]] => most likely state is 'left_lane'


In [25]:
# The Viterbi decoding algorithm uses the `alpha*` (max-product) values, and
# for the first time step the `alpha*` values are just the `alpha` values.
# Hence, for a single observation, decoding should return the state with the
# **highest joint probability** P(state, obs) = startprob[state] * emissionprob[state, obs].
obs_0 = np.array([[0]])  # one `low_speed` observation
obs_1 = np.array([[1]])  # one `high_speed` observation
# Log probability of the maximum likelihood path through the HMM
logprob_0, state_0 = discrete_model.decode(obs_0)  # 8/15 = 2/3 * 0.8 = alpha[`low_speed`](`right_lane`)
logprob_1, state_1 = discrete_model.decode(obs_1)  # 1/5 = 1/3 * 0.6 = alpha[`high_speed`](`left_lane`)

# `.item()` pulls the single element out explicitly; calling `int()` on an
# array with ndim > 0 is deprecated since NumPy 1.25.
print("{} -> {}".format(states[state_0.item()], observations[obs_0.item()]))
print("prob = {}".format(np.exp(logprob_0)))
print("---")
print("{} -> {}".format(states[state_1.item()], observations[obs_1.item()]))
print("prob = {}".format(np.exp(logprob_1)))

right_lane -> low_speed
prob = 0.5333333333333333
---
left_lane -> high_speed
prob = 0.19999999999999998


In [26]:
# Likelihood of a single observation: `score` returns the log probability
# under the model, so exponentiate it to get p(obs).
for i, obs_name in enumerate(observations):
    p = np.exp(discrete_model.score(np.array([[i]])))
    print("p({}) = {} ".format(obs_name, p))

print("---")
# `score_samples` returns the log probability together with the posteriors
# over the hidden states.
for i, obs_name in enumerate(observations):
    logprob, posteriors = discrete_model.score_samples(np.array([[i]]))
    print("p({}) = {}\n   posterior p(state|{}) = {}".format(
        obs_name, np.exp(logprob), obs_name, posteriors))

p(low_speed) = 0.6666666666666667 
p(high_speed) = 0.33333333333333337 
---
p(low_speed) = 0.6666666666666667
   posterior p(state|low_speed) = [[0.2 0.8]]
p(high_speed) = 0.33333333333333337
   posterior p(state|high_speed) = [[0.6 0.4]]


In [27]:
# Likelihood of a whole observation sequence, given as a column of symbol ids.
obs_sequence = np.array([[0], [1], [0]])
p = np.exp(discrete_model.score(obs_sequence))
obs_names = [observations[i] for i in obs_sequence[:, 0]]
print("p({}) = {} ".format(obs_names, p))

p(['low_speed', 'high_speed', 'low_speed']) = 0.13184 


In [28]:
# Pull the model parameters into plain names for the hand-written
# forward / backward recursions.
start_prob = discrete_model.startprob_     # initial state distribution
transmat_prob = discrete_model.transmat_   # state transition matrix
emit_prob = discrete_model.emissionprob_   # emission matrix, one row per state
n_state = discrete_model.n_components      # number of hidden states

In [29]:
def forward_algo(obs_seq, startprob=None, transmat=None, emissionprob=None):
    """Compute the (unscaled) forward variables of a discrete HMM.

    ``alpha[t, i]`` is the joint probability of the first ``t + 1``
    observations and of being in state ``i`` at step ``t``; the last row
    therefore sums to the likelihood of the whole sequence. No rescaling is
    applied, so the values may underflow for long sequences.

    Parameters
    ----------
    obs_seq : array-like of int
        Observed symbol indices, either flat ``(T,)`` or as a column ``(T, 1)``.
    startprob, transmat, emissionprob : array-like, optional
        Initial distribution ``(n_states,)``, transition matrix
        ``(n_states, n_states)`` and emission matrix ``(n_states, n_symbols)``.
        Each one defaults to the corresponding notebook-level array
        (``start_prob``, ``transmat_prob``, ``emit_prob``).

    Returns
    -------
    numpy.ndarray
        Array of shape ``(T, n_states)``; ``(0, n_states)`` for an empty sequence.
    """
    # Fall back to the notebook globals only for parameters that were not given.
    pi = start_prob if startprob is None else np.asarray(startprob, dtype=float)
    trans = transmat_prob if transmat is None else np.asarray(transmat, dtype=float)
    emit = emit_prob if emissionprob is None else np.asarray(emissionprob, dtype=float)

    # Accept both flat and column-shaped input.
    symbols = np.asarray(obs_seq, dtype=int).reshape(-1)
    alpha = np.zeros((symbols.size, pi.shape[0]))
    if symbols.size == 0:
        return alpha

    # Initialisation: start in state i and emit the first symbol.
    alpha[0] = emit[:, symbols[0]] * pi
    # Recursion: propagate one step through the chain, then emit the next symbol.
    for t in range(1, symbols.size):
        alpha[t] = emit[:, symbols[t]] * np.dot(alpha[t - 1], trans)
    return alpha

In [30]:
# Forward variables for the sequence low_speed, high_speed, low_speed.
# Row t holds alpha_t; the last row sums to the likelihood of the sequence.
obs_sequence = np.array([[0], [1], [0]])
alpha = forward_algo(obs_sequence)
print(alpha)

[[0.13333333 0.53333333]
 [0.112      0.096     ]
 [0.03456    0.09728   ]]


In [31]:
def backward_algo(obs_seq, transmat=None, emissionprob=None):
    """Compute the (unscaled) backward variables of a discrete HMM.

    ``beta[t, i]`` is the probability of the observations after step ``t``
    given that the chain is in state ``i`` at step ``t``; the last row is all
    ones. No rescaling is applied, so the values may underflow for long
    sequences.

    Parameters
    ----------
    obs_seq : array-like of int
        Observed symbol indices, either flat ``(T,)`` or as a column ``(T, 1)``.
    transmat, emissionprob : array-like, optional
        Transition matrix ``(n_states, n_states)`` and emission matrix
        ``(n_states, n_symbols)``. Each one defaults to the corresponding
        notebook-level array (``transmat_prob``, ``emit_prob``).

    Returns
    -------
    numpy.ndarray
        Array of shape ``(T, n_states)``; ``(0, n_states)`` for an empty sequence.
    """
    # Fall back to the notebook globals only for parameters that were not given.
    trans = transmat_prob if transmat is None else np.asarray(transmat, dtype=float)
    emit = emit_prob if emissionprob is None else np.asarray(emissionprob, dtype=float)

    # Accept both flat and column-shaped input.
    symbols = np.asarray(obs_seq, dtype=int).reshape(-1)
    # Starting from ones covers the terminal condition beta_{T-1} = 1; every
    # earlier row is overwritten by the recursion below.
    beta = np.ones((symbols.size, trans.shape[0]))
    for t in range(symbols.size - 2, -1, -1):
        # Weight the next step's beta by the emission of the next symbol,
        # then sum over the successor states.
        beta[t] = np.dot(beta[t + 1] * emit[:, symbols[t + 1]], trans.T)
    return beta

In [32]:
# Backward variables for `obs_sequence`; row t holds beta_t.
beta = backward_algo(obs_sequence)
print(beta)

[[0.2592 0.1824]
 [0.56   0.72  ]
 [1.     1.    ]]
