In [1]:
import pandas as pd
import matplotlib.pyplot as plt  
import numpy as np
import csv
import sys
import random
from random import seed
from random import randint
from sklearn.metrics import accuracy_score
from bs4 import BeautifulSoup
from random import randrange
from scipy.special import expit
from scipy.special import softmax

# Display configuration.
# NumPy: print 10 significant decimals and never abbreviate long arrays.
np.set_printoptions(precision=10, threshold=sys.maxsize)
# pandas: render floats with two decimals.
pd.options.display.float_format = '{:.2f}'.format
# Note: a bare pd.option_context(...) call (without a `with` statement) has no
# effect, so it is not used here; use it as a context manager around a print.

# Seed every random generator the notebook draws from. The learning code uses
# np.random, so seeding only Python's `random` module does not make runs repeatable.
seed(1)
np.random.seed(1)

%precision 10

!cp -r /content/drive/MyDrive/10601_DataSets/HW8 /content

In [2]:
def Print_DF(df):
  '''
  Print a DataFrame in full, temporarily lifting the row and column display limits.
  '''
  unlimited_display = ('display.max_rows', None, 'display.max_columns', None)
  with pd.option_context(*unlimited_display):
    print(df)

def read_csv_files(path):
  '''
  Load a maze description from a text file into a character-grid DataFrame.

  Each non-empty line of the file becomes one row and each character of the
  line becomes one cell, so the cell in row r, column c is df[c][r].

  Parameters
  ----------
  path : path of the UTF-8 text file holding the maze.

  Returns
  -------
  pandas.DataFrame with default integer row and column labels. The maze shape
  and content are also printed.
  '''
  # Read the lines directly instead of pd.read_csv(sep='\n'): newer pandas
  # releases refuse a line terminator as separator and raise ValueError.
  with open(path, encoding='utf-8') as maze_file:
    stripped_lines = (line.rstrip('\r\n') for line in maze_file)
    # Empty lines carry no maze cells and are skipped
    maze_rows = [list(line) for line in stripped_lines if line]

  # Expand all the elements of maze into multiple dataframe columns
  df_read = pd.DataFrame(maze_rows)

  print("Maze shape: ", df_read.shape)
  print("Maze is :")
  print(df_read)

  return df_read

In [3]:
# maze_description.values.tolist()
# # Reverse the row index in DF
# maze_df = maze_df.iloc[::-1]

In [4]:
# Action name -> integer action code (West, North, East, South)
Set_Of_Actions = {
  'W': 0,
  'N': 1,
  'E': 2,
  'S': 3,
}

In [5]:
def Get_Transition_Probability(maze_df, Set_Of_Actions):
  '''
  Build the transition matrix of a fixed "first open direction" policy and
  record the outcome of every (state, action) pair.

  Every cell that is neither an obstacle ('*') nor the goal ('G') is a state.
  For each state the four moves N, S, E, W are classified as:
    'bounce'      - the move leaves the grid or runs into an obstacle,
    'termination' - the move enters the goal cell,
    (row, col)    - the move lands on another state.

  Warning: P is NOT a full model of the MDP. It only encodes one deterministic
  policy (take the first open neighbour in the order N, E, S, W), so it is
  mainly useful for demonstration. The state_action table, however, describes
  every action of every state.

  Parameters
  ----------
  maze_df : pandas.DataFrame of single-character cells, one maze row per row.
  Set_Of_Actions : dict of action names; accepted for interface compatibility
    and not used by this function.

  Returns
  -------
  P : numpy array with one row and one column per state, in row-major maze
    order. A row holds a single 1 in the column of the chosen next state. The
    row is all zeros when the state touches the goal or when every move bounces.
  state_action : dict mapping ((row, col), action_name) to the outcome.
  '''

  # Work on positions rather than labels so the result does not depend on the
  # DataFrame's index/column labels.
  grid = maze_df.to_numpy()
  maze_height, maze_width = grid.shape

  # Transition probability matrix over every cell; non-state cells are removed at the end
  P = np.zeros((maze_height * maze_width, maze_height * maze_width))

  # Flat indices of obstacle/goal cells, removed later from P
  invalid = []

  # Outcome of every (state, action) pair
  state_action = {}

  # Row/column displacement of each move
  offsets = {'N': (-1, 0), 'S': (1, 0), 'E': (0, 1), 'W': (0, -1)}

  # Preference order used to pick the single move stored in P
  priority_dirs = ['N', 'E', 'S', 'W']

  for x_state in range(maze_height):
    for y_state in range(maze_width):
      row_index = x_state * maze_width + y_state

      # Obstacles and the goal aren't states, skip them
      if grid[x_state][y_state] == '*' or grid[x_state][y_state] == 'G':
        invalid.append(row_index)
        continue

      # Built fresh for every state so nothing leaks over from a previous cell
      available_directions = {}
      reaches_goal = False

      # Every direction is always classified; finding the goal does not stop
      # the checks for the remaining directions.
      for action_name in ['N', 'S', 'E', 'W']:
        x_shift, y_shift = offsets[action_name]
        x_next, y_next = x_state + x_shift, y_state + y_shift
        inside = 0 <= x_next < maze_height and 0 <= y_next < maze_width

        if not inside or grid[x_next][y_next] == '*':
          state_action[((x_state, y_state), action_name)] = 'bounce'
        elif grid[x_next][y_next] == 'G':
          state_action[((x_state, y_state), action_name)] = 'termination'
          reaches_goal = True
        else:
          state_action[((x_state, y_state), action_name)] = (x_next, y_next)
          available_directions[action_name] = (x_next, y_next)

      # A state next to the goal terminates, and a state with no open move has
      # nowhere to go: in both cases its row of P stays all zeros.
      if reaches_goal or not available_directions:
        continue

      # Several directions may be open; keep only the first one in priority order
      for direction in priority_dirs:
        if direction not in available_directions:
          continue

        x_next, y_next = available_directions[direction]
        col_index = x_next * maze_width + y_next
        print("action taken at state {} is {}".format((x_state, y_state), direction))
        P[row_index][col_index] = 1
        break

  # Drop the rows and columns that belong to obstacle/goal cells
  P = np.delete(P, invalid, axis=0)
  P = np.delete(P, invalid, axis=1)
  return P, state_action

In [6]:
def value_iteration(Set_Of_Actions, epochs, Discount_factor, path):
  '''
  Synchronous value iteration on a maze whose model is fully known.

  Every non-obstacle, non-goal cell is a state with V(s) initialised to 0.
  Each epoch applies the Bellman optimality backup to all states at once:

      V(s) <- max over a of [ r + Discount_factor * V(s') ]

  with r = -1 for every move. s' is the neighbouring state, s itself when the
  move bounces, and the value after the goal is 0.

  Parameters
  ----------
  Set_Of_Actions : dict keyed by action name ('W', 'N', 'E', 'S'); its
    iteration order also decides ties when extracting the policy.
  epochs : number of synchronous sweeps over all states.
  Discount_factor : discount applied to the value of the next state.
  path : path of the maze text file.

  Returns
  -------
  None. The (state, action) table, V, the Q values and the greedy policy are printed.
  '''

  maze_df = read_csv_files(path)

  # Only the (state, action) -> outcome table is needed here. The matrix that
  # is returned alongside describes a single fixed policy, so using it would
  # evaluate that policy instead of maximising over the actions.
  _, state_action_pair = Get_Transition_Probability(maze_df, Set_Of_Actions)

  # Unique states in the order they were recorded
  states = list(dict.fromkeys(key[0] for key in state_action_pair))
  state_index = {state: idx for idx, state in enumerate(states)}

  # Every move costs one step
  step_reward = -1

  def q_value(state, action, values):
    # One-step lookahead value of taking `action` in `state` under `values`
    outcome = state_action_pair.get((state, action))
    if outcome == 'termination':
      next_value = 0.0
    elif outcome == 'bounce':
      next_value = values[state_index[state], 0]
    elif outcome in state_index:
      next_value = values[state_index[outcome], 0]
    else:
      raise ValueError("No known outcome for state {} and action {}".format(state, action))
    return step_reward + Discount_factor * float(next_value)

  V = np.zeros((len(states), 1))

  # Synchronous sweeps: every backup of an epoch reads the previous epoch's values
  for _ in range(epochs):
    V_next = np.empty_like(V)
    for idx, state in enumerate(states):
      V_next[idx, 0] = max(q_value(state, action, V) for action in Set_Of_Actions)
    V = V_next

  # Get Q values for all state action pairs
  Q_values = {}
  for state in states:
    for action in Set_Of_Actions:
      Q_values[(state, action)] = q_value(state, action, V)

  # Greedy policy; max() keeps the first best action in Set_Of_Actions order
  policy_state = {}
  for state in states:
    policy_state[state] = max(Set_Of_Actions, key=lambda action: Q_values[(state, action)])

  print(state_action_pair)
  print("V values: \n", V)
  print("Q values: \n", Q_values)
  print("Policy values: \n", policy_state)

In [7]:
# Run value iteration on maze1: 1000 sweeps with a discount factor of 0.9
value_iteration(
  Set_Of_Actions,
  epochs=1000,
  Discount_factor=0.9,
  path='/content/HW8/maze1.txt',
)

Maze shape:  (3, 5)
Maze is :
   0  1  2  3  4
0  *  *  .  *  G
1  .  .  .  *  .
2  S  *  .  .  .
action taken at state (0, 2) is S
action taken at state (1, 0) is E
action taken at state (1, 1) is E
action taken at state (1, 2) is N
action taken at state (2, 0) is N
action taken at state (2, 2) is N
action taken at state (2, 3) is E
action taken at state (2, 4) is N
{((0, 2), 'N'): 'bounce', ((0, 2), 'S'): (1, 2), ((0, 2), 'E'): 'bounce', ((0, 2), 'W'): 'bounce', ((1, 0), 'N'): 'bounce', ((1, 0), 'S'): (2, 0), ((1, 0), 'E'): (1, 1), ((1, 0), 'W'): 'bounce', ((1, 1), 'N'): 'bounce', ((1, 1), 'S'): 'bounce', ((1, 1), 'E'): (1, 2), ((1, 1), 'W'): (1, 0), ((1, 2), 'N'): (0, 2), ((1, 2), 'S'): (2, 2), ((1, 2), 'E'): 'bounce', ((1, 2), 'W'): (1, 1), ((1, 4), 'N'): 'termination', ((1, 4), 'S'): (2, 4), ((1, 4), 'E'): 'bounce', ((1, 4), 'W'): 'bounce', ((2, 0), 'N'): (1, 0), ((2, 0), 'S'): 'bounce', ((2, 0), 'E'): 'bounce', ((2, 0), 'W'): 'bounce', ((2, 2), 'N'): (1, 2), ((2, 2), 'S'): 'bounc

In [8]:
class Environment:
  '''
  Maze environment for an agent that moves one cell at a time.

  The agent starts in the bottom-left cell and the episode ends in the
  top-right cell. Actions are integer codes: 0 = W, 1 = N, 2 = E, 3 = S.

  When the agent takes an action a at state s, step() returns:
  next state: the (row, col) of the agent after taking action a at state s
  reward: the reward received from taking action a at state s (always -1)
  is terminal: True if the terminal state is reached, False otherwise.
  '''

  def __init__(self, filename):
    '''
    Read the maze stored in `filename` and place the agent on the start cell.
    '''
    self._load_maze(read_csv_files(filename))

  def _load_maze(self, maze):
    '''
    Store the maze grid and initialise geometry, actions and agent position.
    '''
    self.maze_df = maze

    self.maze_height = maze.shape[0]
    self.maze_width = maze.shape[1]

    # Goal is the top-right cell, start is the bottom-left cell
    self.maze_end = (0, self.maze_width - 1)
    self.maze_start = (self.maze_height - 1, 0)

    self.x, self.y = self.maze_start

    # (row, column) displacement of each move
    self.N_action = (-1, 0)
    self.S_action = (1, 0)
    self.E_action = (0, 1)
    self.W_action = (0, -1)

    # The three lists below are aligned: position i describes the same action
    # N, S, E, W actions
    self.List_of_actions = [1, 3, 2, 0]
    self.List_of_actions_names = ['N', 'S', 'E', 'W']
    self.List_f_actions_coordinates = [self.N_action, self.S_action, self.E_action, self.W_action]

  def step(self, a):
    '''
    This function takes in an action a, simulates a step,
    sets the current state to the next state,
    and returns next state, reward, is terminal as (x, y, reward, is_terminal).

    A move that would leave the grid or enter an obstacle leaves the agent in place.

    Raises ValueError if `a` is not one of the known action codes.
    '''
    try:
      index = self.List_of_actions.index(a)
    except ValueError:
      # Reject unknown codes instead of silently treating them as a move
      raise ValueError(
        "Unknown action {!r}; expected one of {}".format(a, self.List_of_actions)
      ) from None

    x_transit, y_transit = self.List_f_actions_coordinates[index]

    x_state = self.x + x_transit
    y_state = self.y + y_transit

    inside = 0 <= x_state < self.maze_height and 0 <= y_state < self.maze_width
    # Positional lookup: row first, then column
    if inside and self.maze_df.iat[x_state, y_state] != "*":
      self.x = x_state
      self.y = y_state

    is_terminal = (self.x, self.y) == self.maze_end
    reward = -1

    return self.x, self.y, reward, is_terminal

  def reset(self):
    '''
    This function resets the agent state to the initial
    state and returns the initial state as (x, y).
    '''
    self.x, self.y = self.maze_start
    return self.x, self.y

In [9]:
def init_q_table(Q_table, env):
  '''
  Fill Q_table in place with a zero entry per action for every non-obstacle cell.

  Keys are (row, col) tuples taken from env.maze_df. The start and goal cells
  get entries as well; only obstacle cells ('*') are left out.
  '''
  for row_label, cells in env.maze_df.iterrows():
    for col, symbol in enumerate(cells):
      # Obstacles aren't states
      if symbol != '*':
        Q_table[(row_label, col)] = [0] * len(env.List_of_actions_names)
  
def Q_learning_Algorithm(path, Q_table, number_of_epochs, max_length, alpha, gamma, eps):
  '''
  Tabular Q-learning with an epsilon-greedy behaviour policy.

  Parameters
  ----------
  path : path of the maze text file used to build the environment.
  Q_table : dict filled in place; maps a (row, col) state to a list with one
    Q value per action code.
  number_of_epochs : number of episodes; each one starts from the start state.
  max_length : maximum number of steps per episode.
  alpha : learning rate.
  gamma : discount factor.
  eps : probability of taking a uniformly random action at each step.

  Returns
  -------
  The same Q_table object, updated.
  '''

  environment = Environment(path)

  # Initialization for Q table with zeros
  init_q_table(Q_table, environment)

  # Action codes run from 0 to n_actions - 1
  n_actions = len(environment.List_of_actions)

  for _ in range(number_of_epochs):
    # At the beginning of every new epoch, reset the environment
    environment.reset()

    for _ in range(max_length):
      state = (environment.x, environment.y)

      # Epsilon-greedy choice: explore with probability eps, otherwise exploit
      if np.random.random() <= eps:
        # randint excludes its upper bound, so this draws from every action code
        action = np.random.randint(0, n_actions)
      else:
        action = np.argmax(Q_table[state])

      # Get reward and next state
      new_x, new_y, r, terminated = environment.step(action)

      # Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * [r + gamma * max a' Q(s', a')]
      best_next_value = max(Q_table[(new_x, new_y)])
      Q_table[state][action] = (1 - alpha) * Q_table[state][action] + alpha * (r + gamma * best_next_value)

      if terminated:
        break

  return Q_table

In [10]:
# Q table: maps each (x, y) state to the list of Q values of its actions.
# It starts empty and is filled in by the learning routine.
maze_file = '/content/HW8/maze1.txt'
Q_table = Q_learning_Algorithm(
  maze_file,
  {},
  number_of_epochs=2000,
  max_length=20,
  alpha=0.8,
  gamma=0.9,
  eps=0.05,
)

Maze shape:  (3, 5)
Maze is :
   0  1  2  3  4
0  *  *  .  *  G
1  .  .  .  *  .
2  S  *  .  .  .


In [11]:
# Greedy read-out of the learned table: best Q value and best action per state
value, policy = [], []

for key, action_values in Q_table.items():
  best_action = np.argmax(action_values)
  value.append([key, action_values[best_action]])
  policy.append([key, best_action])

for report in (value, Q_table, policy):
  print(report)

[[(0, 2), -4.68559], [(0, 4), 0], [(1, 0), -5.217031], [(1, 1), -4.68559], [(1, 2), -4.0951], [(1, 4), -1.0], [(2, 0), -5.6953279000000006], [(2, 2), -3.439], [(2, 3), -2.71], [(2, 4), -1.9]]
{(0, 2): [-5.205093495556215, -4.957066724822221, -5.147185001813629, -4.68559], (0, 4): [0, 0, 0, 0], (1, 0): [-5.6953279000000006, -5.6953279000000006, -5.217031, -5.472880287187667], (1, 1): [-5.6953279000000006, -5.217031, -4.68559, -4.867811268624385], (1, 2): [-5.217031, -5.217031, -4.68559, -4.0951], (1, 4): [-1.9, -1.0, -1.9, -1.51424], (2, 0): [-6.12579511, -5.6953279000000006, -6.12579511, -6.003626221142586], (2, 2): [-4.0951, -4.68559, -3.439, -3.9364499865600004], (2, 3): [-4.0951, -3.439, -2.71, -2.8360704000000005], (2, 4): [-3.439, -1.9, -2.71, -2.2131200000000004]}
[[(0, 2), 3], [(0, 4), 0], [(1, 0), 2], [(1, 1), 2], [(1, 2), 3], [(1, 4), 1], [(2, 0), 1], [(2, 2), 2], [(2, 3), 2], [(2, 4), 1]]
