# Create ConnectX Environment

In [1]:
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from typing import Callable

from ipywidgets import IntProgress
from IPython.display import display

from kaggle_environments import evaluate, make, utils
from kaggle_environments.utils import Struct

# Build the ConnectX environment (with debug=True passed to `make`) and render it.
env = make("connectx", debug=True)
env.render()

Loading environment lux_ai_2022 failed: No module named 'vec_noise'


# Create agents

In [2]:
class AgentModel(nn.Module):
    '''Small fully connected network that scores the seven ConnectX columns.

    The network maps a flat vector of 44 values to 7 scores, one per column.
    The agents in this notebook build the input as the flat board followed by
    the step counter and the player's mark (42 + 2 values on a 6x7 board).

    An Adam optimizer and an MSE loss are attached to the instance; nothing in
    this class uses them directly.
    '''

    def __init__(self):
        super().__init__()
        self.input_layer = nn.Linear(44, 128)
        self.middle_layer = nn.Linear(128, 128)
        self.output_layer = nn.Linear(128, 7)
        self.optimizer = optim.Adam(self.parameters(), lr=0.003)
        self.loss = nn.MSELoss()
        self.device = 'cpu'
        self.to(self.device)

    def load_action(
        self,
        observation_input: list,
        board: list,
        rows: int,
        cols: int
    ) -> int:
        '''Return the best-scoring column that can still accept a piece.

        Args:
            observation_input: Flat feature vector fed to the network.
            board: Flat, row-major board; a non-zero cell is occupied.
            rows: Number of board rows.
            cols: Number of board columns.

        Returns:
            Index of the playable column with the highest score. When several
            playable columns share the highest score, the highest index wins.
            Returns 0 if no column is playable.
        '''
        state = T.tensor(observation_input, dtype=T.float32).to(self.device)
        # Pure inference: no autograd graph is needed to pick a move.
        with T.no_grad():
            scores: list = self.forward(state).tolist()

        # Transposed to (cols, rows) so grid[c][0] is the top cell of column c.
        grid = np.array(board).reshape(rows, cols).T

        # Select by column index rather than by score value, so two columns
        # with an identical score can never be confused with each other.
        playable = [
            col for col in range(min(cols, len(scores)))
            if not grid[col][0]
        ]
        if not playable:
            return 0

        return max(playable, key=lambda col: (scores[col], col))

    def forward(self, state) -> T.Tensor:
        '''Compute one score per column for the given input tensor.'''
        x = F.relu(self.input_layer(state))
        x = F.relu(self.middle_layer(x))
        actions = self.output_layer(x)

        return actions

In [3]:
def random_agent(observation: Struct, configuration: Struct = None) -> int:
    '''Pick a random column whose first-row cell is still empty.

    The first `cols` entries of the flat `observation.board` are checked, one
    per column (the same row-major layout `AgentModel.load_action` relies on).

    Args:
        observation: Object exposing a flat `board` list; 0 marks an empty cell.
        configuration: Optional object exposing `columns`. When it is None or
            has no `columns` attribute, a width of 7 is assumed.

    Returns:
        Index of a randomly chosen open column.

    Raises:
        IndexError: If no column is open.
    '''
    # Kept as a local import, as in the original cell, so the function stays
    # self-contained.
    from random import choice

    cols = 7
    if configuration is not None:
        cols = getattr(configuration, 'columns', cols)

    return choice([c for c in range(cols) if observation.board[c] == 0])

In [4]:
def base_agent(observation: Struct, configuration: Struct = None) -> int:
    '''Choose a column with an `AgentModel` loaded from './data/base_weights'.

    A new model is built and its weights are read from disk on every call.

    Args:
        observation: Object exposing `board` (flat list), `step` and `mark`.
        configuration: Optional object exposing `rows` and `columns`. A 6x7
            board is assumed when it is None.

    Returns:
        The column index selected by `AgentModel.load_action`.
    '''
    weights_path = './data/base_weights'
    model = AgentModel()
    # map_location keeps the load working on CPU-only machines even when the
    # weights were saved from a GPU.
    model.load_state_dict(T.load(weights_path, map_location='cpu'))

    rows, cols = 6, 7
    if configuration is not None:
        rows = getattr(configuration, 'rows', rows)
        # ConnectX names the board width `columns`; `cols` is still accepted
        # for hand-built configuration objects.
        cols = getattr(configuration, 'columns', getattr(configuration, 'cols', cols))

    observation_input = observation.board + [observation.step, observation.mark]
    return model.load_action(observation_input, observation.board, rows, cols)

In [5]:
def extra_reward_agent(observation: Struct, configuration: Struct = None) -> int:
    '''Choose a column with an `AgentModel` loaded from
    './data/punishment_and_extra_rewards_weights'.

    A new model is built and its weights are read from disk on every call.

    Args:
        observation: Object exposing `board` (flat list), `step` and `mark`.
        configuration: Optional object exposing `rows` and `columns`. A 6x7
            board is assumed when it is None.

    Returns:
        The column index selected by `AgentModel.load_action`.
    '''
    weights_path = './data/punishment_and_extra_rewards_weights'
    model = AgentModel()
    # map_location keeps the load working on CPU-only machines even when the
    # weights were saved from a GPU.
    model.load_state_dict(T.load(weights_path, map_location='cpu'))

    rows, cols = 6, 7
    if configuration is not None:
        rows = getattr(configuration, 'rows', rows)
        # ConnectX names the board width `columns`; `cols` is still accepted
        # for hand-built configuration objects.
        cols = getattr(configuration, 'columns', getattr(configuration, 'cols', cols))

    observation_input = observation.board + [observation.step, observation.mark]
    return model.load_action(observation_input, observation.board, rows, cols)

In [6]:
def extra_punish_agent(observation: Struct, configuration: Struct = None) -> int:
    '''Choose a column with an `AgentModel` loaded from
    './data/punishment_minus_3_weights'.

    A new model is built and its weights are read from disk on every call.

    Args:
        observation: Object exposing `board` (flat list), `step` and `mark`.
        configuration: Optional object exposing `rows` and `columns`. A 6x7
            board is assumed when it is None.

    Returns:
        The column index selected by `AgentModel.load_action`.
    '''
    weights_path = './data/punishment_minus_3_weights'
    model = AgentModel()
    # map_location keeps the load working on CPU-only machines even when the
    # weights were saved from a GPU.
    model.load_state_dict(T.load(weights_path, map_location='cpu'))

    rows, cols = 6, 7
    if configuration is not None:
        rows = getattr(configuration, 'rows', rows)
        # ConnectX names the board width `columns`; `cols` is still accepted
        # for hand-built configuration objects.
        cols = getattr(configuration, 'columns', getattr(configuration, 'cols', cols))

    observation_input = observation.board + [observation.step, observation.mark]
    return model.load_action(observation_input, observation.board, rows, cols)

# Test your Agent

In [10]:
def test_agent(
    n_games: int,
    trainer: Struct,
    agent: Callable
) -> float:
    wins = 0
    # f = IntProgress(min=0, max=n_games, width=100) # instantiate the bar
    # display(f)

    for i in range(n_games):
        done  = False
        observation = trainer.reset()
        
        while not done:
            action = agent(observation)
            observation, reward, done, info = trainer.step(int(action))
            wins = wins + 1 if reward == 1 else wins
        
        # f.value += 1
    
    return wins/n_games

In [11]:
# Fresh environment for this evaluation run.
env = make("connectx", debug=True)
env.render()

# Play 1000 games with `extra_punish_agent` against the "negamax" opponent.
# NOTE(review): the None entry is presumably the seat driven through `trainer`
# (kaggle_environments `env.train` convention) — confirm against its docs.
n_games = 1000
trainer = env.train([None, "negamax"])
wins_ratio = test_agent(n_games, trainer, extra_punish_agent)

# Value returned by test_agent (wins / n_games).
print(wins_ratio)

0.136


In [12]:
# Fresh environment for this evaluation run.
env = make("connectx", debug=True)
env.render()

# Play 1000 games with `extra_punish_agent` against the "random" opponent.
# NOTE(review): the None entry is presumably the seat driven through `trainer`
# (kaggle_environments `env.train` convention) — confirm against its docs.
n_games = 1000
trainer = env.train([None, "random"])
wins_ratio = test_agent(n_games, trainer, extra_punish_agent)

# Value returned by test_agent (wins / n_games).
print(wins_ratio)

0.848
