In [1]:
#import libraries
import numpy as np
from matplotlib import pyplot as plt
import csv
import pandas as pd

In [2]:
# Grid size of the environment: 4 rows by 6 columns.
environment_rows, environment_columns = 4, 6

# Q-table holding one value per (row, column, action) triple, all starting at 0.
# The last axis has length 4: one slot for each of the four actions.
q_table_shape = (environment_rows, environment_columns, 4)
q_values = np.zeros(q_table_shape)

In [3]:
# Action names, indexed by their numeric action code.
actions = [
    'up',     # 0
    'right',  # 1
    'down',   # 2
    'left',   # 3
]

In [4]:
# Reward grid: every cell is worth 0 except the ones set explicitly below.
rewards = np.zeros((environment_rows, environment_columns))
rewards[1, 5] = 1.  # the single positive-reward cell

rewards[3, 4:6] = -1  # trap positions: cells (3, 4) and (3, 5)

# Show the grid one row per line.
for row in rewards:
    print(row)

[0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 1.]
[0. 0. 0. 0. 0. 0.]
[ 0.  0.  0.  0. -1. -1.]


In [5]:
#define a function that determines if the specified location is a terminal state
def is_terminal_state(current_row_index, current_column_index):
  """Return True if the given cell is terminal, False otherwise.

  A cell is terminal exactly when its entry in the module-level ``rewards``
  grid is non-zero; cells whose reward is 0 are ordinary, non-terminal cells.
  """
  cell_reward = rewards[current_row_index, current_column_index]
  return not (cell_reward == 0)

In [6]:
#define a function that will choose one of the two fixed starting locations at random
def get_starting_location():
  """Pick a start cell for an episode.

  Only the row is drawn at random (0 or 1); the column follows from the row.

  Returns:
    A ``(row, column)`` tuple that is either ``(0, 0)`` or ``(1, 2)``.
  """
  current_row_index = np.random.randint(2)
  #row 0 starts in column 0, row 1 starts in column 2
  current_column_index = 0 if current_row_index == 0 else 2
  return current_row_index, current_column_index

In [7]:
#define an epsilon greedy algorithm that will choose which action to take next (i.e., where to move next)
def get_next_action(current_row_index, current_column_index, epsilon):
  """Choose the action index to take from the given cell.

  NOTE: in this function ``epsilon`` is the probability of taking the greedy
  action (the argmax of the cell's Q-values), not the probability of
  exploring. Passing ``1.`` therefore always returns the greedy action.

  Args:
    current_row_index: row of the current cell.
    current_column_index: column of the current cell.
    epsilon: threshold compared against a uniform draw from np.random.random().

  Returns:
    An action index: the argmax over ``q_values`` for the cell when the draw
    is below ``epsilon``, otherwise the result of ``np.random.randint(4)``.
  """
  take_greedy_action = np.random.random() < epsilon
  if not take_greedy_action:
    #explore: pick any of the four actions at random
    return np.random.randint(4)
  #exploit: pick the action with the highest Q-value for this cell
  return np.argmax(q_values[current_row_index, current_column_index])

In [8]:
#define a function that will get the next location based on the chosen action
def get_next_location(current_row_index, current_column_index, action_index):
  """Return the cell reached by taking an action from the given cell.

  A move that would leave the grid is ignored, so the agent stays where it is.

  Args:
    current_row_index: row of the current cell.
    current_column_index: column of the current cell.
    action_index: index into the module-level ``actions`` list.

  Returns:
    A ``(row, column)`` tuple for the resulting cell.
  """
  move = actions[action_index]
  last_row = environment_rows - 1
  last_column = environment_columns - 1

  new_row_index, new_column_index = current_row_index, current_column_index
  if move == 'up':
    if current_row_index > 0:
      new_row_index -= 1
  elif move == 'right':
    if current_column_index < last_column:
      new_column_index += 1
  elif move == 'down':
    if current_row_index < last_row:
      new_row_index += 1
  elif move == 'left':
    if current_column_index > 0:
      new_column_index -= 1
  return new_row_index, new_column_index

In [9]:
def get_shortest_path(current_row_index, current_column_index, min_steps=20, max_steps=None):
    """Follow the greedy policy from a start cell and record the route taken.

    Despite the name, the walk does not stop at the first terminal cell. It
    keeps moving until at least ``min_steps`` moves have been made AND the
    agent is standing on a terminal cell, so the route may pass through
    terminal cells several times.

    Args:
      current_row_index: row of the start cell.
      current_column_index: column of the start cell.
      min_steps: minimum number of moves before the walk is allowed to stop
        (20 matches the previously hard-coded value).
      max_steps: hard cap on the number of moves. Defaults to ``min_steps``
        plus the number of cells in the grid. Without a cap the loop never
        ends when the greedy walk cannot reach a terminal cell (for example
        with an untrained Q-table).

    Returns:
      A tuple ``(shortest_path, steps, reward)`` where ``shortest_path`` is the
      list of visited ``[row, column]`` cells including the start, ``steps`` is
      the number of moves made, and ``reward`` starts at 1 and is incremented
      each time a move begins on cell (1, 5).
    """
    if max_steps is None:
        # get_next_action(..., 1.) always takes its argmax branch, because
        # np.random.random() is always below 1. The walk is therefore
        # deterministic: if no terminal cell appears within one grid's worth of
        # moves after min_steps, the walk is cycling and never will reach one.
        # This default cap consequently never shortens a walk that would have
        # ended on its own.
        max_steps = min_steps + environment_rows * environment_columns

    shortest_path = [[current_row_index, current_column_index]]
    steps = 0
    reward = 1
    while ((not is_terminal_state(current_row_index, current_column_index) or steps < min_steps)
           and steps < max_steps):
        # Count a move that starts on the positive-reward cell.
        if current_row_index == 1 and current_column_index == 5:
            reward += 1
        action_index = get_next_action(current_row_index, current_column_index, 1.)
        current_row_index, current_column_index = get_next_location(
            current_row_index, current_column_index, action_index)
        shortest_path.append([current_row_index, current_column_index])
        steps += 1
    return shortest_path, steps, reward

In [11]:
#hyperparameters
#NOTE: get_next_action takes the greedy action when its random draw is below
#epsilon, so 0.9 means a random action is tried on roughly 10% of the steps
epsilon = 0.9
discount_factor = 0.9 #weight given to future rewards
learning_rate = 0.9 #step size of each Q-value update
reward_together = [] #reward received on every training step, in order

#train for 1000 episodes; each episode runs until a terminal cell is reached
for episode in range(1000):
  row_index, column_index = get_starting_location()
  while not is_terminal_state(row_index, column_index):
    #remember where this move starts, pick an action there, then take it
    old_row_index, old_column_index = row_index, column_index
    action_index = get_next_action(old_row_index, old_column_index, epsilon)
    row_index, column_index = get_next_location(old_row_index, old_column_index, action_index)

    #reward for entering the new cell
    reward = rewards[row_index, column_index]
    reward_together.append(reward)

    #temporal difference between the bootstrapped target and the current estimate
    old_q_value = q_values[old_row_index, old_column_index, action_index]
    best_next_q_value = np.max(q_values[row_index, column_index])
    temporal_difference = reward + (discount_factor * best_next_q_value) - old_q_value

    #move the Q-value of the previous state/action pair towards the target
    new_q_value = old_q_value + (learning_rate * temporal_difference)
    q_values[old_row_index, old_column_index, action_index] = new_q_value

#write the per-step rewards to disk as (step index, reward) rows
with open('reward.csv', 'w', newline='') as f:
  writer = csv.writer(f)
  header = ['x','y']
  writer.writerow(header)
  for i, step_reward in enumerate(reward_together):
    writer.writerow([i, step_reward])

#to plot the saved rewards afterwards:
#df  = pd.read_csv("reward.csv")
#df.plot(kind='scatter',x='x',y='y') # scatter plot

print('Q Learning complete!')

Q Learning complete!


In [16]:
#draw a start cell for the evaluation run and show it
start_location = get_starting_location()
test_r, test_c = start_location
print("Starting Point: ", test_r, test_c)

Starting Point:  1 2


In [17]:
#roll out the learned policy from the start cell (test_r, test_c)
result = get_shortest_path(test_r, test_c)
route, total_steps, max_rewards = result
print("Total Steps: ", total_steps)
print("Max Rewards: ", max_rewards)
print("Route: ", route)

Total Steps:  21
Max Rewards:  9
Route:  [[1, 2], [0, 2], [0, 3], [0, 4], [0, 5], [1, 5], [0, 5], [1, 5], [0, 5], [1, 5], [0, 5], [1, 5], [0, 5], [1, 5], [0, 5], [1, 5], [0, 5], [1, 5], [0, 5], [1, 5], [0, 5], [1, 5]]
