In [3]:
!pip install stable-baselines3[extra]

Collecting ale-py>=0.9.0
  Downloading ale_py-0.11.2-cp310-cp310-win_amd64.whl (3.5 MB)
     ---------------------------------------- 0.0/3.5 MB ? eta -:--:--
     - -------------------------------------- 0.1/3.5 MB 1.8 MB/s eta 0:00:02
     ----- ---------------------------------- 0.5/3.5 MB 5.3 MB/s eta 0:00:01
     --------- ------------------------------ 0.8/3.5 MB 6.6 MB/s eta 0:00:01
     ----------- ---------------------------- 1.0/3.5 MB 5.6 MB/s eta 0:00:01
     ----------------- ---------------------- 1.5/3.5 MB 6.9 MB/s eta 0:00:01
     ------------------------ --------------- 2.1/3.5 MB 8.3 MB/s eta 0:00:01
     ------------------------- -------------- 2.2/3.5 MB 8.3 MB/s eta 0:00:01
     ---------------------------------- ----- 3.0/3.5 MB 8.4 MB/s eta 0:00:01
     -------------------------------------- - 3.4/3.5 MB 8.6 MB/s eta 0:00:01
     ---------------------------------------  3.5/3.5 MB 8.2 MB/s eta 0:00:01
     ---------------------------------------- 3.5/3.5 MB 7.9 

In [7]:
# SimpleLander-v0: LunarLander-like env WITHOUT Box2D
# - 8D observation: [x, y, x_dot, y_dot, angle, angle_dot, left_contact, right_contact]
# - Discrete(4) actions: 0=noop, 1=left thruster, 2=main thruster, 3=right thruster
# - Minimal 2D physics + simple rgb_array renderer
# - Train DQN, evaluate, and save MP4 + GIF (≤ 100 MB)

import os, math, io, sys, subprocess, importlib
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from gymnasium.envs.registration import register

# Ensure video deps
def ensure(spec, import_name=None):
    try:
        importlib.import_module(import_name or spec.split("[")[0])
    except Exception:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", spec])
try:
    import imageio.v2 as imageio
except Exception:
    ensure("imageio"); import imageio.v2 as imageio
try:
    import imageio_ffmpeg  # noqa: F401
except Exception:
    ensure("imageio-ffmpeg"); import imageio_ffmpeg
try:
    from PIL import Image, ImageDraw
except Exception:
    ensure("Pillow"); from PIL import Image, ImageDraw

# ------------------ Box2D-free Lander ------------------
class SimpleLanderEnv(gym.Env):
    """
    A lightweight, Box2D-free clone of LunarLander with similar API.
    This is NOT identical physics, but close enough for algorithm testing.
    """
    metadata = {"render_modes": ["rgb_array"], "render_fps": 30}

    def __init__(self, render_mode=None, seed=None):
        super().__init__()
        self.render_mode = render_mode
        self.dt = 1.0 / 30.0
        self.g = -9.0

        # World bounds (normalized): x in [-1.5, 1.5], y in [0, 1.4]
        self.x_min, self.x_max = -1.5, 1.5
        self.y_min, self.y_max = 0.0, 1.4
        self.ground_y = 0.0

        # Thruster strengths
        self.main_acc = 20.0     # upward
        self.side_acc = 6.0      # lateral accel
        self.torque = 6.0        # angular accel

        # State: x, y, x_dot, y_dot, angle, angle_dot, left_contact, right_contact
        high = np.array([np.inf]*6 + [1.0, 1.0], dtype=np.float32)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)
        self.action_space = spaces.Discrete(4)

        # Rendering
        self._width, self._height = 600, 400
        self._last_action = 0

        # Episode management
        self.max_steps = 1000
        self.steps = 0
        self.np_random, _ = gym.utils.seeding.np_random(seed)

        self.state = None

    def reset(self, seed=None, options=None):
        if seed is not None:
            self.np_random, _ = gym.utils.seeding.np_random(seed)
        # Start near the top, small random offset and small initial rotation
        x = float(self.np_random.uniform(-0.3, 0.3))
        y = float(self.np_random.uniform(1.0, 1.2))
        x_dot = float(self.np_random.uniform(-0.5, 0.5))
        y_dot = float(self.np_random.uniform(-0.2, 0.2))
        angle = float(self.np_random.uniform(-0.1, 0.1))
        angle_dot = float(self.np_random.uniform(-0.2, 0.2))
        left_contact = 0.0
        right_contact = 0.0

        self.state = np.array([x, y, x_dot, y_dot, angle, angle_dot, left_contact, right_contact], dtype=np.float32)
        self.steps = 0
        self._last_action = 0
        return self.state.copy(), {}

    def step(self, action: int):
        self.steps += 1
        action = int(action)
        x, y, x_dot, y_dot, angle, angle_dot, left_c, right_c = self.state

        # Thrusters
        ax = 0.0
        ay = self.g
        a_angle = 0.0

        if action == 2:  # main thruster
            ay += self.main_acc
        elif action == 1:  # left thruster -> push right and rotate CCW
            ax += self.side_acc
            a_angle += self.torque
        elif action == 3:  # right thruster -> push left and rotate CW
            ax -= self.side_acc
            a_angle -= self.torque

        # Integrate
        x_dot += ax * self.dt
        y_dot += ay * self.dt
        angle_dot += a_angle * self.dt

        x += x_dot * self.dt
        y += y_dot * self.dt
        angle += angle_dot * self.dt

        # Simple drag to stabilize learning
        x_dot *= 0.995
        y_dot *= 0.995
        angle_dot *= 0.995

        # Keep within bounds horizontally
        x = float(np.clip(x, self.x_min, self.x_max))
        # Ground collision
        terminated = False
        success = False
        if y <= self.ground_y:
            y = self.ground_y
            # contact flags
            left_c = right_c = 1.0
            # landing conditions
            if abs(x) < 0.2 and abs(x_dot) < 0.5 and abs(y_dot) < 0.5 and abs(angle) < 0.2:
                reward = 100.0  # successful landing
                success = True
            else:
                reward = -100.0  # crash
            terminated = True
            y_dot = 0.0
            angle_dot = 0.0
        else:
            # Shaping reward while airborne
            shaping = - (abs(x)*1.0 + abs(y - 0.2)*0.5 + abs(x_dot)*0.1 + abs(y_dot)*0.2 + abs(angle)*0.3 + abs(angle_dot)*0.1)
            # Small action penalty to discourage fuel use
            act_pen = -0.01 * (action != 0)
            reward = shaping + act_pen

        truncated = self.steps >= self.max_steps
        self.state = np.array([x, y, x_dot, y_dot, angle, angle_dot, left_c, right_c], dtype=np.float32)
        self._last_action = action

        info = {"is_success": success}
        return self.state.copy(), float(reward), bool(terminated), bool(truncated), info

    # -------- Rendering (rgb_array) --------
    def render(self):
        return self._get_frame()

    def _world_to_px(self, x, y):
        # Map world coordinates to pixels
        px = int((x - self.x_min) / (self.x_max - self.x_min) * self._width)
        py = int((self.y_max - y) / (self.y_max - self.y_min) * self._height)
        return px, py

    def _get_frame(self):
        img = Image.new("RGB", (self._width, self._height), (15, 18, 30))
        draw = ImageDraw.Draw(img)
        # Ground
        gy_px = self._world_to_px(0, self.ground_y)[1]
        draw.rectangle([0, gy_px, self._width, self._height], fill=(35, 70, 35))

        # Lander as a triangle
        x, y, x_dot, y_dot, angle, angle_dot, left_c, right_c = self.state
        cx, cy = self._world_to_px(x, y)
        size = 18  # pixel size
        # Triangle points before rotation (upright)
        pts = np.array([[0, -size], [-size/2, size/2], [size/2, size/2]], dtype=np.float32)
        c, s = math.cos(angle), math.sin(angle)
        rot = np.array([[c, -s], [s, c]], dtype=np.float32)
        pts = (pts @ rot.T) + np.array([cx, cy], dtype=np.float32)

        draw.polygon([tuple(p) for p in pts], outline=(220, 220, 220), fill=(180, 180, 200))

        # Flames
        if self._last_action == 2:  # main
            flame = np.array([[0, size/2], [-6, size], [6, size]], dtype=np.float32)
            flame = (flame @ rot.T) + np.array([cx, cy], dtype=np.float32)
            draw.polygon([tuple(p) for p in flame], fill=(255, 140, 0))
        elif self._last_action == 1:  # left
            flame = np.array([[size/2, 0], [size, -6], [size, 6]], dtype=np.float32)
            flame = (flame @ rot.T) + np.array([cx, cy], dtype=np.float32)
            draw.polygon([tuple(p) for p in flame], fill=(255, 120, 0))
        elif self._last_action == 3:  # right
            flame = np.array([[-size/2, 0], [-size, -6], [-size, 6]], dtype=np.float32)
            flame = (flame @ rot.T) + np.array([cx, cy], dtype=np.float32)
            draw.polygon([tuple(p) for p in flame], fill=(255, 120, 0))

        return np.array(img, dtype=np.uint8)

    def close(self):
        pass

# Register once (safe if re-run)
try:
    register(id="SimpleLander-v0", entry_point=SimpleLanderEnv)
except Exception:
    pass  # already registered

# ------------------ Train & Evaluate with SB3 DQN ------------------
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy

# Train (no rendering)
env = gym.make("SimpleLander-v0")
model = DQN("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=5_000, progress_bar=False)  # increase for better results
env.close()

# Evaluate
eval_env = gym.make("SimpleLander-v0")
mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=5)
print(f"Evaluation on SimpleLander-v0: mean={mean_reward:.2f} ± {std_reward:.2f}")
eval_env.close()

# ------------------ Record MP4 + GIF ≤ 100 MB ------------------
render_env = gym.make("SimpleLander-v0", render_mode="rgb_array")
fps = render_env.metadata.get("render_fps", 30)
obs, info = render_env.reset()
frames = []
done = False
max_frames = 1500
while not done and len(frames) < max_frames:
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = render_env.step(action)
    frames.append(render_env.render())
    done = terminated or truncated
render_env.close()
print(f"Captured {len(frames)} frames @ ~{fps} FPS")

MP4_PATH = "simple_lander_policy.mp4"
GIF_PATH = "simple_lander_policy.gif"
MAX_MB = 100
MIN_FPS = 3

def file_mb(path): return os.path.getsize(path)/(1024*1024) if os.path.exists(path) else 0.0

def write_mp4(frames, fps, path):
    w = imageio.get_writer(path, format="mp4", fps=fps)
    for f in frames: w.append_data(f)
    w.close()

def fit_under_cap_by_fps(frames, orig_fps, writer_fn, out_path, max_mb=100, min_fps=3, step=5, is_gif=False):
    attempts = []
    for target_fps in list(range(int(orig_fps), min_fps-1, -step)) + ([min_fps] if (int(orig_fps)-min_fps) % step else []):
        stride = max(1, math.ceil(orig_fps / target_fps))
        sub = frames[::stride]
        if is_gif:
            imageio.mimsave(out_path, sub, format="gif", duration=1.0/max(1, target_fps))
        else:
            writer_fn(sub, target_fps, out_path)
        size = file_mb(out_path)
        attempts.append((target_fps, len(sub), size))
        if size <= max_mb:
            return True, attempts
    return False, attempts

# MP4
write_mp4(frames, fps, MP4_PATH)
if file_mb(MP4_PATH) > MAX_MB:
    ok, attempts = fit_under_cap_by_fps(frames, fps, write_mp4, MP4_PATH, MAX_MB, MIN_FPS, is_gif=False)
    print("MP4 attempts (fps, frames_kept, size_MB):")
    for a in attempts: print(f"  - {a[0]:>2} fps | {a[1]:>4} frames | {a[2]:6.2f} MB")
print(f"MP4 final size: {file_mb(MP4_PATH):.2f} MB")

# GIF
ok, attempts = fit_under_cap_by_fps(frames, fps, None, GIF_PATH, MAX_MB, MIN_FPS, is_gif=True)
print("GIF attempts (fps, frames_kept, size_MB):")
for a in attempts: print(f"  - {a[0]:>2} fps | {a[1]:>4} frames | {a[2]:6.2f} MB")
print(f"GIF final size: {file_mb(GIF_PATH):.2f} MB")

print("Saved:", MP4_PATH, GIF_PATH)


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
--------------------------------------------
| rollout/            |                    |
|    ep_len_mean      | 19.5               |
|    ep_rew_mean      | -115               |
|    exploration_rate | 0.852              |
|    success_rate     | 0                  |
| time/               |                    |
|    episodes         | 4                  |
|    fps              | 351280770934898688 |
|    time_elapsed     | 0                  |
|    total_timesteps  | 78                 |
--------------------------------------------


----------------------------------
| rollout/            |          |
|    ep_len_mean      | 20.5     |
|    ep_rew_mean      | -117     |
|    exploration_rate | 0.688    |
|    success_rate     | 0        |
| time/               |          |
|    episodes         | 8        |
|    fps              | 2115     |
|    time_elapsed     | 0        |
|    total_timesteps  | 164      |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 6.53     |
|    n_updates        | 15       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 21.8     |
|    ep_rew_mean      | -118     |
|    exploration_rate | 0.502    |
|    success_rate     | 0        |
| time/               |          |
|    episodes         | 12       |
|    fps              | 2341     |
|    time_elapsed     | 0        |
|    total_timesteps  | 262      |
| train/              |          |
|    learning_rate  

  logger.warn(f"Overriding environment {new_spec.id} already in registry.")


----------------------------------
| rollout/            |          |
|    ep_len_mean      | 18.8     |
|    ep_rew_mean      | -116     |
|    exploration_rate | 0.05     |
|    success_rate     | 0        |
| time/               |          |
|    episodes         | 32       |
|    fps              | 2594     |
|    time_elapsed     | 0        |
|    total_timesteps  | 600      |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 6.32     |
|    n_updates        | 124      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 19.5     |
|    ep_rew_mean      | -116     |
|    exploration_rate | 0.05     |
|    success_rate     | 0        |
| time/               |          |
|    episodes         | 36       |
|    fps              | 2456     |
|    time_elapsed     | 0        |
|    total_timesteps  | 703      |
| train/              |          |
|    learning_rate  



MP4 final size: 0.00 MB
GIF attempts (fps, frames_kept, size_MB):
  - 30 fps |   15 frames |   0.00 MB
GIF final size: 0.00 MB
Saved: simple_lander_policy.mp4 simple_lander_policy.gif
