# Hidden Markov Model (HMM)

In [None]:
import numpy as np
import tensorflow as tf
#import tensorflow_probability as tfp

In [None]:
# initial parameters can be learned on training data
# theory reference https://web.stanford.edu/~jurafsky/slp3/8.pdf
# code reference https://phvu.net/2013/12/06/sweet-implementation-of-viterbi-in-python/
class HMM(object):
    """Discrete hidden Markov model whose update steps are TensorFlow graph ops.

    ``N`` is the number of hidden states.  The model is described by three
    arrays:

    * ``initial_prob`` -- shape ``(N, 1)``, start probability of each state.
    * ``trans_prob``   -- shape ``(N, N)``; entry ``[i, j]`` weights the move
      from state ``i`` to state ``j`` (the ops below reduce over axis 0, the
      "from" axis).
    * ``obs_prob``     -- shape ``(N, M)``; entry ``[i, k]`` is the
      probability of emitting observation symbol ``k`` from state ``i``.

    The instance exposes three placeholders that callers feed through
    ``feed_dict``: ``obs`` (index of the current observation symbol), ``fwd``
    (previous forward vector, ``N x 1``) and ``viterbi`` (previous Viterbi
    vector, ``N x 1``).

    Every ``*_op`` method adds new nodes to the current default graph each
    time it is called, so callers that iterate over time steps should build
    each op once and reuse it.
    """

    def __init__(self, initial_prob, trans_prob, obs_prob):
        """Validate the model parameters and create the graph inputs.

        Args:
            initial_prob: array-like of shape ``(N, 1)``.
            trans_prob: array-like of shape ``(N, N)``.
            obs_prob: array-like of shape ``(N, M)``.

        Raises:
            ValueError: if the shapes are inconsistent with each other.
        """
        # Coerce to float64 arrays: this accepts nested lists and guarantees
        # the dtype matches the float64 placeholders created below.
        initial_prob = np.asarray(initial_prob, dtype=np.float64)
        trans_prob = np.asarray(trans_prob, dtype=np.float64)
        obs_prob = np.asarray(obs_prob, dtype=np.float64)

        self.N = np.size(initial_prob)

        # Explicit checks instead of `assert`, which is stripped under
        # `python -O`; they run before any graph node is created.
        if initial_prob.shape != (self.N, 1):
            raise ValueError(
                'initial_prob must have shape (N, 1), got {}'.format(
                    initial_prob.shape))
        if trans_prob.shape != (self.N, self.N):
            raise ValueError(
                'trans_prob must have shape ({0}, {0}), got {1}'.format(
                    self.N, trans_prob.shape))
        if obs_prob.ndim != 2 or obs_prob.shape[0] != self.N:
            raise ValueError(
                'obs_prob must have shape ({}, M), got {}'.format(
                    self.N, obs_prob.shape))

        self.initial_prob = initial_prob   # N x 1
        self.trans_prob = trans_prob       # N x N
        self.obs_prob = obs_prob           # N x M
        self.emission = tf.constant(obs_prob)    # N x M, graph copy of obs_prob

        # Graph inputs fed by the caller at every time step.
        self.obs = tf.placeholder(tf.int32)
        self.fwd = tf.placeholder(tf.float64)
        self.viterbi = tf.placeholder(tf.float64)

    def get_emission(self, obs_idx):
        """Return column ``obs_idx`` of the emission matrix as an ``N x 1`` tensor.

        The column holds, for every hidden state, the probability of emitting
        observation symbol ``obs_idx``.
        """
        slice_location = [0, obs_idx]          # start at row 0 of the requested column
        num_rows = tf.shape(self.emission)[0]  # N
        slice_shape = [num_rows, 1]            # take the whole column
        return tf.slice(self.emission, slice_location, slice_shape)

    def forward_init_op(self):
        """Build the op for the first step: ``initial_prob * emission[:, obs]``.

        Returns:
            An ``N x 1`` tensor; depends on the ``obs`` placeholder.
        """
        obs_prob = self.get_emission(self.obs)
        fwd = tf.multiply(self.initial_prob, obs_prob)  # (N x 1) * (N x 1), elementwise
        return fwd

    def forward_op(self):
        """Build the op for one forward-algorithm step.

        For each state ``j`` it computes
        ``sum_i fwd[i] * trans_prob[i, j] * emission[j, obs]``.

        Returns:
            A tensor with the same shape as the fed ``fwd`` placeholder.
        """
        # Outer product: transitions[i, j] = fwd[i] * emission[j, obs]
        transitions = tf.matmul(self.fwd, tf.transpose(self.get_emission(self.obs)))
        weighted_transitions = transitions * self.trans_prob  # N x N
        fwd = tf.reduce_sum(weighted_transitions, 0)          # sum over the "from" state i
        return tf.reshape(fwd, tf.shape(self.fwd))            # back to a column vector

    def decode_op(self):
        """Build the op for one Viterbi step.

        Same computation as ``forward_op`` but with ``max`` over the previous
        state instead of ``sum``, reading the ``viterbi`` placeholder.

        Returns:
            A tensor with the same shape as the fed ``viterbi`` placeholder.
        """
        transitions = tf.matmul(self.viterbi, tf.transpose(self.get_emission(self.obs)))
        weighted_transitions = transitions * self.trans_prob
        viterbi = tf.reduce_max(weighted_transitions, 0)      # best "from" state per column
        return tf.reshape(viterbi, tf.shape(self.viterbi))

    def backpt_op(self):
        """Build the op that yields the Viterbi back-pointers for one step.

        For each state ``j`` it returns ``argmax_i viterbi[i] * trans_prob[i, j]``.
        The emission term is left out: it is the same for every ``i`` within
        a column, so it does not change the argmax.

        Returns:
            A length-``N`` integer tensor of previous-state indices.
        """
        # Replicate the viterbi column N times: back_transitions[i, j] = viterbi[i]
        back_transitions = tf.matmul(self.viterbi, np.ones((1, self.N)))
        weighted_back_transitions = back_transitions * self.trans_prob
        return tf.argmax(weighted_back_transitions, 0)

In [None]:
def forward_algorithm(sess, hmm, observations):
    """Return the total probability of an observation sequence under ``hmm``.

    Args:
        sess: session used to evaluate the graph ops.
        hmm: model exposing ``forward_init_op()``, ``forward_op()`` and the
            ``obs`` / ``fwd`` placeholders.
        observations: non-empty sequence of observation-symbol indices.

    Returns:
        The sum of the final forward vector (a NumPy scalar).

    Raises:
        IndexError: if ``observations`` is empty.
    """
    # Build each op once. Calling the builders inside the loop would add a
    # fresh copy of the same nodes to the graph on every time step.
    init_op = hmm.forward_init_op()
    step_op = hmm.forward_op()

    fwd = sess.run(init_op, feed_dict={hmm.obs: observations[0]})
    for obs in observations[1:]:
        fwd = sess.run(step_op, feed_dict={hmm.obs: obs, hmm.fwd: fwd})

    # `fwd` is already a NumPy array, so sum it directly rather than
    # creating (and running) a throwaway reduce_sum graph op.
    return np.sum(fwd)

In [None]:
def viterbi_decode(sess, hmm, observations):
    """Return the most likely hidden-state sequence for ``observations``.

    Args:
        sess: session used to evaluate the graph ops.
        hmm: model exposing ``forward_init_op()``, ``decode_op()``,
            ``backpt_op()``, the ``obs`` / ``viterbi`` placeholders and ``N``.
        observations: non-empty sequence of observation-symbol indices.

    Returns:
        A list of state indices, one per observation, in time order.

    Raises:
        IndexError: if ``observations`` is empty.
    """
    # Build the ops once instead of once per time step, so the graph does
    # not grow with the length of the observation sequence.
    init_op = hmm.forward_init_op()
    step_ops = [hmm.decode_op(), hmm.backpt_op()]

    viterbi = sess.run(init_op, feed_dict={hmm.obs: observations[0]})

    # backpts[j, t] = best predecessor of state j at time t; column 0 keeps
    # the -1 sentinel because the first step has no predecessor.
    backpts = np.full((hmm.N, len(observations)), -1, dtype='int32')
    for t in range(1, len(observations)):
        viterbi, backpt = sess.run(step_ops,
                                   feed_dict={hmm.obs: observations[t],
                                              hmm.viterbi: viterbi})
        backpts[:, t] = backpt

    # Start from the best final state and follow the back-pointers to t=1.
    tokens = [viterbi[:, -1].argmax()]
    for t in range(len(observations) - 1, 0, -1):
        tokens.append(backpts[tokens[-1], t])
    return tokens[::-1]

In [None]:
# Demo: score and decode an observation sequence with a 2-state HMM.
if __name__ == '__main__':
    states = ('Healthy', 'Fever')
    # The arrays below encode this model (row order follows `states`;
    # emission columns are the observation symbols normal=0, cold=1, dizzy=2):
    #     observations = ('normal', 'cold', 'dizzy')
    #     start_probability = {'Healthy': 0.6, 'Fever': 0.4}
    #     transition_probability = {
    #         'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
    #         'Fever': {'Healthy': 0.4, 'Fever': 0.6}
    #     }
    #     emission_probability = {
    #         'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
    #         'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6}
    #     }
    initial_prob = np.array([[0.6], [0.4]])  # N_states x 1
    trans_prob = np.array([[0.7, 0.3], [0.4, 0.6]])  # N_states x N_states
    obs_prob = np.array([[0.5, 0.4, 0.1], [0.1, 0.3, 0.6]]) # N_states x N_observation_symbols
    hmm = HMM(initial_prob=initial_prob, trans_prob=trans_prob, obs_prob=obs_prob)

    # Observation-symbol indices into the columns of obs_prob.
    # Alternative sequence to try: observations = [0, 1, 1, 2, 1]
    observations = [0, 0, 0, 1, 1]
    with tf.Session() as sess:
        # Likelihood of the whole sequence (forward algorithm).
        prob = forward_algorithm(sess, hmm, observations)
        print('Probability of observing {} is {}'.format(observations, prob))

        # Most likely hidden-state path (Viterbi), as indices into `states`.
        seq = viterbi_decode(sess, hmm, observations)
        print('Most likely hidden states are {}'.format(seq))

## Native TensorFlow Probability implementation

In [None]:
# Builds a two-state (cold/hot) weather HMM with TensorFlow Probability's
# HiddenMarkovModel and queries its mean and log-probability.
#
# NOTE(review): this cell needs the name `tfp`. The
# `import tensorflow_probability as tfp` line in the imports cell appears to
# be commented out, so as written this cell would raise NameError -- confirm
# and re-enable that import before running.
tfd = tfp.distributions

# A simple weather model.

# Represent a cold day with 0 and a hot day with 1.
# Suppose the first day of a sequence has a 0.8 chance of being cold.
# We can model this using the categorical distribution:

initial_distribution = tfd.Categorical(probs=[0.8, 0.2])

# Suppose a cold day has a 30% chance of being followed by a hot day
# and a hot day has a 20% chance of being followed by a cold day.
# We can model this as (row = today's state, column = tomorrow's state):

transition_distribution = tfd.Categorical(probs=[[0.7, 0.3],
                                                 [0.2, 0.8]])

# Suppose additionally that on each day the temperature is
# normally distributed with mean and standard deviation 0 and 5 on
# a cold day and mean and standard deviation 15 and 10 on a hot day.
# We can model this with:

observation_distribution = tfd.Normal(loc=[0., 15.], scale=[5., 10.])

# We can combine these distributions into a single week-long
# hidden Markov model (num_steps=7, one step per day) with:

model = tfd.HiddenMarkovModel(
    initial_distribution=initial_distribution,
    transition_distribution=transition_distribution,
    observation_distribution=observation_distribution,
    num_steps=7)

# The expected temperatures for each day are given by:

model.mean()  # shape [7], elements approach 9.0

# The log pdf of a week in which every day's temperature is 0 is:

model.log_prob(tf.zeros(shape=[7]))

In [None]:
# Build a graph
# Scratch experiment: compare `a * b` with `tf.multiply(a, b)` in a
# dedicated graph and print both results.
graph = tf.Graph()
with graph.as_default():
    # A 2x1 column vector
    a = tf.constant(np.array([[ 1],
                              [10]]),
                    dtype=tf.float32)
    # A 3x1 column vector
    b = tf.constant(np.array([[2],
                              [3],[5]]),
                    dtype=tf.float32)

    # Elementwise multiplication, written with the operator and with the
    # explicit function.
    # NOTE(review): shapes (2, 1) and (3, 1) are not broadcast-compatible
    # (first dimensions are 2 vs 3 and neither is 1), so these two lines are
    # expected to fail at graph construction -- confirm the intended shapes
    # (e.g. one operand transposed to 1x3 for an outer product).
    c =  a * b
    d = tf.multiply(a, b)

# Run a Session on that graph and fetch both products in one call
with tf.Session(graph=graph) as session:
    (output_c, output_d) = session.run([c, d])
    print("output_c")
    print(output_c)
    print("\noutput_d")
    print(output_d)