In [27]:
from src.models import hmm, decoders
import numpy as np

def trellis_node_step (prev_delta, prob_emiss, prob_trans, current_observation):
    # Multiply previous delta with transition probabilities 
    _delta_trans = np.multiply(prev_delta, prob_trans.T)
    # Get the likely scenario of the node
    probable_state = np.argmax(_delta_trans, axis = 1)
    probable_state_probs = np.amax(_delta_trans, axis = 1)
    # Calculate the new delta
    new_delta = probable_state_probs * prob_emiss[:, current_observation]
    return new_delta, probable_state

In [28]:
# index annotation observation_states=[i,j]    
observation_states = ['on-time','late'] 

# index annotation hidden_states=[i,j]
hidden_states = ['no-traffic','traffic']

# PONDERING QUESTION: How would a user define/compute their own HMM instantiation inputs to decode the hidden states for their use case observations?
use_case_one_data = np.load('./data/UserCase-One.npz')


# Edit for the last hidden states 
new_hidden_states = np.array(['no-traffic', 'no-traffic', 'traffic', 'traffic', 'traffic', 'no-traffic'])

# Instantiate submodule class models.HiddenMarkovModel with
# observation and hidden states and prior, transition, and emission probabilities.
use_case_one_hmm = hmm.HiddenMarkovModel(observation_states,
                                     hidden_states,
                  use_case_one_data['prior_probabilities'], # prior probabilities of hidden states in the order specified in the hidden_states list
                  use_case_one_data['transition_probabilities'], # transition_probabilities[:,hidden_states[i]]
                  use_case_one_data['emission_probabilities']) # emission_probabilities[hidden_states[i],:][:,observation_states[j]]

# Instantiate submodule class models.ViterbiAlgorithm using the use case one HMM 
use_case_one_viterbi = decoders.ViterbiAlgorithm(use_case_one_hmm)

use_case_decoded_hidden_states = use_case_one_viterbi.best_hidden_state_sequence(use_case_one_data['observation_states'])

In [29]:
prior_probabilities = use_case_one_data['prior_probabilities']
transition_probabilities = use_case_one_data['transition_probabilities']
emission_probabilities = use_case_one_data['emission_probabilities']

print("Prior probabilities")
print(prior_probabilities)
print("Transition probabilities")
print(transition_probabilities)
print("Emission probabilities")
print(emission_probabilities)

Prior probabilities
[0.67 0.33]
Transition probabilities
[[0.8 0.2]
 [0.4 0.6]]
Emission probabilities
[[0.8 0.2]
 [0.4 0.6]]


In [30]:
# match variable names
decode_observation_states = use_case_one_data['observation_states']
prior_probs = prior_probabilities
emission_probs = emission_probabilities
transition_probs = transition_probabilities

observation_states_dict = use_case_one_hmm.observation_states_dict
hidden_states_dict = use_case_one_hmm.hidden_states_dict

In [31]:
# Initialize to hold paths of previous states:
all_prev_states = np.zeros((len(decode_observation_states), 
                            len(hidden_states)))
all_prev_states[0,:] = [hidden_state_index for hidden_state_index in range(len(hidden_states))]


# Initialize to holde hidden state probabilities
delta = np.multiply(prior_probs, emission_probs[:,observation_states_dict[decode_observation_states[0]]])
delta = delta / np.sum(delta)
# Loop through each observation state node:
for trellis_node in range(1, len(decode_observation_states)): 
#    print("Current trellis node: " + str(trellis_node))
    # Get the current observation state
    current_observation = observation_states_dict[decode_observation_states[trellis_node]]
#    print("Current observation: " + str(observation_states_dict[decode_observation_states[trellis_node]]) + " code: " + str(current_observation))
    # Calculate state and update delta
    delta, prev_states = trellis_node_step(delta, emission_probs, transition_probs, current_observation)
#     print("Updated delta")
#     print(delta)
    # Store states
    all_prev_states[trellis_node, :] = prev_states

In [32]:
all_prev_states

array([[0., 1.],
       [0., 0.],
       [0., 0.],
       [0., 1.],
       [0., 1.],
       [1., 1.]])

In [33]:
print("Final delta: ")
print(delta)

state = np.argmax(delta)
print("State " + str(state))


sequence_prob = np.amax(delta)
print("sequence probability: " + str(sequence_prob))

Final delta: 
[0.00255567 0.00191675]
State 0
sequence probability: 0.0025556685413173656


In [23]:
# Starting from the last row of all_prev_states:
for prev_states in all_prev_states[::-1]:
    print(prev_states)
    # Get the state
    print("state :" + str(prev_states[int(state)]))

[1. 1.]
state :1.0
[0. 1.]
state :0.0
[0. 1.]
state :0.0
[0. 0.]
state :0.0
[0. 0.]
state :0.0
[0. 1.]
state :0.0


In [19]:
prev_states[1]

1.0

In [25]:
# Get the most likely state and sequence at the end 
state = np.argmax(delta)
sequence_prob = np.amax(delta)
best_path = []

for prev_states in all_prev_states[::-1]:
    print(prev_states)
    state = prev_states[int(state)]
    print("STATE: " + str(state))
    best_path.append(state)

# get the hidden state path
hidden_state_path = np.array([hidden_states_dict[i] for i in best_path[::-1]])

[1. 1.]
STATE: 1.0
[0. 1.]
STATE: 1.0
[0. 1.]
STATE: 1.0
[0. 0.]
STATE: 0.0
[0. 0.]
STATE: 0.0
[0. 1.]
STATE: 0.0


In [26]:
hidden_state_path

array(['no-traffic', 'no-traffic', 'no-traffic', 'traffic', 'traffic',
       'traffic'], dtype='<U10')

In [36]:
hidden_states_dict

{0: 'no-traffic', 1: 'traffic'}

In [31]:
# step through the first node
trellis_node = 1
# Get the current observation state
current_observation = observation_states_dict[decode_observation_states[trellis_node]]
print("Current observation: " + decode_observation_states[trellis_node] + " with code " + str(current_observation))

# multiply the previous delta with transition probabilities
print("Delta x Transition")
_delta_trans = np.multiply(prev_delta, transition_probs.T)
print(_delta_trans)
# Get the likely scenario of the node. This could be from either column
probable_state = np.argmax(_delta_trans, axis = 1)
print("Highest probability _delta_trans from index: ")
print(probable_state)
probable_state_probs = np.amax(_delta_trans, axis = 1)
print("The highest probability values at the index")
print(probable_state_probs)

# Calculate the new delta
new_delta = probable_state_probs * emission_probs[:, current_observation]
print("New delta")
print(new_delta)


Current observation: on-time with code 0
Delta x Transition


NameError: name 'prev_delta' is not defined

In [11]:
new_hidden_states

array(['no-traffic', 'no-traffic', 'traffic', 'traffic', 'traffic',
       'no-traffic'], dtype='<U10')