[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/MaxMitre/DeepLearning/blob/main/Semana10_Actor_Critic.ipynb)

# Método Actor-Critic

En el método Actor-Crítico, la política se conoce como el actor que propone un conjunto de posibles acciones dado un estado, y la función de valor estimado se denomina crítica , que evalúa las acciones tomadas por el actor en función de la política dada.

In [None]:
!pip install gym
!pip install pyglet

In [None]:
# Install additional packages for visualization
!sudo apt-get install -y xvfb python-opengl > /dev/null 2>&1
!pip install pyvirtualdisplay > /dev/null 2>&1
!pip install git+https://github.com/tensorflow/docs > /dev/null 2>&1

In [None]:
!pip show tensorflow

# CartPole-v0

En el entorno , se une un poste a un carro que se mueve a lo largo de una pista sin fricción. El poste comienza en posición vertical y el objetivo del agente es evitar que se caiga aplicando una fuerza de -1 o +1 al carro. Se otorga una recompensa de +1 por cada paso de tiempo que el poste permanece en posición vertical. Un episodio termina cuando (1) el poste está a más de 15 grados de la vertical o (2) el carro se mueve a más de 2,4 unidades del centro.

El problema se considera "resuelto" cuando la recompensa total promedio del episodio llega a 195 en 100 intentos consecutivos.

In [None]:
import collections
import gym
import numpy as np
import statistics
import tensorflow as tf
import tqdm

from matplotlib import pyplot as plt
from tensorflow.keras import layers
from typing import Any, List, Sequence, Tuple


# Create the environment
env = gym.make("CartPole-v0")

# Set seed for experiment reproducibility
seed = 42
env.seed(seed)
tf.random.set_seed(seed)
np.random.seed(seed)

# Small epsilon value for stabilizing division operations
eps = np.finfo(np.float32).eps.item()

# Modelo

El actor y el crítico se modelarán utilizando una red neuronal que genera las probabilidades de acción y el valor crítico, respectivamente. Este tutorial utiliza subclases de modelos para definir el modelo.

Durante el paso hacia adelante, el modelo tomará el estado como entrada y generará tanto las probabilidades de acción como el valor crítico $V$, que modela la función de valor dependiente del estado. El objetivo es entrenar un modelo que elija acciones basadas en una política $\pi$ que maximice el rendimiento esperado.

Para Cartpole-v0, hay cuatro valores que representan el estado: posición del carro, velocidad del carro, ángulo del poste y velocidad del poste, respectivamente. El agente puede realizar dos acciones para empujar el carro hacia la izquierda (0) y hacia la derecha (1) respectivamente.

In [None]:
class ActorCritic(tf.keras.Model):
  """Combined actor-critic network."""

  def __init__(
      self,
      num_actions: int,
      num_hidden_units: int):
    """Initialize."""
    super().__init__()

    self.common = layers.Dense(num_hidden_units, activation="relu")
    self.actor = layers.Dense(num_actions)
    self.critic = layers.Dense(1)

  def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
    x = self.common(inputs)
    return self.actor(x), self.critic(x)

In [None]:
num_actions = env.action_space.n  # 2
num_hidden_units = 128

model = ActorCritic(num_actions, num_hidden_units)

In [None]:
# Wrap OpenAI Gym's `env.step` call as an operation in a TensorFlow function.
# This would allow it to be included in a callable TensorFlow graph.

def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
  """Returns state, reward and done flag given an action."""

  state, reward, done, _ = env.step(action)
  return (state.astype(np.float32),
          np.array(reward, np.int32),
          np.array(done, np.int32))


def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
  return tf.numpy_function(env_step, [action],
                           [tf.float32, tf.int32, tf.int32])

In [None]:
def run_episode(
    initial_state: tf.Tensor,
    model: tf.keras.Model,
    max_steps: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
  """Runs a single episode to collect training data."""

  action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)

  initial_state_shape = initial_state.shape
  state = initial_state

  for t in tf.range(max_steps):
    # Convert state into a batched tensor (batch size = 1)
    state = tf.expand_dims(state, 0)

    # Run the model and to get action probabilities and critic value
    action_logits_t, value = model(state)

    # Sample next action from the action probability distribution
    action = tf.random.categorical(action_logits_t, 1)[0, 0]
    action_probs_t = tf.nn.softmax(action_logits_t)

    # Store critic values
    values = values.write(t, tf.squeeze(value))

    # Store log probability of the action chosen
    action_probs = action_probs.write(t, action_probs_t[0, action])

    # Apply action to the environment to get next state and reward
    state, reward, done = tf_env_step(action)
    state.set_shape(initial_state_shape)

    # Store reward
    rewards = rewards.write(t, reward)

    if tf.cast(done, tf.bool):
      break

  action_probs = action_probs.stack()
  values = values.stack()
  rewards = rewards.stack()

  return action_probs, values, rewards

# Rendimientos esperados

La secuencia de recompensas para cada paso de tiempo $t$, $\{r_{t}\}^{T}_{t=1}$ recopilada durante un episodio se convierte en una secuencia de rendimientos esperados $\{G_{t}\}^{T}_{t=1}$ en la que la suma de recompensas se toma del paso de tiempo actual $t$ a $T$ y tiempo cada la recompensa se multiplica con un factor de descuento exponencialmente decreciente $\gamma$:

$$ G_{t} = \sum^{T}_{t'=t} \gamma^{t'-t}r_{t'}$$
Desde $\gamma \in(0,1)$, las recompensas más alejadas del paso de tiempo actual tienen menos peso.

Intuitivamente, el rendimiento esperado simplemente implica que las recompensas ahora son mejores que las recompensas posteriores. En un sentido matemático, es para asegurar que la suma de las recompensas converja.

Para estabilizar el entrenamiento, la secuencia resultante de retornos también está estandarizada (es decir, para tener media cero y desviación estándar unitaria).

In [None]:
def get_expected_return(
    rewards: tf.Tensor,
    gamma: float,
    standardize: bool = True) -> tf.Tensor:
  """Compute expected returns per timestep."""

  n = tf.shape(rewards)[0]
  returns = tf.TensorArray(dtype=tf.float32, size=n)

  # Start from the end of `rewards` and accumulate reward sums
  # into the `returns` array
  rewards = tf.cast(rewards[::-1], dtype=tf.float32)
  discounted_sum = tf.constant(0.0)
  discounted_sum_shape = discounted_sum.shape
  for i in tf.range(n):
    reward = rewards[i]
    discounted_sum = reward + gamma * discounted_sum
    discounted_sum.set_shape(discounted_sum_shape)
    returns = returns.write(i, discounted_sum)
  returns = returns.stack()[::-1]

  if standardize:
    returns = ((returns - tf.math.reduce_mean(returns)) /
               (tf.math.reduce_std(returns) + eps))

  return returns

# Función de pérdida

Dado que se utiliza un modelo híbrido actor-crítico, la función de pérdida elegida es una combinación de pérdidas de actor y crítico para el entrenamiento, como se muestra a continuación:
$$ L = L_{actor} + L_{critic}$$





## Pérdida de actores

La pérdida del actor se basa en gradientes de política con el crítico como una línea de base dependiente del estado y se calcula con estimaciones de muestra única (por episodio).

$$ L_{actor} = -\sum^{T}_{t=1} \log\pi_{\theta}(a_{t} | s_{t})[G(s_{t}, a_{t})  - V^{\pi}_{\theta}(s_{t})]$$

donde:

$T$ : el número de pasos de tiempo por episodio, que puede variar por episodio

$s_t$ : el estado en el paso de tiempo $t$

$a_t$ : acción elegida en el paso de tiempo $t$ dado estado $s$

$\pi_\theta$ : es la política (actor) parametrizada por $\theta$

$V_{\theta}^{\pi}$ : es la función de valor (crítica) también parametrizada por $\theta$

$G = G_t$ : el rendimiento esperado para un estado dado, par de acciones en el paso de tiempo $t$

Se agrega un término negativo a la suma ya que la idea es maximizar las probabilidades de que las acciones produzcan mayores recompensas al minimizar la pérdida combinada.

## Pérdida crítica

El entrenamiento $V$ para que esté lo más cerca posible de $G$ se puede configurar como un problema de regresión con la siguiente función de pérdida:

$$L_{critic} = L_{\delta}(G, V^{\pi}_{\theta})$$

donde $L_{\delta}$ es la pérdida de [Huber](https://en.wikipedia.org/wiki/Huber_loss), que es menos sensible a los valores atípicos en los datos que la pérdida por error cuadrático.

In [None]:
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

def compute_loss(
    action_probs: tf.Tensor,
    values: tf.Tensor,
    returns: tf.Tensor) -> tf.Tensor:
  """Computes the combined actor-critic loss."""

  advantage = returns - values

  action_log_probs = tf.math.log(action_probs)
  actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)

  critic_loss = huber_loss(values, returns)

  return actor_loss + critic_loss

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)


@tf.function
def train_step(
    initial_state: tf.Tensor,
    model: tf.keras.Model,
    optimizer: tf.keras.optimizers.Optimizer,
    gamma: float,
    max_steps_per_episode: int) -> tf.Tensor:
  """Runs a model training step."""

  with tf.GradientTape() as tape:

    # Run the model for one episode to collect training data
    action_probs, values, rewards = run_episode(
        initial_state, model, max_steps_per_episode)

    # Calculate expected returns
    returns = get_expected_return(rewards, gamma)

    # Convert training data to appropriate TF tensor shapes
    action_probs, values, returns = [
        tf.expand_dims(x, 1) for x in [action_probs, values, returns]]

    # Calculating loss values to update our network
    loss = compute_loss(action_probs, values, returns)

  # Compute the gradients from the loss
  grads = tape.gradient(loss, model.trainable_variables)

  # Apply the gradients to the model's parameters
  optimizer.apply_gradients(zip(grads, model.trainable_variables))

  episode_reward = tf.math.reduce_sum(rewards)

  return episode_reward

# Entrenamiento

In [None]:
%%time

min_episodes_criterion = 100
max_episodes = 10000
max_steps_per_episode = 1000

# Cartpole-v0 is considered solved if average reward is >= 195 over 100
# consecutive trials
reward_threshold = 195
running_reward = 0

# Discount factor for future rewards
gamma = 0.99

# Keep last episodes reward
episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)

with tqdm.trange(max_episodes) as t:
  for i in t:
    initial_state = tf.constant(env.reset(), dtype=tf.float32)
    episode_reward = int(train_step(
        initial_state, model, optimizer, gamma, max_steps_per_episode))

    episodes_reward.append(episode_reward)
    running_reward = statistics.mean(episodes_reward)

    t.set_description(f'Episode {i}')
    t.set_postfix(
        episode_reward=episode_reward, running_reward=running_reward)

    # Show average episode reward every 10 episodes
    if i % 10 == 0:
      pass # print(f'Episode {i}: average reward: {avg_reward}')

    if running_reward > reward_threshold and i >= min_episodes_criterion:
        break

print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')

In [None]:
!sudo apt-get install xvfb


# Visualización

In [None]:
from IPython import display as ipythondisplay
from PIL import Image
from pyvirtualdisplay import Display


display = Display(visible=0, size=(400, 300))
display.start()


def render_episode(env: gym.Env, model: tf.keras.Model, max_steps: int):
  screen = env.render(mode='rgb_array')
  im = Image.fromarray(screen)

  images = [im]

  state = tf.constant(env.reset(), dtype=tf.float32)
  for i in range(1, max_steps + 1):
    state = tf.expand_dims(state, 0)
    action_probs, _ = model(state)
    action = np.argmax(np.squeeze(action_probs))

    state, _, done, _ = env.step(action)
    state = tf.constant(state, dtype=tf.float32)

    # Render screen every 10 steps
    if i % 10 == 0:
      screen = env.render(mode='rgb_array')
      images.append(Image.fromarray(screen))

    if done:
      break

  return images


# Save GIF image
images = render_episode(env, model, max_steps_per_episode)
image_file = 'cartpole-v0.gif'
# loop=0: loop forever, duration=1: play each frame for 1ms
images[0].save(
    image_file, save_all=True, append_images=images[1:], loop=0, duration=1)

In [None]:
import tensorflow_docs.vis.embed as embed
embed.embed_file(image_file)

# Referencias

[Natural Actor–Critic Algorithms](https://inria.hal.science/hal-00840470/document)

[Neuronlike Adaptive Elements That Can Solve Difficult Learning Control Problems](https://www.derongliu.org/adp/adp-cdrom/Barto1983.pdf)