In [5]:
import numpy as np
from matplotlib import pyplot as plt


In [6]:
class Agent:
    def choose_action(self,state):
        action = 0
        if np.random.uniform(0,1) < self.epsilon:
            action = self.action_space.sample()
        else:
            action = np.argmax(self.Q[state])
        return action
class DoubleLearningAgent:
    def choose_action(self,state):
        action = 0
        if np.random.uniform(0,1) < self.epsilon:
            action = self.action_space.sample()
        else:
            action = np.argmax(self.Q1[state]+self.Q2[state])
        return action

In [7]:
class BoxObservationSarsaAgent(Agent):
    def __init__(self,epsilon,alpha,gamma,Q,action_space):
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.action_space = action_space
        self.Q = Q
    def update(self,prev_state,prev_action,reward,next_state,next_action):
        prediction = self.Q[prev_state+(prev_action,)]
        target = reward + self.gamma * self.Q[next_state+(next_action,)]
        error = target - prediction
        self.Q[prev_state+(prev_action,)] += self.alpha * error
        
class BoxObservationQLearningAgent(Agent):
    def __init__(self,epsilon,alpha,gamma,Q,action_space):
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.action_space = action_space
        self.Q = Q
    def update(self,prev_state,prev_action,reward,next_state,next_action):
        prediction = self.Q[prev_state+(prev_action,)]
        target = reward + self.gamma * np.amax(self.Q[next_state])
        error = target - prediction
        self.Q[prev_state+(prev_action,)] += self.alpha * error

class BoxObservationDoubleQLearningAgent(DoubleLearningAgent):
    def __init__(self,epsilon,alpha,gamma,Q1,Q2,action_space):
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.action_space = action_space
        
        self.Q1 = Q1
        self.Q2 = Q2

        self.action_space = action_space
    def update(self,prev_state,prev_action,reward,next_state,next_action):
        if np.random.uniform(0,1) < .5:
            prediction = self.Q1[prev_state+(prev_action,)]
            target = reward + self.gamma*self.Q2[next_state+(np.argmax(self.Q1[next_state]),)]
            error = target - prediction
            self.Q1[prev_state+(prev_action,)] += self.alpha * error
        else:
            prediction = self.Q2[prev_state+(prev_action,)]
            target = reward + self.gamma*self.Q1[next_state+(np.argmax(self.Q2[next_state]),)]
            error = target - prediction
            self.Q2[prev_state+(prev_action,)] += self.alpha * error

In [11]:
import gym
import math

env = gym.make('CartPole-v0')
NUM_BUCKETS = (1, 1, 6, 3)  # (x, x', theta, theta')
NUM_ACTIONS = env.action_space.n # (left, right)
# Bounds for each discrete state
STATE_BOUNDS = list(zip(env.observation_space.low, env.observation_space.high))
# Manually setting bounds (needed for the x_dot and theta_dot)
#STATE_BOUNDS[0] = [STATE_BOUNDS[0][0]/2, STATE_BOUNDS[0][1]/2]
STATE_BOUNDS[1] = [-0.5, 0.5]
#STATE_BOUNDS[2] = [STATE_BOUNDS[2][0]/2, STATE_BOUNDS[2][0]/2]
STATE_BOUNDS[3] = [-math.radians(50), math.radians(50)]

## Learning related constants
MIN_EXPLORE_RATE = 0.01
MIN_LEARNING_RATE = 0.2

# Defining the simulation related constants
NUM_EPISODES = 500
MAX_T = 500
STREAK_TO_END = 100
SOLVED_T = 199

NUM_TRIALS = 1
DEBUG = False

def get_explore_rate(t):
    return max(MIN_EXPLORE_RATE, min(1, 1.0 - math.log10((t+1)/25)))

def get_learning_rate(t):
    return max(MIN_LEARNING_RATE, min(0.5, 1.0 - math.log10((t+1)/25)))


def state_to_bucket(state):
    bucket_indice = []
    for i in range(len(state)):
        if state[i] <= STATE_BOUNDS[i][0]:
            bucket_index = 0
        elif state[i] >= STATE_BOUNDS[i][1]:
            bucket_index = NUM_BUCKETS[i] - 1
        else:
            # Mapping the state bounds to the bucket array
            bound_width = STATE_BOUNDS[i][1] - STATE_BOUNDS[i][0]
            offset = (NUM_BUCKETS[i]-1)*STATE_BOUNDS[i][0]/bound_width
            scaling = (NUM_BUCKETS[i]-1)/bound_width
            bucket_index = int(round(scaling*state[i] - offset))
        bucket_indice.append(bucket_index)
    return tuple(bucket_indice)


boxQAgent = BoxObservationQLearningAgent(1,1,1,np.zeros(NUM_BUCKETS + (NUM_ACTIONS,)),env.action_space)
boxDoubleQAgent = BoxObservationDoubleQLearningAgent(1,1,1,np.zeros(NUM_BUCKETS + (NUM_ACTIONS,)),np.zeros(NUM_BUCKETS + (NUM_ACTIONS,)),env.action_space)
boxSarsa = BoxObservationSarsaAgent(1,1,1,np.zeros(NUM_BUCKETS + (NUM_ACTIONS,)),env.action_space)
agents = [boxQAgent,boxDoubleQAgent,boxSarsa]

episdoesRequiredToSolve = {
    'BoxObservationSarsaAgent': [],
    'BoxObservationQLearningAgent': [],
    'BoxObservationDoubleQLearningAgent':[]
}
for agent in agents:
    for trial in range(NUM_TRIALS):
        agent.alpha = get_learning_rate(0)
        agent.epsilon = get_explore_rate(0)
        agent.gamma = 0.999  # since the world is unchanging

        num_streaks = 0

        for episode in range(NUM_EPISODES):
            obv = env.reset()

            state = state_to_bucket(obv)

            for t in range(MAX_T):
                if DEBUG:
                    env.render()

                action = agent.choose_action(state)

                obv,reward,done,_ = env.step(action)

                next_state = state_to_bucket(obv)

                next_action = agent.choose_action(next_state)

                agent.update(state,action,reward,next_state,next_action)

                state = next_state

                if done:
                    if DEBUG:
                        print("Episode %d finished after %f time steps" % (episode, t))
                    if t >= SOLVED_T:
                        num_streaks += 1
                    else:
                        num_streaks = 0
                    break
            if num_streaks > STREAK_TO_END:
                episdoesRequiredToSolve[type(agent).__name__].append(episode)
                break
            elif episode == NUM_EPISODES:
                episdoesRequiredToSolve[type(agent).__name__].append(NUM_EPISODES)
            agent.epsilon = get_explore_rate(episode)
            agent.alpha = get_learning_rate(episode)
env.close()

In [12]:
for agent in agents:
    print(type(agent).__name__,episdoesRequiredToSolve[type(agent).__name__])

BoxObservationQLearningAgent [274]
BoxObservationDoubleQLearningAgent [283]
BoxObservationSarsaAgent []
