# Q-Learning Model

## Part 1

Imagine a robot navigating the maze shown below:

![maze](../references/maze.png)

It starts at point <b>E</b> (Entrance) and wants to get to point <b>G</b> (Goal). We are going to build an AI that can find the quickest way to the Goal.

In [1]:
# import required libraries
import numpy as np

In [2]:
# set randomization for reproducing results
np.random.seed(42)

Now, we are going to define the environment with the following locations and states shown below:

![states](../references/states.png)

We know that the robot can perform specific actions for each location it is in at time t. This can be represented by an adjacency matrix, where +1 indicates a reward for an action it can perform and 0 indicates a reward for an action it cannot perform.

However, to incentivize our AI to achieve its goal (going to G), we need to place a <b>higher additional reward</b> at location (G,G) in our adjacency matrix.

In [3]:
# define the possible actions
actions = np.array(range(0,12))

# define the possible states
states = {'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5, 'G': 6, 'H': 7, 'I': 8, 'J': 9, 'K': 10, 'L': 11}

# make a mapping from states to locations
locations = {state: location for location, state in states.items()}

# define the rewards matrix
#                    A,B,C,D,E,F,G,H,I,J,K,L
rewards = np.array([[0,1,0,0,0,0,0,0,0,0,0,0],
                    [1,0,1,0,0,1,0,0,0,0,0,0],
                    [0,1,0,0,0,0,1,0,0,0,0,0],
                    [0,0,0,0,0,0,0,1,0,0,0,0],
                    [0,0,0,0,0,0,0,0,1,0,0,0],
                    [0,1,0,0,0,0,0,0,0,1,0,0],
                    [0,0,1,0,0,0,0,1,0,0,0,0],
                    [0,0,0,1,0,0,1,0,0,0,0,1],
                    [0,0,0,0,1,0,0,0,0,1,0,0],
                    [0,0,0,0,0,1,0,0,1,0,1,0],
                    [0,0,0,0,0,0,0,0,0,1,0,1],
                    [0,0,0,0,0,0,0,1,0,0,1,0]])

In [4]:
# set the gamma and alpha parameters for Q-Learning
γ = 0.75
α = 0.90

In [5]:
# number of iterations
N = 1_000

# initialize the Q-values
Q = np.zeros((12,12))

In [6]:
# give the goal location a high reward
goal = 'G'
ix = states[goal]
rewards[ix, ix] = 1000

In [7]:
# implement the Q-Learning process
options = list(states.values())
for i in range(N):
    # pick random current state
    current_state = np.random.choice(options)

    # get available moves based on current state
    playable_actions = []
    for j in range(rewards.shape[1]):
        if rewards[current_state,j] > 0:
            playable_actions.append(j) # add to list of options

    # pick a move
    next_state = np.random.choice(playable_actions)

    # compute the temporal difference
    TD = rewards[current_state, next_state] + γ * Q[next_state, np.argmax(Q[next_state,])] - Q[current_state, next_state]

    # update the Q-value
    Q[current_state, next_state] = Q[current_state, next_state] + α * TD

In [8]:
# display the final Q-values
print('Q-values:\n{}'.format(Q.astype(int)))

Q-values:
[[   0 1684    0    0    0    0    0    0    0    0    0    0]
 [1259    0 2245    0    0 1259    0    0    0    0    0    0]
 [   0 1683    0    0    0    0 2992    0    0    0    0    0]
 [   0    0    0    0    0    0    0 2245    0    0    0    0]
 [   0    0    0    0    0    0    0    0  711    0    0    0]
 [   0 1678    0    0    0    0    0    0    0  947    0    0]
 [   0    0 2244    0    0    0 3988 2244    0    0    0    0]
 [   0    0    0 1684    0    0 2992    0    0    0    0 1684]
 [   0    0    0    0  534    0    0    0    0  947    0    0]
 [   0    0    0    0    0 1259    0    0  709    0 1262    0]
 [   0    0    0    0    0    0    0    0    0  947    0 1681]
 [   0    0    0    0    0    0    0 2245    0    0 1261    0]]


In [9]:
# function to return optimally computed route
def route(starting_location, ending_location):
    # create a list for the optimal route
    route = [starting_location] # add the starting point to the route
    
    # set the next location to the current location
    next_location = starting_location
    
    # keep running until we reach the end location
    while (next_location != ending_location):
        # get the value associated with the current location
        starting_state = states[starting_location]
        
        # use that value to get the best Q-value index
        next_state = np.argmax(Q[starting_state,])
        
        # index that Q-value on our dictionary of locations
        next_location = locations[next_state]
        
        # add the location to our list of steps
        route.append(next_location)
        
        # update the current state for next iteration
        starting_location = next_location
        
    return route

In [10]:
# print the results
print('Optimal route:\n\n{}'.format(route('E', goal)))

Optimal route:

['E', 'I', 'J', 'K', 'L', 'H', 'G']


## Part 2

Let's try to solve a harder problem. Imagine a warehouse maze as shown below:

![warehouse](../references/warehouse.png)

Here, black squares are shelves. The robot can run into a shelf, so the goal is to navigate the aisles and get to the green square. This time, we will not be restricting the robot's possible actions. Instead we will teach it to learn not to run into shelves.

In [11]:
# define the environment
environment_rows = 11
environment_cols = 11

# initialize the Q-values
Q = np.zeros((environment_rows, environment_cols, 4)) # [state, state, action]

In [12]:
# define actions
actions = ['up', 'down', 'left', 'right']

Now let's define the rewards. For this example, we want to get to the goal. If we made all white squares +1 and all black squares 0 or -1, the AI agent <b>could</b> maximize its reward by moving along the white squares forever before reaching its destination. Instead we want to incentivize our AI agent to move efficiently. We can do this with the rewards matrix defined below:

![rewards](../references/warehouse_rewards.png)

In [13]:
# define the rewards
rewards = np.full((environment_rows, environment_cols), -100.0)
rewards[0, 5] = 100.0 # goal location

# define aisle locations
aisles = {}
aisles[1] = [i for i in range(1, 10)]
aisles[2] = [1, 7, 9]
aisles[3] = [i for i in range(1, 8)]
aisles[3].append(9)
aisles[4] = [3, 7]
aisles[5] = [i for i in range(0, 11)]
aisles[6] = [5]
aisles[7] = [i for i in range(1, 10)]
aisles[8] = [3, 7]
aisles[9] = [i for i in range(0, 11)]

# set aisle rewards
for row in range(1, 10):
    for col in aisles[row]:
        rewards[row, col] = -1.0

In [14]:
# view rewards matrix
for row in rewards:
    print(row)

[-100. -100. -100. -100. -100.  100. -100. -100. -100. -100. -100.]
[-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.]
[-100.   -1. -100. -100. -100. -100. -100.   -1. -100.   -1. -100.]
[-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.   -1. -100.]
[-100. -100. -100.   -1. -100. -100. -100.   -1. -100. -100. -100.]
[-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]
[-100. -100. -100. -100. -100.   -1. -100. -100. -100. -100. -100.]
[-100.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1.   -1. -100.]
[-100. -100. -100.   -1. -100. -100. -100.   -1. -100. -100. -100.]
[-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]
[-100. -100. -100. -100. -100. -100. -100. -100. -100. -100. -100.]


In [15]:
# define helper functions
def is_terminal_state(row, col):
    """This function returns whether or not we are at a terminal state."""
    return not rewards[row, col] == -1.0

def get_starting_location():
    """This function initializes a random non-terminal starting location."""
    row = np.random.randint(environment_rows) # random row
    col = np.random.randint(environment_cols) # random col
    # keep choosing until a non-terminal state is identified
    # (i.e. a white aisle square)
    while is_terminal_state(row, col):
        row = np.random.randint(environment_rows)
        col = np.random.randint(environment_cols)
        
    return row, col

def get_next_action(row, col, ϵ):
    """This function uses an epsilon greedy algorithm to choose the next action."""
    if np.random.random() < ϵ:
        # choose most promising option
        return np.argmax(Q[row, col])
    else:
        # choose a random action
        return np.random.randint(4)

def get_next_location(row, col, action):
    """This function gets the next location based on the chosen action."""
    new_row, new_col = row, col
    if actions[action] == 'up' and row > 0:
        new_row -= 1
    elif actions[action] == 'down' and row < environment_rows - 1:
        new_row += 1
    elif actions[action] == 'left' and col > 0:
        new_col -= 1
    elif actions[action] == 'right' and col < environment_cols - 1:
        new_col += 1
    
    return new_row, new_col

In [16]:
# define shortest path function
def get_shortest_path(start_row, start_col):
    """This function returns the shortest path from the starting state to the goal."""
    if is_terminal_state(start_row, start_col):
        return [] # invalid starting location
    else:
        # save current state
        current_row, current_col = start_row, start_col
        
        # make best path list and add current state
        shortest_path = []
        shortest_path.append([current_row, current_col])
        
        # continue moving until the goal is reached
        while not is_terminal_state(current_row, current_col):
            # get best action to take
            action = get_next_action(current_row, current_col, 1.)
            
            # update the state
            current_row, current_col = get_next_location(current_row, current_col, action)
            
            # update path
            shortest_path.append([current_row, current_col])
        
        return shortest_path

In [17]:
# display untrained answers
print(get_shortest_path(3, 9)) # row 3, col 9
print(get_shortest_path(5, 0)) # row 5, col 0
print(get_shortest_path(9, 5)) # row 9, col 5

[[3, 9], [2, 9], [1, 9], [0, 9]]
[[5, 0], [4, 0]]
[[9, 5], [8, 5]]


In [18]:
# train AI agent using Q-Learning
# define training parameters
ϵ = 0.90 # greedy epsilon algorithm
γ = 0.90 # discount factor
α = 0.90 # learning rate

# number of iterations
N = 1_000

# simulate
for i in range(N):
    # get starting location
    row, col = get_starting_location()
    
    # continue moving until we reach a terminal state
    # (reach the goal or crash into an aisle)
    while not is_terminal_state(row, col):
        # get next action to take
        action = get_next_action(row, col, ϵ)
        
        # perform action and transition to next state
        old_row, old_col = row, col
        row, col = get_next_location(row, col, action)
        
        # receive reward for moving to new state
        reward = rewards[row, col]
        old_Q = Q[old_row, old_col, action]
        
        # compute temporal difference
        TD = reward + γ * np.max(Q[row, col]) - old_Q
        
        # update Q-value for previous state and action pair
        new_Q = old_Q + α * TD
        Q[old_row, old_col, action] = new_Q
        
print('Training complete!')

Training complete!


In [19]:
# display the final Q-values
print('Q-values:\n{}'.format(Q.astype(int)))

Q-values:
[[[   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]]

 [[   0    0    0    0]
  [ -99   -3  -99   62]
  [ -99  -99   54   70]
  [ -99  -99   62   79]
  [ -99  -99   70   89]
  [ 100 -100   79   79]
  [ -99  -99   89   70]
  [-100   62   79   62]
  [ -99  -90   70   54]
  [ -99   43   62  -99]
  [   0    0    0    0]]

 [[   0    0    0    0]
  [  54   37  -99  -90]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [   0    0    0    0]
  [  70   54  -99  -99]
  [   0    0    0    0]
  [  54   -2  -90  -99]
  [   0    0    0    0]]

 [[   0    0    0    0]
  [  48  -90  -99   -4]
  [ -90  -99   42   -5]
  [ -90   -5   -5   37]
  [ -99  -99   -4   42]
  [ -99  -99   33   48]
  [ -90  -99   42   54]
  [  62   48   48  -99]


In [20]:
# display a few shortest paths
print(get_shortest_path(3, 9)) # row 3, col 9
print(get_shortest_path(5, 0)) # row 5, col 0
print(get_shortest_path(9, 5)) # row 9, col 5

[[3, 9], [2, 9], [1, 9], [1, 8], [1, 7], [1, 6], [1, 5], [0, 5]]
[[5, 0], [5, 1], [5, 2], [5, 3], [5, 4], [5, 5], [5, 6], [5, 7], [4, 7], [3, 7], [2, 7], [1, 7], [1, 6], [1, 5], [0, 5]]
[[9, 5], [9, 4], [9, 3], [8, 3], [7, 3], [7, 4], [7, 5], [6, 5], [5, 5], [5, 6], [5, 7], [4, 7], [3, 7], [2, 7], [1, 7], [1, 6], [1, 5], [0, 5]]
