In [1]:
from pettingzoo.classic import tictactoe_v3
from pettingzoo.classic import connect_four_v3
# from pettingzoo.classic import chess_v6

from torch.utils.tensorboard import SummaryWriter

import os
import torch
import brl

In [2]:
# Experiment configuration.
# Pick the game via `environment`; the matching branch below must define every
# name the later cells read: nb_actions, model, optimizer, optimizer_args,
# nb_episodes_before_opti and nb_epoch.
algo = "Reinforce"

environment = connect_four_v3
environment_name = environment.__name__.split(".")[-1]

if environment_name == "tictactoe_v3":
    nb_actions = 9

    # 18 input features -> one output per action.
    # NOTE(review): the trailing ReLU clamps the outputs to be non-negative,
    # unlike the connect_four branch which emits raw values — confirm intended.
    model = torch.nn.Sequential(
        torch.nn.Linear(18, 32),
        torch.nn.ReLU(),
        torch.nn.Linear(32, nb_actions),
        torch.nn.ReLU(),
    )

    optimizer = torch.optim.SGD
    optimizer_args = {
        "lr": 3e-3,
        "momentum": 0.01,
        "nesterov": True
    }

    nb_episodes_before_opti = 4
    nb_epoch = 40_000

elif environment_name == "connect_four_v3":
    nb_actions = 7

    # 84 input features (the flattened (6, 7, 2) observation) -> one raw
    # output per action; no activation on the last layer.
    model = torch.nn.Sequential(
        torch.nn.Linear(84, 512),
        torch.nn.ReLU(),
        torch.nn.Linear(512, 256),
        torch.nn.ReLU(),
        torch.nn.Linear(256, 128),
        torch.nn.ReLU(),
        torch.nn.Linear(128, nb_actions),
    )

    optimizer = torch.optim.SGD
    optimizer_args = {
        "lr": 2e-14,
        "momentum": 0.0001,
        "nesterov": True
    }

    nb_episodes_before_opti = 8
    nb_epoch = 1_000_000

elif environment_name == "chess_v6":
    # Draft values only: this branch defines no model, optimizer or
    # optimizer_args, so the cells below cannot run for chess yet.
    # NOTE(review): nb_actions = 7 looks copied from the connect_four branch —
    # verify against the chess_v6 action space before enabling.
    nb_actions = 7
    learning_rate = 1e-4
    nb_episodes_before_opti = 32

    nb_epoch = 400_000

    # Fail here with an explicit message instead of a NameError further down.
    raise NotImplementedError(
        "chess_v6 is not configured: define model, optimizer and optimizer_args first"
    )

else:
    raise ValueError("No configuration available for environment " + repr(environment_name))

# Run tag, e.g. "<algo>_n=<episodes>_<optimizer>_<arg>=<value>...".
optimizer_tag = "".join(f"_{key}={value}" for key, value in optimizer_args.items())
exp_name_build_name = f"{algo}_n={nb_episodes_before_opti}_{optimizer.__name__}{optimizer_tag}"

print("Applaying", exp_name_build_name, "on", environment_name)

Applaying Reinforce_n=8_SGD_lr=2e-14_momentum=0.0001_nesterov=True on connect_four_v3


In [3]:
import torch
import numpy as np

class RandomPolicy():
    """Policy that picks uniformly at random among the allowed actions."""

    def __init__(self, action_size):
        """Store the total number of actions (actions are 0 .. action_size - 1)."""
        self.action_size = action_size

    def act(self, observation, mask=None):
        """Sample one action index.

        Args:
            observation: Ignored; accepted so the signature matches other policies.
            mask: Optional array-like of length ``action_size`` whose truthy
                entries mark the allowed actions. ``None`` allows every action.

        Returns:
            The sampled action index, as returned by ``np.random.choice``.

        Raises:
            ValueError: If the mask leaves no allowed action.
        """
        actions = np.arange(self.action_size)
        if mask is not None:
            # np.asarray lets plain lists/tuples be used as masks too.
            actions = actions[np.asarray(mask).astype(bool)]
        if actions.size == 0:
            raise ValueError("RandomPolicy.act: the action mask leaves no legal action")
        return np.random.choice(actions)

class RandomAgent():
    """Agent wrapper around a policy that never learns.

    It exposes the same ``observe`` / ``optimize`` / ``loss`` / ``policy``
    surface the training loop uses, but both hooks are no-ops.
    """

    def __init__(self, policy):
        """Keep the policy and a constant loss of 0."""
        self.loss = 0
        self.policy = policy

    def observe(self, observation, action, reward, next_state, terminated):
        """Discard the transition; nothing is stored."""
        return None

    def optimize(self):
        """Do nothing; there are no parameters to update."""
        return None

In [4]:
import torch

class MyPolicy(torch.nn.Module):
    """Stochastic policy backed by an arbitrary torch model.

    The wrapped model receives the observation with its last three
    dimensions flattened into one and returns one value per action.
    """

    def __init__(self, model):
        """Register ``model`` as the submodule producing the action values."""
        super().__init__()
        self.model = model

    def forward(self, x):
        """Cast ``x`` to float, flatten its last three dims and run the model."""
        flat = torch.flatten(x.float(), start_dim=-3)
        return self.model(flat)

    @torch.inference_mode()
    def act(self, observation, mask):
        """Sample one allowed action for a single observation.

        Args:
            observation: NumPy array with at least three dimensions
                (assumed to be a single, un-batched observation — the
                softmax below runs over dim 0).
            mask: NumPy array with one entry per action; non-zero marks an
                allowed action.

        Returns:
            The sampled action index as a Python int.
        """
        obs_tensor = torch.from_numpy(observation).to(torch.float32)
        legal = torch.from_numpy(mask).to(torch.int64)
        action_values = self.forward(obs_tensor)

        # Zero out disallowed actions, then add a tiny weight to every allowed
        # one so the sampling weights are not all zero.
        masked = torch.softmax(action_values, dim=0) * legal
        floor = 1e-12 * legal
        probabilities = masked + floor

        sampled = torch.multinomial(probabilities, num_samples=1)
        return sampled.item()


def _init_weights(m):
    if isinstance(m, torch.nn.Linear):
        torch.nn.init.uniform_(m.weight)
        m.bias.data.fill_(0.01)


In [5]:
from torch.utils.tensorboard import SummaryWriter

# TensorBoard run directory: ../../tensorboard/<environment>/<experiment tag>.
experiment_name = f"{environment_name}/{exp_name_build_name}"
log_dir = os.path.join("../../tensorboard", experiment_name)

# Shared writer used by the training loop for its scalar logs.
writer = SummaryWriter(log_dir=log_dir)

def run_env(env, agents):
    """Play one episode and feed each agent its own transitions.

    Args:
        env: Environment exposing ``reset``, ``agent_iter``, ``last`` and
            ``step``; ``last`` must return an observation dict with the keys
            ``"observation"`` and ``"action_mask"``.
        agents: Mapping from agent name to an object with ``observe(...)`` and
            a ``policy`` exposing ``act(observation, mask)``.

    Returns:
        Dict mapping each agent name to the reward it received on its
        terminated/truncated turn (``None`` if it never had one).
    """
    env.reset()

    # Per-agent memory of the previous turn, needed to build transitions.
    prev_obs = dict.fromkeys(agents)
    prev_action = dict.fromkeys(agents)
    final_reward = dict.fromkeys(agents)

    for name in env.agent_iter():
        observation, reward, termination, truncation, _info = env.last()
        board, legal = observation["observation"], observation["action_mask"]
        done = termination or truncation

        if done:
            # Closing turn for this agent: report the last transition and
            # step with None, since no further action is chosen.
            final_reward[name] = reward
            agents[name].observe(prev_obs[name], prev_action[name], reward, board, True)
            env.step(None)
            continue

        # From the second turn on, the previous action's outcome is known.
        if prev_obs[name] is not None:
            agents[name].observe(prev_obs[name], prev_action[name], reward, board, False)

        choice = agents[name].policy.act(board, legal)
        prev_obs[name] = board
        prev_action[name] = choice

        env.step(choice)

    return final_reward

In [6]:
# Ring buffer holding the most recent episode rewards of the logged agent.
rewards = [0] * 1000

def train(env, agents, n):
    """Run ``n + 1`` episodes and periodically log reward and loss.

    Args:
        env: Environment passed through to ``run_env``.
        agents: Mapping from agent name to agent object; the agent registered
            under the environment's first agent name is the one that is logged.
        n: Index of the last episode (episodes ``0 .. n`` are played).

    Every ``len(rewards)`` episodes the mean of the ``rewards`` buffer and the
    logged agent's ``loss`` attribute are written to the module-level ``writer``.
    """
    # possible_agents is used rather than env.agents because, per the
    # PettingZoo AEC API, env.agents only lists agents still alive in the
    # current episode and can be empty once an episode has finished.
    agent_to_log = env.possible_agents[0]

    for epoch in range(n + 1):
        reward = run_env(env, agents)
        # Track the same agent whose loss is logged below (previously this was
        # hard-coded to "player_1", i.e. a different agent than agent_to_log).
        rewards[epoch % len(rewards)] = reward[agent_to_log]

        if epoch % len(rewards) == 0:
            # NOTE: the mean is always taken over the whole buffer, so entries
            # not yet overwritten (initial zeros) are included.
            writer.add_scalar("train/mean_reward", sum(rewards) / len(rewards), epoch)
            writer.add_scalar("train/loss", agents[agent_to_log].loss, epoch)

In [7]:
# No rendering is requested: nothing in this notebook renders the environment.
# NOTE(review): the previous value "rgb" does not appear among the render modes
# the PettingZoo classic environments document ("human", "rgb_array") — confirm.
env = environment.env(render_mode=None)
env.reset()

# Shape of a single observation.
state_shape = env.observation_space("player_1")["observation"].shape

print(env.agents)
print(state_shape)

['player_0', 'player_1']
(6, 7, 2)


In [8]:
# Wrap the configured network and re-initialise its Linear layers.
policy = MyPolicy(model=model)
policy.apply(_init_weights)

# Learning agent built from the policy and the optimizer settings chosen above.
reinforce_agent = brl.reinforce.Reinforce(
    policy=policy,
    optimizer=optimizer,
    optimizer_parameters=optimizer_args,
    nb_episodes=nb_episodes_before_opti,
)

In [9]:
# Opponent: a non-learning agent that samples among the allowed actions.
random_policy = RandomPolicy(nb_actions)
random_agent = RandomAgent(random_policy)

In [10]:
# The learning agent takes the environment's first seat, the random one the second.
learner_id, opponent_id = env.agents[0], env.agents[1]
agents = {learner_id: reinforce_agent, opponent_id: random_agent}

In [11]:
# Launch training for the configured number of episodes.
train(env, agents, nb_epoch)

KeyboardInterrupt: 