- [Reference](https://pythonguides.com/scikit-learn-hidden-markov-model/)
- [Reference](https://www.digitalvidya.com/blog/markov-models/)
- [Reference](https://www.annytab.com/hidden-markov-model-in-python/)

In [8]:
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plot
%matplotlib inline

states = ['sleeping', 'eating', 'pooping']
pi = [0.45, 0.35, 0.2]
state_space = pd.Series(pi, index=states, name='states')
print(state_space)
print(state_space.sum())
q_df = pd.DataFrame(columns=states, index=states)
q_df.loc[states[0]] = [0.4, 0.2, 0.4]
q_df.loc[states[1]] = [0.40, 0.40, 0.2]
q_df.loc[states[2]] = [0.40, 0.20, .4]

print(q_df)

q_f = q_df.values
print('\n', q_f, q_f.shape, '\n')
print(q_df.sum(axis=1))
from pprint import pprint 

def _get_markov_edges(Q):
    edge = {}
    for column in Q.columns:
        for index in Q.index:
            edge[(index,column)] = Q.loc[index,column]
    return edge

edge_wt = _get_markov_edges(q_df)
pprint(edge_wt)
Graph = nx.MultiDiGraph()


Graph.add_nodes_from(states)
print(f'Nodes:\n{Graph.nodes()}\n')


for k, v in edge_wt.items():
    tmp_origin, tmp_destination = k[0], k[1]
    Graph.add_edge(tmp_origin, tmp_destination, weight=v, label=v)
print(f'Edges:')
pprint(Graph.edges(data=True))    


sleeping    0.45
eating      0.35
pooping     0.20
Name: states, dtype: float64
1.0
         sleeping eating pooping
sleeping      0.4    0.2     0.4
eating        0.4    0.4     0.2
pooping       0.4    0.2     0.4

 [[0.4 0.2 0.4]
 [0.4 0.4 0.2]
 [0.4 0.2 0.4]] (3, 3) 

sleeping    1.0
eating      1.0
pooping     1.0
dtype: float64
{('eating', 'eating'): 0.4,
 ('eating', 'pooping'): 0.2,
 ('eating', 'sleeping'): 0.4,
 ('pooping', 'eating'): 0.2,
 ('pooping', 'pooping'): 0.4,
 ('pooping', 'sleeping'): 0.4,
 ('sleeping', 'eating'): 0.2,
 ('sleeping', 'pooping'): 0.4,
 ('sleeping', 'sleeping'): 0.4}
Nodes:
['sleeping', 'eating', 'pooping']

Edges:
OutMultiEdgeDataView([('sleeping', 'sleeping', {'weight': 0.4, 'label': 0.4}), ('sleeping', 'eating', {'weight': 0.2, 'label': 0.2}), ('sleeping', 'pooping', {'weight': 0.4, 'label': 0.4}), ('eating', 'sleeping', {'weight': 0.4, 'label': 0.4}), ('eating', 'eating', {'weight': 0.4, 'label': 0.4}), ('eating', 'pooping', {'weight': 0.2, 'label': 

In [7]:
# Import libraries
import numpy as np
import pandas as pd
import pprint
# Get markov edges
def get_markov_edges(df):
    # Create a dictionary
    edges = {}
    # Loop columns
    for column in df.columns:
        # Loop rows
        for row in df.index:
            edges[(row,column)] = df.loc[row,column]
    # Return edges
    return edges
# Viterbi algorithm for shortest path
# https://github.com/alexsosn/MarslandMLAlgo/blob/master/Ch16/HMM.py
def viterbi(pi, a, b, obs):
    
    nStates = np.shape(b)[0]
    T = np.shape(obs)[0]
    
    path = np.zeros(T)
    delta = np.zeros((nStates, T))
    phi = np.zeros((nStates, T))
    
    delta[:, 0] = pi * b[:, obs[0]]
    phi[:, 0] = 0
    for t in range(1, T):
        for s in range(nStates):
            delta[s, t] = np.max(delta[:, t-1] * a[:, s]) * b[s, obs[t]] 
            phi[s, t] = np.argmax(delta[:, t-1] * a[:, s])
    
    path[T-1] = np.argmax(delta[:, T-1])
    for t in range(T-2, -1, -1):
        path[t] = phi[int(path[t+1]),int(t+1)]
        
    return path, delta, phi
# The main entry point for this module
def main():
    # Observation states
    # The director can have an umbrella or not have an umbrella (equally likely)
    observation_states = ['Umbrella', 'No umbrella']
    # Create hidden states with probabilities (250 rainy days per year)
    p = [0.32, 0.68]
    #p = [0.5, 0.5]
    #p = [0.7, 0.3]
    hidden_states = ['Sunny', 'Rainy']
    state_space = pd.Series(p, index=hidden_states, name='states')
    # Print hidden states
    print('--- Hidden states ---')
    print(state_space)
    print()
    # Create a hidden states transition matrix with probabilities
    hidden_df = pd.DataFrame(columns=hidden_states, index=hidden_states)
    hidden_df.loc[hidden_states[0]] = [0.75, 0.25]
    hidden_df.loc[hidden_states[1]] = [0.25, 0.75]
    # Print transition matrix
    print('--- Transition matrix for hidden states ---')
    print(hidden_df)
    print()
    print(hidden_df.sum(axis=1))
    print()
    # Create matrix of observations with sensor probabilities
    observations_df = pd.DataFrame(columns=observation_states, index=hidden_states)
    observations_df.loc[hidden_states[0]] = [0.1, 0.9]
    observations_df.loc[hidden_states[1]] = [0.8, 0.2]
    # Print observation matrix
    print('--- Sensor matrix ---')
    print(observations_df)
    print()
    print(observations_df.sum(axis=1))
    print()
    # Create graph edges and weights
    hidden_edges = get_markov_edges(hidden_df)
    observation_edges = get_markov_edges(observations_df)
    # Print edges
    print('--- Hidden edges ---')
    pprint.pprint(hidden_edges)
    print()
    print('--- Sensor edges ---')
    pprint.pprint(observation_edges)
    print()
    # Observations
    observations_map = {0:'Umbrella', 1:'No umbrella'}
    observations = np.array([1,1,1,0,1,1,1,0,0,0])
    observerations_path = [observations_map[v] for v in list(observations)]
    # Get predictions with the viterbi algorithm
    path, delta, phi = viterbi(p, hidden_df.values, observations_df.values, observations)
    state_map = {0:'Sunny', 1:'Rainy'}
    state_path = [state_map[v] for v in path]
    state_delta = np.amax(delta, axis=0)
    # Print predictions
    print('--- Predictions ---')
    print(pd.DataFrame().assign(Observation=observerations_path).assign(Prediction=state_path).assign(Delta=state_delta))
    print()
# Tell python to run main method
if __name__ == "__main__": main()

--- Hidden states ---
Sunny    0.32
Rainy    0.68
Name: states, dtype: float64

--- Transition matrix for hidden states ---
      Sunny Rainy
Sunny  0.75  0.25
Rainy  0.25  0.75

Sunny    1.0
Rainy    1.0
dtype: float64

--- Sensor matrix ---
      Umbrella No umbrella
Sunny      0.1         0.9
Rainy      0.8         0.2

Sunny    1.0
Rainy    1.0
dtype: float64

--- Hidden edges ---
{('Rainy', 'Rainy'): 0.75,
 ('Rainy', 'Sunny'): 0.25,
 ('Sunny', 'Rainy'): 0.25,
 ('Sunny', 'Sunny'): 0.75}

--- Sensor edges ---
{('Rainy', 'No umbrella'): 0.2,
 ('Rainy', 'Umbrella'): 0.8,
 ('Sunny', 'No umbrella'): 0.9,
 ('Sunny', 'Umbrella'): 0.1}

--- Predictions ---
   Observation Prediction     Delta
0  No umbrella      Sunny  0.288000
1  No umbrella      Sunny  0.194400
2  No umbrella      Sunny  0.131220
3     Umbrella      Sunny  0.026244
4  No umbrella      Sunny  0.006643
5  No umbrella      Sunny  0.004484
6  No umbrella      Sunny  0.003027
7     Umbrella      Rainy  0.000605
8     Umbrella 

In [1]:
import numpy as np
import pandas as pd

def viterbi(pi, a, b, obs):
    
    nStates = np.shape(b)[0]
    T = np.shape(obs)[0]
    
    # init blank path
    path = path = np.zeros(T,dtype=int)
    # delta --> highest probability of any path that reaches state i
    delta = np.zeros((nStates, T))
    # phi --> argmax by time step for each state
    phi = np.zeros((nStates, T))
    
    # init delta and phi 
    delta[:, 0] = pi * b[:, obs[0]]
    phi[:, 0] = 0

    print('\nStart Walk Forward\n')    
    # the forward algorithm extension
    for t in range(1, T):
        for s in range(nStates):
            delta[s, t] = np.max(delta[:, t-1] * a[:, s]) * b[s, obs[t]]
            phi[s, t] = np.argmax(delta[:, t-1] * a[:, s])
            print('s={s} and t={t}: phi[{s}, {t}] = {phi}'.format(s=s, t=t, phi=phi[s, t]))
    
    # find optimal path
    print('-'*50)
    print('Start Backtrace\n')
    path[T-1] = np.argmax(delta[:, T-1])
    for t in range(T-2, -1, -1):
        path[t] = phi[path[t+1], [t+1]]
        print('path[{}] = {}'.format(t, path[t]))
        
    return path, delta, phi
obs_map = {'Cold':0, 'Hot':1}
obs = np.array([1,1,0,1,0,0,1,0,1,1,0,0,0,1])

inv_obs_map = dict((v,k) for k, v in obs_map.items())
obs_seq = [inv_obs_map[v] for v in list(obs)]

print("Simulated Observations:\n",pd.DataFrame(np.column_stack([obs, obs_seq]),columns=['Obs_code', 'Obs_seq']) )

pi = [0.6,0.4] # initial probabilities vector
states = ['Cold', 'Hot']
hidden_states = ['Snow', 'Rain', 'Sunshine']
pi = [0, 0.2, 0.8]
state_space = pd.Series(pi, index=hidden_states, name='states')
a_df = pd.DataFrame(columns=hidden_states, index=hidden_states)
a_df.loc[hidden_states[0]] = [0.3, 0.3, 0.4]
a_df.loc[hidden_states[1]] = [0.1, 0.45, 0.45]
a_df.loc[hidden_states[2]] = [0.2, 0.3, 0.5]
print("\n Transition probability:\n", a_df)
a = a_df.values

observable_states = states
b_df = pd.DataFrame(columns=observable_states, index=hidden_states)
b_df.loc[hidden_states[0]] = [1,0]
b_df.loc[hidden_states[1]] = [0.8,0.2]
b_df.loc[hidden_states[2]] = [0.3,0.7]
print("\n Emission probability:\n",b_df)
b = b_df.values
path, delta, phi = viterbi(pi, a, b, obs)
state_map = {0:'Snow', 1:'Rain', 2:'Sunshine'}
state_path = [state_map[v] for v in path]
pd.DataFrame().assign(Observation=obs_seq).assign(Best_Path=state_path)

Simulated Observations:
    Obs_code Obs_seq
0         1     Hot
1         1     Hot
2         0    Cold
3         1     Hot
4         0    Cold
5         0    Cold
6         1     Hot
7         0    Cold
8         1     Hot
9         1     Hot
10        0    Cold
11        0    Cold
12        0    Cold
13        1     Hot

 Transition probability:
          Snow  Rain Sunshine
Snow      0.3   0.3      0.4
Rain      0.1  0.45     0.45
Sunshine  0.2   0.3      0.5

 Emission probability:
          Cold  Hot
Snow        1    0
Rain      0.8  0.2
Sunshine  0.3  0.7

Start Walk Forward

s=0 and t=1: phi[0, 1] = 2.0
s=1 and t=1: phi[1, 1] = 2.0
s=2 and t=1: phi[2, 1] = 2.0
s=0 and t=2: phi[0, 2] = 2.0
s=1 and t=2: phi[1, 2] = 2.0
s=2 and t=2: phi[2, 2] = 2.0
s=0 and t=3: phi[0, 3] = 0.0
s=1 and t=3: phi[1, 3] = 1.0
s=2 and t=3: phi[2, 3] = 1.0
s=0 and t=4: phi[0, 4] = 2.0
s=1 and t=4: phi[1, 4] = 2.0
s=2 and t=4: phi[2, 4] = 2.0
s=0 and t=5: phi[0, 5] = 0.0
s=1 and t=5: phi[1, 5] = 1.0
s=2 

Unnamed: 0,Observation,Best_Path
0,Hot,Sunshine
1,Hot,Sunshine
2,Cold,Rain
3,Hot,Sunshine
4,Cold,Rain
5,Cold,Rain
6,Hot,Sunshine
7,Cold,Rain
8,Hot,Sunshine
9,Hot,Sunshine
