# 🎮 Demo: Trained RL Agent

This notebook demonstrates the trained PPO agent navigating in BabyAI environments.

In [None]:
import sys
sys.path.insert(0, '..')
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import minigrid
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import time

In [None]:
# Environment matching training
class MiniGridFlatEnv(gym.Env):
    """Gymnasium wrapper that exposes a MiniGrid/BabyAI env as a flat float32 vector.

    The wrapped environment is created with ``render_mode="rgb_array"``. Each
    observation is the flattened ``obs["image"]`` array divided by 10.0, followed
    by a 4-element one-hot encoding of ``obs["direction"]``. The mission string is
    not part of the observation; it is only stored on ``self.instruction``.

    NOTE(review): the declared 151-element Box presumably corresponds to a 7x7x3
    image (147 values) plus the 4 direction slots, and the /10.0 scale presumably
    maps MiniGrid's integer codes into [0, 1] -- confirm against the training code
    before changing either, since a trained policy depends on this exact layout.
    """

    def __init__(self, config=None):
        """Create the wrapped environment.

        Args:
            config: Optional dict. Recognised keys are ``"env_name"`` (default
                ``"BabyAI-GoToObj-v0"``) and ``"max_steps"`` (default 64).
        """
        super().__init__()
        config = config or {}
        env_name = config.get("env_name", "BabyAI-GoToObj-v0")
        max_steps = config.get("max_steps", 64)
        self.env = gym.make(env_name, render_mode="rgb_array")
        # Override the step limit directly on the innermost env object.
        self.env.unwrapped.max_steps = max_steps
        self.observation_space = spaces.Box(low=0.0, high=1.0, shape=(151,), dtype=np.float32)
        self.action_space = self.env.action_space
        # Mission text of the current episode; filled in by reset().
        self.instruction = ""

    def _flatten_obs(self, obs):
        """Convert a dict observation into one float32 vector (scaled image + one-hot direction)."""
        image = obs["image"].flatten().astype(np.float32) / 10.0
        direction = np.zeros(4, dtype=np.float32)
        direction[obs["direction"]] = 1.0
        return np.concatenate([image, direction])

    def reset(self, *, seed=None, options=None):
        """Reset the wrapped env, record its mission string, and return ``(flat_obs, info)``."""
        obs, info = self.env.reset(seed=seed, options=options)
        self.instruction = self.env.unwrapped.mission
        return self._flatten_obs(obs), info

    def step(self, action):
        """Forward ``action`` to the wrapped env and flatten the resulting observation.

        Returns:
            ``(flat_obs, reward, terminated, truncated, info)`` exactly as produced
            by the wrapped env, except for the flattened observation.
        """
        obs, reward, term, trunc, info = self.env.step(action)
        return self._flatten_obs(obs), reward, term, trunc, info

    def render(self):
        """Return whatever the wrapped env's ``render()`` returns."""
        return self.env.render()

    def close(self):
        """Close the wrapped env so resources created by ``gym.make`` are released.

        Without this override, closing the wrapper would never reach ``self.env``.
        """
        self.env.close()

print("Environment ready!")

In [None]:
# Load trained model
import ray
from ray.rllib.algorithms.ppo import PPOConfig
from ray.tune.registry import register_env

# Tear down any Ray runtime left over from an earlier run of this cell, then
# start a new one limited to two CPUs.
if ray.is_initialized():
    ray.shutdown()
ray.init(num_cpus=2, ignore_reinit_error=True)


def env_creator(config):
    """Build a MiniGridFlatEnv from the env-config dict handed over by RLlib."""
    flat_env = MiniGridFlatEnv(config)
    return flat_env


# Make the environment available to RLlib under the id used in the PPO config.
register_env("MiniGridFlat-v0", env_creator)

# Assemble the PPO configuration one builder call at a time.
config = PPOConfig()

# Both the RLModule/Learner and the EnvRunner/ConnectorV2 switches are disabled.
# NOTE(review): presumably this mirrors the settings the checkpoint was trained
# with -- confirm before changing.
config = config.api_stack(
    enable_rl_module_and_learner=False,
    enable_env_runner_and_connector_v2=False,
)

# Same registered env id and env settings as the rest of this notebook.
config = config.environment(
    env="MiniGridFlat-v0",
    env_config={"env_name": "BabyAI-GoToObj-v0", "max_steps": 64},
)
config = config.framework("torch")

# Zero extra env runners and zero GPUs requested.
config = config.env_runners(num_env_runners=0)
config = config.resources(num_gpus=0)

algo = config.build()

# CHANGE THIS PATH to your checkpoint
CHECKPOINT_PATH = "../experiments/checkpoints/final"  # or wherever you extracted it

# Best-effort load: if restoring fails the demo continues with the freshly built
# (untrained) policy, but the reason is reported instead of being hidden.
try:
    algo.restore(CHECKPOINT_PATH)
    print(f"✅ Loaded model from {CHECKPOINT_PATH}")
except Exception as exc:  # not a bare except: KeyboardInterrupt/SystemExit must propagate
    print("⚠️ Could not load checkpoint. Using untrained model for demo.")
    print(f"   Reason: {type(exc).__name__}: {exc}")
    print("To load your trained model:")
    print("1. Unzip trained_model.zip to experiments/")
    print("2. Update CHECKPOINT_PATH above")

In [None]:
# Visualize agent solving a task
# Display names indexed by the integer action returned by the policy.
ACTION_NAMES = ["left", "right", "forward", "pickup", "drop", "toggle", "done"]


def run_episode(seed=42, show_steps=True, env_name="BabyAI-GoToObj-v0", max_steps=64):
    """Roll out one episode with the global ``algo`` policy and collect rendered frames.

    Args:
        seed: Seed passed to ``env.reset``.
        show_steps: When true, print the chosen action after every step.
        env_name: Environment id forwarded to ``MiniGridFlatEnv``.
        max_steps: Step limit forwarded to ``MiniGridFlatEnv``.

    Returns:
        ``(frames, actions)``: ``frames`` holds the render taken right after reset
        plus one render per step, so it has one more entry than ``actions``;
        ``actions`` holds the name of each action taken, in order.
    """
    env = MiniGridFlatEnv({"env_name": env_name, "max_steps": max_steps})
    try:
        obs, _ = env.reset(seed=seed)

        print(f"🎯 Instruction: {env.instruction}")
        print("=" * 50)

        frames = [env.render()]
        actions = []
        done = False
        step = 0

        # done starts False, so at least one step always runs and `reward`/`term`
        # are defined when the outcome is reported below.
        while not done:
            action = algo.compute_single_action(obs)
            action_name = ACTION_NAMES[action]
            actions.append(action_name)
            obs, reward, term, trunc, _ = env.step(action)
            done = term or trunc
            step += 1
            frames.append(env.render())

            if show_steps:
                print(f"Step {step}: {action_name}")
    finally:
        # Release the environment even if the rollout raises part-way through.
        env.close()

    # Success = the episode terminated (not merely truncated) with a positive reward.
    if term and reward > 0:
        print(f"\n🎉 SUCCESS in {step} steps!")
    else:
        print(f"\n❌ Failed after {step} steps")

    return frames, actions


frames, actions = run_episode(seed=42)

In [None]:
# Draw the first frames of the episode on a fixed 2 x 6 grid (at most 12 panels).
fig, axes = plt.subplots(2, 6, figsize=(15, 5))
num_frames = min(len(frames), 12)

for panel_idx, panel in enumerate(axes.flat):
    if panel_idx < num_frames:
        panel.imshow(frames[panel_idx])
        # frames has one more entry than actions, so the final frame has no action.
        took_action = panel_idx < len(actions)
        caption = f"Step {panel_idx}: {actions[panel_idx]}" if took_action else "Done"
        panel.set_title(caption)
    # Hide ticks and spines on every panel, including the unused ones.
    panel.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# Evaluate on multiple episodes
successes = 0
total_steps = 0
N = 50

# Build the environment once and reuse it: each episode is selected by the seed
# passed to reset(), so a fresh environment per episode is unnecessary and the
# previous ones were never closed.
env = MiniGridFlatEnv({"env_name": "BabyAI-GoToObj-v0", "max_steps": 64})
try:
    for i in range(N):
        obs, _ = env.reset(seed=3000 + i)
        done = False
        steps = 0

        while not done:
            action = algo.compute_single_action(obs)
            obs, reward, term, trunc, _ = env.step(action)
            done = term or trunc
            steps += 1

        # Success = the episode terminated (not merely truncated) with a positive reward.
        if term and reward > 0:
            successes += 1
        total_steps += steps
finally:
    # Release the environment even if an episode raises.
    env.close()

print(f"\n📊 Results on {N} episodes:")
print(f"   Success Rate: {successes/N:.1%}")
print(f"   Avg Steps: {total_steps/N:.1f}")

In [None]:
# Cleanup
try:
    algo.stop()
finally:
    # Shut Ray down even if stopping the algorithm raises, so the runtime
    # started earlier in this notebook is not left behind.
    ray.shutdown()
print("Done!")