@author: Gediyon M. Girma

Cart-Pole control using TD(0) methods:

a. on-policy SARSA

b. off-policy Q learning

c. off-policy Expected SARSA with an epsilon-greedy policy

In [1]:
import gym
import time
import itertools
import random
import numpy as np
# Single CartPole-v0 environment instance; the training and test cells below
# all step and reset this same `env` object.
env = gym.make('CartPole-v0')

  logger.warn(
  deprecation(
  deprecation(


In [2]:
# discretization of the observation space

bins = (20, 20, 20, 20)  # number of bins per observation component

def discretize_observation(observation):
  """
  Map a continuous observation onto a tuple of bin indices.

  Each of the first four components is bucketed into ``bins[i]`` equal-width
  intervals spanning ``[low[i], high[i]]``. Values outside that span are
  clamped to the first/last bin, so every returned index lies in
  ``0 .. bins[i] - 1`` and is always a valid key of the tabular
  action-value / policy dictionaries built from ``bins``.

  Args:
    observation: Sequence of at least four numbers.

  Returns:
    A 4-tuple of ints, one bin index per component.
  """

  low = [-4.8, -5, -0.418, -5]
  high = [4.8, 5, 0.418, 5]

  indices = []
  for value, lo, hi, n_bins in zip(observation, low, high, bins):
    edges = np.linspace(lo, hi, n_bins + 1)
    raw_index = np.digitize(value, edges) - 1
    # np.digitize yields -1 below `lo` and n_bins at/above `hi`; clamp both
    # so that out-of-range readings fall into the outermost bins.
    indices.append(int(np.clip(raw_index, 0, n_bins - 1)))

  return tuple(indices)

  and should_run_async(code)


In [7]:
# On-policy SARSA (TD(0) control) with a tabular epsilon-greedy policy.

alpha = 0.1    # step size of the TD update
gamma = 0.99   # discount factor
epsilon = 0.1  # exploration rate of the epsilon-greedy policy

performance = {}  # final episode counter -> average test-episode length

#########  initialization ###########

n_actions = env.action_space.n
actions = np.arange(n_actions)  # hoisted: reused for every action draw

# formulate the state space with every combination of the discretized elements of the states
states = itertools.product(np.arange(bins[0]),
                           np.arange(bins[1]),
                           np.arange(bins[2]),
                           np.arange(bins[3]))

q = {}
policy = {}

for state in states:
  q[state] = np.zeros(n_actions)  # initialize the action-value function

  # initialize the epsilon-greedy policy around an arbitrary (random) action
  policy[state] = np.full(n_actions, epsilon / n_actions)
  policy[state][np.random.randint(n_actions)] += 1 - epsilon

########## iteration  #############

episodes = 1e5
start_timer = time.time()
episode = 1
while episode < episodes:

  # reset the environment and pick the first action from the current policy
  obs = env.reset()
  state = discretize_observation(obs)
  action = np.random.choice(actions, p=policy[state])

  # generate one episode while following the epsilon-greedy policy
  for t in range(1, 5000):
    obs, reward, done, info = env.step(action)
    next_state = discretize_observation(obs)

    # SARSA is on-policy: the next action must be sampled from the policy
    # at the *next* state, because it is both the action used in the TD
    # target and the action actually executed on the following step.
    next_action = np.random.choice(actions, p=policy[next_state])

    # NOTE(review): the target bootstraps from q[next_state] even when `done`
    # is True; confirm whether terminal transitions should be masked.
    q[state][action] += alpha * (reward + gamma * q[next_state][next_action] - q[state][action])

    # make the policy at `state` epsilon-greedy w.r.t. the updated action values
    best_action = np.argmax(q[state])
    policy[state] = np.full(n_actions, epsilon / n_actions)
    policy[state][best_action] += 1 - epsilon

    state = next_state
    action = next_action

    if done:
      break

  episode += 1
  if episode % 10000 == 0:
    # periodic progress report with wall-clock time since training started
    end_timer = time.time()
    timer = end_timer - start_timer
    elapsed_time_struct = time.gmtime(timer)
    formatted_time = time.strftime("%H:%M:%S", elapsed_time_struct)
    print("Episode: ",episode, " time: ", formatted_time)

  if episode == episodes:
    print("done!")

########## test ##############

steps = []  # length of each test episode that reached `done`

for j in range(100):
  obs = env.reset()
  state = discretize_observation(obs)
  for i in range(1, 50000):
    # act greedily: the most probable action of the learned policy
    action = np.argmax(policy[state])
    obs, reward, done, info = env.step(action)
    state = discretize_observation(obs)

    if done:
      steps.append(i)
      break
print(steps)
avg_step = np.mean(steps)
performance[episode] = avg_step
print("Average steps til termination: ",avg_step)



env.close()

Episode:  10000  time:  00:02:02
Episode:  20000  time:  00:05:46
Episode:  30000  time:  00:10:43
Episode:  40000  time:  00:17:08
Episode:  50000  time:  00:23:53
Episode:  60000  time:  00:30:54
Episode:  70000  time:  00:37:55
Episode:  80000  time:  00:45:05
Episode:  90000  time:  00:52:18
Episode:  100000  time:  00:59:31
done!
[200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 185, 193, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 183, 186, 200, 200, 200, 200, 200, 176, 200, 200, 200, 200, 200, 200, 200, 183, 200, 200, 174, 200, 200, 200, 193, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 192, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 159, 200, 180, 200, 200, 183, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 187, 200, 200]
Average steps to solution:  197.74


Off-policy Q-learning

In [4]:
########  initialization  #########

# Hyper-parameters: step size, discount factor, exploration rate.
alpha, gamma, epsilon = 0.1, 0.99, 0.1

n_actions = env.action_space.n
actions = np.arange(n_actions)

# Every combination of per-dimension bin indices forms one discrete state.
states = itertools.product(*(np.arange(n_bins) for n_bins in bins))

q = {}
b = {}  # behavioural policy

for state in states:
  # action values start at zero
  q[state] = np.zeros(n_actions)

  # behaviour policy starts epsilon-greedy around a randomly chosen action
  start_probs = np.full(n_actions, epsilon / n_actions)
  start_probs[np.random.randint(n_actions)] += 1 - epsilon
  b[state] = start_probs

######## iteration ############

episodes = 1e5
start_timer = time.time()
episode = 1
for _run in range(int(episodes) - 1):

  # start a fresh episode from the discretized initial observation
  state = discretize_observation(env.reset())

  for t in range(1, 1000):
    # act with the behaviour policy
    action = np.random.choice(actions, p=b[state])
    obs, reward, done, info = env.step(action)
    next_state = discretize_observation(obs)

    # Q-learning: bootstrap from the greedy value of the next state
    td_target = reward + gamma * np.max(q[next_state])
    q[state][action] += alpha * (td_target - q[state][action])

    # keep the behaviour policy epsilon-greedy w.r.t. the updated values:
    # every action gets epsilon/n, the greedy one an extra 1 - epsilon
    best_action = np.argmax(q[state])
    refreshed = np.full(n_actions, epsilon / n_actions)
    refreshed[best_action] += 1 - epsilon
    b[state] = refreshed

    state = next_state

    if done:
      break

  episode += 1
  if episode % 10000 == 0:
    # progress report with elapsed wall-clock time
    formatted_time = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_timer))
    print("Episode: ",episode, " time: ", formatted_time)

  if episode == episodes:
    print("done!")

##########   test  ###########

steps_to_solution = []

for j in range(100):
  state = discretize_observation(env.reset())
  for i in range(1, 50000):
    # act greedily with respect to the learned action values
    obs, reward, done, info = env.step(np.argmax(q[state]))
    state = discretize_observation(obs)

    if done:
      steps_to_solution.append(i)
      break
print(steps_to_solution)
avg_step = np.mean(steps_to_solution)
print("Average steps till termination: ",avg_step)

env.close()

Episode:  10000  time:  00:06:30
Episode:  20000  time:  00:14:19
Episode:  30000  time:  00:22:14
Episode:  40000  time:  00:30:20
Episode:  50000  time:  00:38:34
Episode:  60000  time:  00:46:51
Episode:  70000  time:  00:54:59
Episode:  80000  time:  01:03:11
Episode:  90000  time:  01:11:24
Episode:  100000  time:  01:19:38
done!
[200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200]
Average steps to solution:  200.0


Off-policy Expected SARSA with an epsilon-greedy policy

In [8]:
#######  initialization   ########

# Off-policy Expected SARSA: uniform-random behaviour policy,
# epsilon-greedy target policy.
alpha, gamma, epsilon = 0.1, 0.99, 0.1

n_actions = env.action_space.n
actions = np.arange(n_actions)

# Every combination of per-dimension bin indices forms one discrete state.
states = itertools.product(*(np.arange(n_bins) for n_bins in bins))

q={}
policy = {}
b = {}  # behavioural policy

for state in states:
  # action values start at zero
  q[state] = np.zeros(n_actions)

  # target policy starts epsilon-greedy around a randomly chosen action
  start_probs = np.full(n_actions, epsilon / n_actions)
  start_probs[np.random.randint(n_actions)] += 1 - epsilon
  policy[state] = start_probs

  # behaviour policy picks every action with equal probability
  b[state] = np.full(n_actions, 1 / n_actions)

########  Iteration  ###########

episodes = 1e5
start_timer = time.time()
episode = 1
for _run in range(int(episodes) - 1):

  # start a fresh episode from the discretized initial observation
  state = discretize_observation(env.reset())

  for t in range(1, 1000):
    # act with the behaviour policy
    action = np.random.choice(actions, p=b[state])
    obs, reward, done, info = env.step(action)
    next_state = discretize_observation(obs)

    # Expected SARSA: bootstrap from the discounted expectation of the
    # next state's action values under the target policy
    expected_next = sum(gamma * prob * value
                        for prob, value in zip(policy[next_state], q[next_state]))
    q[state][action] += alpha * (reward + expected_next - q[state][action])

    # keep the target policy epsilon-greedy w.r.t. the updated values:
    # every action gets epsilon/n, the greedy one an extra 1 - epsilon
    best_action = np.argmax(q[state])
    refreshed = np.full(n_actions, epsilon / n_actions)
    refreshed[best_action] += 1 - epsilon
    policy[state] = refreshed

    state = next_state

    if done:
      break

  episode += 1
  if episode % 10000 == 0:
    # progress report with elapsed wall-clock time
    formatted_time = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_timer))
    print("Episode: ",episode, " time: ", formatted_time)

  if episode == episodes:
    print("done!")


##########   test  ###########

steps_to_solution = []

for j in range(100):
  state = discretize_observation(env.reset())
  for i in range(1, 50000):
    # act greedily with respect to the learned action values
    obs, reward, done, info = env.step(np.argmax(q[state]))
    state = discretize_observation(obs)

    if done:
      steps_to_solution.append(i)
      break
print(steps_to_solution)
avg_step = np.mean(steps_to_solution)
print("Average steps till termination: ",avg_step)

env.close()

Episode:  10000  time:  00:00:58
Episode:  20000  time:  00:01:55
Episode:  30000  time:  00:02:52
Episode:  40000  time:  00:03:51
Episode:  50000  time:  00:04:49
Episode:  60000  time:  00:05:46
Episode:  70000  time:  00:06:44
Episode:  80000  time:  00:07:42
Episode:  90000  time:  00:08:38
Episode:  100000  time:  00:09:30
done!
[200, 112, 160, 200, 128, 155, 200, 200, 200, 172, 157, 200, 200, 200, 151, 185, 190, 200, 200, 200, 156, 174, 200, 182, 200, 200, 164, 118, 200, 197, 200, 200, 200, 200, 159, 103, 161, 200, 200, 148, 139, 191, 200, 200, 200, 122, 176, 160, 160, 181, 200, 195, 200, 116, 156, 200, 147, 180, 200, 135, 200, 200, 200, 130, 200, 200, 137, 200, 200, 184, 200, 200, 200, 160, 155, 171, 153, 123, 200, 194, 95, 128, 175, 200, 200, 200, 164, 195, 126, 200, 127, 159, 159, 133, 200, 127, 200, 181, 122, 126]
Average steps to solution:  174.54
