In [None]:
# imports for rendering to a virtual display (needed in docker container)
from pyvirtualdisplay import Display
d = Display(visible=0, size=(1400, 900))
d.start()

import gym
import math
import random
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Imports specifically so we can render outputs in Jupyter.
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display


OpenAI Gym is a general-purpose tool for developing, evaluating, and comparing reinforcement learning algorithms (see https://gym.openai.com/docs/ and https://github.com/openai/gym). Gym provides a common interface to a wide range of reinforcement learning tasks. Each task is instantiated as a Python object called an "environment." Each environment has an action space and an observation space. By calling the environment's `step` function, the user can repeatedly pass new actions to the environment and get back a reward and a new observation.

Gym doesn't come with any reinforcement learning algorithm implementations, only the environments. Creating a learning agent is up to you! This gives you the flexibility to create an agent with standard Python or with a machine learning library such as PyTorch.

In this lab, we will train a simple agent to perform the "cart-pole" task, in which it tries to keep a pole balanced near vertical for as long as possible. The actions available to the agent are to move the cart left or right. The observations include the position $x$ of the cart, the velocity of the cart, the angle $\theta$ of the pole, and the angular velocity of the pole. The agent gets a reward of 1 for each time step in which the pole hasn't fallen too far from vertical. Running the rest of the cells in this notebook without modification will produce a video of an agent performing the cart-pole task with a random action policy. Each time the pole moves too far from vertical, the task restarts in a new episode.
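
The interaction loop described above can be sketched without Gym itself. The snippet below is a minimal illustration only: `ToyEnv` and `run_episode` are hypothetical names invented for this example, and `ToyEnv` is a stand-in that merely mimics the `reset`/`step` calls used later in this notebook. It is not Gym's cart-pole implementation.

```python
import random


class ToyEnv:
    """Hypothetical stand-in environment (NOT Gym): the episode ends after max_steps."""

    def __init__(self, max_steps=5):
        self.max_steps = max_steps
        self.t = 0

    def reset(self):
        # Start a new episode and return the initial observation.
        self.t = 0
        return 0.0

    def step(self, action):
        # Advance one time step; the action is ignored in this toy example.
        self.t += 1
        obs = float(self.t)
        reward = 1.0  # reward of 1 for every step survived
        done = self.t >= self.max_steps
        return obs, reward, done, {}


def run_episode(env, policy):
    """Run one episode and return the total reward collected."""
    obs = env.reset()
    total = 0.0
    done = False
    while not done:
        action = policy(obs)
        obs, reward, done, info = env.step(action)
        total += reward
    return total


random.seed(0)
# Random policy: pick action 0 or 1 regardless of the observation.
total_reward = run_episode(ToyEnv(max_steps=5), lambda obs: random.choice([0, 1]))
print(total_reward)
```

Because the reward is 1 per step, the total reward of an episode equals its length, which is why episode length is used as the performance measure in this lab.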

In [None]:
env = gym.make('CartPole-v0')

In [None]:
env.action_space

In [None]:
env.observation_space

In this lab, we will use a "tabular" Q-learning approach, in which we divide the continuous (and thus infinite) observation space into a finite number of discrete states. We will iteratively update the Q-value table based on experience, with the goal of having each Q-value represent the expected cumulative reward of taking a given action in a given state and following the optimal policy thereafter. As a reminder, the Q-learning update rule is

$$
Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha (r_{t+1} + \max_{a}Q(s_{t+1}, a) - Q(s_t, a_t))
$$

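(Since we are limiting the length of the task, we will assume no discounting, i.e. a discount factor of $\gamma = 1$, which is why no $\gamma$ appears in front of the $\max$ term.) More on Q-learning can be found at: http://incompleteideas.net/book/ebook/node65.html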
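
As a quick worked example of the update (the numbers are made up for illustration), suppose $\alpha = 0.5$, the current estimate is $Q(s_t, a_t) = 2$, the agent receives $r_{t+1} = 1$, and the best estimate in the next state is $\max_{a}Q(s_{t+1}, a) = 4$. The update then gives

$$
Q(s_t, a_t) \leftarrow 2 + 0.5\,(1 + 4 - 2) = 3.5
$$

so the estimate moves halfway from its old value of 2 toward the target of $1 + 4 = 5$.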

In the homework, you will use a neural network to approximate the Q-value function over the continuous state space, rather than assuming a simple discretization. Discretization works for low-dimensional state spaces such as this one, but runs into the "curse of dimensionality" for high-dimensional state spaces, because the number of table entries grows exponentially with the number of state variables.
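
To make discretization and its scaling problem concrete, here is a small sketch. The helper names `to_bucket` and `table_entries` are hypothetical and exist only for this example. The bucket counts `(3, 3, 6, 3)` and the two actions match the agent defined later in this notebook, while the bounds `(-0.5, 0.5)` and the 20-variable case are assumptions chosen for illustration.

```python
def to_bucket(value, low, high, num_buckets):
    """Map a continuous value to a bucket index in [0, num_buckets - 1]."""
    if value <= low:
        return 0
    if value >= high:
        return num_buckets - 1
    # Linearly rescale [low, high] onto [0, num_buckets - 1], then round.
    scaling = (num_buckets - 1) / (high - low)
    return int(round(scaling * (value - low)))


def table_entries(num_buckets, num_actions):
    """Number of Q-values needed: one per (discrete state, action) pair."""
    entries = num_actions
    for n in num_buckets:
        entries *= n
    return entries


# Three buckets over the illustrative range [-0.5, 0.5].
buckets = [to_bucket(v, -0.5, 0.5, 3) for v in (-0.9, -0.3, 0.0, 0.3, 0.9)]
print(buckets)

# Table size for this lab versus a hypothetical 20-variable state
# with 10 buckets per variable.
small = table_entries((3, 3, 6, 3), 2)
large = table_entries((10,) * 20, 2)
print(small, large)
```

Values outside the bounds are clamped into the first or last bucket, so the table stays finite even when an observation is unbounded.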

Our Q-learning agent will take actions based on an $\epsilon$-greedy policy. This means that with probability $1-\epsilon$ the agent will take what it thinks to be the optimal action, and with probability $\epsilon$ it will take a random action instead. This type of policy ensures that the agent adequately explores the state space, and doesn't only learn about the action(s) it initially believes to be best. (See http://incompleteideas.net/book/ebook/node16.html for more information.)
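
To make these probabilities concrete, assume two actions (as in cart-pole) and an exploratory action drawn uniformly from both, so that a random draw can also land on the greedy action. Under that assumption the greedy action is selected with probability

$$
(1 - \epsilon) + \frac{\epsilon}{2} = 1 - \frac{\epsilon}{2}
$$

For example, $\epsilon = 0.1$ gives a probability of $0.95$ for the greedy action and $0.05$ for the other one.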

Below is incomplete code for a Q-learning agent. Your task will be to complete the `select_action` and `update_q_table` methods to create an $\epsilon$-greedy Q-learner.

(This code is modified from https://medium.com/@tuzzer/cart-pole-balancing-with-q-learning-b54c6068d947)

In [None]:
class QLearner:
    def __init__(self, env):
        # state representation size for x, x', theta, and theta'
        self.num_buckets = (3, 3, 6, 3)
        self.num_actions = env.action_space.n
        self.state_bounds = list(zip(env.observation_space.low, env.observation_space.high))
        self.state_bounds[1] = [-0.5, 0.5]
        self.state_bounds[3] = [-math.radians(50), math.radians(50)]
        self.q_table = np.zeros(self.num_buckets + (self.num_actions,))
        self.learning_rate = .5
        self.explore_rate = 1.0

    def select_action(self, obs):
        # store state for later update
        state = self.state_to_bucket(obs)
        
        # REPLACE THIS (takes random action)
        action = env.action_space.sample()
        # TODO: take random action with probability self.explore_rate,
        # and take optimal action otherwise.
        # Hint: optimal action can be found with np.argmax(self.q_table[state])
        

        
        #save state for later update
        self.last_state = state
        return action
    
    def update_q_table(self, obs, action, reward):
        new_state = self.state_to_bucket(obs)
        best_q = np.amax(self.q_table[new_state])
        old_val = self.q_table[self.last_state + (action,)]
        
        # REPLACE THIS
        self.q_table[self.last_state + (action,)] = old_val
        # TODO: use self.learning_rate to perform an update on the current state
        
        
    def state_to_bucket(self, obs):
        #this function turns continuous states into discrete 'binned' states,
        #letting us perform tabular Q learning
        bucket_indice = []
        for i in range(len(obs)):
            if obs[i] <= self.state_bounds[i][0]:
                bucket_index = 0
            elif obs[i] >= self.state_bounds[i][1]:
                bucket_index = self.num_buckets[i] - 1
            else:
                # Mapping the state bounds to the bucket array
                bound_width = self.state_bounds[i][1] - self.state_bounds[i][0]
                offset = (self.num_buckets[i]-1)*self.state_bounds[i][0]/bound_width
                scaling = (self.num_buckets[i]-1)/bound_width
                bucket_index = int(round(scaling*obs[i] - offset))
            bucket_indice.append(bucket_index)
        return tuple(bucket_indice)
    
    def update_learning_explore_rates(self, t):
        # gradually shrink the explore and learning rates in later episodes
        # (t is the episode index passed in by simulate)
        self.explore_rate = max(.01, min(1, 1.0 - math.log10((t+1)/25)))
        self.learning_rate =  max(.1, min(0.5, 1.0 - math.log10((t+1)/25)))
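
The decay schedule in `update_learning_explore_rates` is easier to read with a few values tabulated. The sketch below copies the two formulas into standalone functions (the function names and the chosen episode indices are assumptions made for this example) and evaluates them at selected episodes.

```python
import math


def explore_rate(t):
    # Same formula as in update_learning_explore_rates: clipped to [0.01, 1.0].
    return max(0.01, min(1.0, 1.0 - math.log10((t + 1) / 25)))


def learning_rate(t):
    # Same formula, but clipped to [0.1, 0.5].
    return max(0.1, min(0.5, 1.0 - math.log10((t + 1) / 25)))


# Evaluate both rates at a few episode indices, rounded for display.
schedule = {
    t: (round(explore_rate(t), 3), round(learning_rate(t), 3))
    for t in (0, 24, 99, 249, 599)
}
for t, (eps, lr) in schedule.items():
    print("episode %3d: explore_rate=%.3f learning_rate=%.3f" % (t, eps, lr))
```

Both rates stay at their maximum for the first 25 episodes, decay logarithmically after that, and reach their floors at episode index 249, where `(t + 1) / 25` equals 10.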

In [None]:
def simulate(agent):
    ## Defining the simulation related constants
    NUM_EPISODES = 600
    STREAK_TO_END = 50
    SOLVED_T = 199
    num_streaks = 0
    time_step_list = []
    for episode in range(NUM_EPISODES):
        # Reset the environment
        obs = env.reset()
        agent.update_learning_explore_rates(episode)
        for t in range(SOLVED_T+1):
            #env.render()
            # Select an action
            action = agent.select_action(obs)
            # Execute the action and observe new state and reward
            obs, reward, done, _ = env.step(action)
            # Update the Q based on the result
            agent.update_q_table(obs, action, reward)
            if t == SOLVED_T:
                done = True
            if done:
                time_step_list.append(t)
                print("Episode %d finished after %d time steps" % (episode, t))
                if t == SOLVED_T:
                    num_streaks += 1
                else:
                    num_streaks = 0
                break
        # The task is considered solved once more than STREAK_TO_END
        # consecutive episodes have run for the full SOLVED_T time steps
        if num_streaks > STREAK_TO_END:
            break
    return time_step_list

In [None]:
def make_video(agent, n_frames):
    num_episodes=100
    frames = []
    for i_episode in range(num_episodes):
        obs = env.reset()
        for t in range(200):
            # Render into buffer. 
            frames.append(env.render(mode = 'rgb_array'))
            if len(frames) > n_frames:
                break
            action = agent.select_action(obs)
            obs, reward, done, info = env.step(action)
            if done:
                break
        if len(frames) > n_frames:
            break
    return frames


def display_frames_as_gif(frames):
    """
    Displays a list of frames as a gif, with controls
    """
    #plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi = 72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames = len(frames), interval=50)
    display(display_animation(anim, default_mode='loop'))

In [None]:
agent = QLearner(env)
episode_lengths = simulate(agent)

In [None]:
plt.plot(range(len(episode_lengths)), episode_lengths)

In [None]:
display_frames_as_gif(make_video(agent, 200))

Task 1: Right now our QLearner just takes a random action and learns... nothing. Modify the `select_action` and `update_q_table` methods to turn our learner into an $\epsilon$-greedy Q-learner!

Task 2: This agent learns, but not as quickly as we would like. Take a look at the agent performing the task. Which environmental variables seem to be relevant? How might we modify the state space to facilitate faster learning?