In [1]:
import tensorflow as tf
import numpy as np
from __future__ import print_function

A similar algorithm is commonly referred to as the Viterbi algorithm, but also known as a special case of the max-product or min-sum algorithm, which solves the related problem of maximization, or most probable explanation. Instead of attempting to solve the marginal, the goal here is to find the values \mathbf{x} that maximises the global function (i.e. most probable values in a probabilistic setting), and it can be defined using the arg max:

\operatorname*{arg\,max}_{\mathbf{x}} g(\mathbf{x}).
An algorithm that solves this problem is nearly identical to belief propagation, with the sums replaced by maxima in the definitions.[8]

It is worth noting that inference problems like marginalization and maximization are NP-hard to solve exactly and approximately (at least for relative error) in a graphical model. More precisely, the marginalization problem defined above is #P-complete and maximization is NP-complete.

The memory usage of belief propagation can be reduced through the use of the Island algorithm (at a small cost in time complexity).

In [2]:
def dptable(state_prob):
    print(" ".join(("%10d" % i) for i in range(state_prob.shape[0])))
    for i, prob in enumerate(state_prob.T):
        print("%.7s: " % states[i] +" ".join("%.7s" % ("%f" % p) for p in prob))

In [3]:
class HiddenMarkovModel(object):

    """
    Hidden Markov Model Class

    Parameters:
    -----------
    
    - S: Number of states.
    - T: Transition matrix of size S by S
         stores probability from state i to state j.
    - E: Emission matrix of size S by N (number of observations)
         stores the probability of observing  O_j  from state  S_i. 
    - T0: Initial state probabilities of size S.
    """

    def __init__(self, T, E, T0):
        # Number of states
        self.S = T.shape[0]
        
        # Emission probability
        self.E = tf.constant(E, name='emission_matrix')

        # Transition matrix
        self.T = tf.constant(T, name='transition_matrix')

        # Initial state vector
        self.T0 = tf.constant(T0, name='inital_state_vector')

    def initialize_path_variables(self, shape):
        
        pathStates = tf.Variable(tf.zeros(shape, dtype=tf.int64), name='States_matrix')
        pathScores = tf.Variable(tf.zeros(shape, dtype=tf.float64), name='Score_matrix')
        states_seq = tf.Variable(tf.zeros([shape[0]], dtype=tf.int64), name='States_sequence')
        return pathStates, pathScores, states_seq
    
    def belief_propagation(self, scores):
        
        scores_reshape = tf.reshape(scores, (-1,1))
        return tf.add(scores_reshape, tf.log(self.T))
    
    def viterbi_inference(self, obs_seq):
        
        # length of observed sequence
        N = len(obs_seq)
        
        # shape path Variables
        shape = [N, self.S]
        
        # observed sequence
        x = tf.constant(obs_seq, name='observation_sequence')
        
        # Initialize variables
        pathStates, pathScores, states_seq = self.initialize_path_variables(shape)       
        
        # log probability of emission sequence
        obs_prob_seq = tf.log(tf.gather(self.E, x))
        obs_prob_list = tf.split(0, N, obs_prob_seq)

        # step 1 state log-prior update
        pathScores = tf.scatter_update(pathScores, 0, tf.log(self.T0) + tf.squeeze(obs_prob_list[0]))
            
        
        for step, obs_prob in enumerate(obs_prob_list[1:]):
            
            # propagate state belief
            belief = self.belief_propagation(pathScores[step, :])

            # the inferred state by maximizing global function
            # and update state and score matrices 
            pathStates = tf.scatter_update(pathStates, step + 1, tf.argmax(belief, 0))
            pathScores = tf.scatter_update(pathScores, step + 1, tf.reduce_max(belief, 0) + tf.squeeze(obs_prob))

        # infer most likely last state
        states_seq = tf.scatter_update(states_seq, N-1, tf.argmax(pathScores[N-1, :], 0))
        
        for step in range(N - 1, 0, -1):
            # for every timestep retrieve inferred state
            state = states_seq[step]
            idx = tf.reshape(tf.pack([step, state]), [1, -1])
            state_prob = tf.gather_nd(pathStates, idx)
            states_seq = tf.scatter_update(states_seq, step - 1,  state_prob[0])

        return states_seq, tf.exp(pathScores) # turn scores back to probabilities
    
    def run_viterbi(self, obs_seq):
        with tf.Session() as sess:
            
            state_graph, state_prob_graph = self.viterbi_inference(obs_seq)
            sess.run(tf.initialize_all_variables())
            states_seq, state_prob = sess.run([state_graph, state_prob_graph])

        return states_seq, state_prob 
    
    
    def _forward(self, y):
        # forward pass
        # TensorFlow doesn't support indexing. List of Tensors will be used instead
        forward = []
        
        # initialize with state probabilities T0
        forward.append(tf.constant(self.T0, shape=(1, self.S), dtype=tf.float64))
        #forward.append(tf.ones((1, self.S), dtype=tf.float64) * self.T0)
        
        # forward belief propagation
        for step in range(y.shape[0]):

            forward_score = tf.mul(tf.matmul(forward[step], self.T), y[step])
            
            # normalize scores into probabilities
            forward.append(forward_score / tf.reduce_sum(forward_score))

        return forward[1:] # remove initial value
        

    def _backward(self, y):
        
        N = y.shape[0]
        backward = np.zeros((nT + 1, self.K))

        # backward pass
        backward = [None] * (nT + 1)
        backward[-1] = tf.ones((1, self.K), dtype=tf.float64) * (1.0 / self.K)
        for t in range(nT, 0, -1):
            backward_score = tf.transpose(
                tf.matmul(
                    tf.matmul(self.P, tf.diag(y[t - 1])),
                    tf.transpose(backward[t])
                )
            )
            backward[t - 1] = tmp / tf.reduce_sum(tmp)      
        
    def forward_backward(self, y):
        """
        runs forward backward algorithm on state probabilities y

        Arguments
        ---------
        y : np.array : shape (T, K) where T is number of timesteps and
            K is the number of states

        Returns
        -------
        (posterior, forward, backward)
        posterior : list of length T of tensorflow graph nodes representing
            the posterior probability of each state at each time step
        forward : list of length T of tensorflow graph nodes representing
            the forward probability of each state at each time step
        backward : list of length T of tensorflow graph nodes representing
            the backward probability of each state at each time step
        """

        # length of observed sequence
        N = len(obs_seq)
        
        # shape path Variables
        shape = [N, self.S]
        
        # observed sequence
        x = tf.constant(obs_seq, name='observation_sequence')
        
        # Initialize variables
        pathStates, pathScores, states_seq = self.initialize_path_variables(shape)       
        
        # log probability of emission sequence
        obs_prob_seq = tf.log(tf.gather(self.E, x))
        obs_prob_list = tf.split(0, N, obs_prob_seq)
        

        posterior = np.zeros((nT, self.K))
        
        forward = self._forward(y)
        
        backward = self._backward(y)

        # remove initial/final probabilities
        forward = forward[1:]
        backward = backward[:-1]

        # combine and normalize
        posterior = [f * b for f, b in zip(forward, backward)]
        posterior = [p / tf.reduce_sum(p) for p in posterior]

        return posterior, forward, backward

In [4]:
p0 = np.array([0.6, 0.4])

emi = np.array([[0.5, 0.1],
                [0.4, 0.3],
                [0.1, 0.6]])

trans = np.array([[0.7, 0.3],
                  [0.4, 0.6]])

states = {0:'Healthy', 1:'Fever'}
obs = {0:'normal', 1:'cold', 2:'dizzy'}

obs_seq = np.array([0, 1, 2])

In [5]:
model =  HiddenMarkovModel(trans, emi, p0)
states_seq, state_prob = model.run_viterbi(obs_seq)
dptable(state_prob)
print()
print("Most likely States: ", [obs[s] for s in states_seq])

         0          1          2
Healthy: 0.30000 0.08400 0.00588
Fever: 0.04000 0.02700 0.01512

Most likely States:  ['normal', 'normal', 'cold']


In [6]:
p0 = np.array([0.5, 0.5])

emi = np.array([[0.9, 0.2],
                [0.1, 0.8]])

trans = np.array([[0.7, 0.3],
                  [0.3, 0.7]])

states = {0:'rain', 1:'no_rain'}
obs = {0:'umbrella', 1:'no_umbrella'}

obs_seq = np.array([0, 0, 1, 0, 0])

In [10]:
model =  HiddenMarkovModel(trans, emi, p0)

p, f, b = model.forward_backward(obs_seq)
[tf.Session().run(g) for g in p]

AttributeError: 'HiddenMarkovModel' object has no attribute 'K'