In [None]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

In [None]:
# Establecer hiperparámetros
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
hidden_dim = 256
actor_lr = 3e-4
critic_lr = 3e-4
alpha_lr = 3e-4
gamma = 0.99
tau = 0.005
buffer_size = 1e6
batch_size = 128
alpha = 0.2  # Entropy coefficient

In [None]:
class ReplayBuffer:
    """
    Clase para almacenar y gestionar una memoria de experiencia para el aprendizaje por refuerzo.
    Utiliza un buffer de repetición para almacenar transiciones y muestrea aleatoriamente de este buffer para el entrenamiento.
    """

    def __init__(self, capacity):
        """
        Inicializa el ReplayBuffer con una capacidad máxima.

        :param capacity: Capacidad máxima del buffer de repetición.
        """
        self.buffer = deque(maxlen=int(capacity))  # Inicializa una deque con una capacidad máxima dada.

    def push(self, state, action, reward, next_state, done):
        """
        Añade una transición (estado, acción, recompensa, siguiente estado, terminado) al buffer de repetición.

        :param state: Estado actual del entorno.
        :param action: Acción tomada en el estado actual.
        :param reward: Recompensa obtenida después de tomar la acción.
        :param next_state: Estado del entorno después de tomar la acción.
        :param done: Indicador booleano que indica si el episodio ha terminado.
        """
        self.buffer.append((state, action, reward, next_state, done))  # Añade la transición al buffer.

    def sample(self, batch_size):
        """
        Muestra aleatoriamente un batch de transiciones del buffer de repetición.

        :param batch_size: Tamaño del batch a muestrear.
        :return: Tupla de arrays numpy que contienen estados, acciones, recompensas, siguientes estados y si el episodio ha terminado.
        """
        # Extrae las transiciones del buffer de forma aleatoria.
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        # Convierte las transiciones a arrays numpy y retorna.
        return np.array(state), np.array(action), np.array(reward, dtype=np.float32), np.array(next_state), np.array(done, dtype=np.float32)

    def __len__(self):
        """
        Retorna el número actual de transiciones almacenadas en el buffer.

        :return: El tamaño del buffer.
        """
        return len(self.buffer)  # Retorna el tamaño actual del buffer de repetición.


In [None]:
class Actor(nn.Module):
    """
    Clase que define el modelo de un actor en un algoritmo de aprendizaje por refuerzo basado en políticas,
    como el Proximal Policy Optimization (PPO) o el Actor-Critic.
    """

    def __init__(self):
        """
        Inicializa la red neuronal del actor.
        Define las capas de la red: dos capas completamente conectadas para la extracción de características,
        y dos capas de salida para calcular la media y el logaritmo de la desviación estándar de la distribución de la política.
        """
        super(Actor, self).__init__()
        # Capa completamente conectada que toma el estado como entrada y produce una representación oculta.
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        # Segunda capa completamente conectada para aumentar la capacidad de la red.
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        # Capa de salida que calcula la media de la distribución de la política.
        self.mean = nn.Linear(hidden_dim, action_dim)
        # Capa de salida que calcula el logaritmo de la desviación estándar de la distribución de la política.
        self.log_std = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        """
        Propaga el estado a través de la red para obtener la media y el logaritmo de la desviación estándar
        de la distribución de la política.

        :param state: Tensor de entrada que representa el estado del entorno.
        :return: media y logaritmo de la desviación estándar de la distribución de la política.
        """
        x = torch.relu(self.fc1(state))  # Aplicar ReLU después de la primera capa oculta.
        x = torch.relu(self.fc2(x))      # Aplicar ReLU después de la segunda capa oculta.
        mean = self.mean(x)             # Calcular la media de la distribución de la política.
        log_std = self.log_std(x)       # Calcular el logaritmo de la desviación estándar de la distribución de la política.
        log_std = torch.clamp(log_std, min=-20, max=2)  # Limitar el rango de los valores de log_std para estabilidad numérica.
        return mean, log_std

    def sample(self, state):
        """
        Muestra una acción basada en el estado actual utilizando la distribución de la política.

        :param state: Tensor de entrada que representa el estado del entorno.
        :return: Acción muestreada utilizando la distribución de la política.
        """
        mean, log_std = self.forward(state)  # Obtener la media y log_std del estado actual.
        std = log_std.exp()  # Calcular la desviación estándar a partir del logaritmo de la desviación estándar.
        normal = torch.distributions.Normal(mean, std)  # Crear una distribución normal con media y desviación estándar calculadas.
        x_t = normal.rsample()  # Muestra una acción usando el truco de reparametrización (mean + std * N(0,1)).
        action = torch.tanh(x_t)  # Aplicar la función tanh para asegurar que la acción esté en el rango [-1, 1].
        return action


In [None]:
class Critic(nn.Module):
    """
    Clase que define el modelo del crítico en un algoritmo de aprendizaje por refuerzo basado en métodos Actor-Critic.
    El crítico estima el valor de una acción en un estado dado.
    """

    def __init__(self):
        """
        Inicializa la red neuronal del crítico.
        Define las capas de la red: una capa para combinar estado y acción, dos capas ocultas y una capa de salida
        que estima el valor del estado-acción.
        """
        super(Critic, self).__init__()
        # Capa completamente conectada que toma la concatenación del estado y la acción como entrada y produce una representación oculta.
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        # Segunda capa completamente conectada para aumentar la capacidad de la red.
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        # Capa de salida que produce una única estimación del valor del estado-acción.
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        """
        Propaga el estado y la acción a través de la red para obtener la estimación del valor del estado-acción.

        :param state: Tensor que representa el estado del entorno.
        :param action: Tensor que representa la acción tomada.
        :return: Valor estimado del estado-acción.
        """
        # Concatenar el estado y la acción para formar la entrada completa para el crítico.
        x = torch.cat([state, action], dim=1)
        # Aplicar ReLU después de la primera capa oculta.
        x = torch.relu(self.fc1(x))
        # Aplicar ReLU después de la segunda capa oculta.
        x = torch.relu(self.fc2(x))
        # Obtener la estimación del valor del estado-acción.
        x = self.fc3(x)
        return x


In [None]:
class SACAgent:
    """
    Clase que define un agente de Aprendizaje por Refuerzo Basado en Actor-Critic con Entropía Suave (Soft Actor-Critic, SAC).
    SAC es un algoritmo que maximiza la recompensa esperada mientras también maximiza la entropía para explorar más.
    """

    def __init__(self):
        """
        Inicializa el agente SAC, creando redes para el actor y dos críticos (con sus redes objetivo),
        optimizadores y un buffer de repetición.
        """
        # Inicialización de la red del actor.
        self.actor = Actor()
        # Inicialización de dos redes críticas (Critic) para la estimación de Q-values.
        self.critic_1 = Critic()
        self.critic_2 = Critic()
        # Inicialización de las redes críticas objetivo (target) para la actualización suave.
        self.target_critic_1 = Critic()
        self.target_critic_2 = Critic()
        # Copia los parámetros de las redes críticas a las redes objetivo.
        self.target_critic_1.load_state_dict(self.critic_1.state_dict())
        self.target_critic_2.load_state_dict(self.critic_2.state_dict())
        # Inicialización de los optimizadores para el actor y los críticos.
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_1_optimizer = optim.Adam(self.critic_1.parameters(), lr=critic_lr)
        self.critic_2_optimizer = optim.Adam(self.critic_2.parameters(), lr=critic_lr)
        # Inicialización del buffer de repetición para almacenar las transiciones.
        self.replay_buffer = ReplayBuffer(buffer_size)

    def select_action(self, state):
        """
        Selecciona una acción basada en el estado dado utilizando el actor.

        :param state: Estado actual del entorno.
        :return: Acción seleccionada por el actor.
        """
        # Convierte el estado a un tensor y añade una dimensión extra para el batch.
        state = torch.FloatTensor(state).unsqueeze(0)
        # Usa el actor para muestrear una acción.
        action = self.actor.sample(state)
        # Retorna la acción como un array de numpy.
        return action.detach().numpy()[0]

    def update(self, batch_size, gamma=gamma, tau=tau, alpha=alpha):
        """
        Actualiza las redes del actor y los críticos utilizando una muestra de transiciones del buffer.

        :param batch_size: Tamaño del minibatch de la muestra.
        :param gamma: Factor de descuento para las recompensas futuras.
        :param tau: Factor de actualización suave para las redes objetivo.
        :param alpha: Parámetro de entropía que controla la cantidad de exploración.
        """
        # Solo actualiza si hay suficiente memoria en el buffer.
        if len(self.replay_buffer) < batch_size:
            return
        # Muestra una muestra del buffer.
        state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
        # Convierte las muestras a tensores.
        state = torch.FloatTensor(state)
        next_state = torch.FloatTensor(next_state)
        action = torch.FloatTensor(action)
        reward = torch.FloatTensor(reward).unsqueeze(1)
        done = torch.FloatTensor(np.float32(done)).unsqueeze(1)

        # Calcula los valores objetivo Q para el siguiente estado utilizando las redes objetivo.
        with torch.no_grad():
            next_state_action = self.actor.sample(next_state)
            target_q1_next = self.target_critic_1(next_state, next_state_action)
            target_q2_next = self.target_critic_2(next_state, next_state_action)
            target_q_min = torch.min(target_q1_next, target_q2_next) - alpha * torch.log(1 - next_state_action.pow(2) + 1e-6)
            target_q = reward + (1 - done) * gamma * target_q_min

        # Actualización de la red crítica 1.
        current_q1 = self.critic_1(state, action)
        critic_1_loss = F.mse_loss(current_q1, target_q)
        self.critic_1_optimizer.zero_grad()
        critic_1_loss.backward()
        self.critic_1_optimizer.step()

        # Actualización de la red crítica 2.
        current_q2 = self.critic_2(state, action)
        critic_2_loss = F.mse_loss(current_q2, target_q)
        self.critic_2_optimizer.zero_grad()
        critic_2_loss.backward()
        self.critic_2_optimizer.step()

        # Actualización de la red del actor.
        entropy = torch.log(1 - self.actor.sample(state).pow(2) + 1e-6)
        actor_loss = (-self.critic_1(state, self.actor.sample(state)) + alpha * entropy).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Actualización suave de las redes objetivo críticas.
        for target_param, param in zip(self.target_critic_1.parameters(), self.critic_1.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
        for target_param, param in zip(self.target_critic_2.parameters(), self.critic_2.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)


In [None]:
env = gym.make("CarRacing-v2")

In [None]:
agent = SACAgent()

In [None]:
num_episodes = 100  # Define el número total de episodios para el entrenamiento.

for episode in range(num_episodes):
    # Reinicia el entorno para comenzar un nuevo episodio.
    state = env.reset()
    episode_reward = 0  # Inicializa el total de recompensa del episodio en 0.
    done = False  # Marca el estado del episodio como no terminado al principio.

    while not done:
        # Selecciona una acción basada en el estado actual utilizando el agente.
        action = agent.select_action(state)
        # Toma la acción en el entorno y recibe el siguiente estado, la recompensa y si el episodio terminó.
        next_state, reward, done, _ = env.step(action)
        # Almacena la transición (estado, acción, recompensa, siguiente estado, done) en el buffer de repetición.
        agent.replay_buffer.push(state, action, reward, next_state, done)
        # Actualiza el agente utilizando una muestra del buffer de repetición.
        agent.update(batch_size)
        # Actualiza el estado actual al siguiente estado.
        state = next_state
        # Acumula la recompensa obtenida en el episodio.
        episode_reward += reward

    # Imprime el total de recompensa acumulada al final del episodio.
    print(f"Episode {episode}: Total Reward: {episode_reward}")
