In [2]:
import random
import collections
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim

# ---------------------------
# Red neuronal para Q(s,a)
# ---------------------------
class DQN(nn.Module):
    """Fully connected network producing one output value per action.

    The stack is ``Linear -> ReLU -> Linear -> ReLU -> Linear`` with two
    hidden layers of 128 units each.

    Args:
        state_dim: Number of input features of the first linear layer.
        action_dim: Number of output features of the last linear layer.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_units = 128
        layers = [
            nn.Linear(state_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, action_dim),
        ]
        # Stored under the attribute name ``fc`` so parameter names remain
        # "fc.<index>.weight" / "fc.<index>.bias".
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the layer stack to ``x`` and return the result."""
        q_values = self.fc(x)
        return q_values


# ---------------------------
# Replay Buffer
# ---------------------------
class ReplayBuffer:
    """Bounded FIFO store of transitions with random mini-batch sampling.

    Transitions live in a ``collections.deque`` built with
    ``maxlen=capacity``, so appending to a full buffer discards the oldest
    entry.
    """

    def __init__(self, capacity=10000):
        self.buffer = collections.deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` tuple."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` stored transitions with ``random.sample``.

        Returns:
            A 5-tuple of NumPy arrays ``(states, actions, rewards,
            next_states, dones)``. ``rewards`` and ``dones`` are cast to
            ``float32``; the other arrays keep the dtype NumPy infers.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        state_col, action_col, reward_col, next_state_col, done_col = zip(*picked)
        states = np.array(state_col)
        actions = np.array(action_col)
        rewards = np.array(reward_col, dtype=np.float32)
        next_states = np.array(next_state_col)
        dones = np.array(done_col, dtype=np.float32)
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)


# ---------------------------
# Entrenamiento DQN
# ---------------------------
def train_dqn(env, episodes=500, gamma=0.99, batch_size=64, lr=1e-3, *,
              buffer_capacity=10000, epsilon_start=1.0, epsilon_min=0.01,
              epsilon_decay=0.995):
    """Train a DQN agent on ``env`` using epsilon-greedy exploration and replay.

    Every environment step is stored in a replay buffer; once the buffer
    holds at least ``batch_size`` transitions, one gradient step is taken per
    environment step on a sampled mini-batch. Targets are bootstrapped from
    the online network itself (this function does not use a separate target
    network).

    Args:
        env: Environment whose ``observation_space.shape[0]`` gives the state
            size and ``action_space.n`` the number of discrete actions, with
            ``reset()`` returning ``(state, info)`` and ``step(action)``
            returning ``(next_state, reward, terminated, truncated, info)``.
        episodes: Number of training episodes.
        gamma: Discount factor applied to the bootstrapped next-state value.
        batch_size: Mini-batch size; also the minimum buffer fill before
            updates start.
        lr: Learning rate for the Adam optimizer.
        buffer_capacity: Maximum number of transitions kept in the buffer.
        epsilon_start: Initial exploration probability.
        epsilon_min: Lower bound for the exploration probability.
        epsilon_decay: Multiplicative decay applied to epsilon after each
            episode.

    Returns:
        The trained ``DQN`` network (on CUDA if available, else CPU).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    q_net = DQN(state_dim, action_dim).to(device)
    optimizer = optim.Adam(q_net.parameters(), lr=lr)
    loss_fn = nn.MSELoss()  # built once instead of on every update step
    replay_buffer = ReplayBuffer(capacity=buffer_capacity)

    epsilon = epsilon_start

    for ep in range(episodes):
        state, _ = env.reset()
        total_reward = 0

        while True:
            # Epsilon-greedy policy.
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                state_tensor = torch.as_tensor(
                    state, dtype=torch.float32, device=device
                ).unsqueeze(0)
                # Action selection needs no gradients.
                with torch.no_grad():
                    action = q_net(state_tensor).argmax().item()

            next_state, reward, terminated, truncated, _ = env.step(action)

            # Store only `terminated` as the bootstrap mask: a truncated
            # episode (e.g. a time limit) did not reach a terminal state, so
            # its next-state value must still be bootstrapped in the target.
            replay_buffer.push(state, action, reward, next_state, terminated)
            state = next_state
            total_reward += reward

            # Training step.
            if len(replay_buffer) >= batch_size:
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

                states = torch.as_tensor(states, dtype=torch.float32, device=device)
                actions = torch.as_tensor(actions, dtype=torch.int64, device=device)
                rewards = torch.as_tensor(rewards, dtype=torch.float32, device=device)
                next_states = torch.as_tensor(next_states, dtype=torch.float32, device=device)
                dones = torch.as_tensor(dones, dtype=torch.float32, device=device)

                # Q(s, a) for the actions that were actually taken.
                q_values = q_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

                # The target is treated as a constant, so compute it without
                # tracking gradients.
                with torch.no_grad():
                    next_q_values = q_net(next_states).max(dim=1).values
                    target = rewards + gamma * next_q_values * (1 - dones)

                loss = loss_fn(q_values, target)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # The episode ends on either termination or truncation.
            if terminated or truncated:
                break

        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        print(f"Episode {ep+1}: total_reward={total_reward}, epsilon={epsilon:.3f}")

    return q_net

# ---------------------------
# Evaluación del agente entrenado
# ---------------------------
def test_dqn(env, q_net, episodes=5):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    q_net.eval()  # modo evaluación
    for ep in range(episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False
        while not done:
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
            with torch.no_grad():
                q_values = q_net(state_tensor)
                action = q_values.argmax().item()
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            state = next_state
            total_reward += reward
        print(f"[TEST] Episodio {ep+1}: recompensa total = {total_reward}")



# ---------------------------
# Main
# ---------------------------
if __name__ == "__main__":
    # Train on a non-rendering environment: with render_mode="human" every
    # training step would be drawn on screen, which only slows training down.
    train_env = gym.make("CartPole-v1")
    try:
        trained_qnet = train_dqn(train_env, episodes=200)
    finally:
        train_env.close()

    # Evaluate the trained network with on-screen rendering, and release the
    # window/resources afterwards even if evaluation fails.
    env = gym.make("CartPole-v1", render_mode="human")
    try:
        test_dqn(env, trained_qnet, episodes=5)
    finally:
        env.close()

Episode 1: total_reward=39.0, epsilon=0.995
Episode 2: total_reward=11.0, epsilon=0.990
Episode 3: total_reward=45.0, epsilon=0.985
Episode 4: total_reward=15.0, epsilon=0.980
Episode 5: total_reward=13.0, epsilon=0.975
Episode 6: total_reward=30.0, epsilon=0.970
Episode 7: total_reward=28.0, epsilon=0.966
Episode 8: total_reward=21.0, epsilon=0.961
Episode 9: total_reward=16.0, epsilon=0.956
Episode 10: total_reward=20.0, epsilon=0.951
Episode 11: total_reward=23.0, epsilon=0.946
Episode 12: total_reward=20.0, epsilon=0.942
Episode 13: total_reward=28.0, epsilon=0.937
Episode 14: total_reward=41.0, epsilon=0.932
Episode 15: total_reward=32.0, epsilon=0.928
Episode 16: total_reward=34.0, epsilon=0.923
Episode 17: total_reward=10.0, epsilon=0.918
Episode 18: total_reward=17.0, epsilon=0.914
Episode 19: total_reward=12.0, epsilon=0.909
Episode 20: total_reward=12.0, epsilon=0.905
Episode 21: total_reward=47.0, epsilon=0.900
Episode 22: total_reward=27.0, epsilon=0.896
Episode 23: total_r