# Connect X
Competition: www.kaggle.com/c/connectx<br>
Getting started: https://www.kaggle.com/ajeffries/connectx-getting-started<br>
Solving Connect4: http://blog.gamesolver.org/

In [1]:
%%capture
!pip install nbclient --upgrade
!pip install kaggle-environments

## Benchmarks
In this section, we run the latest solver/agent through some benchmarking files to see how long it takes to solve each board configuration. 

In [3]:
import numpy as np

class Table:
    def __init__(self, size):
        self.size = size
        self.reset()

    def reset(self):
        self.keys = np.zeros(self.size, dtype = 'uint64')
        self.values = np.empty(self.size, dtype = 'int')

    def __index(self, key): return key % self.size

    def put(self, key, val):
        i = self.__index(key)
        self.keys[i] = key
        self.values[i] = val
    
    def get(self, key):
        i = self.__index(key)
        key_at_i = self.keys[i]
        if key_at_i == key:
            return self.values[i]
        return 0

# class Entry:
#     def __init__(self):
#         self.key = 0
#         self.val = 0

# class Table:
#     def __init__(self, size):
#         self.size = size
#         self.reset()

#     def reset(self):
#         self.elements = [Entry() for i in range(self.size)]

#     def __index(self, key): return key % self.size

#     def put(self, key, val):
#         el = self.elements[self.__index(key)]
#         el.key = key
#         el.val = val
    
#     def get(self, key):
#         el = self.elements[self.__index(key)]
#         if el.key == key:
#             return el.val
#         return 0

In [9]:
 class Board:
    def __init__(self, rows, cols, inarow):
        assert rows == 6 and cols == 7, 'Board dimensions are invalid'
        assert inarow == 4, 'Board in-a-row are invalid'
        self.rows, self.cols = rows, cols
        self.size = rows * cols
        self.inarow = inarow
        self.position = 0
        self.mask = 0
        self.moves = 0
        self.min_score = -self.size // 2 + 3
        self.max_score = (self.size + 1) // 2 - 3
        self.bottom_mask = Board.bottom(cols, rows)
        self.board_mask = self.bottom_mask * ((1 << rows) - 1)

    def play(self, c):
        self.position ^= self.mask
        self.mask |= self.mask + Board.bottom_mask_col(c, self.rows)
        self.moves += 1
        return self

    def can_win_next(self):
        return self.winning_position() & self.possible()

    def key(self): 
        return self.position + self.mask

    def possible_non_loosing_moves(self):
        assert ~self.can_win_next(), 'you could have won..'
        possible_mask = self.possible()
        opponent_win = self.opponent_winning_position()
        forced_moves = possible_mask & opponent_win
        if forced_moves:
            if forced_moves & (forced_moves - 1): return 0
            else: possible_mask = forced_moves
        return possible_mask & ~(opponent_win >> 1)

    def can_play(self, c):
        return (self.mask & Board.top_mask_col(c, self.rows)) == 0

    def is_winning_move(self, c):
        return self.winning_position() & self.possible() & Board.column_mask(c, self.rows)
        
    def winning_position(self):
        return Board.compute_winning_position(self.position, self.mask, 
                                              self.board_mask, self.rows)

    def opponent_winning_position(self):
        return Board.compute_winning_position(self.position ^ self.mask, 
                                              self.mask, self.board_mask, self.rows)

    def possible(self):
        return (self.mask + self.bottom_mask) & self.board_mask
    
    @staticmethod
    def compute_winning_position(pos, msk, brd_msk, h):
        # vertical;
        r = (pos << 1) & (pos << 2) & (pos << 3);

        # horizontal
        p = (pos << (h + 1)) & (pos << 2*(h+1));
        r |= p & (pos << 3*(h+1));
        r |= p & (pos >> (h+1));
        p = (pos >> (h+1)) & (pos >> 2*(h+1));
        r |= p & (pos << (h+1));
        r |= p & (pos >> 3*(h+1));

        # diagonal 1
        p = (pos << h) & (pos << 2*h);
        r |= p & (pos << 3*h);
        r |= p & (pos >> h);
        p = (pos >> h) & (pos >> 2*h);
        r |= p & (pos << h);
        r |= p & (pos >> 3*h);

        # diagonal 2
        p = (pos << (h+2)) & (pos << 2*(h+2));
        r |= p & (pos << 3*(h+2));
        r |= p & (pos >> (h+2));
        p = (pos >> (h+2)) & (pos >> 2*(h+2));
        r |= p & (pos << (h+2));
        r |= p & (pos >> 3*(h+2));

        return r & (brd_msk ^ msk)

    @staticmethod
    def bottom(w, h):
        if w == 0:
            return 0
        return Board.bottom(w - 1, h) | 1 << (w - 1) * (h - 1)

    @staticmethod
    def top_mask_col(col, h):
        return 1 << ((h - 1) + col * (h + 1))

    @staticmethod
    def bottom_mask_col(col, h):
        return 1 << col * (h + 1)

    @staticmethod
    def column_mask(col, h):
        return ((1 << h) - 1) << col * (h + 1)

    def clone(self):
        new = Board(self.rows, self.cols, self.inarow)
        new.position = self.position
        new.mask = self.mask
        new.moves = self.moves
        return new      
    
    def pretty_print(self): 
        for r in range(self.rows):
            s = ''
            for c in range(self.cols):
                s += str(self.__at(r, c)) + ' '
            print(s)

    def __at(self, r, c):
        # top-left corner is [0, 0].
        # 1 for first player; 2 for second player; 0 empty
        n = self.rows - 1 - r + (self.rows + 1) * c
        populated = (self.mask >> n) & 1 
        if not populated: return 0
        current_player = (self.position >> n) & 1 
        player = 1 if self.moves % 2 == 0 else 2  
        opponent = 2 if player == 1 else 1
        return player if current_player else opponent

    def __set_at(self, r, c, val):
        # top-left corner is [0, 0].
        # does not modify moves counter, use it with caution
        n = self.rows - 1 - r + (self.rows + 1) * c
        if val != 0:
            # set position only if this player
            player = 1 if self.moves % 2 == 0 else 2
            if player == val:
                self.position |= 1 << n
            # set mask anyway
            self.mask |= 1 << n
        else:
            # clear i-th bit from mask and position
            self.mask &= ~(1 << n)
            self.position &= ~(1 << n)
        return self

    @staticmethod
    def from_list(rows, cols, inarow, values):
        n_moves = 0
        for v in values:
            if v != 0: n_moves += 1
        brd = Board(rows, cols, inarow)
        brd.moves = n_moves

        for i, val in enumerate(values):
            if val == 0: continue
            r = i // cols
            c = i % cols
            brd.__set_at(r, c, val)
        return brd

In [13]:
class Solver:
    def __init__(self):
        # self.scores = []
        self.nodes = 0
        self.col_order = [3, 4, 2, 5, 1, 6, 0]
        # self.col_order = [0] * brd.cols
        # for i in range(brd.cols)
        #      self.col_order[i] = (brd.cols // 2 + 
        #                           (1 - 2 * (i % 2)) * (i + 1) // 2)
        self.table = Table(65535) # 2**16 - 1 # 8388593

    def __negamax(self, brd, alpha, beta):
        self.nodes += 1
        
        next = brd.possible_non_loosing_moves()
        if next == 0: return -(brd.size - brd.moves) // 2

        # if board is full, then it's a draw
        if brd.moves == brd.size: return 0

        min_score = -(brd.size - 2 - brd.moves) // 2
        if alpha < min_score:
            alpha = min_score
            if alpha >= beta: return alpha

        # check table if move has been already evaluated
        max_score = (brd.size - 1 - brd.moves) // 2
        val = self.table.get(brd.key())
        if val != 0:
            max_score = val + brd.min_score - 1
        if beta > max_score:
            beta = max_score
            if alpha >= beta: return beta

        for c in self.col_order:
            if next & Board.column_mask(c, brd.rows):
                new_brd = brd.clone()
                new_brd.play(c)
                new_score = -self.__negamax(new_brd, -beta, -alpha)
                if new_score >= beta: return new_score
                if new_score > alpha: alpha = new_score
        
        self.table.put(brd.key(), alpha - brd.min_score + 1)
        return alpha

    def compute_score(self, brd, alpha_beta):
        if alpha_beta == (-1, 1):
            min_score, max_score = -1, 1
        else:
            min_score = -(brd.size - brd.moves) // 2
            max_score = (brd.size + 1 - brd.moves) // 2

        while min_score < max_score:
            mid = min_score + (max_score - min_score) // 2
            if mid <= 0 and min_score // 2 < mid: mid = min_score // 2
            elif mid >= 0 and max_score // 2 > mid: mid = max_score // 2

            r = self.__negamax(brd, mid, mid + 1)
            if r <= mid: max_score = r
            else: min_score = r
        return min_score

    # def __best_score_and_col(self):
    #     best_score, best_col = None, None
    #     for c, score in enumerate(self.scores):
    #         if score is None: continue
    #         if best_score is None or best_score < score:
    #             best_score = score
    #             best_col = c
    #     return best_score, best_col

    # def solve(self, board, alpha_beta = (-21, 21)):
    #     """Solves the board and returns a score and best column to play.

    #     Parameters
    #     ----------
    #     board : Board
    #         Board instance to be solved.
    #     alpha_beta : (int, int), optional
    #         Alpha-beta pruning search bounds.

    #     Returns
    #     -------
    #     (best_score, best_col)
    #         A tuple containing the best score achievable and the relative 
    #         column to play.
    #         The score is:
    #             - 0 for a draw;
    #             - positive if you can win whatever your opponent plays, 
    #                 your score is the number of moves before your win (the 
    #                 faster you win,the higher your score); 
    #             - negative if your opponent can force you to lose, your 
    #                 score is the opposite of the number of moves before you
    #                 lose (the faster you lose, the lower your score).
    #         The column is a 0-based int, or -1 if no move was available. 
    #     """
        
    #     # play each column, and compare its inverse score with best so far
    #     self.scores = []
    #     for c in range(board.cols):
    #         if board.can_play(c):
    #             if board.is_winning_move(c):
    #                 self.scores.append((board.size + 1 - board.moves_cnt) // 2)
    #             else:
    #                 board.play(c)
    #                 self.scores.append(-self.compute_score(board, alpha_beta))     
    #                 board.undo_play(c)
    #         else:
    #             self.scores.append(None)
    #     return self.__best_score_and_col()

### For single match
A single match is benchmarked, from a source string.

In [5]:
import time

def play_recorded_moves(brd, moves):
    for m in moves:
        c = ord(m) - ord('1')
        if c >= 0 and c < brd.cols and brd.can_play(c) and not brd.is_winning_move(c):
            brd.play(c)
        else:
            raise Exception('Invalid string match')
    return brd

def initialize_board(moves, print):
    brd = Board(6, 7, 4)
    brd = play_recorded_moves(brd, moves)
    if print: brd.pretty_print()
    return brd

In [14]:
board = initialize_board('2252576253462244111563365343671351441', True)

solver = Solver()
best_score = solver.compute_score(board, alpha_beta = (-21, 21))
print('Score:', best_score)

1 2 2 2 1 0 0 
2 1 2 1 1 1 0 
1 2 2 1 2 2 0 
1 2 1 2 1 1 0 
2 2 2 1 1 2 2 
1 1 2 1 1 1 2 
Score: 0


### For multiple matches
Multiple (1000 per file) matches are benchmarked, from source files.

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [15]:
# invoke_solver (Solver, Board) -> score, col, scores, nodes
def evaluate_solver(invoke_solver, moves, check_enabled = True, expected_score = None):
    score, col, scores, nodes = invoke_solver(initialize_board(moves, False))
    if check_enabled:
        assert score == expected_score, f'Expected {expected_score}, got {score} instead ({moves}).'
    return score, col, scores, nodes

def evaluate_solver_from_file(invoke_solver, filename, checks_enabled = True):
    def format_n(n): 
        return '{:.3f}'.format(n)

    lines = open(filename, 'r').readlines()
    tot_nodes = 0

    start_t = time.time()
    for line in lines:
        vals = line.strip().split(' ')
        moves, expected = vals[0], int(vals[1])
        tot_nodes += (evaluate_solver(invoke_solver, moves, checks_enabled, expected))[3]
    sec_t = time.time() - start_t

    L = len(lines)
    mean_us = format_n(1e6 * sec_t / L)
    mean_nodes = format_n(tot_nodes / L)
    k_nodes_sec = format_n(tot_nodes / sec_t / 1e3)
    fname = filename.split('/')[-1].split('.')[0]
    if not checks_enabled:
        fname += ' (no checks)'
    print(fname, '- Mean time (us):', mean_us, '- Mean nodes:', mean_nodes, '- K nodes/sec:', k_nodes_sec)
    return

In [16]:
def solver_fcn(brd):
    s = Solver()
    score = s.compute_score(brd, alpha_beta = (-21, 21))
    return score, None, None, s.nodes

# evaluate_solver_from_file(solver_fcn, 'drive/MyDrive/Coding/Colab/Kaggle/ConnectX/Test1_end_easy.txt', checks_enabled = True)
evaluate_solver_from_file(solver_fcn, 'drive/MyDrive/Coding/Colab/Kaggle/ConnectX/Test2_middle_easy.txt', checks_enabled = True)

# alpha-beta pruning negamax
# Test1_end_easy - Mean time (us): 4600.734 - Mean nodes: 283.618 - K nodes/sec: 61.646

# with move exploration order
# Test1_end_easy - Mean time (us): 2273.532 - Mean nodes: 139.659 - K nodes/sec: 61.428

# with bitboard
# Test1_end_easy - Mean time (us): 1621.601 - Mean nodes: 142.967 - K nodes/sec: 88.164

# with transposition table (2^16)
# Test1_end_easy - Mean time (us): 1636.755 - Mean nodes: 95.154 - K nodes/sec: 58.136

# with iterative deepening
# Test1_end_easy - Mean time (us): 2194.660 - Mean nodes: 134.345 - K nodes/sec: 61.215
# Test2_middle_easy - Mean time (us): 67413.906 - Mean nodes: 3962.770 - K nodes/sec: 58.783

# with anticipation of direct losing moves


AssertionError: ignored

In [10]:
def solver_fcn(brd):
    s = Solver()
    score = s.compute_score(brd, alpha_beta = (-1, 1))
    return score, None, None, s.nodes

evaluate_solver_from_file(solver_fcn, 'drive/MyDrive/Coding/Colab/Kaggle/ConnectX/Test1_end_easy.txt', checks_enabled = False)
evaluate_solver_from_file(solver_fcn, 'drive/MyDrive/Coding/Colab/Kaggle/ConnectX/Test2_middle_easy.txt', checks_enabled = False)

# alpha-beta pruning negamax
# Test1_end_easy (no checks) - Mean time (us): 3488.851 - Mean nodes: 222.856 - K nodes/sec: 63.877

# with move exploration order
# Test1_end_easy (no checks) - Mean time (us): 1763.980 - Mean nodes: 107.145 - K nodes/sec: 60.740

# with bitboard
# Test1_end_easy (no checks) - Mean time (us): 1295.302 - Mean nodes: 110.090 - K nodes/sec: 84.992

# with transposition table (2^16)
# Test1_end_easy (no checks) - Mean time (us): 1256.443 - Mean nodes: 70.732 - K nodes/sec: 56.295

# with iterative deepening
# Test1_end_easy (no checks) - Mean time (us): 1413.012 - Mean nodes: 81.463 - K nodes/sec: 57.652
# Test2_middle_easy - Mean time (us): 67413.906 - Mean nodes: 3962.770 - K nodes/sec: 58.783

Test1_end_easy (no checks) - Mean time (us): 1413.012 - Mean nodes: 81.463 - K nodes/sec: 57.652


KeyboardInterrupt: ignored

## Modeling an agent

In [None]:
import kaggle_environments as kg

Loading environment football failed: No module named 'gfootball'


In [None]:
env = kg.make('connectx', debug = True)

def play_match(agent, opponent, env = env, show_replay = True, play_first = True):
    env.reset() 
    agents = [agent, opponent] if play_first else [opponent, agent]
    names =  [agent.__name__, opponent] if play_first else [opponent, agent.__name__]
    print('1st player (Ks) =', names[0])
    print('2nd player (Ds) =', names[1])
    env.run(agents)
    if show_replay:
        env.render(mode = 'ipython', width = 500, height = 450)

### Random Agent playing for the win and blocking

This agent playes accordingly to this strategy:
1. if there's a move that makes it win, play it;
2. otherwise, if there's a move that lets the opponent win, block it;
3. otherwise, play a random non-full column

In [None]:
def pp_agent(obs, conf):

    print(obs.board)
    
    # --------------------------------------- #

    verbose = True
    def report(*s): 
        if verbose: print(*s)

    # --------------------------------------- #

    class Board:
        def __init__(self, rows, cols, inarow, player_mark, board, col_heights = None):
            self.rows, self.cols, self.inarow, self.player = rows, cols, inarow, player_mark
            self.values = board[:]  # create hard-copy
            if col_heights is None:
                self.col_heights = [self.__compute_col_height(c) for c in range(cols)]
            else:
                self.col_heights = col_heights[:] # create hard-copy
        
        # compute height of a given column
        def __compute_col_height(self, c):
            h, i = 0, self.cols * self.rows - self.cols + c
            while i >= 0 and self.values[i] != 0:
                h += 1
                i -= self.cols
            return h

        # check if a position is valid
        def __is_valid_pos(self, r, c): return r >= 0 and c >= 0 and r < self.rows and c < self.cols

        # get value of baord at row and col. Bottom-left is [0,0], but in
        # the 1D board top-left is [0]. That's why rows need to be flipped
        def at(self, r, c): return self.values[(self.rows - 1 - r) * self.cols + c]

        # check if a column can be played, i.e., non-full
        def can_play(self, c): return self.values[c] == 0

        # play a playable column
        def play(self, c):
            r = self.col_heights[c]
            self.values[(self.rows - 1 - r) * self.cols + c] = self.player
            self.col_heights[c] = r + 1
            return

        # check if the current players plays a column, the game is won
        def is_winning_move(self, c):
            r = self.col_heights[c]

            # check vertical alignment
            if r >= (self.inarow - 1):
                cnt = 0
                while self.__is_valid_pos(r - cnt - 1, c) and self.at(r - cnt -1, c) == self.player:
                    cnt += 1
                    if cnt >= (self.inarow - 1): 
                        return True

            # check horizontal alignment
            for dy in [-1, 0, 1]:
                cnt = 0
                for dx in [-1, 1]:
                    x = c + dx
                    y = r + dx * dy
                    while self.__is_valid_pos(y, x) and self.at(y, x) == self.player:
                        x += dx
                        y += dx * dy
                        cnt += 1
                        if cnt >= (self.inarow - 1): 
                            return True
            return False
        
        # clone this into a board where the opponent is the player
        def to_opponent_board(self):
            opponent = 1 if self.player == 2 else 2
            return Board(self.rows, self.cols, self.inarow, 
                         opponent, self.values, self.col_heights)

    # --------------------------------------- #

    # check if any column is the winning, and if so play it
    my_board = Board(conf.rows, conf.columns, conf.inarow, obs.mark, obs.board)
    for c in range(my_board.cols):
        if my_board.can_play(c) and my_board.is_winning_move(c):
            report('{:2d}'.format(obs.step + 1), '-', f'playing {c} to win game.')
            return c

    # check if any column is the winning one for the opponent, and if so play it
    opponent_board = my_board.to_opponent_board()
    for c in range(opponent_board.cols):
        if opponent_board.can_play(c) and opponent_board.is_winning_move(c):
            report('{:2d}'.format(obs.step + 1), '-', f'playing {c} to block opponent.')
            return c

    # random non-full column picker
    import random as rng
    def get_rnd_col(): 
        return rng.choice([c for c in range(conf.columns) if obs.board[c] == 0])    

    a = get_rnd_col()
    report('{:2d}'.format(obs.step + 1), '-', f'playing {a} randomly.')
    return a

    # --------------------------------------- #
    return 0

play_match(pp_agent, 'random', show_replay = True, play_first = True)

1st player (Ks) = pp_agent
2nd player (Ds) = random
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 1 - playing 0 randomly.
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0]
 3 - playing 4 randomly.
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, 0, 1, 0, 2]
 5 - playing 0 randomly.
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 2, 2, 0, 1, 0, 2]
 7 - playing 6 randomly.
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 2, 2, 2, 1, 0, 2]
 9 - playing 5 randomly.
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 2, 1, 1, 2, 2, 2, 1, 1, 2]
11 - playing 4 randomly.
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

### Negamax Agent
Agent that chooses the best action according to a minmax algorithm (negamax variant). The algorithm is in its exact form, so it uses no pruning. Since it takes too long to solve an entire board, we use an heuristic when the maximum depth of the search is hit. <br>
The heuristic is taken from the "Random agent playing for the win and blocking".

In [None]:
def pp_agent(obs, conf):
    
    # ----------------------------------------------------------------------- #

    # verbose = False

    # def report(*s): 
    #     if verbose: print('{:2d} - '.format(obs.step + 1), *s)

    # ----------------------------------------------------------------------- #

    class Board:
        """A class representing a ConnectX board."""

        def __init__(self, rows, cols, inarow, player_mark, moves_cnt, board, 
                        col_heights = None):
            """Initializes a new instance of a Board."""
            self.rows = rows
            self.cols = cols
            self.inarow = inarow
            self.player = player_mark
            self.moves_cnt = moves_cnt
            self.values = board[:]  # create hard-copy
            if col_heights is None:
                self.col_heights = [self.__get_col_h(c) for c in range(cols)]
            else:
                self.col_heights = col_heights[:] # create hard-copy
        
        def __get_col_h(self, c):
            # returns the height of a given column
            h = 0
            i = self.cols * self.rows - self.cols + c
            while i >= 0 and self.values[i] != 0:
                h += 1
                i -= self.cols
            return h

        def __is_valid_pos(self, r, c): 
            # checks if a position is valid
            return r >= 0 and c >= 0 and r < self.rows and c < self.cols

        def at(self, r, c):
            """Gets the board's value at the given row and column."""
            # bottom-left is [0,0], but in the 1D board top-left is [0]. 
            # That's why rows need to be flipped
            return self.values[(self.rows - 1 - r) * self.cols + c]

        def set_at(self, r, c, val): 
            """Sets the board's value at the given row and column."""
            self.values[(self.rows - 1 - r) * self.cols + c] = val
            return self

        def switch_player(self):
            """Switches the board player to the opponent."""
            self.player = 1 if self.player == 2 else 2
            return self

        def can_play(self, c): 
            """Checks if a given column can be played (is non full)."""
            return self.values[c] == 0

        def play(self, c):
            """Plays a column for the current player and switches player."""
            r = self.col_heights[c]
            self.set_at(r, c, self.player)
            self.col_heights[c] = r + 1
            self.moves_cnt += 1
            self.switch_player()
            return self

        def undo_play(self, c):
            """Undo a play at a column and switches back player."""
            r = self.col_heights[c] - 1
            self.set_at(r, c, 0)
            self.col_heights[c] = r
            self.moves_cnt -= 1
            self.switch_player()
            return self

        def is_winning_move(self, c):
            """Checks if playing this column win the game."""
            # check vertical alignment
            r = self.col_heights[c]
            if r >= (self.inarow - 1):
                cnt = 0
                while (self.__is_valid_pos(r - cnt - 1, c) and 
                        self.at(r - cnt -1, c) == self.player):
                    cnt += 1
                    if cnt >= (self.inarow - 1): 
                        return True
            # check horizontal alignment
            for dy in [-1, 0, 1]:
                cnt = 0
                for dx in [-1, 1]:
                    x = c + dx
                    y = r + dx * dy
                    while (self.__is_valid_pos(y, x) and
                            self.at(y, x) == self.player):
                        x += dx
                        y += dx * dy
                        cnt += 1
                        if cnt >= (self.inarow - 1): 
                            return True
            return False
            
        def clone(self):
            """Creates a deep clone of the current Board."""
            return Board(self.rows, self.cols, self.inarow, self.player,
                            self.moves_cnt, self.values, self.col_heights)
            
        def pretty_print(self, players_dict = None):
            """Prints the current Board's disks."""
            for r in range(self.rows):
                s = ''
                for c in range(self.cols):
                    val = self.values[r * self.cols + c]
                    if players_dict is not None and (val == 1 or val == 2):
                        val = players_dict[val]
                    s += f'{val} '
                print(s)
            return

    # ----------------------------------------------------------------------- #

    class Solver:
        """A solver able to solve a ConnectX Board."""

        def __init__(self):
            """Initializes a new instance of Solver."""
            self.nodes_cnt = 0
            self.best_col = -1

        def __negamax(self, brd, depth):
            # solves board with negamax. Returns best score and column
            self.nodes_cnt += 1
            size = brd.cols * brd.rows

            # if board is full, then it's a draw
            if (brd.moves_cnt == size): return 0, -1

            # check if we can win with this move
            for c in range(brd.cols):
                if brd.can_play(c) and brd.is_winning_move(c):
                    return (size + 1 - brd.moves_cnt) // 2, c

            # check if search has reached max depth; if so, call heuristic 
            if (depth == 0): return self.__heuristic(brd)

            # initialize best score as lower bound, i.e., the worst score
            best_score = -size # / 2 (??) 
            best_col = -1
            
            for c in range(brd.cols):
                if brd.can_play(c):
                    brd.play(c)
                    score = -(self.__negamax(brd, depth - 1)[0])
                    brd.undo_play(c)
                    if score > best_score: 
                        best_score = score
                        best_col = c
            return best_score, best_col

        def __heuristic(self, brd):
            # solves heuristically the board, i.e., non exactly.
            # no need to check for winning col, because the negamx would have
            # already played it.
            # check if any column lets the opponent win; if so, block it.
            size = brd.cols * brd.rows // 2
            brd.switch_player()
            for c in range(brd.cols):
                if brd.can_play(c) and brd.is_winning_move(c):
                    brd.switch_player()
                    return -(size + 1 - brd.moves_cnt) // 2, c
        
            # pick a random, non-full column
            import random as rng
            nonfull_cols = [c for c in range(brd.cols) if brd.can_play(c)]
            # by returning an absurdly positive score, this move is forced to 
            # be the best one.
            return size, rng.choice(nonfull_cols)

        def solve(self, board, max_depth = None):
            """Solves the board and returns a score and best column to play.

            Parameters
            ----------
            board : Board
                Board instance to be solved.
            max_depth : int or None, optional
                Maximum depth the solver shall explore. If None, unbounded. If 
                set to 0, only the heuristic will be used to compute the score.

            Returns
            -------
            (best_score, best_col)
                A tuple containing the best score achievable and the relative 
                column to play.
                The score is:
                    - 0 for a draw;
                    - positive if you can win whatever your opponent plays, 
                      your score is the number of moves before your win (the 
                      faster you win,the higher your score); 
                    - negative if your opponent can force you to lose, your 
                      score is the opposite of the number of moves before you
                      lose (the faster you lose, the lower your score).
                The column is a 0-based int, or -1 if no move was available. 
            """
            self.nodes_cnt = 0
            max_d = max_depth if max_depth is not None else -1
            return self.__negamax(board, max_d)
    
    # ----------------------------------------------------------------------- #

    b = Board(conf.rows, conf.columns, conf.inarow, 
              obs.mark, obs.step, obs.board)
    _, a = Solver().solve(b, max_depth = 5)
    return a

play_match(pp_agent, 'random', show_replay = True, play_first = True)

1st player (Ks) = pp_agent
2nd player (Ds) = random


### Negamax Agent with Pruning
Minimax version with alpha-beta pruning and max depth in place.

In [None]:
def pp_agent(obs, conf):
    
    # ----------------------------------------------------------------------- #

    # verbose = False

    # def report(*s): 
    #     if verbose: print('{:2d} - '.format(obs.step + 1), *s)

    # ----------------------------------------------------------------------- #

    class Board:
        """A class representing a ConnectX board."""

        def __init__(self, rows, cols, inarow, player_mark, moves_cnt, board, 
                        col_heights = None):
            """Initializes a new instance of a Board."""
            self.rows = rows
            self.cols = cols
            self.size = rows * cols
            self.inarow = inarow
            self.player = player_mark
            self.moves_cnt = moves_cnt
            self.values = board[:]  # create hard-copy
            if col_heights is None:
                self.col_heights = [self.__get_col_h(c) for c in range(cols)]
            else:
                self.col_heights = col_heights[:] # create hard-copy
        
        def __get_col_h(self, c):
            # returns the height of a given column
            h = 0
            i = self.size - self.cols + c
            while i >= 0 and self.values[i] != 0:
                h += 1
                i -= self.cols
            return h

        def __is_valid_pos(self, r, c): 
            # checks if a position is valid
            return r >= 0 and c >= 0 and r < self.rows and c < self.cols

        def at(self, r, c):
            """Gets the board's value at the given row and column."""
            # bottom-left is [0,0], but in the 1D board top-left is [0]. 
            # That's why rows need to be flipped
            return self.values[(self.rows - 1 - r) * self.cols + c]

        def set_at(self, r, c, val): 
            """Sets the board's value at the given row and column."""
            self.values[(self.rows - 1 - r) * self.cols + c] = val
            return self

        def switch_player(self):
            """Switches the board player to the opponent."""
            self.player = 1 if self.player == 2 else 2
            return self

        def can_play(self, c): 
            """Checks if a given column can be played (is non full)."""
            return self.values[c] == 0

        def play(self, c):
            """Plays a column for the current player and switches player."""
            r = self.col_heights[c]
            self.set_at(r, c, self.player)
            self.col_heights[c] = r + 1
            self.moves_cnt += 1
            self.switch_player()
            return self

        def undo_play(self, c):
            """Undo a play at a column and switches back player."""
            r = self.col_heights[c] - 1
            self.set_at(r, c, 0)
            self.col_heights[c] = r
            self.moves_cnt -= 1
            self.switch_player()
            return self

        def is_winning_move(self, c):
            """Checks if playing this column win the game."""
            # check vertical alignment
            r = self.col_heights[c]
            if r >= (self.inarow - 1):
                cnt = 0
                while (self.__is_valid_pos(r - cnt - 1, c) and 
                        self.at(r - cnt -1, c) == self.player):
                    cnt += 1
                    if cnt >= (self.inarow - 1): 
                        return True
            # check horizontal alignment
            for dy in [-1, 0, 1]:
                cnt = 0
                for dx in [-1, 1]:
                    x = c + dx
                    y = r + dx * dy
                    while (self.__is_valid_pos(y, x) and
                            self.at(y, x) == self.player):
                        x += dx
                        y += dx * dy
                        cnt += 1
                        if cnt >= (self.inarow - 1): 
                            return True
            return False
            
        def clone(self):
            """Creates a deep clone of the current Board."""
            return Board(self.rows, self.cols, self.inarow, self.player,
                            self.moves_cnt, self.values, self.col_heights)
            
        def pretty_print(self, players_dict = None):
            """Prints the current Board's disks."""
            for r in range(self.rows):
                s = ''
                for c in range(self.cols):
                    val = self.values[r * self.cols + c]
                    if players_dict is not None and (val == 1 or val == 2):
                        val = players_dict[val]
                    s += f'{val} '
                print(s)
            return

    # ----------------------------------------------------------------------- #

    class Solver:
        """A solver able to solve a ConnectX Board."""

        def __init__(self):
            """Initializes a new instance of Solver."""
            self.nodes_cnt = 0
            self.scores = []

        def __negamax(self, brd, alpha, beta, depth):
            # solves board with negamax. Returns best score and column
            self.nodes_cnt += 1

            # if board is full, then it's a draw
            if (brd.moves_cnt == brd.size): return 0

            # check if we can win with this move
            for c in range(brd.cols):
                if brd.can_play(c) and brd.is_winning_move(c):
                    return (brd.size + 1 - brd.moves_cnt) // 2

            # get rid of the search window [max_score, beta]
            max_score = (brd.size - 1 - brd.moves_cnt) // 2 
            if beta > max_score:
                beta = max_score
                if alpha >= beta: 
                    return beta

            # check if search has reached max depth; if so, call heuristic 
            if (depth == 0): return self.__heuristic(brd)

            # compute score of all possible moves, keep best
            for c in range(brd.cols):        
                if brd.can_play(c):
                    # new_brd = brd.clone()
                    # new_brd.play(c)
                    # score = -self.__negamax(new_brd, -beta, -alpha, depth - 1)
                    brd.play(c)
                    score = -self.__negamax(brd, -beta, -alpha, depth - 1)
                    brd.undo_play(c)
                    if score >= beta: return score
                    if score > alpha: alpha = score;
            return alpha

        def __heuristic(self, brd):
            # solves heuristically the board, i.e., non exactly.
            # no need to check for winning col, because the negamx would have
            # already played it.
            # check if any column lets the opponent win; if so, block it.
            brd.switch_player()
            for c in range(brd.cols):
                if brd.can_play(c) and brd.is_winning_move(c):
                    brd.switch_player()
                    return -(brd.size + 1 - brd.moves_cnt) // 2
        
            # pick a random score
            import random as rng
            return rng.randint(-(brd.size - 1 - brd.moves_cnt) // 2, 4)

        def compute_score(self, brd, alpha_beta, max_depth):
            # prime and start the solver; returns the best score for this board
            self.nodes_cnt = 0
            return self.__negamax(brd, alpha_beta[0], alpha_beta[1], max_depth)

        def __best_score_and_col(self):
            best_score, best_col = None, None
            for c, score in enumerate(self.scores):
                if score is None: continue
                if best_score is None or best_score < score:
                    best_score = score
                    best_col = c
            return best_score, best_col

        def solve(self, board, alpha_beta = (-21, 21), max_depth = 5):
            """Solves the board and returns a score and best column to play.

            Parameters
            ----------
            board : Board
                Board instance to be solved.
            alpha_beta : (int, int), optional
                Alpha-beta pruning search bounds.

            Returns
            -------
            (best_score, best_col)
                A tuple containing the best score achievable and the relative 
                column to play.
                The score is:
                    - 0 for a draw;
                    - positive if you can win whatever your opponent plays, 
                        your score is the number of moves before your win (the 
                        faster you win,the higher your score); 
                    - negative if your opponent can force you to lose, your 
                        score is the opposite of the number of moves before you
                        lose (the faster you lose, the lower your score).
                The column is a 0-based int, or -1 if no move was available. 
            """
            
            # play each column, and compare its inverse score with best so far
            self.scores = []
            for c in range(board.cols):
                if board.can_play(c):
                    if board.is_winning_move(c):
                        self.scores.append((board.size + 1 - board.moves_cnt) // 2)
                    else:
                        # new_brd = board.clone()
                        # new_brd.play(c)
                        # self.scores.append(-self.compute_score(new_brd, alpha_beta, max_depth))     
                        board.play(c)
                        self.scores.append(-self.compute_score(board, alpha_beta, max_depth))     
                        board.undo_play(c)
                else:
                    self.scores.append(None)
            return self.__best_score_and_col()

    # ----------------------------------------------------------------------- #

    b = Board(conf.rows, conf.columns, conf.inarow, 
              obs.mark, obs.step, obs.board)
    _, a = Solver().solve(b, alpha_beta = (-21, 21), max_depth = 5)
    return a

play_match(pp_agent, 'random', show_replay = True, play_first = False)

1st player (Ks) = random
2nd player (Ds) = pp_agent


## Evaluating an Agent

In [None]:
def mean_reward(rewards, idx):
    return sum(r[idx] if r[idx] is not None else 0 for r in rewards) / len(rewards)

def evaluate_agent(agent, opponent, num_episodes = 10, switch_sides = True):
    r1_mean =  mean_reward(kg.evaluate('connectx', [agent, opponent], num_episodes = num_episodes), 0)
    print(f'{agent.__name__} vs {opponent}:', r1_mean)
    if switch_sides:
        r2_mean =  mean_reward(kg.evaluate('connectx', [opponent, agent], num_episodes = num_episodes), 1)
        print(f'{opponent} vs {agent.__name__}:', r2_mean)
        print('Average:', (r1_mean + r2_mean) / 2)

evaluate_agent(pp_agent, 'random', num_episodes = 50)
evaluate_agent(pp_agent, 'negamax', num_episodes = 5)

pp_agent vs random: 0.92
random vs pp_agent: 0.6
Average: 0.76
pp_agent vs negamax: -0.6
negamax vs pp_agent: 0.0
Average: -0.3


## Submission

In [None]:
import inspect
import os
import sys

def write_agent_to_file(function, file):
    with open(file, 'w') as f:
        f.write(inspect.getsource(function))
        print(function, 'written to', file)

# write agent to file
write_agent_to_file(pp_agent, 'submission.py')

# load agent from file
out = sys.stdout
submission = kg.utils.read_file('submission.py')
agent = kg.agent.get_last_callable(submission)
sys.stdout = out

# try play the agent
env = kg.make('connectx', debug = True)
env.run([agent, agent])
print(env)
print('Success!' if env.state[0].status == env.state[1].status == 'DONE' else 'Failed...')

<function pp_agent at 0x7f9f7924ae18> written to submission.py
<kaggle_environments.core.Environment object at 0x7f9f790e7f60>
Success!
