## Hidden Markov Models (HMM)

HMM are popular approaches to working with time series problems where there are latent or hidden states

There are 4 common inference problems in HMM:

- __Filtering__  Inferring the present - carried out by passing messages up and to the right, so infer $h_t$ from $p(h_t|v_{1:t})$  where  $t = T$

- __Prediction__ Inferring the future - similar to filtering but with a new future state, so infer $h_t$ from $p(h_t|v_{1:T})$ where $t>T$

- __Smoothing__  Inferring the past - combine filtering messages with messages up and to the left, so infer $h_t$ from $p(h_t|v_{1:T})$ where $t <T$

- __Decoding__ Find the most likely hidden path - computed similarly to smoothing, so infer the most likely hidden sequence $h_{1:T}$ from $p(h_{1:T}|v_{1:T})$

In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 12 20:35:27 2020

@author: donaldbrown
"""

import numpy as np
import pandas as pd

class HMM:
    """Creates a class for Hidden Markov Models
    Input:
        Viz:      List of observed or visible states over time
        Trans_M:  Transition matrix for hidden states, H X H, H=len(Trans_M), no. of hidden variables
        Obs_M:    Observation matrix, H X V, V = no. of visible variables
        Pi:       List of initial state probabilities
    Methods:
        filter = The posterior probabilities for hidden states for each time period, T X H array
        smoother = The probabiliteis for the hidden states at each prevoius time period, T X H array
        viturbi = The most likely path of hidden states given the observed state, data frame, 1 X T
        predictor = The probabilities for next hidden state and the next observed state, 1 X H array """
    
    def __init__(self,Viz, Trans_M, Obs_M, Pi):
        # initialize variables
        # Hidden state transition matrix
        self.Trans_M = Trans_M
        # Visible or observates state probabilities given the hidden states
        self.Obs_M = Obs_M
        # No. of hidden states
        self.H = Trans_M.shape[0]
        # No. of observed states
        self.V = Obs_M.shape[0]
        # prior probabaiities for the hidden states
        self.Pi = Pi
        # List of observed states over time
        self.Viz = Viz

        
    def filter(self):
        
        T = len(self.Viz)
        
        # Obtain the joint probabilities of the hidden and observed states at time t
        self.alpha = np.zeros((T, self.H))
        self.alpha[0, :] = self.Pi * self.Obs_M[:,self.Viz[0]]
 
        for t in range(1, T):
            for j in range(self.H):
                self.alpha[t, j] = self.alpha[t - 1].dot(self.Trans_M[:, j]) * self.Obs_M[j, self.Viz[t]]
        
        ### Insert your code here to computer the posterior probabilities ###
        self.Post = np.zeros((T, self.H))
        
        for t in range(0, T):
            sum_of_row = sum(self.alpha[t,:])
            for j in range(self.H):
                self.Post[t, j] = self.alpha[t, j] / sum_of_row

        print("self.alpha")
        print(self.alpha)
        print("Posterior")
        print(self.Post)   
        return self.Post
      
    def smoother(self):

        T = len(self.Viz)
        self.beta = np.zeros((T, self.H))
 
        # setting beta(T) = 1
        self.beta[T - 1] = np.ones((self.H))
 
        # Loop backwards way from T-1 to 1
        # Due to python indexing the actual loop will be T-2 to 0
        for t in range(T - 2, -1, -1):
            for j in range(self.H):
                self.beta[t, j] = (self.beta[t + 1] * self.Obs_M[:, self.Viz[t + 1]]).dot(self.Trans_M[j, :])
                
        # Obtain the posterior probabilities of the hidden states given the observed states       
        
        ### Insert your code here to compute the posterior probabilities ###
        # Post = np.dot(self.alpha, self.beta) /  np.dot(self.alpha, self.beta).sum(axis=1)
        # Post = self.alpha * beta /  (self.alpha * beta).sum(axis=1)

        
        self.Post_smoother = np.zeros((T, self.H))
        
        for t in range(0, T):
            sum_of_row = sum(self.alpha[t] * self.beta[t])
            
            for j in range(self.H):
                self.Post_smoother[t, j] = (self.alpha[t, j] * self.beta[t, j])  / sum_of_row      
        
        print("beta")
        print(self.beta)
        print("Posterior")
        print(self.Post_smoother)
 
        return self.Post_smoother
    
    
    def viturbi(self):
        T = len(self.Viz)
        
        # Obtain the joint probabity of the most likely path that ends in state j at time t
        delta = np.zeros((T, self.H))
        delta[0, :] = (self.Pi * self.Obs_M[:, Viz[0]])
 

        prev = np.zeros((T, self.H))
        prev[0,:] = np.repeat(None, 3)
 
        for t in range(1, T):
            for j in range(self.H):
                # The most likely state given our previoius state at t-1
                
                prob = delta[t - 1] * (self.Trans_M[:, j])
 
                #  The probability of the most probable state given the previous state and the observation at time t
                
                delta[t, j] = np.max(prob) * (self.Obs_M[j, Viz[t]])                
                
                # The most probable state given previous state 

                prev[t, j] = np.argmax(prob)
 
                
        # Path Array
        S = np.zeros(T)
 
        # Find the most probable last hidden state
        last_state = np.argmax(delta[T-1, :])
 
        S[T-1] = last_state
        
        # Find the most probable hidden states at the previous times
        # Walk backwords
        ### Insert your code here ###
        for t in range(T-1, 0, -1):
            S[t] = np.argmax(delta[t, :])
            
        # Change to states numbers in problem (i.e., +1)
        S = S+1
            
        S = S.reshape([1,3])
 
        # Path, S, as a dataframe 
        # Create a list of column names, Time  
        cols = list()
        for i in range(1,T+1):
            cols.append("Time "+(str(i)))
        Path = pd.DataFrame(S, columns = cols)
        print('delta')
        print(delta)
        print('Previous')
        print(prev)        
        print("Path")
        print(Path)
        return Path
 

    def predictor(self, steps = 1):
        T = len(self.Viz)
        # Hidden state prediction probabilities using filtering results (Post)
        ### Insert your code here ### dot product of filter post and transition matrix
        Pred_Hidden = Pred_Hidden = self.Post[T-1, :] @ self.Trans_M 
        print("Predicted Hidden State")
        print(Pred_Hidden)
        # Visible state prediction using the predicted hidden state probabilities
        ### Insert your code here ### dot product pred_hidden with observed matrix
        Pred_Visible = Pred_Visible = Pred_Hidden @ self.Obs_M
        print("Predicted Visible State")
        print(Pred_Visible)

## Burglary Problem from the Lecture

In [2]:
# Example lecture problem: Burglar in an apartment
# Data 
# Transition matrix
TM = np.array([[.1,.4,.5],[.4,.0,.6],[0,.6,.4]])  
# Observation matrix
OM = np.array([[.6,.2,.2],[.2,.6,.2],[.2,.2,.6]])
OM = OM.T
# Prior probabilities of hidden states
p = [1,0,0]
# Observed visible states
Viz = [0,2,2]


In [3]:
# Filtering results
hmm1 = HMM(Viz, TM, OM, p)
hmm1.filter()

self.alpha
[[0.6     0.      0.     ]
 [0.012   0.048   0.18   ]
 [0.00408 0.02256 0.06408]]
Posterior
[[1.         0.         0.        ]
 [0.05       0.2        0.75      ]
 [0.04497354 0.24867725 0.70634921]]


array([[1.        , 0.        , 0.        ],
       [0.05      , 0.2       , 0.75      ],
       [0.04497354, 0.24867725, 0.70634921]])

In [4]:
# Filtering results - Answers
hmm1 = HMM(Viz, TM, OM, p)
hmm1.filter()

self.alpha
[[0.6     0.      0.     ]
 [0.012   0.048   0.18   ]
 [0.00408 0.02256 0.06408]]
Posterior
[[1.         0.         0.        ]
 [0.05       0.2        0.75      ]
 [0.04497354 0.24867725 0.70634921]]


array([[1.        , 0.        , 0.        ],
       [0.05      , 0.2       , 0.75      ],
       [0.04497354, 0.24867725, 0.70634921]])

In [5]:
# Smoothing results
hmm1.smoother()

beta
[[0.1512 0.1616 0.1392]
 [0.4    0.44   0.36  ]
 [1.     1.     1.    ]]
Posterior
[[1.         0.         0.        ]
 [0.05291005 0.23280423 0.71428571]
 [0.04497354 0.24867725 0.70634921]]


array([[1.        , 0.        , 0.        ],
       [0.05291005, 0.23280423, 0.71428571],
       [0.04497354, 0.24867725, 0.70634921]])

In [6]:
# Smoothing results - Answers
hmm1.smoother()

beta
[[0.1512 0.1616 0.1392]
 [0.4    0.44   0.36  ]
 [1.     1.     1.    ]]
Posterior
[[1.         0.         0.        ]
 [0.05291005 0.23280423 0.71428571]
 [0.04497354 0.24867725 0.70634921]]


array([[1.        , 0.        , 0.        ],
       [0.05291005, 0.23280423, 0.71428571],
       [0.04497354, 0.24867725, 0.70634921]])

In [7]:
# Decoding results using the Viturbi algorithm
hmm1.viturbi()

delta
[[0.6     0.      0.     ]
 [0.012   0.048   0.18   ]
 [0.00384 0.0216  0.0432 ]]
Previous
[[nan nan nan]
 [ 0.  0.  0.]
 [ 1.  2.  2.]]
Path
   Time 1  Time 2  Time 3
0     1.0     3.0     3.0


Unnamed: 0,Time 1,Time 2,Time 3
0,1.0,3.0,3.0


In [8]:
# Decoding results using the Viturbi algorithm - Answers
hmm1.viturbi()

delta
[[0.6     0.      0.     ]
 [0.012   0.048   0.18   ]
 [0.00384 0.0216  0.0432 ]]
Previous
[[nan nan nan]
 [ 0.  0.  0.]
 [ 1.  2.  2.]]
Path
   Time 1  Time 2  Time 3
0     1.0     3.0     3.0


Unnamed: 0,Time 1,Time 2,Time 3
0,1.0,3.0,3.0


In [9]:
hmm1.predictor()

Predicted Hidden State
[0.10396825 0.44179894 0.4542328 ]
Predicted Visible State
[0.2415873  0.37671958 0.38169312]


In [10]:
# One step predictions of hidden and visible states - answers

hmm1.predictor()

Predicted Hidden State
[0.10396825 0.44179894 0.4542328 ]
Predicted Visible State
[0.2415873  0.37671958 0.38169312]


## HW 5 Problem - Investment Decision