Note that this is heavily inspired by the example on the official documentation on TensorFlow. I rewrote it to gain more of an understanding of how they would do things.

In [1]:
import collections                             # Using a deque is helpful for keeping track of environment states and such
import gym                                     # Host the environment 
import numpy as np                             # Fast linear algebra
import tensorflow as tf                        # Fast machine learning 
import tqdm                                    # Only used once, is a progress bar (so optional)

from tensorflow.keras import layers            # Makes it easier to use functional API
from typing import Any, List, Sequence, Tuple  # Lets us use type checking by giving us names to call 

eps = np.finfo(np.float32).eps.item()
seed = 42

tf.random.set_seed(seed)
np.random.seed(seed)

### Plan:

* eps

#### Game: 
* env: gym.make() => env.step(), env.reset(), env.action_space.n, env.render(), env.close()
* env_step()
* tf_env_step()
  
#### Agent: 
* Model: ActorCritic()
* Get Loss: compute_loss()
* Get action and values: info()
* Feedforward: __call__()
* Property: trainable_weights

#### Main loop: 
* run_episode()
* get_expected_return()
* train_step()
* Training Loop

### Main Logic

#### Game: 
* advance a time step
* reset to initial state
* render time step 

#### Training:
* Run an episode and collect data
* Update a model's weights based on the data

In [2]:
class game:
    
    def __init__(self, seed = None):
        self.internal = gym.make("CartPole-v0")
        if seed is not None: self.internal.seed(seed)
            
        self.action_space = self.internal.action_space
    
    def helper_step(self, action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        state, reward, done, _ = self.internal.step(action)
        return state.astype(np.float32), np.array(reward, np.int32), np.array(done, np.int32)
    
    def step(self, action: np.ndarray) -> List[tf.Tensor]: 
        return tf.numpy_function(self.helper_step, [action], [tf.float32, tf.int32, tf.int32])
    
    def reset(self): return self.internal.reset()
    def render(self): return self.internal.render()
    def close(self): return self.internal.close()

In [3]:
class ActorCritic(tf.keras.Model):
    
    def __init__(self, num_actions: int, num_hidden_units: int):
        super().__init__()
        self.common = layers.Dense(num_hidden_units)
        self.actor = layers.Dense(num_actions)
        self.critic = layers.Dense(1)
    
    def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
        x = self.common(inputs)
        return self.actor(x), self.critic(x)

class Agent:
    
    def __init__(self, num_actions: int, num_hidden_units: int, 
                 loss : tf.keras.losses = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)):
        
        self.model = ActorCritic(num_actions, num_hidden_units)
        self.loss = loss 
    
    @property
    def trainable_variables(self):
        return self.model.trainable_variables
    
    @tf.function()
    def compute_loss(self, action_probs: tf.Tensor, values: tf.Tensor, returns: tf.Tensor) -> tf.Tensor:
        """Computes the combined actor-critic loss."""

        advantage = returns - values
        action_log_probs = tf.math.log(action_probs)

        actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)
        critic_loss = self.loss(values, returns)

        return actor_loss + critic_loss

    @tf.function()
    def __call__(self, state: tf.Tensor):
        return self.model(state)
    
    def action(self, state: tf.Tensor):
        return tf.random.categorical(self(state)[0], 1)[0, 0]

In [4]:
@tf.function()
def run_episode(initial_state: tf.Tensor, agent: Agent, max_steps: int, env: game) -> List[tf.Tensor]:
    """Runs a single epsiode to collect training data."""
    
    action_probs = tf.TensorArray(dtype = tf.float32, size = 0, dynamic_size = True)
    values = tf.TensorArray(dtype = tf.float32, size = 0, dynamic_size = True)
    rewards = tf.TensorArray(dtype = tf.int32, size = 0, dynamic_size = True)
    
    initial_state_shape = initial_state.shape
    state = initial_state
    
    for t in tf.range(max_steps):
        
        state = tf.expand_dims(state, 0) #Get it into batch form because right now it is one dimensional
        action_logits_t, value = agent(state)
        
        action = tf.random.categorical(action_logits_t, 1)[0, 0] #Shape is (1,1) so use [0,0] to get single number tensor
        action_probs_t = tf.nn.softmax(action_logits_t)
        
        values = values.write(t, tf.squeeze(value)) #Squeeze to get rid of batch aspect
        action_probs = action_probs.write(t, action_probs_t[0, action])
        
        state, reward, done = env.step(action)
        state.set_shape(initial_state_shape)
        
        rewards = rewards.write(t, reward)
        
        if tf.cast(done, tf.bool):
            break
            
    action_probs = action_probs.stack()
    values = values.stack()
    rewards = rewards.stack()
    
    return action_probs, values, rewards # Currently one-dimensional 

In [5]:
@tf.function()
def get_expected_return(rewards: tf.Tensor, gamma: float) -> tf.Tensor:
    """Compute expected returns per timestep."""
    
    n = tf.shape(rewards)[0]                           
    returns = tf.TensorArray(dtype=tf.float32, size=n)
    

    rewards = tf.cast(rewards[::-1], dtype=tf.float32) # We use [::-1] to order the rewards from last one to first
    discounted_sum = tf.constant(0.0)
    discounted_sum_shape = discounted_sum.shape
    
    for i in tf.range(n):
        reward = rewards[i]
        discounted_sum = reward + gamma * discounted_sum 
        discounted_sum.set_shape(discounted_sum_shape) 
        returns = returns.write(i, discounted_sum) 
        
    returns = returns.stack()[::-1] # Now the returns are ordered from last to first so we swap em again. Shape is (n, )
    returns = (returns - tf.math.reduce_mean(returns)) / (tf.math.reduce_std(returns) + eps)
    return returns

In [6]:
@tf.function()
def train_step(initial_state: tf.Tensor, agent: Agent, optimizer: tf.keras.optimizers.Optimizer,
               gamma: float, max_steps_per_episode: int, env: game) -> tf.Tensor:
    """Runs a model training step."""

    with tf.GradientTape() as tape:
        
        action_probs, values, rewards = run_episode(initial_state, agent, max_steps_per_episode, env)
        returns = get_expected_return(rewards, gamma)
        action_probs, values, returns = [tf.expand_dims(x, 1) for x in [action_probs, values, returns]]
        loss = agent.compute_loss(action_probs, values, returns)
        
    grads = tape.gradient(loss, agent.trainable_variables)
    optimizer.apply_gradients(zip(grads, agent.trainable_variables))        
    episode_reward = tf.math.reduce_sum(rewards)
    
    return episode_reward

In [7]:
env = game(seed)

num_actions = env.action_space.n
num_hidden_units = 128
agent = Agent(num_actions, num_hidden_units)

optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

In [8]:
%%time 

max_episodes = 10000
max_steps_per_episode = 1000

reward_threshold = 195
running_reward = 0

# Discount factor
gamma = 0.99

with tqdm.trange(max_episodes) as t:
    for i in t:
        initial_state = tf.constant(env.reset(), dtype = tf.float32)
        episode_reward = int(train_step(initial_state, agent, optimizer, gamma, max_steps_per_episode, env))
        
        running_reward = 0.01 * episode_reward + running_reward * 0.99
        
        t.set_description(f'Episode {i}')
        t.set_postfix(episode_reward=episode_reward, running_reward=running_reward)

        if running_reward > reward_threshold:  
            pass#break

print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')

Episode 9999: 100%|██████████████████████| 10000/10000 [10:03<00:00, 16.57it/s, episode_reward=186, running_reward=166]


Solved at episode 9999: average reward: 166.06!
Wall time: 10min 3s





In [9]:
initial_state = tf.constant(env.reset(), dtype = tf.float32)
max_steps = 1000

# env.reset() returns an intial observation. That is what initial_state is. 
initial_state_shape = initial_state.shape
state = initial_state 

for t in tf.range(max_steps):
    state = tf.expand_dims(state, 0)

    action_logits_t, value = agent(state)        
    action = tf.random.categorical(action_logits_t, 1)[0, 0] # This ends up having shape (1,1) so taking [0,0] corrects this

    env.render()
    state, reward, done = env.step(action)


    # End episode?
    if tf.cast(done, tf.bool):
        break

env.close()