# Connect X implementation using kaggle environment and MCTS

In [3]:
from kaggle_environments import evaluate, make

env = make("connectx", debug=True)

## [Board conversion to the bitmap](https://towardsdatascience.com/creating-the-perfect-connect-four-ai-bot-c165115557b0) in order to optimize speed and storage

In [4]:
import numpy as np

def get_position_mask_bitmap(board, player, column_size, row_size):
    position, mask = '', ''
    board_np = np.reshape(board, (row_size, column_size))
#     print(board_np)
    for j in range(column_size - 1, -1, -1):
        # Add 0-bits to sentinel 
        mask += '0'
        position += '0'
        # Start with bottom row
        for i in range(0, row_size):
            mask += ['0', '1'][int(board_np[i, j] != 0)]
            position += ['0', '1'][int(board_np[i, j] == player)]
#     print("sum len", column_size* row_size, len(position))    
    
    return int(position, 2), int(mask, 2)

def connected_four(position):
    # Horizontal check
    m = position & (position >> 7)
    if m & (m >> 14):
#         print("Horizontal")
        return True
    # Diagonal \
    m = position & (position >> 6)
    if m & (m >> 12):
#         print(r"Diagonal \\")
        return True
    # Diagonal /
    m = position & (position >> 8)
    if m & (m >> 16):
#         print(r"Diagonal /")
        return True
    # Vertical
    m = position & (position >> 1)
    if m & (m >> 2):
#         print(r"Vertical")
        return True
    # Nothing found
    return False

def make_move(position, mask, col):
    opponent_position = get_opponent_position(position, mask)
    new_mask = mask | (mask + (1 << (col*7)))
    new_position = get_opponent_position(opponent_position, new_mask)
    is_a_win = connected_four(new_position)
    return new_position, new_mask, is_a_win

def get_opponent_position(position, mask):
    return position ^ mask

def get_valid_moves(mask, columns, rows):
    """returns an array with number of valid columns"""
    top_column_bit = 1 << (rows - 1)
    return [c for c in range(columns) if not (top_column_bit << ((rows+1) * c)) & mask]

def get_player_boolean(player_number):
    return True if player_number == 1 else False

def get_2d_board(position, columns, rows):
    output_array = np.zeros((rows+1, columns), dtype=int)
    i = 0
    position_str = format(position, "b").zfill(columns*(rows+1))
#     print("position_str", position_str, len(position_str))
    for c in range(columns-1, -1, -1):
        for r in range(rows+1):
            output_array[r][c] = int(position_str[i])
            i += 1
    return output_array

def print_board(position, columns, rows):
    print(get_2d_board(position, columns, rows))

In [5]:
## tests
example_board = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 1, 1, 0, 1, 0]
example_columns = 7
example_rows = 6
example_is_play_first = True
example_player_number = 1 if example_is_play_first == True else 2

example_bitboard, example_mask = get_position_mask_bitmap(example_board, example_player_number, example_columns, example_rows)
print_board(example_bitboard, example_columns, example_rows)
print("start", bin(example_mask))
print(connected_four(example_bitboard))
example_bitboard, example_mask, example_win = make_move(example_bitboard, example_mask, 0)
print_board(example_bitboard, example_columns, example_rows)
print("move 0", bin(example_mask))
print(example_win)
example_bitboard, example_mask, example_win = make_move(example_bitboard, example_mask, 4)
print_board(example_bitboard, example_columns, example_rows)
print(bin(example_mask))
print(example_win)

[[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 1 1 0 1 0]]
start 0b100000000000001000000100000000000001
False
[[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [1 0 0 0 0 0 0]
 [0 0 1 1 0 1 0]]
move 0 0b100000000000001000000100000000000011
False
[[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [1 0 0 0 0 0 0]
 [0 0 1 1 1 1 0]]
0b100000010000001000000100000000000011
True


## Tree structure

In [6]:
import math 
import numpy as np
from treelib import Node, Tree
from random import choice

class TreeNodeCounter:
    def __init__(self):
        self.counter = 0

    def get_counter(self):
        self.counter += 1
        return self.counter

class BoardNode:
    """Representation of a game decision tree.
        
    Attributes:
        move : An integer number of the column for the move
        visited : An integer count of visits for this node
        win : An integer count of wins for this node
        bitboard : An integer with base 2 that represents moves of current player
        bitboard_mask : An integer with base 2 that represents the filled spaces 
        is_player_first : A boolean that represents the number of the player
        parent : A parent tree node 
        layer : An integer for the layer where node is
        terminal_state : A boolean to mark final leaves
    """
    
    def __init__(self, move, bitboard, bitboard_mask, is_player_first, terminal_state, parent=None):
        # params
        self.move = move
        self.is_player_first = is_player_first
        self.bitboard = bitboard
        self.bitboard_mask = bitboard_mask
        self.terminal_state = terminal_state
        self.parent = parent
        if parent:
            self.layer = parent.layer + 1
        else:
            self.layer = 0
        # init
        self.children = []
        self.visited = 0
        self.win = 0

    def add_child(self, node):
        assert isinstance(node, BoardNode)
        self.children.append(node)
        
    def get_uct(self, total_simulations_number, exploration_parameter=2):
        """returns the value of UCT(Upper Confidence bounds for Trees) for the tree node"""
        if self.visited == 0:
            return -1
        return self.win/self.visited + math.sqrt(exploration_parameter*np.log(total_simulations_number)/self.visited)

    def valid_moves(self, columns, rows):
        """returns an array with number of valid columns"""
        return get_valid_moves(self.bitboard_mask, columns, rows)
        
    def create_children(self, columns, rows):
        """appends valid children to self.children
        
        Returns: a boolean for containing of a terminal state
        """
        # get all valid moves
        valid_moves = self.valid_moves(columns, rows)
        # create trees with this moves
        contains_terminal_state = False
        for move in valid_moves:
            new_position, new_mask, win = make_move(self.bitboard, self.bitboard_mask, move)
            child_position = get_opponent_position(new_position, new_mask)
            self.add_child(BoardNode(move, child_position, new_mask, not self.is_player_first, win, parent=self))
            if win:
                contains_terminal_state = True
        return contains_terminal_state
            
    def play_out(self, columns, rows, is_current_player_first):
        """returns an integer for the game result.
            1, 0.5, 0 for win, tie, loss respectively
        """
        game_over = False
        board = self.bitboard
        mask = self.bitboard_mask
        is_player_first = self.is_player_first
        while not game_over:
            valid_moves = get_valid_moves(mask, columns, rows)
            if not valid_moves:
                return 0.5
            move = choice(valid_moves)
            board, mask, game_over = make_move(board, mask, move)
            board = get_opponent_position(board, mask)
            is_player_first = not is_player_first
        if is_player_first == is_current_player_first:
            return 1
        else:
            return 0
        
    def has_children(self):
        if self.children:
            return True
        return False
    
    def update(self, result, times_visited):
        self.win = self.win + result
        self.visited += times_visited

    def get_performance(self, tsm):
        return self.win / tsm

    def get_best_performance_child(self, tsm):
        best_child = self.children[0]
        for child in self.children[1:]:
            if child.get_performance(tsm) > best_child.get_performance(tsm):
                best_child = child
        return best_child

    def get_best_uct_child(self, tsm):
        biggest_uct_child = self.children[0]
        for child in self.children[1:]:
            if child.get_uct(tsm) > biggest_uct_child.get_uct(tsm):
                biggest_uct_child = child
        return biggest_uct_child
    
    def get_worst_uct_child(self, tsm):
        smallest_uct_child = self.children[0]
        for child in self.children[1:]:
            if child.get_uct(tsm) < smallest_uct_child.get_uct(tsm):
                smallest_uct_child = child
        return smallest_uct_child

    # def print_children(self, visual_tree, parent_number=0, prev_counter=None):
    #     if not prev_counter:
    #         current_counter = TreeNodeCounter()
    #     else:
    #         current_counter = prev_counter
    #     my_number = current_counter.get_counter()
    #     node_string = f"layer: {self.layer} move: {self.move} " \
    #                   f"win: {self.win} visited: {self.visited} " \
    #                   f"terminal: {self.terminal_state} player:{self.is_player_first}"
    #     if parent_number == 0:
    #         visual_tree.create_node(node_string, my_number)
    #     else:
    #         visual_tree.create_node(node_string, my_number, parent=parent_number)
    #     for child in self.children:
    #         child.print_children(visual_tree, parent_number=my_number, prev_counter=current_counter)

In [7]:
example_position = 0b001010001110000101010100011010100000011000011001
example_position_mask = 0b111111011111100111110111111011111101111110111111

example_columns = 7
example_rows = 6
example_is_play_first = True
example_player_number = 1 if example_is_play_first == True else 2

example_main_tree = BoardNode(-1, example_position, example_position_mask, example_is_play_first, False)
print_board(example_position, example_columns, example_rows)
print(bin(example_position_mask))
print(example_main_tree.valid_moves(example_columns, example_rows))

example_main_tree.create_children(example_columns, example_rows)
print("children len",len(example_main_tree.children))
for i, ch in enumerate(example_main_tree.children):
    print("bitboard", i, bin(ch.bitboard), ch.play_out(example_columns, example_rows, example_is_play_first))
    
example_main_tree.play_out(example_columns, example_rows, example_is_play_first)

[[0 0 0 0 0 0 0]
 [0 0 1 1 0 0 0]
 [1 0 0 0 1 1 0]
 [1 1 1 0 0 1 1]
 [0 1 0 0 1 1 0]
 [0 0 0 1 0 0 1]
 [1 0 0 1 1 0 0]]
0b111111011111100111110111111011111101111110111111
[4]
children len 1
bitboard 0 0b110101010001100010100011100001011101100110100110 0.5


0.5

In [8]:
## NR 
## 6 nodes exanded nodes with 500 simulations each is 3000 simulations which takes 23 seconds
## if 2 seconds are given -> 120 seconds -> 120 / 23 -> 5 full times -> 5*6 30 nodes can be expanded

import time

example_position = 0b0
example_position_mask = 0b0
# example position
# example_position = 0b1000000000000000000000000000000000110111011
# example_position_mask = 0b111000001100000010000001000000000001110111111
example_columns = 7
example_rows = 6
example_is_play_first = True
example_player_number = 1 if example_is_play_first == True else 2

example_main_tree = BoardNode(-1, example_position, example_position_mask, example_is_play_first, False)

scores = [0,0,0]
score_dict = {0:0, 0.5:1, 1:2}
example_t0 = time.time()
samples = 100000
for _ in range(samples):
    example_result = example_main_tree.play_out(example_columns, example_rows, example_is_play_first)
    scores[score_dict[example_result]] += 1
example_t1 = time.time()
example_total_time = example_t1 - example_t0
print(scores)
print(f"total time for {samples} samples {example_total_time}")
denominator = scores[score_dict[1]] + scores[score_dict[0]] + scores[score_dict[0.5]]
print("win prob", scores[score_dict[1]] / denominator)
print("tie prob", scores[score_dict[0]] / denominator)
example_total_time

[55784, 259, 43957]
total time for 100000 samples 8.313606977462769
win prob 0.43957
tie prob 0.55784


8.313606977462769

## Search

In [9]:
import time

def m_c_t_s(main_tree, columns, rows, timeout, simulations_per_node=500):
    # total simulation number
    tsm = 0
    t0 = time.time()
    total_time = 0
    total_loops = 0
    while timeout - 0.1 > total_time:
        node = select(main_tree, tsm, main_tree.is_player_first)
        if node.terminal_state:
            break
        contains_terminal_state = expand(node, columns, rows)
        # main_tree.is_player_first is the boolean of the opponent
        # not node.is_player_first is to check, because the children are of interest
        # so if children have a terminal state and it is opponents turn, so we lose
        # backpropagate all nodes with zero wins
        if contains_terminal_state and not node.is_player_first == main_tree.is_player_first:
            for expanded_node in node.children:
                backpropagate(expanded_node, 0, simulations_per_node)
        else:
            for expanded_node in node.children:
                result_sum = 0
                for _ in range(simulations_per_node):
                    result_sum += expanded_node.play_out(columns, rows, not main_tree.is_player_first)
                backpropagate(expanded_node, result_sum, simulations_per_node)
                tsm += simulations_per_node
        total_loops += 1
        # update time
        t1 = time.time()
        total_time = t1-t0
#     print(f"total time for main mcts routine {total_time} with {total_loops} loops tsm {tsm}")
#     pretty_tree = Tree()
#     main_tree.print_children(pretty_tree)
#     pretty_tree.save2file("C:\\code\\kaggle\\ConnectX\\trees.txt")
    return main_tree.get_best_performance_child(tsm).move

def select(tree_node, tsm, is_current_player_first):
    if tree_node.has_children():
        # minimize uct if node of opponent
        if tree_node.is_player_first != is_current_player_first:
            best_matching_child = tree_node.get_worst_uct_child(tsm)
        else:
            best_matching_child = tree_node.get_best_uct_child(tsm)
        return select(best_matching_child, tsm, is_current_player_first)
    return tree_node

def expand(tree_node, columns, rows):
    return tree_node.create_children(columns, rows)

def backpropagate(tree_node, result, simulations_per_node):
    node_to_update = tree_node
    while node_to_update:
        node_to_update.update(result, simulations_per_node)
        node_to_update = node_to_update.parent

## Agent body

In [10]:
from random import choice

def init_graph(board, mark, columns, rows):
    """returns an initial game tree"""
    position, position_mask = get_position_mask_bitmap(board, mark, columns, rows)
    return BoardNode(-1, position, position_mask, not get_player_boolean(mark), False)

# This agent uses monte carlo tree search to pick a column
def my_agent(observation, configuration):
    columns = configuration.columns
    rows = configuration.rows
    # Which player the agent is playing as (1 or 2).
    mark = observation.mark
    # Board as an array with columns after one after another.
    board = observation.board
    # with open("C:\\code\\kaggle\\ConnectX\\trees.txt", "at") as f:
    #     board_np = np.reshape(board, (rows, columns))
    #     f.write("my player mark is {}\n".format(mark))
    #     np.savetxt(f, board_np, fmt="%d")
    timeout = configuration.timeout
    main_graph = init_graph(board, mark, columns, rows)

    return m_c_t_s(main_graph, columns, rows, timeout)

## Testing

In [27]:
# my_agent plays tokens with "K"
# Play as the first agent against default "negamax" agent.
env.run(["negamax", my_agent])
env.render(mode="ipython", width=500, height=450)

In [24]:
import inspect
import os

def write_agent_to_file(function, file):
    with open(file, "a" if os.path.exists(file) else "w") as f:
        f.write(inspect.getsource(function))
        print(function, "written to", file)

write_agent_to_file(m_c_t_s_single_function, "./submission.py")

<function m_c_t_s_single_function at 0x000002380619C8B0> written to ./submission.py
