In [None]:
import numpy as np
import numpy.matlib
import matplotlib.pyplot as plt

In [None]:
class HMM:
    def __init__(self, prior_model, trans_model, sensor_model):
        """
        :param prior_model: Column vector with prior probabilities for X_0
        :param trans_model: Transition matrix with transModel(i, j)=P(X_t=j | X_{t-1}=i.
        I.e., the transition probabilities are given row-wise.
        :param sensor_model: Sensor matrix (noHiddenStates x noSensorStates) with the sensor probabilities; the
        sensor probabilities are listed row-wise.
        """

        # Storage for model parameters
        self.prior_model = None
        self.trans_model = None
        self.diag_sensor_model = None
        self.num_hidden = None
        self.num_obs = None

        # Holds the log-likelihood of the model given the data
        self.ll = None

        # Storage for messages being send during inference
        self.forward_messages = None
        self.backward_messages = None
        self.forward_backward_messages = None

        # Initialize the data structures
        self.initialize(prior_model, trans_model, sensor_model)

    def initialize(self, prior_model, trans_model, sensor_model):
        """
          :param prior_model: Column vector with prior probabilities for X_0
          :param trans_model: Transition matrix with transModel(i, j)=P(X_t=j | X_{t-1}=i.
          I.e., the transition probabilities are given row-wise.
          :param sensor_model: Sensor matrix (noHiddenStates x noSensorStates) with the sensor probabilities; the
          sensor probabilities are listed row-wise.
          """

        self.prior_model = prior_model
        self.trans_model = trans_model

        # Extract the number of states for the hidden/state variable and the observation variable
        self.num_hidden = sensor_model.shape[0]
        self.num_obs = sensor_model.shape[1]

        # Convert the sensor model specification into diagonal form (see Slide 25)
        self.diag_sensor_model = []
        for i in range(self.num_obs):
            self.diag_sensor_model.append(np.diag(sensor_model[:, i]))

    # Do forward calculations: P(X_k|e_{1:k}). As a byproduct, we also get the log-likelihood of the model given the
    # data
    def forward(self, data):
        """
        :param data: list with entries of the observations
        """

        # Initialization
        total_time = len(data)
        self.forward_messages = np.zeros((self.num_hidden, total_time))

        # Do forward calculations
        #
        # We take special care of the first time step, where an observation is
        # assumed and no transition model is involved. Note that this differs a bit from the AIMA model from the
        # lecture
        self.forward_messages[:, 0] = np.dot(self.diag_sensor_model[data[0]], self.prior_model)

        # Calculate the log-likelihood (ll) contribution from the first time step
        self.ll = np.log2(np.sum(self.forward_messages[:, 0]))
        self.forward_messages[:, 0] = self.forward_messages[:, 0] / np.sum(self.forward_messages[:, 0])

        for t in range(1, total_time):
            temporal_term = np.dot(np.transpose(self.trans_model), self.forward_messages[:, t - 1])
            self.forward_messages[:, t] = np.dot(self.diag_sensor_model[data[t]], temporal_term)

            # Update ll
            self.ll += np.log2(np.sum(self.forward_messages[:, t]))

            # Normalize
            self.forward_messages[:, t] = self.forward_messages[:, t] / np.sum(self.forward_messages[:, t])

    # Do backward calculations: P(e_{k+1:t}|X_k)
    def backward(self, data):
        """
        :param data: list with entries of the observations
        """

        # Initialization
        total_time = len(data)
        self.backward_messages = np.zeros((self.num_hidden, total_time))
        self.backward_messages[:, total_time - 1] = np.ones(self.num_hidden)

        # Do backward calculations
        for t in range(total_time - 2, -1, -1):
            temp = np.dot(self.diag_sensor_model[data[t + 1]], self.backward_messages[:, t + 1])
            self.backward_messages[:, t] = np.dot(self.trans_model, temp)

            # Normalize (just to avoid numerical instability)
            self.backward_messages[:, t] = self.backward_messages[:, t] / np.sum(self.backward_messages[:, t])

    # The forward backward algorithm. It essentially just calls the forward and backward functions and normalizes the
    # results afterwards.
    def forward_backward(self, data):
        """
        :param data: list with entries of the observations
        """

        self.forward(data)
        self.backward(data)
        prob = self.forward_messages * self.backward_messages
        self.forward_backward_messages = prob / np.matlib.repmat(np.sum(prob, axis=0), self.num_hidden, 1)

    # Visualize the results
    def visualize(self, state, data=None):
        """
        :param state: Display the probability of the state 'state'
        :param data: The observations that have been made
        """
        fig = plt.figure()
        ax1 = fig.add_subplot(1,1,1)
        if self.forward_backward_messages is not None:
            ax1.plot(self.forward_backward_messages[state, :], label='Smooted estimate')
        if self.forward_messages is not None:
            ax1.plot(self.forward_messages[state, :], label='Filtered estimate')
        if data is not None:
            ax1.plot(data, label='Observations')
        plt.xticks(range(0,self.forward_messages.shape[1]))
        plt.xlabel("Time steps")
        plt.ylabel("P(X={}|evidence)".format(state))
        plt.title("Smoothed and filtered probabilities")
        ax1.legend(loc='best')

        fig.subplots_adjust(hspace=.5)
        plt.show()

In [None]:
# Transition matrix with transModel(i, j)=P(X_t=j | X_{t-1}=i.I.e., the transition probabilities are given row-wise.
trans_model = np.array([[0.9, 0.1], [0.4, 0.6]])

# Column vector with prior probabilities for X_0
prior_model = np.array([0.8, 0.2])

# Sensor matrix (noHiddenStates x noSensorStates) with the sensor probabilities; the sensor probabilities are
# listed row-wise.
sensor_model = np.array([[0.95, 0.05], [0.1, 0.9]])

hmm = HMM(prior_model, trans_model, sensor_model)

# The state number of the observations being made
data = [0, 1, 0, 1, 1, 1, 0, 0, 0]

# Perform the forward backward propagations
hmm.forward_backward(data)

# Visualize the results
hmm.visualize(1)