In [1]:
import numpy as np
from colored import fore

In [2]:

# Shape of environment
env_rows = 7
env_columns = 7

# 3D numpy array to hold the Q-values for each state and action pair
# 7 x 7 for states and 4 layers for each possible action for each state
# with values of each (state, action) pair initialized to 0
q_values = np.zeros((env_rows, env_columns, 4))

# actions as 0 = up, 1 = right, 2 = down, 3 = left
actions = ['up', 'right', 'down', 'left']

# 2D numpy array to hold the rewards for each state. 
# Contains 7 rows and 7 columns (to match the shape of the environment), and each value is initialized to -100.
rewards = np.full((env_rows, env_columns), -100.)
grid = np.full((env_rows, env_columns), fore.DARK_GRAY+"X")

# Reward for the Final Goal F set to 100
rewards[3, 3] = 100 
grid[3, 3] = fore.GREEN+"F"


In [3]:
# define White locations for rows 0 to 6
W = {} # store locations in a dictionary
W[0] = [0, 1, 2, 4, 5, 6]
W[1] = [0, 6]
W[2] = [i for i in range(1,7)]
W[3] = [0, 1, 5]
W[4] = [0, 1, 4, 5, 6]
W[5] = [0, 2, 3, 6]
W[6] = [1, 2, 3, 4, 6]

#set the rewards for all White locations 
for row_index in range(0, 7):
  for column_index in W[row_index]:
    rewards[row_index, column_index] = -4
    grid[row_index, column_index] = fore.LIGHT_GRAY+"W"

In [4]:
# define Grey locations for rows 0 to 6
X = {} # store locations in a dictionary
X[0] = [3]
X[1] = [1, 5]
X[3] = [2, 4, 6]
X[4] = [3]
X[5] = [1]
X[6] = [5]

In [5]:
# define Blue locations for rows 0 to 6
N = {} # store locations in a dictionary
N[0] = []
N[1] = [2, 3, 4]
N[2] = []
N[3] = []
N[4] = [2]
N[5] = [4]
N[6] = [0]

#set the rewards for all Blue locations 
for row_index in range(0, 7):
  for column_index in N[row_index]:
    rewards[row_index, column_index] = -2
    grid[row_index, column_index] = fore.BLUE+"N"

In [6]:
# define Orange locations for rows 0 to 6
S = {} # store locations in a dictionary
S[0] = []
S[1] = []
S[2] = [0]
S[3] = []
S[4] = []
S[5] = [5]
S[6] = []

#set the rewards for all Orange locations 
for row_index in range(0, 7):
  for column_index in S[row_index]:
    rewards[row_index, column_index] = -6
    grid[row_index, column_index] = fore.RED+"S"

In [7]:
# print rewards matrix
for row in rewards:
  print(row)

[  -4.   -4.   -4. -100.   -4.   -4.   -4.]
[  -4. -100.   -2.   -2.   -2. -100.   -4.]
[-6. -4. -4. -4. -4. -4. -4.]
[  -4.   -4. -100.  100. -100.   -4. -100.]
[  -4.   -4.   -2. -100.   -4.   -4.   -4.]
[  -4. -100.   -4.   -4.   -2.   -6.   -4.]
[  -2.   -4.   -4.   -4.   -4. -100.   -4.]


In [8]:
for row in grid:
    for column in row:
        print(column, end=" ")
    print()
print("______________")
    
grid_init = np.copy(grid) # to copy the default grid and use it to rnder the updated location each time

for row in grid_init:
    for column in row:
        print(column, end=" ")
    print()

[38;5;7mW [38;5;7mW [38;5;7mW [38;5;8mX [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;8mX [38;5;4mN [38;5;4mN [38;5;4mN [38;5;8mX [38;5;7mW 
[38;5;1mS [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;7mW [38;5;8mX [38;5;2mF [38;5;8mX [38;5;7mW [38;5;8mX 
[38;5;7mW [38;5;7mW [38;5;4mN [38;5;8mX [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;8mX [38;5;7mW [38;5;7mW [38;5;4mN [38;5;1mS [38;5;7mW 
[38;5;4mN [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;8mX [38;5;7mW 
______________
[38;5;7mW [38;5;7mW [38;5;7mW [38;5;8mX [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;8mX [38;5;4mN [38;5;4mN [38;5;4mN [38;5;8mX [38;5;7mW 
[38;5;1mS [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;7mW [38;5;8mX [38;5;2mF [38;5;8mX [38;5;7mW [38;5;8mX 
[38;5;7mW [38;5;7mW [38;5;4mN [38;5;8mX [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;8mX [38;5;7mW [38;5;7mW [38;

In [9]:
# function to determine if the specified location is a terminal state
def is_terminal_state(current_row_index, current_column_index):
  #if the reward for this location is -1, then it is not a terminal state (i.e., it is a 'white square')
  if rewards[current_row_index, current_column_index] == -2 or rewards[current_row_index, current_column_index] == -4 or rewards[current_row_index, current_column_index] == -6:
    return False
  else:
    return True
  
# function to choose a random, non-terminal starting location
def get_starting_location():
  #get a random row and column index
  current_row_index = np.random.randint(env_rows)
  current_column_index = np.random.randint(env_columns)
  #continue choosing random row and column indexes until a non-terminal state is identified
  while is_terminal_state(current_row_index, current_column_index) or check_location(current_row_index, current_column_index) == 'S' or check_location(current_row_index, current_column_index) == 'N':
    current_row_index = np.random.randint(env_rows)
    current_column_index = np.random.randint(env_columns)
  return current_row_index, current_column_index

# epsilon greedy algorithm to choose which action to take next (i.e., where to move next)
def get_intended_action(current_row_index, current_column_index, epsilon):
  #if a randomly chosen value between 0 and 1 is less than epsilon, 
  #then choose the most promising value from the Q-table for this state.
  if np.random.random() < epsilon:
    return np.argmax(q_values[current_row_index, current_column_index])
  else: #choose a random action
    return np.random.randint(4)
  
# function to identify the location type
def check_location(current_row_index, current_column_index):
  return grid[current_row_index, current_column_index][-1]
    
# function to get the next location based on the chosen action and initial location
def get_next_location_ideally(current_row_index, current_column_index, action_index):
  new_row_index = current_row_index
  new_column_index = current_column_index  
  # Acting as per intended action
  if actions[action_index] == 'up' and current_row_index > 0:
    new_row_index -= 1
  elif actions[action_index] == 'right' and current_column_index < env_columns - 1:
    new_column_index += 1
  elif actions[action_index] == 'down' and current_row_index < env_rows - 1:
    new_row_index += 1
  elif actions[action_index] == 'left' and current_column_index > 0:
    new_column_index -= 1
  return new_row_index, new_column_index

# function to get the next location based on the chosen action and initial location
def get_next_location_slip(current_row_index, current_column_index, action_index):
  new_row_index = current_row_index
  new_column_index = current_column_index
  if actions[action_index] == 'up' and current_row_index > 0:
    # Finding the action to act depending on the intended direction and the location
    if check_location(current_row_index, current_column_index) == "W":
      action_index = np.random.choice([0,1,2,3], p=[0.8, 0.1, 0, 0.1])
    if check_location(current_row_index, current_column_index) == "N":
      action_index = np.random.choice([0,1,2,3], p=[1, 0, 0, 0])
    if check_location(current_row_index, current_column_index) == "S":
      action_index = np.random.choice([0,1,2,3], p=[0.5, 0.25, 0, 0.25])
    
  elif actions[action_index] == 'right' and current_column_index < env_columns - 1:
    # Finding the action to act depending on the intended direction and the location
    if check_location(current_row_index, current_column_index) == "W":
      action_index = np.random.choice([0,1,2,3], p=[0.1, 0.8, 0.1, 0])
    if check_location(current_row_index, current_column_index) == "N":        
      action_index = np.random.choice([0,1,2,3], p=[0, 1, 0, 0])
    if check_location(current_row_index, current_column_index) == "S":
      action_index = np.random.choice([0,1,2,3], p=[0.25, 0.5, 0.25, 0])

  elif actions[action_index] == 'down' and current_row_index < env_rows - 1:
    # Finding the action to act depending on the intended direction and the location
    if check_location(current_row_index, current_column_index) == "W":
      action_index = np.random.choice([0,1,2,3], p=[0, 0.1, 0.8, 0.1])
    if check_location(current_row_index, current_column_index) == "N":
      action_index = np.random.choice([0,1,2,3], p=[0, 0, 1, 0])
    if check_location(current_row_index, current_column_index) == "S":
      action_index = np.random.choice([0,1,2,3], p=[0, 0.25, 0.5, 0.25])

  elif actions[action_index] == 'left' and current_column_index > 0:
    # Finding the action to act depending on the intended direction and the location
    if check_location(current_row_index, current_column_index) == "W":
      action_index = np.random.choice([0,1,2,3], p=[0.1, 0, 0.1, 0.8])  
    if check_location(current_row_index, current_column_index) == "N":
      action_index = np.random.choice([0,1,2,3], p=[0, 0, 0, 1])
    if check_location(current_row_index, current_column_index) == "S":
      action_index = np.random.choice([0,1,2,3], p=[0.25, 0, 0.25, 0.5])
  
  # Acting as per Probability
  if actions[action_index] == 'up' and current_row_index > 0:
    new_row_index -= 1
  elif actions[action_index] == 'right' and current_column_index < env_columns - 1:
    new_column_index += 1
  elif actions[action_index] == 'down' and current_row_index < env_rows - 1:
    new_row_index += 1
  elif actions[action_index] == 'left' and current_column_index > 0:
    new_column_index -= 1
  return new_row_index, new_column_index

def environment(state, action) :
  # state holds current_row_index, current_column_index
  next_state = {}
  next_state = get_next_location_slip(state, actions.index(action))
  acquired_reward = rewards[state[0],state[1]]
  return next_state, acquired_reward

def render(state) :
  global grid
  row = state[0]
  column = state[1]
  grid[row,column] = fore.MAGENTA + grid_init[row, column][-1] 
  for row in grid:
    for column in row:
        print(column, end=" ")
    print()
  grid = np.copy(grid_init)
  #grid = np.copy(grid_init)
  print("______________")
  


In [10]:
#define training parameters
epsilon = 0.9 #the percentage of time when we should take the best action (instead of a random action)
discount_factor = 0.9 #discount factor for future rewards
learning_rate = 0.9 #the rate at which the AI agent should learn
alpha = 0

#run through 10000 training episodes
for episode in range(10000):
  #get the starting location for this episode
  row_index, column_index = get_starting_location()

  #continue taking actions (i.e., moving) until we reach a terminal state
  while not is_terminal_state(row_index, column_index):
    #choose which action to take (i.e., where to move next)
    action_index = get_intended_action(row_index, column_index, epsilon)

    #perform the chosen action, and transition to the next state (i.e., move to the next location)
    old_row_index, old_column_index = row_index, column_index #store the old row and column indexes
    if np.random.random() < alpha:
      row_index, column_index = get_next_location_slip(row_index, column_index, action_index)
    else:
      row_index, column_index = get_next_location_ideally(row_index, column_index, action_index)
    
    #receive the reward for moving to the new state, and calculate the temporal difference
    reward = rewards[row_index, column_index]
    old_q_value = q_values[old_row_index, old_column_index, action_index]
    temporal_difference = reward + (discount_factor * np.max(q_values[row_index, column_index])) - old_q_value

    #update the Q-value for the previous state and action pair
    new_q_value = old_q_value + (learning_rate * temporal_difference)
    q_values[old_row_index, old_column_index, action_index] = new_q_value

print('Training complete!')

Training complete!


In [11]:
#Define a function that will get the shortest path between any location that 
#the robot is allowed to travel and F.
def get_shortest_path(start_row_index, start_column_index):
  #return immediately if this is an invalid starting location
  if is_terminal_state(start_row_index, start_column_index) or check_location(start_row_index, start_column_index) == 'S' or check_location(start_row_index, start_column_index) == 'N':
    return []
  else: #if this is a 'legal' starting location
    current_row_index, current_column_index = start_row_index, start_column_index
    shortest_path = []
    shortest_path.append([current_row_index, current_column_index])
    #continue moving along the path until we reach the goal 
    while not is_terminal_state(current_row_index, current_column_index):
        #get the best action to take
        action_index = get_intended_action(current_row_index, current_column_index, 1.)
        state = [current_row_index, current_column_index]
        render(state)
        #move to the next location on the path, and add the new location to the list
        current_row_index, current_column_index = get_next_location_slip(current_row_index, current_column_index, action_index)
        shortest_path.append([current_row_index, current_column_index])
    return shortest_path

In [12]:
# Intended actions as per max Q-values
row_i = 0
column_i = 0
for row in q_values:
    for column in row:
        if row_i == 3 and column_i == 3:
            print("F", end="      ")
        elif column[0] == column[1] and column[0] == column[2] and column[0] == column[3]:
            print("X", end="       ")
        elif actions[column.argmax()] ==  'up':
            print(actions[column.argmax()], end="      ")
        elif actions[column.argmax()] == 'down' or actions[column.argmax()] == 'left':
            print(actions[column.argmax()], end="    ")
        else:
            print(actions[column.argmax()], end="   ")
        column_i += 1
    row_i += 1
    column_i = 0
    print()

right   right   down    X       down    left    left    
down    X       right   down    left    X       down    
right   right   right   down    left    left    left    
right   up      X       F      X       up      X       
up      up      left    X       right   up      left    
up      X       up      right   up      up      up      
up      left    up      up      up      X       up      


In [13]:
print(get_shortest_path(1,0))

[38;5;7mW [38;5;7mW [38;5;7mW [38;5;8mX [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;5mW [38;5;8mX [38;5;4mN [38;5;4mN [38;5;4mN [38;5;8mX [38;5;7mW 
[38;5;1mS [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;7mW [38;5;8mX [38;5;2mF [38;5;8mX [38;5;7mW [38;5;8mX 
[38;5;7mW [38;5;7mW [38;5;4mN [38;5;8mX [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;8mX [38;5;7mW [38;5;7mW [38;5;4mN [38;5;1mS [38;5;7mW 
[38;5;4mN [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;8mX [38;5;7mW 
______________
[38;5;7mW [38;5;7mW [38;5;7mW [38;5;8mX [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;8mX [38;5;4mN [38;5;4mN [38;5;4mN [38;5;8mX [38;5;7mW 
[38;5;5mS [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;7mW [38;5;8mX [38;5;2mF [38;5;8mX [38;5;7mW [38;5;8mX 
[38;5;7mW [38;5;7mW [38;5;4mN [38;5;8mX [38;5;7mW [38;5;7mW [38;5;7mW 
[38;5;7mW [38;5;8mX [38;5;7mW [38;5;7mW [38;