In [2]:
import numpy as np
import random
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from gymnasium import ObservationWrapper
from gymnasium.spaces import Box
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
import csv
import uuid

1. Элементы лабиринта
- Стены
- Путь к выходу
- Ключ и дверь (выход доступен только с ключом)
- Ловушки, которые уменьшают здоровье
- Костёр восстановления здоровья

2. Структура лабиринта
Каждая клетка в лабиринте будет иметь код:
- 0: Пустая клетка
- 1: Стена
- 2: Ключ
- 3: Дверь
- 4: Ловушка
- 5: Костёр
- 6: Игрок (зарезервировано: в массив лабиринта не записывается, позиция игрока хранится отдельно)
- 7: Выход

In [3]:
# Maze dimensions (number of cells along each axis)
WIDTH =50
HEIGHT = 50

# Integer tile codes stored in the maze array (no constant is defined for code 6)
EMPTY = 0
WALL = 1
KEY = 2
DOOR = 3
TRAP = 4
CAMPFIRE = 5
EXIT = 7

In [4]:
def create_empty_maze(width, height):
    """Return a ``height`` x ``width`` integer grid where every cell is a wall (code 1)."""
    grid_shape = (height, width)
    return np.full(grid_shape, 1, dtype=int)

def generate_path(maze, start, end):
    """Carve open cells (code 0) into ``maze`` by a random depth-first walk.

    The walk begins at ``start`` and stops as soon as ``end`` is on top of the
    stack, or when every reachable wall cell has been explored. Each cell the
    walk steps onto is set to 0 in place; the ``start`` cell itself is never
    written. Cells are addressed as ``(row, column)``. Returns ``None``.
    """
    offsets = ((-1, 0), (1, 0), (0, -1), (0, 1))
    seen = set()
    trail = [start]

    while trail:
        here = trail[-1]
        if here == end:
            return

        seen.add(here)
        row, col = here

        # Wall cells next to `here` that are inside the grid and not yet seen.
        candidates = []
        for d_row, d_col in offsets:
            cell = (row + d_row, col + d_col)
            inside = 0 <= cell[0] < maze.shape[0] and 0 <= cell[1] < maze.shape[1]
            if inside and maze[cell[0], cell[1]] == 1 and cell not in seen:
                candidates.append(cell)

        if not candidates:
            # Dead end: backtrack one step.
            trail.pop()
            continue

        chosen = random.choice(candidates)
        maze[chosen] = 0
        trail.append(chosen)

def place_key_and_door(maze, start, end):
    """Put one key (code 2) and one door (code 3) on two distinct empty cells.

    Both cells are drawn at random from the cells currently equal to 0; the
    maze is modified in place. ``start`` and ``end`` are accepted but not used.
    """
    def empty_cells(skip=None):
        # Row-major list of empty cells, optionally leaving one cell out.
        return [
            (row, col)
            for row in range(maze.shape[0])
            for col in range(maze.shape[1])
            if maze[row, col] == 0 and (row, col) != skip
        ]

    key_cell = random.choice(empty_cells())
    door_cell = random.choice(empty_cells(skip=key_cell))

    maze[key_cell] = 2  # key
    maze[door_cell] = 3  # door

def place_traps_and_campfires(maze):
    """Scatter 3-5 traps (code 4) and 2-4 campfires (code 5) over empty cells.

    The counts are drawn at random first, then every tile is written in place
    onto a randomly chosen cell that is still equal to 0.
    """
    trap_total = random.randint(3, 5)
    campfire_total = random.randint(2, 4)

    def scatter(tile_code, how_many):
        for _ in range(how_many):
            # Re-scan each time so an already used cell cannot be picked again.
            free = [
                (row, col)
                for row in range(maze.shape[0])
                for col in range(maze.shape[1])
                if maze[row, col] == 0
            ]
            maze[random.choice(free)] = tile_code

    scatter(4, trap_total)  # traps
    scatter(5, campfire_total)  # campfires

In [5]:
def generate_maze(
    width=15,
    height=15,
    start_pos=(1, 1),
    num_traps=4,
    num_campfires=3,
    algo='dfs'  # only 'dfs' for now; the parameter leaves room for other algorithms
):
    """Generate a random maze and populate it with game tiles.

    Corridors are carved with a randomized depth-first search that moves two
    cells at a time, so rooms sit on every other row/column and are separated
    by walls. One key, one door, ``num_traps`` traps, ``num_campfires``
    campfires and one exit are then placed on distinct empty cells, never on
    the start cell.

    Args:
        width: Number of columns; an even value is rounded up to the next odd one.
        height: Number of rows; an even value is rounded up to the next odd one.
        start_pos: ``(row, column)`` of the cell where carving starts.
        num_traps: Number of trap tiles to place.
        num_campfires: Number of campfire tiles to place.
        algo: Carving algorithm; only ``'dfs'`` is implemented.

    Returns:
        Integer ``numpy`` array of shape ``(height, width)`` holding tile codes.

    Raises:
        NotImplementedError: If ``algo`` is not ``'dfs'``.
        ValueError: If the maze has too few empty cells for all requested tiles.
    """
    if algo != 'dfs':
        raise NotImplementedError(f"Алгоритм '{algo}' ещё не реализован")

    if width % 2 == 0: width += 1
    if height % 2 == 0: height += 1

    maze = np.full((height, width), WALL, dtype=int)

    def shuffled_directions():
        # Fresh random order of the four two-cell moves for one room.
        dirs = [(2, 0), (-2, 0), (0, 2), (0, -2)]
        random.shuffle(dirs)
        return iter(dirs)

    start_row, start_col = start_pos
    maze[start_row, start_col] = EMPTY

    # Iterative DFS: an explicit stack avoids hitting Python's recursion limit
    # on large mazes. Each entry keeps the iterator over the directions that
    # still have to be tried for that room.
    stack = [(start_row, start_col, shuffled_directions())]
    while stack:
        row, col, pending = stack[-1]
        for d_row, d_col in pending:
            n_row, n_col = row + d_row, col + d_col
            if 1 <= n_row < height - 1 and 1 <= n_col < width - 1 and maze[n_row, n_col] == WALL:
                maze[n_row, n_col] = EMPTY
                # Also open the wall between the current room and the new one.
                maze[row + d_row // 2, col + d_col // 2] = EMPTY
                stack.append((n_row, n_col, shuffled_directions()))
                break
        else:
            # Every direction has been tried: backtrack.
            stack.pop()

    # Candidate cells for tiles: every empty cell except the start.
    start_cell = (int(start_row), int(start_col))
    free_cells = [
        (int(r), int(c))
        for r, c in np.argwhere(maze == EMPTY)
        if (int(r), int(c)) != start_cell
    ]

    tiles = [KEY, DOOR] + [TRAP] * num_traps + [CAMPFIRE] * num_campfires + [EXIT]
    if len(tiles) > len(free_cells):
        raise ValueError(
            f"Maze {height}x{width} has only {len(free_cells)} free cells, "
            f"but {len(tiles)} tiles were requested"
        )

    # Sampling without replacement guarantees distinct cells and always terminates.
    for tile_code, cell in zip(tiles, random.sample(free_cells, len(tiles))):
        maze[cell] = tile_code

    return maze

In [6]:
def print_maze(maze, start_pos=None):
    """Print the maze as text, one line per row.

    Each tile code is drawn with its own character ('?' for unknown codes).
    When ``start_pos`` is given, the cell at that ``(row, column)`` is drawn
    as 'S' instead of its tile character.
    """
    glyphs = {
        EMPTY: ' ', WALL: '█', KEY: 'K', DOOR: 'D',
        TRAP: 'T', CAMPFIRE: 'C', EXIT: 'E'
    }

    def glyph_at(row_idx, col_idx, cell):
        if start_pos is not None and (row_idx, col_idx) == start_pos:
            return 'S'
        return glyphs.get(cell, '?')

    for row_idx, row in enumerate(maze):
        print("".join(glyph_at(row_idx, col_idx, cell) for col_idx, cell in enumerate(row)))

In [25]:
def save_maze(maze, path="maze.csv"):
    """Write the maze grid to ``path`` as comma-separated integers, one row per line."""
    np.savetxt(fname=path, X=maze, delimiter=",", fmt='%d')


def save_episode_log(log, path="episode_log.csv"):
    """Write the episode log to ``path`` as CSV: a header line, then one line per log row."""
    header = ["step", "x", "y", "action", "tile", "reward", "health", "has_key"]
    with open(path, "w", newline="") as out:
        sheet = csv.writer(out)
        sheet.writerow(header)
        for entry in log:
            sheet.writerow(entry)

In [8]:
# Generate a demo maze and print it
start_cell = (1, 1)
maze = generate_maze(
    width=WIDTH,
    height=HEIGHT,
    start_pos=start_cell,
    num_traps=6,
    num_campfires=2,
)
print_maze(maze, start_pos=start_cell)

███████████████████████████████████████████████████
█S█   █                 █           █   █       █ █
█ █ █ █ █ ███████████████ █████ ███ █ █ █████ █ █ █
█ █ █   █     █     █   C █   █ █ █   █       █   █
█ █ █████████ █ ███ █ █████ █ █ █ ███████████████ █
█ █     █ █   █ █ █   █     █ █ █       █         █
█ █████ █ █ ███ █ █████████ █ █ █ █ █████ █████████
█   █ █   █   █     █       █ █ █ █ █     █       █
███ █ ███ ███ █████ ███ █████ █ █ ███ █████ ███ █ █
█   █     █ █     █     █ █   █ █     █   █   █ █ █
█ █████ ███ █████ ███████ █ ███ █ █████ █ █████ █ █
█     █       █ █     █ █   █   █ █     █ █   █ █ █
█████ ███████ █ ███ █ █ █ ███ ███ █ █████ █ █ █ █ █
█     █     █   █ █ █ D █ █ █ █ █   █   █   █   █ █
█ █████ ███████ █ █ █ ███ █ █ █ ███████ █████████ █
█ █   █ █       █ █ █ █     █ █         █       █ █
█ █ █ █ █ ███████ █ ███ █████ █ ███ ███ █ █ ███ █ █
█   █ █   █   █   █     █     █ █   █   █ █   █ █ █
█████ █ ███ █ █ █ ███████ ███████ ███ ███ ███ █ █ █
█   █ █ █   

In [21]:

class MazeEnv(gym.Env):
    """Grid maze environment with fog of war, a key, traps and campfires.

    Observation (``spaces.Dict``):
        view: map of the tiles seen so far, same shape as the maze; cells that
            have not been revealed yet hold -1.
        position: ``(row, column)`` of the agent.
        has_key: 1 once the key has been picked up, else 0.
        health: current health, between 0 and ``max_health``.

    Actions (``Discrete(4)``): 0 = up, 1 = down, 2 = left, 3 = right.

    Rewards: -0.1 per step; +1 for the key; -1 for stepping on the door or the
    exit without the key; -5 for a trap (a further -100 if it kills the agent);
    +10 for reaching the exit with the key. The episode terminates on death or
    on reaching the exit with the key.

    Every step is appended to ``episode_log`` as
    ``[step, row, column, action, tile, reward, health, has_key]``; the log is
    cleared on ``reset``.
    """

    metadata = {"render_modes": ["human", "ansi"]}

    TRAP_DAMAGE = 50
    CAMPFIRE_HEAL = 50
    UNSEEN = -1          # value of not-yet-revealed cells in the view
    OUT_OF_BOUNDS = -1   # tile value logged when a move would leave the grid

    # action -> (row delta, column delta)
    _MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}

    def __init__(self, maze, start_pos=(1, 1), max_health=100, render_mode=None):
        """Create the environment.

        Args:
            maze: 2-D integer array of tile codes; it is copied, not modified.
            start_pos: ``(row, column)`` where the agent starts each episode.
            max_health: Health at the start of an episode and the healing cap.
            render_mode: ``None`` (``render`` does nothing), ``"human"``
                (``render`` prints the maze) or ``"ansi"`` (``render`` returns
                the maze as a string).

        Raises:
            ValueError: If ``render_mode`` is not one of the supported modes.
        """
        super().__init__()

        if render_mode is not None and render_mode not in self.metadata["render_modes"]:
            raise ValueError(f"Unsupported render_mode: {render_mode!r}")
        self.render_mode = render_mode

        # Pristine copy used to restore picked-up tiles on every reset.
        self._initial_maze = maze.copy()
        self.maze = maze.copy()
        self.start_pos = start_pos
        self.max_health = max_health

        self.height, self.width = maze.shape
        self.observation_space = spaces.Dict({
            # The lower bound is UNSEEN because unrevealed cells hold -1.
            "view": spaces.Box(low=self.UNSEEN, high=EXIT, shape=self.maze.shape, dtype=np.int8),
            "position": spaces.Box(low=0, high=max(self.height, self.width), shape=(2,), dtype=np.int32),
            "has_key": spaces.Discrete(2),
            "health": spaces.Box(low=0, high=max_health, shape=(), dtype=np.int32),
        })

        self.action_space = spaces.Discrete(4)  # 0:UP, 1:DOWN, 2:LEFT, 3:RIGHT

        self._reset_state()

    def _reset_state(self):
        """Restore the maze, the agent and the log to their start-of-episode state."""
        self.maze = self._initial_maze.copy()
        self.player_pos = list(self.start_pos)
        self.has_key = False
        self.health = self.max_health
        self.episode_log = []
        self.visible_map = np.full(self.maze.shape, self.UNSEEN, dtype=np.int8)
        self._update_visibility()

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``."""
        super().reset(seed=seed)
        self._reset_state()
        return self._get_obs(), {}

    def _update_visibility(self):
        """Reveal the 3x3 neighbourhood around the agent in ``visible_map``."""
        x, y = self.player_pos
        for dx in range(-1, 2):
            for dy in range(-1, 2):
                nx, ny = x + dx, y + dy
                if 0 <= nx < self.height and 0 <= ny < self.width:
                    self.visible_map[nx, ny] = self.maze[nx, ny]

    def step(self, action):
        """Apply one move and return ``(obs, reward, terminated, truncated, info)``.

        Moves into a wall or off the grid leave the agent where it is. An
        unknown action code is treated as "stay", which re-applies the effect
        of the tile the agent is standing on.
        """
        action = int(np.asarray(action).item())
        d_row, d_col = self._MOVES.get(action, (0, 0))

        new_row = self.player_pos[0] + d_row
        new_col = self.player_pos[1] + d_col

        reward = -0.1
        terminated = False
        # Default for the log so an off-grid move does not leave `tile` unbound.
        tile = self.OUT_OF_BOUNDS

        if 0 <= new_row < self.height and 0 <= new_col < self.width:
            tile = int(self.maze[new_row, new_col])
            if tile != WALL:
                self.player_pos = [new_row, new_col]

                if tile == KEY:
                    self.has_key = True
                    self.maze[new_row, new_col] = EMPTY
                    reward += 1

                elif tile == DOOR:
                    # NOTE(review): the agent has already moved onto the door
                    # tile, so a door without the key only costs reward and
                    # does not block movement. Confirm whether blocking is
                    # intended before changing it (random door placement could
                    # then cut the key off from the start).
                    if not self.has_key:
                        reward -= 1

                elif tile == TRAP:
                    # Clamp at 0 so health stays inside the observation space.
                    self.health = max(0, self.health - self.TRAP_DAMAGE)
                    reward -= 5
                    if self.health <= 0:
                        terminated = True
                        reward -= 100

                elif tile == CAMPFIRE:
                    self.health = min(self.max_health, self.health + self.CAMPFIRE_HEAL)

                elif tile == EXIT:
                    if self.has_key:
                        reward += 10
                        terminated = True
                    else:
                        reward -= 1  # the key is required

        self._update_visibility()

        obs = self._get_obs()
        self.episode_log.append([
            len(self.episode_log), *self.player_pos, action, tile, reward, self.health, int(self.has_key)
        ])
        return obs, float(reward), bool(terminated), False, {}

    def _get_obs(self):
        """Build the observation dict for the current state."""
        return {
            "view": self.visible_map.copy(),  # everything revealed so far
            "position": np.array(self.player_pos, dtype=np.int32),
            "has_key": int(self.has_key),
            "health": np.array(self.health, dtype=np.int32),
        }

    def render(self):
        """Draw the full maze with the agent shown as 'A'.

        Returns ``None`` without drawing when ``render_mode`` is ``None``,
        prints the frame for ``"human"`` and returns it as a string for
        ``"ansi"``.
        """
        if self.render_mode is None:
            return None

        render_maze = self.maze.copy()
        x, y = self.player_pos
        render_maze[x, y] = 9  # agent marker, used only while drawing
        symbols = {
            0: ' ', 1: '█', 2: 'K', 3: 'D',
            4: 'T', 5: 'C', 7: 'E', 9: 'A'
        }
        frame = '\n'.join(''.join(symbols.get(int(cell), '?') for cell in row) for row in render_maze)
        frame += f"\nPos: {self.player_pos}  HP: {self.health}  Key: {self.has_key}"

        if self.render_mode == "ansi":
            return frame
        print(frame)
        return None

In [22]:
class FlattenObservationWrapper(ObservationWrapper):
    """Turn the dict observation of the wrapped env into one flat float32 vector.

    Layout: flattened view, then has_key, then health, then position (2 values).
    The view is shifted by +1 and divided by 8, health is divided by the env's
    ``max_health`` and position by the larger maze dimension.
    """

    def __init__(self, env):
        super().__init__(env)
        view_dims = env.observation_space.spaces["view"].shape
        # view cells + has_key + health + position(2)
        vector_length = view_dims[0] * view_dims[1] + 2 + 2
        self.observation_space = Box(
            low=0,
            high=1,
            shape=(vector_length,),
            dtype=np.float32,
        )

    def observation(self, obs):
        """Flatten and scale one observation dict into a float32 vector."""
        parts = [
            (obs["view"].flatten().astype(np.float32) + 1) / 8.0,  # shift, then scale
            np.array([obs["has_key"]], dtype=np.float32),
            np.array([obs["health"] / self.env.max_health], dtype=np.float32),
        ]
        position = obs.get("position")
        if position is not None:
            largest_side = max(self.env.height, self.env.width)
            parts.append(position.astype(np.float32) / largest_side)
        return np.concatenate(parts)

In [23]:
# 1. Generate the maze and the id-based file name it will be saved under
maze_id = str(uuid.uuid4())
maze_filename = f"maze_{maze_id}.csv"

maze = generate_maze(width=WIDTH, height=HEIGHT)

# 2. Build the environment and wrap it so observations are flat vectors
env = FlattenObservationWrapper(MazeEnv(maze, start_pos=(1, 1)))

# 3. Check that the environment is well-formed
check_env(env, warn=True)

# 4. Create the PPO model
model = PPO(
    "MlpPolicy",
    env,
    n_steps=128,
    tensorboard_log="./ppo_maze_tensorboard",
    verbose=1,
)

# 5. Run training
model.learn(total_timesteps=10_000, progress_bar=True)


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ./ppo_maze_tensorboard/PPO_3


Output()



----------------------------
| time/              |     |
|    fps             | 272 |
|    iterations      | 1   |
|    time_elapsed    | 0   |
|    total_timesteps | 128 |
----------------------------
-------------------------------------------
| time/                   |               |
|    fps                  | 214           |
|    iterations           | 2             |
|    time_elapsed         | 1             |
|    total_timesteps      | 256           |
| train/                  |               |
|    approx_kl            | 0.00014753174 |
|    clip_fraction        | 0             |
|    clip_range           | 0.2           |
|    entropy_loss         | -1.39         |
|    explained_variance   | -0.0843       |
|    learning_rate        | 0.0003        |
|    loss                 | 0.462         |
|    n_updates            | 10            |
|    policy_gradient_loss | -0.00137      |
|    value_loss           | 1.52          |
-------------------------------------------
-----

<stable_baselines3.ppo.ppo.PPO at 0x79fef7423d60>

In [26]:
# Roll out one episode with the trained policy
obs, _ = env.reset()
done = False
step_count = 0
log_filename = f"episode_log_{maze_id}.csv"
max_steps = maze.shape[0] * maze.shape[1]  # step budget: one step per maze cell

while not done and step_count < max_steps:
    action, _ = model.predict(obs)
    obs, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    env.render()
    step_count += 1

# The episode also ends when the agent dies on a trap, so the maze counts as
# solved only if the episode finished while the agent still had health left.
solved = done and env.unwrapped.health > 0

if not solved:
    print(f"Лабиринт {maze_id} НЕ ПРОЙДЕН")
    # Sentinel row marking a failed episode (no action, no tile, reward -999).
    env.unwrapped.episode_log.append([
        step_count, *env.unwrapped.player_pos, -1, -1, -999, env.unwrapped.health, int(env.unwrapped.has_key)
    ])

# 6. Save the results under the maze id
save_maze(maze, maze_filename)
save_episode_log(env.unwrapped.episode_log, log_filename)

Лабиринт edccaf26-74a7-4365-98c6-741e83f972d3 НЕ ПРОЙДЕН
