Esta notebook contiene bloques de código útiles para realizar Q-learning en el entorno "Descent Env"

In [None]:
import numpy as np
from descent_env import DescentEnv
from q_learning_agent import QLearningAgent
from q_learning_stochastic_agent import QLearningStochasticAgent

In [None]:
# Cambiar render_mode a rgb_array para entrenar/testear
from env_recorder_wrapper import VideoRecorderWrapper 
import wandb
env = DescentEnv(render_mode='rgb_array')

Observation Space

In [None]:
env.observation_space

Action Space

In [None]:
env.action_space

Discretización de los estados

**Nota:** es importante que chequeen el espacio de observación y el espacio de acción del entorno. Los números usados son ejemplos y pueden no ser correctos

In [None]:
altitude_space = np.concatenate([
    np.linspace(0, 0.2, 2, endpoint=False),
    np.linspace(0.2, 0.8, 6),
    np.linspace(0.8, 1.3, 3),
    np.linspace(1.3, 2.3,3),
    np.linspace(2.3, 2.5, 3)]) 

vertical_velocity_space = np.linspace(-2.7,2.5, 3)

target_altitude_space = np.concatenate([
    np.linspace(0,0.2, 2, endpoint=False),
    np.linspace(0.2, 0.8, 12),
    np.linspace(0.8, 0.9, 2, )])

#runway_distance_space = np.concatenate([
#    np.linspace(-0.6, -0.5, 3, endpoint=False),
#    np.linspace(-0.5, 0.1, 8),
#    np.linspace(0.1, 1, 3)])

runway_distance_space = np.concatenate([
    np.linspace(-0.3, -0.01, 3, endpoint=False),  # post-pista (negativos)
    np.linspace(-0.01, 0.1, 6),                   # justo antes y sobre pista
    np.linspace(0.1, 0.5, 4),                     # aproximación cercana
    np.linspace(0.5, 1.2, 3)                      # aproximación lejana
])
#print("altitude_space:", altitude_space)
#print("vertical_velocity_space:", vertical_velocity_space)
#print("target_altitude_space:", target_altitude_space)
#print("runway_distance_space:", runway_distance_space)


#import gymnasium as gym
#from descent_env import DescentEnv
#import numpy as np
#import random
#
#env = DescentEnv(render_mode="human") 
#obs, info = env.reset()
#
#num_episodes = 5 
#max_steps_per_episode = 500 
#
#all_altitudes = []
#all_vertical_velocities = []
#all_target_altitudes = []
#all_runway_distances = []
#
#for episode in range(num_episodes):
#    obs, info = env.reset()
#    done = False
#    steps = 0
#    print(f"\n--- Episodio {episode + 1} ---")
#    while not done and steps < max_steps_per_episode:
#        # Aquí generas una acción aleatoria válida para tu entorno
#        actions = [-1.0, -0.5, 0.0, 0.5, 1.0] # Acciones discretas sugeridas en el notebook
#        action = np.array([random.choice(actions)])
#
#        obs, reward, done, truncated, info = env.step(action)
#
#        # Recolectar datos
#        all_altitudes.append(obs["altitude"][0])
#        all_vertical_velocities.append(obs["vz"][0])
#        all_target_altitudes.append(obs["target_altitude"][0])
#        all_runway_distances.append(obs["runway_distance"][0])
#        steps += 1
#        
#    env.render()
#    env.close()
#
## Analizar los datos recolectados
#print("\n--- Análisis de rangos observados ---")
#print(f"Altitud: Min={np.min(all_altitudes):.2f}, Max={np.max(all_altitudes):.2f}")
#print(f"Velocidad Vertical: Min={np.min(all_vertical_velocities):.2f}, Max={np.max(all_vertical_velocities):.2f}")
#print(f"Altitud Objetivo: Min={np.min(all_target_altitudes):.2f}, Max={np.max(all_target_altitudes):.2f}")
#print(f"Distancia Pista: Min={np.min(all_runway_distances):.2f}, Max={np.max(all_runway_distances):.2f}")
#"""

In [None]:
actions = list(np.linspace(-1, 1, 10))
agent = QLearningStochasticAgent(
    altitude_space, 
    vertical_velocity_space, 
    target_altitude_space,
    runway_distance_space,  
    actions=actions,
    env=env
)

In [None]:
episodes = 6000
epsilon = 0.99
gamma = 0.5
alpha = 0.5
rewards = agent.train_agent(env=env, episodes=episodes, epsilon=epsilon, gamma=gamma, alpha=alpha)

Obtener el estado a partir de la observación

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def calculate_q_mean_heatmap_general(q_table, action_values, state_spaces, target_dim):
    """
    Calcula un mapa de calor de Q(s, a) promediado sobre las demás dimensiones.

    Args:
        q_table (np.ndarray): La tabla Q.
        action_values (list): Lista de valores de las acciones.
        state_spaces (list): Lista de espacios discretizados para cada dimensión (s1, s2, s3, s4).
        target_dim (int): Índice de la dimensión principal (0 para s1, 1 para s2, etc.).

    Returns:
        np.ndarray: Matriz de tamaño (len(state_spaces[target_dim]), len(action_values)) con los valores promedios.
    """
    heatmap = np.zeros((len(state_spaces[target_dim]), len(action_values)))
    other_dims = [i for i in range(len(state_spaces)) if i != target_dim]

    for target_index in range(len(state_spaces[target_dim])):
        for action_index in range(len(action_values)):
            q_sum = 0
            count = 0
            for indices in np.ndindex(*[len(state_spaces[dim]) for dim in other_dims]):
                full_index = [0] * len(state_spaces)
                full_index[target_dim] = target_index
                for i, dim in enumerate(other_dims):
                    full_index[dim] = indices[i]
                full_index.append(action_index)
                q_sum += q_table[tuple(full_index)]
                count += 1
            heatmap[target_index, action_index] = q_sum / count

    return heatmap

def plot_q_mean_heatmap_general(heatmap, state_space, action_values, state_label, title):
    """
    Grafica un mapa de calor para Q(s, a) promedio.

    Args:
        heatmap (np.ndarray): Matriz (s, a) con valores promedio.
        state_space (list): Valores discretizados para la dimensión principal.
        action_values (list): Lista de valores de las acciones.
        state_label (str): Etiqueta para la dimensión principal (e.g., "Altitud", "Velocidad Vertical").
        title (str): Título del gráfico.
    """
    plt.figure(figsize=(12, 6))
    im = plt.imshow(heatmap, aspect='auto', cmap='viridis', origin='lower',
                    extent=[action_values[0], action_values[-1], state_space[0], state_space[-1]])
    plt.colorbar(im, label="Q-value promedio")
    plt.xlabel("Acciones")
    plt.ylabel(state_label)
    plt.title(title)
    plt.tight_layout()
    plt.show()

# Graficar mapas de calor para todas las dimensiones
dimensions = ["Altitud (s1)", "Velocidad Vertical (s2)", "Altitud Objetivo (s3)", "Distancia a la Pista (s4)"]
state_spaces = [altitude_space, vertical_velocity_space, target_altitude_space, runway_distance_space]

for target_dim, dimension_label in enumerate(dimensions):
    heatmap = calculate_q_mean_heatmap_general(
        q_table=agent.q,
        action_values=actions,
        state_spaces=state_spaces,
        target_dim=target_dim
    )
    plot_q_mean_heatmap_general(
        heatmap=heatmap,
        state_space=state_spaces[target_dim],
        action_values=actions,
        state_label=dimension_label,
        title=f"Mapa de calor de Q({dimension_label}, a) promediando el resto de los estados"
    )

In [None]:
average_reward = np.mean(rewards)
print(f"Promedio de recompensas: {average_reward}")

In [None]:
import matplotlib.pyplot as plt
plt.plot(rewards)
plt.xlabel('Episodio')
plt.ylabel('Recompensa')
plt.title('Recompensas por episodio')
plt.show()

In [None]:
test_rewards = agent.test_agent(env, episodes=500)
test_average_reward = np.mean(test_rewards)
print(f"Promedio de recompensas en test: {test_average_reward}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Agrupar recompensas por intervalos de episodios
interval = 100  # Número de episodios por grupo
grouped_rewards = [rewards[i:i + interval] for i in range(0, len(rewards), interval)]

# Crear el boxplot
plt.figure(figsize=(10, 6))
plt.boxplot(grouped_rewards, patch_artist=True, boxprops=dict(facecolor='lightblue'))

# Etiquetas y título
plt.xlabel('Grupo de episodios (x100)')
plt.ylabel('Recompensa')
plt.title('Distribución de recompensas por grupo de episodios de TRAIN')
plt.xticks(ticks=range(1, len(grouped_rewards) + 1), labels=[f'{i*interval}-{(i+1)*interval}' for i in range(len(grouped_rewards))], rotation=45)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Agrupar recompensas de test por intervalos de episodios
interval = 100  # Número de episodios por grupo
grouped_test_rewards = [test_rewards[i:i + interval] for i in range(0, len(test_rewards), interval)]

# Crear el boxplot para las recompensas de test
plt.figure(figsize=(10, 6))
plt.boxplot(grouped_test_rewards, patch_artist=True, boxprops=dict(facecolor='lightgreen'))

# Etiquetas y título
plt.xlabel('Grupo de episodios de test (x100)')
plt.ylabel('Recompensa')
plt.title('Distribución de recompensas por grupo de episodios de TEST')
plt.xticks(ticks=range(1, len(grouped_test_rewards) + 1), labels=[f'{i*interval}-{(i+1)*interval}' for i in range(len(grouped_test_rewards))], rotation=45)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()

In [None]:
human_env = DescentEnv(render_mode="human")
agent.test_agent(human_env, episodes=10)

In [None]:
wandb.login(key="b1666b9050a5ade20a5130837a3c3c5ac2e39580")
wandb.init(project="descent_env_training", name="training_run")
# Guardar hiperparámetros
wandb.config.update({
    "epsilon": epsilon,
    "gamma": gamma,
    "alpha": alpha,
    "episodes": episodes,
})

# Guardar discretización de los datos
wandb.log({
    "test_average_reward": test_average_reward
})
wandb.finish()