In [2]:
from __future__ import print_function

import datetime
import numpy as np
from matplotlib import cm, pyplot as plt
from matplotlib.dates import YearLocator, MonthLocator
try:
    from matplotlib.finance import quotes_historical_yahoo_ochl
except ImportError:
    # For Matplotlib prior to 1.5.
    from matplotlib.finance import (
        quotes_historical_yahoo as quotes_historical_yahoo_ochl
    )
from hmmlearn import hmm
import urllib2
from BeautifulSoup import BeautifulSoup as bs

ModuleNotFoundError: No module named 'urllib2'

In [21]:
# HMM tutorial
print("HMMs:")
print("~  Probabilistic model where a sequence of variables (X) is generated by a sequence of internal hidden states (Z) ~")
print("-Transition between hidden states are determined by a probability vector (π) and a transition probability matrix (A)")
print("-The emission probability of an observable can be any distribution with parameters (Ω) conditioned on the current hidden state.")
print("                    fundamental principles of HMMS:")
print("1.  Given the model parameters and observed data, estimate the optimal sequence of hidden states.")
print("\t--> Viterbi Algorithm  ")
print("2.  Given the model parameters and observed data, calculate the likelihood of the data.")
print("\t--> Forward-Backward Algorithm  ")
print("3.  Given just the observed data, estimate the model parameters.")
print("\t--> EM Algorithm  ")

HMMs:
~  Probabilistic model where a sequence of variables (X) is generated by a sequence of internal hidden states (Z) ~
-Transition between hidden states are determined by a probability vector (π) and a transition probability matrix (A)
-The emission probability of an observable can be any distribution with parameters (Ω) conditioned on the current hidden state.
                    fundamental principles of HMMS:
1.  Given the model parameters and observed data, estimate the optimal sequence of hidden states.
	--> Viterbi Algorithm  
2.  Given the model parameters and observed data, calculate the likelihood of the data.
	--> Forward-Backward Algorithm  
3.  Given just the observed data, estimate the model parameters.
	--> EM Algorithm  


In [15]:
# Viterbi Algorithm
def viterbi(obs, states, start_p, trans_p, emit_p):
    V = [{}]
    for st in states:
        V[0][st] = {"prob": start_p[st] * emit_p[st][obs[0]], "prev": None}
    # Run Viterbi when t > 0
    for t in range(1, len(obs)):
        V.append({})
        for st in states:
            max_tr_prob = max(V[t-1][prev_st]["prob"]*trans_p[prev_st][st] for prev_st in states)
            for prev_st in states:
                if V[t-1][prev_st]["prob"] * trans_p[prev_st][st] == max_tr_prob:
                    max_prob = max_tr_prob * emit_p[st][obs[t]]
                    V[t][st] = {"prob": max_prob, "prev": prev_st}
                    break
    for line in dptable(V):
        print(line)
    opt = []
    # The highest probability
    max_prob = max(value["prob"] for value in V[-1].values())
    previous = None
    # Get most probable state and its backtrack
    for st, data in V[-1].items():
        if data["prob"] == max_prob:
            opt.append(st)
            previous = st
            break
    # Follow the backtrack till the first observation
    for t in range(len(V) - 2, -1, -1):
        opt.insert(0, V[t + 1][previous]["prev"])
        previous = V[t + 1][previous]["prev"]

    print('The steps of states are [ ' + ' '.join(opt) + ' ] with highest probability of %s' % max_prob)

def dptable(V):
    # Print a table of steps from dictionary
    yield(" ".join(("%12d" % i) for i in range(len(V))))
    for state in V[0]:
        yield("%.7s: " % state + " ".join("%.7s" % ("%f" % v[state]["prob"]) for v in V))

In [26]:
# Example of Viterbi algorithm in action
print("This example demonstrates the Viterbi algorithm")
print("Context: Based on a sequence of observed symptoms (normal, cold, dizzy) and possible hidden states (healthy, fever)")
print("  ~ What is the most likely sequence of health conditions of the patient that would explain these observations?\n")   
obs = ('normal', 'cold', 'dizzy')
states = ('Healthy', 'Fever')
start_p = {'Healthy': 0.6, 'Fever': 0.4}
trans_p = {
   'Healthy' : {'Healthy': 0.7, 'Fever': 0.3},
   'Fever' : {'Healthy': 0.4, 'Fever': 0.6}
   }
emit_p = {
   'Healthy' : {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
   'Fever' : {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6}
   }
print("- Sequence of Observations: ",obs)
print("- Hidden States: ",states)
print("- Start Probability: ",start_p)
print("- Transmission Probabilities: ",trans_p)
print("- Emission Probabilities: ",emit_p,"\n")

print("Results:")
viterbi(obs,states,start_p,trans_p,emit_p)

This example demonstrates the Viterbi algorithm
Context: Based on a sequence of observed symptoms (normal, cold, dizzy) and possible hidden states (healthy, fever)
  ~ What is the most likely sequence of health conditions of the patient that would explain these observations?

- Sequence of Observations:  ('normal', 'cold', 'dizzy')
- Hidden States:  ('Healthy', 'Fever')
- Start Probability:  {'Healthy': 0.6, 'Fever': 0.4}
- Transmission Probabilities:  {'Healthy': {'Healthy': 0.7, 'Fever': 0.3}, 'Fever': {'Healthy': 0.4, 'Fever': 0.6}}
- Emission Probabilities:  {'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1}, 'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6}} 

Results:
           0            1            2
Healthy: 0.30000 0.08400 0.00588
Fever: 0.04000 0.02700 0.01512
The steps of states are [ Healthy Healthy Fever ] with highest probability of 0.01512


In [22]:
# Forward-Backward Algorithm
def fwd_bkw(observations, states, start_prob, trans_prob, emm_prob, end_st):
    # forward part of the algorithm
    fwd = []
    f_prev = {}
    for i, observation_i in enumerate(observations):
        f_curr = {}
        for st in states:
            if i == 0:
                # base case for the forward part
                prev_f_sum = start_prob[st]
            else:
                prev_f_sum = sum(f_prev[k]*trans_prob[k][st] for k in states)

            f_curr[st] = emm_prob[st][observation_i] * prev_f_sum

        fwd.append(f_curr)
        f_prev = f_curr

    p_fwd = sum(f_curr[k] * trans_prob[k][end_st] for k in states)

    # backward part of the algorithm
    bkw = []
    b_prev = {}
    for i, observation_i_plus in enumerate(reversed(observations[1:]+(None,))):
        b_curr = {}
        for st in states:
            if i == 0:
                # base case for backward part
                b_curr[st] = trans_prob[st][end_st]
            else:
                b_curr[st] = sum(trans_prob[st][l] * emm_prob[l][observation_i_plus] * b_prev[l] for l in states)

        bkw.insert(0,b_curr)
        b_prev = b_curr

    p_bkw = sum(start_prob[l] * emm_prob[l][observations[0]] * b_curr[l] for l in states)

    # merging the two parts
    posterior = []
    for i in range(len(observations)):
        posterior.append({st: fwd[i][st] * bkw[i][st] / p_fwd for st in states})

    assert p_fwd == p_bkw
    return fwd, bkw, posterior

In [34]:
print("This example demonstrates the Forward-Backward algorithm")
states = ('Healthy', 'Fever')
end_state = 'E'
observations = ('normal', 'cold', 'dizzy')
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
transition_probability = {
   'Healthy' : {'Healthy': 0.69, 'Fever': 0.3, 'E': 0.01},
   'Fever' : {'Healthy': 0.4, 'Fever': 0.59, 'E': 0.01},
}
emission_probability = {
   'Healthy' : {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
   'Fever' : {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}
print("The Forware-Backward Algorithm consists of 3 steps:")
print("1.  Computing forward probabilities\n\t -  the probability of ending up in any particular state given the first k observations in the sequence")
print("2.  Computing backward probabilities\n\t -  the probability of observing the remaining observations given any starting point k")
print("3.  computing smoothed values.\n")
print("- Sequence of Observations: ",observations)
print("- Hidden States: ",states)
print("- Start Probability: ",start_probability)
print("- Transmission Probabilities: ",transition_probability)
print("- Emission Probabilities: ",emission_probability)
print("- End State: ",end_state,"\n")

print("Results:")
fwd,bkwd,post = fwd_bkw(observations,states,start_probability,transition_probability,emission_probability,end_state)
print("  Forward: ",fwd)
print("  Backwards: ",bkwd)
print("  Posterior: ",post)

This example demonstrates the Forward-Backward algorithm
The Forware-Backward Algorithm consists of 3 steps:
1.  Computing forward probabilities
	 -  the probability of ending up in any particular state given the first k observations in the sequence
2.  Computing backward probabilities
	 -  the probability of observing the remaining observations given any starting point k
3.  computing smoothed values.

- Sequence of Observations:  ('normal', 'cold', 'dizzy')
- Hidden States:  ('Healthy', 'Fever')
- Start Probability:  {'Healthy': 0.6, 'Fever': 0.4}
- Transmission Probabilities:  {'Healthy': {'Healthy': 0.69, 'Fever': 0.3, 'E': 0.01}, 'Fever': {'Healthy': 0.4, 'Fever': 0.59, 'E': 0.01}}
- Emission Probabilities:  {'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1}, 'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6}}
- End State:  E 

Results:
  Forward:  [{'Healthy': 0.3, 'Fever': 0.04000000000000001}, {'Healthy': 0.0892, 'Fever': 0.03408}, {'Healthy': 0.007518, 'Fever': 0.02812031

In [64]:
# Using the numpy built in HMM functions
# Synthesize Data 
trans_mat  = np.array([[0.2, 0.6, 0.2],
                     [0.4, 0.0, 0.6],
                     [0.1, 0.2, 0.7]])
emm_mat    = np.array([[0.2, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1],
                    [0.1, 0.1, 0.1, 0.1, 0.2, 0.1, 0.1, 0.1, 0.1],
                    [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.2]])
start_prob = np.array([0.3, 0.4, 0.3])
X = [3,4,5,6,7]
model = hmm.MultinomialHMM(n_components=3)
model.startprob_ = start_prob
model.transmat_  = trans_mat
model.emissionprob_ = emm_mat

# Predict the optimal sequence of internal hidden state
X = np.atleast_2d([3, 4, 5, 6, 7]).T
print(model)
print(model.decode(X))
remodel = hmm.GaussianHMM(n_components=3, n_iter=100)
remodel.fit(X)
print(model)
print(remodel.decode(X))


MultinomialHMM(algorithm='viterbi', init_params='ste', n_components=3,
        n_iter=10, params='ste', random_state=None, startprob_prior=1.0,
        tol=0.01, transmat_prior=1.0, verbose=False)
(-13.758752224145665, array([0, 1, 2, 2, 2]))
MultinomialHMM(algorithm='viterbi', init_params='ste', n_components=3,
        n_iter=10, params='ste', random_state=None, startprob_prior=1.0,
        tol=0.01, transmat_prior=1.0, verbose=False)
(-0.8813434435724581, array([2, 0, 1, 1, 1]))


  np.log(self.transmat_), framelogprob)
  np.log(self.transmat_),
  np.log(self.transmat_),
  np.log(self.transmat_),
  np.log(self.transmat_), framelogprob)
