In [None]:
# x_change_training
# Cell 1

import gymnasium as gym
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize, VecMonitor
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.logger import TensorBoardOutputFormat
from stable_baselines3.common.utils import get_linear_fn
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize, VecMonitor
from statistics import mean, stdev
import pandas as pd
import time



Just pulling in Gym, NumPy, Stable‑Baselines3 and the SB3 helpers I’ll need. No logic yet, just getting tools ready

In [None]:
# Cell 2
# wrapper to extract custom metrics (fuel, landing error)
class LunarWrapper(gym.Wrapper):
    def reset(self, **kwargs):
        obs, info = super().reset(**kwargs)
        return obs, info

    def step(self, action):
        obs, reward, terminated, truncated, info = super().step(action)
        # approximate “fuel used”
        info["fuel_used"] = 1.0 if action == 2 else 0.0
        # landing error = distance from pad center
        info["landing_err"] = np.linalg.norm(obs[0:2])
        return obs, reward, terminated, truncated, info

# callback to log metrics to TensorBoard
class MetricsCallback(BaseCallback):
    def _on_training_start(self):
        if not any(isinstance(fmt, TensorBoardOutputFormat)
                   for fmt in self.logger.output_formats):
            self.logger.configure(format_strings=["tensorboard"])
    def _on_step(self) -> bool:
        for info in self.locals.get("infos", []):
            if "episode" in info:
                self.logger.record("custom/fuel_used",   info["fuel_used"])
                self.logger.record("custom/landing_err", info["landing_err"])
        return True

# build vec-env, monitor, normalize
def make_env():
    env = gym.make("LunarLander-v3")
    env = LunarWrapper(env)
    return env

env = DummyVecEnv([make_env])
# write one line per episode into monitor_04.csv
env = VecMonitor(env, filename="monitor_04.csv")
env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10.0)

# instantiate PPO with fine-tuned hyperparameters
model = PPO(
    policy="MlpPolicy",
    env=env,
    tensorboard_log="./ppo_lunar_tb",
    verbose=1,
    n_steps=4096,
    batch_size=256,
    learning_rate=get_linear_fn(5e-4, 1e-5, 0.1),
    clip_range=0.15,
    ent_coef=0.005,
    policy_kwargs=dict(net_arch=[dict(pi=[128,128], vf=[128,128])]),
)

# train for 1 500 000 steps (to get score over 200), logging metrics 
model.learn(
    total_timesteps=1_500_000,
    callback=MetricsCallback(),
    tb_log_name="fine_tune"
)

# save final model and normalization stats
model.save("ppo_lunar_with_metrics")
env.save("ppo_vecnormalize.pkl")
print("Training complete and model saved.")


Using cpu device
Logging to ./ppo_lunar_tb\fine_tune_10
-----------------------------------
| custom/            |            |
|    fuel_used       | 0          |
|    landing_err     | 0.42016345 |
| rollout/           |            |
|    ep_len_mean     | 94.4       |
|    ep_rew_mean     | -162       |
| time/              |            |
|    fps             | 1388       |
|    iterations      | 1          |
|    time_elapsed    | 2          |
|    total_timesteps | 4096       |
-----------------------------------
------------------------------------------
| custom/                 |              |
|    fuel_used            | 0            |
|    landing_err          | 0.07190467   |
| rollout/                |              |
|    ep_len_mean          | 92.1         |
|    ep_rew_mean          | -155         |
| time/                   |              |
|    fps                  | 1283         |
|    iterations           | 2            |
|    time_elapsed         | 6            |
|  

I wrap the env to log fuel and landing error, vectorize/monitor/normalize it, pick my PPO hyperparameters, then run 1.5 M training steps (just manages to reach reward of 200, which is what is classified as a success). Finally I save the model and normalization stats.

In [None]:
# Cell 3: Playback to confirm random‐start

# copy wrappers from Cell 1, but make reset return (obs, info)
class RandomStartWrapper(gym.Wrapper):
    def reset(self, **kwargs):
        obs, info = super().reset(**kwargs)
        # pick and apply world shift
        self.pad_shift = self.np_random.uniform(-1.0, 1.0)
        uw = self.unwrapped
        uw.helipad_x1 += self.pad_shift
        uw.helipad_x2 += self.pad_shift
        obs = obs.copy(); obs[0] -= self.pad_shift
        return obs, info   # ← two values

    def step(self, action):
        obs, reward, terminated, truncated, info = super().step(action)
        obs = obs.copy(); obs[0] -= self.pad_shift
        return obs, reward, terminated, truncated, info

class LunarWrapper(gym.Wrapper):
    def reset(self, **kwargs):
        return super().reset(**kwargs)    # ← passes through (obs, info)
    def step(self, action):
        obs, reward, term, trunc, info = super().step(action)
        info["fuel_used"] = 1.0 if action==2 else 0.0
        return obs, reward, term, trunc, info

def play_back(model_path, n_steps=500, sleep=0.016):
    def make_env():
        e = gym.make("LunarLander-v3", render_mode="human")
        e = RandomStartWrapper(e)
        e = LunarWrapper(e)
        return e

    ev = DummyVecEnv([make_env])
    ev = VecMonitor(ev)  
    ev = VecNormalize.load("ppo_vecnormalize.pkl", ev)
    ev.training=False; ev.norm_reward=False

    m = PPO.load(model_path, env=ev)
    obs = ev.reset()               # ← unpack (obs, infos)
    for i in range(n_steps):
        action, i = m.predict(obs, deterministic=True)
        obs, i, dones, i = ev.step(action)
        ev.render()
        time.sleep(sleep)
        if dones[0]:
            break
    ev.close()

print("Playback random‐start policy:")
play_back("ppo_lunar_with_metrics.zip")


Playback random‐start policy:


Reload the saved policy, rebuild the same normalization pipeline, and render up to 500 frames in human mode so you can see the agent actually land.

In [24]:
# Cell 4
class RandomPadWrapper(gym.Wrapper):
    def reset(self, **kwargs):
        obs, info = super().reset(**kwargs)
        # pick random shift Δ
        self.pad_shift = self.np_random.uniform(-0.4, 0.4)
        # move the pad flags in the unwrapped env
        uw = self.unwrapped
        uw.helipad_x1 += self.pad_shift
        uw.helipad_x2 += self.pad_shift
        # shift obs so agent still sees pad at 0
        obs = obs.copy()
        obs[0] -= self.pad_shift
        return obs, info

    def step(self, action):
        obs, reward, terminated, truncated, info = super().step(action)
        obs = obs.copy()
        obs[0] -= self.pad_shift
        return obs, reward, terminated, truncated, info


Here’s a little class that on each reset randomly shifts the pad’s world‑x by ±0.4, while keeping the agent’s observation centered at x = 0.

In [None]:
# Cell 5: Quantitative evaluation + CSV export

# bring in LunarWrapper so reset returns (obs, info) and step adds fuel_used
class LunarWrapper(gym.Wrapper):
    def reset(self, **kwargs):
        obs, info = super().reset(**kwargs)
        return obs, info
    def step(self, action):
        obs, reward, term, trunc, info = super().step(action)
        info["fuel_used"] = 1.0 if action == 2 else 0.0
        return obs, reward, term, trunc, info

def make_eval_env():
    e = gym.make("LunarLander-v3")
    e = RandomPadWrapper(e)   # shift the pad
    e = LunarWrapper(e)       # now track fuel_used
    return e                  # ← return that wrapped env

# build vec‑env & load normalization
eval_env = DummyVecEnv([make_eval_env])
eval_env = VecMonitor(eval_env)
eval_env = VecNormalize.load("ppo_vecnormalize.pkl", eval_env)
eval_env.training = False
eval_env.norm_reward = False

# load model
model = PPO.load("ppo_lunar_with_metrics.zip", env=eval_env)

# evaluation loop
def evaluate(model, env, n_episodes=50):
    rewards, fuels, successes = [], [], 0
    for i in range(n_episodes):
        obs = env.reset()       # ← only one return value now
        done = False
        R, F = 0.0, 0.0
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, dones, infos = env.step(action)
            R += reward[0]
            F += infos[0]["fuel_used"]
            done = dones[0]
        rewards.append(R)
        fuels.append(F)
        if R >= 200:
            successes += 1
    return rewards, fuels, successes

# run and report
rewards, fuels, successes = evaluate(model, eval_env, n_episodes=50)
mean_r, mean_f = mean(rewards), mean(fuels)
succ_rate = successes / 50

print(f"Mean reward:    {mean_r:.1f} ± {stdev(rewards):.1f}")
print(f"Mean fuel used: {mean_f:.1f} ± {stdev(fuels):.1f}")
print(f"Success rate:   {successes}/50 = {succ_rate:.0%}")

# ── write out metrics for bar chart ──
df = pd.DataFrame([{
    "experiment":   "ppo_x_change",
    "mean_reward":  mean_r,
    "mean_fuel":    mean_f,
    "success_rate": succ_rate
}])
df.to_csv("04_x_change_metrics.csv", index=False)
print("→ metrics written to 04_x_change_metrics.csv")


Mean reward:    195.4 ± 84.3
Mean fuel used: 124.2 ± 22.5
Success rate:   35/50 = 70%
→ metrics written to 04_x_change_metrics.csv


Now I combine the fuel‑logger and random‑pad wrappers, load the fixed‑pad policy, and run 50 episodes, printing mean reward and mean fuel so I can see how well the policy generalizes.