<a href="https://colab.research.google.com/github/Lisker2/ML/blob/main/simple_implementation/HMMs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import scipy
import itertools

In [None]:
# Define a MM with three states
state = 3
# transition probability matrix
A = np.array([
    [0.1, 0.4, 0.5],
    [0.3, 0.2, 0.5],
    [0.1, 0.6, 0.3]
])
# initial probability distribution (stable)
_, left_eigenvector = scipy.linalg.eig(A, left=True, right=False)
pi = left_eigenvector[:,0] / np.sum(left_eigenvector[:,0])

print('Number of state is\n', state)
print('Transition matrix\n', A)
print('Initial probability distribution\n', pi)

Number of state is
 3
Transition matrix
 [[0.1 0.4 0.5]
 [0.3 0.2 0.5]
 [0.1 0.6 0.3]]
Initial probability distribution
 [0.18055556 0.40277778 0.41666667]


In [None]:
def sequence_generator(num, start):
  res = []
  current = start
  res.append(current)
  for i in range(num):
    # choose the next state based on the transition probability
    current = np.random.choice([i for i in range(state)], p=A[current])
    res.append(current)
  return res

In [None]:
res_100 = sequence_generator(100, 0)
res_500 = sequence_generator(500, 0)
res_100000 = sequence_generator(100000, 0)

In [None]:
def eval_res(res):

  n = len(res)
  # count transitions from state i to state j
  transition = np.zeros((state, state))
  # count appearance of each states
  pi = np.zeros(state)

  for i in range(n - 1):
    current = res[i]
    next = res[i + 1]
    pi[current] += 1
    transition[current][next] += 1
  
  transition = [transition[i]/pi[i]for i in range(state)]
  pi /= n 

  print("Estimation of transition probability matrix:\n", transition)
  print("Estimation of initial probability distribution:\n", pi)

In [None]:
eval_res(res_100)
eval_res(res_500)
eval_res(res_100000)

Estimation of transition probability matrix:
 [array([0.09090909, 0.54545455, 0.36363636]), array([0.39473684, 0.18421053, 0.42105263]), array([0.1, 0.5, 0.4])]
Estimation of initial probability distribution:
 [0.21782178 0.37623762 0.3960396 ]
Estimation of transition probability matrix:
 [array([0.14606742, 0.46067416, 0.39325843]), array([0.29 , 0.175, 0.535]), array([0.08056872, 0.58767773, 0.33175355])]
Estimation of initial probability distribution:
 [0.17764471 0.3992016  0.42115768]
Estimation of transition probability matrix:
 [array([0.09698978, 0.39895057, 0.50405965]), array([0.30059198, 0.19856233, 0.50084569]), array([0.10225228, 0.59962582, 0.2981219 ])]
Estimation of initial probability distribution:
 [0.18104819 0.40203598 0.41690583]


The results of generated sequences of lengths 100 and 500 differ slightly from the orginal setup. Because of the length limit, it's just a rough approximation. However, if one increases the length to 10000, the result is almost identical to the orginal setup since the length is long enough to ignore the start state. 

In [None]:
# observation probability matrix
B = np.array([
    [0.5, 0.1, 0.1, 0.1, 0.2],
    [0.1, 0.3, 0.4, 0.1, 0.1],
    [0.2, 0.1, 0.1, 0.3, 0.4]
])
# choose an observation sequence
observation = [0, 1, 2, 3, 4]

In [None]:
# Enumerate all the combinations. In this case 3**5 = 243 times calculation.

def dumb(observation):
  n = len(observation)

  # all possible combinations [1, 1, 1, 1, 1], [1, 1, 1, 1, 2]....... -> len(all) = 243
  all = list(itertools.product([_ for _ in range(state)], repeat=n))
  # the best sequence
  best_seq = all[0]
  # the possibility of the best sequence
  best_p = 0

  for seq in all:
    # for each possible sequence, initialize the temp_p to be initial probability times p(the first observation|initial state)
    temp_p = pi[seq[0]] * B[seq[0]][observation[0]]
    for t in range(1, n):
      # temp_p *= p(current state|previous state) * p(current observation|current state)
      temp_p *= (A[seq[t - 1]][seq[t]] * B[seq[t]][observation[t]])
    
    # update best sequence and possibility if it's higher
    if temp_p > best_p:
      best_p, best_seq  = temp_p, seq
  
  return best_seq, best_p

In [None]:
dumb(observation)

((2, 1, 1, 2, 2), 2.16e-05)

In [None]:
# This follows pseudo code in ppt.

def viterbi(observation):

  n = len(observation)
  # initialization (t, n) cahce and backpointer
  viterbi = np.zeros((state, n))
  # initialize the temp_p to be initial probability times p(the first observation|initial state)
  viterbi[:, 0] = pi * B[:, observation[0]]
  backpointer = np.zeros((state, n))
  best_path_prob = 0
  best_path = [0 for _ in range(n)]

  #recursion step
  for t in range(1, n):
    for s in range(state):
      
      # from previous time step : for all possible previous state p(current state|previous state) * p(current observation|current state)
      temp = viterbi[:, t - 1] * A[:, s] * B[s][observation[t]]
      viterbi[s, t] = max(temp) # choose the largest p from all possible previous state
      backpointer[s, t] = np.argmax(temp) # argmax return index of state
  
  # the result of best possibility will be the max value at the end time, the last column
  best_path_prob = max(viterbi[:, -1])
  # follow the best state in the last column, move backward to find the best path
  best_path[-1] = np.argmax(viterbi[:, -1])
  
  for i in range(n - 2, -1, -1):
    best_path[i] = int(backpointer[int(best_path[i + 1]), i + 1])

  return best_path, best_path_prob

In [None]:
viterbi(observation)

([2, 1, 1, 2, 2], 2.1599999999999996e-05)

Looks good.