In [3]:
import numpy as np
import gymnasium as gym

SUCCESS_REWARD = 100
FAIL_REWARD = -100

def evaluate_policy(
    env: gym.Env,
    policy,
    num_episodes: int = 100,
    seed: int | None = None,
):
    """
    Evaluate a given policy on the LunarLander (discrete) environment.

    The policy is assumed to be a deterministic function:
        action = policy(obs)

    This function does NOT render. It only collects metrics.

    Metrics returned (main keys):
        - num_episodes
        - mean_return, std_return, min_return, max_return
        - solved_rate              # fraction of episodes with total_return >= 200
        - success_rate             # fraction of episodes that ended in a "safe landing"
        - crash_rate               # fraction of episodes that ended in a "crash"
        - timeout_rate             # fraction of episodes that ended by truncation
        - other_terminal_rate      # terminated but neither clearly success nor crash
        - mean_episode_length
        - mean_main_engine_usage   # how many steps per episode main engine fired (action == 1)
        - mean_side_engine_usage   # how many steps per episode side engines fired (action == 2 or 3)

        # Landing smoothness (computed only over success episodes):
        - num_success_episodes
        - mean_final_abs_x_success
        - mean_final_abs_vy_success
        - mean_final_abs_angle_success

    Success / crash classification:
        - We classify episodes only when `terminated == True` (not truncated).
        - For the final step of an episode, we look at the *instant* reward:
              last_step_reward > 0  -> success
              last_step_reward < 0  -> crash
              last_step_reward == 0 -> other_terminal
        - This is a heuristic based on the environment design:
              +100 additional reward for a safe landing
              -100 additional reward for a crash
          which strongly dominates other shaping terms.
    """

    # Store per-episode statistics
    returns = []
    steps_list = []
    main_usage_list = []   # count of steps where main engine fired (action == 1)
    side_usage_list = []   # count of steps where side engines fired (action == 2 or 3)

    # Counters for episode termination types
    num_success = 0        # safe landing (positive last-step reward on terminated)
    num_crash = 0          # crash (negative last-step reward on terminated)
    num_timeout = 0        # truncated episodes (time limit or similar)
    num_other_end = 0      # terminated but last-step reward == 0 (rare / ambiguous)

    # Landing smoothness metrics for success episodes only
    success_final_x = []       # |x| at the end
    success_final_vy = []      # |vy| at the end
    success_final_angle = []   # |angle| at the end

    # Initial reset: use seed only for the very first episode (if provided)
    if seed is not None:
        obs, info = env.reset(seed=seed)
    else:
        obs, info = env.reset()

    for ep in range(num_episodes):
        done = False
        total_reward = 0.0
        steps = 0
        main_usage = 0
        side_usage = 0
        last_step_reward = 0.0   # will be overwritten at each step

        while not done:
            # Policy must be a function: action = policy(obs)
            action = policy(obs)

            # Gymnasium step API: obs, reward, terminated, truncated, info
            obs, reward, terminated, truncated, info = env.step(action)

            total_reward += reward
            steps += 1
            last_step_reward = reward

            # Count engine usage based on discrete action:
            # 0: do nothing
            # 1: main engine
            # 2: side engine (left)
            # 3: side engine (right)
            if action == 1:
                main_usage += 1
            elif action == 2 or action == 3:
                side_usage += 1

            done = terminated or truncated

        # At this point, `obs` is the final state of the episode.
        final_state = obs
        # For LunarLander, state is typically:
        # [x, y, vx, vy, angle, v_angle, left_leg_contact, right_leg_contact]
        final_x = float(abs(final_state[0]))    # horizontal offset from center
        final_vy = float(abs(final_state[3]))   # vertical speed magnitude
        final_angle = float(abs(final_state[4]))  # absolute tilt angle

        # Store basic episode statistics
        returns.append(total_reward)
        steps_list.append(steps)
        main_usage_list.append(main_usage)
        side_usage_list.append(side_usage)

        # Classify how the episode ended
        if truncated:
            # Episode ended due to time limit or external truncation
            num_timeout += 1
        elif terminated:
            # Episode ended naturally; decide between success / crash / other
            if last_step_reward == SUCCESS_REWARD:
                # Safe landing (heuristic: last step reward positive due to +100 bonus)
                num_success += 1
                # Record landing smoothness metrics
                success_final_x.append(final_x)
                success_final_vy.append(final_vy)
                success_final_angle.append(final_angle)
            elif last_step_reward == FAIL_REWARD:
                # Crash (heuristic: last step reward negative due to -100 penalty)
                num_crash += 1
            else:
                # Rare / ambiguous case: terminated but last-step reward exactly 0
                num_other_end += 1

        # Reset for the next episode (without re-seeding)
        obs, info = env.reset()

    # Convert lists to numpy arrays for easier statistics
    returns = np.array(returns, dtype=np.float32)
    steps_arr = np.array(steps_list, dtype=np.int32)
    main_usage_arr = np.array(main_usage_list, dtype=np.int32)
    side_usage_arr = np.array(side_usage_list, dtype=np.int32)

    # Basic return statistics
    mean_return = float(returns.mean())
    std_return = float(returns.std())
    min_return = float(returns.min())
    max_return = float(returns.max())

    # Fraction of episodes with total return >= 200 (standard "solved" threshold)
    solved_rate = float(np.mean(returns >= 200.0))

    # Termination-type rates
    n = num_episodes
    success_rate = num_success / n
    crash_rate = num_crash / n
    timeout_rate = num_timeout / n
    other_terminal_rate = num_other_end / n

    # Episode length and engine usage
    mean_episode_length = float(steps_arr.mean())
    mean_main_engine_usage = float(main_usage_arr.mean())
    mean_side_engine_usage = float(side_usage_arr.mean())

    metrics = {
        "num_episodes": n,
        # Return statistics
        "mean_return": mean_return,
        "std_return": std_return,
        "min_return": min_return,
        "max_return": max_return,
        "solved_rate": solved_rate,
        # Termination statistics
        "success_rate": success_rate,
        "crash_rate": crash_rate,
        "timeout_rate": timeout_rate,
        "other_terminal_rate": other_terminal_rate,
        # Episode length & engine usage
        "mean_episode_length": mean_episode_length,
        "mean_main_engine_usage": mean_main_engine_usage,
        "mean_side_engine_usage": mean_side_engine_usage,
        # Placeholder, will be filled below if there are success episodes
        "num_success_episodes": num_success,
    }

    # Landing smoothness metrics over success episodes (if any)
    if num_success > 0:
        success_final_x = np.array(success_final_x, dtype=np.float32)
        success_final_vy = np.array(success_final_vy, dtype=np.float32)
        success_final_angle = np.array(success_final_angle, dtype=np.float32)

        metrics.update(
            {
                "mean_final_abs_x_success": float(success_final_x.mean()),
                "mean_final_abs_vy_success": float(success_final_vy.mean()),
                "mean_final_abs_angle_success": float(success_final_angle.mean()),
            }
        )
    else:
        # No success episodes: set smooth-landing metrics to None
        metrics.update(
            {
                "mean_final_abs_x_success": None,
                "mean_final_abs_vy_success": None,
                "mean_final_abs_angle_success": None,
            }
        )

    return metrics


In [4]:
import gymnasium as gym

env = gym.make("LunarLander-v3")
# `env.reset(seed=...)` seeds the environment only; the action space has its
# own random generator. Seed it too, otherwise the "seeded" random baseline
# produces different numbers on every run.
env.action_space.seed(0)

def random_policy(obs):
    """Baseline policy: sample a uniformly random action, ignoring `obs`."""
    # `obs` is required by the policy interface but not used here.
    return env.action_space.sample()

try:
    metrics_random = evaluate_policy(env, random_policy, num_episodes=100, seed=0)
finally:
    # Release the environment even if the evaluation raises.
    env.close()

print("Random policy metrics:")
for k, v in metrics_random.items():
    print(f"{k}: {v}")


Random policy metrics:
num_episodes: 100
mean_return: -187.19863891601562
std_return: 106.7403564453125
min_return: -507.28753662109375
max_return: 19.70433235168457
solved_rate: 0.0
success_rate: 0.0
crash_rate: 1.0
timeout_rate: 0.0
other_terminal_rate: 0.0
mean_episode_length: 89.9
mean_main_engine_usage: 23.24
mean_side_engine_usage: 44.22
num_success_episodes: 0
mean_final_abs_x_success: None
mean_final_abs_vy_success: None
mean_final_abs_angle_success: None


In [5]:
import gymnasium as gym
from gymnasium.envs.box2d.lunar_lander import heuristic

env = gym.make("LunarLander-v3")

def heuristic_policy(obs):
    """Policy that delegates to the `heuristic` controller imported from Gymnasium."""
    # `heuristic` needs the environment as well as the observation, so the
    # policy closes over the `env` created in this cell.
    return heuristic(env, obs)

try:
    metrics_heuristic = evaluate_policy(env, heuristic_policy, num_episodes=100, seed=0)
finally:
    # Release the environment even if the evaluation raises.
    env.close()

print("Heuristic policy metrics:")
for k, v in metrics_heuristic.items():
    print(f"{k}: {v}")


Heuristic policy metrics:
num_episodes: 100
mean_return: 240.22869873046875
std_return: 97.566650390625
min_return: -218.28501892089844
max_return: 314.9457702636719
solved_rate: 0.89
success_rate: 0.93
crash_rate: 0.05
timeout_rate: 0.02
other_terminal_rate: 0.0
mean_episode_length: 252.98
mean_main_engine_usage: 15.04
mean_side_engine_usage: 107.14
num_success_episodes: 93
mean_final_abs_x_success: 0.11925307661294937
mean_final_abs_vy_success: 0.0
mean_final_abs_angle_success: 0.04345544055104256
