In [None]:
!pip install -q gymnasium shimmy==2.0.0 stable-baselines3[extra] matplotlib opencv-python

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/187.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m184.3/187.2 kB[0m [31m8.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m187.2/187.2 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import cv2
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy


Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  return datetime.utcnow().replace(tzinfo=utc)


In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import cv2
import imageio

class WarehouseEnv(gym.Env):
    metadata = {"render_modes": ["rgb_array"], "render_fps": 30}

    def __init__(self, grid_size=10, num_moving_robots=3):
        super().__init__()

        self.grid_size = grid_size
        self.num_moving_robots = num_moving_robots

        # Action space: up, down, left, right
        self.action_space = spaces.Discrete(4)

        self.observation_space = spaces.Box(
            low=0,
            high=grid_size,
            shape=(4,),
            dtype=np.float32,
        )

        self.grid = np.zeros((grid_size, grid_size), dtype=int)

        self._place_shelves()

        self.moving_robots = []
        self.robot_pos = None
        self.goal_pos = None
        self.step_count = 0
        self.max_steps = 200

        self.robot_icon = cv2.imread("robot.png", cv2.IMREAD_UNCHANGED)
        self.shelf_icon = cv2.imread("box.png", cv2.IMREAD_UNCHANGED)
        self.goal_icon = cv2.imread("goal.png", cv2.IMREAD_UNCHANGED)

        self.other_robot_frames = self._load_gif("robotic-arm.gif")
        self.other_robot_frame_index = 0

    def _load_gif(self, path):
        frames = imageio.mimread(path)
        return [cv2.cvtColor(frame, cv2.COLOR_RGBA2BGRA) for frame in frames]

    def _place_shelves(self):
        self.grid[:] = 0
        self.grid[2, 2:8] = 1
        self.grid[5, 1:9] = 1
        self.grid[7, 3:7] = 1

    def _spawn_moving_robots(self):
        self.moving_robots = []
        for _ in range(self.num_moving_robots):
            while True:
                pos = tuple(np.random.randint(0, self.grid_size, size=2))
                if self.grid[pos] == 0:
                    self.moving_robots.append(list(pos))
                    break

    def _move_other_robots(self):
        for i in range(self.num_moving_robots):
            r, c = self.moving_robots[i]

            action = np.random.choice([0, 1, 2, 3])
            if action == 0: r -= 1
            elif action == 1: r += 1
            elif action == 2: c -= 1
            elif action == 3: c += 1

            r = np.clip(r, 0, self.grid_size - 1)
            c = np.clip(c, 0, self.grid_size - 1)

            if self.grid[r, c] == 0:
                self.moving_robots[i] = [r, c]

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        self.step_count = 0
        self._spawn_moving_robots()

        while True:
            self.robot_pos = np.random.randint(0, self.grid_size, size=2)
            if self.grid[tuple(self.robot_pos)] == 0 and list(self.robot_pos) not in self.moving_robots:
                break

        while True:
            self.goal_pos = np.random.randint(0, self.grid_size, size=2)
            if self.grid[tuple(self.goal_pos)] == 0 and list(self.goal_pos) not in self.moving_robots:
                break

        obs = np.array([*self.robot_pos, *self.goal_pos], dtype=np.float32)
        return obs, {}

    def step(self, action):

        self._move_other_robots()
        old_pos = self.robot_pos.copy()

        if action == 0: self.robot_pos[0] -= 1
        elif action == 1: self.robot_pos[0] += 1
        elif action == 2: self.robot_pos[1] -= 1
        elif action == 3: self.robot_pos[1] += 1

        self.robot_pos = np.clip(self.robot_pos, 0, self.grid_size - 1)

        if self.grid[tuple(self.robot_pos)] == 1:
            self.robot_pos = old_pos
            reward = -1

        elif list(self.robot_pos) in self.moving_robots:
            self.robot_pos = old_pos
            reward = -5

        else:
            dist = np.linalg.norm(self.robot_pos - self.goal_pos)
            reward = -0.1 * dist

        terminated = False
        truncated = False

        if np.array_equal(self.robot_pos, self.goal_pos):
            reward += 10
            terminated = True

        self.step_count += 1
        if self.step_count >= self.max_steps:
            truncated = True

        obs = np.array([*self.robot_pos, *self.goal_pos], dtype=np.float32)
        return obs, reward, terminated, truncated, {}

    def _overlay_icon(self, base, icon, x, y, w, h):

        icon_resized = cv2.resize(icon, (w, h))

        if icon_resized.shape[2] == 4:  # has alpha channel
            rgb = icon_resized[:, :, :3]
            alpha = icon_resized[:, :, 3] / 255.0

            for c in range(3):
                base[y:y+h, x:x+w, c] = (
                    rgb[:, :, c] * alpha +
                    base[y:y+h, x:x+w, c] * (1 - alpha)
                )
        else:
            base[y:y+h, x:x+w] = icon_resized

    def render(self, mode="rgb_array"):
        img_size = 500
        cell_size = img_size // self.grid_size
        img = np.ones((img_size, img_size, 3), dtype=np.uint8) * 255

        for i in range(self.grid_size):
            for j in range(self.grid_size):
                if self.grid[i, j] == 1:
                    self._overlay_icon(
                        img, self.shelf_icon,
                        j*cell_size, i*cell_size,
                        cell_size, cell_size
                    )

        frame = self.other_robot_frames[self.other_robot_frame_index]
        self.other_robot_frame_index = (self.other_robot_frame_index + 1) % len(self.other_robot_frames)

        for r, c in self.moving_robots:
            self._overlay_icon(
                img, frame,
                c*cell_size, r*cell_size,
                cell_size, cell_size
            )

        r, c = self.robot_pos
        self._overlay_icon(
            img, self.robot_icon,
            c*cell_size, r*cell_size,
            cell_size, cell_size
        )

        gr, gc = self.goal_pos
        self._overlay_icon(
            img, self.goal_icon,
            gc*cell_size, gr*cell_size,
            cell_size, cell_size
        )

        return img


In [None]:
# === Cell 4: Create vectorized env for training and plain env for eval/visualization ===
def make_env():
    return WarehouseEnv(grid_size=10)

vec_env = DummyVecEnv([make_env])   # for SB3 training
eval_env = WarehouseEnv(grid_size=10)  # plain env for evaluation & visualization


In [None]:
# === Cell 5: Train PPO ===
model = PPO(
    policy="MlpPolicy",
    env=vec_env,
    learning_rate=3e-4,
    n_steps=1024,
    batch_size=64,
    gae_lambda=0.95,
    gamma=0.99,
    verbose=1,
)

model.learn(total_timesteps=200_000)
model.save("warehouse_ppo_model")


Using cpu device
-----------------------------
| time/              |      |
|    fps             | 994  |
|    iterations      | 1    |
|    time_elapsed    | 1    |
|    total_timesteps | 1024 |
-----------------------------
-----------------------------------------
| time/                   |             |
|    fps                  | 784         |
|    iterations           | 2           |
|    time_elapsed         | 2           |
|    total_timesteps      | 2048        |
| train/                  |             |
|    approx_kl            | 0.013314602 |
|    clip_fraction        | 0.106       |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.38       |
|    explained_variance   | 0.0145      |
|    learning_rate        | 0.0003      |
|    loss                 | 20.2        |
|    n_updates            | 10          |
|    policy_gradient_loss | -0.0113     |
|    value_loss           | 81.3        |
-----------------------------------------
-----------------

In [None]:
# === Cell 6: Evaluate (plain env) - sanity check ===
mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10, deterministic=True)
print("Evaluation (mean, std):", mean_reward, std_reward)

obs, info = eval_env.reset()
action, _ = model.predict(obs, deterministic=True)
print("Sample model action:", action)


In [None]:
# === Cell 7: Visualize a single episode and animate ===
viz_env = WarehouseEnv(grid_size=10)
obs, info = viz_env.reset()

frames = []
terminated = False
truncated = False
steps = 0
max_steps = 300

while not (terminated or truncated) and steps < max_steps:
    action, _ = model.predict(obs, deterministic=True)  # obs is a 1D array
    obs, reward, terminated, truncated, info = viz_env.step(int(action))
    frame = viz_env.render()
    frames.append(frame)
    steps += 1

print("Episode length:", len(frames), "terminated:", terminated, "truncated:", truncated)

fig = plt.figure(figsize=(5,5))
plt.axis("off")
im = plt.imshow(frames[0])

def update(i):
    im.set_data(frames[i])
    return [im]

anim = animation.FuncAnimation(fig, update, frames=len(frames), interval=150, blit=True)
plt.close(fig)
from IPython.display import HTML
HTML(anim.to_jshtml())   # displays animation in Colab / Jupyter


Episode length: 12 terminated: True truncated: False


  return datetime.utcnow().replace(tzinfo=utc)
