https://colab.research.google.com/drive/1E2RViy7xmor0mhqskZV14_NUj2jMpJz3#scrollTo=DnCfO5tVG0LJ

In [1]:
import numpy as np
np.random.seed(42)
import copy

<b>Environment Grid, Grid of 11 rows X 11 columns</b>

In [2]:
grid_rows = 11
grid_cols = 11

<b>Q-value for each state/action pair Q(s,a)
2D array give the Q value, 3d dimension is the Q value for the action
there are 4 actions: Up, Down, Right, Left
So there are 4 layers
</b>

In [3]:
q_values = np.zeros((grid_rows, grid_cols, 4))

<b>Actions: 0 - Up, 1 - Right, 2 - Down, 3 - Left</b>

In [4]:
actions = ['up', 'right', 'down', 'left']

<b>Reward table : define the rewards table
100 - max reward - goal
-100 - max punishment - blocked cells
-1 - allowed cells for travelling
Purpose of Q learning algorithm is to maximise the rewards. 
If we keep positive rewards then bot will keep moving unnecessarly to collect the rewards.
-ve rewards keep control on movements</b>

<b>TODO: Change the grid rewards</b>

In [5]:
rewards = np.full((grid_rows, grid_cols), -100.)
rewards[0, 5] = 100. #set the reward for the packaging area (i.e., the goal) to 100

#define aisle locations (i.e., white squares) for rows 1 through 9
aisles = {} #store locations in a dictionary
aisles[1] = [i for i in range(1, 10)]
aisles[2] = [1, 7, 9]
aisles[3] = [i for i in range(1, 8)]
aisles[3].append(9)
aisles[4] = [3, 7]
aisles[5] = [i for i in range(11)]
aisles[6] = [5]
aisles[7] = [i for i in range(1, 10)]
aisles[8] = [3, 7]
aisles[8] = [i for i in range(11)]

aisles[9] = [i for i in range(11)]

aisles[10] = [i for i in range(11)]

#set the rewards for all aisle locations (i.e., white squares)
for row_index in range(1, 11):
    for column_index in aisles[row_index]:
        rewards[row_index, column_index] = -1.

for row in rewards:
    print(row)

[-100. -100. -100. -100. -100.  100. -100. -100. -100. -100. -100.]
[-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.]
[-100.   -1. -100. -100. -100. -100. -100.   -1. -100.   -1. -100.]
[-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.   -1. -100.]
[-100. -100. -100.   -1. -100. -100. -100.   -1. -100. -100. -100.]
[-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]
[-100. -100. -100. -100. -100.   -1. -100. -100. -100. -100. -100.]
[-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.]
[-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]
[-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]
[-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]


<b><pre>
TRAINING
1. To begin, choose random, non-terminal state
2. Choose action, move7- up/right/down/left from current state
3. Choose action using epsilon greedy method - balanced approach of explore and exploit
4. Perform the action and move to next state
5. Receive rewards and compute temporal difference
6. Update Q-value for previos state/action pair
7. if new state is terminal state, go to step 1, else go to step 2

Run the process for 1000 episodes
</pre></b>

In [6]:
def is_terminal_state(row_index, col_index):
    if rewards[row_index, col_index] == -1. :
        return False
    return True

<b>Choose start location</b>

In [7]:
def get_starting_location():
    row_indx = np.random.randint(grid_rows)
    col_indx = np.random.randint(grid_cols)
    
    #continue choosing random row and column indexes until a non-terminal state is identified
    #(i.e., until the chosen state is a 'white square').
    while is_terminal_state(row_indx, col_indx):
        row_indx = np.random.randint(grid_rows)
        col_indx = np.random.randint(grid_cols)

    return row_indx, col_indx

<b>next_action - epsilon greedy algorithm</b>

In [8]:
def get_next_action(row_indx, col_indx, epsilon):
    if np.random.random() < epsilon:
        return np.argmax(q_values[row_indx, col_indx])
    return np.random.randint(4)

In [9]:
def get_next_location(row_indx, col_indx, action_indx):
    new_row_indx = row_indx
    new_col_indx = col_indx
    
    if actions[action_indx] == 'up' and row_indx > 0:
        new_row_indx -= 1
    elif actions[action_indx] == 'right' and col_indx < grid_cols - 1:
        new_col_indx += 1
    elif actions[action_indx] == 'down' and row_indx < grid_rows - 1:
        new_row_indx += 1
    elif actions[action_indx] == 'left' and col_indx > 0:
        new_col_indx -= 1
        
    return new_row_indx, new_col_indx

<b>get_shortest_path - using q table, find shortest path</b>

In [10]:
def get_shortest_path(start_row_indx, start_col_indx):
    
    # terminal starting position
    if is_terminal_state(start_row_indx, start_col_indx):
        return []
    else :
        #non-terminal starting position
        curr_row_indx, curr_col_indx = start_row_indx, start_col_indx
        shortest_path = []
        shortest_path.append([curr_row_indx, curr_col_indx])
        
        while not is_terminal_state(curr_row_indx, curr_col_indx):
            action_indx = get_next_action(curr_row_indx, curr_col_indx, 1.)
            
            curr_row_indx, curr_col_indx = get_next_location(curr_row_indx, curr_col_indx, action_indx)
            shortest_path.append([curr_row_indx, curr_col_indx])
        return shortest_path

<b>Train the Agent</b>

In [11]:
epsilon = 0.9
discount_factor = 0.9 #discount for future rewards
learning_rate = 0.9

episods_count = 10000

In [12]:
for episode in range(episods_count):
    row_indx, col_indx = get_starting_location()
    
    while not is_terminal_state(row_indx, col_indx):
        action_indx = get_next_action(row_indx, col_indx, epsilon)
        
        old_row_indx, old_col_indx = row_indx, col_indx
        row_indx, col_indx = get_next_location(row_indx, col_indx, action_indx)
        
        #rewards
        reward = rewards[row_indx, col_indx]
        old_q_value = q_values[row_indx, col_indx, action_indx]
        temporal_difference = reward + (discount_factor + np.max(q_values[row_indx, col_indx]) - old_q_value)
        
        #update q table
        new_q_value = old_q_value + (learning_rate * temporal_difference)
        q_values[old_row_indx, old_col_indx, action_indx] = new_q_value
        
print("Training Complete...")        

Training Complete...


In [13]:
def print_path(path):
    new_path = copy.copy(rewards)
    
    for i in range(len(path)):
        new_path[path[i][0]][path[i][1]] = '9'
    print(new_path)

In [14]:
path = get_shortest_path(3, 9)
print_path(path)

[[-100. -100. -100. -100. -100.    9. -100. -100. -100. -100. -100.]
 [-100.   -1.   -1.   -1.   -1.    9.    9.    9.    9.    9. -100.]
 [-100.   -1. -100. -100. -100. -100. -100.   -1. -100.    9. -100.]
 [-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.    9. -100.]
 [-100. -100. -100.   -1. -100. -100. -100.   -1. -100. -100. -100.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]
 [-100. -100. -100. -100. -100.   -1. -100. -100. -100. -100. -100.]
 [-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]]


In [15]:
path = get_shortest_path(9, 6)
print_path(path)

[[-100. -100. -100. -100. -100.    9. -100. -100. -100. -100. -100.]
 [-100.   -1.   -1.   -1.   -1.    9.    9.    9.   -1.   -1. -100.]
 [-100.   -1. -100. -100. -100. -100. -100.    9. -100.   -1. -100.]
 [-100.   -1.   -1.   -1.   -1.   -1.   -1.    9. -100.   -1. -100.]
 [-100. -100. -100.   -1. -100. -100. -100.    9. -100. -100. -100.]
 [  -1.   -1.   -1.   -1.   -1.    9.    9.    9.   -1.   -1.   -1.]
 [-100. -100. -100. -100. -100.    9. -100. -100. -100. -100. -100.]
 [-100.   -1.   -1.   -1.   -1.    9.   -1.   -1.   -1.   -1. -100.]
 [  -1.   -1.   -1.   -1.   -1.    9.   -1.   -1.   -1.   -1.   -1.]
 [  -1.   -1.   -1.   -1.   -1.    9.    9.   -1.   -1.   -1.   -1.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]]


In [16]:
path = get_shortest_path(3, 1)
print_path(path)

[[-100. -100. -100. -100. -100.    9. -100. -100. -100. -100. -100.]
 [-100.    9.    9.    9.    9.    9.   -1.   -1.   -1.   -1. -100.]
 [-100.    9. -100. -100. -100. -100. -100.   -1. -100.   -1. -100.]
 [-100.    9.   -1.   -1.   -1.   -1.   -1.   -1. -100.   -1. -100.]
 [-100. -100. -100.   -1. -100. -100. -100.   -1. -100. -100. -100.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]
 [-100. -100. -100. -100. -100.   -1. -100. -100. -100. -100. -100.]
 [-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]]


<b>Test: Get shortest path</b>

In [17]:
print(get_shortest_path(3, 9))
print(get_shortest_path(5, 0))

[[3, 9], [2, 9], [1, 9], [1, 8], [1, 7], [1, 6], [1, 5], [0, 5]]
[[5, 0], [5, 1], [5, 2], [5, 3], [5, 4], [5, 5], [5, 6], [5, 7], [4, 7], [3, 7], [2, 7], [1, 7], [1, 6], [1, 5], [0, 5]]


<b>Reverse path</b>

In [18]:
path = get_shortest_path(5, 2)
path.reverse()
print(path)
print(rewards)

[[0, 5], [1, 5], [1, 6], [1, 7], [2, 7], [3, 7], [4, 7], [5, 7], [5, 6], [5, 5], [5, 4], [5, 3], [5, 2]]
[[-100. -100. -100. -100. -100.  100. -100. -100. -100. -100. -100.]
 [-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.]
 [-100.   -1. -100. -100. -100. -100. -100.   -1. -100.   -1. -100.]
 [-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.   -1. -100.]
 [-100. -100. -100.   -1. -100. -100. -100.   -1. -100. -100. -100.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]
 [-100. -100. -100. -100. -100.   -1. -100. -100. -100. -100. -100.]
 [-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]
 [  -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.]]
