
#Tarea 3
---
*03-06-2023*

*Martín Gallegos*

# Identificación

In [None]:
Nombre = "Bruno Farfán"
id = "2064230J"

# Preguntas Teóricas:

## Pregunta 1:

Que el problema sea de tipo Markoviano es importante, ya que nos permite resumir todo el entorno en el estado actual del modelo (ya que la probabilidad de ocurrencia de estados futuros solamente depende del estado actual), haciendo que el Actor pueda tomar una acción basandose exclusivamente en el estado actual, y con este obtener una imagen completa del mundo en el que se encuentra.
Si el problema no fuese Markoviano, entonces el modelo tendría que tener en consideración los estados pasados para tomar una acción y aunque el algoritmo considera estocasticidad y por lo tanto tiene resultados futuros inciertos, como se mencionó, esta incertidumbre depende exclusivamente del estado actual y no de los estados pasados.

## Pregunta 2:

La función `Q(s,a)` nos entrega la recompensa de ejecutar la acción `a` cuando estamos en el estado `s`. Es decir, nos está diciendo que tan bueno o malo es tomar una acción `a` cuando nos encontramos en el estado `s`.
Nos interesa estimarla, debido a que al decirnos que tan buena o mala es cierta acción en determinado estado, nos está dando una guía o mapa para proceder dentro del mundo del agente. Con una función `Q` precisa el agente podrá predecir mejor el impacto de sus decisiones y planificar sus acciones futuras, generando así una política cada vez mejor para interactuar con su mundo.

## Pregunta 3:

La implementación de dos redes neuronales para estimar la función `Q` es debido a que tener dos redes neuronales ayuda a evitar el sesgo de sobreestimación, lo que es común en modelos de RL basados en estimación de la función `Q`. Al entrenar se usan dos redes independientes y se actualizan los parámetros con el mínimo entre las estimaciones de ambas redes.
Además al tener dos redes el entrenamiento se logra hacer más estable ya que las redes pueden irse compensando mutuamente.

## Pregunta 4:

Que un algoritmo de aprendizaje RL sea _model-free_ significa que no necesita un modelo explícito de su entorno para aprender a tomar decisiones. Es decir el agente aprende mediante sus interacciones con el entorno y no necesita conocer la función de recompensa.
Que un algoritmo sea _off-policy_ quiere decir que puede aprender la política óptima utilizando experiencias que no fueron generadas por la política que está siendo actualmente aprendida. Es decir, puede aprender de un _buffer_ que fue generado por otra política.

## Pregunta 5:

El algoritmo SAC aborda el _trade-off_ entre exploración y explotación mediante el uso de **entropía**. La entropía mide el grado de incertidumbre de la política de decisión del agente. El algoritmo SAC incluye un termino de entropía en la función objetivo con el propósito de fomentar la exploración. Esta se pondera con un parámtro α, que aumenta o disminuye la tendencía a la exploración.

## Pregunta 6:

El algoritmo DQN está diseñado para operar en entornos discretos, con cantidad finita de acciones y conocida. Como el problema abordado involucra el control de un brazo robótico con dos grados de libertad, la cantidad de acciones es infinita, por lo que un algoritmo como DQN no serviría. Sería necesario discretizar las acciones del brazo, lo que podría llevar a errores en la precisión y estabilidad del robot.

# Librerias

## PyTorch

In [1]:
from torch.distributions.normal import Normal
import torchvision.transforms as TF
from collections import namedtuple
import torch.nn.functional as F
import torch.optim as optim
import torch.nn as nn
import torch

## Librerias *comunes*

In [2]:
import os
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import gymnasium as gym

# Setup Reacher v4

In [3]:
env = gym.make("Reacher-v4", render_mode ="rgb_array")

observation, info = env.reset()

# for _ in range(1000):
#     action = env.action_space.sample()  # agent policy that uses the observation and info
#     observation, reward, terminated, truncated, info = env.step(action)
#     obs = env.render()

#     if terminated or truncated:
#         observation, info = env.reset()

# env.close()
# plt.imshow(obs)
# plt.show()

# Aprendizaje Reforzado

No es estrictamente necesario realizar un pre-procesamiento de los estados del sistema, a diferencia de otros problemas en ML. Sin embargo, esto no descarta que pueda ser útil para el entrenamiento y la optimización realizar dicho proceso.
Un tipo común de preprocesamiento que se peude considerar es normalizar los estados del sistema. Normalizar los estados puede ser útil para estandarizar la escala de las observaciones y hacer que el aprendizaje sea más eficiente. Se puede calcular la media y la desviación estándar de los estados observados durante un período de tiempo y luego usarlas para normalizar los estados en cada paso de tiempo. Esto puede ayudar a estabilizar el entrenamiento y mejorar la convergencia del modelo.

Como se mencionó, en este caso el pre-procesamiento no es necesario debido a que el agente SAC ya es capaz de aprender usando los datos tal y como los entrega el `env` de `gymnasium`.

En cuanto a la secuencialidad y la dinámica del sistema, estas están implícitas en la forma en que se diseñó el entorno Reacher-v4. El entorno tiene una representación interna del estado del sistema que evoluciona con el tiempo a medida que se toman acciones. La secuencialidad y la dinámica del sistema se reflejan en las observaciones que recibe tu agente en cada paso de tiempo y en cómo estas observaciones cambian en respuesta a las acciones tomadas.

## ReplayBuffer

In [10]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = deque(maxlen=capacity)

    def append(self, experience):
        self.buffer.append(experience)

    def sample_batch(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return {
            'states': torch.tensor(np.array(states), dtype=torch.float32),
            'actions': torch.tensor(np.array(actions), dtype=torch.float32),
            'rewards': torch.tensor(np.array(rewards), dtype=torch.float32),
            'next_states': torch.tensor(np.array(next_states), dtype=torch.float32),
            'dones': torch.tensor(np.array(dones), dtype=torch.float32)
        }

    def initialise_buffer(self, env, size):
        state, _ = env.reset()
        for _ in range(size):
            action = env.action_space.sample()
            next_state, reward, done, truncated, _ = env.step(action)
            experience = (state, action, reward, next_state, done or truncated)
            self.append(experience)
            state = next_state
            if done or truncated:
                state, _ = env.reset()

## Actor Network

In [5]:
class ActorNetwork(nn.Module):
    def __init__(self, input_dims, num_actions, hidden_dims, name='actor'):
        super(ActorNetwork, self).__init__()
        self.input_dims = input_dims
        self.hidden_dims = hidden_dims
        self.num_actions = num_actions
        self.name = name

        self.classifier = nn.Sequential(
            nn.Linear(input_dims, hidden_dims),
            nn.ReLU(),
            nn.Linear(hidden_dims, hidden_dims),
            nn.ReLU()
        )

        self.mu = nn.Sequential(
            nn.Linear(hidden_dims, num_actions),
            nn.Tanh()
        )

        self.sigma = nn.Sequential(
            nn.Linear(hidden_dims, num_actions),
            nn.Softplus()
        )

    def forward(self, x):
        x = self.classifier(x)
        mu = self.mu(x)
        log_sigma = self.sigma(x)
        return mu, log_sigma

## Critic Network

In [6]:
class CriticNetwork(nn.Module):
    def __init__(self, input_dims, num_actions, output_dims, hidden_dims, name='critic'):
        super(CriticNetwork, self).__init__()
        self.input_dims = input_dims
        self.output_dims = output_dims
        self.hidden_dims = hidden_dims
        self.num_actions = num_actions
        self.name = name

        self.classifier = nn.Sequential(
            nn.Linear(input_dims + num_actions, hidden_dims),
            nn.ReLU(),
            nn.Linear(hidden_dims, hidden_dims),
            nn.ReLU(),
            nn.Linear(hidden_dims, output_dims)
        )

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        out = self.classifier(x)
        return out

## SAC Agent

In [7]:
import os

def save_checkpoint(agent, episode, checkpoint_dir='checkpoints'):
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)
    torch.save(agent.critic1.state_dict(), os.path.join(checkpoint_dir, f'critic1_{episode}.pth'))
    torch.save(agent.critic2.state_dict(), os.path.join(checkpoint_dir, f'critic2_{episode}.pth'))
    torch.save(agent.target_critic1.state_dict(), os.path.join(checkpoint_dir, f'target_critic1_{episode}.pth'))
    torch.save(agent.target_critic2.state_dict(), os.path.join(checkpoint_dir, f'target_critic2_{episode}.pth'))
    torch.save(agent.actor.state_dict(), os.path.join(checkpoint_dir, f'actor_{episode}.pth'))

In [21]:
Rollout = namedtuple('Rollout', ['state', 'action', 'reward', 'next_state', 'done'])

class SACAgent():
    def __init__(self, env, lr, gamma, soft_update_tau, memory_size, hidden_size, alpha=0.2, log_std_range=[-20,2]):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.env = env
        self.n_states = self.env.observation_space.shape[0]
        self.n_actions = self.env.action_space.shape[0]
        self.lr = lr
        self.gamma = gamma
        self.alpha = alpha

        self.tau = soft_update_tau
        self.hidden_size = hidden_size
        self.min_clamp = log_std_range[0]
        self.max_clamp = log_std_range[-1]

        self._init_model()
        self.update_target_networks(tau=1)

        self.criterion = nn.MSELoss()

        self.memory = ReplayBuffer(capacity=memory_size)

    def _init_model(self):
        self.critic1 = CriticNetwork(self.n_states, self.n_actions, 1, self.hidden_size).to(self.device)
        self.critic2 = CriticNetwork(self.n_states, self.n_actions, 1, self.hidden_size).to(self.device)
        self.target_critic1 = CriticNetwork(self.n_states, self.n_actions, 1, self.hidden_size).to(self.device)
        self.target_critic2 = CriticNetwork(self.n_states, self.n_actions, 1, self.hidden_size).to(self.device)
        self.actor = ActorNetwork(self.n_states, self.n_actions, self.hidden_size).to(self.device)

        self.critic_optim1 = optim.Adam(self.critic1.parameters(), lr=self.lr)
        self.critic_optim2 = optim.Adam(self.critic2.parameters(), lr=self.lr)
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=self.lr)

    def update_target_networks(self, tau=None):
        tau = self.tau if tau is None else tau
        for target_param, param in zip(self.target_critic1.parameters(), self.critic1.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
        for target_param, param in zip(self.target_critic2.parameters(), self.critic2.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

    def get_action(self, state):
        state = torch.tensor(state, dtype=torch.float32).to(self.device)
        mean, log_sigma = self.actor(state)
        sigma = log_sigma.exp().clamp(self.min_clamp, self.max_clamp)
        normal = torch.distributions.Normal(mean, sigma)
        z = normal.rsample()
        action = torch.tanh(z)
        log_prob = normal.log_prob(z) - torch.log(1 - action**2 + 1e-7)
        log_prob = log_prob.sum(-1, keepdim=True)
        return action, log_prob

    def critic_loss(self, samples):
        states = samples['states'].to(self.device)
        actions = samples['actions'].to(self.device)
        rewards = samples['rewards'].to(self.device)
        next_states = samples['next_states'].to(self.device)
        dones = samples['dones'].to(self.device)

        with torch.no_grad():
            next_actions, next_log_probs = self.get_action(next_states)
            next_q1 = self.target_critic1(next_states, next_actions)
            next_q2 = self.target_critic2(next_states, next_actions)
            next_q = torch.min(next_q1, next_q2) - self.alpha * next_log_probs
            target_q = rewards + (self.gamma * (1 - dones) * next_q)

        pred_q1 = self.critic1(states, actions)
        pred_q2 = self.critic2(states, actions)

        loss1 = self.criterion(pred_q1, target_q.detach())
        loss2 = self.criterion(pred_q2, target_q.detach())

        return loss1, loss2

    def actor_loss(self, states):
        actions, log_probs = self.get_action(states)
        q1 = self.critic1(states, actions)
        q2 = self.critic2(states, actions)
        q = torch.min(q1, q2)

        policy_loss = (self.alpha * log_probs - q).mean()
        return policy_loss, log_probs

    def _choose_action(self, state, random=False):
        if random:
            actions = self.env.action_space.sample()
        else:
            with torch.no_grad():
                actions, _ = self.get_action(state)
        return actions

    def learn(self, samples, target_update=False):
        critic_loss1, critic_loss2 = self.critic_loss(samples)
        self.critic_optim1.zero_grad()
        self.critic_optim2.zero_grad()
        critic_loss1.backward()
        critic_loss2.backward()
        self.critic_optim1.step()
        self.critic_optim2.step()

        actor_loss, _ = self.actor_loss(samples['states'].to(self.device))
        self.actor_optim.zero_grad()
        actor_loss.backward()

        for name, param in self.actor.named_parameters():
            if param.grad is None:
                print(f'No gradient for {name}')

        self.actor_optim.step()

        if target_update:
            self.update_target_networks()
        return critic_loss1.item(), critic_loss2.item(), actor_loss.item()

    def train(self, n_episode=1000, batch_size=64, report_freq=10,
              checkpoint_freq=50, initial_memory=1024, target_actualizations=100):
        self.memory.initialise_buffer(self.env, size=initial_memory)
        results = []
        for i in range(n_episode):
            state, _ = self.env.reset()
            done, truncated = False, False
            eps_reward = 0
            steps = 0

            while not (done or truncated):
                action = self._choose_action(state)
                action = action.detach().cpu().numpy()
                next_state, reward, done, truncated, _ = self.env.step(action)
                roll = Rollout(state, action, reward, next_state, done or truncated)
                self.memory.append(roll)
                state = next_state

                samples = self.memory.sample_batch(batch_size)
                update_target_networks = (i % np.ceil(n_episode / target_actualizations) == 0)
                critic_loss1, critic_loss2, actor_loss = self.learn(samples, target_update=update_target_networks)

                eps_reward += reward
                steps += 1

            results.append(eps_reward)

            if i % report_freq == 0:
                print(f'Episode {i}/{n_episode} \t Reward: {eps_reward:.4f} \t Critic Loss: {critic_loss1:.3f} \t Actor Loss: {actor_loss:.3f}')

            if i % checkpoint_freq == 0:
                save_checkpoint(self, i)

        return results

# Train

In [22]:
# Define el nombre del entorno y el número de episodios
env_name = "Reacher-v4"
epos = 1000

# Crea el entorno
env = gym.make(env_name)
print(np.shape(env.observation_space.sample()))
print(np.shape(env.action_space.sample()))

# Inicializa el agente SAC con los parámetros apropiados
agent = SACAgent(env, lr=0.0001, gamma=0.99, soft_update_tau=0.005, alpha=0.5,
                 memory_size=10000, hidden_size=256)

# Entrena al agente
learning_data = agent.train(n_episode=epos, batch_size=256, report_freq=10, checkpoint_freq=100)

# Grafica los resultados
plt.plot(learning_data, label='Reward over episodes')
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('SAC Training on Reacher-v4')
plt.grid()
plt.legend()
plt.show()

# Cierra el entorno
env.close()

(11,)
(2,)


  state = torch.tensor(state, dtype=torch.float32).to(self.device)


Episode 0/1000 	 Reward: -61.5931 	 Critic Loss: 0.439 	 Actor Loss: 0.130
Episode 10/1000 	 Reward: -45.3458 	 Critic Loss: 0.223 	 Actor Loss: 0.006
Episode 20/1000 	 Reward: -46.5470 	 Critic Loss: 0.235 	 Actor Loss: 0.041
Episode 30/1000 	 Reward: -50.0572 	 Critic Loss: 0.242 	 Actor Loss: 0.149
Episode 40/1000 	 Reward: -52.4593 	 Critic Loss: 0.251 	 Actor Loss: 0.188
Episode 50/1000 	 Reward: -54.7397 	 Critic Loss: 0.239 	 Actor Loss: 0.227
Episode 60/1000 	 Reward: -49.3942 	 Critic Loss: 0.231 	 Actor Loss: 0.313
Episode 70/1000 	 Reward: -41.4976 	 Critic Loss: 0.226 	 Actor Loss: 0.378
Episode 80/1000 	 Reward: -52.4545 	 Critic Loss: 0.228 	 Actor Loss: 0.438
Episode 90/1000 	 Reward: -53.0727 	 Critic Loss: 0.242 	 Actor Loss: 0.483
Episode 100/1000 	 Reward: -50.7237 	 Critic Loss: 0.236 	 Actor Loss: 0.535
Episode 110/1000 	 Reward: -52.1891 	 Critic Loss: 0.271 	 Actor Loss: 0.593
Episode 120/1000 	 Reward: -56.5967 	 Critic Loss: 0.259 	 Actor Loss: 0.746
Episode 13

KeyboardInterrupt: 