In [4]:
# Avoid warning from TF

# Full imports
import gym
import mlflow

# Aliased imports
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.express as px

# Partial Import
from gym.wrappers.monitoring.video_recorder import VideoRecorder
from tqdm.notebook import tqdm, trange
from IPython.display import clear_output
from collections import namedtuple
from typing import Any, List, Sequence, Tuple

In [5]:
# Remember to export to export "LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib/"" if using linux
# Drop numa errors in term: "for a in /sys/bus/pci/devices/*; do echo 0 | sudo tee -a $a/numa_node; done"

# Check if we have GPU
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

## META CONSTANTS

In [6]:
DEFAULT_ENV = "CartPole-v1"
EXPERIMENT_NAME = "cart_pole_a2c"
TAGS = {
    "type": "RL",
    "env": "Discrete Cart Pole",
    "algorithm": "A2C",
    "sub-algorithm": "Hybrid Model"
}

In [7]:
EPS = np.finfo(np.float32).eps.item()

## GLOBAL AUX DEFS

In [8]:
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

## IMPLEMENTATION

In [60]:
class ActorCritic(tf.keras.Model):
    def __init__(self, env: gym.Env) -> None:
        """_summary_

        Args:
            env (gym.Env): _description_
        """

        # Call super to properly init
        super().__init__()

        # Define model
        self.base_1 = tf.keras.layers.Dense(128)
        self.activation_1 = tf.keras.layers.LeakyReLU()
        self.actor_out = tf.keras.layers.Dense(env.action_space.n, activation="softmax")
        self.critic_out = tf.keras.layers.Dense(1)

        
    def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        """_summary_

        Args:
            inputs (tf.Tensor): _description_

        Returns:
            Tuple[tf.Tensor, tf.Tensor, tf.Tensor]: _description_
        """
        # Apply base layers
        x = self.base_1(inputs)
        x = self.activation_1(x)

        # Compute distribution of actions for actor and pick one and corresponding log_prob
        #action_dist = tfp.distributions.Categorical(probs=self.actor_out(x), dtype=tf.int32)
        #action = action_dist.sample()

        # Compute estimate for value
        #value = self.critic_out(x)

        return self.actor_out(x), self.critic_out(x)
        

In [45]:
class RLUtils:
    def __init__(self) -> None:
        pass

    @staticmethod
    def r_to_g(rewards: tf.Tensor, gamma: float, std: bool = True) -> tf.Tensor:
        """_summary_

        Args:
            rewards (tf.Tensor): _description_
            gamma (float): _description_
            std (bool, optional): _description_. Defaults to True.

        Returns:
            tf.Tensor: _description_
        """
        # Prepare aux vars
        rewards = tf.cast(rewards, dtype=tf.float32)
        t = tf.range(tf.size(rewards), dtype=tf.float32)

        # Compute factors
        delta = rewards * gamma ** t
        g = tf.cumsum(delta[::-1])[::-1] / gamma ** t

        # Std if needed
        if std:
            g = (g - tf.reduce_mean(g)) / (tf.math.reduce_std(g) + EPS)
        
        # Return expected returns
        return g

In [61]:
class Agent:
    def __init__(self, env: gym.Env, gamma: float, model: tf.keras.Model, optimizer: tf.keras.optimizers.Optimizer) -> None:
        """Returns an agent acting on "env" using the specified "gamma" and "model".

        Args:
            env (gym.Env): Gym Envinronment
            gamma (float): Discount factor
            model (tf.keras.Model): Keras model
        """

        # Init private props
        self._env = env
        self._gamma = gamma
        self._model = model
        self._optimizer = optimizer

        # Init public props
        #

    def _aux_np_step(self, action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Intermediate function used to perform steps using tensors.

        Args:
            action (np.ndarray): Action to be perfomed

        Returns:
            state (np.ndarray): new state
            reward (np.ndarray): reward obtained
            done (np.ndarray): indicates if episode finished
        """

        # Perform step
        state, reward, done, _, _ = self._env.step(action)

        # Cast to 0/1 to exploit tensor repr
        return (
            state.astype(np.float32),
            np.array(reward, np.float32),
            np.array(done, np.int32)
        )

    def _aux_tf_step(self, action: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        """Auxiliar function used to perform steps using tensors.

        Args:
            action (tf.Tensor): action to be performed

        Returns:
            state (tf.float32): new state
            reward (tf.int32): reward obtained
            done (tf.int32): indicates if episode finished
        """
        return tf.numpy_function(self._aux_np_step, [action], [tf.float32, tf.float32, tf.int32])

    def _exec_trajectory(self, init_state: tf.Tensor, model: tf.keras.Model, max_steps: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        """_summary_

        Args:
            init_state (tf.Tensor): _description_
            max_steps (tf.Tensor): _description_

        Returns:
            Tuple[tf.Tensor, tf.Tensor, tf.Tensor]: _description_
        """
        
        # Define buffers
        log_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
        values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
        rewards =  tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)

        # Set current state
        state = init_state

        for i in tf.range(max_steps):
            # state_i (n,) -> state_f (1, n)
            state = tf.expand_dims(state, axis=0)

            # Pick an action and get corresponding log prob and value
            action_probs, value = model(state)
            action_dist = tfp.distributions.Categorical(probs=action_probs, dtype=tf.int32)
            action = action_dist.sample()
            log_prob = action_dist.log_prob(action)

            # Perform next action
            state, reward, done = self._aux_tf_step(action[0])
            state.set_shape(init_state.shape)

            # Save in buffer
            log_probs.write(i, tf.squeeze(log_prob))
            values.write(i, tf.squeeze(value))
            rewards.write(i, reward)

            # Break if we finished the run
            if tf.cast(done, tf.bool):
                break
        
        # Stack into a single tensor
        log_probs = log_probs.stack()
        values = values.stack()
        rewards = rewards.stack()

        return log_probs, values, rewards
    

    def _loss(self, log_probs: tf.Tensor, values: tf.Tensor, returns: tf.Tensor) -> tf.Tensor:
        adv = returns - values

        # Actor loss
        actor_loss = -tf.math.reduce_sum(log_probs * adv)

        # Critic loss
        # Use hubber loss because it's more stable to outliers than delta^2
        critic_loss = huber_loss(values, returns)

        return actor_loss + critic_loss


    @tf.function
    def _train(self, init_state: tf.Tensor, model: tf.keras.Model, max_steps: int) -> tf.Tensor:
        """_summary_p

        Args:
            init_state (tf.Tensor): _description_
            max_steps (int): _description_

        Returns:
            tf.Tensor: _description_
        """
        with tf.GradientTape() as tape:    
            tape.watch(init_state)
            tape.watch(model.trainable_variables)
                
            log_probs, values, rewards = self._exec_trajectory(init_state, model, max_steps)
            returns = RLUtils.r_to_g(rewards, self._gamma)

            # Compute loss
            loss = self._loss(log_probs, values, returns)
        
        # Compute gradients
        grads = tape.gradient(loss, model.trainable_variables)

        # Update weights
        self._optimizer.apply_gradients(zip(grads, model.trainable_variables))

        # Retrun rewards
        return tf.math.reduce_sum(rewards)

    
    def train(self, init_state: np.ndarray, max_steps: int):
        init_state = tf.constant(init_state, dtype=tf.float32)
        return int(self._train(init_state, self._model, max_steps))

In [62]:
tapes gradient with class methodsLR = 1e-3
GAMMA = 0.99
N_EPISODES = 10000
MAX_STEPS = 500

# Define basic vars
env = gym.make(DEFAULT_ENV)
model = ActorCritic(env)
model.build(input_shape=(None, env.observation_space.shape[0]))
optimizer = tf.optimizers.Adam(learning_rate=LR)
agent = Agent(env, GAMMA, model, optimizer=optimizer)

rewards = []

# Start training
try:
    for e in (tbar := trange(N_EPISODES)):
        init_state = env.reset()[0]
        reward_e = agent.train(init_state, MAX_STEPS)

        # Save rewards
        rewards.append(reward_e)

        # Update progressbar
        tbar.set_postfix(reward=reward_e)

except KeyboardInterrupt:
    print("Training stopped...")




  0%|          | 0/10000 [00:00<?, ?it/s]

ValueError: in user code:

    File "/tmp/ipykernel_17521/1965587532.py", line 143, in _train  *
        self._optimizer.apply_gradients(zip(grads, model.trainable_variables))
    File "/home/main/anaconda3/envs/rlenv/lib/python3.8/site-packages/keras/optimizers/optimizer_v2/optimizer_v2.py", line 689, in apply_gradients  **
        grads_and_vars = optimizer_utils.filter_empty_gradients(grads_and_vars)
    File "/home/main/anaconda3/envs/rlenv/lib/python3.8/site-packages/keras/optimizers/optimizer_v2/utils.py", line 77, in filter_empty_gradients
        raise ValueError(

    ValueError: No gradients provided for any variable: (['dense_63/kernel:0', 'dense_63/bias:0', 'dense_64/kernel:0', 'dense_64/bias:0', 'dense_65/kernel:0', 'dense_65/bias:0'],). Provided `grads_and_vars` is ((None, <tf.Variable 'dense_63/kernel:0' shape=(4, 128) dtype=float32>), (None, <tf.Variable 'dense_63/bias:0' shape=(128,) dtype=float32>), (None, <tf.Variable 'dense_64/kernel:0' shape=(128, 2) dtype=float32>), (None, <tf.Variable 'dense_64/bias:0' shape=(2,) dtype=float32>), (None, <tf.Variable 'dense_65/kernel:0' shape=(128, 1) dtype=float32>), (None, <tf.Variable 'dense_65/bias:0' shape=(1,) dtype=float32>)).
