# Tutorial: Hidden Markov model

This tutorial demonstrates modeling and running inference on a hidden Markov model (HMM) in Bean Machine. The flexibility of this model lets us demonstrate several distinctive features of Bean Machine, such as block inference, compositional inference, and separation of data from the model.

# Problem

HMMs are a class of probabilistic models which are popular for doing inference on discrete-time stochastic processes. In general, Markov models are used to study a sequence of random variables, $X_1, \ldots, X_N$, where the sequence is "memoryless": given the value of $X_n$, the distribution of $X_{n+1}$ does not depend on any earlier values. Any sequence which is memoryless is said to satisfy the Markov property. One reason Markov models are popular is that this flexible framework can be used to model many time-evolving processes, such as words in a sentence, the position of a robot, or the weather.
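
To make the Markov property concrete, here is a minimal sketch in plain Python (not Bean Machine) that simulates a two-state weather chain. The state names and transition probabilities are made up for illustration. The point is only that each step looks at the current state and nothing earlier.

```python
import random

# Hypothetical transition probabilities: TRANSITIONS[s][t] = P(next = t | current = s).
TRANSITIONS = {
    "sunny": {"sunny": 0.8, "rainy": 0.2},
    "rainy": {"sunny": 0.4, "rainy": 0.6},
}


def simulate_chain(start, num_steps, rng):
    """Sample a chain of length num_steps + 1, starting from `start`."""
    chain = [start]
    for _ in range(num_steps):
        row = TRANSITIONS[chain[-1]]  # depends only on the current state
        states = list(row)
        weights = [row[s] for s in states]
        chain.append(rng.choices(states, weights=weights, k=1)[0])
    return chain


chain = simulate_chain("sunny", 10, random.Random(0))
print(chain)
```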

An HMM is a Markov model in which observations are modeled as being noisy. While we are interested in doing inference on each $X_n$, we actually observe variables $Y_1, \ldots, Y_N$, which can depend on the values of $X_1, \ldots, X_N$ in a variety of ways. In specific settings, HMMs can be very tractable, and lend themselves to provably optimal algorithms such as Kalman filtering. Here, we illustrate how to do inference with MCMC, which applies to general HMMs. The single-site algorithms underlying Bean Machine enable inference algorithms which scale favorably with the size of the HMM.

# Prerequisites

Let's code this in Bean Machine! Import the Bean Machine library, some fundamental PyTorch classes, plotting and array utilities, and optionally `typing` for type annotations.

In [None]:
import sys
if 'google.colab' in sys.modules and 'beanmachine' not in sys.modules:
    !pip install beanmachine
import os 
import beanmachine.ppl as bm

from torch import Tensor, tensor
import torch
import torch.distributions as dist
from typing import Dict, List, Optional, Tuple
import matplotlib.pyplot as plt
import numpy as np

torch.manual_seed(11);

# for CI testing
smoke_test = ('SANDCASTLE_NEXUS' in os.environ or 'CI' in os.environ)

# Model

We model the hidden state $X$ as being a discrete-time Markov chain with $K$ states and transition matrix (a.k.a. kernel) $\Theta$. We model $N$ timesteps of this chain with variables $X_1, \ldots, X_N$, and use variables $Y_1, \ldots, Y_N$ to model observable emissions of each $X$ with Gaussian noise.

Formally, the transition and emission probabilities are as follows, for $n \in 1, \ldots, N$:

- $ X_{n+1} | X_n \sim \text{Categorical}(\Theta[X_n]) $
- $ Y_n | X_n \sim \text{Normal}(\mu[X_n], \sigma[X_n]) $

Accordingly, priors can be assigned as follows, for $k \in 1, \ldots, K$: 

- $ \mu[k] \sim \text{Normal}(\mu_\text{loc}, \mu_\text{scale}) $
- $ \sigma[k] \sim \text{Gamma}(\sigma_\text{shape}, \sigma_\text{rate}) $
- $ \Theta[k] \sim \text{Dirichlet}([\frac{c}{K}, \ldots, \frac{c}{K}]) $

Finally, assume that the value of $X_0$ is known:
- $ X_0 = 1 $

So the model is set by choosing the prior through the values of: $\mu_\text{loc}, \mu_\text{scale}, \sigma_\text{shape}, \sigma_\text{rate}, c$. ($c$ stands for concentration).
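
Read top to bottom, the priors and conditionals above describe a generative process. The following plain-Python sketch (standard library only, not Bean Machine) draws one synthetic data set by following that process. The function name and argument order are illustrative, states are 0-indexed, and the hyperparameter values in the usage line are arbitrary.

```python
import random


def sample_hmm_from_prior(N, K, concentration, mu_loc, mu_scale,
                          sigma_shape, sigma_rate, rng):
    """Ancestral sampling: draw parameters from the priors, then states and emissions."""
    # mu[k] ~ Normal(mu_loc, mu_scale)
    mus = [rng.gauss(mu_loc, mu_scale) for _ in range(K)]
    # sigma[k] ~ Gamma(shape, rate); gammavariate takes a scale, i.e. 1 / rate.
    sigmas = [rng.gammavariate(sigma_shape, 1.0 / sigma_rate) for _ in range(K)]
    # Theta[k] ~ Dirichlet(c/K, ..., c/K), built by normalizing Gamma draws.
    thetas = []
    for _ in range(K):
        g = [rng.gammavariate(concentration / K, 1.0) for _ in range(K)]
        total = sum(g)
        thetas.append([v / total for v in g])

    xs = [0]  # the first hidden state is known (0-indexed)
    for _ in range(1, N):
        # The next state is drawn from the row of Theta picked out by the current state.
        xs.append(rng.choices(range(K), weights=thetas[xs[-1]], k=1)[0])
    # Each emission is Gaussian noise around the mean of its hidden state.
    ys = [rng.gauss(mus[x], sigmas[x]) for x in xs]
    return thetas, mus, sigmas, xs, ys


thetas, mus, sigmas, xs, ys = sample_hmm_from_prior(
    15, 6, 2.2, 0.0, 5.0, 4.0, 1.0, random.Random(11)
)
print(xs)
```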

We can implement this model in Bean Machine by defining random variable objects with the `@bm.random_variable` decorator. These functions behave differently than ordinary Python functions.

<div style="background: #daeaf3; border-left: 3px solid #2980b9; display: block; margin: 16px 0; padding: 12px;">
  Semantics for <code>@bm.random_variable</code> functions:
  <ul>
    <li>They must return PyTorch <code>Distribution</code> objects.
    <li>Though they return distributions, callers actually receive <i>samples</i> from the distribution. The machinery for obtaining samples from distributions is handled internally by Bean Machine.
    <li>Inference runs the model through many iterations. During a particular inference iteration, a distinct random variable will correspond to exactly one sampled value: <b>calls to the same random variable function with the same arguments will receive the same sampled value within one inference iteration</b>. This makes it easy for multiple components of your model to refer to the same logical random variable.
    <li>Consequently, to define distinct random variables that correspond to different sampled values during a particular inference iteration, an effective practice is to add a dummy "indexing" parameter to the function. Distinct random variables can be referred to with different values for this index.
    <li>Please see the documentation for more information about this decorator.
  </ul>
</div>
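
As a rough analogy for the third and fourth points in the box above, the sketch below caches one value per (function name, arguments) pair for the duration of a single iteration. `ToyWorld` and `value_of` are hypothetical names invented for this illustration. This is not how Bean Machine is implemented, only a way to picture the semantics.

```python
import random


class ToyWorld:
    """Hypothetical stand-in for one inference iteration: caches one value per variable."""

    def __init__(self, seed):
        self._rng = random.Random(seed)
        self._values = {}

    def value_of(self, name, *index):
        key = (name, index)
        if key not in self._values:
            # First request for this variable in this iteration: draw and remember it.
            self._values[key] = self._rng.gauss(0.0, 1.0)
        return self._values[key]


world = ToyWorld(seed=0)
a = world.value_of("Mu", 0)
b = world.value_of("Mu", 0)  # same function, same argument: the same variable
c = world.value_of("Mu", 1)  # different index: a distinct variable
print(a == b, a == c)
```

The index argument plays the role of the dummy "indexing" parameter: it carries no meaning inside the function body, but it distinguishes one logical random variable from another.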

Note also that, compared to the statistical notation above, we switch from 1-indexing to 0-indexing.

In [None]:
class HiddenMarkovModel(object):
    def __init__(
        self,
        N: int,
        K: int,
        concentration: float,
        mu_loc: float,
        mu_scale: float,
        sigma_shape: float,
        sigma_rate: float,
    ) -> None:
        self.N = N
        self.K = K

        self.concentration = concentration
        self.mu_loc = mu_loc
        self.mu_scale = mu_scale
        self.sigma_shape = sigma_shape
        self.sigma_rate = sigma_rate

    @bm.random_variable
    def Theta(self, k):
        return dist.Dirichlet(torch.ones(self.K) * self.concentration / self.K)

    @bm.random_variable
    def Mu(self, k):
        return dist.Normal(self.mu_loc, self.mu_scale)

    @bm.random_variable
    def Sigma(self, k):
        return dist.Gamma(self.sigma_shape, self.sigma_rate)

    @bm.random_variable
    def X(self, n: int):
        if n == 0:
            return dist.Categorical(tensor([1.0] + [0.0] * (self.K - 1)))
        else:
            return dist.Categorical(self.Theta(self.X(n - 1).item()))

    @bm.random_variable
    def Y(self, n: int):
        return dist.Normal(
            self.Mu(self.X(n).item()), self.Sigma(self.X(n).item())
        )

# Inference

Inference is the process of combining _model_ with _data_ to obtain _insights_, in the form of probability distributions over values of interest. Bean Machine offers a powerful and general inference framework to enable fitting arbitrary models to data.

Our model makes use of both continuous and discrete random variables. We'll want to make use of different inference strategies for each. In particular, we would like to take advantage of gradient information for the continuous random variables. To facilitate this, Bean Machine provides the `CompositionalInference` class.

`CompositionalInference` is a powerful, flexible class for configuring inference in a variety of ways. By default, `CompositionalInference` will select an inference method for each random variable that is appropriate based on its support. The HMM presented in this tutorial has a number of different random variables that we're interested in learning about. Those random variables, along with their supports and the inference method that `CompositionalInference` will automatically select for them, are summarized in the table below:

| Random variable | Support | Inference method | Transform?
| --- | --- | --- | ---
| $X$ | $0, \ldots, K - 1$ | Uniform | No (handled by the inference method)
| $\mu$ | $(-\infty, \infty)$ | Newtonian Monte Carlo | No (unnecessary)
| $\sigma$ | $[0, \infty)$ | Newtonian Monte Carlo | Yes (to unconstrained)
| $\Theta$ | Simplex | Simplex Newtonian Monte Carlo | Yes (stick-breaking transform)

You can learn more about compositional inference in our framework topics.
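
To give a feel for the "Transform?" column in the table above, here is a minimal sketch in plain Python of two standard transforms. It assumes a log transform for the positive variable $\sigma$, which is one common choice, and a stick-breaking map for the simplex that follows one common convention. The transforms Bean Machine actually applies may differ in detail, and all function names are illustrative.

```python
import math


def log_transform_density(log_p_sigma, u):
    """Log-density of u = log(sigma), given the log-density of sigma > 0.

    Change of variables: p_u(u) = p_sigma(exp(u)) * exp(u).
    """
    return log_p_sigma(math.exp(u)) + u


def stick_breaking(y):
    """Map K - 1 unconstrained reals to a point on the K-simplex."""
    K = len(y) + 1
    remaining = 1.0
    x = []
    for i, y_i in enumerate(y):
        # The log(K - 1 - i) offset makes y = 0 map to the uniform vector.
        z = 1.0 / (1.0 + math.exp(-(y_i - math.log(K - 1 - i))))
        x.append(z * remaining)
        remaining -= z * remaining
    x.append(remaining)
    return x


def gamma_log_pdf(sigma, shape=4.0, rate=1.0):
    """Log-density of Gamma(shape, rate) at sigma > 0."""
    return (shape * math.log(rate) - math.lgamma(shape)
            + (shape - 1.0) * math.log(sigma) - rate * sigma)


print(stick_breaking([0.0, 0.0, 0.0]))
print(log_transform_density(gamma_log_pdf, 0.0))
```

Working in the unconstrained space means a gradient-based proposal can never step outside the variable's support, which is the reason such transforms are applied before inference.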

Normally, this is all you would have to do! However, the HMM model has meaningful structure that we would like to consider when configuring our inference strategy. In particular, the hidden state of each time step is highly correlated with the observable emissions of that state. Thus, if our inference proposes a new value for $X_n$, we would also like to jointly propose new values for $\mu[X_n]$ and $\sigma[X_n]$ -- it is very likely that previous values for these random variables are inappropriate for the new hidden state. In order to update these variables jointly (as opposed to by single-site), we can make use of block inference.

We configure block inference using the method `add_sequential_proposer`, which takes a list of variables as an argument. Note that, unlike in typical Bean Machine syntax, we don't include arguments to variable objects when defining the block inference scheme. This is because the framework can compute the minimal _specific_ set of variables to update from the random variable families provided to this function. The next paragraph explains how that block selection is performed, and is quite technical! Feel free to skip it if desired.

<div style="background: #daeaf3; border-left: 3px solid #2980b9; display: block; margin: 16px 0; padding: 12px;">
  Block inference selects specific subsets of random variables to update jointly. This is based upon which variables would typically be included in one another's Markov blanket during a single-site update. The Markov blanket of a random variable is a statistical concept defining which  other random variables may be directly affected when we propose a new value for the current random variable. It includes the current random variable's parents, children, and parents-of-children. To be concrete, consider our example where we block together <code>X</code>, <code>Mu</code>, and <code>Sigma</code>. Our single-site inference typically runs inference for each individual <code>X(n)</code>, <code>Mu(k)</code>, and <code>Sigma(k)</code>, doing separate accept-reject procedures for each individual node of the probabilistic model. With block inference, before computing the accept-reject probability on some <code>X(n)</code> (for a specific <code>n</code>), the method will search the Markov blanket of <code>X(n)</code> to find all variables that would be directly affected by an update to <code>X(n)</code>. Any variables of the form <code>Mu(j)</code> or <code>Sigma(k)</code>, for any <code>j</code>, <code>k</code>, will be blocked together, since that's what we configured with <code>add_sequential_proposer</code>. New proposals for <i>all</i> of these variables will be jointly accepted or rejected together. Read more about block inference in our documentation.
</div>
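
The difference between single-site and block updates comes down to how many variables share one accept-reject decision. The following sketch is a generic Metropolis-Hastings block update on a toy two-variable target, written in plain Python. It is not Bean Machine's sequential proposer: the target, the proposals, and every name here are assumptions chosen to keep the example small.

```python
import math
import random

WEIGHTS = [0.3, 0.7]  # hypothetical P(x) for a binary hidden state
MEANS = [-1.0, 1.0]   # hypothetical mean of mu given x


def log_joint(x, mu):
    """Unnormalized log-density of the toy target P(x) * Normal(mu; MEANS[x], 1)."""
    return math.log(WEIGHTS[x]) - 0.5 * (mu - MEANS[x]) ** 2


def block_step(x, mu, rng, step=1.5):
    """Propose new values for x and mu, then accept or reject them as a pair."""
    x_new = rng.randrange(2)            # symmetric proposal for the discrete variable
    mu_new = mu + rng.gauss(0.0, step)  # symmetric random-walk proposal for mu
    log_ratio = log_joint(x_new, mu_new) - log_joint(x, mu)
    # One accept-reject decision covers both variables.
    if rng.random() < math.exp(min(0.0, log_ratio)):
        return x_new, mu_new
    return x, mu


rng = random.Random(0)
x, mu = 0, 0.0
xs = []
for _ in range(50000):
    x, mu = block_step(x, mu, rng)
    xs.append(x)
print(sum(xs) / len(xs))  # fraction of draws with x = 1
```

Because both proposals are symmetric, the acceptance ratio reduces to the ratio of joint densities. A single-site scheme would instead evaluate that ratio, and make a separate decision, for each variable in turn.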

Much of the remaining code here is defining the list of queries and dictionary of observations. For this code, note that we bind data to random variables after defining our model, so this method can optionally observe the values of $\Theta, \mu, \sigma$.

In [None]:
def infer(
    model,
    num_samples: int,
    y_obs: Tensor,
    thetas_obs: Optional[Tensor],
    mus_obs: Optional[Tensor],
    sigmas_obs: Optional[Tensor],
):
    inference_strategy = bm.CompositionalInference()
    inference_strategy.add_sequential_proposer([model.X, model.Sigma, model.Mu])
    # Always query X_1, ..., X_{N-1}.
    # Always observe Y_0, ..., Y_{N-1}.
    # Always observe X_0 = 0.
    queries = [model.X(n) for n in range(1, model.N)]
    observations_dict = {model.Y(n): y_obs[n] for n in range(model.N)}
    observations_dict.update({model.X(0): tensor(0.0)})

    # The model parameters include thetas, mus, and sigmas.
    # Each of these can be either observed or queried.
    # Bean Machine's programmable inference syntax allows us to flexibly define
    # our model with or without observing these parameters, as we can see below.
    if thetas_obs is not None:
        observations_dict.update(
            {model.Theta(k): thetas_obs[k] for k in range(model.K)}
        )
    else:
        queries += [model.Theta(k) for k in range(model.K)]

    if mus_obs is not None:
        observations_dict.update(
            {model.Mu(k): mus_obs[k] for k in range(model.K)}
        )
    else:
        queries += [model.Mu(k) for k in range(model.K)]

    if sigmas_obs is not None:
        observations_dict.update(
            {model.Sigma(k): sigmas_obs[k] for k in range(model.K)}
        )
    else:
        queries += [model.Sigma(k) for k in range(model.K)]

    monte_carlo_samples = inference_strategy.infer(
        queries,
        observations_dict,
        num_samples,
        num_adaptive_samples=num_samples // 3,
        num_chains=1,
    )
    return monte_carlo_samples, queries

# Analysis

First, we will generate random observations, choose the prior, and run the code we've written so far.

It is interesting to experiment with different block inference schemes, different sets of observed variables, and different priors. Try exploring different experiments with this notebook!

In [None]:
def generate_chain_observations(theta, mus, sigmas, N):
    theta_distbns = {
        j: dist.Categorical(vector) for j, vector in enumerate(theta)
    }
    hidden = [0]
    while len(hidden) < N:
        hidden.append(theta_distbns[hidden[-1]].sample().item())

    def observe(x):
        return dist.Normal(mus[x], sigmas[x]).sample().item()

    return hidden, list(map(observe, hidden))

In [None]:
def main():
    """
    Define model and prior
    """

    concentration = 2.2
    mu_loc = 0.0
    mu_scale = 5.0
    sigma_shape = 4.0
    sigma_rate = 1.0

    N = 15
    K = 6

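    # Draw K ground-truth values per parameter family.
    # sample((K,)) is the recommended replacement for sample_n(K).
    thetas_obs = dist.Dirichlet(torch.ones(K) * concentration / K).sample((K,))
    mus_obs = dist.Normal(mu_loc, mu_scale).sample((K,))
    sigmas_obs = dist.Gamma(sigma_shape, sigma_rate).sample((K,))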
    xs_obs, ys_obs = generate_chain_observations(
        thetas_obs, mus_obs, sigmas_obs, N
    )
    xs_obs = tensor(xs_obs)
    ys_obs = tensor(ys_obs)

    # Initialize model
    model = HiddenMarkovModel(
        N, K, concentration, mu_loc, mu_scale, sigma_shape, sigma_rate,
    )

    # Run inference
    num_samples = 2 if smoke_test else 300
    monte_carlo_samples, queries = infer(
        model,
        num_samples=num_samples,
        y_obs=ys_obs,
        thetas_obs=thetas_obs,
        mus_obs=None,
        sigmas_obs=sigmas_obs,
    )

    xs_samples = [
        monte_carlo_samples.get_chain()[model.X(n)].detach().numpy()
        for n in range(1, N)
    ]
    # Account for the fact that X(0) is observed, not sampled
    xs_samples = (
        list(np.zeros(len(xs_samples[0])).reshape(1, len(xs_samples[0])))
        + xs_samples
    )
    xs_samples = tensor(xs_samples)

    thetas_samples = None
    mus_samples = None
    sigmas_samples = None

    if model.Theta(0) in queries:
        thetas_samples = [
            monte_carlo_samples.get_chain()[model.Theta(k)].detach().numpy()
            for k in range(model.K)
        ]
        thetas_samples = tensor(thetas_samples)
    if model.Mu(0) in queries:
        mus_samples = [
            monte_carlo_samples.get_chain()[model.Mu(k)].detach().numpy()
            for k in range(model.K)
        ]
        mus_samples = tensor(mus_samples)
    if model.Sigma(0) in queries:
        sigmas_samples = [
            monte_carlo_samples.get_chain()[model.Sigma(k)].detach().numpy()
            for k in range(model.K)
        ]
        sigmas_samples = tensor(sigmas_samples)


    return (
        (xs_obs, ys_obs, thetas_obs, mus_obs, sigmas_obs),
        (xs_samples, None, thetas_samples, mus_samples, sigmas_samples),
        N,
        K,
    )


obs, samples, N, K = main()

## Visualization

We will look at the values of the samples collected for $X$ and $\mu$. We will take the mean of samples taken over the last 10% of the chain, and compare these to our synthetic data.
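
The tail-averaging step can be sketched on its own in plain Python. `tail_mean` and its `fraction` parameter are illustrative names, and the draws below are made-up numbers. Early draws are left out because the chain may not yet have settled near the posterior.

```python
def tail_mean(chain, fraction=0.1):
    """Average the final `fraction` of a chain, keeping at least one draw."""
    tail_len = max(1, int(len(chain) * fraction))
    tail = chain[-tail_len:]
    return sum(tail) / len(tail)


# 90 early draws far from the target, then 10 draws scattered around 1.0.
draws = [5.0] * 90 + [1.0, 1.2, 0.8, 1.0, 1.1, 0.9, 1.0, 1.0, 1.05, 0.95]
print(tail_mean(draws))
```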

In [None]:
# samples[0] has shape (N, num_samples): take the tail along the sample axis.
tail_len = max(1, samples[0].shape[1] // 10)

xs_tail_plot = samples[0][:, -tail_len:].mean(1)
xs_obs = obs[0]
plt.scatter(
    range(len(xs_tail_plot)),
    xs_tail_plot,
    alpha=0.5,
    label="Inferred",
    c="b",
)
plt.scatter(
    range(len(xs_obs)), xs_obs, alpha=0.5, label="Ground truth", c="r"
)
plt.title("Values of X_1 ... X_N")
plt.xlabel("n")
plt.ylabel("Value of X_n")
plt.legend()
plt.show()

mus_tail_plot = samples[3][:, -tail_len:].mean(1)
mus_obs = obs[3]
plt.scatter(range(K), mus_tail_plot, alpha=0.5, label="Inferred", c="b")
plt.scatter(range(K), mus_obs, alpha=0.5, label="Ground truth", c="r")
plt.title("Values of mu_1 ... mu_K")
plt.xlabel("k")
plt.ylabel("Value of mu_k")
plt.legend()
plt.show()

These plots indicate that inference seems to be recovering hidden states well, and is computing reasonably accurate values for $\mu$.

## Posterior likelihood checks

One way to evaluate posterior samples is to compute their likelihood and compare it to the likelihood of the underlying synthetic data. Concretely, for each posterior sample we compute the joint likelihood of the sampled values together with the observations that were used during inference. We then compute the same joint likelihood for the synthetic ground-truth values that were generated alongside those observations.
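
Written out for this tutorial's model with 0-indexed time steps, and treating the parameters $\Theta, \mu, \sigma$ as given, the joint log likelihood of a hidden path $x$ and emissions $y$ is the sum of the transition and emission terms. The first hidden state is fixed, so it contributes no transition term. Prior terms on the parameters are left out here, which matches what the `hmm_llh` function computes.

$$
\log p(x, y \mid \Theta, \mu, \sigma) = \sum_{n=1}^{N-1} \log \Theta[x_{n-1}][x_n] + \sum_{n=0}^{N-1} \log \text{Normal}(y_n \mid \mu[x_n], \sigma[x_n])
$$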

In our case, we've queried the $X$ and $\mu$ variables and observed the rest. The notebook is written so that changing the call to `infer` in `main` lets you observe a different set of variables.

In [None]:
def hmm_llh(xs, ys, thetas, mus, sigmas, N):
    result = 0
    for n in range(1, N):
        result += torch.log(thetas[xs[n - 1].int()][xs[n].int()])
    for n in range(N):
        result += dist.Normal(mus[xs[n].int()], sigmas[xs[n].int()]).log_prob(
            ys[n]
        )
    return result


xs = samples[0].transpose(0, 1)
num_samples = len(xs)
ys = [obs[1]] * num_samples

if samples[2] is not None:
    thetas = samples[2].transpose(0, 1)
else:
    thetas = [obs[2]] * num_samples

if samples[3] is not None:
    mus = samples[3].transpose(0, 1)
else:
    mus = [obs[3]] * num_samples

if samples[4] is not None:
    sigmas = samples[4].transpose(0, 1)
else:
    sigmas = [obs[4]] * num_samples

ppcs = [
    hmm_llh(x, y, theta, mu, sigma, N)
    for x, y, theta, mu, sigma in zip(xs, ys, thetas, mus, sigmas)
]
ppc_avgs = [e / (i + 1) for i, e in enumerate(np.cumsum(ppcs))]

In [None]:
plt.figure(figsize=(12,6))
plt.plot(ppcs, label="Sample", c="lightblue", alpha=0.7)
plt.plot(ppc_avgs, label="Cumulative samples (average)", c="g")
plt.plot([hmm_llh(*obs, N)] * num_samples, label="Generated data", c="r")
plt.ylabel("Log likelihood")
plt.legend()
plt.show()

From the above plot, inference appears to be doing a good job of fitting the random variables given the observed data. Inference appears to converge to log likelihood scores close to those produced by the ground-truth parameters.