it goes to the yellow to end the game
it keeps doing +1 -1 to stay in its place without hitting the wall

ideas:
maybe reduce the wall neg rew to -10
increase prob based on eps or step
maybe increase reward for distance

In [1]:
# python -m tensorboard.main --logdir=./src/logs/DQN_36

from typing import Optional, Union
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from utils.vision import compute_visible_cells
from helpers import default_map
import cv2
import time
import math
from stable_baselines3.common.env_checker import check_env
from pathfinding.core.diagonal_movement import DiagonalMovement
from pathfinding.core.grid import Grid
from pathfinding.finder.a_star import AStarFinder
from pathfinding.finder.dijkstra import DijkstraFinder
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
import random

GREEN = (0, 255, 0)
RED = (0, 0, 255)
BLUE = (255, 0, 0)
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
YELLOW = (0, 255, 255)

colors = np.array([WHITE,  # hidden cells
                   YELLOW, # visible cells
                   BLACK,  # walls
                   RED,  # seeker
                   GREEN],   # hider
                  dtype=np.uint8)

UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3
STAY = 4

HITTING_WALL_REWARD = -20
LOSE_REWARD = -100
DISTANCE_COEF_REWARD = 1

OBS_DICT = {
            "hidden":0,
            "visible":1,
            "wall":2,
            "seeker":3,
            "hider":4
        }

MIN_RES = 36




class HideAndSeekEnv(gym.Env):
    def __init__(self, 
                 observation_handler: ObservationHandler,
                 grid_size=12, 
                 vision_range=5, 
                 seq_len=5,
                 use_cache=True, 
                 max_steps=None,
                 render_mode='rgb_array',
                 fps=5,
                ):
        super(HideAndSeekEnv, self).__init__()

        assert vision_range > 0, "Vision range must be greater than 0"
        assert grid_size > 0, "Grid size must be greater than 0"
        assert max_steps is None or max_steps > 0, "Max steps must be greater than 0" 
        assert render_mode in ['rgb_array', 'human'], "Render mode must be 'rgb_array' or 'human'"
        assert seq_len > 0, "Sequence length must be greater than 0"
        
        if max_steps is None:
            max_steps = np.inf

        self.grid_size = grid_size
        self.max_steps = max_steps
        self.vision_range = vision_range
        self.seq_len = seq_len
        self.render_mode = render_mode
        self.use_cache = use_cache
        self.fps = fps

        self.seeker_pos = None
        self.hider_pos = None
        self.cache = {
            'best_seeker_action':-1*np.ones(
                (self.grid_size, self.grid_size, self.grid_size, self.grid_size), 
                dtype=int,
            ),
            'visible_cells':{},
        }
        self.upscale_factor = math.ceil(MIN_RES/self.grid_size)


        self.action_space = spaces.Discrete(4)  # Up, Down, Left, Right, Stay
        self.obs_dict = OBS_DICT
        self.observation_space = observation_handler.get_observation_space()
        self.observation_handler = observation_handler

        self.current_state = None
        self.current_step = 0
        self.prev_states = None
        self.walls = np.zeros((self.grid_size, self.grid_size), dtype=bool)
        self.visible_cells = np.zeros((self.grid_size, self.grid_size), dtype=bool)

        self.current_eps = 0
        self.mode = None
        self.train()
        check_env(self)
        self.current_eps = 0

        self.reset()

    def train(self):
        self.mode = 'train'
    
    def eval(self):
        self.mode = 'eval'
    
    def test(self):
        self.mode = 'test'

    def get_state(self):
        return self.observation_handler.get_observation(
            walls=self.walls, 
            visible_cells=self.visible_cells, 
            seeker_pos=self.seeker_pos, 
            hider_pos=self.hider_pos
        )
    
    def reset(self, seed: Optional[int] = None,):
        super().reset(seed=seed)

        self.current_step = 0
        self.prev_states = np.empty((self.grid_size, self.grid_size, 0), dtype=np.uint8)

        self.walls = self._generate_walls()
        self.seeker_pos = self.generate_seeker_pos()
        self.visible_cells = self.get_visible_cells()
        self.hider_pos = self.generate_hider_pos()

        return self.get_state(), self._get_info()

    def get_visible_cells(self):
        cache = self.cache['visible_cells']
        if self.use_cache:
            key = (self.seeker_pos[0], self.seeker_pos[1])
            if key in cache:
                return cache[key]

        visible_cells = compute_visible_cells(self.walls, self.seeker_pos, self.vision_range)
        if self.use_cache:
            cache[key] = visible_cells

        return visible_cells

    def _generate_walls(self):
        walls_mask = default_map()
        return walls_mask
        
    def generate_hider_pos(self):
        allowed_cells = ~(self.walls | self.visible_cells)
        allowed_cells[self.seeker_pos[0], self.seeker_pos[1]] = False
        return self.sample_from_allowed_cells(allowed_cells)

    def generate_seeker_pos(self):
        allowed_cells = ~self.walls
        return self.sample_from_allowed_cells(allowed_cells)
    
    def index_to_coords(self, index):
        return index // self.grid_size, index % self.grid_size
    
    def coords_to_index(self, coords):
        return coords[0] * self.grid_size + coords[1]
    
    def sample_from_allowed_cells(self, allowed_cells):
        prob = allowed_cells.astype(np.float32)
        prob /= prob.sum()
        flat_prob = prob.flatten()
        sample_index = np.random.choice(flat_prob.size, p=flat_prob)
        x, y = self.index_to_coords(sample_index)
        return np.array([x, y], dtype=np.uint8)
    
    def _get_valid_actions(self, position):
        valid_actions = []
        if position[0] > 0 and not self.walls[position[0] - 1, position[1]]:
            valid_actions.append(UP)
        if position[0] < self.grid_size - 1 and not self.walls[position[0] + 1, position[1]]:
            valid_actions.append(DOWN)
        if position[1] > 0 and not self.walls[position[0], position[1] - 1]:
            valid_actions.append(LEFT)
        if position[1] < self.grid_size - 1 and not self.walls[position[0], position[1] + 1]:
            valid_actions.append(RIGHT)
        # valid_actions.append(STAY)
        return valid_actions
    
    def _move(self, position, action):
        if action == UP:  # Up
            position[0] -= 1
        elif action == DOWN:  # Down
            position[0] += 1
        elif action == LEFT:  # Left
            position[1] -= 1
        elif action == RIGHT:  # Right
            position[1] += 1
        elif action == STAY:  # Stay
            print("Agent action: STAY ?!")
            pass
        return position
    
    def _get_min_distance_from_visible_cells(self, position):
        distances = np.linalg.norm(np.indices((self.grid_size, self.grid_size)) - position[:, np.newaxis, np.newaxis], axis=0)
        distances[~self.visible_cells] = np.inf
        return distances.min()
    
    def _get_info(self):
        return {}
    
    def move_player(self, action):
        assert self.action_space.contains(action), f"{action} is an invalid action"
        assert self.mode in ['test'], "Call move_player only in test mode"

        self._move(self.seeker_pos, action)
        self.visible_cells = self.get_visible_cells()
    
    def step(self, action, verbose=False):
        assert self.action_space.contains(action), f"{action} is an invalid action"

        self.current_step += 1

        reward = 0
        reward_log = {}

        if self.mode == 'train':
            # seeker makes a move based on the old state
            self._move_seeker()

        valid_actions = self._get_valid_actions(self.hider_pos)
        if action in valid_actions:
            self._move(self.hider_pos, action)
        else:
            if verbose:
                print("Hider hit the wall")
            reward += HITTING_WALL_REWARD
            reward_log['hitting_wall'] = HITTING_WALL_REWARD

        min_distance = self._get_min_distance_from_visible_cells(self.hider_pos)
        distance_reward = int(min_distance) * DISTANCE_COEF_REWARD
        reward += distance_reward
        reward_log['distance'] = distance_reward

        terminated = False
        if self.visible_cells[self.hider_pos[0], self.hider_pos[1]]:
            if verbose:
                print("Hider was caught")
            reward += LOSE_REWARD
            reward_log['lose'] = LOSE_REWARD
            terminated = True

        truncated = self.current_step >= self.max_steps and not terminated

        if terminated or truncated:
            self.current_eps += 1

        return self.get_state(), reward, terminated, truncated, self._get_info()

    def _generate_frame(self, matrix, cell_size=50):
        # Calculate image size based on grid dimensions and cell size
        image_size = (matrix.shape[1] * cell_size, matrix.shape[0] * cell_size)

        # Create a blank canvas with white background
        image = np.ones((matrix.shape[1], matrix.shape[0], 3), dtype=np.uint8) * 255

        # Fill each cell with the corresponding color using NumPy indexing
        image_rows = np.arange(matrix.shape[0]) * cell_size
        image_cols = np.arange(matrix.shape[1]) * cell_size

        # print(image[0, :60])
        image = colors[matrix]
        # repeat each row cell_size times
        image = np.repeat(image, cell_size, axis=0)
        image = np.repeat(image, cell_size, axis=1)

        # Draw black lines as separators between cells using NumPy indexing
        image[::cell_size, :] = (0, 0, 0)
        image[:, ::cell_size] = (0, 0, 0)
        image[::cell_size, -1] = (0, 0, 0)
        image[-1, ::cell_size] = (0, 0, 0)

        return image
    
    def render(self):
        if self.render_mode == "human":
            frame = self._generate_frame(self.current_state)
            cv2.imshow('Hide & Seek', frame)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        elif self.render_mode == "rgb_array":
            print(self.current_state)
        
    def get_best_seeker_action(self):
        cache = self.cache['best_seeker_action']
        if self.use_cache:
            key = (self.seeker_pos[0], self.seeker_pos[1], self.hider_pos[0], self.hider_pos[1])
            if cache[key]!=-1:
                return cache[key]

        best_action = self._compute_best_seeker_action(self.walls, self.seeker_pos, self.hider_pos)
        if self.use_cache:
            cache[key] = best_action

        return best_action
    
    def _compute_best_seeker_action(self, walls, seeker_pos, hider_pos):
        grid = Grid(matrix=~walls)
        start = grid.node(seeker_pos[1], seeker_pos[0])
        end = grid.node(hider_pos[1], hider_pos[0])
        finder = DijkstraFinder()
        path, runs = finder.find_path(start, end, grid)
        if len(path) == 0: # no path found
            return STAY
        next_cell = path[-len(path)+1]
        best_move = (next_cell[1] - seeker_pos[0], next_cell[0] - seeker_pos[1])
        best_action = self._move_to_action(best_move)
        if best_action == STAY:
            print("Seeker best action is to stay ?!")
        return best_action

    def _move_to_action(self, move):
        if move == (-1, 0):
            action = UP
        elif move == (1, 0):
            action = DOWN
        elif move == (0, -1):
            action = LEFT
        elif move == (0, 1):
            action = RIGHT
        elif move == (0, 0):
            action = STAY
        return action

    def _move_seeker(self):
        # best_action = self.get_best_seeker_action()
        # prob = self.current_step / self.max_steps
        prob = 0.2
        # if self.current_step < 10:
        #     return
        if np.random.binomial(1, prob):
            action = self.get_best_seeker_action()
        else:
            valid_actions = self._get_valid_actions(self.seeker_pos)
            action = random.choice(valid_actions)
        self._move(self.seeker_pos, action)
        self.visible_cells = self.get_visible_cells()

        
env = Monitor(HideAndSeekEnv(
    observation_handler=MLPObservationHandler(grid_size=12),
    grid_size=12,
    vision_range=5,
    seq_len=1,
    render_mode="human",
    use_cache=True,
    max_steps=300,
    # start_movement_at=100_000,
    # start_opt_movement_at=10,
))
# env.render()
# env.step(UP)


In [None]:
class HideAndSeekEnv(gym.Env):
    def __init__(self, 
                 grid_size=12, 
                 vision_range=5, 
                 seq_len=5,
                 use_cache=True, 
                 max_steps=None,
                 render_mode='rgb_array',
                 fps=5,
                ):
        super(HideAndSeekEnv, self).__init__()

        assert vision_range > 0, "Vision range must be greater than 0"
        assert grid_size > 0, "Grid size must be greater than 0"
        assert max_steps is None or max_steps > 0, "Max steps must be greater than 0" 
        assert render_mode in ['rgb_array', 'human'], "Render mode must be 'rgb_array' or 'human'"
        assert seq_len > 0, "Sequence length must be greater than 0"
        
        if max_steps is None:
            max_steps = np.inf

        self.grid_size = grid_size
        self.max_steps = max_steps
        self.vision_range = vision_range
        self.seq_len = seq_len
        self.render_mode = render_mode
        self.use_cache = use_cache
        self.fps = fps

        self.seeker_pos = None
        self.hider_pos = None
        self.cache = {
            'best_seeker_action':-1*np.ones(
                (self.grid_size, self.grid_size, self.grid_size, self.grid_size), 
                dtype=int,
            ),
            'visible_cells':{},
        }
        self.upscale_factor = math.ceil(MIN_RES/self.grid_size)


        self.action_space = spaces.Discrete(4)  # Up, Down, Left, Right, Stay
        self.obs_dict = {
            "hidden":0,
            "visible":1,
            "wall":2,
            "seeker":3,
            "hider":4
        }

        self.observation_space = spaces.Box(
            low=0,
            high=len(self.obs_dict)-1,
            shape=(self.grid_size*self.grid_size, ),
            dtype=np.uint8
        )

        self.current_state = None
        self.current_step = 0
        self.prev_states = None
        self.walls = np.zeros((self.grid_size, self.grid_size), dtype=bool)
        self.visible_cells = np.zeros((self.grid_size, self.grid_size), dtype=bool)

        self.current_eps = 0
        self.mode = None
        self.train()
        check_env(self)
        self.current_eps = 0

        self.reset()

    def train(self):
        self.mode = 'train'
    
    def eval(self):
        self.mode = 'eval'
    
    def test(self):
        self.mode = 'test'

    def get_processed_state(self):
        return self.current_state.flatten()

    def update_current_state(self):
        self.current_state = np.full(
            shape=(self.grid_size, self.grid_size), 
            fill_value=self.obs_dict['hidden'], 
            dtype=np.uint8
        )
        self.current_state[self.walls] = self.obs_dict['wall']
        self.current_state[self.visible_cells] = self.obs_dict['visible']
        self.current_state[self.seeker_pos[0], self.seeker_pos[1]] = self.obs_dict['seeker']
        self.current_state[self.hider_pos[0], self.hider_pos[1]] = self.obs_dict['hider']
        
        # self.prev_states = np.append(
        #     self.prev_states, 
        #     np.expand_dims(self.current_state, axis=-1), 
        #     axis=-1
        # )

    def reset(self, seed: Optional[int] = None,):
        super().reset(seed=seed)

        self.current_step = 0
        self.prev_states = np.empty((self.grid_size, self.grid_size, 0), dtype=np.uint8)

        self.walls = self._generate_walls()
        self.seeker_pos = self.generate_seeker_pos()
        self.visible_cells = self.get_visible_cells()
        self.hider_pos = self.generate_hider_pos()

        for _ in range(self.seq_len):
            self.update_current_state()

        return self.get_processed_state(), self._get_info()

    def get_visible_cells(self):
        cache = self.cache['visible_cells']
        if self.use_cache:
            key = (self.seeker_pos[0], self.seeker_pos[1])
            if key in cache:
                return cache[key]

        visible_cells = compute_visible_cells(self.walls, self.seeker_pos, self.vision_range)
        if self.use_cache:
            cache[key] = visible_cells

        return visible_cells

    def _generate_walls(self):
        walls_mask = default_map()
        return walls_mask
        
    def generate_hider_pos(self):
        allowed_cells = ~(self.walls | self.visible_cells)
        allowed_cells[self.seeker_pos[0], self.seeker_pos[1]] = False
        return self.sample_from_allowed_cells(allowed_cells)

    def generate_seeker_pos(self):
        allowed_cells = ~self.walls
        return self.sample_from_allowed_cells(allowed_cells)
    
    def index_to_coords(self, index):
        return index // self.grid_size, index % self.grid_size
    
    def coords_to_index(self, coords):
        return coords[0] * self.grid_size + coords[1]
    
    def sample_from_allowed_cells(self, allowed_cells):
        prob = allowed_cells.astype(np.float32)
        prob /= prob.sum()
        flat_prob = prob.flatten()
        sample_index = np.random.choice(flat_prob.size, p=flat_prob)
        x, y = self.index_to_coords(sample_index)
        return np.array([x, y], dtype=np.uint8)
    
    def _get_valid_actions(self, position):
        valid_actions = []
        if position[0] > 0 and not self.walls[position[0] - 1, position[1]]:
            valid_actions.append(UP)
        if position[0] < self.grid_size - 1 and not self.walls[position[0] + 1, position[1]]:
            valid_actions.append(DOWN)
        if position[1] > 0 and not self.walls[position[0], position[1] - 1]:
            valid_actions.append(LEFT)
        if position[1] < self.grid_size - 1 and not self.walls[position[0], position[1] + 1]:
            valid_actions.append(RIGHT)
        # valid_actions.append(STAY)
        return valid_actions
    
    def _move(self, position, action):
        if action == UP:  # Up
            position[0] -= 1
        elif action == DOWN:  # Down
            position[0] += 1
        elif action == LEFT:  # Left
            position[1] -= 1
        elif action == RIGHT:  # Right
            position[1] += 1
        elif action == STAY:  # Stay
            print("Agent action: STAY ?!")
            pass
        return position
    
    def _get_min_distance_from_visible_cells(self, position):
        distances = np.linalg.norm(np.indices((self.grid_size, self.grid_size)) - position[:, np.newaxis, np.newaxis], axis=0)
        distances[~self.visible_cells] = np.inf
        return distances.min()
    
    def _get_info(self):
        return {}
    
    def move_player(self, action):
        assert self.action_space.contains(action), f"{action} is an invalid action"
        assert self.mode in ['test'], "Call move_player only in test mode"

        self._move(self.seeker_pos, action)
        self.visible_cells = self.get_visible_cells()
    
    def step(self, action, verbose=False):
        assert self.action_space.contains(action), f"{action} is an invalid action"
        assert self.current_state is not None, "Call reset before using step method."

        self.current_step += 1

        reward = 0
        reward_log = {}

        if self.mode == 'train':
            # seeker makes a move based on the old state
            self._move_seeker()

        valid_actions = self._get_valid_actions(self.hider_pos)
        if action in valid_actions:
            self._move(self.hider_pos, action)
        else:
            if verbose:
                print("Hider hit the wall")
            reward += HITTING_WALL_REWARD
            reward_log['hitting_wall'] = HITTING_WALL_REWARD

        min_distance = self._get_min_distance_from_visible_cells(self.hider_pos)
        distance_reward = int(min_distance) * DISTANCE_COEF_REWARD
        reward += distance_reward
        reward_log['distance'] = distance_reward

        terminated = False
        if self.visible_cells[self.hider_pos[0], self.hider_pos[1]]:
            if verbose:
                print("Hider was caught")
            reward += LOSE_REWARD
            reward_log['lose'] = LOSE_REWARD
            terminated = True

        truncated = self.current_step >= self.max_steps and not terminated
        self.update_current_state()

        if terminated or truncated:
            self.current_eps += 1

        return self.get_processed_state(), reward, terminated, truncated, self._get_info()

    def _generate_frame(self, matrix, cell_size=50):
        # Calculate image size based on grid dimensions and cell size
        image_size = (matrix.shape[1] * cell_size, matrix.shape[0] * cell_size)

        # Create a blank canvas with white background
        image = np.ones((matrix.shape[1], matrix.shape[0], 3), dtype=np.uint8) * 255

        # Fill each cell with the corresponding color using NumPy indexing
        image_rows = np.arange(matrix.shape[0]) * cell_size
        image_cols = np.arange(matrix.shape[1]) * cell_size

        # print(image[0, :60])
        image = colors[matrix]
        # repeat each row cell_size times
        image = np.repeat(image, cell_size, axis=0)
        image = np.repeat(image, cell_size, axis=1)

        # Draw black lines as separators between cells using NumPy indexing
        image[::cell_size, :] = (0, 0, 0)
        image[:, ::cell_size] = (0, 0, 0)
        image[::cell_size, -1] = (0, 0, 0)
        image[-1, ::cell_size] = (0, 0, 0)

        return image
    
    def render(self):
        if self.render_mode == "human":
            frame = self._generate_frame(self.current_state)
            cv2.imshow('Hide & Seek', frame)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        elif self.render_mode == "rgb_array":
            print(self.current_state)
        
    def get_best_seeker_action(self):
        cache = self.cache['best_seeker_action']
        if self.use_cache:
            key = (self.seeker_pos[0], self.seeker_pos[1], self.hider_pos[0], self.hider_pos[1])
            if cache[key]!=-1:
                return cache[key]

        best_action = self._compute_best_seeker_action(self.walls, self.seeker_pos, self.hider_pos)
        if self.use_cache:
            cache[key] = best_action

        return best_action
    
    def _compute_best_seeker_action(self, walls, seeker_pos, hider_pos):
        grid = Grid(matrix=~walls)
        start = grid.node(seeker_pos[1], seeker_pos[0])
        end = grid.node(hider_pos[1], hider_pos[0])
        finder = DijkstraFinder()
        path, runs = finder.find_path(start, end, grid)
        if len(path) == 0: # no path found
            return STAY
        next_cell = path[-len(path)+1]
        best_move = (next_cell[1] - seeker_pos[0], next_cell[0] - seeker_pos[1])
        best_action = self._move_to_action(best_move)
        if best_action == STAY:
            print("Seeker best action is to stay ?!")
        return best_action

    def _move_to_action(self, move):
        if move == (-1, 0):
            action = UP
        elif move == (1, 0):
            action = DOWN
        elif move == (0, -1):
            action = LEFT
        elif move == (0, 1):
            action = RIGHT
        elif move == (0, 0):
            action = STAY
        return action

    def _move_seeker(self):
        # best_action = self.get_best_seeker_action()
        # prob = self.current_step / self.max_steps
        prob = 0.2
        # if self.current_step < 10:
        #     return
        if np.random.binomial(1, prob):
            action = self.get_best_seeker_action()
        else:
            valid_actions = self._get_valid_actions(self.seeker_pos)
            action = random.choice(valid_actions)
        self._move(self.seeker_pos, action)
        self.visible_cells = self.get_visible_cells()

        
env = Monitor(HideAndSeekEnv(
    grid_size=12,
    vision_range=5,
    seq_len=1,
    render_mode="human",
    use_cache=True,
    max_steps=300,
    # start_movement_at=100_000,
    # start_opt_movement_at=10,
))
# env.render()
# env.step(UP)

In [2]:
from stable_baselines3 import DQN

# env = gym.make("CartPole-v1", render_mode="human")
lr = 1e-4
exploration = 0.1
log_dir = "logs"
model = DQN("MlpPolicy", env, 
            verbose=0,
            learning_rate=lr,
            exploration_final_eps=exploration,
            tensorboard_log=log_dir,
            )
model.learn(
    total_timesteps=500_000, 
    progress_bar=True
)


Output()

KeyboardInterrupt: 

In [383]:
model.save("models/mlp_20_100_1_prob=0.2")

In [None]:
nb_episodes = 1000
mean_reward, std_reward = evaluate_policy(
        model,
        env,
        n_eval_episodes=nb_episodes
    )

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")


In [5]:
model.save("prob_agent")

for a given map, we have walls, visible cells, paths
we have a class map, that loads the map from a json file and provide all those attr

In [263]:
vec_env = model.get_env()
obs = vec_env.reset()
# for i in range(1000):
#     action, _state = model.predict(obs, deterministic=True)
#     obs, reward, done, info = vec_env.step(action)
#     vec_env.render("human")


In [382]:
# vec_env = model.get_env()
# obs = vec_env.reset()
vec_env.render("human")
action, _state = model.predict(obs, deterministic=True)
obs, reward, done, info = vec_env.step(action)
print(action)

[0]


In [None]:
UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3
STAY = 4

In [None]:
vec_env.render("human")

In [None]:
env.render()
cv2.waitKey(0)
cv2.destroyAllWindows()


In [None]:
env.step(UP)

Hider was caught


(array([[3, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0],
        [1, 1, 2, 0, 0, 1, 0, 2, 0, 0, 0, 0],
        [1, 1, 2, 2, 2, 0, 0, 2, 2, 2, 0, 0],
        [1, 1, 2, 2, 2, 0, 0, 2, 2, 2, 0, 0],
        [1, 1, 2, 2, 2, 0, 0, 2, 2, 0, 0, 0],
        [1, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 2, 2, 0, 0, 2, 2, 2, 2, 0],
        [0, 0, 2, 2, 2, 0, 0, 2, 2, 2, 0, 0],
        [0, 0, 2, 2, 2, 0, 0, 2, 2, 2, 0, 0],
        [0, 0, 0, 0, 2, 0, 0, 0, 2, 2, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], dtype=uint8),
 -100,
 True,
 {})

In [None]:

import cv2
import numpy as np

# Define the colors using RGB values
colors = np.array([(255, 255, 255),  # Color 0
                   (255, 0, 0),      # Color 1
                   (0, 255, 0),      # Color 2
                   (0, 0, 255),      # Color 3
                   (255, 255, 0)],   # Color 4
                  dtype=np.uint8)

# Create a matrix representing the grid
matrix = np.random.randint(low=0, high=5, size=(12, 12), dtype=np.uint8)  # Random color indices for demonstration

# Define cell size and line thickness for display
cell_size = 50

# Calculate image size based on grid dimensions and cell size
image_size = (matrix.shape[1] * cell_size, matrix.shape[0] * cell_size)

# Create a blank canvas with white background
image = np.ones((matrix.shape[1], matrix.shape[0], 3), dtype=np.uint8) * 255

# Fill each cell with the corresponding color using NumPy indexing
image_rows = np.arange(matrix.shape[0]) * cell_size
image_cols = np.arange(matrix.shape[1]) * cell_size

# print(image[0, :60])
image = colors[matrix]
# repeat each row cell_size times
image = np.repeat(image, cell_size, axis=0)
image = np.repeat(image, cell_size, axis=1)

# image[image_rows[:, np.newaxis], image_cols] = colors[matrix]
print(image.shape)
# print(image[0, :60])
# print(colors[matrix].shape)

# compare old and new image

# Draw black lines as separators between cells using NumPy indexing
image[::cell_size, :] = (0, 0, 0)
image[:, ::cell_size] = (0, 0, 0)
image[::cell_size, -1] = (0, 0, 0)
image[-1, ::cell_size] = (0, 0, 0)



# print(image)
# Display the imageq
cv2.imshow('Grid Image', image)
cv2.waitKey(0)
cv2.destroyAllWindows()


(600, 600, 3)


In [None]:
matrix = np.zeros((12, 12))

# Define the coordinates of point "a"
a = (2, 5)  # Row, Column

# Compute the distance between point "a" and each cell in the matrix
distances = np.linalg.norm(np.indices(matrix.shape) - np.array(a)[:, np.newaxis, np.newaxis], axis=0)

print("Distances from point 'a' to each cell:")
print(distances)


Distances from point 'a' to each cell:
[[ 5.38516481  4.47213595  3.60555128  2.82842712  2.23606798  2.
   2.23606798  2.82842712  3.60555128  4.47213595  5.38516481  6.32455532]
 [ 5.09901951  4.12310563  3.16227766  2.23606798  1.41421356  1.
   1.41421356  2.23606798  3.16227766  4.12310563  5.09901951  6.08276253]
 [ 5.          4.          3.          2.          1.          0.
   1.          2.          3.          4.          5.          6.        ]
 [ 5.09901951  4.12310563  3.16227766  2.23606798  1.41421356  1.
   1.41421356  2.23606798  3.16227766  4.12310563  5.09901951  6.08276253]
 [ 5.38516481  4.47213595  3.60555128  2.82842712  2.23606798  2.
   2.23606798  2.82842712  3.60555128  4.47213595  5.38516481  6.32455532]
 [ 5.83095189  5.          4.24264069  3.60555128  3.16227766  3.
   3.16227766  3.60555128  4.24264069  5.          5.83095189  6.70820393]
 [ 6.40312424  5.65685425  5.          4.47213595  4.12310563  4.
   4.12310563  4.47213595  5.          5.65685425

In [None]:
# ENVIRONMENTS OPENAI GYM
import math
from typing import Optional, Union

import numpy as np

import gym
from gym import logger, spaces
from gym.envs.classic_control import utils
from gym.error import DependencyNotInstalled


class CartPoleEnv(gym.Env[np.ndarray, Union[int, np.ndarray]]):
    """
    ### Description

    This environment corresponds to the version of the cart-pole problem described by Barto, Sutton, and Anderson in
    ["Neuronlike Adaptive Elements That Can Solve Difficult Learning Control Problem"](https://ieeexplore.ieee.org/document/6313077).
    A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track.
    The pendulum is placed upright on the cart and the goal is to balance the pole by applying forces
     in the left and right direction on the cart.

    ### Action Space

    The action is a `ndarray` with shape `(1,)` which can take values `{0, 1}` indicating the direction
     of the fixed force the cart is pushed with.

    | Num | Action                 |
    |-----|------------------------|
    | 0   | Push cart to the left  |
    | 1   | Push cart to the right |

    **Note**: The velocity that is reduced or increased by the applied force is not fixed and it depends on the angle
     the pole is pointing. The center of gravity of the pole varies the amount of energy needed to move the cart underneath it

    ### Observation Space

    The observation is a `ndarray` with shape `(4,)` with the values corresponding to the following positions and velocities:

    | Num | Observation           | Min                 | Max               |
    |-----|-----------------------|---------------------|-------------------|
    | 0   | Cart Position         | -4.8                | 4.8               |
    | 1   | Cart Velocity         | -Inf                | Inf               |
    | 2   | Pole Angle            | ~ -0.418 rad (-24°) | ~ 0.418 rad (24°) |
    | 3   | Pole Angular Velocity | -Inf                | Inf               |

    **Note:** While the ranges above denote the possible values for observation space of each element,
        it is not reflective of the allowed values of the state space in an unterminated episode. Particularly:
    -  The cart x-position (index 0) can be take values between `(-4.8, 4.8)`, but the episode terminates
       if the cart leaves the `(-2.4, 2.4)` range.
    -  The pole angle can be observed between  `(-.418, .418)` radians (or **±24°**), but the episode terminates
       if the pole angle is not in the range `(-.2095, .2095)` (or **±12°**)

    ### Rewards

    Since the goal is to keep the pole upright for as long as possible, a reward of `+1` for every step taken,
    including the termination step, is allotted. The threshold for rewards is 475 for v1.

    ### Starting State

    All observations are assigned a uniformly random value in `(-0.05, 0.05)`

    ### Episode End

    The episode ends if any one of the following occurs:

    1. Termination: Pole Angle is greater than ±12°
    2. Termination: Cart Position is greater than ±2.4 (center of the cart reaches the edge of the display)
    3. Truncation: Episode length is greater than 500 (200 for v0)

    ### Arguments

    ```
    gym.make('CartPole-v1')
    ```

    No additional arguments are currently supported.
    """

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 50,
    }

    def __init__(self, render_mode: Optional[str] = None):
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masspole + self.masscart
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates
        self.kinematics_integrator = "euler"

        # Angle at which to fail the episode
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4

        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds.
        high = np.array(
            [
                self.x_threshold * 2,
                np.finfo(np.float32).max,
                self.theta_threshold_radians * 2,
                np.finfo(np.float32).max,
            ],
            dtype=np.float32,
        )

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        self.render_mode = render_mode

        self.screen_width = 600
        self.screen_height = 400
        self.screen = None
        self.clock = None
        self.isopen = True
        self.state = None

        self.steps_beyond_terminated = None

    def step(self, action):
        err_msg = f"{action!r} ({type(action)}) invalid"
        assert self.action_space.contains(action), err_msg
        assert self.state is not None, "Call reset before using step method."
        x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        # For the interested reader:
        # https://coneural.org/florian/papers/05_cart_pole.pdf
        temp = (
            force + self.polemass_length * theta_dot**2 * sintheta
        ) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta**2 / self.total_mass)
        )
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        if self.kinematics_integrator == "euler":
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        self.state = (x, x_dot, theta, theta_dot)

        terminated = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )

        if not terminated:
            reward = 1.0
        elif self.steps_beyond_terminated is None:
            # Pole just fell!
            self.steps_beyond_terminated = 0
            reward = 1.0
        else:
            if self.steps_beyond_terminated == 0:
                logger.warn(
                    "You are calling 'step()' even though this "
                    "environment has already returned terminated = True. You "
                    "should always call 'reset()' once you receive 'terminated = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_terminated += 1
            reward = 0.0

        if self.render_mode == "human":
            self.render()
        return np.array(self.state, dtype=np.float32), reward, terminated, False, {}

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ):
        super().reset(seed=seed)
        # Note that if you use custom reset bounds, it may lead to out-of-bound
        # state/observations.
        low, high = utils.maybe_parse_reset_bounds(
            options, -0.05, 0.05  # default low
        )  # default high
        self.state = self.np_random.uniform(low=low, high=high, size=(4,))
        self.steps_beyond_terminated = None

        if self.render_mode == "human":
            self.render()
        return np.array(self.state, dtype=np.float32), {}

    def render(self):
        if self.render_mode is None:
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
                f'e.g. gym("{self.spec.id}", render_mode="rgb_array")'
            )
            return

        try:
            import pygame
            from pygame import gfxdraw
        except ImportError:
            raise DependencyNotInstalled(
                "pygame is not installed, run `pip install gym[classic_control]`"
            )

        if self.screen is None:
            pygame.init()
            if self.render_mode == "human":
                pygame.display.init()
                self.screen = pygame.display.set_mode(
                    (self.screen_width, self.screen_height)
                )
            else:  # mode == "rgb_array"
                self.screen = pygame.Surface((self.screen_width, self.screen_height))
        if self.clock is None:
            self.clock = pygame.time.Clock()

        world_width = self.x_threshold * 2
        scale = self.screen_width / world_width
        polewidth = 10.0
        polelen = scale * (2 * self.length)
        cartwidth = 50.0
        cartheight = 30.0

        if self.state is None:
            return None

        x = self.state

        self.surf = pygame.Surface((self.screen_width, self.screen_height))
        self.surf.fill((255, 255, 255))

        l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
        axleoffset = cartheight / 4.0
        cartx = x[0] * scale + self.screen_width / 2.0  # MIDDLE OF CART
        carty = 100  # TOP OF CART
        cart_coords = [(l, b), (l, t), (r, t), (r, b)]
        cart_coords = [(c[0] + cartx, c[1] + carty) for c in cart_coords]
        gfxdraw.aapolygon(self.surf, cart_coords, (0, 0, 0))
        gfxdraw.filled_polygon(self.surf, cart_coords, (0, 0, 0))

        l, r, t, b = (
            -polewidth / 2,
            polewidth / 2,
            polelen - polewidth / 2,
            -polewidth / 2,
        )

        pole_coords = []
        for coord in [(l, b), (l, t), (r, t), (r, b)]:
            coord = pygame.math.Vector2(coord).rotate_rad(-x[2])
            coord = (coord[0] + cartx, coord[1] + carty + axleoffset)
            pole_coords.append(coord)
        gfxdraw.aapolygon(self.surf, pole_coords, (202, 152, 101))
        gfxdraw.filled_polygon(self.surf, pole_coords, (202, 152, 101))

        gfxdraw.aacircle(
            self.surf,
            int(cartx),
            int(carty + axleoffset),
            int(polewidth / 2),
            (129, 132, 203),
        )
        gfxdraw.filled_circle(
            self.surf,
            int(cartx),
            int(carty + axleoffset),
            int(polewidth / 2),
            (129, 132, 203),
        )

        gfxdraw.hline(self.surf, 0, self.screen_width, carty, (0, 0, 0))

        self.surf = pygame.transform.flip(self.surf, False, True)
        self.screen.blit(self.surf, (0, 0))
        if self.render_mode == "human":
            pygame.event.pump()
            self.clock.tick(self.metadata["render_fps"])
            pygame.display.flip()

        elif self.render_mode == "rgb_array":
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
            )

    def close(self):
        if self.screen is not None:
            import pygame

            pygame.display.quit()
            pygame.quit()
            self.isopen = False

import math
from typing import Optional

import numpy as np

import gym
from gym import spaces
from gym.envs.classic_control import utils
from gym.error import DependencyNotInstalled


class Continuous_MountainCarEnv(gym.Env):
    """
    ### Description

    The Mountain Car MDP is a deterministic MDP that consists of a car placed stochastically
    at the bottom of a sinusoidal valley, with the only possible actions being the accelerations
    that can be applied to the car in either direction. The goal of the MDP is to strategically
    accelerate the car to reach the goal state on top of the right hill. There are two versions
    of the mountain car domain in gym: one with discrete actions and one with continuous.
    This version is the one with continuous actions.

    This MDP first appeared in [Andrew Moore's PhD Thesis (1990)](https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-209.pdf)

    ```
    @TECHREPORT{Moore90efficientmemory-based,
        author = {Andrew William Moore},
        title = {Efficient Memory-based Learning for Robot Control},
        institution = {University of Cambridge},
        year = {1990}
    }
    ```

    ### Observation Space

    The observation is a `ndarray` with shape `(2,)` where the elements correspond to the following:

    | Num | Observation                          | Min  | Max | Unit         |
    |-----|--------------------------------------|------|-----|--------------|
    | 0   | position of the car along the x-axis | -Inf | Inf | position (m) |
    | 1   | velocity of the car                  | -Inf | Inf | position (m) |

    ### Action Space

    The action is a `ndarray` with shape `(1,)`, representing the directional force applied on the car.
    The action is clipped in the range `[-1,1]` and multiplied by a power of 0.0015.

    ### Transition Dynamics:

    Given an action, the mountain car follows the following transition dynamics:

    *velocity<sub>t+1</sub> = velocity<sub>t+1</sub> + force * self.power - 0.0025 * cos(3 * position<sub>t</sub>)*

    *position<sub>t+1</sub> = position<sub>t</sub> + velocity<sub>t+1</sub>*

    where force is the action clipped to the range `[-1,1]` and power is a constant 0.0015.
    The collisions at either end are inelastic with the velocity set to 0 upon collision with the wall.
    The position is clipped to the range [-1.2, 0.6] and velocity is clipped to the range [-0.07, 0.07].

    ### Reward

    A negative reward of *-0.1 * action<sup>2</sup>* is received at each timestep to penalise for
    taking actions of large magnitude. If the mountain car reaches the goal then a positive reward of +100
    is added to the negative reward for that timestep.

    ### Starting State

    The position of the car is assigned a uniform random value in `[-0.6 , -0.4]`.
    The starting velocity of the car is always assigned to 0.

    ### Episode End

    The episode ends if either of the following happens:
    1. Termination: The position of the car is greater than or equal to 0.45 (the goal position on top of the right hill)
    2. Truncation: The length of the episode is 999.

    ### Arguments

    ```
    gym.make('MountainCarContinuous-v0')
    ```

    ### Version History

    * v0: Initial versions release (1.0.0)
    """

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 30,
    }

    def __init__(self, render_mode: Optional[str] = None, goal_velocity=0):
        self.min_action = -1.0
        self.max_action = 1.0
        self.min_position = -1.2
        self.max_position = 0.6
        self.max_speed = 0.07
        self.goal_position = (
            0.45  # was 0.5 in gym, 0.45 in Arnaud de Broissia's version
        )
        self.goal_velocity = goal_velocity
        self.power = 0.0015

        self.low_state = np.array(
            [self.min_position, -self.max_speed], dtype=np.float32
        )
        self.high_state = np.array(
            [self.max_position, self.max_speed], dtype=np.float32
        )

        self.render_mode = render_mode

        self.screen_width = 600
        self.screen_height = 400
        self.screen = None
        self.clock = None
        self.isopen = True

        self.action_space = spaces.Box(
            low=self.min_action, high=self.max_action, shape=(1,), dtype=np.float32
        )
        self.observation_space = spaces.Box(
            low=self.low_state, high=self.high_state, dtype=np.float32
        )

    def step(self, action: np.ndarray):

        position = self.state[0]
        velocity = self.state[1]
        force = min(max(action[0], self.min_action), self.max_action)

        velocity += force * self.power - 0.0025 * math.cos(3 * position)
        if velocity > self.max_speed:
            velocity = self.max_speed
        if velocity < -self.max_speed:
            velocity = -self.max_speed
        position += velocity
        if position > self.max_position:
            position = self.max_position
        if position < self.min_position:
            position = self.min_position
        if position == self.min_position and velocity < 0:
            velocity = 0

        # Convert a possible numpy bool to a Python bool.
        terminated = bool(
            position >= self.goal_position and velocity >= self.goal_velocity
        )

        reward = 0
        if terminated:
            reward = 100.0
        reward -= math.pow(action[0], 2) * 0.1

        self.state = np.array([position, velocity], dtype=np.float32)

        if self.render_mode == "human":
            self.render()
        return self.state, reward, terminated, False, {}

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)
        # Note that if you use custom reset bounds, it may lead to out-of-bound
        # state/observations.
        low, high = utils.maybe_parse_reset_bounds(options, -0.6, -0.4)
        self.state = np.array([self.np_random.uniform(low=low, high=high), 0])

        if self.render_mode == "human":
            self.render()
        return np.array(self.state, dtype=np.float32), {}

    def _height(self, xs):
        return np.sin(3 * xs) * 0.45 + 0.55

    def render(self):
        if self.render_mode is None:
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
                f'e.g. gym("{self.spec.id}", render_mode="rgb_array")'
            )
            return

        try:
            import pygame
            from pygame import gfxdraw
        except ImportError:
            raise DependencyNotInstalled(
                "pygame is not installed, run `pip install gym[classic_control]`"
            )

        if self.screen is None:
            pygame.init()
            if self.render_mode == "human":
                pygame.display.init()
                self.screen = pygame.display.set_mode(
                    (self.screen_width, self.screen_height)
                )
            else:  # mode == "rgb_array":
                self.screen = pygame.Surface((self.screen_width, self.screen_height))
        if self.clock is None:
            self.clock = pygame.time.Clock()

        world_width = self.max_position - self.min_position
        scale = self.screen_width / world_width
        carwidth = 40
        carheight = 20

        self.surf = pygame.Surface((self.screen_width, self.screen_height))
        self.surf.fill((255, 255, 255))

        pos = self.state[0]

        xs = np.linspace(self.min_position, self.max_position, 100)
        ys = self._height(xs)
        xys = list(zip((xs - self.min_position) * scale, ys * scale))

        pygame.draw.aalines(self.surf, points=xys, closed=False, color=(0, 0, 0))

        clearance = 10

        l, r, t, b = -carwidth / 2, carwidth / 2, carheight, 0
        coords = []
        for c in [(l, b), (l, t), (r, t), (r, b)]:
            c = pygame.math.Vector2(c).rotate_rad(math.cos(3 * pos))
            coords.append(
                (
                    c[0] + (pos - self.min_position) * scale,
                    c[1] + clearance + self._height(pos) * scale,
                )
            )

        gfxdraw.aapolygon(self.surf, coords, (0, 0, 0))
        gfxdraw.filled_polygon(self.surf, coords, (0, 0, 0))

        for c in [(carwidth / 4, 0), (-carwidth / 4, 0)]:
            c = pygame.math.Vector2(c).rotate_rad(math.cos(3 * pos))
            wheel = (
                int(c[0] + (pos - self.min_position) * scale),
                int(c[1] + clearance + self._height(pos) * scale),
            )

            gfxdraw.aacircle(
                self.surf, wheel[0], wheel[1], int(carheight / 2.5), (128, 128, 128)
            )
            gfxdraw.filled_circle(
                self.surf, wheel[0], wheel[1], int(carheight / 2.5), (128, 128, 128)
            )

        flagx = int((self.goal_position - self.min_position) * scale)
        flagy1 = int(self._height(self.goal_position) * scale)
        flagy2 = flagy1 + 50
        gfxdraw.vline(self.surf, flagx, flagy1, flagy2, (0, 0, 0))

        gfxdraw.aapolygon(
            self.surf,
            [(flagx, flagy2), (flagx, flagy2 - 10), (flagx + 25, flagy2 - 5)],
            (204, 204, 0),
        )
        gfxdraw.filled_polygon(
            self.surf,
            [(flagx, flagy2), (flagx, flagy2 - 10), (flagx + 25, flagy2 - 5)],
            (204, 204, 0),
        )

        self.surf = pygame.transform.flip(self.surf, False, True)
        self.screen.blit(self.surf, (0, 0))
        if self.render_mode == "human":
            pygame.event.pump()
            self.clock.tick(self.metadata["render_fps"])
            pygame.display.flip()

        elif self.render_mode == "rgb_array":
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
            )

    def close(self):
        if self.screen is not None:
            import pygame

            pygame.display.quit()
            pygame.quit()
            self.isopen = False


In [None]:
from typing import Optional, Union
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from vision import compute_visible_cells
from helpers import default_map
import cv2
import time
import math
from stable_baselines3.common.env_checker import check_env
from pathfinding.core.diagonal_movement import DiagonalMovement
from pathfinding.core.grid import Grid
from pathfinding.finder.a_star import AStarFinder
from pathfinding.finder.dijkstra import DijkstraFinder
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
import random

GREEN = (0, 255, 0)
RED = (0, 0, 255)
BLUE = (255, 0, 0)
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
YELLOW = (0, 255, 255)

colors = np.array([WHITE,  # hidden cells
                   YELLOW, # visible cells
                   BLACK,  # walls
                   RED,  # seeker
                   GREEN],   # hider
                  dtype=np.uint8)

UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3
STAY = 4

HITTING_WALL_REWARD = -1
LOSE_REWARD = 0
DISTANCE_COEF_REWARD = 5

MIN_RES = 36

class HideAndSeekEnv(gym.Env):
    def __init__(self, 
                 grid_size=12, 
                 vision_range=5, 
                 seq_len=5,
                 use_cache=True, 
                 max_steps=None,
                 render_mode='rgb_array',
                 fps=5,
                ):
        super(HideAndSeekEnv, self).__init__()

        assert vision_range > 0, "Vision range must be greater than 0"
        assert grid_size > 0, "Grid size must be greater than 0"
        assert max_steps is None or max_steps > 0, "Max steps must be greater than 0" 
        assert render_mode in ['rgb_array', 'human'], "Render mode must be 'rgb_array' or 'human'"
        assert seq_len > 0, "Sequence length must be greater than 0"
        
        if max_steps is None:
            max_steps = np.inf

        self.grid_size = grid_size
        self.max_steps = max_steps
        self.vision_range = vision_range
        self.seq_len = seq_len
        self.render_mode = render_mode
        self.use_cache = use_cache
        self.fps = fps

        self.seeker_pos = None
        self.hider_pos = None
        self.cache = {
            'best_seeker_action':-1*np.ones(
                (self.grid_size, self.grid_size, self.grid_size, self.grid_size), 
                dtype=int,
            ),
            'visible_cells':{},
        }
        self.upscale_factor = math.ceil(MIN_RES/self.grid_size)


        self.action_space = spaces.Discrete(4)  # Up, Down, Left, Right, Stay
        self.obs_dict = {
            "hidden":0,
            "visible":1,
            "wall":2,
            "seeker":3,
            "hider":4
        }

        self.observation_space = spaces.Box(
            low=0,
            high=255,
            shape=(self.grid_size*self.upscale_factor, self.grid_size*self.upscale_factor, self.seq_len),
            dtype=np.uint8
        )

        self.current_state = None
        self.current_step = 0
        self.prev_states = None
        self.walls = np.zeros((self.grid_size, self.grid_size), dtype=bool)
        self.visible_cells = np.zeros((self.grid_size, self.grid_size), dtype=bool)

        self.mode = None
        self.train()
        # check_env(self)

        self.reset()

    def train(self):
        self.mode = 'train'
    
    def eval(self):
        self.mode = 'eval'
    
    def test(self):
        self.mode = 'test'

    def get_processed_state(self):
        processed_state = self.prev_states[:, :, -self.seq_len:]
        processed_state = np.repeat(processed_state, self.upscale_factor, axis=0)
        processed_state = np.repeat(processed_state, self.upscale_factor, axis=1)
        return processed_state

    def update_current_state(self):
        self.current_state = np.full(
            shape=(self.grid_size, self.grid_size), 
            fill_value=self.obs_dict['hidden'], 
            dtype=np.uint8
        )
        self.current_state[self.walls] = self.obs_dict['wall']
        self.current_state[self.visible_cells] = self.obs_dict['visible']
        self.current_state[self.seeker_pos[0], self.seeker_pos[1]] = self.obs_dict['seeker']
        self.current_state[self.hider_pos[0], self.hider_pos[1]] = self.obs_dict['hider']
        
        self.prev_states = np.append(
            self.prev_states, 
            np.expand_dims(self.current_state, axis=-1), 
            axis=-1
        )

    def reset(self, seed: Optional[int] = None,):
        super().reset(seed=seed)

        self.current_step = 0
        self.prev_states = np.empty((self.grid_size, self.grid_size, 0), dtype=np.uint8)

        self.walls = self._generate_walls()
        self.seeker_pos = self.generate_seeker_pos()
        self.visible_cells = self.get_visible_cells()
        self.hider_pos = self.generate_hider_pos()

        for _ in range(self.seq_len):
            self.update_current_state()

        return self.get_processed_state(), self._get_info()

    def get_visible_cells(self):
        cache = self.cache['visible_cells']
        if self.use_cache:
            key = (self.seeker_pos[0], self.seeker_pos[1])
            if key in cache:
                return cache[key]

        visible_cells = compute_visible_cells(self.walls, self.seeker_pos, self.vision_range)
        if self.use_cache:
            cache[key] = visible_cells

        return visible_cells

    def _generate_walls(self):
        walls_mask = default_map()
        return walls_mask
        
    def generate_hider_pos(self):
        allowed_cells = ~(self.walls | self.visible_cells)
        allowed_cells[self.seeker_pos[0], self.seeker_pos[1]] = False
        return self.sample_from_allowed_cells(allowed_cells)

    def generate_seeker_pos(self):
        allowed_cells = ~self.walls
        return self.sample_from_allowed_cells(allowed_cells)
    
    def index_to_coords(self, index):
        return index // self.grid_size, index % self.grid_size
    
    def coords_to_index(self, coords):
        return coords[0] * self.grid_size + coords[1]
    
    def sample_from_allowed_cells(self, allowed_cells):
        prob = allowed_cells.astype(np.float32)
        prob /= prob.sum()
        flat_prob = prob.flatten()
        sample_index = np.random.choice(flat_prob.size, p=flat_prob)
        x, y = self.index_to_coords(sample_index)
        return np.array([x, y], dtype=np.uint8)
    
    def _get_valid_actions(self, position):
        valid_actions = []
        if position[0] > 0 and not self.walls[position[0] - 1, position[1]]:
            valid_actions.append(UP)
        if position[0] < self.grid_size - 1 and not self.walls[position[0] + 1, position[1]]:
            valid_actions.append(DOWN)
        if position[1] > 0 and not self.walls[position[0], position[1] - 1]:
            valid_actions.append(LEFT)
        if position[1] < self.grid_size - 1 and not self.walls[position[0], position[1] + 1]:
            valid_actions.append(RIGHT)
        # valid_actions.append(STAY)
        return valid_actions
    
    def _move(self, position, action):
        if action == UP:  # Up
            position[0] -= 1
        elif action == DOWN:  # Down
            position[0] += 1
        elif action == LEFT:  # Left
            position[1] -= 1
        elif action == RIGHT:  # Right
            position[1] += 1
        elif action == STAY:  # Stay
            print("Agent action: STAY ?!")
            pass
        return position
    
    def _get_min_distance_from_visible_cells(self, position):
        distances = np.linalg.norm(np.indices((self.grid_size, self.grid_size)) - position[:, np.newaxis, np.newaxis], axis=0)
        distances[~self.visible_cells] = np.inf
        return distances.min()
    
    def _get_info(self):
        return {}
    
    def move_player(self, action):
        assert self.action_space.contains(action), f"{action} is an invalid action"
        assert self.mode in ['test'], "Call move_player only in test mode"

        self._move(self.seeker_pos, action)
        self.visible_cells = self.get_visible_cells()
    
    def step(self, action, verbose=False):
        assert self.action_space.contains(action), f"{action} is an invalid action"
        assert self.current_state is not None, "Call reset before using step method."

        self.current_step += 1

        reward = 0
        reward_log = {}

        if self.mode == 'train':
            # seeker makes a move based on the old state
            self._move_seeker()

        valid_actions = self._get_valid_actions(self.hider_pos)
        if action in valid_actions:
            self._move(self.hider_pos, action)
        else:
            if verbose:
                print("Hider hit the wall")
            reward += HITTING_WALL_REWARD
            reward_log['hitting_wall'] = HITTING_WALL_REWARD

        min_distance = self._get_min_distance_from_visible_cells(self.hider_pos)
        distance_reward = int(min_distance) * DISTANCE_COEF_REWARD
        reward += distance_reward
        reward_log['distance'] = distance_reward

        terminated = False
        if self.visible_cells[self.hider_pos[0], self.hider_pos[1]]:
            if verbose:
                print("Hider was caught")
            reward += LOSE_REWARD
            reward_log['lose'] = LOSE_REWARD
            terminated = True

        truncated = self.current_step >= self.max_steps and not terminated
        self.update_current_state()

        return self.get_processed_state(), reward, terminated, truncated, self._get_info()

    def _generate_frame(self, matrix, cell_size=50):
        # Calculate image size based on grid dimensions and cell size
        image_size = (matrix.shape[1] * cell_size, matrix.shape[0] * cell_size)

        # Create a blank canvas with white background
        image = np.ones((matrix.shape[1], matrix.shape[0], 3), dtype=np.uint8) * 255

        # Fill each cell with the corresponding color using NumPy indexing
        image_rows = np.arange(matrix.shape[0]) * cell_size
        image_cols = np.arange(matrix.shape[1]) * cell_size

        # print(image[0, :60])
        image = colors[matrix]
        # repeat each row cell_size times
        image = np.repeat(image, cell_size, axis=0)
        image = np.repeat(image, cell_size, axis=1)

        # Draw black lines as separators between cells using NumPy indexing
        image[::cell_size, :] = (0, 0, 0)
        image[:, ::cell_size] = (0, 0, 0)
        image[::cell_size, -1] = (0, 0, 0)
        image[-1, ::cell_size] = (0, 0, 0)

        return image
    
    def render(self):
        if self.render_mode == "human":
            frame = self._generate_frame(self.current_state)
            cv2.imshow('Hide & Seek', frame)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        elif self.render_mode == "rgb_array":
            print(self.current_state)
        
    def get_best_seeker_action(self):
        cache = self.cache['best_seeker_action']
        if self.use_cache:
            key = (self.seeker_pos[0], self.seeker_pos[1], self.hider_pos[0], self.hider_pos[1])
            if cache[key]!=-1:
                return cache[key]

        best_action = self._compute_best_seeker_action(self.walls, self.seeker_pos, self.hider_pos)
        if self.use_cache:
            cache[key] = best_action

        return best_action
    
    def _compute_best_seeker_action(self, walls, seeker_pos, hider_pos):
        grid = Grid(matrix=~walls)
        start = grid.node(seeker_pos[1], seeker_pos[0])
        end = grid.node(hider_pos[1], hider_pos[0])
        finder = DijkstraFinder()
        path, runs = finder.find_path(start, end, grid)
        if len(path) == 0: # no path found
            return STAY
        next_cell = path[-len(path)+1]
        best_move = (next_cell[1] - seeker_pos[0], next_cell[0] - seeker_pos[1])
        best_action = self._move_to_action(best_move)
        if best_action == STAY:
            print("Seeker best action is to stay ?!")
        return best_action

    def _move_to_action(self, move):
        if move == (-1, 0):
            action = UP
        elif move == (1, 0):
            action = DOWN
        elif move == (0, -1):
            action = LEFT
        elif move == (0, 1):
            action = RIGHT
        elif move == (0, 0):
            action = STAY
        return action

    def _move_seeker(self):
        # best_action = self.get_best_seeker_action()
        # prob = self.current_step / self.max_steps
        prob = 0
        return
        if self.current_step < 10:
            return
        if np.random.binomial(1, prob):
            action = self.get_best_seeker_action()
        else:
            valid_actions = self._get_valid_actions(self.seeker_pos)
            action = random.choice(valid_actions)
        self._move(self.seeker_pos, action)
        self.visible_cells = self.get_visible_cells()

        
env = Monitor(HideAndSeekEnv(
    grid_size=12,
    vision_range=5,
    seq_len=1,
    render_mode="human",
    use_cache=True,
    max_steps=300,
))
# env.render()
# env.step(UP)


In [None]:
from stable_baselines3 import DQN

# env = gym.make("CartPole-v1", render_mode="human")
lr = 1e-4
exploration = 0.1
log_dir = "logs"
model = DQN("CnnPolicy", env, 
            verbose=0,
            learning_rate=lr,
            exploration_final_eps=exploration,
            tensorboard_log=log_dir,
            )
model.learn(
    total_timesteps=500_000, 
    progress_bar=True
)


Output()

In [None]:
def bresenham(x0, y0, x1, y1):
    """Yield integer coordinates on the line from (x0, y0) to (x1, y1).

    Input coordinates should be integers.

    The result will contain both the start and the end point.
    """
    dx = x1 - x0
    dy = y1 - y0

    xsign = 1 if dx > 0 else -1
    ysign = 1 if dy > 0 else -1

    dx = abs(dx)
    dy = abs(dy)

    if dx > dy:
        xx, xy, yx, yy = xsign, 0, 0, ysign
    else:
        dx, dy = dy, dx
        xx, xy, yx, yy = 0, ysign, xsign, 0

    D = 2*dy - dx
    y = 0
    result = []
    for x in range(dx + 1):
        result.append((x0 + x * xx + y * yx, y0 + x * xy + y * yy))

        if D >= 0:
            y += 1
            D -= 2*dx
        D += 2*dy 
    return result

bresenham(0, 0, 2, 1)

[(0, 0), (1, 1), (2, 1)]

In [None]:
def get_visible_cells(self, walls, agent_position, distance):
        grid_size = walls.shape[0]
        visible_cells = np.zeros((grid_size, grid_size), dtype=bool)

        x, y = agent_position

        for i in range(grid_size):
            for j in range(grid_size):
                if walls[i, j] == 0:
                    dx = i - x
                    dy = j - y

                    if dx == 0 and dy == 0:
                        continue

                    magnitude = np.sqrt(dx**2 + dy**2)
                    step_x = dx / magnitude
                    step_y = dy / magnitude

                    cur_x = x
                    cur_y = y
                    reached_limit = False

                    for _ in range(int(distance)):
                        cur_x += step_x
                        cur_y += step_y

                        if not (0 <= int(cur_x) < grid_size and 0 <= int(cur_y) < grid_size):
                            reached_limit = True
                            break

                        if walls[int(cur_x), int(cur_y)] == 1:
                            break

                        visible_cells[int(cur_x), int(cur_y)] = True

                    if reached_limit:
                        break
            visible_cells[x, y] = False

        return visible_cells

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

class HideAndSeekEnv(gym.Env):
    def __init__(self, grid_size=12):
        super(HideAndSeekEnv, self).__init__()

        self.grid_size = grid_size
        self.num_steps = 20

        self.seeker_pos = None
        self.hider_pos = None

        self.action_space = spaces.Discrete(4)  # Up, Down, Left, Right
        self.observation_space = spaces.Tuple((
            spaces.Discrete(self.grid_size),
            spaces.Discrete(self.grid_size),
        ))

        self.reset()

    def reset(self):
        self.seeker_pos = None
        self.hider_pos = None

        self._generate_walls()

        while True:
            seeker_x = np.random.randint(1, self.grid_size - 1)
            seeker_y = np.random.randint(1, self.grid_size - 1)
            if self._is_valid_position(seeker_x, seeker_y):
                self.seeker_pos = (seeker_x, seeker_y)
                break

        while True:
            hider_x = np.random.randint(1, self.grid_size - 1)
            hider_y = np.random.randint(1, self.grid_size - 1)
            if self._is_valid_position(hider_x, hider_y):
                self.hider_pos = (hider_x, hider_y)
                break

        return self._get_observation()

    def step(self, action):
        assert action in range(self.action_space.n), "Invalid action."

        new_pos = self._move_agent(self.seeker_pos, action)
        if self._is_valid_position(*new_pos):
            self.seeker_pos = new_pos

        self.current_step += 1

        done = self.current_step >= self.num_steps
        reward = self._calculate_reward()

        return self._get_observation(), reward, done, {}

    def _generate_walls(self):
        self.walls = np.zeros((self.grid_size, self.grid_size))

        self.walls[0, :] = 1
        self.walls[-1, :] = 1
        self.walls[:, 0] = 1
        self.walls[:, -1] = 1

        num_random_walls = int(self.grid_size * self.grid_size * 0.1)
        random_wall_indices = np.random.choice(
            np.arange(1, self.grid_size - 1),
            size=(2, num_random_walls),
            replace=False
        )
        self.walls[random_wall_indices] = 1

    def _is_valid_position(self, x, y):
        if self.walls[x, y] == 1:
            return False
        return True

    def _get_observation(self):
        return self.seeker_pos

    def _move_agent(self, pos, action):
        new_pos = list(pos)
        if action == 0:  # Up
            new_pos[1] = min(pos[1] + 1, self.grid_size - 1)
        elif action == 1:  # Down
            new_pos[1] = max(pos[1] - 1, 0)
        elif action == 2:  # Left
            new_pos[0] = max(pos[0] - 1, 0)
        elif action == 3:  # Right
            new_pos[0] = min(pos[0] + 1, self.grid_size - 1)
        return tuple(new_pos)

    def _calculate_reward(self):
        if self.seeker_pos == self.hider_pos:
            return 1.0
        return 0.0


In [None]:
# from gymnasium.envs.registration import (
#     make,
#     spec,
#     register,
#     registry,
#     pprint_registry,
#     make_vec,
# )
# register(
#     id="CartPole-v0",
#     entry_point="gymnasium.envs.classic_control.cartpole:CartPoleEnv",
#     vector_entry_point="gymnasium.envs.classic_control.cartpole:CartPoleVectorEnv",
#     max_episode_steps=200,
#     reward_threshold=195.0,
# )
