Mountain car source code: https://github.com/openai/gym/blob/master/gym/envs/classic_control/mountain_car.py

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import gym
import time
from tqdm import tqdm
import rl.features.tile_coding as tc
from rl.animations.video import write_video, show_video, simulate_episode

# Seed NumPy's global RNG so that np.random-based draws in this notebook are repeatable.
# NOTE(review): env.action_space.sample() may draw from a separate RNG that this seed
# does not cover -- confirm against the gym version in use.
np.random.seed(42)

# Mountain Car environment

In [None]:
""" 
Observation:
    Type: Box(2)
    Num    Observation               Min            Max
    0      Car Position              -1.2           0.6
    1      Car Velocity              -0.07          0.07
    
Actions:
    Type: Discrete(3)
    Num    Action
    0      Accelerate to the Left
    1      Don't accelerate
    2      Accelerate to the Right
    Note: This does not affect the amount of velocity affected by the
    gravitational pull acting on the car.
    
Reward:
     Reward of 0 is awarded if the agent reached the flag (position = 0.5)
     on top of the mountain.
     Reward of -1 is awarded if the position of the agent is less than 0.5.
     
Starting State:
     The position of the car is assigned a uniform random value in
     [-0.6 , -0.4].
     The starting velocity of the car is always assigned to 0.
     
Episode Termination:
     The car position is more than 0.5
     Episode length is greater than 200
"""


# NOTE(review): `.env` presumably unwraps gym's TimeLimit wrapper, in which case the
# 200-step limit described in the text above would not apply to this object -- confirm.
env = gym.make("MountainCar-v0").env
# x position of the valley floor; Agent.render_visitations reads it to draw the
# "bottom of valley" marker on the visitation heatmap.
setattr(env, 'x_bottom', -0.5236)

# Agent

In [None]:
class Agent:
    """Epsilon-greedy SARSA agent with linear, tile-coded action values.

    Each action owns one weight vector of length ``iht_size``. A state is
    mapped to a set of active tile indices, and the value of an action in that
    state is the sum of that action's weights at those indices. Weights are
    updated from the TD error between consecutive (state, action) pairs, using
    the value of the action that was actually selected for the next state.
    """

    def __init__(self, env, step_size=0.01, epsilon=0.1, gamma=1, num_tilings=8, num_tiles=8, iht_size=4096):
        """Create an agent for ``env``.

        Args:
            env: Environment; only ``env.action_space`` (with ``n`` and
                ``sample()``) is used here.
            step_size: Learning rate applied to the TD error.
            epsilon: Probability of taking a sampled (exploratory) action.
            gamma: Discount factor.
            num_tilings: Number of tilings passed to the tile coder.
            num_tiles: Scale applied to the normalised position and velocity
                before tile coding.
            iht_size: Size of the tile coder's index hash table, and the
                length of each per-action weight vector.
        """
        self.action_space = env.action_space
        self.gamma = gamma
        self.step_size = step_size
        self.epsilon = epsilon

        # Bookkeeping for the previous (state, action) pair, needed by the TD update.
        self.last_action = None
        self.last_action_value = None
        self.last_state = None
        self.last_features = None

        self.iht_size = iht_size
        self.num_tilings = num_tilings
        self.num_tiles = num_tiles
        self.iht = tc.IHT(iht_size)

        # One weight vector per action, all starting at zero.
        self.action_weights = [np.zeros(self.iht_size) for _ in range(self.action_space.n)]

    def state_to_tiles(self, state):
        """Return the active tile indices for ``state`` as a NumPy array.

        ``state`` holds (position, velocity) and may be a flat length-2
        sequence or a ``(2, 1)`` column vector; both are flattened to two
        plain floats (computed in float64) so that the tile coder always
        receives scalars rather than 1-element arrays.
        """
        position, velocity = np.asarray(state, dtype=float).ravel()
        # Map position from [-1.2, 0.5] and velocity from [-0.07, 0.07] onto [0, num_tiles].
        position_scaled = ((position + 1.2) / 1.7) * self.num_tiles
        velocity_scaled = ((velocity + 0.07) / 0.14) * self.num_tiles
        scaled_state = [float(position_scaled), float(velocity_scaled)]
        return np.array(tc.tiles(self.iht, self.num_tilings, scaled_state))

    @staticmethod
    def argmax(q_values):
        """Return the index of a maximal value, breaking near-ties at random.

        Every index whose value lies within 1e-3 of the maximum counts as a
        tie; one of them is drawn with ``np.random.choice``.
        """
        q_values = np.asarray(q_values)
        max_value = np.max(q_values) - 1e-3
        max_indices = np.where(q_values >= max_value)[0]
        return np.random.choice(max_indices)

    def select_action(self, features):
        """Pick an epsilon-greedy action for the given active tile indices.

        Returns:
            Tuple ``(action, action_value)`` where ``action_value`` is the
            current estimate for the chosen action.
        """
        action_values = self.get_all_action_values(features)
        if np.random.uniform() < self.epsilon:
            action = self.action_space.sample()
        else:
            action = self.argmax(action_values)
        return action, action_values[action]

    def agent_start(self, state):
        """Begin an episode: choose and remember the first action, then return it."""
        features = self.state_to_tiles(state)
        self.last_action, self.last_action_value = self.select_action(features)
        self.last_state = state
        self.last_features = features
        return self.last_action

    def agent_step(self, reward, state):
        """Learn from a non-terminal transition and choose the next action.

        The chosen action is stored in ``self.last_action`` and is not
        returned.
        """
        features = self.state_to_tiles(state)
        action, action_value = self.select_action(features)
        # update weights: the target bootstraps from the value of the action just selected
        td_error = reward + self.gamma * action_value - self.last_action_value
        self.action_weights[self.last_action][self.last_features] += self.step_size * td_error  # *1 for the gradient officially
        # remember the new (state, action) pair for the next update
        self.last_action = action
        self.last_action_value = action_value
        self.last_state = state
        self.last_features = features

    def agent_end(self, reward):
        """Learn from the terminal transition; the terminal state's value is taken as 0."""
        action_value = 0
        td_error = reward + self.gamma * action_value - self.last_action_value
        self.action_weights[self.last_action][self.last_features] += self.step_size * td_error  # *1 for the gradient officially

    def get_action_value(self, features, action):
        """Return the value estimate of ``action``: the sum of its weights at ``features``."""
        return sum(self.action_weights[action][features])

    def get_all_action_values(self, features):
        """Return a list with the value estimate of every action, indexed by action number."""
        return [self.get_action_value(features, a) for a in range(self.action_space.n)]

    def render_visitations(self, env, observations_arr):
        """Draw a heatmap of log(1 + visit count) over binned (position, velocity).

        Args:
            env: Environment; its ``x_bottom`` attribute gives the position at
                which the "bottom of valley" marker is drawn.
            observations_arr: Two-column data (position, velocity), one row
                per visited state.
        """
        # prepare pandas data frame
        states = pd.DataFrame(data=observations_arr, columns=['position', 'velocity'])
        position_linspace = np.linspace(-1.2, 0.6, 181)
        velocity_linspace = np.linspace(-0.07, 0.07, 141)
        # label every observation with the midpoint of the bin it falls into
        states['position_bin'] = pd.IntervalIndex(pd.cut(states['position'], position_linspace)).mid
        states['velocity_bin'] = pd.IntervalIndex(pd.cut(states['velocity'], velocity_linspace)).mid
        state_visitation_counts = states.groupby(['position_bin', 'velocity_bin'])['position'].count().unstack(fill_value=0)
        state_visitation_counts.index = np.round(state_visitation_counts.index, 3)
        state_visitation_counts.columns = np.round(state_visitation_counts.columns, 3)
        # create heatmap of visitation count
        fig, ax = plt.subplots(figsize=(18, 9))
        sns.heatmap(np.log(state_visitation_counts + 1).T.sort_index(ascending=False), ax=ax)
        ax.set_title("Natural log state visitation counts", fontsize=20)
        ax.set_xlabel("position", fontsize=16)
        ax.set_ylabel("velocity", fontsize=16)
        # convert the valley position from data units into heatmap x coordinates
        x_scaled = (env.x_bottom - state_visitation_counts.index[0]) / (state_visitation_counts.index[-1] - state_visitation_counts.index[0])
        x_scaled *= (ax.get_xlim()[1] - ax.get_xlim()[0])
        ax.axvline(x_scaled, color='white', lw=1)
        ax.text(x_scaled+1, ax.get_ylim()[0]-1, "bottom of valley", fontsize=14, color='white', ha='center', va='bottom')

    def render_state_values(self):
        """Draw heatmaps of the greedy state value and the greedy action over a state grid."""
        # create data frames with state values and top action
        position_linspace = np.linspace(-1.2, 0.6, 181)
        velocity_linspace = np.linspace(-0.07, 0.07, 141)
        df_state_values = pd.DataFrame(index=position_linspace, columns=velocity_linspace, dtype='float32')
        df_top_action = pd.DataFrame(index=position_linspace, columns=velocity_linspace)
        for i in position_linspace:
            for j in velocity_linspace:
                features = self.state_to_tiles(np.array([i,j]))
                action_values = self.get_all_action_values(features)
                top_action = self.argmax(action_values)
                df_top_action.loc[i, j] = top_action
                df_state_values.loc[i, j] = action_values[top_action]
        df_top_action.index = np.round(df_top_action.index, 3)
        df_top_action.columns = np.round(df_top_action.columns, 3)
        df_state_values.index = np.round(df_state_values.index, 3)
        df_state_values.columns = np.round(df_state_values.columns, 3)
        # create heatmaps of state values and top action
        fig, [ax1, ax2] = plt.subplots(2, 1, figsize=(18, 18))
        sns.heatmap(df_state_values.T.sort_index(ascending=False), ax=ax1)
        ax1.set_title("State value estimates", fontsize=20)
        sns.heatmap(df_top_action.astype(int).T.sort_index(ascending=False), ax=ax2)
        ax2.set_title("Action from top action value", fontsize=20)

# Helper functions

In [None]:
def episode(env, agent, max_steps=300):
    """Run one training episode.

    The agent is started on the reset observation and then driven step by
    step. A transition that ends the episode is reported to the agent through
    ``agent_end`` only, so it is never also treated as a bootstrapping step.
    If the step cap is reached first, the episode is cut off without a
    terminal update, because the last state is not a real terminal state.

    Args:
        env: Environment with ``reset()`` returning an observation array and
            ``step(action)`` returning ``(observation, reward, terminal, info)``.
        agent: Object providing ``agent_start``, ``agent_step``, ``agent_end``
            and a ``last_action`` attribute holding the action to take next.
        max_steps: Maximum number of environment steps before cutting off.

    Returns:
        Tuple ``(cumulative_reward, state_list)``: the summed reward and the
        list of observations returned by ``env.step``, in order.
    """
    observation = env.reset()
    # Observations are handed to the agent as (2, 1) column vectors.
    agent.agent_start(observation.reshape((2, 1)))
    cumulative_reward = 0
    state_list = []
    for _ in range(max_steps):
        observation, reward, terminal, _info = env.step(agent.last_action)
        cumulative_reward += reward
        state_list.append(observation)
        if terminal:
            # Terminal transition: update towards the bare reward and stop.
            agent.agent_end(reward)
            break
        agent.agent_step(reward, observation.reshape((2, 1)))
    return cumulative_reward, state_list


def run_experiment(env, agent, num_episodes=1000):
    """Train ``agent`` on ``env`` for a number of episodes, showing a progress bar.

    Args:
        env: Environment passed through to ``episode``.
        agent: Agent passed through to ``episode``; the same instance is
            reused for every episode.
        num_episodes: How many episodes to run.

    Returns:
        Tuple ``(reward_list, state_lists)`` with one entry per episode: the
        episode's cumulative reward and its list of observations.
    """
    reward_list = []
    state_lists = []
    for _ in tqdm(range(num_episodes)):
        episode_reward, state_list = episode(env, agent)
        reward_list.append(episode_reward)
        state_lists.append(state_list)
    return reward_list, state_lists


def animate_episode(env, agent, max_steps=500, fps=30):
    """Render one episode on screen, always closing the environment afterwards.

    Args:
        env: Environment providing ``reset``, ``step``, ``render``, ``close``
            and ``action_space.sample``.
        agent: ``'random'`` to sample actions from the action space,
            ``'right'`` to always take action 2, or an agent object providing
            ``state_to_tiles`` and ``select_action``.
        max_steps: Maximum number of steps to render.
        fps: Rendered frames per second; each step sleeps ``1 / fps`` seconds.
    """
    if agent == 'random':
        def action_fnc(_obs):
            return env.action_space.sample()
    elif agent == 'right':
        def action_fnc(_obs):
            return 2
    else:
        def action_fnc(obs):
            return agent.select_action(agent.state_to_tiles(obs))[0]

    try:
        obs = env.reset()
        for _step in range(max_steps):
            obs, _, terminal, _ = env.step(action_fnc(obs))
            env.render()
            time.sleep(1 / fps)
            if terminal:
                break
    finally:
        # Release the render window even when stepping or rendering fails.
        env.close()

# Example episodes

In [None]:
# random agent: the policy ignores the observation and samples from env.action_space
# NOTE(review): simulate_episode is imported from rl.animations.video; presumably it runs
# the given policy for at most max_steps and plays the recording inline -- confirm there.
simulate_episode(env, lambda x: env.action_space.sample(), max_steps=250, width=500, play_type='autoplay')

In [None]:
# agent that only goes right: the policy returns action 2 ("Accelerate to the Right"
# in the action table above) for every observation
simulate_episode(env, lambda x: 2, max_steps=300, width=500, play_type='autoplay')

# Start the experiment

In [None]:
# Tile-coded agent with a lower exploration rate than the class default (epsilon 0.01
# instead of 0.1); gamma, tiling and hash-table parameters keep their defaults.
agent = Agent(env, step_size=0.01, epsilon=0.01)

In [None]:
# Train the agent; run_experiment returns per-episode total rewards and per-episode
# lists of observations.
rewards, observations = run_experiment(env, agent)
# Flatten the per-episode lists into one array with one row per visited state.
observations = np.array([s for lst in observations for s in lst])

In [None]:
# Learning curve: total reward per episode, smoothed with a 20-episode rolling mean
# (the first 19 points are NaN and therefore not drawn).
fig, ax = plt.subplots(figsize=(15, 6))
ax.plot(pd.Series(rewards).rolling(window=20).mean())
ax.set_xlabel("Episode Number", fontsize=16)
ax.set_ylabel("Reward per Episode", fontsize=16)
ax.set_title("Rolling mean of total reward per episode", fontsize=20);

In [None]:
# trained agent: actions come from Agent.select_action, which stays epsilon-greedy,
# so an occasional exploratory action can still occur during playback
simulate_episode(env, lambda x: agent.select_action(agent.state_to_tiles(x))[0], max_steps=300, width=500, play_type='autoplay')

In [None]:
# Heatmap of how often each binned (position, velocity) state was visited during training.
agent.render_visitations(env, observations)

In [None]:
# Heatmaps of the learned greedy state values and of the greedy action per state.
agent.render_state_values()