Esta notebook contiene bloques de código útiles para realizar Q-learning en el entorno "Descent Env"

In [1]:
import numpy as np
from descent_env import DescentEnv
import random 

pygame 2.5.2 (SDL 2.28.3, Python 3.10.8)
Hello from the pygame community. https://www.pygame.org/contribute.html
Using Python-based geo functions


In [2]:
# Change render_mode to rgb_array for training/testing
# env = DescentEnv(render_mode='human')
env = DescentEnv()

Reading config from C:\Users\npere\bluesky\settings.cfg
Reading magnetic variation data
Loading global navigation database...
Reading cache: C:\Users\npere\bluesky\cache\navdata.p
Successfully loaded OpenAP performance model
Failed to load BADA performance model
Successfully loaded legacy performance model
Successfully loaded plugin AREA
Successfully loaded plugin DATAFEED


Observation Space

In [3]:
# Display the observation space: per the saved output, a Dict with four Box(1,)
# entries (altitude, runway_distance, target_altitude, vz).
env.observation_space

Dict('altitude': Box(-inf, inf, (1,), float64), 'runway_distance': Box(-inf, inf, (1,), float64), 'target_altitude': Box(-inf, inf, (1,), float64), 'vz': Box(-inf, inf, (1,), float64))

Action Space

In [23]:
# Display the action space: per the saved output, one continuous value in [-1.0, 1.0].
env.action_space

Box(-1.0, 1.0, (1,), float64)

Discretización de los estados

**Nota:** es importante que chequeen el espacio de observación y el espacio de acción del entorno. Los números usados son ejemplos y pueden no ser correctos

**Discretización actualizada**

In [3]:
# NOTE(review): the eight constants below are not referenced by any other cell
# visible in this notebook. They look like normalization parameters — confirm
# whether they are still needed.
ALT_MIN = 2000
ALT_MAX = 4000
ALT_MEAN = 1500
ALT_STD = 3000
VZ_MEAN = 0
VZ_STD = 5
RWY_DIS_MEAN = 100
RWY_DIS_STD = 200
# Bin edges used by get_state() to discretize each observation component.
# The number of edges per component also sets the size of the matching Q-table axis.
altitude_space = np.linspace(0, 1, 20)  # 20 edges over [0, 1]
vertical_velocity_space = np.linspace(-5, 5, 15)  # 15 edges over [-5, 5]
target_altitude_space = np.linspace(0, 1, 20)  # 20 edges over [0, 1]
runway_distance_space = np.linspace(0, 0.5, 15)  # 15 edges over [0, 0.5] for the normalized runway distance

Obtener el estado a partir de la observación

In [4]:
def get_state(obs):
    """Map an observation dict to a tuple of discrete bin indices.

    Each component is read as ``obs[key][0]`` and located with ``np.digitize``
    against that component's bin edges; the index is capped at
    ``len(edges) - 1`` so it always fits the corresponding Q-table axis.

    Returns:
        (altitude_idx, vz_idx, target_altitude_idx, runway_distance_idx)
    """
    # Order matters: it defines the axis order of the Q-table.
    components = (
        ('altitude', altitude_space),
        ('vz', vertical_velocity_space),
        ('target_altitude', target_altitude_space),
        ('runway_distance', runway_distance_space),
    )
    return tuple(
        min(np.digitize(obs[key][0], edges), len(edges) - 1)
        for key, edges in components
    )

In [26]:
# Draw a random observation from the observation space and discretize it.
obs = env.observation_space.sample()
print(obs)
state = get_state(obs) # Example observation
# NOTE(review): the saved output of this cell shows indices such as 45 and 36,
# which exceed the current bin counts (20 / 15), so it appears to come from an
# earlier discretization. Re-run the cell to refresh it.
state

OrderedDict([('altitude', array([0.66164064])), ('runway_distance', array([0.10889728])), ('target_altitude', array([-1.95279903])), ('vz', array([0.51672287]))])


(45, 36, 0, 15)

Discretización de las acciones

In [5]:
# Discretize the continuous action range [-1, 1] into 15 evenly spaced values.
actions = list(np.linspace(-1, 1, 15))
actions

[-1.0,
 -0.8571428571428572,
 -0.7142857142857143,
 -0.5714285714285714,
 -0.4285714285714286,
 -0.2857142857142858,
 -0.1428571428571429,
 0.0,
 0.1428571428571428,
 0.2857142857142856,
 0.4285714285714284,
 0.5714285714285714,
 0.7142857142857142,
 0.857142857142857,
 1.0]

In [6]:
def get_sample_action():
    """Return one of the discretized actions, picked uniformly at random."""
    sampled = random.choice(actions)
    return sampled

Inicialización de la tabla Q

In [7]:
# Zero-initialized Q-table: one axis per discretized observation component
# (in get_state() order) plus a final axis over the discrete actions.
Q = np.zeros(tuple(
    len(axis)
    for axis in (
        altitude_space,
        vertical_velocity_space,
        target_altitude_space,
        runway_distance_space,
        actions,
    )
))
Q.shape

(20, 15, 20, 15, 15)

Obtención de la acción a partir de la tabla Q

In [8]:
def optimal_policy(state, Q):
    """Return the greedy action for `state`.

    Looks up the row of action values ``Q[state]`` and returns the entry of
    `actions` at the position of its maximum.
    """
    best_idx = np.argmax(Q[state])
    return actions[best_idx]

Epsilon-Greedy Policy

In [12]:
def epsilon_greedy_policy(state, Q, epsilon=0.1):
    """Choose an action with an epsilon-greedy rule.

    With probability `epsilon` a random discretized action is returned;
    otherwise the greedy action for `state` under `Q` is returned.
    """
    # A single Bernoulli(epsilon) draw decides between exploring and exploiting.
    if np.random.binomial(1, epsilon):
        return get_sample_action()
    return optimal_policy(state, Q)

Ejemplo de episodio 

In [22]:
# Example interaction: reset the environment, take one epsilon-greedy step and
# report the reward plus the smallest/largest runway distance observed.
obs, _ = env.reset()
print(obs)
state = get_state(obs)
done = False
total_reward, steps = 0, 0

min_runway_distance, max_runway_distance = float('inf'), float('-inf')

for _ in range(1):
    # Action chosen by the policy (epsilon = 0.5)
    action = epsilon_greedy_policy(state, Q, 0.5)
    action_idx = actions.index(action)  # not used further in this cell
    real_action = np.array([action])
    obs, reward, done, _, _ = env.step(real_action)
    next_state = get_state(obs)

    # Track the min and max runway_distance seen so far
    runway_distance = obs['runway_distance'][0]
    min_runway_distance = min(min_runway_distance, runway_distance)
    max_runway_distance = max(max_runway_distance, runway_distance)

    total_reward += reward
    steps += 1
    state = next_state
    if done:
        # Episode finished: start a new one and keep going
        obs, _ = env.reset()
        state = get_state(obs)
        done = False

env.close()
print('total_reward', total_reward)
print('steps', steps)
print('min_runway_distance:', min_runway_distance)
print('max_runway_distance:', max_runway_distance)


# Q-learning


In [None]:
import pickle
import sys
# Redirect everything written to stdout into 'output.txt' (line-buffered).
# NOTE(review): no visible cell restores sys.stdout or closes this file, so
# after running this cell every later print() in the kernel also goes to
# output.txt instead of the notebook.
sys.stdout = open('output.txt', 'w', buffering=1)

In [17]:
# Tabular Q-learning with a constant exploration rate.
# The loop has no stopping condition: it runs until the kernel is interrupted,
# saving Q to 'Q.pkl' every 1000 episodes.
ALPHA = 0.5  # learning rate
GAMMA = 0.9  # discount factor

i = 8000  # episode counter; starts at 8000 — presumably resuming an earlier run (TODO confirm)
rewards = []  # total reward of each finished episode


def get_explore_prob(i):
    """Return the exploration probability (epsilon) for episode `i`; constant 0.2."""
    return 0.2


while True:
    obs, _ = env.reset()
    done = False
    episode_reward = 0
    prob = get_explore_prob(i)  # depends only on the episode number
    while not done:
        state = get_state(obs)
        # Epsilon-greedy action selection
        if random.uniform(0, 1) < prob:
            action = get_sample_action()
        else:
            action = optimal_policy(state, Q)
        action_idx = actions.index(action)

        next_obs, reward, terminated, truncated, _ = env.step(np.array([action]))
        next_state = get_state(next_obs)

        # A terminal transition has no future return, so its target is the
        # reward alone. Bootstrapping there would leak the value of whatever
        # non-terminal situation shares the same discrete state.
        future = 0.0 if terminated else GAMMA * np.max(Q[next_state])
        Q[state][action_idx] += ALPHA * (reward + future - Q[state][action_idx])

        obs = next_obs
        episode_reward += reward
        # Truncation also ends the episode (but still bootstraps above).
        done = terminated or truncated
    rewards.append(episode_reward)
    if (i + 1) % 100 == 0:
        mean_reward = np.mean(rewards[-100:])
        print(f"Media de reward en episodios {i-98} a {i+1}: {mean_reward} con exploracion {prob}")
    if (i + 1) % 1000 == 0:
        # Periodic checkpoint of the Q-table
        with open('Q.pkl', 'wb') as f:
            pickle.dump(Q, f)
    i += 1

KeyboardInterrupt: 

In [1]:
import pickle

# Reload the Q-table from 'Q.pkl', the file the training loop writes its checkpoints to.
# NOTE: pickle.load can execute arbitrary code — only load a Q.pkl you produced yourself.
with open('Q.pkl', 'rb') as f:
    Q = pickle.load(f)

# Sanity check of what was loaded: type, shape and memory footprint.
print("Tipo:", type(Q))
print("Shape:", Q.shape)
print("Tamaño en memoria (MB):", Q.nbytes / 1024 / 1024)

Tipo: <class 'numpy.ndarray'>
Shape: (20, 15, 20, 15, 15)
Tamaño en memoria (MB): 10.2996826171875


In [19]:
# Validation: run 100 episodes with the learned (greedy) policy and report
# reward and episode-length statistics.
env = DescentEnv()
total_rewards = []
total_steps = []

for episode in range(100):
    obs, _ = env.reset()
    done = False
    episode_reward = 0
    steps = 0
    while not done:
        state = get_state(obs)
        action = optimal_policy(state, Q)  # greedy policy
        obs, reward, terminated, truncated, _ = env.step(np.array([action]))
        episode_reward += reward
        steps += 1
        # Stop on truncation as well as termination (as the recording cell
        # does), so a time-limited episode cannot loop forever.
        done = terminated or truncated
        # To render each step, uncomment the next line
        # env.render()
    total_rewards.append(episode_reward)
    total_steps.append(steps)

env.close()
print(f"Recompensa media en 100 episodios: {np.mean(total_rewards)}")
print(f"Recompensa minima: {np.min(total_rewards)}")
print(f"Recompensa maxima: {np.max(total_rewards)}")
print(f"Pasos promedio por episodio: {np.mean(total_steps)}")

In [21]:
# Recording of the result: run one greedy episode inside VideoRecorderWrapper.
from env_recorder_wrapper import VideoRecorderWrapper
env = VideoRecorderWrapper(DescentEnv(render_mode="rgb_array"), fps=10)

total_rewards, total_steps = [], []

for episode in range(1):
    obs, _ = env.reset()
    episode_reward = steps = 0
    done = truncated = False
    # The episode ends as soon as the environment reports done or truncated.
    while not (done or truncated):
        state = get_state(obs)
        action = optimal_policy(state, Q)
        obs, reward, done, truncated, _ = env.step(np.array([action]))
        episode_reward += reward
        steps += 1
    total_rewards.append(episode_reward)
    total_steps.append(steps)

env.close()

In [None]:
# Quick check of rgb_array rendering: build an environment, request one frame
# and print its type and shape (or "None" if no frame is returned).
# NOTE: this rebinds the global `env`, which the cells below reuse.
env = DescentEnv(render_mode="rgb_array")
frame = env.render()
print(type(frame), frame.shape if frame is not None else "None")

# Stochastic Q Learning

In [None]:
import math


def stoch_argmax(Q_values, k=None):
    """Approximate argmax: index of the best value within a random subset.

    Draws `k` distinct indices uniformly at random and returns the one with
    the largest value. Ties go to the index drawn first.

    Args:
        Q_values: non-empty sequence (list or 1-D array) of values.
        k: subset size. Defaults to ``max(1, floor(log2(n)))`` with
            ``n = len(Q_values)``. Values outside ``[1, n]`` are clamped, so
            ``k >= n`` degenerates to an exact argmax.

    Returns:
        int: an index into `Q_values`.

    Raises:
        ValueError: if `Q_values` is empty.
    """
    n = len(Q_values)
    if n == 0:
        raise ValueError("stoch_argmax() requires at least one Q-value")
    if k is None:
        k = max(1, int(math.log2(n)))  # O(log(n)) candidates
    # random.sample rejects k > n and an empty sample has no best element,
    # so keep k inside [1, n].
    k = max(1, min(k, n))
    subset = random.sample(range(n), k)
    # max() keeps the first maximal element, i.e. the earliest-drawn index on ties.
    return max(subset, key=lambda idx: Q_values[idx])

In [None]:
# Setup for the stochastic Q-learning run.
# NOTE: Q is not re-initialized here, so training continues from whatever
# Q-table is currently in memory.
k_stochmax = 3  # subset size passed to stoch_argmax as k
rewards = []  # total reward of each finished episode
i=0  # episode counter
def get_explore_prob(i):
    """
    Return the exploration probability (epsilon) for episode number i.

    Alternates between 0.7 and 0.3 in consecutive blocks covering 0..5000
    (0-500, 501-1000, 1001-1500, ..., 4501-5000). Any value that falls in
    none of those blocks gets 0.1.
    """
    lower = 0
    for block, upper in enumerate(range(500, 5001, 500)):
        if lower <= i <= upper:
            # Even-numbered blocks explore more (0.7), odd-numbered ones less (0.3).
            return 0.7 if block % 2 == 0 else 0.3
        lower = upper + 1
    return 0.1  # default value outside the covered range

# Stochastic Q-learning: the argmax used for action selection and for the
# bootstrap target is replaced by stoch_argmax over a random subset of actions.
STOCH_ALPHA = 0.9  # learning rate
STOCH_GAMMA = 0.9  # discount factor

while i < 5000:
    obs, _ = env.reset()
    done = False
    episode_reward = 0
    prob = get_explore_prob(i)  # depends only on the episode number
    while not done:
        state = get_state(obs)
        if random.uniform(0, 1) < prob:
            action = get_sample_action()
            # The update below must target the action actually taken. Without
            # this line an exploratory step would reuse the index left over
            # from an earlier greedy step.
            action_idx = actions.index(action)
        else:
            action_idx = stoch_argmax(Q[state], k=k_stochmax)
            action = actions[action_idx]
        next_obs, reward, terminated, truncated, _ = env.step(np.array([action]))
        next_state = get_state(next_obs)
        if terminated:
            # No future return after a terminal transition.
            future = 0.0
        else:
            best_next_action_idx = stoch_argmax(Q[next_state], k=k_stochmax)
            future = STOCH_GAMMA * Q[next_state][best_next_action_idx]
        Q[state][action_idx] += STOCH_ALPHA * (reward + future - Q[state][action_idx])
        obs = next_obs
        episode_reward += reward
        # Truncation also ends the episode (but still bootstraps above).
        done = terminated or truncated
    rewards.append(episode_reward)
    if (i + 1) % 100 == 0:
        mean_reward = np.mean(rewards[-100:])
        print(f"Media de reward en episodios {i-98} a {i+1}: {mean_reward} con exploración {prob}")
    i += 1