## Imports

In [10]:
import gymnasium as gym
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback, CallbackList
import warnings
import os

# Ignore every UserWarning raised from this point on.
# NOTE(review): this is a blanket filter, so it also hides UserWarnings emitted by
# the imported libraries themselves — confirm none of those are worth seeing.
warnings.simplefilter("ignore", category=UserWarning)

## Environment

In [2]:
# Maze layouts written one string per grid row: '1' is a wall cell, '0' is a free cell.
_layouts = {
    'easy': (
        "11111",
        "10001",
        "11111",
    ),
    'medium': (
        "11111111",
        "10011001",
        "10010001",
        "11000111",
        "10010001",
        "10100101",
        "10001001",
        "11111111",
    ),
    'hard': (
        "111111111111",
        "100001000001",
        "101101010101",
        "100000010001",
        "101111011101",
        "100101000001",
        "110101010111",
        "100100010001",
        "111111111111",
    ),
}

# Expand every row string into a list of ints so each map is a nested list grid.
maps = {
    name: [[int(cell) for cell in row] for row in rows]
    for name, rows in _layouts.items()
}

# Individual names bound to the very same grid objects stored in `maps`.
easy = maps['easy']
medium = maps['medium']
hard = maps['hard']

In [59]:
class GridMazeEnv(gym.Env):
    """Maze with a continuous agent position on top of a discrete wall grid.

    Grid cells holding 1 are walls and cells holding 0 are free. Positions are
    stored as ``(x, y)`` float arrays, where ``x`` runs along grid columns and
    ``y`` along grid rows. The agent starts at the centre of the first free
    cell (row-major order) and the goal is the centre of the last free cell.

    Observation: the agent's ``(x, y)`` position as float32.
    Action: a 2-vector in ``[-1, 1]``, scaled to a displacement of at most 0.3
    per axis.
    Reward: negative Euclidean distance to the goal.
    The episode terminates once the agent is within 0.5 of the goal.

    Args:
        maps: Mapping from map name to a rectangular nested list of 0/1 cells.
            Must contain the key ``"hard"``, which is loaded initially.
        max_steps: Optional cap on steps per episode. When set, ``step``
            reports ``truncated=True`` once that many steps have elapsed since
            the last ``reset``. ``None`` (default) never truncates.
    """

    metadata = {"render_modes": ["human"]}

    def __init__(self, maps, max_steps=None):
        super().__init__()
        self.maps = maps
        self.max_steps = max_steps
        self._elapsed_steps = 0
        # load_map also places the agent and the goal, so no extra setup is needed here.
        self.load_map("hard")

        # Maps can be swapped at runtime through load_map(), so the observation
        # bound has to cover the largest map, not only the one loaded right now.
        largest_side = max(max(np.shape(layout)) for layout in maps.values())
        self.observation_space = gym.spaces.Box(low=0, high=largest_side, shape=(2,), dtype=np.float32)
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)

    @staticmethod
    def _cell_center(cell):
        """Convert a ``(row, col)`` grid index into the ``(x, y)`` centre of that cell."""
        # np.argwhere yields (row, col); every other method reads positions as
        # (x, y) = (col, row), so the pair must be reversed before use.
        return cell[::-1] + 0.5

    def _observation(self):
        """Return a copy of the agent position in the dtype of ``observation_space``."""
        return self.agent_pos.astype(np.float32)

    def load_map(self, map_name):
        """Switch to the map called ``map_name`` and place the agent and goal on it.

        Raises:
            KeyError: if ``map_name`` is not a key of ``self.maps``.
        """
        self.map_name = map_name
        self.grid = np.array(self.maps[map_name])
        self.height, self.width = self.grid.shape

        free_cells = np.argwhere(self.grid == 0)
        self.agent_pos = self._cell_center(free_cells[0])
        self.goal_pos = self._cell_center(free_cells[-1])

    def reset(self, seed=None, options=None):
        """Move the agent back to the start cell and return ``(observation, info)``."""
        super().reset(seed=seed)
        self._elapsed_steps = 0
        free_cells = np.argwhere(self.grid == 0)
        self.agent_pos = self._cell_center(free_cells[0])
        return self._observation(), {}

    def step(self, action):
        """Apply ``action`` and return ``(obs, reward, terminated, truncated, info)``.

        A move that would leave the grid or end inside a wall cell is rejected
        and the agent stays where it was.
        """
        delta = np.clip(action, -1, 1) * 0.3
        new_pos = self.agent_pos + delta

        in_bounds = 0 <= new_pos[0] < self.width and 0 <= new_pos[1] < self.height
        # The bounds test runs first so the grid is never indexed out of range.
        if not in_bounds or self.grid[int(new_pos[1]), int(new_pos[0])] == 1:
            new_pos = self.agent_pos

        self.agent_pos = new_pos
        self._elapsed_steps += 1

        dist_to_goal = float(np.linalg.norm(self.agent_pos - self.goal_pos))
        terminated = dist_to_goal < 0.5
        truncated = self.max_steps is not None and self._elapsed_steps >= self.max_steps
        reward = -dist_to_goal
        info = {}

        return self._observation(), reward, terminated, truncated, info

    def render(self):
        """Print the current map to stdout: A = agent, G = goal, # = wall, . = free."""
        agent_cell = (int(self.agent_pos[0]), int(self.agent_pos[1]))
        goal_cell = (int(self.goal_pos[0]), int(self.goal_pos[1]))

        print(f"\nMap: {self.map_name}")
        for y in range(self.height):
            row = ""
            for x in range(self.width):
                if (x, y) == agent_cell:
                    row += "A"  # Agent
                elif (x, y) == goal_cell:
                    row += "G"  # Goal
                elif self.grid[y, x] == 1:
                    row += "#"  # Wall
                else:
                    row += "."  # Free space
            print(row)

In [60]:
class StructuredMapSwitchCallback(BaseCallback):
    """Curriculum callback that moves the environment from easy to medium to hard.

    The training budget is split into three stages at one third and two thirds
    of ``total_timesteps``. Whenever the stage for the current timestep differs
    from the map last loaded by this callback, ``env.load_map`` is called.

    Args:
        env: Environment object exposing ``load_map(name)``.
        total_timesteps: Total training budget used to place the two milestones.
        verbose: When greater than 0, each map switch is printed.
    """

    def __init__(self, env, total_timesteps, verbose=0):
        super().__init__(verbose)
        self.env = env
        self.total_timesteps = total_timesteps
        self.milestones = [total_timesteps // 3, 2 * total_timesteps // 3]
        self.current_map = None

    def _stage_for(self, num_steps):
        """Return ``(map_name, difficulty)`` for the given timestep count."""
        if num_steps < self.milestones[0]:
            return "easy", 0
        if num_steps < self.milestones[1]:
            return "medium", 1
        return "hard", 2

    def _apply_stage(self):
        """Load the map for the current timestep if it changed; return its difficulty."""
        num_steps = self.num_timesteps
        desired_map, difficulty = self._stage_for(num_steps)

        if desired_map != self.current_map:
            self.env.load_map(desired_map)
            self.current_map = desired_map
            if self.verbose > 0:
                print(f"Step {num_steps}: Switched to map '{desired_map}'")

        return difficulty

    def _on_training_start(self) -> None:
        # Select the first stage before any environment step is taken. Doing
        # this only in _on_step would let the first step run on whatever map
        # the environment had loaded at construction time.
        self._apply_stage()

    def _on_step(self) -> bool:
        difficulty = self._apply_stage()
        self.logger.record("map/difficulty", difficulty)

        return True

In [61]:
class ConsoleRenderCallback(BaseCallback):
    """Callback that calls ``env.render()`` every ``render_freq`` callback steps.

    Args:
        env: Object exposing a ``render()`` method.
        render_freq: Render whenever ``n_calls`` is a multiple of this value.
        verbose: Forwarded to ``BaseCallback``.
    """

    def __init__(self, env, render_freq=1000, verbose=0):
        super().__init__(verbose)
        self.render_freq = render_freq
        self.env = env

    def _on_step(self) -> bool:
        render_due = (self.n_calls % self.render_freq) == 0
        if render_due:
            self.env.render()
        # Always return True so this callback never stops training.
        return True

## Train Model

In [None]:
# Overall step budget; the map-switch callback divides it into three stages.
total_timesteps = 10_240_000

env = GridMazeEnv(maps)

# Callbacks: curriculum map switching plus a console render every 1000 callback steps.
map_switch_callback = StructuredMapSwitchCallback(env, total_timesteps=total_timesteps, verbose=1)
render_callback = ConsoleRenderCallback(env, render_freq=1000, verbose=0)
callbacks = CallbackList([map_switch_callback, render_callback])

model = PPO(policy="MlpPolicy", env=env, verbose=1, tensorboard_log="Training/logs/")
model.learn(total_timesteps=total_timesteps, callback=callbacks)

To view the training curves, run `tensorboard --logdir logs/` from a terminal opened in the `Training/` directory (the model above logs to `Training/logs/`).

## Test Model

In [65]:
# NOTE(review): both models are freshly constructed here; no trained weights are
# loaded in this cell, so anything evaluated with them is an untrained policy
# unless a checkpoint is loaded elsewhere — confirm this is intended.
env = GridMazeEnv(maps)
model1, model2 = (
    PPO(policy="MlpPolicy", env=env, verbose=1, tensorboard_log=log_dir)
    for log_dir in ("Training/logs/PPO_1", "Training/logs/PPO_2")
)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [None]:
from stable_baselines3.common.evaluation import evaluate_policy

# Evaluate both models with identical settings. The previous `range(1, 2)` only
# produced i == 1, so model2 was never evaluated, and the returned statistics
# were thrown away.
for i, candidate in enumerate((model1, model2), start=1):
    mean_reward, std_reward = evaluate_policy(candidate, env, n_eval_episodes=10, render=True)
    print(f"model{i}: mean reward {mean_reward:.2f} +/- {std_reward:.2f}")
    if i == 1:
        # Visual separator between the two evaluation runs.
        print("\n")
        print("="*50)
        print("\n")

In [None]:
episodes = 5
# Safety cap: the loop otherwise ends only when the environment reports
# termination or truncation, which an arbitrary policy may never trigger.
max_steps_per_episode = 10_000

for i in range(1, episodes+1):
    obs, _ = env.reset()
    done = False
    score = 0
    steps = 0

    while not done and steps < max_steps_per_episode:
        action, _ = model1.predict(obs)
        obs, reward, terminated, truncated, info = env.step(action)
        score += reward
        steps += 1
        # Previously `done` was never updated, so this loop could not exit.
        done = terminated or truncated

    print(f"Episode: {i} Score: {score}")

env.close()

In [None]:
episodes = 5
# Safety cap: the loop otherwise ends only when the environment reports
# termination or truncation, which an arbitrary policy may never trigger.
max_steps_per_episode = 10_000

for i in range(1, episodes+1):
    obs, _ = env.reset()
    # `done` is defined here so the cell does not depend on a variable left
    # behind in the kernel by another cell.
    done = False
    score = 0
    steps = 0

    while not done and steps < max_steps_per_episode:
        action, _ = model2.predict(obs)
        obs, reward, terminated, truncated, info = env.step(action)
        score += reward
        steps += 1
        done = terminated or truncated

    print(f"Episode: {i} Score: {score}")

env.close()