#  Cell 1: Installations

In [1]:
# Cell 1: Installations
# Install PyTorch with CUDA support
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 -q
# Install core libraries for simulation, data, and AI
!pip install datasets Pillow matplotlib scikit-learn gymnasium opencv-python-headless -q
# Install the Stable Baselines3 libraries for Reinforcement Learning
!pip install stable-baselines3[extra] sb3-contrib -q

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/93.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m93.2/93.2 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/187.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m187.2/187.2 kB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
[?25h

# Cell 2: Imports and Configuration

In [2]:
# Cell 2: Imports and Global Configuration
import torch
from torch import nn
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import heapq
import cv2
import os
import random

# Stable Baselines 3 and extensions
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv
from sb3_contrib import MaskablePPO
from sb3_contrib.common.wrappers import ActionMasker

# --- Global Configuration for the V2.0 Model ---
# Single dictionary of tunables read by the simulation, environment and training cells.
config = {
    # --- Simulation Parameters ---
    "MAX_AGENTS": 10,         # The fixed number of agent "slots" for the observation
    # EvacuationEnv enumerates 4 * (GRID_SIZE - 2) perimeter cells (248 for a 64-cell grid),
    # so MAX_EXITS has to be changed together with GRID_SIZE.
    "MAX_EXITS": 248,         # The fixed number of perimeter "exit slots" for the action mask
    "GRID_SIZE": 64,          # The fixed size of the grid the AI will "see"

    # --- Agent Behavior ---
    # Person.update_state compares the distance to the nearest burning cell with these
    # thresholds: below PANIC_DISTANCE -> 'PANICKED', below ALERT_DISTANCE -> 'ALERT'.
    "PANIC_DISTANCE": 20,     # A smaller value shrinks the panic radius (panic triggers less often)
    "ALERT_DISTANCE": 40,

    # --- Training Parameters ---
    "TOTAL_TIMESTEPS": 1_000_000, # Target for the full training run (can be done in chunks)
    "N_ENVS": 8,              # Number of parallel environments (good for T4 GPU)
    "BATCH_SIZE": 256,

    # --- File Paths ---
    "UNET_MODEL_PATH": "unet_floorplan_model.pth",
    "PPO_MODEL_PATH": "ppo_commander_v2.0.zip",

    # --- Dataset ---
    "DATASET_NAME": "zimhe/pseudo-floor-plan-12k"
}

# --- Device Configuration ---
# Prefer the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("--- Configuration Loaded ---")
print(f"Using device: {device}")
print(f"Max Agents: {config['MAX_AGENTS']}, Max Exits: {config['MAX_EXITS']}")

--- Configuration Loaded ---
Using device: cuda
Max Agents: 10, Max Exits: 248


Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  return datetime.utcnow().replace(tzinfo=utc)


# Cell 3: Load Pre-Trained U-Net and Define Helper Functions

In [3]:


# 1. Re-define the U-Net Model Architecture
# This must be the EXACT same structure as the one we used for training in the previous notebook.
class UNet(nn.Module):
    """Three-level U-Net mapping a 3-channel image to a 1-channel logit map.

    The attribute names and the order of layers inside each block determine the
    ``state_dict`` keys, so they are kept exactly as the loaded checkpoint expects.
    """

    def __init__(self):
        super().__init__()

        def double_conv(in_channels, out_channels):
            # Two 3x3 convolutions (padding 1), each followed by an in-place ReLU.
            layers = [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
            ]
            return nn.Sequential(*layers)

        # Contracting path.
        self.dconv_down1 = double_conv(3, 64)
        self.dconv_down2 = double_conv(64, 128)
        self.dconv_down3 = double_conv(128, 256)
        self.dconv_down4 = double_conv(256, 512)
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Expanding path: input channels = skip connection + upsampled features.
        self.dconv_up3 = double_conv(256 + 512, 256)
        self.dconv_up2 = double_conv(128 + 256, 128)
        self.dconv_up1 = double_conv(128 + 64, 64)
        # 1x1 projection to a single output channel (raw logits, no activation).
        self.conv_last = nn.Conv2d(64, 1, kernel_size=1)

    def forward(self, x):
        """Run the encoder/decoder and return the un-activated output of ``conv_last``."""

        def upsample(features):
            return nn.functional.interpolate(
                features, scale_factor=2, mode='bilinear', align_corners=True
            )

        # Encoder: keep each pre-pooling activation for the skip connections.
        skip1 = self.dconv_down1(x)
        skip2 = self.dconv_down2(self.maxpool(skip1))
        skip3 = self.dconv_down3(self.maxpool(skip2))
        bottleneck = self.dconv_down4(self.maxpool(skip3))

        # Decoder: upsample, concatenate the matching skip on the channel axis, convolve.
        decoded = self.dconv_up3(torch.cat([upsample(bottleneck), skip3], dim=1))
        decoded = self.dconv_up2(torch.cat([upsample(decoded), skip2], dim=1))
        decoded = self.dconv_up1(torch.cat([upsample(decoded), skip1], dim=1))
        return self.conv_last(decoded)

# 2. Load the trained U-Net weights from the file
# The network is created on `device`; `map_location=device` maps the checkpoint's
# tensors onto that same device when the file is read.
unet_model = UNet().to(device)
try:
    unet_model.load_state_dict(torch.load(config["UNET_MODEL_PATH"], map_location=device))
    unet_model.eval()  # Set the model to evaluation mode (very important!)
    print("--- U-Net model loaded successfully ---")
except FileNotFoundError:
    # NOTE(review): execution continues after these messages, so `unet_model` keeps its
    # freshly initialised weights and eval() is never called. Later cells that use the
    # model would then run on an untrained network - consider re-raising here.
    print(f"--- ERROR: U-Net model file not found at '{config['UNET_MODEL_PATH']}' ---")
    print("Please make sure you have uploaded the .pth file to the Colab session.")

# 3. Define the image processing helper function
def create_grid_from_image(unet_model, pil_image):
    """
    Segment a PIL image with the given U-Net and return a binary NumPy grid.

    The image is resized to GRID_SIZE x GRID_SIZE, converted to a tensor and passed
    through ``unet_model`` as a batch of one. Output logits go through a sigmoid and
    are thresholded at 0.5, so the result contains only 0.0 and 1.0.
    """
    side = config['GRID_SIZE']
    preprocess = transforms.Compose([
        transforms.Resize((side, side)),
        transforms.ToTensor(),
    ])

    # Add the batch dimension and move the input to the configured device.
    batch = preprocess(pil_image).unsqueeze(0).to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        logits = unet_model(batch)

    probabilities = torch.sigmoid(logits)
    mask = (probabilities > 0.5).float()

    # Drop the size-1 dimensions and hand back a CPU NumPy array.
    return mask.squeeze().cpu().numpy()

print("Helper function 'create_grid_from_image' is defined.")

--- U-Net model loaded successfully ---
Helper function 'create_grid_from_image' is defined.


# Cell 4: Upgraded Simulation Classes (Person, Fire, and the new EvacuationEnv)

In [4]:
# --- A* Pathfinding (Unchanged) ---
def a_star_search(grid, start, goal, fire_map=None):
    """Find a shortest 4-connected path from ``start`` to ``goal`` using A*.

    Coordinates are ``(x, y)`` tuples, while ``grid`` and ``fire_map`` are indexed
    ``[y][x]``. A neighbouring cell is impassable when ``grid`` holds 1 there or,
    if ``fire_map`` is given, when ``fire_map`` holds 1 there. The start cell
    itself is not validated.

    Args:
        grid: 2-D array exposing ``.shape``; 1 marks a blocked cell.
        start: ``(x, y)`` tuple of the starting cell.
        goal: ``(x, y)`` tuple of the target cell.
        fire_map: Optional 2-D array, same indexing as ``grid``; 1 marks fire.

    Returns:
        List of ``(x, y)`` tuples from the first step after ``start`` up to and
        including ``goal``. The list is empty when the goal cannot be reached
        and also when ``start == goal``.
    """
    def heuristic(a, b):
        # Manhattan distance: admissible and consistent for unit-cost 4-neighbour moves.
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    height, width = grid.shape[0], grid.shape[1]
    neighbors = [(0, 1), (0, -1), (1, 0), (-1, 0)]
    unreached = float('inf')

    close_set = set()
    came_from = {}
    gscore = {start: 0}
    oheap = [(heuristic(start, goal), start)]

    while oheap:
        current = heapq.heappop(oheap)[1]
        if current == goal:
            path = []
            while current in came_from:
                path.append(current)
                current = came_from[current]
            return path[::-1]

        # A node can sit in the heap more than once (it is re-pushed whenever a
        # cheaper route is found); only its first pop needs to be expanded.
        if current in close_set:
            continue
        close_set.add(current)

        for dx, dy in neighbors:
            neighbor = (current[0] + dx, current[1] + dy)
            if not (0 <= neighbor[0] < width and 0 <= neighbor[1] < height):
                continue
            if grid[neighbor[1]][neighbor[0]] == 1:
                continue
            if fire_map is not None and fire_map[neighbor[1]][neighbor[0]] == 1:
                continue

            tentative_g_score = gscore[current] + 1
            # Unseen cells default to infinity, so no scan of the heap is needed
            # to decide whether this route is an improvement.
            if tentative_g_score >= gscore.get(neighbor, unreached):
                continue

            came_from[neighbor] = current
            gscore[neighbor] = tentative_g_score
            heapq.heappush(oheap, (tentative_g_score + heuristic(neighbor, goal), neighbor))

    return []

# --- Person and FireSimulator Classes (Unchanged from our last version) ---
class Person:
    """One evacuee on the grid.

    Positions are ``(x, y)``; ``fire_map`` arrays are indexed ``[y, x]``.
    ``status`` is 'evacuating', 'escaped' or 'burned'; ``state`` is 'CALM',
    'ALERT' or 'PANICKED' and drives speed and trip probability.
    """

    def __init__(self, position):
        self.initial_pos = tuple(position)
        self.pos = list(position)
        self.path = []
        self.status = 'evacuating'
        self.state = 'CALM'
        self.speed = 1.0
        self.trip_probability = 0.0
        self.tripped_timer = 0

    def update_state(self, fire_map):
        """Set state, speed and trip probability from the distance to the nearest fire cell."""
        # A tripped agent keeps its current state until the timer runs out.
        if self.tripped_timer > 0:
            return

        burning_cells = np.argwhere(fire_map == 1)  # rows of (y, x)
        if len(burning_cells) > 0:
            here_yx = np.array([self.pos[1], self.pos[0]])
            nearest = np.min(np.linalg.norm(burning_cells - here_yx, axis=1))
        else:
            nearest = float('inf')

        if nearest < config["PANIC_DISTANCE"]:
            profile = ('PANICKED', 1.5, 0.1)
        elif nearest < config["ALERT_DISTANCE"]:
            profile = ('ALERT', 1.2, 0.0)
        else:
            profile = ('CALM', 1.0, 0.0)
        self.state, self.speed, self.trip_probability = profile

    def move(self):
        """Advance along ``path`` by ``round(speed)`` cells, unless tripped or tripping now."""
        if self.tripped_timer > 0:
            self.tripped_timer -= 1
            return
        # The random draw only happens for panicked agents.
        if self.state == 'PANICKED' and np.random.rand() < self.trip_probability:
            self.tripped_timer = 5
            return

        steps_left = int(round(self.speed))
        while steps_left > 0 and self.path:
            self.pos = self.path.pop(0)
            steps_left -= 1

    def check_status(self, fire_map, exits):
        """Mark the agent 'burned' if standing in fire, else 'escaped' if within 5 cells of an exit."""
        if self.status != 'evacuating':
            return

        row, col = int(self.pos[1]), int(self.pos[0])
        if fire_map[row, col] == 1:
            self.status = 'burned'
            return

        here = np.array(self.pos)
        for exit_pos in exits:
            if np.linalg.norm(here - np.array(exit_pos)) < 5:
                self.status = 'escaped'
                break

    def compute_path(self, grid, goal, fire_map):
        """Replace ``path`` with an A* route from the current cell to ``goal``."""
        origin = (int(self.pos[0]), int(self.pos[1]))
        target = (int(goal[0]), int(goal[1]))
        self.path = a_star_search(grid, origin, target, fire_map)

    def reset(self):
        """Return the agent to its initial position and default attributes."""
        self.pos = list(self.initial_pos)
        self.path = []
        self.status = 'evacuating'
        self.state = 'CALM'
        self.speed = 1.0
        self.trip_probability = 0.0
        self.tripped_timer = 0

class FireSimulator:
    """Random 4-neighbour fire spread over ``grid``; only cells where ``grid`` is 0 can ignite."""

    def __init__(self, grid, spread_prob=0.25):
        self.grid = grid
        self.spread_prob = spread_prob
        self.reset()

    def step(self):
        """Advance one tick: each burning cell may ignite each free, unburnt neighbour."""
        rows, cols = self.grid.shape[0], self.grid.shape[1]
        # Write into a copy so cells ignited this tick do not spread until the next one.
        updated = self.fire_map.copy()
        for y, x in np.argwhere(self.fire_map == 1):
            for dy, dx in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
                ny, nx = y + dy, x + dx
                if not (0 <= ny < rows and 0 <= nx < cols):
                    continue
                if self.fire_map[ny, nx] != 0 or self.grid[ny, nx] != 0:
                    continue
                # The random draw is made only for in-bounds, unburnt, free cells.
                if np.random.rand() < self.spread_prob:
                    updated[ny, nx] = 1
        self.fire_map = updated

    def reset(self):
        """Clear all fire (array of zeros with the grid's shape and dtype)."""
        self.fire_map = np.zeros_like(self.grid)

# --- The Heavily Upgraded EvacuationEnv for MaskablePPO ---
class EvacuationEnv(gym.Env):
    """Evacuation scenario on one grid, with fixed-size observation and action spaces.

    Observation (flat float32 vector): the flattened fire map, then
    ``MAX_AGENTS`` triples ``(x / width, y / height, state code)`` padded with
    zeros for unused slots, then the elapsed fraction of ``max_steps``.

    Action: an index into the list of perimeter cells; every agent that is still
    evacuating is routed towards that cell. ``compute_action_mask`` marks the
    indices that correspond to exits of this grid.

    Args:
        grid: 2-D array indexed ``[y, x]``; 0 marks a free cell.
        num_agents: Number of agents placed on random free cells.
        fire_start_pos: ``(x, y)`` cell set on fire at every reset.
        exits: Iterable of ``(x, y)`` exit cells.
    """

    def __init__(self, grid, num_agents, fire_start_pos, exits):
        super().__init__()
        self.grid = grid
        self.num_agents = num_agents
        self.fire_start_pos = fire_start_pos
        self.exits = exits
        self.agents = [Person(pos) for pos in self._get_random_agent_starts()]
        self.fire_sim = FireSimulator(self.grid)
        self.current_step = 0
        self.max_steps = 750

        # --- KEY CHANGE 1: Fixed-Size Observation Space with Padding ---
        obs_fire_shape = config["GRID_SIZE"] * config["GRID_SIZE"]
        obs_agent_shape = config["MAX_AGENTS"] * 3  # x, y, state for each agent
        total_obs_shape = obs_fire_shape + obs_agent_shape + 1 # +1 for timestep
        self.observation_space = spaces.Box(low=0, high=1, shape=(total_obs_shape,), dtype=np.float32)

        # --- KEY CHANGE 2: Fixed-Size Action Space for Masking ---
        self.action_space = spaces.Discrete(config["MAX_EXITS"])
        self._perimeter_coords = self._get_perimeter_coords()
        # Coordinate -> first index in `_perimeter_coords`. Corner cells occur twice in
        # that list; `setdefault` keeps the first index, matching what `list.index` returns.
        self._perimeter_index = {}
        for idx, coord in enumerate(self._perimeter_coords):
            self._perimeter_index.setdefault(coord, idx)

    def _get_perimeter_coords(self):
        """Return the (x, y) cells one step inside the border: top, bottom, left, right edge."""
        coords = []
        s = config["GRID_SIZE"]
        for i in range(1, s - 1): coords.append((i, 1)) # Top edge
        for i in range(1, s - 1): coords.append((i, s - 2)) # Bottom edge
        for i in range(1, s - 1): coords.append((1, i)) # Left edge
        for i in range(1, s - 1): coords.append((s - 2, i)) # Right edge
        return coords

    def _get_random_agent_starts(self):
        """Sample ``num_agents`` free cells (with replacement) and return them as (x, y).

        Raises:
            ValueError: If agents are requested but the grid has no free cell;
                rejection sampling could never finish in that case.
        """
        if self.num_agents > 0 and not np.any(self.grid == 0):
            raise ValueError("Cannot place agents: the grid contains no free (0) cells.")
        starts = []
        while len(starts) < self.num_agents:
            y, x = random.randint(0, self.grid.shape[0]-1), random.randint(0, self.grid.shape[1]-1)
            if self.grid[y, x] == 0: starts.append((x, y))
        return starts

    def _get_observation(self):
        """Assemble the flat float32 observation: fire map, padded agent slots, time fraction."""
        fire_obs = self.fire_sim.fire_map.flatten().astype(np.float32)

        agent_data = []
        state_map = {'CALM': 0.0, 'ALERT': 0.5, 'PANICKED': 1.0}
        for i in range(config["MAX_AGENTS"]):
            if i < len(self.agents):
                agent = self.agents[i]
                agent_data.extend([
                    agent.pos[0] / self.grid.shape[1],
                    agent.pos[1] / self.grid.shape[0],
                    state_map[agent.state]
                ])
            else:
                # --- Padding with Zeros ---
                agent_data.extend([0.0, 0.0, 0.0])

        agent_obs = np.array(agent_data, dtype=np.float32)
        time_obs = np.array([self.current_step / self.max_steps], dtype=np.float32)
        return np.concatenate([fire_obs, agent_obs, time_obs])

    def compute_action_mask(self):
        """Return a boolean array of length MAX_EXITS, True at the index of each known exit."""
        # --- KEY CHANGE 3: The Action Masking Logic ---
        mask = np.zeros(config["MAX_EXITS"], dtype=bool)
        for exit_pos in self.exits:
            # Normalise to a plain (x, y) int tuple so lists and NumPy ints also match.
            exit_pos_tuple = (int(exit_pos[0]), int(exit_pos[1]))
            idx = self._perimeter_index.get(exit_pos_tuple)
            if idx is not None:
                mask[idx] = True
            # Exits that are not on the standard perimeter are ignored.
        return mask

    def reset(self, seed=None, options=None):
        """Start a new episode: clear the fire, ignite the start cell, place fresh agents.

        Args:
            seed: Forwarded to ``gym.Env.reset``.
            options: Accepted for compatibility with the Gymnasium ``reset``
                signature; not used.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.current_step = 0
        self.fire_sim.reset()
        self.fire_sim.fire_map[self.fire_start_pos[1], self.fire_start_pos[0]] = 1
        self.agents = [Person(pos) for pos in self._get_random_agent_starts()]
        return self._get_observation(), {}

    def step(self, action):
        """Spread the fire one tick and move every evacuating agent towards the chosen exit.

        Reward is -0.01 per step, +10 for each agent that escapes during this step
        and -10 for each agent that burns during this step.

        Returns:
            ``(observation, reward, terminated, truncated, info)``; ``terminated``
            when no agent is still evacuating, ``truncated`` once ``max_steps``
            is reached.
        """
        target_exit_coord = self._perimeter_coords[action]
        reward = -0.01
        self.current_step += 1
        self.fire_sim.step()

        for agent in self.agents:
            if agent.status == 'evacuating':
                agent.update_state(self.fire_sim.fire_map)
                # Re-plan when the agent has no route left and on every 10th step.
                if not agent.path or self.current_step % 10 == 0:
                    agent.compute_path(self.grid, target_exit_coord, self.fire_sim.fire_map)
                agent.move()
                agent.check_status(self.fire_sim.fire_map, self.exits)
                if agent.status == 'escaped': reward += 10
                elif agent.status == 'burned': reward -= 10

        terminated = all(agent.status != 'evacuating' for agent in self.agents)
        truncated = self.current_step >= self.max_steps
        return self._get_observation(), reward, terminated, truncated, {}

    # Required for MaskablePPO
    def valid_action_mask(self):
        """Alias of ``compute_action_mask``."""
        return self.compute_action_mask()

print("--- Upgraded Simulation Classes Defined ---")
print("EvacuationEnv is now ready for Action Masking and Padded Observations.")

--- Upgraded Simulation Classes Defined ---
EvacuationEnv is now ready for Action Masking and Padded Observations.


# Cell 5: The Dynamic Training Environment (The "Gymnasium Factory")

In [5]:
from datasets import load_dataset
from sb3_contrib.common.wrappers import ActionMasker

class RandomFloorplanEnv(gym.Env):
    """Environment that builds a fresh, randomised ``EvacuationEnv`` on every reset.

    Each reset draws a random floor plan from the dataset, converts it to a grid
    with the U-Net, detects exits on the perimeter, and randomises the number of
    agents and the fire start cell. ``step`` and ``get_action_mask`` delegate to
    the scenario created by the most recent reset.
    """

    # Upper bound on how many floor plans are tried per reset before giving up.
    _MAX_SAMPLING_ATTEMPTS = 1000

    def __init__(self):
        super().__init__()
        print("Loading Hugging Face dataset... (this may take a moment)")
        self.dataset = load_dataset(config["DATASET_NAME"], split='train')
        print("Dataset loaded.")

        # The underlying env will change every reset, so we define a "master" space
        self.observation_space = spaces.Box(
            low=0, high=1,
            shape=((config["GRID_SIZE"]**2) + (config["MAX_AGENTS"] * 3) + 1,),
            dtype=np.float32
        )
        self.action_space = spaces.Discrete(config["MAX_EXITS"])

        self.current_env = None

    @staticmethod
    def _find_perimeter_exits(grid):
        """Return the (x, y) free cells on the ring one step inside the grid border."""
        exits = []
        s = config["GRID_SIZE"]
        for x in range(1, s - 1):
            if grid[1, x] == 0: exits.append((x, 1))
            if grid[s-2, x] == 0: exits.append((x, s-2))
        for y in range(1, s - 1):
            if grid[y, 1] == 0: exits.append((1, y))
            if grid[y, s-2] == 0: exits.append((s-2, y))
        return exits

    def _create_new_episode_env(self):
        """The core logic for creating a new, randomized scenario.

        Floor plans without a usable exit or free cell are skipped and another
        one is drawn, in a bounded loop rather than by recursion.

        Raises:
            RuntimeError: If no usable floor plan is found within
                ``_MAX_SAMPLING_ATTEMPTS`` draws.
        """
        for _ in range(self._MAX_SAMPLING_ATTEMPTS):
            # --- 1. Get a random floor plan and create the grid ---
            random_idx = random.randint(0, len(self.dataset) - 1)
            plan_image = self.dataset[random_idx]['plans'].convert("RGB")
            grid = create_grid_from_image(unet_model, plan_image)

            # --- 2. Find exits on this specific grid ---
            exits = self._find_perimeter_exits(grid)
            if not exits:
                continue  # No detectable exits: try another floor plan

            # --- 3. Randomize the scenario ---
            num_agents = random.randint(3, config["MAX_AGENTS"])

            # Find a valid, non-wall spot for the fire to start
            valid_starts = np.argwhere(grid == 0)  # rows of (y, x)
            if len(valid_starts) == 0:
                continue

            # Index explicitly instead of random.choice(): choosing from a NumPy
            # array depends on how the running Python version tests for emptiness.
            row, col = valid_starts[random.randrange(len(valid_starts))]
            fire_start_pos_xy = (int(col), int(row))  # Convert to (x, y)

            # --- 4. Create and return the configured environment ---
            return EvacuationEnv(
                grid=grid,
                num_agents=num_agents,
                fire_start_pos=fire_start_pos_xy,
                exits=exits
            )

        raise RuntimeError(
            f"No usable floor plan found after {self._MAX_SAMPLING_ATTEMPTS} attempts."
        )

    def reset(self, seed=None, options=None):
        """Create a new random scenario and return its first ``(observation, info)``.

        Args:
            seed: Forwarded to ``gym.Env.reset`` when not None.
            options: Accepted because Gymnasium wrappers call
                ``env.reset(seed=..., options=...)``; not used.
        """
        if seed is not None:
            super().reset(seed=seed)

        self.current_env = self._create_new_episode_env()
        return self.current_env.reset()

    def step(self, action):
        """Delegate to the current scenario's ``step``."""
        return self.current_env.step(action)

    # This is the function the ActionMasker will call
    def get_action_mask(self):
        """Return the current scenario's action mask."""
        return self.current_env.compute_action_mask()

# --- Helper function to create the final, wrapped training environment ---
def make_training_env():
    """Return a new RandomFloorplanEnv wrapped in an ActionMasker.

    The wrapper's mask function delegates to the wrapped env's ``get_action_mask``.
    """
    def current_mask(wrapped_env):
        return wrapped_env.get_action_mask()

    return ActionMasker(RandomFloorplanEnv(), action_mask_fn=current_mask)

print("\n--- Dynamic Training Environment Factory Defined ---")
print("Ready to create randomized training scenarios.")


--- Dynamic Training Environment Factory Defined ---
Ready to create randomized training scenarios.


  return datetime.utcnow().replace(tzinfo=utc)


# Cell 6: The Final, Definitive Training Script for the V2.0 AI Commander


In [None]:
# Cell 6 (CORRECTED): The Final, Definitive Training Script for the V2.0 AI Commander

# MODIFIED: Added CheckpointCallback to the imports
from stable_baselines3.common.callbacks import CheckpointCallback

# --- 1. Create the Parallel, Masked, Dynamic Training Environments ---
print("--- Initializing Parallel Training Environments ---")
vec_env = make_vec_env(
    make_training_env,
    n_envs=config["N_ENVS"],
    vec_env_cls=SubprocVecEnv
)
print(f"--- {config['N_ENVS']} parallel environments created successfully ---")

# --- 2. Define PPO Hyperparameters and Model Path ---
PPO_MODEL_PATH = config["PPO_MODEL_PATH"]
hyperparams = {
    "learning_rate": 3e-4, "n_steps": 2048,
    # Batch size comes from the global config so there is a single place to tune it.
    "batch_size": config["BATCH_SIZE"], "n_epochs": 10,
    "gamma": 0.99, "gae_lambda": 0.95, "clip_range": 0.2, "ent_coef": 0.01,
    "vf_coef": 0.5, "max_grad_norm": 0.5,
    "policy_kwargs": dict(net_arch=dict(pi=[256, 256], vf=[256, 256])),
    # Use the device selected in the configuration cell (falls back to CPU without CUDA).
    "verbose": 1, "device": device, "tensorboard_log": "./ppo_v2_tensorboard/"
}

# --- 3. Setup Checkpoint Callback to Save Progress ---
# save_freq is divided by the number of parallel envs so that a checkpoint is
# written roughly every 10k total environment steps.
checkpoint_callback = CheckpointCallback(
    save_freq=max(10000 // config["N_ENVS"], 1), # Save every ~10k total steps
    save_path='./checkpoints/',
    name_prefix='ppo_commander_v2'
)

# --- 4. Load or Create the MaskablePPO Model ---
if os.path.exists(PPO_MODEL_PATH):
    print("--- Found existing V2.0 model! Loading to continue training... ---")
    model = MaskablePPO.load(PPO_MODEL_PATH, env=vec_env, device=device)
else:
    print("--- No existing V2.0 model found. Creating a new one... ---")
    model = MaskablePPO("MlpPolicy", vec_env, **hyperparams)

# --- 5. THE FINAL TRAINING RUN ---
print(f"\n--- Starting Final AI Commander Training for {config['TOTAL_TIMESTEPS']} timesteps ---")
print("The AI will now train on a dynamic curriculum of randomized floor plans and scenarios.")

try:
    # reset_num_timesteps=False keeps the step counter of a loaded model running on.
    model.learn(
        total_timesteps=config["TOTAL_TIMESTEPS"],
        progress_bar=True,
        reset_num_timesteps=False,
        callback=checkpoint_callback
    )

    print(f"\n--- {config['TOTAL_TIMESTEPS']} timesteps of training complete ---")
    model.save(PPO_MODEL_PATH)
    print(f"\nFinal AI Commander model saved to {PPO_MODEL_PATH}")

except KeyboardInterrupt:
    # --- Graceful exit on interruption ---
    model.save(PPO_MODEL_PATH)
    print("\n--- Training Interrupted by User ---")
    print(f"Current progress has been saved to {PPO_MODEL_PATH}")

finally:
    # --- Clean up the environments ---
    vec_env.close()
    print("\n--- Training environments closed ---")

--- Initializing Parallel Training Environments ---
