In [163]:
import numpy as np
np.savez('../data/UserCase-One.npz',
         prior_probabilities=np.array([0.67, 0.33]),
         transition_probabilities=np.array([[0.8, 0.2], 
                                         [0.4, 0.6]]) ,
         emission_probabilities=np.array([[0.8, 0.2],
                                       [0.4, 0.6]]),
         observation_states=np.array(['motivated', 'motivated', 'burned-out', 'burned-out', 'burned-out', 'motivated']),
         hidden_states=np.array(['R01','R01', 'R21', 'R21', 'R21', 'R01']))
data = np.load('../data/UserCase-One.npz')

In [164]:
import numpy as np
class HiddenMarkovModel:
    """_summary_
    """

    def __init__(self, observation_states: np.ndarray, hidden_states: np.ndarray, prior_probabilities: np.ndarray, transition_probabilities: np.ndarray, emission_probabilities: np.ndarray):
        """_summary_

        Args:
            observation_states (np.ndarray): _description_
            hidden_states (np.ndarray): _description_
            prior_probabilities (np.ndarray): _description_
            transition_probabilities (np.ndarray): _description_
            emission_probabilities (np.ndarray): _description_
        """
        self.observation_states = observation_states
        self.observation_states_dict = {observation_state: observation_state_index \
                                  for observation_state_index, observation_state in enumerate(list(self.observation_states))}

        self.hidden_states = hidden_states
        self.hidden_states_dict = {hidden_state_index: hidden_state \
                                   for hidden_state_index, hidden_state in enumerate(list(self.hidden_states))}

        self.prior_probabilities = prior_probabilities
        self.transition_probabilities = transition_probabilities
        self.emission_probabilities = emission_probabilities

In [220]:
import copy
import numpy as np
class ViterbiAlgorithm:
    """_summary_
    """

    def __init__(self, hmm_object):
        """_summary_

        Args:
            hmm_object (_type_): _description_
        """
        self.hmm_object = hmm_object

    def best_hidden_state_sequence(self, decode_observation_states: np.ndarray) -> np.ndarray:
        """_summary_

        Args:
            decode_observation_states (np.ndarray): _description_

        Returns:
            np.ndarray: _description_
        """

        # Initialize path (i.e., np.arrays) to store the hidden sequence states returning the maximum probability
        path = np.zeros((len(decode_observation_states),
                         len(self.hmm_object.hidden_states))).astype('int')
        path[0, :] = [hidden_state_index for hidden_state_index in range(len(self.hmm_object.hidden_states))]

        best_path = np.zeros((len(decode_observation_states),
                         len(self.hmm_object.hidden_states)))

        # Compute initial delta:
        # 1. Calculate the product of the prior and emission probabilities. This will be used to decode the first observation state.
        # 2. Scale
        #delta = np.zeros((len(decode_observation_states),
                         #len(self.hmm_object.hidden_states)))

        obs_decode = []
        for i in range(len(self.hmm_object.observation_states)):
            obs_decode_val = self.hmm_object.observation_states_dict.get(hmm_object.observation_states[i])
            obs_decode.append(obs_decode_val)

        decode_observation_states = obs_decode
        #decode_observation_states = np.array(decode_observation_states)

        delta = np.multiply(self.hmm_object.prior_probabilities, self.hmm_object.emission_probabilities)
        delta = delta.squeeze()
        delta = delta / np.sum(delta)  # Scale

        path[:, 0] = np.multiply(self.hmm_object.prior_probabilities, self.hmm_object.emission_probabilities[:, decode_observation_states[0]])

        # For each observation state to decode, select the hidden state sequence with the highest probability (i.e., Viterbi trellis)
        for trellis_node in range(1, len(decode_observation_states)):

            # TODO: comment the initialization, recursion, and termination steps

            # product_of_delta_and_transition_emission = np.multiply(delta*self.hmm_object.transition_probabilities)

            # Update delta and scale

            # Select the hidden state sequence with the maximum probability

            # Update best path
            for hidden_state in range(len(self.hmm_object.hidden_states)):
                temp_product = np.multiply(self.hmm_object.transition_probabilities[:, hidden_state], path[:, trellis_node - 1])
                path[hidden_state, trellis_node] = np.max(temp_product) * self.hmm_object.emission_probabilities[hidden_state, decode_observation_states[trellis_node]]
                best_path[hidden_state, trellis_node - 1] = np.argmax(temp_product)

                """
                k = np.argmax(k in decode_observation_states[k, trellis_node - 1]
                              * self.hmm_object.transition_probabilities[k, hidden_state]
                              * self.hmm_object.emission_probabilities[hidden_state, trellis_node])
                decode_observation_states[hidden_state, trellis_node] = decode_observation_states[k, trellis_node] \
                                                                        * self.hmm_object.transition_probabilities[k, hidden_state] \
                                                                        * self.hmm_object.emission_probabilities[hidden_state, trellis_node]

                """

            # Set best hidden state sequence in the best_path np.ndarray THEN copy the best_path to path
            path = best_path.copy()

        # Select the last hidden state, given the best path (i.e., maximum probability)
        best_hidden_state_path = np.zeros(len(decode_observation_states))
        best_hidden_state_path[-1] = np.argmax(path[:, -1])
        for n in range(len(decode_observation_states)-2, -1, -1):
            best_hidden_state_path[n] = best_path[(best_hidden_state_path[n+1], n)]

        for i in best_hidden_state_path:
            best_hidden_state_path = self.hmm_object.hidden_states_dict[i]

        best_hidden_state_path = np.array([best_hidden_state_path])

        return best_hidden_state_path

In [221]:
hmm_object = HiddenMarkovModel(data['observation_states'], data['hidden_states'], data['prior_probabilities'], data['transition_probabilities'], data['emission_probabilities'])

In [222]:
print(hmm_object.observation_states_dict)

{'motivated': 5, 'burned-out': 4}


In [223]:
print(hmm_object.observation_states)
for i in hmm_object.observation_states:
    print(i)

['motivated' 'motivated' 'burned-out' 'burned-out' 'burned-out'
 'motivated']
motivated
motivated
burned-out
burned-out
burned-out
motivated


In [224]:
obs_decode = []
for i in range(len(hmm_object.observation_states)):
    obs_decode_val = hmm_object.observation_states_dict.get(hmm_object.observation_states[i])
    obs_decode.append(obs_decode_val)
print(obs_decode)

[5, 5, 4, 4, 4, 5]


In [225]:
viterbi_instance = ViterbiAlgorithm(hmm_object)

In [226]:
print(viterbi_instance.best_hidden_state_sequence(data['observation_states']))
print(hmm_object.observation_states_dict)

IndexError: index 5 is out of bounds for axis 1 with size 2

In [81]:
prior_probabilities= np.array([0.67, 0.33]),
transition_probabilities= np.array([[0.8, 0.2], [0.4, 0.6]]),
emission_probabilities= np.array([[0.8, 0.2], [0.4, 0.6]]),
observation_states= np.array(['motivated', 'motivated', 'burned-out', 'burned-out', 'burned-out', 'motivated']),
hidden_states= np.array(['R01','R01', 'R21', 'R21', 'R21', 'R01'])

In [82]:
delta = np.multiply(prior_probabilities, emission_probabilities)
print(delta)

[[[0.536 0.066]
  [0.268 0.198]]]


In [83]:
delta = delta.squeeze()

In [84]:
delta.shape

(2, 2)

In [151]:
decode_observation_states = list(observation_states_dict.keys())

SyntaxError: incomplete input (305701024.py, line 1)

In [100]:
observation_states= np.array(['motivated', 'motivated', 'burned-out', 'burned-out', 'burned-out', 'motivated'])
hidden_states= np.array(['R01','R01', 'R21', 'R21', 'R21', 'R01'])
path = np.zeros((len(observation_states),
                 len(hidden_states)))
print(len(observation_states))
print(len(hidden_states))
print(path)

6
6
[[0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]]
[[0. 0.]
 [0. 0.]]


In [101]:
path[0, :] = [hidden_state_index for hidden_state_index in range(len(hidden_states))]

best_path = np.zeros((len(observation_states),
                 len(hidden_states)))

In [102]:
print(path)

[[0. 1. 2. 3. 4. 5.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]]


In [103]:
print(best_path)

[[0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]]


In [58]:
print(np.sum(norm_delta, axis=2))

[[1.03199635 0.87007543]]


In [64]:
row_sums = delta.sum(axis=1)
new_matrix = delta / row_sums[:, np.newaxis]

In [65]:
print(new_matrix)

[[[0.66666667 0.25      ]
  [0.33333333 0.75      ]]]


In [67]:
np.sum(new_matrix, axis=1)

array([[1., 1.]])

In [68]:
new_matrix.ndim

3