Import libraries

In [None]:
# !rm -rf ConnectXRL/

In [None]:
%load_ext autoreload
%autoreload 2

%matplotlib inline

In [None]:
import os
import sys
from getpass import getpass
import urllib

user = 'alomb'
password = getpass('Password: ')
password = urllib.parse.quote(password)

cmd_string = 'git clone https://{0}:{1}@github.com/{0}/{2}.git'.format(user, password, 'ConnectXRL')

os.system(cmd_string)
del cmd_string
del password

In [None]:
!pip install kaggle-environments webcolors

In [None]:
sys.path.append('ConnectXRL')
sys.path.append('ConnectXRL/src')
sys.path.append('ConnectXRL/src/connectx')

In [None]:
from random import choice

import numpy as np

import torch
from torch import nn
from torch.nn import functional as F

from kaggle_environments import make, evaluate

from src.connectx.constraints import ConstraintType, Constraints
from src.connectx.environment import ConnectXGymEnv, convert_state_to_image
from src.connectx.evaluate import get_win_percentages
from policy import CNNPolicy
from dqn import DQN

Define configurations of the game

In [None]:
# ----
# Game
# ----
BOARD_COLUMNS = 7
BOARD_ROWS = 6
STONES_TO_WIN = 4

# --------
# Training
# --------
# Rewards
INVALID_REWARD = -1.0
VICTORY_REWARD = 1.0
LOST_REWARD = -1.0
DRAW_REWARD = 0.5

PLAYER_2ND_EPISODES = 2
PLAYER_1ST_EPISODES = 2
SAVE_FREQ = PLAYER_1ST_EPISODES + PLAYER_2ND_EPISODES

# Rendering
RENDER_ENV = False
RENDER_WAITING_TIME = 1
UPDATE_PLOT_FREQ = 100
REWARD_AVG_ROLL_WINDOW_SIZE = 100

Define all the different agents

In [None]:
agents = {
    'DQN': {
        'constraint_type': None,
        'non_local': False,
        'train_first_player': True,
        'train_opponent': 'random',
        'n_episodes_as_1st_player': PLAYER_1ST_EPISODES,
        'n_episodes_as_2nd_player': PLAYER_2ND_EPISODES,
        'train_save_freq': SAVE_FREQ, 
        'train_weights_path': './dqn/',
        'test_weights': SAVE_FREQ,
        'test_opponent': 'negamax',
    },
    'LOGIC_PURE': {
        'constraint_type': ConstraintType.LOGIC_PURE,
        'non_local': False,
        'train_first_player': True,
        'train_opponent': 'random',
        'n_episodes_as_1st_player': PLAYER_1ST_EPISODES,
        'n_episodes_as_2nd_player': PLAYER_2ND_EPISODES,
        'train_save_freq': SAVE_FREQ, 
        'train_weights_path': './logic_pure/',
        'test_weights': SAVE_FREQ,
        'test_opponent': 'negamax',
    },
    'LOGIC_TRAIN': {
        'constraint_type': ConstraintType.LOGIC_TRAIN,
        'non_local': False,
        'train_first_player': True,
        'train_opponent': 'random',
        'n_episodes_as_1st_player': PLAYER_1ST_EPISODES,
        'n_episodes_as_2nd_player': PLAYER_2ND_EPISODES,
        'train_save_freq': SAVE_FREQ, 
        'train_weights_path': './logic_train/',
        'test_weights': SAVE_FREQ,
        'test_opponent': 'negamax',
    },
    'SPE': {
        'constraint_type': ConstraintType.SPE,
        'non_local': False,
        'train_first_player': True,
        'train_opponent': 'random',
        'n_episodes_as_1st_player': PLAYER_1ST_EPISODES,
        'n_episodes_as_2nd_player': PLAYER_2ND_EPISODES,
        'train_save_freq': SAVE_FREQ, 
        'train_weights_path': './spe/',
        'test_weights': SAVE_FREQ,
        'test_opponent': 'negamax',
    },
    'SBR': {
        'constraint_type': ConstraintType.SBR,
        'non_local': False,
        'train_first_player': True,
        'train_opponent': 'random',
        'n_episodes_as_1st_player': PLAYER_1ST_EPISODES,
        'n_episodes_as_2nd_player': PLAYER_2ND_EPISODES,
        'train_save_freq': SAVE_FREQ, 
        'train_weights_path': './sbr/',
        'test_weights': SAVE_FREQ,
        'test_opponent': 'negamax',
    },
    'CDQN': {
        'constraint_type': ConstraintType.CDQN,
        'non_local': False,
        'train_first_player': True,
        'train_opponent': 'random',
        'n_episodes_as_1st_player': PLAYER_1ST_EPISODES,
        'n_episodes_as_2nd_player': PLAYER_2ND_EPISODES,
        'train_save_freq': SAVE_FREQ, 
        'train_weights_path': './cdqn/',
        'test_weights': SAVE_FREQ,
        'test_opponent': 'negamax',
    },
}

In [None]:
# Create weights folders
for a in agents.values():
    if not os.path.isdir(a['train_weights_path']):
        os.mkdir(a['train_weights_path'])

Train agents

In [None]:
for a in agents.values():
    env = ConnectXGymEnv(a['train_opponent'],
                         first=True,
                         invalid_reward=INVALID_REWARD,
                         victory_reward=VICTORY_REWARD,
                         lost_reward=LOST_REWARD,
                         draw_reward=DRAW_REWARD)

    dqn = DQN(env,
              non_local=a['non_local'],
              batch_size=128,
              gamma=0.99,
              eps_start=1.0,
              eps_end=0.01,
              eps_decay=10000,
              memory_size=10000,
              target_update=500,
              learning_rate=1e-2,
              epochs=2,
              constraint_type=a['constraint_type'],
              device='cuda',
              notebook=True)

    dqn.training_loop(a['n_episodes_as_1st_player'],
                      a['n_episodes_as_2nd_player'],
                      save_path=a['train_weights_path'],
                      save_frequency=a['train_save_freq'],
                      render_env=RENDER_ENV,
                      render_waiting_time=RENDER_WAITING_TIME,
                      update_plots_frequency=UPDATE_PLOT_FREQ,
                      plot_duration=True,
                      plot_mean_reward=True,
                      plot_actions_count=True,
                      cumulative_reward_avg_roll_window=REWARD_AVG_ROLL_WINDOW_SIZE)

Initialize the models for testing

In [None]:
for a in agents.values():
    a['model'] = CNNPolicy(BOARD_COLUMNS,
                           (3, BOARD_ROWS, BOARD_COLUMNS),
                           non_local=a['non_local'])
    a['model'].load_state_dict(torch.load(a['train_weights_path'] +
                                          'weights_' +
                                          str(a['test_weights']) +
                                          '.pt'))
    a['model'].eval()

Test agents

In [None]:
show_percentages = True

for a, a_c in agents.items():
    for e in range(1):
        print(f'{e}) Evaluating agent {a}:')

        # Define function used to represent the agent at testing time
        def dqn_agent(observation: dict,
                      configuration: dict) -> int:
            """
            Agent trained using DQN and trained on the images of the game.

            :param observation: turn's data (board status, step number, ...)
            :param configuration: environment's data (steps, board, timeouts, ...) and weights file path
            :return: the column where the stone is inserted
            """

            # print(observation)
            # print(configuration)

            c_type = a_c['constraint_type']
            constraints = Constraints(c_type, observation.mark) if c_type else None

            col = None
            board = np.array(observation.board).reshape(BOARD_ROWS, BOARD_COLUMNS, 1)

            # If LOGIC_PURE detect a critical situation the correct action is performed
            if constraints and c_type is ConstraintType.LOGIC_PURE:
                col = constraints.select_constrained_action(board.squeeze()).unsqueeze(0)

            if constraints is None or \
                    col is None or \
                    (col.sum().item() != 1 and c_type is ConstraintType.LOGIC_PURE):

                col = a_c['model'](torch.from_numpy(convert_state_to_image(board)))

                # Safe policy estimation on the action values
                if constraints and c_type is ConstraintType.SPE:
                    # Compute action masks
                    constraints = constraints.select_constrained_action(board.squeeze())
                    # Set invalid actions to -inf
                    col.squeeze()[constraints == 0] = -np.inf

            if type(col) is not int:
                # Select action suggested by constraints
                col = col.max(1)[1].item()

            # Check if selected column is valid
            is_valid = (observation.board[col] == 0)

            # If not valid, select random move
            if is_valid:
                return col
            else:
                return choice([c for c in range(configuration.columns) if observation.board[c] == 0])

        env = make('connectx', debug=True)
        print(f'{list(env.agents)[0]} VS {list(env.agents)[1]}')

        if not show_percentages:
            env.run([dqn_agent, a_c['test_opponent']])
            env.render(mode = 'ipython')
        else:
            get_win_percentages(dqn_agent,
                                ['random', 'negamax'],
                                player_name=a_c['constraint_type'],
                                n_rounds_as_1st_player=30,
                                n_rounds_as_2nd_player=30)

        print('='* 50)