
# Sistemas Inteligentes


In [1]:
!pip install swig
!pip install gymnasium[classic-control]


Collecting swig
  Downloading swig-4.4.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.4.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.4.1


In [2]:
import math
import random
from collections import deque
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gymnasium as gym
import imageio
from google.colab import files

In [3]:
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_size)

    def forward(self, x):
        # Utiliza relu como función de activación
        # fc1 - relu - fc2 - relu - fc3
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

In [4]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        # Define la tupla de experiencia
        experience_tuple = (state, action, reward, next_state, done)
        # Añade la tupla de experiencia a la memoria
        self.memory.append(experience_tuple)

    def __len__(self):
        return len(self.memory)

    def sample(self, batch_size):
        # Toma una muestra aleatoria de la memoria de tamaño batch_size
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states_tensor = torch.tensor(np.array(states), dtype=torch.float32)
        next_states_tensor = torch.tensor(np.array(next_states), dtype=torch.float32)
        rewards_tensor = torch.tensor(rewards, dtype=torch.float32)
        dones_tensor = torch.tensor(dones, dtype=torch.float32)
        actions_tensor = torch.tensor(actions, dtype=torch.long).unsqueeze(1)

        return states_tensor, actions_tensor, rewards_tensor, next_states_tensor, dones_tensor

In [5]:
def select_action(q_values, epsilon):
    if np.random.rand() < epsilon:
        # Escoge la acción aleatoria
        return random.choice(range(len(q_values)))
    else:
        # Escoge la acción greedy
        return torch.argmax(q_values).item()


def epsilon_decay_by_step(step, start, end, decay):
    return end + (start - end) * math.exp(-step / decay)

In [6]:
def update_target_network(target_network, online_network, tau):
    target_net_state_dict = target_network.state_dict()
    online_net_state_dict = online_network.state_dict()

    for key in online_net_state_dict:
        target_net_state_dict[key] = (
            online_net_state_dict[key] * tau +
            target_net_state_dict[key] * (1 - tau))

    target_network.load_state_dict(target_net_state_dict)

In [15]:
env = gym.make("CartPole-v1")
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

online_network = QNetwork(state_size, action_size)
target_network = QNetwork(state_size, action_size)
target_network.load_state_dict(online_network.state_dict())

optimizer = optim.Adam(online_network.parameters(), lr=1e-4)
loss_fn = nn.MSELoss()

buffer_size = 100_000
replay_buffer = ReplayBuffer(capacity=buffer_size)

In [16]:
batch_size = 64
gamma = 0.99
tau = 0.5
num_training_episodes = 1000
max_steps_per_episode = 1000
learning_starts = 1000
target_update_interval = 100

In [17]:
total_steps = 0
for episode in range(num_training_episodes):
    # Reset
    state, _ = env.reset()
    episode_reward = 0.0
    done = False

    for _ in range(max_steps_per_episode):
        total_steps += 1

        # Epsilon decay
        #epsilon = epsilon_decay_by_step(total_steps, start=0.8, end=0.05, decay=100000)
        epsilon = 0.15
        with torch.no_grad():
            state_t = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
            q_values = online_network(state_t).squeeze(0)
        # Selecciona la acción
        action = select_action(q_values, epsilon)

        # Ejecuta un paso en el entorno según la acción escogida
        next_state, reward, terminated, truncated, _ = env.step(action)
        # Actualización del indicador done
        done = terminated or truncated

        # Agrega la nueva experiencia al replay buffer
        replay_buffer.push(state, action, reward, next_state, done)

        # Asegura que se hayan ejecutado los timesteps especificados por learning_starts
        # y que el replay_buffer disponga de al menos batch_size experiencias para muestrear
        if (total_steps >= learning_starts) and (len(replay_buffer) >= batch_size):
            # Muestrea un lote de experiencias del replay_buffer de tamaño batch_size
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

            # Predicción de la online network
            q_pred = online_network(states).gather(1, actions).squeeze(1)

            # Cálculo del target con la target network
            with torch.no_grad():
              next_actions = online_network(next_states).argmax(dim=1)              # [B]
              next_q = target_network(next_states).gather(1, next_actions.unsqueeze(1)).squeeze(1)  # [B]
              target = rewards + gamma * next_q * (1.0 - dones)


            # Cálculo de la pérdida MSE
            loss = loss_fn(q_pred, target)

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            # Limita la norma de los gradientes para evitar exploding gradient y mejorar estabilidad
            nn.utils.clip_grad_norm_(online_network.parameters(), max_norm=10.0)
            optimizer.step()

            # Copia de los pesos de la online network a la target network
            # cada target_update_interval steps
            if total_steps % target_update_interval == 0:
                update_target_network(target_network, online_network, tau)

        state = next_state
        episode_reward += reward

        if done:
            break

    print(f"Episode {episode} | steps={total_steps} | ε={epsilon:.3f} | reward={episode_reward:.1f}")

Episode 0 | steps=8 | ε=0.150 | reward=8.0
Episode 1 | steps=17 | ε=0.150 | reward=9.0
Episode 2 | steps=28 | ε=0.150 | reward=11.0
Episode 3 | steps=37 | ε=0.150 | reward=9.0
Episode 4 | steps=48 | ε=0.150 | reward=11.0
Episode 5 | steps=60 | ε=0.150 | reward=12.0
Episode 6 | steps=69 | ε=0.150 | reward=9.0
Episode 7 | steps=81 | ε=0.150 | reward=12.0
Episode 8 | steps=92 | ε=0.150 | reward=11.0
Episode 9 | steps=101 | ε=0.150 | reward=9.0
Episode 10 | steps=113 | ε=0.150 | reward=12.0
Episode 11 | steps=122 | ε=0.150 | reward=9.0
Episode 12 | steps=130 | ε=0.150 | reward=8.0
Episode 13 | steps=139 | ε=0.150 | reward=9.0
Episode 14 | steps=150 | ε=0.150 | reward=11.0
Episode 15 | steps=159 | ε=0.150 | reward=9.0
Episode 16 | steps=169 | ε=0.150 | reward=10.0
Episode 17 | steps=179 | ε=0.150 | reward=10.0
Episode 18 | steps=188 | ε=0.150 | reward=9.0
Episode 19 | steps=199 | ε=0.150 | reward=11.0
Episode 20 | steps=209 | ε=0.150 | reward=10.0
Episode 21 | steps=218 | ε=0.150 | reward=9

In [18]:
def evaluate_policy(eval_env, q_net, n_eval_episodes=10, max_steps=1000):
    q_net.eval()
    rewards = []

    for i in range(n_eval_episodes):
        with torch.no_grad():
            state, _ = env.reset()
            ep_reward = 0.0

            for _ in range(max_steps):
                state_t = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
                q_values = q_net(state_t).squeeze(0)
                # Selección de la acción greedy - sin exploración
                action = torch.argmax(q_values).item()

                next_state, reward, terminated, truncated, _ = env.step(action)
                ep_reward += float(reward)
                if terminated or truncated:
                    break
                state = next_state
        rewards.append(ep_reward)

    mean_rewards = float(np.mean(rewards))
    std_rewards = float(np.std(rewards, ddof=1)) if len(rewards) >= 2 else 0.0
    return mean_rewards, std_rewards

In [19]:
eval_env = gym.make("CartPole-v1", render_mode="rgb_array")
mean_r, std_r = evaluate_policy(eval_env, online_network, n_eval_episodes=100)
print(f"Mean reward: {mean_r:.2f} | Std reward: {std_r:.2f}")

Mean reward: 500.00 | Std reward: 0.00


In [None]:
frames = []
state, _ = eval_env.reset()
totalReward = 0
for i in range(1000):
    state_t = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
    q_values = online_network(state_t).squeeze(0)
    action = torch.argmax(q_values).item()

    next_state, reward, terminated, truncated, _ = eval_env.step(action)
    frame = eval_env.render()
    frames.append(frame)
    totalReward += reward
    if terminated or truncated:
        print("Ended")
        print(i)
        print(terminated)
        print(truncated)
        break
    state = next_state

# Guardar como GIF
imageio.mimsave("cartpole.gif", frames, duration=150)
print("Reward", totalReward)

Ended
8
True
False
Reward 9.0


In [None]:
files.download("cartpole.gif")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
env.close()

In [None]:
eval_env.close()

In [None]:
env = gym.make('CartPole-v1')

In [None]:
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

In [None]:
class PolicyNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_size)

    def forward(self, state):
        # fc1 - relu - fc2 - relu - fc3 - softmax
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        action_probs = torch.softmax(self.fc3(x), dim=-1)
        return action_probs

In [None]:
class BaseNetwork(nn.Module):
    def __init__(self, state_size):
        super(BaseNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 1)

    def forward(self, state):
        # fc1 - relu - fc2 - relu - fc3 - softmax
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        Gt = torch.relu(self.fc3(x))

        return Gt

In [None]:
def train_reinforce(policy, policy_optimizer, baseNet, baseNet_optimizer,n_episodes, gamma):
    # Almacenamos la recompensa total de cada episodio
    episode_returns = []
    # Iteramos sobre el número de episodios de entrenamiento
    for episode in range(n_episodes):
        # Reseteamos el entorno
        state, _ = env.reset()
        # Almacenamos las (log) probabilidades y las recompensas de cada timestep
        log_probs = []
        rewards = []
        base_rewards = []

        total_reward = 0
        steps = 0
        done = False

        # Red policy en modo entrenamiento
        policy.train()

        while not done:
            # Prepararemos la variable state para su entrada a la red neuronal
            state = torch.from_numpy(state).float().unsqueeze(0)
            # Obtenemos el output de la red policy
            action_probs = policy(state)

            # Escogeremos una acción a partir de esta distribución de probabilidades
            # y guardaremos su (log) probabilidad
            dist = torch.distributions.Categorical(action_probs)
            action = dist.sample()
            log_prob = dist.log_prob(action)

            # Ejecutamos un paso en el entorno con la acción escogida
            next_state, reward, terminated, truncated, _ = env.step(action.item())
            # Actualizamos el indicador de finalización de episodio
            done = terminated or truncated
            base_reward = baseNet(state)
            # Guardamos la información relevante
            log_probs.append(log_prob)
            rewards.append(reward)
            base_rewards.append(base_reward)
            # Actualizamos
            total_reward += reward
            steps += 1
            state = next_state

        # Obtenemos el retorno esperado del episodio recorriéndolo al revés (Monte Carlo)
        loss_base = 0
        returns_base = []
        G = 0
        i = len(rewards)-1
        for r in reversed(rewards):
            G = r + gamma * G
            returns_base.insert(0, G-base_rewards[i])
            loss_base += (G-base_rewards[i])**2
            i -= 1

        log_probs = torch.cat(log_probs)

        # Normalización del retorno para dar estabilidad
        returns_base = torch.tensor(returns_base)

        normalized_returns = (returns_base - returns_base.mean()) / returns_base.std()
        normalized_returns = normalized_returns.detach()

        # Calculamos la función de pérdida
        loss = -(normalized_returns * log_probs).sum()

        # Backpropagation policy
        policy_optimizer.zero_grad()
        loss.backward()
        policy_optimizer.step()

        # Backpropagation base
        baseNet_optimizer.zero_grad()
        loss_base.backward()
        baseNet_optimizer.step()
        episode_returns.append(total_reward)

        if episode % 10 == 0:
            print(f"| Episodio {episode} | Recompensa total: {total_reward} |")

In [None]:
n_episodes = 800
learning_rate = 0.001
gamma = 0.99

policy = PolicyNetwork(state_size, action_size)
baseNet = BaseNetwork(state_size)
optimizer = optim.Adam(policy.parameters(), lr=learning_rate)
base_optimizer = optim.Adam(baseNet.parameters(), lr=learning_rate)
train_reinforce(policy, optimizer, baseNet,base_optimizer,n_episodes, gamma)

Consider using tensor.detach() first. (Triggered internally at /pytorch/torch/csrc/autograd/generated/python_variable_methods.cpp:836.)
  returns_base = torch.tensor(returns_base)


| Episodio 0 | Recompensa total: 10.0 |
| Episodio 10 | Recompensa total: 42.0 |
| Episodio 20 | Recompensa total: 11.0 |
| Episodio 30 | Recompensa total: 17.0 |
| Episodio 40 | Recompensa total: 29.0 |
| Episodio 50 | Recompensa total: 20.0 |
| Episodio 60 | Recompensa total: 14.0 |
| Episodio 70 | Recompensa total: 38.0 |
| Episodio 80 | Recompensa total: 29.0 |
| Episodio 90 | Recompensa total: 52.0 |
| Episodio 100 | Recompensa total: 40.0 |
| Episodio 110 | Recompensa total: 88.0 |
| Episodio 120 | Recompensa total: 49.0 |
| Episodio 130 | Recompensa total: 23.0 |
| Episodio 140 | Recompensa total: 56.0 |
| Episodio 150 | Recompensa total: 35.0 |
| Episodio 160 | Recompensa total: 346.0 |
| Episodio 170 | Recompensa total: 139.0 |
| Episodio 180 | Recompensa total: 40.0 |
| Episodio 190 | Recompensa total: 102.0 |
| Episodio 200 | Recompensa total: 293.0 |
| Episodio 210 | Recompensa total: 303.0 |
| Episodio 220 | Recompensa total: 389.0 |
| Episodio 230 | Recompensa total: 200.

In [None]:
env.close()

In [None]:
eval_env = gym.make('CartPole-v1', render_mode='rgb_array')

In [None]:
def evaluate_agent(policy, env, n_episodes=10):
    # Política en modo evaluación
    policy.eval()
    episode_rewards = []

    for episode in range(n_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            state = torch.from_numpy(state).float().unsqueeze(0)
            with torch.no_grad():
                action_probs = policy(state)
                # Escogemos acción determinista
                action = torch.argmax(action_probs, dim=-1).item()

            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_reward += reward
            state = next_state

        episode_rewards.append(episode_reward)

    avg_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)
    print(f"\n>>> Recompensa promedio en evaluación sobre {n_episodes} episodios: {avg_reward} +/- {std_reward}")

In [None]:
evaluate_agent(policy, eval_env)


>>> Recompensa promedio en evaluación sobre 10 episodios: 500.0 +/- 0.0


In [None]:
def run_episode(env, policy):
    frames = []
    policy.eval()
    state, _ = env.reset()
    done = False

    while not done:
        state = torch.from_numpy(state).float().unsqueeze(0)
        with torch.no_grad():
            action_probs = policy(state)
            action = torch.argmax(action_probs, dim=-1).item()  # Acción determinista

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        state = next_state
        frames.append(env.render())

    return frames

In [None]:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import imageio
from IPython.display import HTML
# Crear animación
frames = run_episode(eval_env, policy)
fig = plt.figure()
img = plt.imshow(frames[0])

def animate(i):
    img.set_data(frames[i])
    return [img]

ani = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50, blit=True)
plt.close(fig)
HTML(ani.to_jshtml())

Output hidden; open in https://colab.research.google.com to view.

In [None]:
eval_env.close()