In [1]:
import collections
import gym
import numpy as np
import statistics
import tensorflow as tf
import tqdm

from matplotlib import pyplot as plt
from tensorflow.keras import layers
from typing import Any, List, Sequence, Tuple

# Create the Environment -> Premade thing in the gym library
env = gym.make("CartPole-v1")

# Set a seed for environment reproductivity
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

# Small Epsilon value for stablizing division operations
eps = np.finfo(np.float32).eps.item()
print(eps)

2023-09-26 10:44:52.670898: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


1.1920928955078125e-07


In [2]:
class ActorCritic(tf.keras.Model):
                          
    # Basically all this class does is just initializes a basic neural network model that takes in as input the units for complexity and the amount of actions that the model is able to take
    
    def __init__(
        self,
        num_actions: int,
        num_hidden_units: int):
                  
        super().__init__()
                  
        self.common = layers.Dense(num_hidden_units, activation='relu')
        self.actor = layers.Dense(num_actions)
        self.critic = layers.Dense(1)
                  
    def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
        
        x = self.common(inputs)
        return self.actor(x), self.critic(x)
                  
                  
num_actions = env.action_space.n  # Basically the amount of actions that the RL Agent has (which is 2 because it can either move the cart left or right
print(num_actions)
num_hidden_units = 128 # Just represents the complexity of the neural network

model = ActorCritic(num_actions, num_hidden_units)                  


2


2023-09-26 10:44:55.612128: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    # this is used for compatibility purposes with Tensorflows' graph 
    # Also returns state, done, and reward variables given an action
    
    state, reward, done, truncated, info = env.step(action)
    return (state.astype(np.float32),
            np.array(reward, np.int32),
            np.array(done, np.int32))

def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
    return tf.numpy_function(env_step, [action], [tf.float32, tf.int32, tf.int32])

# This function creates training data by running a single trial run
def run_episode(
    initial_state: tf.Tensor,
    model: tf.keras.Model,
    max_steps: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    
    # Create arrays that values will go into
    action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)
    
    initial_state_shape = initial_state.shape
    state = initial_state
    
    for i in tf.range(max_steps):
        # We convert the state so that it can be used in the model function one time. I think we make it batched, so that the model only runs it one time
        state = tf.expand_dims(state, 0)
        
        # Test the model on the current state
        action_logits_t, value = model(state)
        
        # We take a random action based on what the model spits out
        action = tf.random.categorical(action_logits_t, 1)[0, 0]
        action_probs_t = tf.nn.softmax(action_logits_t)
        
        values = values.write(t, tf.squeeze(value))
        
        actions_probs = action_probs.write(t, actions_probs_t[0, action])
        
        # Extract the state, reward, and whether or not it is done from the action (if it is, then the loop breaks)
        state, reward, done = tf_env_step(action)
        state.set_shape(initial_state_shape)

        rewards = rewards.write(t, reward)
        
        if tf.cast(done, tf.bool):
            break
            
    action_probs = action_probs.stack()
    values = values.stack()
    rewards = rewards.stack()
    
    return action_probs, values, rewards

In [4]:
def get_expected_return(rewards: tf.Tensor, gamma: float, standardize: bool = True) -> tf.Tensor:
    # This function computes the expected return in each timestep
    
    n = tf.shape(rewards)[0] # Gets the amount of timesteps
    returns = tf.TensorArray(dtype=tf.float32, size=n) # Creates a 'returns' variable with size equal to timesteps
    
    rewards = tf.cast(rewards[::-1], dtype=tf.float32)
    discounted_sum = tf.constant(0.0)
    discounted_sum_shape = discounted_sum.shape
    for i in tf.range(n):
        reward = rewards[i]
        discounted_sum = reward + gamma * discounted_sum
        discounted_sum.set_shape(discounted_sum_shape)
        returns = returns.write(i, discounted_sum)
    returns = returns.stack()[::-1]
    
    if standardize:
        returns = ((returns - tf.math.reduce_mean(returns)) / (tf.math.reduce_std(returns) + eps))
        
    return returns
    
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

def compute_loss(action_probs: tf.Tensor, values: tf.Tensor, returns: tf.Tensor) -> tf.Tensor:
    
    advantage = returns - values
    
    action_log_probs = tf.math.log(action_probs)
    actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)
    
    critic_loss = huber_loss(values, returns)
    
    return actor_loss + critic_loss
    

In [5]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

@tf.function
def train_step(initial_state: tf.Tensor, model: tf.keras.Model, 
               optimizer: tf.keras.optimizers.Optimizer, gamma: float, max_steps_per_episode: int) -> tf.Tensor:
    
    with tf.GradientTape() as tape:
        action_probs, values, rewards = run_episode(initial_state, model, max_steps_per_episode)
        
        returns = get_expected_returns(rewards, gamma)
        
        action_probs, values, returns = [tf.expand_dims(x, 1) for x in [action_probs, values, returns]]
        
        loss = compute_loss(action_probs, values, returns)
        
    grads = tape.gradient(loss, model.trainable_variables)
    
    episode_reward = tf.math.reduce_sum(rewards)
    
    return episode_reward



In [6]:
%%time

min_episodes_criterion = 100
max_episodes = 10000
max_steps_per_episode = 500

# `CartPole-v1` is considered solved if average reward is >= 475 over 500 
# consecutive trials
reward_threshold = 475
running_reward = 0

# Discount factor for future rewards
gamma = 0.99

episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)
    
t = tqdm.trange(max_episodes)
for i in t:
    initial_state, info = env.reset()
    initial_state = tf.constant(initial_state, dtype=tf.float32)
    episode_reward = int(train_step(initial_state, model, optimizer, gamma, max_steps_per_episode))
    
    episodes_reward.append(episode_reward)
    running_reward = statistics.mean(episodes_reward)
    
    t.set_postfix(episode_reward=episode_reward, running_reward=running_reward)
    
    if i % 10 == 0:
        pass
    
    if running_reward > reward_threshold and i >= min_episodes_criterion:
        break
        
print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')


  0%|                                                 | 0/10000 [00:00<?, ?it/s]


AttributeError: in user code:

    File "/var/folders/ph/5mp9dxqj3yx_w5c5mknynk780000gn/T/ipykernel_37699/4196616594.py", line 8, in train_step  *
        action_probs, values, rewards = run_episode(initial_state, model, max_steps_per_episode)
    File "/var/folders/ph/5mp9dxqj3yx_w5c5mknynk780000gn/T/ipykernel_37699/1098922951.py", line 38, in run_episode  *
        values = values.write(t, tf.squeeze(value))
    File "/Users/noahspurr/opt/anaconda3/lib/python3.9/site-packages/tqdm/utils.py", line 84, in __ge__
        return not self < other
    File "/Users/noahspurr/opt/anaconda3/lib/python3.9/site-packages/tqdm/utils.py", line 69, in __lt__
        return self._comparable < other._comparable

    AttributeError: 'Tensor' object has no attribute '_comparable'


In [7]:
from IPython import display as ipythondisplay
from PIL import Image

render_env = gym.make("CartPole-v1", render_mode='rgb_array')

def render_episode(env: gym.Env, model: tf.keras.Model, max_steps: int):
    state, info = env.reset()
    state = tf.constant(state, dtype=tf.float32)
    screen = env.render()
    images = [Image.fromarray(screen)]
    
    for i in range(1, max_steps + 1):
        state = tf.expand_dims(state, 0)
        action_probs, _ = model(state)
        action = np.argmax(np.squeeze(action_probs))
        
        state, reward, done, truncated, info = env.step(action)
        state = tf.constant(state, dtype=tf.float32)
        
        # Render Screen every 10 steps
        if i % 10 == 0:
            screen = env.render()
            images.append(Image.fromarray(screen))
            
        if done:
            break
            
    return images

images = render_episode(render_env, model, max_steps_per_episode)
image_file = 'cartpole-v1.gif'

# loop=0: loop forever, duration=1: play each frame for 1ms
images[0].save(image_file, save_all=True, append_images=images[1:], loop=0, duration=1)

In [8]:
import tensorflow_docs.vis.embed as embed
embed.embed_file(image_file)