In [1]:
import numpy as np
import tensorflow as tf 

In [20]:
class HMM(object):
    def __init__(self, initial_prob, trans_prob, obs_prob):
        self.N = np.size(initial_prob)
        self.initial_prob = initial_prob
        self.trans_prob = trans_prob
        self.emission = obs_prob
        assert self.initial_prob.shape == (self.N, 1)
        assert self.trans_prob.shape == (self.N, self.N)
        assert obs_prob.shape[0] == self.N
        self.obs_idx = 0
        self.fwd = tf.cast(0., tf.double)
        self.viterbi = None
        
    def get_emission(self, obs_idx):
        slice_location = [0, obs_idx]
        num_rows = tf.shape(self.emission)[0]
        slice_shape = [num_rows, 1]
        return tf.slice(self.emission, slice_location, slice_shape)
    
    def forward_init_op(self, obs_idx):
        self.obs_idx = obs_idx
        obs_prob = self.get_emission(self.obs_idx)
        fwd = tf.multiply(self.initial_prob, obs_prob)
        return fwd
    
    def forward_op(self, obs_idx, fwd):
        self.obs_idx = obs_idx
        self.fwd = fwd
        transitions = tf.matmul(self.fwd,
        tf.transpose(self.get_emission(self.obs_idx)))
        weighted_transitions = transitions * self.trans_prob
        fwd = tf.reduce_sum(weighted_transitions, 0)
        return tf.reshape(fwd, tf.shape(self.fwd))
        
    def decode_op(self, obs_idx, viterbi):
        self.viterbi = viterbi
        self.obs_idx = obs_idx
        transitions = tf.matmul(viterbi,
        tf.transpose(self.get_emission(self.obs_idx)))
        weighted_transitions = transitions * self.trans_prob
        viterbi = tf.reduce_max(weighted_transitions, 0)
        return tf.reshape(viterbi, tf.shape(self.viterbi))

    def backpt_op(self, viterbi):
        back_transitions = tf.matmul(self.viterbi, np.ones((1, self.N)))
        weighted_back_transitions = back_transitions * self.trans_prob
        return tf.argmax(weighted_back_transitions, 0)     

In [21]:
def forward_algorithm(sess, hmm, observations):
    fwd = hmm.forward_init_op(observations[0])
    for t in range(1, len(observations)):
        fwd = hmm.forward_op(observations[t],fwd)
    prob = tf.reduce_sum(fwd)
    return prob

In [22]:
def viterbi_decode(hmm, observations):
    viterbi = hmm.forward_init_op(observations[0])
    backpts = np.ones((hmm.N, len(observations)), 'int32') * -1
    for t in range(1, len(observations)):
        viterbi = hmm.decode_op(observations[t], viterbi)
        backpt = hmm.backpt_op(viterbi)        
        backpts[:, t] = backpt
        
    tokens = [tf.math.argmax(viterbi[:, -1], 0)]
    for i in range(len(observations) - 1, 0, -1):
        tokens.append(backpts[tokens[-1], i])
    return tokens[::-1]


In [25]:
initial_prob = np.array([[0.6],[0.4]])
trans_prob = np.array([[0.7, 0.3],
                       [0.4, 0.6]])
obs_prob = np.array([[0.1, 0.4, 0.5],
                     [0.6, 0.3, 0.1]])
hmm = HMM(initial_prob=initial_prob, trans_prob=trans_prob, obs_prob=obs_prob)
observations = [0, 1, 1, 2, 1]
seq = viterbi_decode(hmm, observations)
print('Most likely hidden states are {}'.format(seq))


Most likely hidden states are [1, 0, 0, 0, <tf.Tensor: shape=(), dtype=int64, numpy=0>]
