### Imports

In [2]:
import collections
import gym
import numpy as np
import statistics
import tensorflow as tf
import tqdm

from matplotlib import pyplot as plt
from tensorflow.keras import layers
from typing import Any, List, Sequence, Tuple

Matplotlib is building the font cache; this may take a moment.


In [3]:
# Build the CartPole-v1 environment
env = gym.make("CartPole-v1")

# Fix the random seeds so that runs are repeatable
seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

# Float32 machine epsilon, used to keep divisions numerically stable
eps = float(np.finfo(np.float32).eps)

### The Model

The Actor and the Critic will be modeled using a single neural network that generates the action probabilities and the critic value, respectively.

During the forward pass, the model will take in the state as the input and will output both action probabilities and critic value. The goal is to train a model that chooses actions based on a policy that maximizes expected return.

In [4]:
class ActorCritic(tf.keras.Model):
    """Combined actor-critic network.

    A shared hidden layer feeds two heads: `actor`, which emits one output per
    action, and `critic`, which emits a single value.
    """

    def __init__(self, num_actions: int, num_hidden_units: int):
        """Create the shared layer and the two output heads.

        Args:
            num_actions: Number of outputs of the actor head.
            num_hidden_units: Number of units in the shared hidden layer.
        """
        super().__init__()

        # Shared feature layer used by both heads
        self.common = layers.Dense(units=num_hidden_units, activation="relu")
        # Actor head: no activation, so its outputs are raw (unnormalized) scores
        self.actor = layers.Dense(units=num_actions)
        # Critic head: a single scalar output per input row
        self.critic = layers.Dense(units=1)

    def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
        """Run the forward pass and return `(actor_output, critic_output)`."""
        hidden = self.common(inputs)
        actor_out = self.actor(hidden)
        critic_out = self.critic(hidden)
        return actor_out, critic_out

In [5]:
# Number of actions reported by the environment's action space
num_actions = env.action_space.n
# Width of the shared hidden layer
num_hidden_units = 128

# Instantiate the combined actor-critic network
model = ActorCritic(num_actions=num_actions, num_hidden_units=num_hidden_units)

### Agent Training

Training the agent will follow these steps:
- Run the agent on the environment to collect training data per episode
- Compute expected return at each time step
- Compute the loss for the combined Actor-Critic model
- Compute gradients and update network parameters
- Repeat until success criterion or max episodes has been reached

In [6]:
@tf.numpy_function(Tout=[tf.float32, tf.int32, tf.int32])
def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Apply `action` to the module-level `env` and report the outcome.

    Args:
        action: Action to pass to `env.step`.

    Returns:
        A tuple `(state, reward, done)` where `state` is the next observation
        cast to float32, `reward` is the step reward cast to int32, and `done`
        is an int32 flag that is 1 when the episode has ended.
    """
    state, reward, terminated, truncated, _ = env.step(action)
    # The episode is over when the environment terminates it OR cuts it off
    # (e.g. a time limit); ignoring `truncated` would keep stepping a finished
    # episode.
    done = bool(terminated) or bool(truncated)
    return (state.astype(np.float32), np.array(reward, np.int32), np.array(done, np.int32))

In [7]:
def run_episode(initial_state: tf.Tensor,
                model: tf.keras.Model,
                max_steps: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """Run a single episode to collect training data.

    Args:
        initial_state: Unbatched observation the episode starts from.
        model: Model called on a batched state; must return
            `(action_logits, value)`.
        max_steps: Upper bound on the number of environment steps taken.

    Returns:
        A tuple `(action_probs, values, rewards)` of stacked per-step tensors:
        the probability of each sampled action, the critic value, and the
        reward returned by `env_step`.
    """
    action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)

    initial_state_shape = initial_state.shape
    state = initial_state

    for t in tf.range(max_steps):
        # Convert state into a batched tensor (batch size = 1)
        state = tf.expand_dims(state, 0)

        # Run the model to get action logits and critic value
        action_logits_t, value = model(state)

        # Sample next action from the action probability distribution
        action = tf.random.categorical(action_logits_t, 1)[0, 0]
        action_probs_t = tf.nn.softmax(action_logits_t)

        # Store critic value
        values = values.write(t, tf.squeeze(value))

        # Store probability of the action chosen
        action_probs = action_probs.write(t, action_probs_t[0, action])

        # Apply action to the environment to get next state and reward
        state, reward, done = env_step(action)
        # Restore the static shape so the loop variable keeps a fixed shape
        state.set_shape(initial_state_shape)

        # Store reward
        rewards = rewards.write(t, reward)

        if tf.cast(done, tf.bool):
            break

    # Convert the TensorArrays into regular tensors before returning; the
    # result must be bound to `action_probs`, the name that is returned.
    action_probs = action_probs.stack()
    values = values.stack()
    rewards = rewards.stack()

    return action_probs, values, rewards


### Compute Expected Returns

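The rewards collected during an episode are converted into expected returns: for each timestep $t$, the return is the sum of the rewards from $t$ to the end of the episode, each weighted by an exponentially decaying discount factor $\gamma$, i.e. $G_t = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}$. Intuitively, discounting implies that rewards now are better than rewards later; mathematically, with $\gamma \in (0, 1)$ it ensures that the sum of rewards converges.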

In [None]:
def get_expected_return(rewards: tf.Tensor, gamma: float, standardize: bool = True) -> tf.Tensor:
    """Compute the discounted return for every timestep of an episode.

    Args:
        rewards: Per-step rewards, indexed along the first axis.
        gamma: Discount factor applied to the running sum at every step.
        standardize: When true, shift the returns to zero mean and scale them
            by their standard deviation (plus `eps` to avoid division by zero).

    Returns:
        A float32 tensor with one (optionally standardized) return per step,
        in the same order as `rewards`.
    """
    num_steps = tf.shape(rewards)[0]
    collected = tf.TensorArray(dtype=tf.float32, size=num_steps)

    # Walk the episode backwards so each return builds on the one after it
    reversed_rewards = tf.cast(rewards[::-1], dtype=tf.float32)
    running_total = tf.constant(0.0)
    scalar_shape = running_total.shape
    for step in tf.range(num_steps):
        running_total = reversed_rewards[step] + gamma * running_total
        # Pin the static shape so the loop variable stays shape-invariant
        running_total.set_shape(scalar_shape)
        collected = collected.write(step, running_total)

    # Flip back into chronological order
    returns = collected.stack()[::-1]

    if standardize:
        mean = tf.math.reduce_mean(returns)
        spread = tf.math.reduce_std(returns) + eps
        returns = (returns - mean) / spread
    return returns