In [1]:
# Importación de las librerías necesarias para el entorno, manipulación de datos, redes neuronales y memoria
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import os

# Ruta para guardar y cargar el modelo
# Definir la ruta completa para guardar el modelo
MODEL_PATH = "C:/Users/pablo/OneDrive - usfx/Desktop/pruebasPythorch/Laboratorio6PLC/q_network_model.pth"

# Definir un conjunto de acciones discretas representativas para el entorno
DISCRETE_ACTIONS = [
    [-1.0, 0.5, 0.0],   # Giro fuerte izquierda con aceleración
    [-0.5, 0.5, 0.0],   # Giro suave izquierda con aceleración
    [0.0, 0.8, 0.0],    # Recto con mayor aceleración
    [0.5, 0.5, 0.0],    # Giro suave derecha con aceleración
    [1.0, 0.5, 0.0],    # Giro fuerte derecha con aceleración
    [0.0, 0.0, 0.3]     # Frenado más fuerte
]

# Función para verificar la disponibilidad de CUDA en PyTorch
def check_pytorch_cuda():
    """Check if PyTorch can access the GPU."""
    print("PyTorch CUDA availability:")
    print("CUDA available: ", torch.cuda.is_available())
    print("Device count: ", torch.cuda.device_count())
    if torch.cuda.is_available():
        print("CUDA device name: ", torch.cuda.get_device_name(0))

check_pytorch_cuda()

# Seleccionar el dispositivo CUDA si está disponible para el entrenamiento
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Definir la red Q con PyTorch para calcular los valores Q para cada acción
class QNetwork(nn.Module):
    def __init__(self, input_shape, num_actions):
        super(QNetwork, self).__init__()
        # Definir capas convolucionales para extraer características de las observaciones del entorno
        self.conv1 = nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # Calcular el tamaño de la salida de la última capa convolucional para pasar a las capas densas
        dummy_input = torch.zeros(1, *input_shape)
        conv_out_size = self._get_conv_output(dummy_input)
        
        # Definir las capas densas para calcular el valor Q final para cada acción
        self.fc1 = nn.Linear(conv_out_size, 512)
        self.fc2 = nn.Linear(512, num_actions)

    # Método para obtener el tamaño de salida después de las capas convolucionales
    def _get_conv_output(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        return x.view(1, -1).size(1)

    # Método para el paso hacia adelante (forward) en la red
    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        x = x.view(x.size(0), -1)  # Aplanar la salida de la última capa convolucional
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

# Definir la función para seleccionar acciones usando la política epsilon-greedy
def choose_action(state, q_network, epsilon):
    # Selección de acción aleatoria para exploración
    if random.random() < epsilon:
        return random.choice(DISCRETE_ACTIONS)  # Exploración
    else:
        # Selección de la mejor acción estimada por la red para explotación
        with torch.no_grad():
            state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            q_values = q_network(state_tensor)
            action_idx = torch.argmax(q_values).item()
            return DISCRETE_ACTIONS[action_idx]

# Función para entrenar la red Q usando muestras de la memoria
def train_network(q_network, target_network, memory, gamma, batch_size, optimizer):
    # Asegurarse de que hay suficiente memoria para entrenar
    if len(memory) < batch_size:
        return
    # Seleccionar un batch aleatorio de la memoria
    batch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)
    
    # Convertir las transiciones en tensores para el entrenamiento
    states = torch.tensor(states, dtype=torch.float32, device=device)
    actions = torch.tensor(actions, dtype=torch.long, device=device)
    rewards = torch.tensor(rewards, dtype=torch.float32, device=device)
    next_states = torch.tensor(next_states, dtype=torch.float32, device=device)
    dones = torch.tensor(dones, dtype=torch.bool, device=device)

    # Calcular los valores Q actuales y los valores objetivo para la actualización
    q_values = q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
    with torch.no_grad():
        next_q_values = target_network(next_states).max(1)[0]
    targets = rewards + gamma * next_q_values * (~dones)

    # Calcular el error cuadrático medio y actualizar la red
    loss = nn.functional.mse_loss(q_values, targets)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Definir la función principal de entrenamiento del agente
def train(episodes, max_steps=500, batch_size=64, gamma=0.99, epsilon_decay=0.995, epsilon_min=0.01, render=False):
    # Crear el entorno CarRacing en modo de renderizado
    env = gym.make("CarRacing-v3", render_mode='human')
    input_shape = (env.observation_space.shape[2],) + env.observation_space.shape[:2]
    num_actions = len(DISCRETE_ACTIONS)
    
    # Crear las redes Q y de objetivo y sincronizar pesos
    q_network = QNetwork(input_shape, num_actions).to(device)
    target_network = QNetwork(input_shape, num_actions).to(device)
    target_network.load_state_dict(q_network.state_dict())
    optimizer = optim.Adam(q_network.parameters(), lr=0.001)
    #Actualizar los pesos de q_network
    epsilon = 1.0
    memory = deque(maxlen=100000)
    #deque para almacenar experiencias pasadas, con un tamaño máximo de 100,000
    reward_per_episode = np.zeros(episodes)
    #Arreglo para almacenar la recompensa total obtenida en cada episodio.
    
    # Bucle principal de entrenamiento por episodio
    for episode in range(episodes):
        if (episode+1) % 1 == 0:
            env.close()
            env = gym.make("CarRacing-v3", render_mode="human")
        else:
            env.close()
            env = gym.make("CarRacing-v3")

        state, _ = env.reset(seed=42)
        state = np.transpose(state, (2, 0, 1))
        #(alto, ancho, canales) a (canales, alto, ancho)
        episode_reward = 0
        #recompensas acumuladas en cada episodio
        done = False
        step_count = 0
        #pasos o acciones
        print(f"Comenzando episodio {episode + 1}")

        # Bucle de pasos dentro de cada episodio
        while not done and step_count < max_steps:
            action = choose_action(state, q_network, epsilon)
            action_idx = DISCRETE_ACTIONS.index(action)
            next_state, reward, done, _, _ = env.step(np.array(action, dtype=np.float32))
            #toma una accion y ejecuta un paso 
            
            # Añadir recompensa si el agente acelera y penalizar giros fuertes
            if action[1] > 0:
                reward += 1.0  # Recompensa adicional por acelerar
            if reward > 0 and action_idx == 2:
                reward += 0.5
            elif action_idx in [0, 4]:
                reward -= 5

            next_state = np.transpose(next_state, (2, 0, 1))
            # Reorganaizar las dimensiones de la observación del entorno
            memory.append((state, action_idx, reward, next_state, done))
            state = next_state
            episode_reward += reward
            step_count += 1
            train_network(q_network, target_network, memory, gamma, batch_size, optimizer)
            #train_network se llama para entrenar q_network con un batch de experiencias de 
            # memory, ajustando los pesos de q_network en función de los valores objetivo
        
        # Reducir epsilon gradualmente para favorecer la explotación
        epsilon = max(epsilon * epsilon_decay, epsilon_min)
        #Se reduce para que el agente explore menos con el tiempo
        reward_per_episode[episode] = episode_reward

        print( f"Recompensa del episodio : {episode_reward}")

        # Actualizar los pesos de la red de objetivo cada 10 episodios
        if (episode + 1) % 10 == 0:
            target_network.load_state_dict(q_network.state_dict())
            #e actualiza copiando los pesos de q_network. 
            # Esto estabiliza el entrenamiento al reducir la varianza en los valores Q
    
    torch.save(q_network.state_dict(), MODEL_PATH)
    print(f"Modelo guardado en {MODEL_PATH}")
    # Graficar el desempeño del agente en términos de recompensa acumulada por episodio
    plt.plot(reward_per_episode)
    plt.xlabel("Episodios")
    plt.ylabel("Recompensa acumulada")
    plt.title("Desempeño del agente durante el entrenamiento")
    plt.show()

# Llamar a la función de entrenamiento con 1000 episodios
train(episodes=100)

PyTorch CUDA availability:
CUDA available:  True
Device count:  1
CUDA device name:  NVIDIA GeForce RTX 4060 Laptop GPU
Comenzando episodio 1


  states = torch.tensor(states, dtype=torch.float32, device=device)


Recompensa del episodio : -333.0265017667845
Comenzando episodio 2


KeyboardInterrupt: 

In [None]:
print(q_values)