In [1]:
import sys
import os

#10_DQL_Applied/Minigrid_with_Monsters

# Agrega el directorio principal a sys.path
module_path = os.path.abspath(os.path.join('./Minigrid_with_Monsters'))
if module_path not in sys.path:
    sys.path.append(module_path)

In [2]:
from __future__ import annotations


from Minigrid_with_Monsters.minigrid.core.constants import COLOR_NAMES
from Minigrid_with_Monsters.minigrid.core.grid import Grid
from Minigrid_with_Monsters.minigrid.core.mission import MissionSpace
from Minigrid_with_Monsters.minigrid.core.world_object import Goal
from Minigrid_with_Monsters.minigrid.minigrid_env import MiniGridEnv
from Minigrid_with_Monsters.minigrid.core.world_object import Monster
from time import sleep

import numpy as np
from typing import Any


class LockedRoomEnv(MiniGridEnv):
    def __init__(self, size=19, max_steps: int | None = None, **kwargs):
        self.size = size
        self.monster = Monster()

        if max_steps is None:
            max_steps = 10 * size

        mission_space = MissionSpace(mission_func=lambda: "")
        
        super().__init__(
            mission_space=mission_space,
            width=size,
            height=size,
            max_steps=max_steps,
            **kwargs,
        )

    def _gen_grid(self, width, height):
        # Create the grid
        self.grid = Grid(width, height)

        # Generate the surrounding walls
        self.grid.horz_wall(0, 0)
        self.grid.horz_wall(0, height - 1)
        self.grid.vert_wall(0, 0)
        self.grid.vert_wall(width - 1, 0)

        room_w = width // 2
        room_h = height // 2

        # For each row of rooms
        for j in range(0, 2):
            # For each column
            for i in range(0, 2):
                xL = i * room_w
                yT = j * room_h
                xR = xL + room_w
                yB = yT + room_h

                # Bottom wall and door
                if i + 1 < 2:
                    self.grid.vert_wall(xR, yT, room_h)
                    pos = (xR, self._rand_int(yT + 1, yB))
                    self.grid.set(*pos, None)

                # Bottom wall and door
                if j + 1 < 2:
                    self.grid.horz_wall(xL, yB, room_w)
                    pos = (self._rand_int(xL + 1, xR), yB)
                    self.grid.set(*pos, None)

        # Place the agent goal and monster
        self.place_agent()
        self.place_obj(Goal())
        self.monster.position = self.place_obj(self.monster)

    def step(self, action):
        obs, reward, terminated, truncated, info = super().step(action)

        # Check if monster can see the agent. If so, move towards it.
        # If the monster is in the same cell as the agent, the agent is caught.
        if self.monster.can_see(self.grid, self.agent_pos):
            # if the monster can see the agent, move towards it
            self.monster.move_towards(self.grid, self.agent_pos)

            if self.monster.position == self.agent_pos:
                terminated = True
                reward = -1
        else:
            # if the monster can't see the agent, move randomly
            self.monster.patrol_forward(self.grid)

        self.grid.set(*self.monster.position, self.monster)

        return obs, reward, terminated, truncated, info



pygame 2.6.0 (SDL 2.28.4, Python 3.12.1)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [3]:
# import gymnasium as gym
# from gymnasium.utils.play import play
# import numpy as np

# env = LockedRoomEnv(render_mode="rgb_array", max_steps=1000)

# play(env, keys_to_action={
#     (ord("a"),): 0,
#     (ord("d"),): 1,
#     (ord("w"),): 2,
#     (ord("j"),): 3,
#     (ord("k"),): 5,
# }, noop=6)

In [4]:
import gymnasium as gym
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import VecFrameStack, VecTransposeImage, DummyVecEnv
from stable_baselines3.common.env_checker import check_env

# Initialize the environment
env = LockedRoomEnv(render_mode="rgb_array", max_steps=1000)

class FlattenObservationWrapper(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        # Assuming you only want the 'image' observation
        img_space = self.observation_space.spaces['image']
        self.observation_space = gym.spaces.Box(
            low=img_space.low.flatten(),
            high=img_space.high.flatten(),
            dtype=img_space.dtype
        )

    def observation(self, obs):
        # Return only the flattened 'image' array
        return obs['image'].flatten()
    
# Wrap the environment
env = FlattenObservationWrapper(env)

# Check the environment to ensure compatibility with SB3
check_env(env)

model = DQN('MlpPolicy', env, verbose=1)
model.learn(total_timesteps=20000)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 478      |
|    ep_rew_mean      | -0.75    |
|    exploration_rate | 0.0927   |
| time/               |          |
|    episodes         | 4        |
|    fps              | 834      |
|    time_elapsed     | 2        |
|    total_timesteps  | 1910     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.000755 |
|    n_updates        | 452      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 546      |
|    ep_rew_mean      | -0.75    |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 899      |
|    time_elapsed     | 4        |
|    total_timesteps  | 4365     |
| train/              |        

<stable_baselines3.dqn.dqn.DQN at 0x75fdf2047b90>

: 

In [5]:
import os
from stable_baselines3.common.vec_env import VecVideoRecorder

# Define the video folder and length
video_folder = 'videos/'
video_length = 500

# Ensure the directory exists
os.makedirs(video_folder, exist_ok=True)

# Wrap the environment with VecVideoRecorder
env = DummyVecEnv([lambda: env])
env = VecVideoRecorder(env, video_folder, record_video_trigger=lambda x: x == 0, video_length=video_length, name_prefix="dqn-agent")

# Reset the environment
obs = env.reset()

# Run the agent for the specified number of steps and record the video
for _ in range(video_length):
    action, _ = model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    if dones:
        obs = env.reset()

# Close the environment and save the video
env.close()

print("Video recorded successfully!")


Moviepy - Building video /workspaces/AI_Pygame_Playground/10_DQL_Applied/videos/dqn-agent-step-0-to-step-500.mp4.
Moviepy - Writing video /workspaces/AI_Pygame_Playground/10_DQL_Applied/videos/dqn-agent-step-0-to-step-500.mp4



                                                               

Moviepy - Done !
Moviepy - video ready /workspaces/AI_Pygame_Playground/10_DQL_Applied/videos/dqn-agent-step-0-to-step-500.mp4
