# In [None]:
# Viterbi decoding for a hidden Markov model: find the most likely sequence of hidden states given a sequence of observations
import tensorflow as tf
import numpy as np


class HiddenMarkovModel(object):
    """Hidden Markov model whose forward and Viterbi steps are TensorFlow ops.

    The constructor stores the model parameters and creates three
    placeholders that the step ops read from:

    * ``obs_idx`` (int32): column index of the current observation in the
      emission matrix.
    * ``fwd`` (float64): forward-probability column vector of the previous
      time step.
    * ``viterbi`` (float64): Viterbi-score column vector of the previous
      time step.

    Every ``*_op`` method adds new nodes to the default graph each time it
    is called, so build an op once and re-run it with different feeds.
    """

    def __init__(self, initial_prob, trans_prob, observ_prob):
        """Store the parameters and create the input placeholders.

        Args:
            initial_prob: NumPy array of shape (N, 1) holding the initial
                probability of each of the N states.
            trans_prob: NumPy array of shape (N, N); entry ``[i, j]`` is the
                weight applied when moving from state ``i`` to state ``j``.
            observ_prob: array-like with N rows; column ``k`` holds, per
                state, the probability of emitting observation ``k``.
                It is multiplied with the float64 placeholders, so float64
                values are expected.

        Raises:
            AssertionError: if the shapes of the three inputs disagree.
        """
        self.N = np.size(initial_prob)
        self.initial_prob = initial_prob
        self.trans_prob = trans_prob
        self.emission = tf.constant(observ_prob)

        assert self.initial_prob.shape == (self.N, 1), \
            'initial_prob must have shape (N, 1)'
        assert self.trans_prob.shape == (self.N, self.N), \
            'trans_prob must have shape (N, N)'
        assert self.emission.shape[0] == self.N, \
            'observ_prob must have one row per state'

        # Inputs fed at run time; no static shape is declared for any of them.
        self.obs_idx = tf.placeholder(tf.int32)
        self.fwd = tf.placeholder(tf.float64)
        self.viterbi = tf.placeholder(tf.float64)

    def get_emission(self, obs_idx):
        """Return column ``obs_idx`` of the emission matrix as a column tensor.

        Args:
            obs_idx: scalar int (or int32 tensor) selecting the observation.

        Returns:
            Tensor with one row per emission-matrix row and a single column.
        """
        # int() turns the static row dimension into a plain Python integer so
        # the slice size is an ordinary list of ints, however the TensorFlow
        # version in use represents static dimensions.
        num_rows = int(self.emission.shape[0])
        slice_location = [0, obs_idx]
        slice_shape = [num_rows, 1]
        return tf.slice(self.emission, slice_location, slice_shape)

    def forward_init_op(self):
        """Build the op for the first time step.

        Returns:
            Element-wise product of the initial probabilities and the
            emission column selected by the ``obs_idx`` placeholder.
        """
        return tf.multiply(self.initial_prob, self.get_emission(self.obs_idx))

    def forward_op(self):
        """Build the op for one forward-algorithm step.

        Reads the previous forward vector from the ``fwd`` placeholder and
        the current observation from ``obs_idx``.

        Returns:
            Tensor shaped like the fed ``fwd`` value whose entry ``j`` is
            ``sum_i fwd[i] * trans_prob[i, j] * emission[j, obs_idx]``.
        """
        # transitions[i, j] = fwd[i] * emission[j, obs_idx]
        transitions = tf.matmul(self.fwd, tf.transpose(self.get_emission(self.obs_idx)))
        weighted_transitions = transitions * self.trans_prob
        fwd = tf.reduce_sum(weighted_transitions, axis=0)
        # The placeholder has no static shape, so ``self.fwd.shape`` cannot be
        # turned into a tensor; take the shape of the fed value at run time,
        # exactly as decode_op does for the viterbi placeholder.
        return tf.reshape(fwd, tf.shape(self.fwd))

    def decode_op(self):
        """Build the op for one Viterbi step.

        Reads the previous score vector from the ``viterbi`` placeholder and
        the current observation from ``obs_idx``.

        Returns:
            Tensor shaped like the fed ``viterbi`` value whose entry ``j`` is
            ``max_i viterbi[i] * trans_prob[i, j] * emission[j, obs_idx]``.
        """
        # transitions[i, j] = viterbi[i] * emission[j, obs_idx]
        transitions = tf.matmul(self.viterbi, tf.transpose(self.get_emission(self.obs_idx)))
        weighted_transitions = transitions * self.trans_prob
        viterbi = tf.reduce_max(weighted_transitions, axis=0)
        return tf.reshape(viterbi, tf.shape(self.viterbi))

    def backpt_op(self):
        """Build the op that finds the best predecessor of every state.

        Returns:
            Tensor whose entry ``j`` is the index ``i`` that maximises
            ``viterbi[i] * trans_prob[i, j]``.
        """
        # Repeat the score column across N columns so that entry [i, j]
        # equals viterbi[i] before weighting by the transition matrix.
        back_transitions = tf.matmul(self.viterbi, np.ones((1, self.N)))
        weighted_back_transitions = back_transitions * self.trans_prob
        return tf.argmax(weighted_back_transitions, 0)


def viterbi_decode(sess, hmm, observations):
    """Return the most likely hidden-state sequence for ``observations``.

    Args:
        sess: session whose ``run(fetches, feed_dict=...)`` evaluates the ops.
        hmm: model exposing ``N``, the ``obs_idx`` and ``viterbi`` feed keys,
            and the ``forward_init_op``, ``decode_op`` and ``backpt_op``
            op builders.
        observations: sequence of observation indices, one per time step.

    Returns:
        List of state indices (plain Python ints), one per observation.
        An empty ``observations`` yields an empty list.
    """
    num_steps = len(observations)
    if num_steps == 0:
        return []

    viterbi = sess.run(hmm.forward_init_op(),
                       feed_dict={hmm.obs_idx: observations[0]})
    # backpts[j, t] is the best predecessor of state j at step t; column 0
    # keeps the -1 filler because the first step has no predecessor.
    backpts = np.ones((hmm.N, num_steps), 'int32') * -1

    # Build the step ops once: every call to the builders adds nodes to the
    # graph, so creating them inside the loop would grow it on each step.
    step_ops = [hmm.decode_op(), hmm.backpt_op()]
    for t in range(1, num_steps):
        viterbi, backpt = sess.run(step_ops,
                                   feed_dict={hmm.obs_idx: observations[t],
                                              hmm.viterbi: viterbi})
        backpts[:, t] = backpt

    # Start from the best final state and follow the back-pointers to step 0.
    tokens = [int(viterbi[:, -1].argmax())]
    for t in range(num_steps - 1, 0, -1):
        tokens.append(int(backpts[tokens[-1], t]))
    return tokens[::-1]


if __name__ == '__main__':
    # Demo with two hidden states and three observation symbols. The labels
    # in the comments are only an example reading of the numbers.
    initial_prob = np.array([
        [0.6],  # e.g. sunny
        [0.4],  # e.g. rainy
    ])
    trans_prob = np.array([
        [0.7, 0.3],  # e.g. sunny: {sunny: 0.7, rainy: 0.3}
        [0.4, 0.6],  # e.g. rainy: {sunny: 0.4, rainy: 0.6}
    ])
    obs_prob = np.array([
        [0.1, 0.4, 0.5],  # e.g. sunny: {shop: 0.1, eat: 0.4, stay: 0.5}
        [0.6, 0.3, 0.1],  # e.g. rainy: {shop: 0.6, eat: 0.3, stay: 0.1}
    ])

    hmm = HiddenMarkovModel(
        initial_prob=initial_prob,
        trans_prob=trans_prob,
        observ_prob=obs_prob,
    )

    # Observation indices, one per time step (columns of obs_prob).
    observations = [1, 0, 2, 1, 1]
    with tf.Session() as sess:
        seq = viterbi_decode(sess, hmm, observations)
    print('Most likely hidden states are {}'.format(seq))


