In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pygame

class TwoAgentGridworldEnv(gym.Env):
    metadata = {'render_modes': ['human', 'print'], "render_fps": 4}
    
    def __init__(self, render_mode=None, grid_size=(10, 10)):
        super(TwoAgentGridworldEnv, self).__init__()
        self.size = grid_size[0]  # The size of the square grid
        self.window_size = 800  # The size of the PyGame window
        
        self.grid_size = grid_size
        self.action_space = spaces.MultiDiscrete([4, 4])  # 4 possible actions for each of the two agents
        self.observation_space = spaces.Tuple((
            spaces.Box(low=0, high=grid_size[0]-1, shape=(2,), dtype=np.int32),
            spaces.Box(low=0, high=grid_size[1]-1, shape=(2,), dtype=np.int32)
        ))
        
        
        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode
        """
        If human-rendering is used, `self.window` will be a reference
        to the window that we draw to. `self.clock` will be a clock that is used
        to ensure that the environment is rendered at the correct framerate in
        human-mode. They will remain `None` until human-mode is used for the
        first time.
        """
        self.window = None
        self.clock = None
        # Reset the environment and start
        self.reset()

    def reset(self):
        self.agent_positions = [
            np.array([0, 0]),  # Agent 1 starts at top-left corner
            np.array([self.grid_size[0]-1, self.grid_size[1]-1])  # Agent 2 starts at bottom-right corner
        ]
        return self._get_obs()

    def _get_obs(self):
        return {'agent1': self.agent_positions[0], 'agent2': self.agent_positions[1]}

    def step(self, action):
        # Define the movements corresponding to each action
        movements = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # up, down, left, right
        rewards = 0
        
        # Update the positions of both agents
        for i, act in enumerate(action):
            movement = movements[act]
            new_position = self.agent_positions[i] + movement
            # Ensure the new position is within bounds
            new_position = np.clip(new_position, [0, 0], [self.grid_size[0]-1, self.grid_size[1]-1])
            self.agent_positions[i] = new_position
        
        # Placeholder for terminal state and rewards
        done = False
        reward = 0
        
        # If the agents meet at the same position, we can assign a reward or consider it a terminal state
        if np.array_equal(self.agent_positions[0], self.agent_positions[1]):
            done = True
            reward = 10  # Example reward for meeting at the same position
        
        obs = self._get_obs()
        return obs, reward, done, False, {}

    def render(self):
        if self.render_mode == 'print':
            grid = np.zeros(self.grid_size)
            grid[tuple(self.agent_positions[0])] = 1  # Mark the position of the first agent
            grid[tuple(self.agent_positions[1])] = 2  # Mark the position of the second agent
            print(grid)
        elif self.render_mode == 'human':
            if self.window is None: # Initialize pygame if it is not initialized
                pygame.init()
                pygame.display.init()
                self.window = pygame.display.set_mode(
                    (self.window_size, self.window_size)
                )
            if self.clock is None:
                self.clock = pygame.time.Clock()
            
            # Fill the canvas
            canvas = pygame.Surface((self.window_size, self.window_size))
            canvas.fill((255, 255, 255))
            pix_square_size = (
                self.window_size / self.size
            )  # The size of a single grid square in pixels


            # First we draw agent1 square
            pygame.draw.rect(
                canvas,
                (255, 0, 0),
                pygame.Rect(
                    pix_square_size * self.agent_positions[0],
                    (pix_square_size, pix_square_size),
                ),
            )
            # Now we draw the agent2 circle
            pygame.draw.circle(
                canvas,
                (0, 0, 255),
                (self.agent_positions[1] + 0.5) * pix_square_size,
                pix_square_size / 3,
            )
            # Finally, add some gridlines
            for x in range(self.size + 1):
                pygame.draw.line(
                    canvas,
                    0,
                    (0, pix_square_size * x),
                    (self.window_size, pix_square_size * x),
                    width=3,
                )
                pygame.draw.line(
                    canvas,
                    0,
                    (pix_square_size * x, 0),
                    (pix_square_size * x, self.window_size),
                    width=3,
                )
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
            
    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()

In [2]:
# Register the environment
gym.envs.registration.register(
    id='TwoAgentGridworld-v0',
    entry_point=TwoAgentGridworldEnv,
    max_episode_steps=100,
)

# Example of creating and using the environment
env = gym.make('TwoAgentGridworld-v0', render_mode='human')
env.metadata['render_fps'] = 8
obs = env.reset()
env.render()

done = False
while not done:
    action = env.action_space.sample()
    obs, reward, done, _,  info = env.step(action)
    env.render()
    print(f"Obs: {obs}, Reward: {reward}, Done: {done}")

  logger.deprecation(
  logger.deprecation(
  logger.warn(
  logger.warn(f"{pre} was expecting a tuple, actual type: {type(obs)}")
  logger.warn(
  gym.logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")


Obs: {'agent1': array([0, 1]), 'agent2': array([8, 9])}, Reward: 0, Done: False
Obs: {'agent1': array([0, 2]), 'agent2': array([8, 8])}, Reward: 0, Done: False
Obs: {'agent1': array([0, 3]), 'agent2': array([8, 7])}, Reward: 0, Done: False
Obs: {'agent1': array([1, 3]), 'agent2': array([7, 7])}, Reward: 0, Done: False
Obs: {'agent1': array([2, 3]), 'agent2': array([7, 8])}, Reward: 0, Done: False
Obs: {'agent1': array([2, 4]), 'agent2': array([6, 8])}, Reward: 0, Done: False
Obs: {'agent1': array([2, 5]), 'agent2': array([5, 8])}, Reward: 0, Done: False
Obs: {'agent1': array([3, 5]), 'agent2': array([4, 8])}, Reward: 0, Done: False
Obs: {'agent1': array([3, 6]), 'agent2': array([4, 9])}, Reward: 0, Done: False
Obs: {'agent1': array([2, 6]), 'agent2': array([4, 8])}, Reward: 0, Done: False
Obs: {'agent1': array([2, 5]), 'agent2': array([3, 8])}, Reward: 0, Done: False
Obs: {'agent1': array([3, 5]), 'agent2': array([2, 8])}, Reward: 0, Done: False
Obs: {'agent1': array([4, 5]), 'agent2':

In [3]:
env.close()

In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pygame

class ThreeAgentGridworldEnv(gym.Env):
    metadata = {'render_modes': ['human', 'print', 'rgb_array'], "render_fps": 4}
    
    def __init__(self, render_mode=None, grid_size=(10, 10)):
        super(ThreeAgentGridworldEnv, self).__init__()
        self.size = grid_size[0]  # The size of the square grid
        self.window_size = 800  # The size of the PyGame window
        
        self.grid_size = grid_size
        self.action_space = spaces.MultiDiscrete([4, 4, 4])  # 4 possible actions for each of the two agents
        self.observation_space = spaces.Tuple((
            spaces.Box(low=0, high=grid_size[0]-1, shape=(2,), dtype=np.int32),
            spaces.Box(low=0, high=grid_size[1]-1, shape=(2,), dtype=np.int32),
            spaces.Box(low=0, high=grid_size[1]-1, shape=(2,), dtype=np.int32)
        ))
        
        
        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode
        """
        If human-rendering is used, `self.window` will be a reference
        to the window that we draw to. `self.clock` will be a clock that is used
        to ensure that the environment is rendered at the correct framerate in
        human-mode. They will remain `None` until human-mode is used for the
        first time.
        """
        self.window = None
        self.clock = None
        # Reset the environment and start
        self.reset()

    def reset(self, seed=0):
        self.agent_positions = [
            np.array([0, 0]),  # Agent 1 starts at top-left corner
            np.array([self.grid_size[0]-1, self.grid_size[1]-1]),  # Agent 2 starts at bottom-right corner,
            np.array([self.grid_size[0]-3, self.grid_size[1]-3])  # Agent 3 starts at specific position,
        ]
        return self._get_obs(), {}

    def _get_obs(self):
        return {'agent1': self.agent_positions[0], 'agent2': self.agent_positions[1], 'agent3': self.agent_positions[2]}

    def step(self, action):
        # Define the movements corresponding to each action
        movements = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # up, down, left, right
        rewards = 0
        
        # Update the positions of both agents
        for i, act in enumerate(action):
            movement = movements[act]
            new_position = self.agent_positions[i] + movement
            # Ensure the new position is within bounds
            new_position = np.clip(new_position, [0, 0], [self.grid_size[0]-1, self.grid_size[1]-1])
            self.agent_positions[i] = new_position
        
        # Placeholder for terminal state and rewards
        done = False
        reward = 0
        
        # If the agents meet at the same position, we can assign a reward or consider it a terminal state
        if np.array_equal(self.agent_positions[0], self.agent_positions[1]):
            done = True
            reward = 10  # Example reward for meeting at the same position
        
        obs = self._get_obs()
        return obs, reward, done, False, {}

    def render(self):
        if self.render_mode == 'print':
            grid = np.zeros(self.grid_size)
            grid[tuple(self.agent_positions[0])] = 1  # Mark the position of the first agent
            grid[tuple(self.agent_positions[1])] = 2  # Mark the position of the second agent
            print(grid)
        elif self.render_mode == 'human':
            if self.window is None: # Initialize pygame if it is not initialized
                pygame.init()
                pygame.display.init()
                self.window = pygame.display.set_mode(
                    (self.window_size, self.window_size)
                )
            if self.clock is None:
                self.clock = pygame.time.Clock()
            
            # Fill the canvas
            canvas = pygame.Surface((self.window_size, self.window_size))
            canvas.fill((255, 255, 255))
            pix_square_size = (
                self.window_size / self.size
            )  # The size of a single grid square in pixels


            # First we draw agent1 square
            pygame.draw.rect(
                canvas,
                (255, 0, 0),
                pygame.Rect(
                    pix_square_size * self.agent_positions[0],
                    (pix_square_size, pix_square_size),
                ),
            )
            # Now we draw the agent2 circle
            pygame.draw.circle(
                canvas,
                (0, 0, 255),
                (self.agent_positions[1] + 0.5) * pix_square_size,
                pix_square_size / 3,
            )
            # And agent3 circle
            pygame.draw.circle(
                canvas,
                (0, 255, 0),
                (self.agent_positions[2] + 0.5) * pix_square_size,
                pix_square_size / 3,
            )

            # Finally, add some gridlines
            for x in range(self.size + 1):
                pygame.draw.line(
                    canvas,
                    0,
                    (0, pix_square_size * x),
                    (self.window_size, pix_square_size * x),
                    width=3,
                )
                pygame.draw.line(
                    canvas,
                    0,
                    (pix_square_size * x, 0),
                    (pix_square_size * x, self.window_size),
                    width=3,
                )
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])

        elif self.render_mode == 'rgb_array':  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )
            
    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()

In [2]:
# Register the environment
gym.envs.registration.register(
    id='ThreeAgentGridworld-v0',
    entry_point=ThreeAgentGridworldEnv,
    max_episode_steps=100,
)

# Example of creating and using the environment
env = gym.make('ThreeAgentGridworld-v0', render_mode='human')
env.metadata['render_fps'] = 8
obs, info = env.reset()
env.render()

done = False
while not done:
    pygame.event.get()
    action = env.action_space.sample()
    obs, reward, done, _,  info = env.step(action)
    env.render()
    print(f"Obs: {obs}, Reward: {reward}, Done: {done}")

  logger.warn(
  logger.deprecation(
  logger.warn(f"{pre} was expecting a tuple, actual type: {type(obs)}")
  logger.warn(
  gym.logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(f"{pre} was expecting a tuple, actual type: {type(obs)}")
  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")


Obs: {'agent1': array([0, 1]), 'agent2': array([9, 8]), 'agent3': array([7, 6])}, Reward: 0, Done: False
Obs: {'agent1': array([0, 2]), 'agent2': array([9, 9]), 'agent3': array([8, 6])}, Reward: 0, Done: False
Obs: {'agent1': array([0, 2]), 'agent2': array([9, 9]), 'agent3': array([9, 6])}, Reward: 0, Done: False
Obs: {'agent1': array([0, 3]), 'agent2': array([8, 9]), 'agent3': array([9, 5])}, Reward: 0, Done: False
Obs: {'agent1': array([0, 4]), 'agent2': array([8, 8]), 'agent3': array([8, 5])}, Reward: 0, Done: False
Obs: {'agent1': array([1, 4]), 'agent2': array([8, 9]), 'agent3': array([7, 5])}, Reward: 0, Done: False
Obs: {'agent1': array([1, 3]), 'agent2': array([8, 9]), 'agent3': array([7, 6])}, Reward: 0, Done: False
Obs: {'agent1': array([0, 3]), 'agent2': array([7, 9]), 'agent3': array([8, 6])}, Reward: 0, Done: False
Obs: {'agent1': array([0, 3]), 'agent2': array([8, 9]), 'agent3': array([8, 5])}, Reward: 0, Done: False
Obs: {'agent1': array([1, 3]), 'agent2': array([8, 8]),

In [3]:
env.close()

In [25]:
# Record video
env_rgb = gym.make('ThreeAgentGridworld-v0', render_mode='rgb_array')
env = gym.wrappers.RecordVideo(env=env_rgb, video_folder="/", name_prefix="test-video", episode_trigger=lambda x: x % 2 == 0)

obs, info = env.reset()
# env.render()

# Start the recorder
env.start_video_recorder()

done = False
while not done:
    pygame.event.get()
    action = env.action_space.sample()
    obs, reward, done, _,  info = env.step(action)
    env.render()
    print(f"Obs: {obs}, Reward: {reward}, Done: {done}")

# Don't forget to close the video recorder before the env!
env.close_video_recorder()

# Close the environment
env.close()


  logger.warn(


TypeError: ThreeAgentGridworldEnv.reset() got an unexpected keyword argument 'options'