# Bibliotecas

In [17]:
import os
import sys
import random
import time
import argparse

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Gymnasium e wrappers
import gymnasium as gym

# Stable-Baselines3
import torch
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecMonitor
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback

# Wrappers para customizações do LunarLander-v3

In [18]:
class RewardShapingWrapper(gym.Wrapper):
    """
    Reward shaping baseado nas observações do LunarLander.
    Obs: [x, y, vx, vy, angle, angular_vel, leg1_contact, leg2_contact]
    """
    def __init__(self, env, angle_penalty=20.0, dist_penalty=0.5, height_penalty=1.0, land_bonus=100.0):
        super().__init__(env)
        self.angle_penalty = angle_penalty
        self.dist_penalty = dist_penalty
        self.land_bonus = land_bonus
        self.height_penalty = height_penalty
        print(f"[RewardShapingWrapper] Exponential penalties: height_penalty={height_penalty}, dist_penalty={dist_penalty}")

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        
        # Calcular reward shaping baseado na observação
        shaping = 0.0
        
        try:
            # Penalizar por quantidade de steps (incentivar aterragem rápida)
            shaping -= 0.1


            # LunarLander obs: [x, y, vx, vy, angle, angular_vel, leg1_contact, leg2_contact]
            x = float(obs[0])
            y = float(obs[1])
            angle = float(obs[4])
            angular_vel = float(obs[5])
            leg1_contact = bool(obs[6])
            leg2_contact = bool(obs[7])
            
            # BÔNUS: Aterrar com ambas as pernas
            if leg1_contact and leg2_contact:
                shaping += self.land_bonus

            # PENALIZAR: Ângulo e velocidade angular (queremos horizontal e estável)
            shaping -= self.angle_penalty * (abs(angle) + abs(angular_vel))

            # PENALIZAR: Distância horizontal ao centro (x=0 é o helipad)
            # Penalização EXPONENCIAL: quanto mais longe, pior fica
            x_penalty = self.dist_penalty * (np.exp(abs(x)) - 1)
            shaping -= x_penalty
            
            # PENALIZAR: Altura (y > 0 = acima do solo)
            # No LunarLander, helipad está aproximadamente em y=0
            # Penalização EXPONENCIAL: quanto mais alto, muito pior
            height_above = max(0.0, y)
            y_penalty = self.height_penalty * (np.exp(height_above) - 1) * 2
            shaping -= y_penalty

            # Penalalizar velocidade consoante mais perto do solo
            
            if y < 1.0:
                vy = float(obs[3])
                vel_penalty = abs(vy) * (2 - y)  # Quanto mais perto do solo, maior a penalização
                shaping -= vel_penalty

            if leg1_contact or leg2_contact:
                vx = float(obs[2])
                ground_vel_penalty = abs(vx) * 5.0  # Penalização maior por velocidade horizontal ao tocar o solo
                shaping -= ground_vel_penalty
                if leg1_contact and leg2_contact:
                    shaping += 20.0  # Pequeno bónus por tocar com ambas as pernas

            

        except Exception as e:
            print(f"[RewardShaping ERROR] {e}")
            import traceback
            traceback.print_exc()
            shaping = 0.0
        
        # Adicionar shaping ao reward original
        shaped_reward = reward + shaping
        
        return obs, shaped_reward, terminated, truncated, info

In [19]:

class WindForceWrapper(gym.Wrapper):
    """Aplica perturbação lateral (vento) à velocidade horizontal 'observada'."""
    def __init__(self, env, wind_strength=0.02, deterministic=False):
        super().__init__(env)
        self.wind_strength = wind_strength
        self.deterministic = deterministic

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        done_flag = (terminated or truncated)

        if isinstance(obs, np.ndarray) and obs.shape[0] >= 3:
            wind = self.wind_strength if self.deterministic else self.wind_strength * (2*np.random.rand()-1)
            obs = obs.copy()
            obs[2] += wind
            reward -= abs(wind) * 5.0

        # devolver no mesmo formato do original
        return obs, reward, terminated, truncated, info

In [20]:

class ObservationNoiseWrapper(gym.ObservationWrapper):
    """Add gaussian noise to observations to make the task more robust.
    """
    def __init__(self, env, noise_std=0.01):
        super().__init__(env)
        self.noise_std = noise_std


    def observation(self, observation):
        if isinstance(observation, np.ndarray):
            return observation + np.random.normal(scale=self.noise_std, size=observation.shape)
        return observation

# Criação do ambiente customizado

In [21]:
def make_env_factory(env_id='LunarLander-v3', seed=None, config_name='orig', monitor_dir=None):
    """
    Retorna uma função _init compatível com DummyVecEnv que:
     - cria env,
     - aplica wrappers consoante config_name,
     - envolve com Monitor(escreve monitor.csv em monitor_dir).
    config_name em {'orig', 'reward', 'wind', 'noise', 'all'}
    """
    def _init():
        env = gym.make(env_id)
        if seed is not None:
            # seed reset (Gymnasium)
            try:
                env.reset(seed=seed)
            except TypeError:
                env.reset()
            env.action_space.seed(seed)
            env.observation_space.seed(seed)

        # Aplicar wrappers conforme config
        if config_name == 'reward':
            env = RewardShapingWrapper(env, angle_penalty=30.0, dist_penalty=0.8, height_penalty=2.0, land_bonus=100.0)
        elif config_name == 'wind':
            env = WindForceWrapper(env, wind_strength=0.03, deterministic=False)
        elif config_name == 'noise':
            env = ObservationNoiseWrapper(env, noise_std=0.02)
        elif config_name == 'all':
            env = RewardShapingWrapper(env, angle_penalty=30.0, dist_penalty=0.8, height_penalty=2.0, land_bonus=80.0)
            env = WindForceWrapper(env, wind_strength=0.03, deterministic=False)
            env = ObservationNoiseWrapper(env, noise_std=0.02)
        # caso 'orig' => nenhum wrapper

        # Monitor: grava os episódios neste ficheiro
        if monitor_dir is not None:
            os.makedirs(monitor_dir, exist_ok=True)
            monitor_file = os.path.join(monitor_dir, 'monitor.csv')
            env = Monitor(env, filename=monitor_file)
        else:
            env = Monitor(env)

        return env
    return _init


In [22]:
def make_env_factory_multiple_configs(env_id='LunarLander-v3', seed=None, config_name='orig', monitor_dir=None):
    """
    Retorna uma função _init compatível com DummyVecEnv que:
     - cria env,
     - aplica wrappers consoante config_name,
     - envolve com Monitor(escreve monitor.csv em monitor_dir).
    config_name em {'orig', 'reward', 'wind', 'noise', 'all'}
    """
    def _init():
        env = gym.make(env_id)
        if seed is not None:
            # seed reset (Gymnasium)
            try:
                env.reset(seed=seed)
            except TypeError:
                env.reset()
            env.action_space.seed(seed)
            env.observation_space.seed(seed)

        # Aplicar wrappers conforme config
        if 'reward' in config_name:
            print("[CREATING ENV] Applying RewardShapingWrapper")
            env = RewardShapingWrapper(env, angle_penalty=30.0, dist_penalty=0.8, height_penalty=1.0, land_bonus=100.0)
        if 'wind' in config_name:
            print("[CREATING ENV] Applying WindForceWrapper")
            env = WindForceWrapper(env, wind_strength=0.01, deterministic=False)
        if 'noise' in config_name:
            print("[CREATING ENV] Applying ObservationNoiseWrapper")
            env = ObservationNoiseWrapper(env, noise_std=0.02)
        if config_name == 'all':
            env = RewardShapingWrapper(env, angle_penalty=30.0, dist_penalty=0.8, height_penalty=1.0, land_bonus=80.0)
            env = WindForceWrapper(env, wind_strength=0.03, deterministic=False)
            env = ObservationNoiseWrapper(env, noise_std=0.02)
        
        # caso 'orig' => nenhum wrapper
        
        # Monitor: grava os episódios neste ficheiro
        if monitor_dir is not None:
            os.makedirs(monitor_dir, exist_ok=True)
            monitor_file = os.path.join(monitor_dir, 'monitor.csv')
            env = Monitor(env, filename=monitor_file)
        else:
            env = Monitor(env)

        return env
    return _init


# Função de treino do PPO

In [None]:
def train_configs(config_name='orig', seed=0, timesteps=300_000, hyperparams=None, out_root='./experiments'):
    """
    Treina um PPO para a configuração especificada.
    - Guarda modelo e monitor.csv em out_root/config_name/seedX/
    - hyperparams: dict que sobrepõe os defaults do PPO (learning_rate,n_steps,batch_size,n_epochs,...)
    """
    if hyperparams is None:
        hyperparams = {}

    name = ''

    for i, c in enumerate(config_name):
        name += c
        if (i < len(config_name) - 1):
            name += '_'

    out_dir = os.path.join(out_root, name, f'seed{seed}')
    os.makedirs(out_dir, exist_ok=True)

    # reproducibilidade
    random.seed(seed); np.random.seed(seed); torch.manual_seed(seed)

    # criar env factory (monitor dentro da pasta out_dir)
    env_fn = make_env_factory(seed=seed, config_name=config_name, monitor_dir=out_dir)
    vec_env = DummyVecEnv([env_fn])
    vec_env = VecMonitor(vec_env)

    # policy defaults
    policy_kwargs = dict(activation_fn=torch.nn.Tanh, net_arch=[dict(pi=[256,256], vf=[256,256])])

    # PPO defaults (podes sobrepor via hyperparams)
    ppo_defaults = dict(
        policy='MlpPolicy',
        env=vec_env,
        verbose=1,
        seed=seed,
        learning_rate=3e-5,
        n_steps=4096,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        ent_coef=0.0,
        vf_coef=0.5,
        clip_range=0.2,
        policy_kwargs=policy_kwargs,
        tensorboard_log=os.path.join(out_dir, 'tb')
    )
    # update defaults with provided hyperparams
    ppo_defaults.update(hyperparams)

    model = PPO(**ppo_defaults)

    # callbacks: evaluation e checkpoints (guardam em out_dir)
    eval_env = DummyVecEnv([make_env_factory_multiple_configs(seed=seed+1000, config_name=config_name, monitor_dir=None)])
    eval_env = VecMonitor(eval_env)
    eval_callback = EvalCallback(eval_env, best_model_save_path=out_dir,
                                 log_path=out_dir, eval_freq=max(10_000, ppo_defaults['n_steps']*2),
                                 n_eval_episodes=5, deterministic=True, render=False)
    checkpoint_callback = CheckpointCallback(save_freq=max(50_000, ppo_defaults['n_steps']*5),
                                             save_path=out_dir, name_prefix='ppo_checkpoint')

    model.learn(total_timesteps=timesteps, callback=[eval_callback, checkpoint_callback])

    model_path = os.path.join(out_dir, f'ppo_{config_name}_seed{seed}.zip')
    model.save(model_path)

    vec_env.close()
    eval_env.close()
    print(f"[TRAIN] Saved model: {model_path}")
    return model_path, out_dir


# Função de avaliação com critério revisado

In [24]:
def evaluate_custom(model_path, config_name='orig', seed=None, episodes=50):
    """
    Avalia o modelo (usa DummyVecEnv com mesma config). Critério de sucesso:
     - em qualquer step do episódio both legs touched, OR total_reward >= 200
    Retorna dicionário com métricas e lista de recompensas por episódio.
    """
    # carregar env (para avaliação, monitor não é necessário)
    env_fn = make_env_factory(seed=seed, config_name=config_name, monitor_dir=None)
    vec_env = DummyVecEnv([env_fn])
    vec_env = VecMonitor(vec_env)

    model = PPO.load(model_path, env=vec_env)

    # avaliação rápida via evaluate_policy (apenas para ter mean/std)
    mean_reward, std_reward = evaluate_policy(model, vec_env, n_eval_episodes=min(10, episodes), deterministic=True)
    print(f"[EVAL] quick evaluate_policy: mean={mean_reward:.2f} std={std_reward:.2f}")

    # per-episode sampling para success metric
    successes = 0
    rewards = []
    for _ in range(episodes):
        reset_out = vec_env.reset()
        obs = reset_out[0] if isinstance(reset_out, tuple) else reset_out

        done = False
        total_r = 0.0
        landed_flag = False
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            step_out = vec_env.step(action)
            # VecEnv.step returns (obs, reward, done, info)
            obs, reward, done, info = step_out
            # reward pode ser array shape (1,), garantir float
            try:
                total_r += float(np.array(reward).item())
            except Exception:
                total_r += float(reward)

            # verificar contacto das pernas na observação atual
            try:
                last_obs = obs[0]  # porque DummyVecEnv usa batch dimension
                if bool(last_obs[6]) and bool(last_obs[7]):
                    landed_flag = True
            except Exception:
                pass

        if landed_flag or total_r >= 200:
            successes += 1
        rewards.append(total_r)

    vec_env.close()
    rewards = np.array(rewards)
    result = {
        'mean_reward': float(rewards.mean()),
        'std_reward': float(rewards.std()),
        'median_reward': float(np.median(rewards)),
        'success_rate': float(successes) / len(rewards),
        'per_episode': rewards
    }
    print(f"[EVAL] result: mean={result['mean_reward']:.2f} std={result['std_reward']:.2f} success_rate={result['success_rate']:.2f}")
    return result


# Função para plot de treino

In [25]:
def plot_training_monitor(monitor_csv_path, window=10, show=True, out_png=None):
    """Plota rewards (raw + smoothed) a partir do monitor.csv gerado pelo Monitor."""
    if not os.path.exists(monitor_csv_path):
        print("[PLOT] monitor file not found:", monitor_csv_path)
        return
    df = pd.read_csv(monitor_csv_path, comment='#')
    df['r_smooth'] = df['r'].rolling(window=window, min_periods=1).mean()
    plt.figure(figsize=(10,4))
    plt.plot(df['r'], alpha=0.25, label='raw')
    plt.plot(df['r_smooth'], label=f'smoothed({window})')
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title(os.path.basename(os.path.dirname(monitor_csv_path)))
    plt.legend()
    if out_png:
        plt.savefig(out_png, bbox_inches='tight')
        print("[PLOT] saved to", out_png)
    if show:
        plt.show()
    plt.close()

def plot_compare_configs(base_dir, configs, seed, window=20):
    """Plota curvas suavizadas (por seed) comparando várias configs no mesmo gráfico."""
    plt.figure(figsize=(10,5))
    for cfg in configs:
        monitor_csv = os.path.join(base_dir, cfg, f'seed{seed}', 'monitor.csv')
        if not os.path.exists(monitor_csv):
            print("[COMPARE] monitor not found for", cfg, "seed", seed)
            continue
        df = pd.read_csv(monitor_csv, comment='#')
        df['r_smooth'] = df['r'].rolling(window=window, min_periods=1).mean()
        plt.plot(df['r_smooth'], label=f'{cfg}')
    plt.xlabel('Episode')
    plt.ylabel('Smoothed Reward')
    plt.title(f'Compare configs (seed {seed})')
    plt.legend()
    plt.show()

# Execução para múltiplas seeds

In [26]:
def run_experiments(configs, seeds, timesteps=50_000, hyperparams=None, out_root='./experiments'):
    """
    Roda treinos e avaliações para lista de configs e seeds.
    Retorna um dict results[config][seed] = metrics
    """
    results = {}
    for s in seeds:
        print("\n\n============================")
        print(f"Training config={configs} seed={s}")
        model_path, out_dir = train_configs(config_name=configs, seed=s, timesteps=timesteps,
                                            hyperparams=hyperparams, out_root=out_root)
        print(f"Evaluating model for config={configs} seed={s}")
        res = evaluate_custom(model_path=model_path, config_name=configs, seed=s, episodes=50)
    return results


In [27]:
from itertools import product

def run_hyperparam_grid(config_name, seed, grid, timesteps=300_000, out_root='./experiments_grid'):
    """
    grid: dict of lists, e.g. {'learning_rate':[3e-5,1e-4],'n_steps':[2048,4096]}
    Vai gerar todas as combinações, treinar e guardar resultados em out_root/config_name/seed/hparam_i
    Retorna lista de (hparam_dict, metrics)
    """
    keys, values = zip(*grid.items())
    combos = [dict(zip(keys, v)) for v in product(*values)]
    results = []
    for i, combo in enumerate(combos):
        print(f"\n---- Grid {i+1}/{len(combos)}: {combo}")
        # colocar cada combo numa subpasta
        out_root_combo = os.path.join(out_root, config_name, f'seed{seed}', f'grid_{i}')
        os.makedirs(out_root_combo, exist_ok=True)
        model_path, _ = train_configs(config_name=config_name, seed=seed, timesteps=timesteps,
                                    hyperparams=combo, out_root=out_root_combo)
        metrics = evaluate_custom(model_path=model_path, config_name=config_name, seed=seed, episodes=30)
        results.append((combo, metrics, model_path, out_root_combo))
    return results


# Test Models

In [28]:
def visualize_model(model_path, config_name='orig', episodes=5, seed=None, render_mode='human'):
    """
    Carrega um modelo treinado e renderiza episódios para visualização.
    configs = ['orig','reward','wind','noise','all']
    seeds = [0, 7, 42]
    results = run_experiments(configs, seeds, timesteps=50_000, hyperparams=None, out_root='./experiments')
    Args:
        model_path: caminho para o ficheiro .zip do modelo
        config_name: configuração do ambiente ('orig', 'reward', 'wind', 'noise', 'all')
        episodes: número de episódios para visualizar
        seed: seed para reprodutibilidade
        render_mode: 'human' para janela ou 'rgb_array' para gravar
    """
    # Criar ambiente COM renderização
    env = gym.make('LunarLander-v3', render_mode=render_mode)
    
    if seed is not None:
        env.reset(seed=seed)
    
    # Aplicar mesmos wrappers usados no treino
    if 'reward' in config_name:
        print("[VISUALIZE] Applying RewardShapingWrapper")
        env = RewardShapingWrapper(env, angle_penalty=30.0, dist_penalty=0.8, height_penalty=2.0, land_bonus=100.0)
    if 'wind' in config_name:
        print("[VISUALIZE] Applying WindForceWrapper")
        env = WindForceWrapper(env, wind_strength=0.01, deterministic=False)
    if 'noise' in config_name:
        print("[VISUALIZE] Applying ObservationNoiseWrapper")
        env = ObservationNoiseWrapper(env, noise_std=0.02)
    if config_name == 'all':
        env = RewardShapingWrapper(env, angle_penalty=30.0, dist_penalty=0.8, height_penalty=2.0, land_bonus=80.0)
        env = WindForceWrapper(env, wind_strength=0.03, deterministic=False)
        env = ObservationNoiseWrapper(env, noise_std=0.02)
    
    # Carregar modelo
    model = PPO.load(model_path)
    
    # Executar episódios
    for ep in range(episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        steps = 0
        
        print(f"\n=== Episódio {ep+1}/{episodes} ===")
        
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            total_reward += reward
            steps += 1
            
            if render_mode == 'human':
                env.render()
                time.sleep(0.01)  # pequeno delay para suavizar visualização
                if done:
                    time.sleep(1.0)  # pausa no final do episódio
        

        
        print(f"Recompensa total: {total_reward:.2f} | Steps: {steps}")
    
    env.close()
    print("\nVisualização concluída!")

In [29]:
configs = ['reward', 'wind']
seeds = [random.randint(0, 100)]
results = run_experiments(configs, seeds, timesteps=1_000, hyperparams=None, out_root='./experiments')



Training config=['reward', 'wind'] seed=73
Using cuda device
[CREATING ENV] Applying RewardShapingWrapper
[RewardShapingWrapper] Exponential penalties: height_penalty=1.0, dist_penalty=0.8
[CREATING ENV] Applying WindForceWrapper
Logging to ./experiments/rewardwind/seed73/tb/PPO_1




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 96.7     |
|    ep_rew_mean     | -182     |
| time/              |          |
|    fps             | 1179     |
|    iterations      | 1        |
|    time_elapsed    | 3        |
|    total_timesteps | 4096     |
---------------------------------
[TRAIN] Saved model: ./experiments/rewardwind/seed73/ppo_['reward', 'wind']_seed73.zip
Evaluating model for config=['reward', 'wind'] seed=73
[EVAL] quick evaluate_policy: mean=-783.89 std=357.17
[EVAL] result: mean=-862.39 std=527.51 success_rate=0.00


In [None]:
# Visualizar o melhor modelo da config 'all', seed 0
visualize_model(
    model_path=f'./experiments/{configs}/seed{seeds[0]}/ppo_{configs}_seed{seeds[0]}.zip',
    config_name=configs,
    episodes=1,
    seed=seeds[0]
)

[VISUALIZE] Applying RewardShapingWrapper
[RewardShapingWrapper] Exponential penalties: height_penalty=2.0, dist_penalty=0.8
[VISUALIZE] Applying WindForceWrapper


FileNotFoundError: [Errno 2] No such file or directory: "experiments/['reward', 'wind']/seed73/ppo_['reward', 'wind']_seed73.zip.zip"

: 