In [9]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers
import random
from collections import deque

In [10]:
def create_q_network(input_shape, action_space):
    model = tf.keras.Sequential([
        layers.InputLayer(input_shape=input_shape),
        layers.Conv2D(32, 8, strides=4, activation='relu'),
        layers.Conv2D(64, 4, strides=2, activation='relu'),
        layers.Conv2D(64, 3, strides=1, activation='relu'),
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.Dense(action_space, activation='linear')
    ])
    return model


In [11]:
def choose_action(state, q_network, epsilon, action_space):
    if random.random() < epsilon:
        return action_space.sample()  # Exploración
    else:
        q_values = q_network.predict(np.expand_dims(state, axis=0), verbose=0)
        return q_values.flatten()  # Explotación


In [12]:
def train_network(q_network, target_network, memory, gamma, batch_size, optimizer):
    if len(memory) < batch_size:
        return  # No entrenar hasta que haya suficiente memoria

    minibatch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*minibatch)

    states = np.array(states)
    next_states = np.array(next_states)
    
    q_values = q_network.predict(states, verbose=0)
    q_values_next = target_network.predict(next_states, verbose=0)

    for i in range(batch_size):
        target_q = rewards[i]
        if not dones[i]:
            target_q += gamma * np.max(q_values_next[i])
        q_values[i] = q_values[i].flatten()

    with tf.GradientTape() as tape:
        q_values_pred = q_network(states)
        loss = tf.reduce_mean(tf.square(q_values_pred - q_values))
    grads = tape.gradient(loss, q_network.trainable_variables)
    optimizer.apply_gradients(zip(grads, q_network.trainable_variables))


In [13]:
def calculate_distance_to_start(state, start_position):
    # Asume que `state` y `start_position` son posiciones 2D (x, y) del agente en la pista
    return np.linalg.norm(np.array(state[:2]) - np.array(start_position))


In [None]:
def train(episodes, max_steps=1000, batch_size=64, gamma=0.99, epsilon_decay=0.995, epsilon_min=0.1, render=False):
    # Inicialización del entorno y de los parámetros de entrenamiento
    env = gym.make("CarRacing-v3", render_mode='human')
    input_shape = env.observation_space.shape
    action_space = env.action_space
    
    # Inicialización de las redes Q y objetivo
    q_network = create_q_network(input_shape, action_space.shape[0])
    target_network = create_q_network(input_shape, action_space.shape[0])
    target_network.set_weights(q_network.get_weights())

    # Definición del optimizador y parámetros de epsilon para exploración
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    epsilon = 1.0  # Comenzar con alta exploración
    memory = deque(maxlen=100000)  # Memoria de experiencias
    reward_per_episode = np.zeros(episodes)

    # Parámetros adicionales para el cálculo de vueltas y recompensas
    proximity_threshold = 10.0  # Distancia para considerar que el agente ha completado una vuelta
    lap_reward = 100  # Recompensa adicional por completar una vuelta
    slow_progress_penalty = -10  # Penalización por progreso lento
    min_progress_distance = 5  # Distancia mínima que debe recorrer en cada intervalo

    # Ciclo de entrenamiento a través de los episodios
    for episode in range(episodes):
        # Reiniciar el entorno y establecer el punto inicial
        state, _ = env.reset(seed=42)
        initial_position = state[:2]  # Guardar la posición inicial del agente
        state = np.array(state, dtype=np.float32)
        episode_reward = 0
        done = False
        step_count = 0
        last_position = initial_position  # Posición del último progreso registrado
        progress_interval = max_steps // 10  # Intervalo para verificar progreso

        print(f"Comenzando episodio {episode + 1}")

        while not done and step_count < max_steps:
            # Selección de acción usando la política epsilon-greedy
            action = choose_action(state, q_network, epsilon, action_space)
            next_state, reward, done, _, _ = env.step(action)
            next_state = np.array(next_state, dtype=np.float32)

            # Calcular la distancia actual al punto de inicio
            distance_to_start = calculate_distance_to_start(next_state, initial_position)

            # Penalización adicional si el agente no ha avanzado en cierta distancia
            if step_count % progress_interval == 0:
                distance_covered = calculate_distance_to_start(next_state, last_position)
                if distance_covered < min_progress_distance:
                    reward += slow_progress_penalty  # Penalización por progreso lento
                    print("Penalización por progreso lento aplicada.")
                last_position = next_state[:2]  # Actualizar última posición de progreso

            # Aplicar recompensa adicional si el agente completa una vuelta
            if distance_to_start < proximity_threshold and step_count > max_steps // 2:
                reward += lap_reward
                print("¡Vuelta completada! Recompensa adicional otorgada.")
            
            # Almacenar la transición en la memoria
            memory.append((state, action, reward, next_state, done))
            state = next_state
            episode_reward += reward
            step_count += 1

            # Entrenamiento de la red Q usando experiencias almacenadas en la memoria
            train_network(q_network, target_network, memory, gamma, batch_size, optimizer)

            # Finalizar el episodio si la recompensa acumulada es muy negativa
            if episode_reward < -100:
                done = True
                print(f"Episodio {episode + 1} terminado anticipadamente por baja recompensa acumulada.")
        
        # Reducir epsilon después de cada episodio para disminuir exploración gradualmente
        epsilon = max(epsilon * epsilon_decay, epsilon_min)
        
        # Almacenar la recompensa total del episodio
        reward_per_episode[episode] = episode_reward

        # Sincronizar la red objetivo periódicamente
        if (episode + 1) % 10 == 0:
            target_network.set_weights(q_network.get_weights())

        # Mostrar progreso cada 100 episodios
        if (episode + 1) % 100 == 0:
            print(f"Episodio: {episode + 1} - Recompensa acumulada: {reward_per_episode[episode]}")

    # Graficar la recompensa acumulada por episodio al finalizar el entrenamiento
    plt.plot(reward_per_episode)
    plt.xlabel("Episodios")
    plt.ylabel("Recompensa acumulada")
    plt.title("Desempeño del agente durante el entrenamiento")
    plt.show()
    
    # Evaluación del agente entrenado
    evaluate_agent(env, q_network)


In [15]:
def evaluate_agent(env, q_network, max_steps=1000):
    state, _ = env.reset(seed=42)
    state = np.array(state, dtype=np.float32)
    done = False
    step_count = 0
    total_reward = 0

    while not done and step_count < max_steps:
        action = q_network.predict(np.expand_dims(state, axis=0), verbose=0).flatten()
        state, reward, done, _, _ = env.step(action)
        state = np.array(state, dtype=np.float32)
        total_reward += reward
        step_count += 1
        env.render()

    print(f"Recompensa total en evaluación: {total_reward}")
    env.close()


In [16]:
train(episodes=100)


Comenzando episodio 1
Comenzando episodio 2
Comenzando episodio 3
Comenzando episodio 4
Comenzando episodio 5
Comenzando episodio 6


KeyboardInterrupt: 