In [2]:
import numpy as np
import random
import sys
import matplotlib.pyplot as plt
sys.setrecursionlimit(100000)
from tqdm import tqdm
import time

## Helper functions

This functions should be run if experimenting with the snake pit is over. The functions are overwritten if their "snake-pit" confirguration is not required anymore. 

In [25]:
#given the current position and chosen action
#returns the landing spot - S'
def get_sprime(state,action):
  if action == 0:
    return state[0]-1,state[1]
  if action == 1:
    return state[0], state[1]+1
  if action == 2:
    return state[0]+1,state[1]
  if action == 3:
    return state[0], state[1]-1

In [26]:
#given postion and action
#returns the reeward accordingly
def reward(position, action):
  if position == (2,20) and action == 2:
    return 20
  cliff_edge = [(2,x) for x in range(1,20)] 
  if position in cliff_edge and action == 2:
    return -100
  return -1

In [27]:
def get_actions(position):
  
  if position == (0,0):
    return 1, 2
  if position == (0,20):
    return 2, 3
  if position == (3,0):
    return [0]
  
  #up, right, down - left margin
  if position in [(1,0),(2,0)]:
    return 0, 1, 2
  
  #left, down, right - upper margin
  if position in [(0,x) for x in range(1,20)]:
    return 1, 2, 3
  
  #up, left, down - right margin
  if position in [(1,20),(2,20)]:
    return 0, 2, 3
  
  return 0, 1, 2, 3

In [28]:
#displays the Q values in a clean manner
def display(q_values):
  qval = q_values.tolist()
  for act in qval:
    for line in act:
      print(line)
    print('\n')

In [29]:
#given the learned Q values
#displays the best action for each position on the grid
def display_actiongrid(q_values):
  act_grid = np.array([[None] * 21 ]* 3)
  for line in range(3):
    for col in range(21):
      cr_val = []
      pos_act = get_actions((line,col))
      for action in pos_act:
        cr_val.append(q_values[action][line][col])
      best_act = pos_act[np.argmax(np.array(cr_val))]
      act_grid[line][col] = action_sign(best_act)

  print('\n')
  for act in list(act_grid):
    print(*act)

In [30]:
#used in display path to make the actions more distinguishable 
def action_sign(action):
  if action == 0:
    return('↑')
  if action == 1:
    return('→')
  if action == 2:
    return('↓')
  if action == 3:
    return('←')

In [31]:
#given the learned Q values
#prints the best available path (taking the best available action at each step)
# commented lines are made for trying to escape loops
def display_path(q_values):
  max_steps = 50
  cr_pos = (3,0)
  prev_pos = (-1, -1)
  endstates = [(3,x) for x in range(1,21)] 
  step = 0
  while cr_pos not in endstates:
    print(cr_pos)
    action = choose_action(cr_pos, q_values, -1)
    next_pos = get_sprime(cr_pos, action)
    # while next_pos == prev_pos or next_pos in cliff:
    #   action = chose_action(cr_pos, q_values, -1)
    #   next_pos = get_sprime(cr_pos, action)
    # prev_pos = cr_pos
    print(action_sign(action))
    cr_pos = next_pos

    step+= 1
    if step >= max_steps:
      break

In [32]:
#checks if the learned policy leads to the goal (in a small number of steps)
def is_conv(q_values):
  max_steps = 35
  cr_pos = (3,0)
  goal = (3,20)
  step = 0
  while cr_pos != goal:
    action = choose_action(cr_pos, q_values, -1)
    cr_pos = get_sprime(cr_pos, action)
    step+= 1
    if step >= max_steps:
      return False
  return True

In [33]:
#implementation of e-greedy algorithm
def choose_action(current_pos, q_values, e):
  line = current_pos[0]
  col = current_pos[1]
  pos_act = list(get_actions(current_pos))
  gen = random.uniform(0, 1)

  #e=1 -> 
  #initially completely random, gen will be surely smaller 
  if gen < e:
    chosen_act = random.choice(pos_act)
    #print("Rand chosen action {} from available {} on position {}".format(chosen_act, pos_act, current_pos))
    return chosen_act

  #-> e=0.1
  #in the end, e will be smaller, thus more greedy actions allowed
  if gen >= e:
    q_pos = []
    for i in pos_act:
      q_pos.append(q_values[i][line][col])
    # print(q_pos) 
    # print(np.argmax(np.array(q_pos)))
    return pos_act[np.argmax(np.array(q_pos))]

In [34]:
#returns the quality of the best available action 
#given the current position   
#special case - max Q when final state is reached = 0 
def get_max(q_val, line, col):
  final_states = [(3,x) for x in range(1,21)]
  if (line, col) in final_states:
    return 0

  current_pos = (line, col)
  pos_act = list(get_actions(current_pos))
  q_pos = []
  for i in pos_act:
    q_pos.append(q_val[i][line][col]) 
  return max(q_pos)

In [35]:
def get_quality(q_val, state, action):
  final_states = [(3,x) for x in range(1,21)]
  if state in final_states:
    return 0
  line, col = state
  #print(action, line, col)
  return q_val[action][line][col]

In [36]:
#state is a tuple
def update_qlearn(state, action, reward, lr, gamma, q_val):
  line = state[0]
  col = state[1] 
  prime_line, prime_col = get_sprime(state,action)
  q_val[action][line][col] = q_val[action][line][col] + lr * ( reward + gamma * get_max(q_val, prime_line, prime_col) - q_val[action][line][col])
  return q_val
  
def update_sarsa(state, action, reward, state_prime, action_prime, lr, gamma, q_val):
  col = state[1]
  line = state[0]
  q_val[action][line][col] = q_val[action][line][col] + lr * ( reward + gamma * get_quality(q_val, state_prime, action_prime) - q_val[action][line][col]) 
  return q_val

In [37]:
def episode_qlearn(q_val, rep_buf, hyper_param, pos_cr):
  #print(pos_cr)

  e, lr, gamma = hyper_param
  final_states = [(3,x) for x in range(1,21)]
  action = choose_action(pos_cr, q_val, e)
  #print(action_sign(action))
  r = reward(pos_cr,action)
  q_val = update_qlearn(pos_cr, action, r, lr, gamma, q_val)

  s_prime = get_sprime(pos_cr, action)
  rep_buf.append((pos_cr, action, r, s_prime))
  if s_prime in final_states:
    return q_val, rep_buf
  return episode_qlearn(q_val, rep_buf, hyper_param, s_prime)

In [38]:
def episode_sarsa(q_val, hyper_param, pos_cr, action):
  #print(pos_cr)
  e, lr, gamma = hyper_param
  final_states = [(3,x) for x in range(1,21)]
  #print(action_sign(action))

  r = reward(pos_cr,action)
  s_prime = get_sprime(pos_cr, action)
  action_prime = choose_action(s_prime, q_val, e)

  q_val = update_sarsa(pos_cr, action, r, s_prime, action_prime, lr, gamma, q_val)
  
  if s_prime in final_states:
    return q_val
  return episode_sarsa(q_val, hyper_param, s_prime, action_prime)

## Buffer functions

In [67]:
def get_instance(rep_buf):
  return random.choice(rep_buf)

In [68]:
def update_qlearn_buf(q_val, sars, hyper_param):
  e, lr, gamma = hyper_param
  state, action, reward, s_prime = sars
  line = state[0]
  col = state[1]
  prime_line = s_prime[0]
  prime_col = s_prime[1]
  q_val[action][line][col] = q_val[action][line][col] + lr * ( reward + gamma * get_max(q_val, prime_line, prime_col) - q_val[action][line][col])
  return q_val

In [69]:
#takes the replay buffer
def train_buffer(rep_buf, hyper_param, max_steps):
  q_val = np.zeros((4, 4, 21))
  for i in range(max_steps):
    sars = get_instance(rep_buf)
    q_val = update_qlearn_buf(q_val, sars, hyper_param)
    if is_conv(q_val):
      #print("The number of required updates was {}".format(i))
      break
  
  return i
  #display_path(q_values)
  #display_actiongrid(q_values)

## Snake pit

To implement the snake pit, some of the previous function are overwrriten.

To experiment with it, these cells should be executed. If the standard case is prefered, these cells should be omitted.

In [17]:
#overwriting
def reward(position, action):
  if position == (2,20) and action == 2:
    return 20
  cliff_edge = [(2,x) for x in range(1,20)] 
  if position in cliff_edge and action == 2:
    return -100
  
  #snake
  if position == (0,9) and action == 1:
    return -100
  if position == (0,11) and action == 3:
    return -100
  if position == (1,10) and action == 0:
    return -100
  
  return -1

In [18]:
#overwriting
def display_path(q_values):
  max_steps = 50
  cr_pos = (3,0)
  prev_pos = (-1, -1)
  endstates = [(3,x) for x in range(1,21)] 
  endstates.append((0,10))
  step = 0
  while cr_pos not in endstates:
    print(cr_pos)
    action = choose_action(cr_pos, q_values, -1)
    next_pos = get_sprime(cr_pos, action)
    # while next_pos == prev_pos or next_pos in cliff:
    #   action = chose_action(cr_pos, q_values, -1)
    #   next_pos = get_sprime(cr_pos, action)
    # prev_pos = cr_pos
    print(action_sign(action))
    cr_pos = next_pos

    step+= 1
    if step >= max_steps:
      break

In [19]:
def get_max(q_val, line, col):
  final_states = [(3,x) for x in range(1,21)]
  final_states.append((0,10))
  if (line, col) in final_states:
    return 0

  current_pos = (line, col)
  pos_act = list(get_actions(current_pos))
  q_pos = []
  for i in pos_act:
    q_pos.append(q_val[i][line][col]) 
  return max(q_pos)

In [20]:
def get_quality(q_val, state, action):
  final_states = [(3,x) for x in range(1,21)]
  final_states.append((0,10))
  if state in final_states:
    return 0
  line, col = state
  return q_val[action][line][col]

In [21]:
def episode_qlearn(q_val, rep_buf, hyper_param, pos_cr):
  #print(pos_cr)

  e, lr, gamma = hyper_param
  final_states = [(3,x) for x in range(1,21)]
  final_states.append((0,10))
  action = choose_action(pos_cr, q_val, e)
  #print(action_sign(action))
  r = reward(pos_cr,action)
  q_val = update_qlearn(pos_cr, action, r, lr, gamma, q_val)

  s_prime = get_sprime(pos_cr, action)
  rep_buf.append((pos_cr, action, r, s_prime))
  if s_prime in final_states:
    return q_val, rep_buf
  return episode_qlearn(q_val, rep_buf, hyper_param, s_prime)

In [22]:
def episode_sarsa(q_val, hyper_param, pos_cr, action):
  #print(pos_cr)
  e, lr, gamma = hyper_param
  final_states = [(3,x) for x in range(1,21)]
  final_states.append((0,10))
  #print(action_sign(action))

  r = reward(pos_cr,action)
  s_prime = get_sprime(pos_cr, action)
  action_prime = choose_action(s_prime, q_val, e)

  q_val = update_sarsa(pos_cr, action, r, s_prime, action_prime, lr, gamma, q_val)
  
  if s_prime in final_states:
    return q_val
  return episode_sarsa(q_val, hyper_param, s_prime, action_prime)

## Running experiments

SARSA

In [40]:
#init
q_values = np.zeros((4, 4, 21))
start = (3, 0)

#hyperparameters
learning_rate = 0.1 #starting value
gamma = 0.9
epsilon = 0.001 #starting values
episodes = 400

#schedule hyperparameters
e_thresh = 0.2 #lower threshold for epsilon
lr_thresh = 0.01  #lower threshold learning rate
decay = 0.0002

hyper_parameters = epsilon, learning_rate, gamma
action = choose_action(start, q_values, epsilon)

st = time.process_time()
for i in tqdm(range(episodes)):

  #schedules for lr and e
  # if epsilon > e_thresh:
  #   epsilon = epsilon - decay
  # if learning_rate > lr_thresh:
  #   learning_rate = 1/(i+1)
  hyper_parameters = epsilon, learning_rate, gamma  
  q_values = episode_sarsa(q_values, hyper_parameters, start, action)

  #early stop
  # if is_conv(q_values):
  #   et = time.process_time()
  #   print(i)
  #   break

#print("SARSA converged afeter {} seconds.".format(et - st))
display_actiongrid(q_values)
# display(q_values)
#display_path(q_values)

100%|██████████| 400/400 [00:00<00:00, 577.05it/s]



→ → → → → ← ↓ ↓ ↓ → → ↓ → → ↓ → ← ↓ ← ↓ ↓
↑ ↑ → ← ↓ → ↓ → ↓ → → ↓ ↓ → ↓ ↓ ↓ ↓ → ↓ ↓
→ → → → → → → → → → → → → → → → → → → → ↓





Q-Learning

In [161]:
#init
q_values = np.zeros((4, 4, 21))
start = (3, 0)
replay_buffer = []

#hyperparameters
learning_rate = 0.1 #starting value
gamma = 0.95
epsilon = 0.2 #starting values
episodes = 2000

hyper_parameters = epsilon, learning_rate, gamma
st = time.process_time()
for i in range(episodes):
  q_values, replay_buffer = episode_qlearn(q_values, replay_buffer, hyper_parameters, start)
  
  #early stop
  # if is_conv(q_values):
  #   et = time.process_time()
  #   print(i)
  #   break

print("QLearning converged afeter {} seconds.".format(et - st))
display_actiongrid(q_values)
# display(q_values)
#display_path(q_values)
#print(len(replay_buffer))

QLearning converged afeter -23.84430531199996 seconds.


↓ → ↓ ↓ ↓ ↓ ↓ ↓ ↓ → ↓ → ↓ → ↓ ↓ ↓ ↓ ↓ ↓ ↓
↓ ↓ ↓ ↓ → ↓ ↓ → → → → → ↓ → ↓ → ↓ ↓ ↓ ↓ ↓
→ → → → → → → → → → → → → → → → → → → → ↓


Buffer experiment

In [None]:
#the experiment is repeated 10 times to be more statistically relevant and reduce the effects of randomness
reps = 20
updates_allowed = 10000
steps = []
durations = []
for i in range(reps):
  
  st = time.process_time()
  steps.append(train_buffer(replay_buffer, hyper_parameters, updates_allowed))
  et = time.process_time()
  duration = et - st
  durations.append(duration)

avg_steps = sum(steps)/len(steps)
avg_duration = sum(durations)/len(durations)
print("The average number of updates required is {}.".format(avg_steps))
print("The average duration of an experiment is {} seconds.".format(avg_duration))