In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn
from mpl_toolkits.mplot3d import Axes3D

In [36]:
class Hidden_Markov_Model:

  def __init__(self, model_transition_matrix, initial_state_matrix, neighbourhood, rate):
    self.transition_matrix = model_transition_matrix
    self.current_state = initial_state_matrix
    self.obstacle = neighbourhood
    self.no_of_states = len(initial_state_matrix)
    self.sensor_error_rate = rate
    self.direction = {'N' : 0, 'S' : 1, 'W' : 2, 'E' : 3}

  def create_observation_matrix(self, observation):
    sensor_reading_probability = []
    for i in range (self.no_of_states):
      cur_reading = [False] * 4
      for j in range(4):
        cur_reading[j] = (int(observation[j]) == 1)
      no_of_discrepancy = 0;
      for j in range(4):
        if cur_reading[j] != self.obstacle[i][j]:
          no_of_discrepancy += 1
      probability = ((1 - self.sensor_error_rate) ** (4 - no_of_discrepancy)) * (self.sensor_error_rate ** no_of_discrepancy)
      sensor_reading_probability.append(probability)
      observation_matrix = np.zeros((len(sensor_reading_probability), len(sensor_reading_probability)))
      np.fill_diagonal(observation_matrix, sensor_reading_probability)
    return observation_matrix

  def get_emission_probabilities(self):
    emission_probability_list = [[None] * 16 for _ in range(self.no_of_states)]
    cur_obs = [None] * 4
    for i in range(16):
      observation = i
      cur_obs[3] = observation & 1
      cur_obs[2] = (observation >> 1) & 1 
      cur_obs[1] = (observation >> 2) & 1
      cur_obs[0] = (observation >> 3) & 1
      for j in range(self.no_of_states):
        no_of_discrepancy = 0
        for k in range(4):
          if cur_obs[k] != self.obstacle[j][k]:
            no_of_discrepancy += 1
        probability = ((1 - self.sensor_error_rate) ** (4 - no_of_discrepancy)) * (self.sensor_error_rate ** no_of_discrepancy)
        emission_probability_list[j][i] = probability
    emission_probability = np.array(emission_probability_list)
    return emission_probability

  def viterbi_algorithm(self, initial_state, emission_probabilities, observation_sequence):
    I = self.transition_matrix.shape[0]
    N = len(observation_sequence)
    temp_D = np.zeros((I, N))
    temp_E = np.zeros((I, N-1)).astype(np.int32)
    temp_D[:, 0] = np.multiply(initial_state, emission_probabilities[:, 0])
    for n in range(1, N):
        for i in range(I):
            temp_product = np.multiply(self.transition_matrix[:, i], temp_D[:, n-1])
            temp_D[i, n] = np.max(temp_product) * emission_probabilities[i, observation_sequence[n]]
            temp_E[i, n-1] = np.argmax(temp_product)
    S_opt = np.zeros(N).astype(np.int32)
    S_opt[-1] = np.argmax(temp_D[:, -1])
    for n in range(N-2, 0, -1):
        S_opt[n] = temp_E[int(S_opt[n+1]), n]
    return S_opt, temp_D, temp_E

  def filtering(self, observation_matrix):
    new_state = np.dot(observation_matrix, np.dot(self.transition_matrix, self.current_state))
    normalized_state = new_state / np.sum(new_state)
    self.current_state = normalized_state
    max_prob_val = 0.0
    for i in range(self.no_of_states):
      if normalized_state[i] > max_prob_val:
        likely_state = i
        max_prob_val = normalized_state[i]
    return likely_state, max_prob_val

  def prediction(self):
    new_state = np.dot(self.transition_matrix, self.current_state)
    normalized_state = new_state / np.sum(new_state)
    self.current_state = normalized_state
    return normalized_state
        
  


In [None]:
grid_size = 42
obstacle_ratio = 0.3
count_legal_grid = 0
grid = [[None] * grid_size for _ in range (grid_size)]
for i in range(grid_size):
  for j in range(grid_size):
    if np.random.rand() <= obstacle_ratio:
      grid[i][j] = 1
    else:
      grid[i][j] = 0
      count_legal_grid += 1
  #print(grid[i])

In [None]:
no_of_squares = grid_size * grid_size
neighbours = [[] for _ in range (no_of_squares)]
obstacle = [[True] * 4 for _ in range (no_of_squares)]
for i in range(grid_size):
  for j in range(grid_size):
    if grid[i][j] == 0:
      index = i * grid_size + j
      if i > 0 and grid[i-1][j] == 0:
          neighbours[index].append((i-1) * grid_size + j)
          obstacle[i][0] = False
      if i < grid_size - 1 and grid[i+1][j] == 0:
          neighbours[index].append((i+1) * grid_size + j)
          obstacle[i][1] = False
      if j > 0 and grid[i][j-1] == 0:
          neighbours[index].append(i * grid_size + (j-1))
          obstacle[i][2] = False
      if j < grid_size - 1 and grid[i][j+1] == 0:
          neighbours[index].append(i * grid_size + (j+1))
          obstacle[i][3] = False
    #print(neighbours[i*grid_size+j])

In [None]:
transition_list = [[None] * no_of_squares for _ in range (no_of_squares)]
for i in range (no_of_squares):
  for j in range (no_of_squares):
    transition_list[i][j] = 0
  if len(neighbours[i]) != 0:
    for j in neighbours[i]:
      transition_list[i][j] = 1 / len(neighbours[i])
  #print(transition_list[i])


In [None]:
initial_state = [1 / no_of_squares] * no_of_squares
transition_matrix = np.array(transition_list)
initial_state_matrix = np.array(initial_state)
initial_state_matrix = initial_state_matrix.transpose()
sensor_error_rate = 0.2

In [37]:
# Assume Sensor Error Rate = 0.2
model = Hidden_Markov_Model(transition_matrix, initial_state_matrix, obstacle, 0.2)
direction = {'N' : 0, 'S' : 1, 'W' : 2, 'E' : 3}
observation_sequence = ["1110", "1010"]
# Given the observation instance, find out the robot location
for observation in observation_sequence:
  observation_matrix = model.create_observation_matrix(observation)
  cur_state, probability = model.filtering(observation_matrix)
  print("Current state : ", end = "")
  print(cur_state)
  print("Coordinate : ", end= "")
  print(cur_state // grid_size, end = " ")
  print(cur_state % grid_size)
  print("Probability : ", end = "")
  print(probability)
  print("===============================================")

Current state : 42
Coordinate : 1 0
Probability : 0.0008467848637205608
Current state : 0
Coordinate : 0 0
Probability : 0.0017950714728825411


In [40]:
# given the observation sequence, what is the most likely state sequence
observation_sequence = ["1110", "1010", "1110", "1011", "1010", "0001", "1111", "0000"]
observation_list = []
for element in observation_sequence:
  num = 0
  for i in range(4):
    num = num * 2 + int(element[i])
  observation_list.append(num)
emission_matrix = model.get_emission_probabilities()
most_likely_state_sequence, accumulated_probability_matrix, backtracking_matrix = model.viterbi_algorithm(initial_state_matrix, emission_matrix, observation_list)
print(most_likely_state_sequence)
#print(accumulated_probability_matrix)
#print(backtracking_matrix)

[  0  31  73 115  73  31  73  31]
