In [1]:
from ultimatetictactoe import ultimatetictactoe

import torch
from torch import nn

import numpy as np

import random

  from pkg_resources import resource_stream, resource_exists


In [None]:
# Convolutional policy network mapping a (B, 3, H, W) board tensor to a
# probability distribution over 81 actions.
# Every convolution uses kernel 3 with padding 1, so H and W are preserved; the
# first Linear layer expects 405 = 5 channels * 81 features, which implies a
# 9x9 board.
# NOTE(review): this network does no illegal-move masking of its own.
Policy = nn.Sequential(
    nn.Conv2d(3, 2, 3, padding=1),
    nn.Conv2d(2, 5, 3, padding=1),
    nn.BatchNorm2d(5),
    nn.ReLU(),
    nn.Conv2d(5, 5, 3, padding=1),
    nn.BatchNorm2d(5),
    nn.ReLU(),
    nn.Flatten(),
    nn.Linear(405, 405),
    nn.ReLU(),
    nn.Linear(405, 200),
    nn.ReLU(),
    nn.Linear(200, 81),
    # Normalise over the action axis explicitly: a dim-less Softmax relies on a
    # deprecated implicit choice and emits a warning on every forward pass.
    nn.Softmax(dim=1)
)

In [3]:
# Build the ultimate tic-tac-toe environment with render_mode="human".
env = ultimatetictactoe.env(render_mode="human")
# presumably 42 is the RNG seed (first positional argument of reset) — TODO confirm
env.reset(42)

In [4]:
# c = 0
# for agent in env.agent_iter():
#     print(agent)
#     c += 1
#     observation, reward, termination, truncation, info = env.last()

#     if termination or truncation:
#         action = None
#     else:
#         # this is where you would insert your policy
#         mask = observation["action_mask"]
#         action = env.action_space(agent).sample(mask)

#     env.step(action)

In [48]:
def trajectory(env, agent1, agent2):
    """Play one episode and record, per move, the observation, action and reward.

    Parameters
    ----------
    env
        Environment exposing ``reset()``, ``agent_iter()``, ``last()`` and
        ``step(action)``; ``last()`` must return a 5-tuple whose first item is
        a dict with an ``'observation'`` entry.
    agent1
        Object whose ``pick_action(env)`` picks the move when the acting agent
        is ``"player_1"``.
    agent2
        Object whose ``pick_action(env)`` picks the move for any other agent.

    Returns
    -------
    tuple of (states, actions, rewards)
        Three lists of equal length with one entry per move played, in turn
        order (both players interleaved).

    Notes
    -----
    ``rewards[i]`` is the value ``env.last()`` reported at the start of turn
    ``i``, i.e. before ``actions[i]`` was applied. The loop stops as soon as
    ``env.last()`` reports termination or truncation, so the values returned
    by that final call are not recorded.
    NOTE(review): a policy-gradient update will presumably need those terminal
    rewards — confirm before training on this output.
    """
    env.reset()

    states, actions, rewards = [], [], []

    for agent in env.agent_iter():
        observation, reward, termination, truncation, _info = env.last()
        if termination or truncation:
            break
        state = observation['observation']

        # "player_1" is routed to agent1; every other agent name to agent2.
        acting = agent1 if agent == "player_1" else agent2
        action = acting.pick_action(env)

        states.append(state)
        actions.append(action)
        rewards.append(reward)

        env.step(action)
    return states, actions, rewards

def split_trajectory(trajectory):
    """Split an interleaved episode into one record per player.

    Parameters
    ----------
    trajectory
        A ``(states, actions, rewards)`` tuple of equally long sequences with
        one entry per move, in turn order.

    Returns
    -------
    dict
        ``{"player_1": {...}, "player_2": {...}}`` where each inner dict has
        the keys ``'states'``, ``'actions'`` and ``'rewards'``.

    Notes
    -----
    Assumes strict alternation with "player_1" moving first, so even indices
    belong to "player_1" and odd indices to "player_2" — TODO confirm against
    the environment's turn order.
    """
    states, actions, rewards = trajectory
    return {
        player: {
            'states': states[offset::2],
            'actions': actions[offset::2],
            'rewards': rewards[offset::2],
        }
        for offset, player in enumerate(("player_1", "player_2"))
    }

In [None]:
class Agent:
    """Player that picks moves by sampling the env's action space under the action mask."""

    def __init__(self, name, type='random'):
        # `type` is accepted but not used anywhere in this class.
        self.name = name

    def pick_action(self, env):
        """Return a move for ``self.name`` sampled with the current action mask.

        The mask is read from the ``'action_mask'`` entry of the observation
        returned by ``env.last()`` and handed to the action space's ``sample``.
        """
        current_obs = env.last()[0]
        legal_moves = current_obs['action_mask']
        space = env.action_space(self.name)
        return space.sample(legal_moves)

In [201]:
import torch.optim as optim

In [50]:
# One Agent per player name; trajectory() routes "player_1" to its first agent argument.
a1 = Agent("player_1")
a2 = Agent("player_2")

In [55]:
# Set the env's render_fps metadata entry to 5 (presumably the rendering frame rate — TODO confirm).
env.metadata['render_fps'] = 5

In [57]:
# Play one episode between a1 and a2; r is the (states, actions, rewards) tuple.
r = trajectory(env, a1, a2)

In [58]:
# Unpack the recorded episode into its three parallel lists.
states, actions, rewards = r

In [75]:
# Shape of the first recorded observation.
# NOTE(review): the networks in this notebook start with Conv2d(3, ...), i.e. they
# expect 3 input channels — confirm how the action mask gets stacked onto this observation.
torch.tensor(states[0]).shape

torch.Size([2, 9, 9])

In [None]:
# optimizer = optim.Adam(Policy.parameters(), lr=1e-3)

In [None]:
class SimplePolicy(nn.Module):
    """Policy network that outputs a probability distribution over legal moves only.

    Parameters
    ----------
    net : nn.Module, optional
        Module mapping a ``(B, 3, 9, 9)`` state to ``(B, 81)`` logits. When
        omitted, a small conv -> ReLU -> flatten -> linear network is built.
    """

    def __init__(self, net = None):
        # Zero-argument super(): the previous call named a different object
        # (`Policy`), so constructing this class always raised.
        super().__init__()

        if net is None:
            self.net = nn.Sequential(
                nn.Conv2d(3, 3, 3, padding=1),
                nn.ReLU(),
                nn.Flatten(),
                nn.Linear(243, 81),  # 243 = 3 channels * 9 * 9
            )
        else:
            self.net = net

        self.softmax = nn.Softmax(dim=1)

    def forward(self, state: torch.Tensor):
        """Return move probabilities of shape (B, 81).

        ``state`` should be a tensor of shape (B, 3, 9, 9); channel index 2 is
        read as the action mask (non-zero = legal move). Illegal moves get
        probability 0. A row whose mask has no legal move yields NaNs, because
        the softmax is then taken over logits that are all ``-inf``.
        """
        action_mask = state[:, 2, :, :].reshape(state.size(0), -1).bool()
        logits = self.net(state)  # (B, 81)
        # -inf logits become exactly 0 after the softmax.
        masked_logits = logits.masked_fill(~action_mask, float('-inf'))
        return self.softmax(masked_logits)


In [None]:
class NeuralAgent:
    """Agent that samples moves from a policy network and can take optimizer steps.

    Attributes are stored exactly as passed in: ``name``, ``policy_net``,
    ``optimizer``, ``device`` and ``mode`` (``mode`` is kept but not read by
    any method of this class).
    """

    def __init__(
        self,
        name,
        policy_net,
        optimizer,
        device,
        mode = 'train'
    ):
        self.name, self.policy_net = name, policy_net
        self.optimizer, self.device = optimizer, device
        self.mode = mode

    def pick_action(self, obs: torch.Tensor):
        """Sample one action per batch row and return it with its log-probability.

        ``obs`` is a (B, 3, 9, 9) tensor: the first two channels hold the
        players' moves on the board and the last one is the action mask.
        The tensor is moved to ``self.device`` before the forward pass.
        """
        obs_on_device = obs.to(self.device)
        action_probs = self.policy_net(obs_on_device)
        sampler = torch.distributions.Categorical(action_probs)
        chosen = sampler.sample()
        return chosen, sampler.log_prob(chosen)

    def update(self, loss):
        """Run one optimisation step on ``loss``: clear grads, backprop, step."""
        opt = self.optimizer
        opt.zero_grad()
        loss.backward()
        opt.step()

In [301]:
# Random 0/1 batch shaped like a policy input: (2, 3, 9, 9), cast to float.
a = torch.randint(0, 2, (2, 3, 9, 9)).to(torch.float)
# Random values in [0, 1) for 81 actions per batch row.
t = torch.rand((2, 81))
# Check that Categorical accepts a (2, 81) tensor as its probs argument.
torch.distributions.Categorical(t)

Categorical(probs: torch.Size([2, 81]))

In [302]:
# Instantiate the masked policy class. `Policy` is an nn.Sequential *instance*,
# so calling `Policy()` runs a forward pass with no input and fails on a fresh kernel.
policy = SimplePolicy()
optimizer = optim.Adam(policy.parameters(), lr=1e-3)
ag = NeuralAgent("ciao", policy, optimizer, torch.device("cpu"))

In [304]:
# Sample one action per batch row; returns (actions, log-probabilities).
ag.pick_action(a)

(tensor([44, 62]), tensor([-3.6152, -3.6094], grad_fn=<SqueezeBackward1>))

In [263]:
# Same call as the previous cell; sampling is random, so the result differs between runs.
# NOTE(review): this cell's execution count is lower than the one above it, so the
# output shown here presumably comes from an earlier run — re-run the notebook top to bottom.
ag.pick_action(a)

(tensor([ 7, 48]), tensor([-4.4060, -4.3525], grad_fn=<SqueezeBackward1>))

In [None]:
class Reinforce:
    """Holds the pieces of a REINFORCE training run (the training loop is not implemented yet).

    Parameters
    ----------
    env
        Environment the agents play in.
    num_episodes
        Number of episodes the run is meant to cover.
    agent1, agent2
        The two agents taking part in the run.
    """

    def __init__(self, env, num_episodes, agent1, agent2):
        self.env = env
        # Keep every constructor argument; previously all but `env` were dropped.
        self.num_episodes = num_episodes
        self.agent1 = agent1
        self.agent2 = agent2


# Stable-Baselines3: MaskablePPO with invalid-action masking

In [71]:
import glob
import os
import time

import gymnasium as gym
from sb3_contrib import MaskablePPO
from sb3_contrib.common.maskable.policies import MaskableActorCriticPolicy
from sb3_contrib.common.wrappers import ActionMasker

import pettingzoo.utils
from pettingzoo.classic import connect_four_v3

In [None]:
# To pass into other gymnasium wrappers, we need to ensure that PettingZoo's wrapper
# can also be a gymnasium Env. Thus, we subclass gym.Env as well.
class SB3ActionMaskWrapper(pettingzoo.utils.BaseWrapper, gym.Env):
    """Adapter that lets a PettingZoo env be used with SB3 illegal-action masking."""

    def reset(self, seed=None, options=None):
        """Reset the env Gymnasium-style and expose one shared obs/action space.

        SB3 is built for single-agent RL and expects the spaces to be plain
        attributes rather than per-agent functions, so the first possible
        agent's spaces are taken to stand for every agent.
        """
        super().reset(seed, options)

        # Keep only the raw observation part; the action mask is dropped from the space.
        full_obs_space = super().observation_space(self.possible_agents[0])
        self.observation_space = full_obs_space["observation"]
        self.action_space = super().action_space(self.possible_agents[0])

        # PettingZoo AEC envs return nothing from reset(); Gymnasium wants (obs, info).
        first_obs = self.observe(self.agent_selection)
        return first_obs, {}

    def step(self, action):
        """Apply ``action`` and return a Gymnasium-style 5-tuple.

        The observation belongs to the agent that moves next, whereas reward,
        termination, truncation and info belong to the agent that just acted.
        """
        acting_agent = self.agent_selection

        super().step(action)

        upcoming_agent = self.agent_selection
        next_obs = self.observe(upcoming_agent)
        reward = self._cumulative_rewards[acting_agent]
        terminated = self.terminations[acting_agent]
        truncated = self.truncations[acting_agent]
        info = self.infos[acting_agent]
        return next_obs, reward, terminated, truncated, info

    def observe(self, agent):
        """Return the raw observation for ``agent`` with the action mask removed."""
        full_obs = super().observe(agent)
        return full_obs["observation"]

    def action_mask(self):
        """Return the action mask of the agent whose turn it currently is."""
        full_obs = super().observe(self.agent_selection)
        return full_obs["action_mask"]


def mask_fn(env):
    """Return the current action mask by delegating to ``env.action_mask()``.

    Any logic that produces a mask for the current env state could go here;
    this version relies on the env providing the helper method itself.
    """
    current_mask = env.action_mask()
    return current_mask


def train_action_mask(env_fn, steps=10_000, seed=0, **env_kwargs):
    """Train one MaskablePPO model that plays every agent of a zero-sum game env.

    Parameters
    ----------
    env_fn
        Module-like object whose ``env(**env_kwargs)`` builds the environment.
    steps
        Passed to ``model.learn`` as ``total_timesteps``.
    seed
        Used both for the env reset and for the model's random seed.
    **env_kwargs
        Forwarded to ``env_fn.env``.

    The trained model is saved under ``<env name>_<timestamp>``; nothing is returned.
    """
    env = env_fn.env(**env_kwargs)
    print(f"Starting training on {str(env.metadata['name'])}.")

    # Adapt the PettingZoo env for SB3 masking; reset() must run once so the
    # wrapper (re-)defines its observation and action spaces.
    env = SB3ActionMaskWrapper(env)
    env.reset(seed=seed)

    # MaskablePPO acts like plain PPO unless the env is wrapped in ActionMasker;
    # once the wrapper is detected, masks are fetched and applied automatically.
    # (MaskablePPO takes no action_mask_fn kwarg, unlike an earlier draft.)
    env = ActionMasker(env, mask_fn)

    model = MaskablePPO(MaskableActorCriticPolicy, env, verbose=1)
    model.set_random_seed(seed)
    model.learn(total_timesteps=steps)

    checkpoint_name = f"{env.unwrapped.metadata.get('name')}_{time.strftime('%Y%m%d-%H%M%S')}"
    model.save(checkpoint_name)

    print("Model has been saved.")
    print(f"Finished training on {str(env.unwrapped.metadata['name'])}.\n")

    env.close()


def eval_action_mask(env_fn, num_games=100, render_mode=None, **env_kwargs):
    """Evaluate the most recently saved MaskablePPO policy against a random agent.

    The random agent plays as ``env.possible_agents[0]`` and the trained model
    as ``env.possible_agents[1]``. The policy file is the newest
    ``<env name>*.zip`` in the current directory (by ``os.path.getctime``).

    Parameters
    ----------
    env_fn
        Module-like object whose ``env(render_mode=..., **env_kwargs)`` builds the env.
    num_games
        Number of games to play; game ``i`` is reset with ``seed=i``.
    render_mode
        Forwarded to ``env_fn.env``.
    **env_kwargs
        Forwarded to ``env_fn.env``.

    Returns
    -------
    tuple
        ``(round_rewards, total_rewards, winrate, scores)`` where
        ``round_rewards`` is a list with one rewards dict per game,
        ``total_rewards`` sums rewards per agent, ``scores`` sums the winner's
        reward per agent, and ``winrate`` is the trained agent's share of
        ``scores`` (0 when no game had a winner).
    """
    env = env_fn.env(render_mode=render_mode, **env_kwargs)

    print(
        f"Starting evaluation vs a random agent. Trained agent will play as {env.possible_agents[1]}."
    )

    try:
        latest_policy = max(
            glob.glob(f"{env.metadata['name']}*.zip"), key=os.path.getctime
        )
    except ValueError:
        # max() raises ValueError on an empty sequence: no saved policy matched.
        # NOTE(review): exit(0) ends the whole interpreter/kernel with a success
        # code; kept for backward compatibility, consider raising instead.
        print("Policy not found.")
        exit(0)

    model = MaskablePPO.load(latest_policy)

    scores = {agent: 0 for agent in env.possible_agents}
    total_rewards = {agent: 0 for agent in env.possible_agents}
    round_rewards = []

    for i in range(num_games):
        env.reset(seed=i)
        env.action_space(env.possible_agents[0]).seed(i)

        for agent in env.agent_iter():
            obs, reward, termination, truncation, info = env.last()

            # Separate observation and action mask by key, so the result does
            # not depend on the ordering of the observation dict.
            observation = obs["observation"]
            action_mask = obs["action_mask"]

            if termination or truncation:
                # If there is a winner, keep track, otherwise don't change the scores (tie)
                if (
                    env.rewards[env.possible_agents[0]]
                    != env.rewards[env.possible_agents[1]]
                ):
                    winner = max(env.rewards, key=env.rewards.get)
                    # only tracks the largest reward (winner of game)
                    scores[winner] += env.rewards[winner]
                # Also track negative and positive rewards (penalizes illegal moves)
                for a in env.possible_agents:
                    total_rewards[a] += env.rewards[a]
                # Store a snapshot: appending env.rewards itself would alias the
                # env's live dict, which an env may later clear or overwrite in place.
                round_rewards.append(dict(env.rewards))
                break
            else:
                if agent == env.possible_agents[0]:
                    act = env.action_space(agent).sample(action_mask)
                else:
                    # Note: PettingZoo expects integer actions # TODO: change chess to cast actions to type int?
                    act = int(
                        model.predict(
                            observation, action_masks=action_mask, deterministic=True
                        )[0]
                    )
            env.step(act)
    env.close()

    # Avoid dividing by zero
    if sum(scores.values()) == 0:
        winrate = 0
    else:
        winrate = scores[env.possible_agents[1]] / sum(scores.values())
    print("Rewards by round: ", round_rewards)
    print("Total rewards (incl. negative rewards): ", total_rewards)
    print("Winrate: ", winrate)
    print("Final scores: ", scores)
    return round_rewards, total_rewards, winrate, scores


# Driver: train a MaskablePPO model on Connect Four, then evaluate it twice
# against a random agent (100 games without rendering, 2 games with human rendering).
if __name__ == "__main__":
    env_fn = connect_four_v3

    # No extra keyword arguments are passed to env_fn.env().
    env_kwargs = {}

    # Evaluation/training hyperparameter notes:
    # 10k steps: Winrate:  0.76, loss order of 1e-03
    # 20k steps: Winrate:  0.86, loss order of 1e-04
    # 40k steps: Winrate:  0.86, loss order of 7e-06

    # Train a model against itself (takes ~20 seconds on a laptop CPU)
    train_action_mask(env_fn, steps=20_480, seed=0, **env_kwargs)

    # Evaluate 100 games against a random agent (winrate should be ~80%)
    eval_action_mask(env_fn, num_games=100, render_mode=None, **env_kwargs)

    # Watch two games vs a random agent
    eval_action_mask(env_fn, num_games=2, render_mode="human", **env_kwargs)

Starting training on connect_four_v3.
Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 21.9     |
|    ep_rew_mean     | 1        |
| time/              |          |
|    fps             | 353      |
|    iterations      | 1        |
|    time_elapsed    | 5        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 20.7        |
|    ep_rew_mean          | 1           |
| time/                   |             |
|    fps                  | 304         |
|    iterations           | 2           |
|    time_elapsed         | 13          |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.010001426 |
|    clip_fraction        | 0.0792      |
|    clip_range        