## Actor-Critic methods

Actor-Critic methods are temporal difference (TD) learning methods that represent the policy function independent of the value function.

A policy function (or policy) returns a probability distribution over actions that the agent can take based on the given state. A value function determines the expected return for an agent starting at a given state and acting according to a particular policy forever after.

In the Actor-Critic method, the policy is referred to as the actor that proposes a set of possible actions given a state, and the estimated value function is referred to as the critic, which evaluates actions taken by the actor based on the given policy.

In this tutorial, both the Actor and Critic will be represented using one neural network with two outputs.

<b>Temporal Difference Learning</b>

https://en.wikipedia.org/wiki/Temporal_difference_learning

In [1]:
import collections
import gym
import numpy as np
import tensorflow as tf
import tqdm

from matplotlib import pyplot as plt
from tensorflow.keras import layers
from typing import Any, List, Sequence, Tuple

# Create an Environment 
env = gym.make('CartPole-v0')

# Set Seed for experimental reproducibility
seed = 42
env.seed(seed)
tf.random.set_seed(seed)
np.random.seed(seed)

# Small epsilon value for stablizing divison operations
eps = np.finfo(np.float32).eps.item()

## Model
The Actor and Critic will be modeled using one neural network that generates the action probabilities and critic value respectively. We use model subclassing to define the model.

During the forward pass, the model will take in the state as the input and will output both action probabilities and critic value V which models the state-dependent value function. The goal is to train a model that chooses actions based on a policy  that maximizes expected return.

For Cartpole-v0, there are four values representing the state: cart position, cart-velocity, pole angle and pole velocity respectively. The agent can take two actions to push the cart left (0) and right (1) respectively.

Refer to OpenAI Gym's CartPole-v0 wiki page for more information.

https://spinningup.openai.com/en/latest/spinningup/rl_intro.html#value-functions

https://spinningup.openai.com/en/latest/spinningup/rl_intro.html#reward-and-return

In [2]:
class ActorCritic(tf.keras.Model):
    '''Combined Actor-Critic Network'''
    
    def __init__(self, num_actions: int, num_hidden_units: int):
        '''Initialize'''
        super().__init__()
        
        self.common = layers.Dense(num_hidden_units, activation='relu')
        self.actor = layers.Dense(num_actions)
        self.critic = layers.Dense(1)
        
    def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
        x = self.common(inputs)
        return self.actor(x), self.critic(x)
        

In [3]:
num_actions = env.action_space.n # 2
num_hidden_units = 128

model = ActorCritic(num_actions, num_hidden_units)

## Training
To train the agent, you will follow these steps:

* Run the agent on the environment to collect training data per episode.
* Compute expected return at each time step.
* Compute the loss for the combined actor-critic model.
* Compute gradients and update network parameters.
* Repeat 1-4 until either success criterion or max episodes has been reached.

### 1. Collecting training data
As in supervised learning, in order to train the actor-critic model, we need to have training data. However, in order to collect such data, the model would need to be "run" in the environment.

We collect training data for each episode. Then at each time step, the model's forward pass will be run on the environment's state in order to generate action probabilities and the critic value based on the current policy parameterized by the model's weights.

The next action will be sampled from the action probabilities generated by the model, which would then be applied to the environment, causing the next state and reward to be generated.

This process is implemented in the <code>run_episode</code> function, which uses TensorFlow operations so that it can later be compiled into a TensorFlow graph for faster training. Note that <code>tf.TensorArrays</code> were used to support Tensor iteration on variable length arrays.

In [4]:
# Wrap OpenAI's Gym `env.step` call as an operation in Tensorflow function
# This allows it to be included in a callable Tensorflow Graph

def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    '''Returns state, reward and done flag given an action'''
    
    state, reward, done, _ = env.step(action)
    return (state.astype(np.float32),
            np.array(reward, np.int32),
            np.array(done, np.int32))

def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
    return tf.numpy_function(env_step, [action], [tf.float32, tf.int32, tf.int32])

In [5]:
def run_episode(initial_state: tf.Tensor, model: tf.keras.Model, max_steps: int) -> List[tf.Tensor]:
    '''Runs a Single Episode to collect Training Data'''
    
    action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)
    
    initial_state_shape = initial_state.shape
    state = initial_state
    
    for t in tf.range(max_steps):
        # Convert state into a batched Tensor (batch_size = 1)
        state = tf.expand_dims(state, 0)
        
        # Run model and to get action probs and critic value
        action_logits_t, value = model(state)
        
        # Sample next action from the action probability distribution
        action = tf.random.categorical(action_logits_t, 1)[0, 0]
        action_probs_t = tf.nn.softmax(action_logits_t)
        
        # Store critic values
        values = values.write(t, tf.squeeze(value))
        
        # Store log probability of the action choosen
        action_probs = action_probs.write(t, action_probs_t[0, action])
        
        # Apply action to the environment to get next state and reward
        state, reward, done = tf_env_step(action)
        state.set_shape(initial_state_shape)
        
        # Store reward
        rewards = rewards.write(t, reward)
        
        if tf.cast(done, tf.bool):
            break
    
    action_probs = action_probs.stack()
    values = values.stack()
    rewards = rewards.stack()

    return action_probs, values, rewards

### 2. Computing expected returns

In [6]:
# Computing Expected Returns
def get_expected_return(rewards: tf.Tensor, gamma: float, standardize: bool = True) -> tf.Tensor:
    '''Compute expected return per timestamp'''
    n = tf.shape(rewards)[0]
    returns = tf.TensorArray(dtype=tf.float32, size=n)
    
    # Start from the end of `rewards` and accumulate reward sums
    # into the `returns` array
    rewards = tf.cast(rewards[::-1], dtype=tf.float32)
    discounted_sum = tf.constant(0.0)
    discounted_sum_shape = discounted_sum.shape
    for i in tf.range(n):
        reward = rewards[i]
        discounted_sum = reward + gamma * discounted_sum
        discounted_sum.set_shape(discounted_sum_shape)
        returns = returns.write(i, discounted_sum)
    returns = returns.stack()[::-1]

    if standardize:
        returns = ((returns - tf.math.reduce_mean(returns)) / 
                   (tf.math.reduce_std(returns) + eps))

    return returns

### 3. The actor-critic loss
Since we are using a hybrid actor-critic model, we will use loss function that is a combination of actor and critic losses for training, as shown below:

<b>Critic loss</b>

Training  to be as close possible to  can be set up as a regression problem with the following loss function:

where  is the Huber loss, which is less sensitive to outliers in data than squared-error loss.

In [7]:
hubber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

def compute_loss(
    action_probs: tf.Tensor,
    values: tf.Tensor,
    returns: tf.Tensor) -> tf.Tensor:
    '''Computes the combined action-critic loss'''
    
    advantage = returns - values
    
    action_log_probs = tf.math.log(action_probs)
    actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)
    
    critic_loss = hubber_loss(values, returns)
    
    return actor_loss + critic_loss

### 4. Defining the training step to update parameters
We combine all of the steps above into a training step that is run every episode. All steps leading up to the loss function are executed with the <code>tf.GradientTape</code> context to enable automatic differentiation.

We use the Adam optimizer to apply the gradients to the model parameters.

We also compute the sum of the undiscounted rewards, episode_reward, in this step which would be used later on to evaluate if we have met the success criterion.

We apply the <code>tf.function</code> context to the <code>train_step</code> function so that it can be compiled into a callable TensorFlow graph, which can lead to 10x speedup in training.

In [8]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

@tf.function
def train_step(
    initial_step: tf.Tensor,
    model: tf.keras.Model, 
    optimizer: tf.keras.optimizers.Optimizer,
    gamma: float,
    max_steps_per_episode: int) -> tf.Tensor:
    
    with tf.GradientTape() as tape:
        # Run the model for one episode to collect training data
        action_probs, values, rewards = run_episode(initial_state, model, max_steps_per_episode)
        
        # Calculate expected returns
        returns = get_expected_return(rewards, gamma)

        # Convert training data to appropriate TF tensor shapes
        action_probs, values, returns = [
            tf.expand_dims(x, 1) for x in [action_probs, values, returns]] 

        # Calculating loss values to update our network
        loss = compute_loss(action_probs, values, returns)
    
    # Compute the gradients from the loss
    grads = tape.gradient(loss, model.trainable_variables)

    # Apply the gradients to the model's parameters
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    episode_reward = tf.math.reduce_sum(rewards)

    return episode_reward

In [10]:
# Run the Training Loop


max_episodes = 10000
max_steps_per_episode = 1000

# Cartpole-v0 is considered solved if average reward is >= 195 over 100 
# consecutive trials
reward_threshold = 195
running_reward = 0

# Discount factor for future rewards
gamma = 0.99

with tqdm.trange(max_episodes) as t:
    for i in t:
        initial_state = tf.constant(env.reset(), dtype=tf.float32)
        episode_reward = int(train_step(
            initial_state, model, optimizer, gamma, max_steps_per_episode))

        running_reward = episode_reward*0.01 + running_reward*.99

        t.set_description(f'Episode {i}')
        t.set_postfix(
            episode_reward=episode_reward, running_reward=running_reward)

        # Show average episode reward every 10 episodes
        if i % 10 == 0:
            pass # print(f'Episode {i}: average reward: {avg_reward}')

        if running_reward > reward_threshold:  
            break

print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')

Episode 5780:  58%|█████████████▎         | 5780/10000 [17:44<12:57,  5.43it/s, episode_reward=200, running_reward=195]


Solved at episode 5780: average reward: 195.01!





## Visualization
After training, it would be good to visualize how the model performs in the environment. You can run the cells below to generate a GIF animation of one episode run of the model. Note that additional packages need to be installed for OpenAI Gym to render the environment's images correctly in Colab.

In [13]:
# Render an episode and save as a GIF file

from IPython import display as ipythondisplay
from PIL import Image
from pyvirtualdisplay import Display


display = Display(visible=0, size=(400, 300))
display.start()


def render_episode(env: gym.Env, model: tf.keras.Model, max_steps: int): 
  screen = env.render(mode='rgb_array')
  im = Image.fromarray(screen)

  images = [im]

  state = tf.constant(env.reset(), dtype=tf.float32)
  for i in range(1, max_steps + 1):
    state = tf.expand_dims(state, 0)
    action_probs, _ = model(state)
    action = np.argmax(np.squeeze(action_probs))

    state, _, done, _ = env.step(action)
    state = tf.constant(state, dtype=tf.float32)

    # Render screen every 10 steps
    if i % 10 == 0:
      screen = env.render(mode='rgb_array')
      images.append(Image.fromarray(screen))

    if done:
      break

  return images


# Save GIF image
images = render_episode(env, model, max_steps_per_episode)
image_file = 'cartpole-v0.gif'
# loop=0: loop forever, duration=1: play each frame for 1ms
images[0].save(
    image_file, save_all=True, append_images=images[1:], loop=0, duration=1)


EasyProcessError: start error <EasyProcess cmd_param=['Xvfb', '-help'] cmd=['Xvfb', '-help'] oserror=[WinError 2] The system cannot find the file specified return_code=None stdout="None" stderr="None" timeout_happened=False>