# TSFS12 Hand-in exercise 5: Learning for autonomous vehicles - Reinforcement learning and Q-learning

Initial imports

In [1]:
import matplotlib.pyplot as plt
import numpy as np
from rl_auxiliary import plot_iter, next_state, BoxOff

In [2]:
# IPython magic that activates a matplotlib backend for this session, so the
# figures created further down are shown by that backend (the recorded run of
# this notebook reported TkAgg).
%matplotlib

Using matplotlib backend: TkAgg


# Parameterize the learning algorithm

In [3]:
# --- Learning hyperparameters ---
gamma = 0.99  # discount factor applied to the next-state value in the Q-update
alpha = 0.5   # learning rate in Q-update
eps = 0.5     # epsilon-greedy parameter

# --- Rewards ---
R_goal = 0.0    # reward for reaching goal state
R_sink = -10.0  # reward for reaching 'cliff' states
R_grid = -0.1   # reward for remaining states

# --- Motion model ---
# Probability of moving in the direction specified by the action; the
# remaining probability is split evenly between the two sideways directions.
P_move_action = 1.0
P_dist = (1 - P_move_action) / 2

# --- Grid layout ---
n_rows, n_cols = 4, 5
goal = np.array([3, 4])                    # (row, col) index of the goal state
sink = np.array([[3, 1], [3, 2], [3, 3]])  # (row, col) indices of the cliff states

# Reward matrix R: R_grid everywhere, overridden at the goal and cliff cells
R = np.full(shape=(n_rows, n_cols), fill_value=R_grid)
R[tuple(goal)] = R_goal
R[tuple(sink.T)] = R_sink

# Occupancy grid with a single occupied cell at (1, 1)
occ_grid = np.zeros((n_rows, n_cols))
occ_grid[1, 1] = 1

# Collect everything in one dictionary that is handed to the helper functions
params = dict(
    gamma=gamma, R_goal=R_goal, R_sink=R_sink,
    alpha=alpha, eps=eps,
    R_grid=R_grid, P_move_action=P_move_action,
    P_dist=P_dist, n_rows=n_rows, n_cols=n_cols,
    goal=goal, sink=sink, R=R, occ_grid=occ_grid,
)

# Main learning loop

In [4]:
def select_eps_greedy(s_curr, k, Q, params):
    """Select the action to take using an epsilon-greedy strategy.

    The greedy action (arg-max of Q in the current state) is selected with
    probability 1 - eps + eps/n_actions and each of the remaining actions
    with probability eps/n_actions. The number of actions is read from the
    last dimension of Q instead of being fixed to 4.

      action = select_eps_greedy(s_curr, k, Q, params)

      Input:
          s_curr - current state, indexed as (s_curr[0], s_curr[1])
          k - current iteration number (not used by this strategy; kept so
              that the call signature stays the same)
          Q - Q matrix, indexed as Q[row, column, action]
          params - parameter dictionary; only params['eps'] is read

      Output:
          action - selected action (index into the last dimension of Q)
    """
    eps = params['eps']

    q_values = Q[s_curr[0], s_curr[1]]
    n_actions = len(q_values)

    # A single uniform draw on [0, 1) decides which action is taken
    rnd = np.random.uniform()
    max_a = np.argmax(q_values)

    # First slice of [0, 1) belongs to the greedy action
    if rnd < 1 - eps + eps/n_actions:
        return max_a

    # The rest is split into equally wide slices of eps/n_actions, one per
    # non-greedy action in increasing index order.
    other_actions = [a for a in range(n_actions) if a != max_a]
    for i, a in enumerate(other_actions[:-1]):
        if rnd < 1 - eps + (i + 2)*eps/n_actions:
            return a

    # Last slice; fall back to the greedy action if it is the only action
    return other_actions[-1] if other_actions else max_a

Initialize main learning loop

In [5]:
# Seed the global NumPy random generator so that the random Q initialization
# below and the np.random draws made during epsilon-greedy action selection
# give the same result on every Restart & Run All. Change the seed (or remove
# this line) to get a different run.
np.random.seed(0)

# Initialize value function for each state
V = np.zeros((n_rows, n_cols))

# Actions - ['left','right','up','down'] counted as 0-3

# Initialize Q with uniform random values:
# number of rows x number of columns x number of actions
Q = np.random.uniform(size=(n_rows, n_cols, 4))

# Q for the terminal states (goal and cliff states) is set to zero
Q[goal[0], goal[1]] = 0.0
for si in sink:
    Q[si[0], si[1]] = 0.0

# Initialize the policy; -1 marks a state that has not been assigned an action
Pi = np.full((n_rows, n_cols), fill_value=-1)

# Define number of iterations (episodes) for Q-learning
nbr_iters = 10000

# Initialize the sum of rewards for each episode
sum_r = np.zeros(nbr_iters)

Execute iterations until convergence.

In [6]:
converged = False
for k in range(nbr_iters):
    # Each episode starts in the last row, first column
    s_curr = [n_rows-1, 0]
    terminal_state = False

    while not terminal_state:
        # Epsilon-greedy action, then one step in the environment
        action = select_eps_greedy(s_curr, k, Q, params)
        s_next, r = next_state(s_curr, action, params)

        # Q-learning update: move the old estimate towards the TD target
        q_old = Q[s_curr[0], s_curr[1], action]
        td_target = r + gamma*np.max(Q[s_next[0], s_next[1]])
        Q[s_curr[0], s_curr[1], action] = q_old + alpha*(td_target - q_old)

        # Accumulate the reward collected during this episode
        sum_r[k] += r

        s_curr = s_next

        # The episode ends when the goal or one of the cliff states is reached
        at_goal = np.all(s_curr == goal)
        in_sink = np.any((s_curr[0] == sink[:, 0]) & (s_curr[1] == sink[:, 1]))
        if not (at_goal or in_sink):
            continue
        terminal_state = True

        # Episode finished: refresh value function and greedy policy for all
        # cells that are neither occupied nor terminal
        for row, col in np.ndindex(n_rows, n_cols):
            is_occupied = occ_grid[row, col] == 1
            is_goal = np.all([row, col] == goal)
            is_sink = np.any((row == sink[:, 0]) & (col == sink[:, 1]))
            if is_occupied or is_goal or is_sink:
                continue
            max_a = np.argmax(Q[row, col])
            V_ij = Q[row, col, max_a]
            V[row, col] = V_ij
            Pi[row, col] = max_a

Visualize the value function and policy after all iterations

In [7]:
# Pass the value function V and policy Pi computed by the learning loop,
# together with the parameter dictionary, to plot_iter (imported from
# rl_auxiliary) for visualization.
plot_iter(V, Pi, params)

Compute a moving average of the per-episode reward sums over the preceding N episodes, for smoothing

In [8]:
# Window length (number of episodes) of the moving average
N = 40
n_episodes = len(sum_r)
mean_sum_r = np.zeros(n_episodes)

# Entry k is the mean of the N episodes preceding episode k; the first N
# entries have no complete window and are left at zero.
for k in range(N, n_episodes):
    mean_sum_r[k] = sum_r[k-N:k].mean()

Visualize the evolution of the reward for each episode

In [9]:
# Reuse figure number 3, cleared, and draw on its current axes
fig = plt.figure(3, clear=True)
ax = fig.gca()

# Skip the first N entries, which never received a moving-average value
ax.plot(mean_sum_r[N:], lw=0.5)
ax.set_title('Sum of rewards for each episode (average over {})'.format(N))

# BoxOff is imported from rl_auxiliary; presumably it removes the box around
# the current axes - confirm against that module
BoxOff()