# Environment Setup - Rabail's Part

### Required Imports ###
1. chess: The python-chess library. It handles the basic chess board logic such as legal moves, checkmate, etc.
2. numpy: The numerical computing library. It has been used for the tensors representing the chess board.
3. AECEnv: PettingZoo's Agent Environment Cycle base class. It models environments in which the agents act one at a time, in a fixed turn sequence.
4. AgentSelector: A utility which manages the turns of the agents.
5. spaces: It is a gym-style API used to define action and observation spaces (what agents can do and observe).
6. gym: It has been used for the general reinforcement learning compatibility.

In [12]:
!pip install python-chess
!pip install pettingzoo
!pip install gymnasium



In [13]:
import chess
import numpy as np
from pettingzoo import AECEnv
from pettingzoo.utils.agent_selector import AgentSelector
from gymnasium import spaces
import gym
import random

### Chess Environment Class ###
This defines the basic chess environment. The environment inherits from AECEnv, a part of the pettingzoo framework, for the multiagent interactions.
- The "metadata" specifies environment settings.
- The "init(self)" constructor method initializes the environment.
- The "reset()" resets the environment for a new game.
- The "observe()" returns the current board state in tensor format which is supposed to be used by the agent for the observation.
- The "_board_to_tensor()" converts the board to a tensor.
- The "step()" handles the agent's action, checks for the end of game and returns the state.
- The "render()" prints the current board state.

In [237]:
from pettingzoo import AECEnv
from gymnasium import spaces
import chess
import numpy as np
import random
from pettingzoo.utils.agent_selector import agent_selector

class ChessEnvironment(AECEnv):
    """Two-player chess as a PettingZoo agent-environment-cycle (AEC) env.

    The agents are "w" (white, acts first) and "b" (black). All chess rules
    are delegated to a python-chess ``chess.Board`` held in ``self.board``.

    Actions are indices into ``list(self.board.legal_moves)`` for the current
    position. The declared action space is ``Discrete(4672)``, which is far
    larger than the number of legal moves in any position, so an action that
    is ``None`` or outside ``[0, number_of_legal_moves)`` is handled as an
    invalid move: the acting agent is rewarded -1, the opponent +1, and the
    episode is flagged to end.

    Observations are ``(8, 8, 12)`` int8 one-hot tensors produced by
    ``_board_to_tensor``; both agents receive the same tensor.

    ``reset()`` must be called before ``step()``: it creates the agent
    selector that ``step()`` uses for already-terminated agents.
    """

    # 'is_parallelizable' is set so the env can be passed to
    # pettingzoo's aec_to_parallel conversion used later in this notebook.
    metadata = {'render_modes': ['human'], 'name': "Chess-v0", 'is_parallelizable': True}

    def __init__(self, render_mode=None):
        """Create the board, the agent bookkeeping and the spaces.

        Args:
            render_mode: stored on the instance; ``render()`` always prints
                the board regardless of this value.
        """
        super().__init__()
        self.render_mode = render_mode
        self.board = chess.Board()
        self._init_episode_state()
        self.possible_agents = self.agents[:]

        self._action_space = spaces.Discrete(4672)
        self._observation_space = spaces.Box(low=0, high=1, shape=(8, 8, 12), dtype=np.int8)

    def _init_episode_state(self):
        """Set every per-episode attribute to its start-of-game value."""
        self.agents = ["w", "b"]
        self.current_agent_index = 0
        self.current_agent = self.agents[self.current_agent_index]
        self.agent_selection = self.current_agent
        self._cumulative_rewards = {name: 0 for name in self.agents}
        self.rewards = {name: 0 for name in self.agents}
        self.terminations = {name: False for name in self.agents}
        self.truncations = {name: False for name in self.agents}
        self.infos = {name: {} for name in self.agents}
        # True once the episode has been decided (invalid action or finished
        # board) but the agents have not been marked terminated yet.
        self._game_over_pending = False

    def action_space(self, agent):
        """Return the action space, which is the same for every agent."""
        return self._action_space

    def observation_space(self, agent):
        """Return the observation space, which is the same for every agent."""
        return self._observation_space

    def reset(self, seed=None, options=None):
        """Start a new game from the standard initial position.

        ``seed`` and ``options`` are accepted for API compatibility and are
        not used. Nothing is returned.
        """
        self.board.reset()
        self._init_episode_state()
        self.agent_selector = agent_selector(self.agents)
        self.agent_selection = self.agent_selector.next()

    def observe(self, agent):
        """Return the board tensor; ``agent`` does not affect the result."""
        return self._board_to_tensor()

    def _board_to_tensor(self):
        """Encode the board as an ``(8, 8, 12)`` int8 one-hot tensor.

        Row index is ``7 - square // 8`` and column index is ``square % 8``.
        Channel index is ``piece_type - 1`` for white pieces and
        ``piece_type - 1 + 6`` for black pieces.
        """
        tensor = np.zeros((8, 8, 12), dtype=np.int8)

        for square, piece in self.board.piece_map().items():
            row = 7 - (square // 8)
            col = square % 8
            channel = piece.piece_type - 1
            if piece.color != chess.WHITE:
                channel += 6
            tensor[row, col, channel] = 1

        return tensor

    def _advance_agent(self):
        """Hand the turn to the other agent."""
        self.current_agent_index = 1 - self.current_agent_index
        self.current_agent = self.agents[self.current_agent_index]
        self.agent_selection = self.current_agent

    def _transition(self):
        """Build the (obs, reward, terminated, truncated, info) tuple for
        ``self.current_agent``."""
        current = self.current_agent
        # current_agent is never assigned None inside this class; the guard
        # is kept for callers that clear it externally.
        if current is None:
            return None, 0, True, False, {}  # Game over, no agent to act
        return (
            self.observe(current),
            self.rewards[current],
            self.terminations[current],
            self.truncations[current],
            self.infos[current],
        )

    def step(self, action):
        """Apply ``action`` for the currently selected agent.

        Args:
            action: index into ``list(self.board.legal_moves)``. ``None`` or
                an out-of-range index counts as an invalid move.

        Returns:
            ``None`` if the selected agent was already terminated or
            truncated (the call only advances the agent selector).
            Otherwise a tuple ``(observation, reward, terminated, truncated,
            info)`` for ``self.current_agent`` as it stands after the step.
        """
        agent = self.agent_selection

        if self.terminations[agent] or self.truncations[agent]:
            # Retained from the original implementation; nothing in this
            # class reads _was_game_over.
            self._was_game_over = self._game_over_pending
            self.agent_selection = self.agent_selector.next()
            return

        legal_moves = list(self.board.legal_moves)

        # `action is None` is checked first so that a None action is handled
        # as invalid instead of raising TypeError in the range comparison.
        if action is None or not 0 <= action < len(legal_moves):
            # Invalid move: penalize the mover, reward the opponent.
            self.rewards[agent] = -1
            other_agent = [a for a in self.agents if a != agent][0]
            self.rewards[other_agent] = 1

            # Do not terminate yet; termination is applied on a later step.
            self._game_over_pending = True
            self._advance_agent()
            return self._transition()

        self.board.push(legal_moves[action])

        game_over = self.board.is_game_over()
        if game_over:
            result = self.board.result()
            if result == "1-0":
                self.rewards = {"w": 1, "b": -1}
            elif result == "0-1":
                self.rewards = {"w": -1, "b": 1}
            elif result == "1/2-1/2":  # Draw condition
                self.rewards = {"w": 0, "b": 0}
            self._game_over_pending = True
        else:
            # Normal transition for a game that is still running.
            # NOTE(review): this also zeroes a penalty set by an earlier
            # invalid move that is still pending - confirm that is intended.
            self.rewards = {name: 0 for name in self.agents}
            self.terminations = {name: False for name in self.agents}
            self.truncations = {name: False for name in self.agents}
            self._advance_agent()

        # Finalize the episode either when the board itself is finished
        # (whichever colour made the last move) or, for a pending invalid
        # move, once the turn has come back to the first agent. Previously a
        # finished board was only finalized when white had moved last, so a
        # game ended by black never terminated.
        if self._game_over_pending and (game_over or self.agent_selection == self.agents[0]):
            self.terminations = {name: True for name in self.agents}
            self.truncations = {name: False for name in self.agents}
            self._game_over_pending = False  # Reset the flag for the next cycle
            self.agents = []  # End game and clear agents

        return self._transition()

    def render(self):
        """Print the board as text."""
        print(self.board)

    def close(self):
        """No resources to release."""
        pass

In [238]:
# Create the chess environment instance shared by the cells below.
env = ChessEnvironment()

In [239]:
import random

# Reset the environment and show the starting position
env.reset()
env.render()
print()

# Play random legal moves until the environment has no agents left
while env.agents:
    agent = env.agent_selection

    # An agent already flagged as terminated/truncated is stepped with None
    # so the environment can move on to the next agent.
    if env.terminations[agent] or env.truncations[agent]:
        env.step(None)
        continue

    # Get all legal moves for the side to move
    legal_moves = list(env.board.legal_moves)

    # No legal moves for a live agent means the position on the board is
    # already final. Stepping with None here is not a valid action for a
    # live agent, so stop playing instead.
    if not legal_moves:
        print(f"No legal moves for {agent}, ending the game.")
        break

    # Select a random move for this agent (could be modified for AI)
    move_index = random.choice(range(len(legal_moves)))
    obs, reward, terminated, truncated, info = env.step(move_index)

    # Print the move made by the agent and render the board
    print(f"\nMove by {agent}: {legal_moves[move_index]}")
    env.render()

# Final game result
result = env.board.result()
print("\nGame Over!")
print("Result:", result)

if result == "1-0":
    print("White (w) wins")
elif result == "0-1":
    print("Black (b) wins")
elif result == "1/2-1/2":
    print("Draw")
else:
    # "*" means the board itself has not reached a final position.
    print("No result on the board (game not finished)")

r n b q k b n r
p p p p p p p p
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
P P P P P P P P
R N B Q K B N R


Move by w: a2a3
r n b q k b n r
p p p p p p p p
. . . . . . . .
. . . . . . . .
. . . . . . . .
P . . . . . . .
. P P P P P P P
R N B Q K B N R

Move by b: c7c5
r n b q k b n r
p p . p p p p p
. . . . . . . .
. . p . . . . .
. . . . . . . .
P . . . . . . .
. P P P P P P P
R N B Q K B N R

Move by w: e2e4
r n b q k b n r
p p . p p p p p
. . . . . . . .
. . p . . . . .
. . . . P . . .
P . . . . . . .
. P P P . P P P
R N B Q K B N R

Move by b: h7h5
r n b q k b n r
p p . p p p p .
. . . . . . . .
. . p . . . . p
. . . . P . . .
P . . . . . . .
. P P P . P P P
R N B Q K B N R

Move by w: d1f3
r n b q k b n r
p p . p p p p .
. . . . . . . .
. . p . . . . p
. . . . P . . .
P . . . . Q . .
. P P P . P P P
R N B . K B N R

Move by b: d8a5
r n b . k b n r
p p . p p p p .
. . . . . . . .
q . p . . . . p
. . . . P . . .
P . . . . Q . .
. P P P . P P P
R N B . K B N R



In [196]:
# Report which of these python-chess end-of-game checks hold for the
# current position of env.board, one "label value" line per check.
print("Game over due to:")
for label, check in (
    ("Stalemate:", env.board.is_stalemate),
    ("Repetition:", env.board.is_repetition),
    ("50-move rule:", env.board.can_claim_fifty_moves),
    ("Insufficient material:", env.board.is_insufficient_material),
):
    print(label, check())


Game over due to:
Stalemate: False
Repetition: False
50-move rule: True
Insufficient material: False


# MARL Algorithm Development - Vaneeza's Part

In [158]:
!pip install stable-baselines3
!pip install sb3-contrib
!pip install supersuit



In [228]:
import torch
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.vec_env import DummyVecEnv
from sb3_contrib import RecurrentPPO

Stable-Baselines3 (SB3) doesn’t work natively with PettingZoo, so we use SuperSuit to make the environment compatible. We can now use vec_env with any SB3 algorithm.

In [229]:
from pettingzoo.utils.conversions import aec_to_parallel
import supersuit as ss

# Expose the turn-based (AEC) environment through PettingZoo's parallel API.
# Both names assigned in this cell are reassigned by the PPO setup cell below.
parallel_env = aec_to_parallel(env)
# Convert the parallel env to a vector env and wrap it for Stable-Baselines3
vec_env = ss.pettingzoo_env_to_vec_env_v1(parallel_env)
vec_env = ss.concat_vec_envs_v1(vec_env, num_vec_envs=1, num_cpus=1, base_class='stable_baselines3')

## PPO (Policy Gradient Method)


In [240]:
from pettingzoo.utils.conversions import aec_to_parallel
import supersuit as ss
from stable_baselines3 import PPO

# Convert your custom AECEnv to a parallel env
parallel_env = aec_to_parallel(env)

# Apply wrappers directly to the PettingZoo env (before SB3 conversion).
# NOTE(review): black_death_v3 is expected to keep finished agents in the
# env with zeroed observations/rewards - confirm against the SuperSuit docs.
parallel_env = ss.black_death_v3(parallel_env)
# The environment declares int8 observations; cast them to float32.
parallel_env = ss.dtype_v0(parallel_env, dtype='float32')

# convert to SB3-compatible format (a single copy of the env, one worker)
vec_env = ss.pettingzoo_env_to_vec_env_v1(parallel_env)
vec_env = ss.concat_vec_envs_v1(vec_env, num_vec_envs=1, num_cpus=1, base_class='stable_baselines3')

In [241]:
# Build a PPO agent with an MLP policy on the wrapped chess environment and
# train it with a budget of 10,000 timesteps.
# NOTE(review): the vector-env conversion presumably exposes each agent as
# its own sub-environment, so one policy would play both colours - confirm
# this is intended.
model = PPO("MlpPolicy", vec_env, verbose=1)
model.learn(total_timesteps=10000)

Using cpu device
-----------------------------
| time/              |      |
|    fps             | 1203 |
|    iterations      | 1    |
|    time_elapsed    | 3    |
|    total_timesteps | 4096 |
-----------------------------
-----------------------------------------
| time/                   |             |
|    fps                  | 449         |
|    iterations           | 2           |
|    time_elapsed         | 18          |
|    total_timesteps      | 8192        |
| train/                  |             |
|    approx_kl            | 0.027839337 |
|    clip_fraction        | 0.335       |
|    clip_range           | 0.2         |
|    entropy_loss         | -8.43       |
|    explained_variance   | 0.493       |
|    learning_rate        | 0.0003      |
|    loss                 | -0.0412     |
|    n_updates            | 10          |
|    policy_gradient_loss | -0.0439     |
|    value_loss           | 0.0123      |
-----------------------------------------
-----------------

<stable_baselines3.ppo.ppo.PPO at 0x7abce2b1f550>

In [242]:
# Persist the trained PPO model to disk under the name "ppo_chess_model".
model.save("ppo_chess_model")