# Reinforcement Learning with OpenAI Gym
This notebook is a simple working example of reinforcement learning (RL) with OpenAI Gym.

## Environment Setup
1. Install swig: https://www.dev2qa.com/how-to-install-swig-on-macos-linux-and-windows/
2. Set up a python venv (optional):

`pip3 install virtualenv`

`python3 -m virtualenv venv`

`source venv/bin/activate`

3. Install required python packages:
`pip3 install gym==0.17.2 box2d-py==2.3.8`


In [1]:
import numpy as np
from numpy import pi
import gym

import matplotlib.pyplot as plt
%matplotlib inline

from operator import itemgetter

# for agent video MP4 rendering:
import io
import base64
from gym import wrappers
from IPython.display import HTML, display

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
from torch.utils.data import TensorDataset, DataLoader 
# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Reinforcement Learning
The general statement of an RL problem can be formulated as follows:

The probability $p_\theta$ of a play-out for a game composed of a sequence of state vectors $\textbf{s}_t$ and agent actions $\textbf{a}_t$ is factored into the policy vector $\pi_\theta(\textbf{a}_t|\textbf{s}_t)$ and the model $p(\textbf{s}_{t+1}|\textbf{s}_t, \textbf{a}_t)$:

$$
p_{\theta}(\textbf{s}_1,\textbf{a}_1,\ldots,\textbf{s}_T,\textbf{a}_T)=p(\textbf{s}_1)\prod_{t=1}^{T}\pi_{\theta}(\textbf{a}_t|\textbf{s}_t)p(\textbf{s}_{t+1}|\textbf{s}_t,\textbf{a}_t)
$$

There is additionally a reward, $r(\textbf{s}_t,\textbf{a}_t)$, given to the agent for each step in the game. The goal is to teach an agent to maximize the expected cumulative reward, i.e.

$$
\max_{\theta}E_{p_{\theta}}\left[\sum_{t}r(\textbf{s}_t,\textbf{a}_t)\right]
$$
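
To make the objective concrete, here is a minimal sketch that estimates the expected cumulative reward by averaging over sampled play-outs. Everything in it is an assumption made for illustration: the 1-D toy dynamics `toy_step`, the one-parameter policy `biased_policy`, and the reward of +1 per step to the right are hypothetical and have nothing to do with Gym.

```python
import random


def toy_step(position, action):
    """Hypothetical dynamics p(s_{t+1} | s_t, a_t): move by the action plus a little noise."""
    new_position = position + action + random.gauss(0.0, 0.1)
    reward = action  # hypothetical reward r(s_t, a_t): +1 per step right, -1 per step left
    return new_position, reward


def biased_policy(position, p_right):
    """A one-parameter stochastic policy pi_theta(a | s): step right with probability p_right."""
    return 1.0 if random.random() < p_right else -1.0


def estimate_expected_return(p_right, horizon=20, n_playouts=2000):
    """Monte Carlo estimate of E[sum_t r(s_t, a_t)] under the policy."""
    total = 0.0
    for _ in range(n_playouts):
        position, episode_return = 0.0, 0.0
        for _ in range(horizon):
            action = biased_policy(position, p_right)
            position, reward = toy_step(position, action)
            episode_return += reward
        total += episode_return
    return total / n_playouts


random.seed(0)
for p_right in (0.2, 0.5, 0.8):
    print(p_right, round(estimate_expected_return(p_right), 2))
```

In this toy setting the policy parameter plays the role of $\theta$: the estimates should grow as `p_right` increases, and maximizing over `p_right` is the toy analogue of the $\max_\theta$ above.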

In model-free RL we ignore the model and teach our agent to maximize the reward based purely on the current state and possibly the agent's history (past states and actions).

In model-based RL our agent tries to learn a model which correctly predicts future states and rewards, so that the policy can be chosen to maximize the cumulative reward.

## Bipedal Walker
The following variables define the actions and states of the game "BipedalWalker-v3" from OpenAI Gym.

BipedalWalker has 2 legs. Each leg has 2 joints. You can apply a torque to each of these joints in the range [-1, 1].

The state of the game is given by 24 variables, described in more detail here: https://github.com/openai/gym/wiki/BipedalWalker-v2


In [2]:
STATE_SPACE = 24
ACTION_SPACE = 4
ENV = gym.make("BipedalWalker-v3")

# actions
Hip_1 = 0
Knee_1 = 1
Hip_2 = 2
Knee_2 = 3

# state
HULL_ANGLE = 0
HULL_ANGULAR_VELOCITY = 1
VEL_X = 2
VEL_Y = 3
HIP_JOINT_1_ANGLE = 4
HIP_JOINT_1_SPEED = 5
KNEE_JOINT_1_ANGLE = 6
KNEE_JOINT_1_SPEED = 7
LEG_1_GROUND_CONTACT_FLAG = 8
HIP_JOINT_2_ANGLE = 9
HIP_JOINT_2_SPEED = 10
KNEE_JOINT_2_ANGLE = 11
KNEE_JOINT_2_SPEED = 12
LEG_2_GROUND_CONTACT_FLAG = 13
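
The constants above name only indices 0 to 13 of the 24-element state. To my understanding the remaining 10 entries are lidar rangefinder readings; the sketch below splits a state vector on that assumption. The helper `split_state` and the names `N_NAMED` and `N_LIDAR` are hypothetical and not part of Gym.

```python
N_NAMED = 14  # indices 0-13: hull, velocity, joint and ground-contact readings
N_LIDAR = 10  # indices 14-23: lidar rangefinder readings (assumed layout)


def split_state(state):
    """Split a 24-element observation into its named part and its lidar part."""
    if len(state) != N_NAMED + N_LIDAR:
        raise ValueError("expected a 24-element state vector")
    named = list(state[:N_NAMED])
    lidar = list(state[N_NAMED:])
    return named, lidar


named, lidar = split_state([0.0] * 24)
print(len(named), len(lidar))
```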



Now we'll define the basic interface with the game. The game is essentially a very rapid turn-based game. In each round, there are 2 main steps:
1. An action is taken by the agent according to the agent's policy function.
2. The game updates according to its current state and the input action from the agent.
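
The two steps above can be sketched as a plain loop. `ToyEnv` is a hypothetical stand-in that only mimics the `reset()` / `step()` shape used in this notebook (a 4-tuple of state, reward, terminal flag, and info); its dynamics and rewards are made up for illustration.

```python
class ToyEnv:
    """Hypothetical stand-in for a Gym environment with the same reset()/step() shape."""

    def __init__(self, goal=5):
        self.goal = goal
        self.position = 0

    def reset(self):
        self.position = 0
        return self.position

    def step(self, action):
        # Step 2: the game updates from its current state and the agent's action
        self.position += action
        reward = 1.0 if action > 0 else -1.0
        terminal = self.position >= self.goal
        return self.position, reward, terminal, {}


def always_right_policy(state):
    # Step 1: the agent picks an action based on the current state
    return 1


def run_episode(env, policy, max_steps=100):
    """Alternate between the agent's action and the environment's update."""
    state = env.reset()
    cumulative_score = 0.0
    for _ in range(max_steps):
        action = policy(state)
        state, reward, terminal, info = env.step(action)
        cumulative_score += reward
        if terminal:
            break
    return cumulative_score


print(run_episode(ToyEnv(goal=5), always_right_policy))
```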


In [3]:
def disable_view_window():
    from gym.envs.classic_control import rendering
    org_constructor = rendering.Viewer.__init__

    def constructor(self, *args, **kwargs):
        org_constructor(self, *args, **kwargs)
        self.window.set_visible(visible=False)

    rendering.Viewer.__init__ = constructor
    
    
def reset_view_window():
    from gym.envs.classic_control import rendering
    """org_constructor = rendering.Viewer.__init__

    def constructor(self, *args, **kwargs):
        org_constructor(self, *args, **kwargs)
        self.window.set_visible(visible=True)

    rendering.Viewer.__init__ = constructor"""


class Agent():
    """
    The base class for all agent-based reinforcement learning.
    Provides default implementations for __init__() and play() methods.
    
    __init__() by default only requires a policy function, which selects the next action for the agent to take
    
    play() steps through the game using the policy function provided by the user to select an action at each step
    
    """
    
    
    def __init__(self, policy_function):
        self.policy = policy_function
    
    
    def play(self, env, steps=500, monitor=True, show=False, verbose=False, sample_flag=False, *args, **kwargs):        
        # When using wrappers.Monitor to create videos, 'show' is always forced. 
        # This block prevents that behaviour and restores 'show' functionality
        if not show:
            disable_view_window()  
        else:
            reset_view_window()
            
        if monitor:
            # Note force=True cleans the output folder (delete all files with prefix "openaigym.")
            env = wrappers.Monitor(env, "./gym-results", force=True)
            
        
        state = env.reset()
        
        if sample_flag:
            state_history = []
            action_history = []
            reward_history = []
            state_history.append(state)
        cumulative_score = 0
        if show:
            env.render()
        
        # Main loop of game iteration    
        for step in range(steps):
            if verbose:
                print("state:", state)
            
            action = self.policy(state, *args, **kwargs)
            
            if verbose:
                print("action:", action)
            
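            state, reward, terminal, info = env.step(action)
            
            # Record the transition before checking for termination, so that the
            # final step (e.g. the -100 penalty for falling) is not dropped
            cumulative_score += reward
            if sample_flag:
                state_history.append(state)
                action_history.append(action)
                reward_history.append(reward)
            if show:
                env.render()
            if terminal:
                break
            
        env.close()
        
        if sample_flag:
            # One sample per recorded transition: (s_t, a_t) -> (s_{t+1}, r_t).
            # dtype=object is needed because each row pairs entries of different lengths.
            n_samples = len(action_history)
            X = np.array([[state_history[s], action_history[s]] for s in range(n_samples)], dtype=object)
            Y = np.array([[state_history[s+1], reward_history[s]] for s in range(n_samples)], dtype=object)
            return env, X, Y
        else:
            return env, cumulative_score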
        
    def html_video(self, env):
        video = io.open('./gym-results/openaigym.video.%s.video000000.mp4' % env.file_infix, 'r+b').read()
        encoded = base64.b64encode(video)
        display(
            HTML(data='''
            <video width="360" height="auto" alt="test" controls><source src="data:video/mp4;base64,{0}" type="video/mp4" /></video>'''
            .format(encoded.decode('ascii'))
                ) 
        )
        return


In the BipedalWalker-v3 game, the agent gets a positive reward proportional to the distance walked on the terrain. It can accumulate a total reward of 300+ by walking all the way to the end.

If the agent falls, it gets a reward of -100.
There is also a small negative reward proportional to the torque applied to the joints, so that the agent learns to walk smoothly with minimal torque.
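
As a rough illustration of how these three ingredients combine, here is a toy per-step reward. This is not Gym's actual reward code: the function `toy_step_reward` and the coefficients `distance_scale` and `torque_cost` are hypothetical placeholders, and only the -100 fall penalty comes from the description above.

```python
def toy_step_reward(distance_gained, torques, fell,
                    distance_scale=1.0, torque_cost=0.01, fall_penalty=-100.0):
    """Hypothetical per-step reward with the three ingredients described above."""
    if fell:
        # In this sketch, falling overrides everything else
        return fall_penalty
    progress_reward = distance_scale * distance_gained            # reward for moving forward
    effort_penalty = torque_cost * sum(abs(t) for t in torques)   # cost of applied torque
    return progress_reward - effort_penalty


print(toy_step_reward(0.5, [0.2, -0.4, 0.0, 1.0], fell=False))
print(toy_step_reward(0.5, [0.2, -0.4, 0.0, 1.0], fell=True))
```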

Let's define a couple of test policy functions to play with:

In [4]:
def random_policy(state):
    """This agent returns random actions."""
    return np.random.uniform(low=-1.0, high=1.0, size=4)


def stupid_policy(state):
    """A very simple expert system."""
    if state[LEG_1_GROUND_CONTACT_FLAG] == 1 and state[LEG_2_GROUND_CONTACT_FLAG] == 1:
        return np.random.uniform(low=-1.0, high=1.0, size=4)
    if state[KNEE_JOINT_1_SPEED] < -0.5:
        return np.array([-0.05, -0.2, 0., 0.])
    if state[KNEE_JOINT_2_SPEED] < -0.5:
        return np.array([0., 0., -0.05, -0.2])
    return np.random.uniform(low=-1.0, high=1.0, size=4)

In [5]:
my_agent = Agent(random_policy)
env, _ = my_agent.play(ENV, steps=100, show=True, verbose=False)
my_agent.html_video(env)

## Model-free RL

This section will provide a simple template for model-free reinforcement learning.

The goal in model-free RL is to sample game iterations and thereby train a predictor of the future reward given the current state and a candidate action, without modelling how the environment itself evolves. The 'future reward' may be the reward in the next step of the game, or the cumulative reward over many future steps.

The choice of action can then be as simple as choosing the action which leads to the largest future reward.

Below, we define a basic ModelFreeAgent class which can serve as a template for creating your own custom model-free agent.

In [6]:
class ModelFreeAgent(Agent):
    """
    Template for a model-free RL agent. 
    
    To instantiate an agent, the user must provide the following functions:
    
    policy_function(): A method for choosing what action to take next. This can be a selection among a  discrete set
                       of actions (as in the probability of each legal move for a current board position),
                       or a function which chooses an action vector by some heuristic (applied when the action space 
                       is a continuous-valued vector, like the four torques for our Bipedal Walker).
    
    action_generator(): A method for generating candidate actions. This can be deterministic (generate *all* legal moves
                        for a given chess board position) or probabilistic (generate candidate actions by sampling from
                        some - possibly learned - distribution over the action space)
                           
    reward_predictor(): A method for predicting the reward for a given state:action pair (s_t, a_t)
    """
    
    def __init__(self, policy_function, action_generator, reward_predictor):
        self.policy = policy_function
        self.action_generator = action_generator
        self.reward_predictor = reward_predictor
        

In [7]:
def model_free_policy(current_state, *args, **kwargs):
    """
    The model-free agent chooses an action by generating candidate actions, predicting the future reward for each
    candidate action, and then applying its policy function to select among the action:reward pairs. In this example
    the policy is simply to choose the action which maximizes the predicted reward in the next time step.

    Returns the next action to be input to the environment.
    """
    action_generator = kwargs['action_generator']
    reward_predictor = kwargs['reward_predictor']
    
    # Generate a list of candidate actions
    candidate_actions =  action_generator(current_state)
    # Predict the future reward for each candidate action
    action_reward_pairs = []
    for action in candidate_actions:
        reward = reward_predictor(current_state, action)
        action_reward_pairs.append((action, reward))
    # Apply the policy function to the list of action:reward pairs. 
    # In this case, the policy is to choose the maximum predicted reward for the next time step.
    best_action = max(action_reward_pairs, key=itemgetter(1))[0]

    return best_action

        
def model_free_action_generator(state):
    """
    The action generator for our model-free agent. In this example, the function generates four random action vectors.
    
    (This code is a template which should be replaced for optimal agent performance)
    """
    return np.random.uniform(low=-1.0, high=1.0, size=(4,4))


def model_free_reward_predictor(current_state, candidate_action):
    """
    The function to predict the reward for a given environment state and candidate action.
    
    (This code is a template which should be replaced for optimal agent performance)
    """
    return np.random.uniform(low=-100.0, high=10.0)

Let's test out our template model-free agent. We expect it to do about as well as the random agent from above, since the candidate actions and predicted rewards are providing absolutely zero additional information to the agent.

In [8]:
my_model_free_agent = ModelFreeAgent(model_free_policy, model_free_action_generator, model_free_reward_predictor)

env, _ = my_model_free_agent.play(ENV,
                                  steps=100,
                                  show=True,
                                  action_generator=my_model_free_agent.action_generator,
                                  reward_predictor=my_model_free_agent.reward_predictor)
my_model_free_agent.html_video(env)

## Model-based RL

This section will provide a simple template for model-based reinforcement learning.

The goal in model-based RL is to sample game iterations and thereby train a model which correctly predicts the future environment state and corresponding reward, given the current state and candidate action. 

The main advantage of model-based RL is that a model for the environment's evolution allows the agent to predict the cumulative reward over many future steps. This allows the agent to 'plan ahead', hopefully leading to a more successful policy.

The choice of action can then be as simple as choosing the action which leads to the largest cumulative future reward.
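
A minimal sketch of this kind of planning is shown below. Note that it uses random shooting (sample whole action sequences, score each with the model, keep the first action of the best one), which is a simpler technique than the tree search implemented later in this notebook. The model `toy_model`, the reward `toy_reward`, and the target position 3 are all hypothetical.

```python
import random


def toy_model(state, action):
    """Hypothetical environment model: the state is a position, the action a displacement."""
    return state + action


def toy_reward(state, action):
    """Hypothetical reward predictor: ending the step close to position 3 is good."""
    return -abs((state + action) - 3.0)


def discounted_return(state, actions, discount):
    """Roll the model forward and sum discount**t * reward along one action sequence."""
    total = 0.0
    for t, action in enumerate(actions):
        total += discount**t * toy_reward(state, action)
        state = toy_model(state, action)
    return total


def plan_first_action(state, depth=3, n_sequences=64, discount=0.5):
    """Random shooting: sample action sequences, keep the first action of the best one."""
    best_return, best_action = float("-inf"), None
    for _ in range(n_sequences):
        actions = [random.uniform(-1.0, 1.0) for _ in range(depth)]
        value = discounted_return(state, actions, discount)
        if value > best_return:
            best_return, best_action = value, actions[0]
    return best_action


random.seed(0)
print(plan_first_action(state=0.0))
```

Because the planner scores entire sequences, an action is preferred for where it leads over several steps and not only for its immediate reward. The cost is that the number of sequences worth comparing grows quickly with the planning depth.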

Below, we define a basic ModelBasedAgent class which can serve as a template for creating your own custom model-based agent.

In [9]:
class ModelBasedAgent(Agent):
    """
    Template for a model-based RL agent. 
    
    To instantiate an agent, the user must provide the following functions:
    
    policy_function(): A method for choosing what action to take next. This can be a selection among a  discrete set
                       of actions (as in the probability of each legal move for a current board position),
                       or a function which chooses an action vector by some heuristic (applied when the action space 
                       is a continuous-valued vector, like the four torques for our Bipedal Walker).
                       
 
    
    action_generator(): A method for generating candidate actions. This can be deterministic (generate *all* legal moves
                        for a given chess board position) or probabilistic (generate candidate actions by sampling from
                        some - possibly learned - distribution over the action space)
                        
    environment_model(): A method for predicting the next environment state given the current environment state and the 
                         candidate action. 
                         
    reward_predictor(): A method for predicting the reward for a given state:action pair (s_t, a_t)
    """
    
    def __init__(self, policy_function, action_generator, environment_model, reward_predictor):
        self.policy = policy_function
        self.action_generator = action_generator
        self.environment_model = environment_model        
        self.reward_predictor = reward_predictor
        

In [10]:
def model_based_policy(current_state, *args, **kwargs):
    """
    The model-based agent chooses an action by iteratively generating candidate actions and the corresponding
    predicted future environment state, up to some depth, and then predicting the reward for each
    candidate action in each trajectory. 
    The agent then applies its policy function to select the action which is expected to maximize the future
    cumulative reward. 
    
    In this example the policy is simply to choose the action which maximizes the predicted cumulative reward
    over the next <depth> number of steps. That is, the action which leads to the largest total reward across
    all subsequent steps.
    
    This implementation generates the default number of candidate trajectories from the action_generator method
    (say, <C> candidate trajectories) at each step, leading to <C>**<depth> random sample trajectories to compare.

    Returns the next action to be input to the environment.
    """
    action_generator = kwargs['action_generator']
    environment_model = kwargs['environment_model']
    reward_predictor = kwargs['reward_predictor']
    depth = kwargs['depth']
    discount = kwargs['discount']
    
    assert 0.0 < discount < 1.0
        
    # Generate a tree of randomly sampled game trajectories.
    # Each node in the search tree holds a list of [state, action, reward].
    # A node's key has one character per level, so nodes at depth d have keys of length d + 1
    # (this assumes the action generator returns at most 10 candidates).
    sampled_trajectories = {"0": [current_state, 0, 0]}
    for d in range(depth):
        keys_to_expand = [k for k in sampled_trajectories.keys() if len(k) == d + 1]

        for key in keys_to_expand:
            state = sampled_trajectories[key][0]
            candidate_actions = action_generator(state)
            
            for num, action in enumerate(candidate_actions):
                new_state = environment_model(state, action)
                # Predict the reward from the node's own state, not from the root state
                reward = reward_predictor(state, action)
                new_key = key + str(num)
                sampled_trajectories.update({new_key: [new_state, action, reward]})
                # Add the discounted reward to the depth-1 ancestor, which accumulates
                # the total reward of every trajectory that starts with its action
                if d > 0:
                    sampled_trajectories[new_key[:2]][2] += discount**d * reward
                
    # Apply the policy function to the sampled trajectories. 
    # In this case, the policy is to choose the depth-1 action with the maximum cumulative predicted reward.
    action_reward_pairs = [(el[1], el[2]) for key, el in sampled_trajectories.items() if len(key) == 2]
    best_action = max(action_reward_pairs, key=itemgetter(1))[0]

    return best_action

        
def model_based_action_generator(state):
    """
    The action generator for our model-based agent. In this example, the function generates four random action vectors.
    
    (This code is a template which should be replaced for optimal agent performance)
    """
    return np.random.uniform(low=-1.0, high=1.0, size=(4,4))


def model_based_environment_model(current_state, candidate_action):
    """
    The environment model for our model-based agent. In this example, the function generates a random state vector.
    
    (This code is a template which should be replaced for optimal agent performance)
    """
    return ENV.observation_space.sample()


def model_based_reward_predictor(current_state, candidate_action):
    """
    The function to predict the reward for a given environment state and candidate action.
    
    (This code is a template which should be replaced for optimal agent performance)
    """
    return np.random.uniform(low=-100.0, high=10.0)

Let's test out our template model-based agent. We expect it to do about as well as the random agent from above, since the candidate actions, predicted states, and predicted rewards are all random and provide no additional information to the agent.

In [11]:
# OLD
my_model_based_agent = ModelBasedAgent(model_based_policy, model_based_action_generator,
                                       model_based_environment_model, model_based_reward_predictor)

env, _ = my_model_based_agent.play(ENV,
                                   steps=100,
                                   show=True,
                                   action_generator=my_model_based_agent.action_generator,
                                   environment_model=my_model_based_agent.environment_model,
                                   reward_predictor=my_model_based_agent.reward_predictor,
                                   depth=3,
                                   discount=0.5)
my_model_based_agent.html_video(env)

## RL Training

Now we will build a training loop for our reward predictor and environment model. This requires us to sample many tuples of the form

(state, action) -> (new_state, reward)

We can do this with the `play` method of `Agent` by setting `sample_flag=True`.

In [12]:
env, x_raw, y_raw = my_model_free_agent.play(ENV,
                                             steps=100,
                                             sample_flag=True,
                                             action_generator=my_model_free_agent.action_generator,
                                             reward_predictor=my_model_free_agent.reward_predictor)

# For each sample, concatenate the state and action vectors into a single vector of 
# length (STATE_SPACE + ACTION_SPACE)
x_cat = [np.concatenate((x_raw[r,0], x_raw[r,1])) for r in range(len(x_raw))]

# For each sample, concatenate the new state and reward into a single vector of 
# length STATE_SPACE + 1 
y_cat = [np.concatenate((y_raw[r,0], [y_raw[r,1]])) for r in range(len(y_raw))]

# Define Pytorch tensors which will be used for training
X = torch.tensor(x_cat, dtype=torch.float32)
Y = torch.tensor(y_cat, dtype=torch.float32)
Y_reward = torch.unsqueeze(Y[:,-1], 1)
Y_new_state = Y[:,0:-1]

Now let's make a basic feedforward neural network to predict the reward given a state and an action.

In [13]:
class RewardNet(nn.Module):

    def __init__(self):
        super().__init__()
        # an affine operation: y = Wx + b
        self.fc1 = nn.Linear(STATE_SPACE + ACTION_SPACE, 16)  # 16 node feedforward layer
        self.fc2 = nn.Linear(16, 4)
        self.fc3 = nn.Linear(4, 1)

    def forward(self, x):
        # Defines the activation functions. Pytorch will automatically build the backward method.
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def num_flat_features(self, x):
        size = x.size()[1:]  # all dimensions except the batch dimension
        num_features = 1
        for s in size:
            num_features *= s
        return num_features


test_reward_predictor = RewardNet()
print(test_reward_predictor)

RewardNet(
  (fc1): Linear(in_features=28, out_features=16, bias=True)
  (fc2): Linear(in_features=16, out_features=4, bias=True)
  (fc3): Linear(in_features=4, out_features=1, bias=True)
)


Below, I build a PyTorch dataset object, which makes sure that our data interfaces nicely with other PyTorch classes such as `DataLoader`. I also define the loss function to be the mean squared error, as implemented in PyTorch.
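
For reference, with the default settings `nn.MSELoss` averages the squared differences over all elements. Writing $\hat{r}_i$ for the predicted reward and $r_i$ for the observed reward of sample $i$ in a batch of $N$ samples (symbols introduced here for illustration), the loss is

$$
L = \frac{1}{N}\sum_{i=1}^{N}\left(\hat{r}_i - r_i\right)^2
$$

For a single sample ($N=1$), as in the cell below, this reduces to the squared difference between the network's prediction and the observed reward.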

In [14]:
reward_dataset = TensorDataset(X, Y_reward)
sample = reward_dataset[0]
print(sample)
criterion = nn.MSELoss() # this is an instantiation of a class defined in the PyTorch nn library
loss = criterion(test_reward_predictor(sample[0]), sample[1])
print(loss)

(tensor([ 2.7475e-03, -5.6576e-06,  4.4011e-04, -1.6000e-02,  9.2073e-02,
        -5.8079e-04,  8.6020e-01,  1.8324e-03,  1.0000e+00,  3.2479e-02,
        -5.8075e-04,  8.5375e-01,  4.1074e-04,  1.0000e+00,  4.4081e-01,
         4.4582e-01,  4.6142e-01,  4.8955e-01,  5.3410e-01,  6.0246e-01,
         7.0915e-01,  8.8593e-01,  1.0000e+00,  1.0000e+00, -6.1842e-01,
        -9.2849e-01,  4.1988e-01, -8.6329e-01]), tensor([-0.1544]))
tensor(0.2984, grad_fn=<MseLossBackward>)


Backpropagation is easily performed in PyTorch by calling the `backward()` method on the loss. First, clear the gradient buffer with `zero_grad()`. This must be done because `backward()` adds the new gradients to any gradients already stored rather than overwriting them.

In [15]:
test_reward_predictor.zero_grad()

print('fc1.bias.grad before backward')
print(test_reward_predictor.fc1.bias.grad)

loss.backward()

print('fc1.bias.grad after backward')
print(test_reward_predictor.fc1.bias.grad)

fc1.bias.grad before backward
None
fc1.bias.grad after backward
tensor([ 0.1484,  0.0000,  0.0171,  0.0000,  0.0000,  0.0177, -0.0366,  0.0000,
         0.0604,  0.0114,  0.0000,  0.1068, -0.0550,  0.0000,  0.0000,  0.0000])


Now let's combine everything we've seen into a training function that we can call to train our model.

In [16]:
def train(net, data, batch_size, epochs):
    # Select optimizing algorithm
    optimizer = optim.Adam(net.parameters(), lr=0.01)
    
    # Build the dataset into a DataLoader 
    dl = DataLoader(data, batch_size=batch_size)
    
    # training loop: each epoch makes one full pass over all mini-batches
    for epoch in range(epochs):
        for batch, target in dl:
            optimizer.zero_grad()   # zero the gradient buffers
            output = net(batch)
            loss = criterion(output, target)   # criterion is the MSE loss defined above
            loss.backward()
            optimizer.step()    # Does the update
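
To show the loop structure without any PyTorch machinery, here is a dependency-free sketch of the same pattern (epochs over mini-batches, fresh gradients, forward pass, MSE gradient, update) for a one-weight linear model. It is an illustration under stated assumptions, not a replacement for `train`: it uses plain gradient descent where `train` uses Adam, the gradients are written out by hand, and the helper names and the synthetic data from y = 2x + 1 are hypothetical.

```python
import random


def make_batches(samples, batch_size):
    """Yield consecutive mini-batches, as a DataLoader without shuffling would."""
    for start in range(0, len(samples), batch_size):
        yield samples[start:start + batch_size]


def train_linear(samples, batch_size, epochs, lr=0.1):
    """Fit y = w * x + b by mini-batch gradient descent on the mean squared error."""
    w, b = 0.0, 0.0
    for epoch in range(epochs):
        for batch in make_batches(samples, batch_size):
            # Gradients start from zero for every batch (the role of zero_grad())
            grad_w, grad_b = 0.0, 0.0
            for x, y in batch:
                error = (w * x + b) - y                 # forward pass and residual
                grad_w += 2.0 * error * x / len(batch)  # d(MSE)/dw
                grad_b += 2.0 * error / len(batch)      # d(MSE)/db
            # Parameter update (the role of optimizer.step())
            w -= lr * grad_w
            b -= lr * grad_b
    return w, b


random.seed(0)
xs = [random.uniform(-1.0, 1.0) for _ in range(100)]
data = [(x, 2.0 * x + 1.0) for x in xs]  # synthetic, noise-free targets from y = 2x + 1
w, b = train_linear(data, batch_size=10, epochs=200)
print(round(w, 3), round(b, 3))
```

The fitted `w` and `b` should end up close to 2 and 1. The point of the sketch is that every epoch visits every mini-batch, and the gradients are reset before each one.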

# Watch trained agent

In [17]:
# TODO
"""
env, _ = my_trained_agent.play(ENV, steps=100, ...)
my_trained_agent.html_video(env)
"""



# References

- https://stackoverflow.com/questions/52726475/display-openai-gym-in-jupyter-notebook-only  mp4 visualization trick
- https://designrl.github.io/ interesting extension