In [1]:
import gym
import numpy as np
import tiles3 as tc

In [8]:
env = gym.make('MountainCar-v0').env

In [9]:
class TileCoder: 
    def __init__(self, num_tiles, num_tilings, hash_size, position_boundaries, velocity_boundaries):
        self.num_tiles = num_tiles
        self.num_tilings = num_tilings
        self.iht = tc.IHT(hash_size)
        
        self.position_scale = self.num_tiles / (position_boundaries[1] - position_boundaries[0])
        self.velocity_scale = self.num_tiles / (velocity_boundaries[1] - velocity_boundaries[0])
        
    def get_active_tiles(self, state):
        position, velocity = state
        state_scaled = [position * self.position_scale, velocity * self.velocity_scale]
        
        active_tiles = tc.tiles(self.iht, self.num_tilings, state_scaled)
        
        return np.array(active_tiles)

### SARSA agent

In [10]:
class SarsaAgent():
    def __init__(self, agent_init):
        
        self.next_tiles = None
        self.next_action = None
        
        self.discount_factor = agent_init["discount_factor"]    
        self.learning_rate = agent_init["learning_rate"]
        self.epsilon = agent_init["epsilon"]
        
        self.w = np.ones((agent_init["num_action"], agent_init["tile_coder"]["hash_size"]))
        self.tile_coder = TileCoder(*agent_init["tile_coder"].values())
        
    def choose_action(self, state):
        
        position, velocity = state
        active_tiles = self.tile_coder.get_active_tiles(state)
        
        action_values = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            action_values[action] = np.sum(self.w[action][active_tiles])
            
        if np.random.rand() < self.epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(action_values)
            
        self.next_action = action
        self.next_tiles = active_tiles
            
        return action
        
    def update(self, state, reward, done):
        if not done:
            self.update_step(state, reward)
        else:
            self.update_end(state, reward)
        
    def update_step(self, next_state, reward):
        
        current_action = self.next_action
        current_tiles = self.next_tiles
        
        self.choose_action(next_state)
        
        target = reward + self.discount_factor * np.sum(self.w[self.next_action, self.next_tiles])\
        - np.sum(self.w[current_action, current_tiles])
        self.w[current_action, current_tiles] += self.learning_rate * target
        
    def update_end(self, state, reward):
        current_action = self.next_action
        current_tiles = self.next_tiles
        
        target = reward - np.sum(self.w[current_action][current_tiles])
        self.w[current_action][current_tiles] += self.learning_rate * target
        

In [31]:
position_boundaries = (env.observation_space.low[0], env.observation_space.high[0])
velocity_boundaries = (env.observation_space.low[1], env.observation_space.high[1])

agent_init = {
    'discount_factor': 0.9, 
    'learning_rate': 0.01, 
    'epsilon': 0.01,
    'num_action': env.action_space.n,
    'tile_coder': {
        'num_tiles': 8,
        'num_tilings': 20,
        'hash_size': 4096,
        'position_boundaries': position_boundaries, 
        'velocity_boundaries': velocity_boundaries
    }
}
sarsa_agent = SarsaAgent(agent_init)

In [32]:
timesteps_per_episode = list()
timesteps_per_episode_per_chunk = list()
trunk_size = 20
n_episode = 200
t_max_per_episode = 200

assert n_episode % trunk_size == 0, f"trunk_size ({trunk_size}) must be a divisor of n_episode ({n_episode})"

for i_episode in range(n_episode):
    state = env.reset()
    sarsa_agent.choose_action(state)
    for t in range(t_max_per_episode):
        env.render()
        state, reward, done, info = env.step(sarsa_agent.next_action)
        sarsa_agent.update(state, reward, done)
        if done:
            break
    if i_episode % trunk_size == 0:
        print(i_episode)
    timesteps_per_episode.append(t)
    
env.close()

0


KeyboardInterrupt: 

In [251]:
timesteps_per_episode = np.array(timesteps_per_episode)
timesteps_per_episode.reshape(trunk_size, int(n_episode/trunk_size)).mean(axis=0)

array([252.6 , 248.05, 272.85, 257.1 , 242.45, 237.55, 233.  , 216.3 ,
       244.1 , 217.2 ])

In [244]:
timesteps_per_episode[0:450].shapec#


(450,)

### Q-Learning Agent

In [13]:
class QLearningAgent():
    def __init__(self, agent_init):
        
        self.current_tiles = None
        self.current_action = None
        
        self.discount_factor = agent_init["discount_factor"]    
        self.learning_rate = agent_init["learning_rate"]
        self.epsilon = agent_init["epsilon"]
        
        self.w = np.ones((agent_init["num_action"], agent_init["tile_coder"]["hash_size"]))
        self.tile_coder = TileCoder(*agent_init["tile_coder"].values())
        
    def choose_action(self, state):
        
        if self.current_tiles is None:
            active_tiles = self.tile_coder.get_active_tiles(state)
        else:
            active_tiles = self.current_tiles
        
        action_values = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            action_values[action] = np.sum(self.w[action][active_tiles])
            
        if np.random.rand() < self.epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(action_values)
            
        if self.current_tiles is None:
            self.current_tiles = np.copy(active_tiles)
            
        self.current_action = action
            
        return action
        
    def update(self, next_state, reward, done):

        if not done:
            self.update_step(next_state, reward)
        else:
            self.update_end(next_state, reward)
            
    def update_step(self, next_state, reward):
        
        next_active_tiles = self.tile_coder.get_active_tiles(next_state)
        
        target = reward + self.discount_factor * np.max(self.w[:, next_active_tiles], axis=0).sum() \
        - np.sum(self.w[self.current_action][self.current_tiles])
        self.w[self.current_action][self.current_tiles] += self.learning_rate * target
        
        self.current_tiles = next_active_tiles
        
    def update_end(self, state, reward):
        target = reward - np.sum(self.w[self.current_action][self.current_tiles])
        self.w[self.current_action][self.current_tiles] += self.learning_rate * target

In [14]:
position_boundaries = (env.observation_space.low[0], env.observation_space.high[0])
velocity_boundaries = (env.observation_space.low[1], env.observation_space.high[1])

agent_init = {
    'discount_factor': 0.9, 
    'learning_rate': 0.01, 
    'epsilon': 0.01,
    'num_action': env.action_space.n,
    'tile_coder': {
        'num_tiles': 8,
        'num_tilings': 20,
        'hash_size': 4096,
        'position_boundaries': position_boundaries, 
        'velocity_boundaries': velocity_boundaries
    }
}
q_learning_agent = QLearningAgent(agent_init)

In [15]:
timesteps_per_episode = list()
timesteps_per_episode_per_chunk = list()
trunk_size = 20
n_episode = 200
t_max_per_episode = 200

assert n_episode % trunk_size == 0, f"trunk_size ({trunk_size}) must be a divisor of n_episode ({n_episode})"

for i_episode in range(n_episode):
    state = env.reset()
    for t in range(t_max_per_episode):
        env.render()
        action = q_learning_agent.choose_action(state)
        next_state, reward, done, info = env.step(action)
        q_learning_agent.update(next_state, reward, done) 
        if done:
            break
    if i_episode % trunk_size == 0:
        print(i_episode)
    timesteps_per_episode.append(t)
    
env.close()

0
20
40
60
80
100
120
140
160
180


In [16]:
timesteps_per_episode = np.array(timesteps_per_episode)
timesteps_per_episode.reshape(trunk_size, int(n_episode/trunk_size)).mean(axis=0)

array([223.6 , 269.1 , 220.35, 273.15, 214.95, 210.85, 210.95, 211.55,
       211.2 , 222.9 ])

### Expected SARSA agent

In [23]:
class ExpectedSarsaAgent():
    def __init__(self, agent_init):
        
        self.current_tiles = None
        self.current_action = None
        
        self.discount_factor = agent_init["discount_factor"]    
        self.learning_rate = agent_init["learning_rate"]
        self.epsilon = agent_init["epsilon"]
        
        self.w = np.ones((agent_init["num_action"], agent_init["tile_coder"]["hash_size"]))
        self.tile_coder = TileCoder(*agent_init["tile_coder"].values())
        
    def choose_action(self, state):
        
        if self.current_tiles is None:
            active_tiles = self.tile_coder.get_active_tiles(state)
        else:
            active_tiles = self.current_tiles
        
        action_values = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            action_values[action] = np.sum(self.w[action][active_tiles])
            
        if np.random.rand() < self.epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(action_values)
            
        if self.current_tiles is None:
            self.current_tiles = np.copy(active_tiles)
            
        self.current_action = action
            
        return action
        
    def update(self, next_state, reward, done):

        if not done:
            self.update_step(next_state, reward)
        else:
            self.update_end(next_state, reward)
            
    def update_step(self, next_state, reward):
        
        next_active_tiles = self.tile_coder.get_active_tiles(next_state)
        
        target = reward + self.discount_factor * np.mean(self.w[:, next_active_tiles].sum(axis=1)) \
        - np.sum(self.w[self.current_action][self.current_tiles])
        self.w[self.current_action][self.current_tiles] += self.learning_rate * target
        
        self.current_tiles = next_active_tiles
        
    def update_end(self, state, reward):
        target = reward - np.sum(self.w[self.current_action][self.current_tiles])
        self.w[self.current_action][self.current_tiles] += self.learning_rate * target

In [29]:
position_boundaries = (env.observation_space.low[0], env.observation_space.high[0])
velocity_boundaries = (env.observation_space.low[1], env.observation_space.high[1])

agent_init = {
    'discount_factor': 0.9, 
    'learning_rate': 0.01, 
    'epsilon': 0.01,
    'num_action': env.action_space.n,
    'tile_coder': {
        'num_tiles': 8,
        'num_tilings': 20,
        'hash_size': 4096,
        'position_boundaries': position_boundaries, 
        'velocity_boundaries': velocity_boundaries
    }
}
expected_sarsa_agent = ExpectedSarsaAgent(agent_init)

In [30]:
timesteps_per_episode = list()
timesteps_per_episode_per_chunk = list()
trunk_size = 20
n_episode = 400
t_max_per_episode = 200

assert n_episode % trunk_size == 0, f"trunk_size ({trunk_size}) must be a divisor of n_episode ({n_episode})"

for i_episode in range(n_episode):
    state = env.reset()
    for t in range(t_max_per_episode):
        env.render()
        action = expected_sarsa_agent.choose_action(state)
        next_state, reward, done, info = env.step(action)
        expected_sarsa_agent.update(next_state, reward, done) 
        if done:
            break
    if i_episode % trunk_size == 0:
        print(i_episode)
    timesteps_per_episode.append(t)
    
env.close()

0
20
40
60
80
100
120
140
160
180
200
220
240
260
280
300
320
340
360
380


In [26]:
timesteps_per_episode = np.array(timesteps_per_episode)
timesteps_per_episode.reshape(trunk_size, int(n_episode/trunk_size)).mean(axis=0)

array([197.7 , 192.6 , 198.15, 198.35, 197.05, 197.  , 196.45, 193.55,
       197.6 , 197.7 ])

In [33]:
env.close()