In [1]:
from kaggle_environments import make
import numpy as np
import random

# Calculates score if agent drops piece in selected column
def score_move(grid, col, mark, config):
    """Rate the board reached when `mark` drops a piece into column `col`.

    The piece is placed on a copy of `grid` by `drop_piece`, and the resulting
    board is scored from `mark`'s point of view by `get_heuristic`.
    """
    return get_heuristic(drop_piece(grid, col, mark, config), mark, config)

# Helper function for score_move: gets board at next step if agent drops piece in selected column
def drop_piece(grid, col, mark, config):
    """Return a copy of `grid` with `mark` dropped into column `col`.

    Args:
        grid: 2D array of the board; 0 marks an empty cell and the last row
            is the bottom of the board.
        col: index of the column to play.
        mark: value written into the landing cell.
        config: object exposing `rows`, the number of board rows.

    Returns:
        A new grid; the input `grid` is left untouched.

    Raises:
        ValueError: if column `col` has no empty cell. (Previously the top
            cell of a full column was silently overwritten.)
    """
    next_grid = grid.copy()
    # Scan upwards from the bottom row for the first empty cell.
    for row in range(config.rows - 1, -1, -1):
        if next_grid[row][col] == 0:
            next_grid[row][col] = mark
            return next_grid
    raise ValueError(f"column {col} is full")

# Helper function for score_move: calculates value of heuristic for grid
def get_heuristic(grid, mark, config):
    """Score `grid` from the point of view of player `mark`.

    Uses `count_windows` to count windows holding exactly three or four discs
    of one player (the disc counts 3 and 4 are fixed here, independent of
    `config.inarow`). The result is

        own threes - 100 * opponent threes + 1_000_000 * own fours

    where the opponent's mark is `mark % 2 + 1`.
    """
    opponent = mark % 2 + 1
    own_threes = count_windows(grid, 3, mark, config)
    own_fours = count_windows(grid, 4, mark, config)
    opponent_threes = count_windows(grid, 3, opponent, config)
    return own_threes - 1e2 * opponent_threes + 1e6 * own_fours

# Helper function for get_heuristic: checks if window satisfies heuristic conditions
def check_window(window, num_discs, piece, config):
    """Tell whether `window` holds exactly `num_discs` of `piece` and is otherwise empty.

    "Otherwise empty" means the number of 0 entries equals
    `config.inarow - num_discs`.
    """
    if window.count(piece) != num_discs:
        return False
    return window.count(0) == config.inarow - num_discs


# Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
def count_windows(grid, num_discs, piece, config):
    """Count the length-`config.inarow` windows that satisfy `check_window`.

    Every horizontal, vertical and diagonal run of `config.inarow` consecutive
    cells on the `config.rows` x `config.columns` grid is examined.
    """
    span = config.inarow
    row_starts = range(config.rows - (span - 1))
    col_starts = range(config.columns - (span - 1))

    candidates = []
    # horizontal
    candidates += [grid[r, c:c + span]
                   for r in range(config.rows) for c in col_starts]
    # vertical
    candidates += [grid[r:r + span, c]
                   for r in row_starts for c in range(config.columns)]
    # positive diagonal
    candidates += [grid[range(r, r + span), range(c, c + span)]
                   for r in row_starts for c in col_starts]
    # negative diagonal
    candidates += [grid[range(r, r - span, -1), range(c, c + span)]
                   for r in range(span - 1, config.rows) for c in col_starts]

    return sum(1 for cells in candidates
               if check_window(list(cells), num_discs, piece, config))

# Set up a ConnectX environment.
env = make("connectx", debug=True)

# Basic agent which marks the first available cell.
def my_agent(obs):
    """Return the index of the first entry of `obs.board` that equals 0.

    Raises IndexError when the board contains no 0 entry.
    """
    empty_cells = [index for index, cell in enumerate(obs.board) if cell == 0]
    return empty_cells[0]

# Selects random valid column
def agent_random(obs, config):
    """Pick uniformly at random among the columns whose top cell is empty.

    The first `config.columns` entries of `obs.board` are treated as the top
    row; a column is playable when its entry there is 0.
    """
    open_columns = []
    for column in range(config.columns):
        if obs.board[column] == 0:
            open_columns.append(column)
    return random.choice(open_columns)

# Selects middle column
def agent_middle(obs, config):
    """Always answer with the middle column, `config.columns // 2`.

    `obs` is not consulted, so the column is returned even when it is full.
    """
    centre_column = config.columns // 2
    return centre_column

# Selects leftmost valid column
def agent_leftmost(obs, config):
    """Return the leftmost column whose top cell is empty.

    Only the first `config.columns` entries of `obs.board` (the top row) are
    inspected, matching the other agents in this notebook. An earlier version
    scanned every cell and also built an unused 2D copy of the board.

    Raises:
        IndexError: if no column is playable.
    """
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    return valid_moves[0]

# Selects the column that maximizes the one-step lookahead heuristic (ties broken at random)
def agent_one_step(obs, config):
    """Play the column whose resulting board has the highest heuristic score.

    Each playable column is scored with the 4-argument
    `score_move(grid, col, mark, config)`; ties are broken at random.

    NOTE(review): a later cell in this notebook redefines `score_move` with an
    additional required `nsteps` parameter. Once that cell has run, the
    4-argument call below no longer matches the signature.

    Raises:
        IndexError: if no column is playable (from `random.choice`).
    """
    # Columns whose top cell is still empty
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]

    # Convert the flat board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)

    # Heuristic score of the board reached by each possible move
    scores = {col: score_move(grid, col, obs.mark, config) for col in valid_moves}

    # Compute the maximum once instead of once per candidate column.
    # `default=None` keeps the empty case falling through to random.choice.
    best_score = max(scores.values(), default=None)
    max_cols = [col for col, score in scores.items() if score == best_score]

    # Select at random from the maximizing columns
    return random.choice(max_cols)

# Run the one-step lookahead agent against the default "random" agent.
env.run([agent_one_step, "random"])

# Render an HTML IPython replay of the ConnectX game.
env.render(mode="ipython")

In [57]:
# Uses minimax to calculate value of dropping piece in selected column
def score_move(grid, col, mark, config, nsteps):
    """Rate dropping `mark` into `col` by searching `nsteps - 1` further plies.

    The move is applied with `drop_piece` and the resulting board is handed to
    `negamax`.

    NOTE(review): `negamax` is called with `mark`, the player who has just
    moved, and its first ply drops another `mark` piece. Confirm whether the
    opponent (`mark % 2 + 1`) was intended as the side to move; changing it
    would also require a heuristic that accounts for the opponent's fours.
    """
    board_after_move = drop_piece(grid, col, mark, config)
    return negamax(board_after_move, nsteps - 1, mark, config)

# Helper function for minimax: checks if agent or opponent has four in a row in the window
def is_terminal_window(window, config):
    """Return True when `window` holds `config.inarow` discs of player 1 or of player 2."""
    target = config.inarow
    return any(window.count(player) == target for player in (1, 2))

# Helper function for minimax: checks if game has ended
def is_terminal_node(grid, config):
    """Return True when the game on `grid` is over.

    The game is over when the top row has no empty (0) cell, or when any
    horizontal, vertical or diagonal window of `config.inarow` cells
    satisfies `is_terminal_window`.
    """
    # Draw: no empty cell left in the top row
    if not list(grid[0, :]).count(0):
        return True

    span = config.inarow
    row_starts = range(config.rows - (span - 1))
    col_starts = range(config.columns - (span - 1))

    def windows():
        # horizontal
        for r in range(config.rows):
            for c in col_starts:
                yield grid[r, c:c + span]
        # vertical
        for r in row_starts:
            for c in range(config.columns):
                yield grid[r:r + span, c]
        # positive diagonal
        for r in row_starts:
            for c in col_starts:
                yield grid[range(r, r + span), range(c, c + span)]
        # negative diagonal
        for r in range(span - 1, config.rows):
            for c in col_starts:
                yield grid[range(r, r - span, -1), range(c, c + span)]

    return any(is_terminal_window(list(cells), config) for cells in windows())

def negamax(node, depth, mark, config):
    """Negamax search returning the value of `node` for player `mark`.

    Args:
        node: 2D board grid (0 = empty).
        depth: remaining plies to search; at 0 the board is evaluated directly.
        mark: player whose piece is dropped at this node.
        config: object exposing `rows`, `columns` and `inarow`.

    Returns:
        `get_heuristic(node, mark, config)` at depth 0 or on a finished game;
        otherwise the maximum, over playable columns, of the negated value of
        the child searched for the other player (`mark % 2 + 1`).
    """
    if depth == 0 or is_terminal_node(node, config):
        return get_heuristic(node, mark, config)

    # Columns whose top cell is still empty.
    valid_moves = [c for c in range(config.columns) if node[0][c] == 0]

    # `np.inf` rather than the `np.Inf` alias, which NumPy 2.0 removed.
    value = -np.inf
    for col in valid_moves:
        child = drop_piece(node, col, mark, config)
        value = max(value, -negamax(child, depth - 1, mark % 2 + 1, config))
    return value

    
# How deep to make the game tree (passed to score_move by agent_minimax): higher values take longer to run!
N_STEPS = 3

def agent_negamax(obs, config):
    """Bitboard agent: choose a column with a depth-6 `shallow_negamax` search.

    The observation is converted to a `Board`, every playable column is tried
    (centre columns first) and the column with the strictly highest score is
    kept, so the earliest column in that order wins ties.

    Raises:
        StopIteration: if no column is playable.
    """
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    board = Board.from_grid(grid)

    # Take an immediate win when one exists. shallow_negamax only detects wins
    # available to the side to move, so once a winning move has been played on
    # the child board the search below cannot recognise it.
    winning_cells = board.win_mask() & board.possible_moves_mask()
    if winning_cells:
        for col in board.possible_col_moves():
            if winning_cells & board.mask_utils.get_col_mask(col):
                return col

    best_col, best_score = next(board.possible_col_moves()), -1_000_000
    for col in board.possible_col_moves():
        child = board.copy()
        child.play_col(col)
        alpha, beta, depth = -1, 1, 6
        # The child is scored for the opponent, hence the negation.
        score = -shallow_negamax(child, alpha, beta, depth)
        if score > best_score:
            best_score = score
            best_col = col

    return best_col

def agent_negamax2(obs, config):
    """Bitboard agent: choose a column with a depth-4 `shallow_negamax` search.

    The observation is converted to a `Board` and every playable column is
    tried (centre columns first). Scores are compared with `>=`, so the last
    column in that order wins ties.

    Raises:
        StopIteration: if no column is playable.
    """
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    board = Board.from_grid(grid)

    # Take an immediate win when one exists. shallow_negamax only detects wins
    # available to the side to move, so once a winning move has been played on
    # the child board the search below cannot recognise it.
    winning_cells = board.win_mask() & board.possible_moves_mask()
    if winning_cells:
        for col in board.possible_col_moves():
            if winning_cells & board.mask_utils.get_col_mask(col):
                return col

    best_col, best_score = next(board.possible_col_moves()), -1_000_000
    for col in board.possible_col_moves():
        child = board.copy()
        child.play_col(col)
        alpha, beta, depth = -1, 1, 4
        # The child is scored for the opponent, hence the negation.
        score = -shallow_negamax(child, alpha, beta, depth)
        if score >= best_score:
            best_score = score
            best_col = col

    return best_col

def human_player(obs, config):
    """Interactive agent: show the board and read a column number from stdin.

    The grid is printed and also used as the `input` prompt. The reply is
    converted with `int`, so a non-numeric answer raises ValueError.
    """
    shape = (config.rows, config.columns)
    grid = np.asarray(obs.board).reshape(*shape)
    print(grid)
    answer = input(f'{grid}')
    return int(answer)

def agent_minimax(obs, config):
    """Play the column with the best `N_STEPS`-deep `score_move` value.

    Each playable column is scored with
    `score_move(grid, col, obs.mark, config, N_STEPS)`; ties are broken at
    random.

    Raises:
        IndexError: if no column is playable (from `random.choice`).
    """
    # Columns whose top cell is still empty
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]

    # Convert the flat board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)

    # Search-based score of the board reached by each possible move
    scores = {col: score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves}

    # Compute the maximum once instead of once per candidate column.
    # `default=None` keeps the empty case falling through to random.choice.
    best_score = max(scores.values(), default=None)
    max_cols = [col for col, score in scores.items() if score == best_score]

    # Select at random from the maximizing columns
    return random.choice(max_cols)


# Pit the depth-4 bitboard agent (agent_negamax2) against the depth-6 one (agent_negamax).
env.run([agent_negamax2, agent_negamax])

# Render an HTML IPython replay of the ConnectX game.
env.render(mode="ipython")

In [3]:
from dataclasses import dataclass
from typing import List
from collections.abc import Generator
import numpy as np


class MaskUtils:
    """Bit masks and move ordering for a column-major bitboard.

    Each column occupies `rows` consecutive bits, so column `c` starts at bit
    `c * rows`. `BOTTOM_ROW` has the lowest bit of every column set and
    `BOARD_MASK` has every bit of every column set except the column's
    highest one.
    """

    def __init__(self, rows: int, cols: int):
        self.rows: int = rows
        self.cols: int = cols
        self.num_slots: int = rows * cols
        # BOARD_MASK is derived from BOTTOM_ROW, so BOTTOM_ROW is computed first.
        self.BOTTOM_ROW: int = self.get_bottom_row()
        self.BOARD_MASK: int = self.get_board_mask()

    def get_bottom_row(self) -> int:
        """Return the mask with the lowest bit of each column set."""
        bottom_bits = 1
        # Each pass adds the bottom bit of one more column.
        for _ in range(1, self.cols):
            bottom_bits |= bottom_bits << self.rows
        return bottom_bits

    def get_col_mask(self, col: int) -> int:
        """Return the mask of the lowest `rows - 1` bits of column `col`."""
        column_bits = (1 << (self.rows - 1)) - 1
        return column_bits << (self.rows * col)

    def get_board_mask(self) -> int:
        """Return the mask of the lowest `rows - 1` bits of every column."""
        top_bits = self.BOTTOM_ROW << (self.rows - 1)
        return top_bits - self.BOTTOM_ROW

    def move_order(self) -> List[int]:
        """Return all column indices ordered from the centre outwards."""
        return [
            (self.cols - i - 1) // 2 if i % 2 == 0 else (self.cols + i) // 2
            for i in range(self.cols)
        ]

    def __repr__(self) -> str:
        return f'MaskUtils(rows={self.rows}, cols={self.cols})'


@dataclass
class Board:
    """Bitboard representation of a Connect-Four style position.

    Columns are stored side by side, `mask_utils.rows` bits each, lowest bit
    at the bottom. `create` and `from_grid` build the helper as
    `MaskUtils(rows + 1, cols)`, i.e. one spare bit sits above the playable
    cells of every column.
    """
    mask_utils: MaskUtils
    mask: int  # one bit per occupied cell (both players)
    position: int  # one bit per disc of the player whose turn it is
    num_moves: int  # number of discs played so far

    def can_play_col(self, col: int) -> bool:
        """Return True when column `col` still has a free cell."""
        # Highest playable bit of the column (just below the spare top bit).
        top_cell_bit = 1 << ((col + 1) * self.mask_utils.rows - 2)
        return (self.mask & top_cell_bit) == 0

    def play_col(self, col: int) -> None:
        """Drop a disc for the player to move into column `col`.

        No check is made that the column has room; see `can_play_col`.
        """
        bottom_bit = 1 << (self.mask_utils.rows * col)
        # Adding the column's bottom bit carries through its filled cells and
        # lands on the first empty one; play_move ORs the result into the mask.
        self.play_move(self.mask + bottom_bit)

    def play_move(self, move: int) -> None:
        """Apply `move` (bits to OR into the mask) and pass the turn."""
        # Switch `position` to the other player's discs before adding the new
        # disc, so the disc just played belongs to the player who moved.
        self.position ^= self.mask
        self.mask |= move
        self.num_moves += 1

    def num_slots(self) -> int:
        """Return the number of playable cells on the board."""
        return (self.mask_utils.rows - 1) * self.mask_utils.cols

    def key(self) -> int:
        """Return an integer computed from `mask`, `position` and the bottom row."""
        return (self.mask + self.mask_utils.BOTTOM_ROW) | self.position

    def is_full(self) -> bool:
        """Return True when every playable cell has been filled."""
        return self.num_moves == self.num_slots()

    def is_won(self) -> bool:
        """Return True when the player who is NOT to move has four aligned discs."""
        stride = self.mask_utils.rows
        # Bit distance between neighbours: two diagonals, horizontal, vertical.
        shifts = (stride - 1, stride, stride + 1, 1)
        bitboard = self.position ^ self.mask
        for shift in shifts:
            pairs = bitboard & (bitboard >> shift)
            if pairs & (pairs >> 2 * shift):
                return True

        return False

    def possible_moves_mask(self) -> int:
        """Return the mask of the lowest free cell of every non-full column."""
        return (self.mask + self.mask_utils.BOTTOM_ROW) & self.mask_utils.BOARD_MASK

    def _playable(self):
        """Yield `(column, move bit)` for each non-full column, centre columns first."""
        open_cells = self.possible_moves_mask()
        for col in self.mask_utils.move_order():
            move = open_cells & self.mask_utils.get_col_mask(col)
            if move > 0:
                yield col, move

    def possible_moves(self) -> Generator[int, None, None]:
        """Yield the move bit of each playable column, centre columns first."""
        for _, move in self._playable():
            yield move

    def possible_col_moves(self) -> Generator[int, None, None]:
        """Yield the index of each playable column, centre columns first."""
        for col, _ in self._playable():
            yield col

    def win_mask(self) -> int:
        """Return the empty cells that would complete four in a row for the player to move.

        A cell qualifies when three of that player's discs are aligned with it
        in some direction; the cell need not be reachable yet.
        """
        H1 = self.mask_utils.rows
        posn = self.position

        # Vertical: three discs directly below
        wm = (posn << 1) & (posn << 2) & (posn << 3)

        # Horizontal: three discs on one side of the cell
        wm |= (posn << H1) & (posn << 2 * H1) & (posn << 3 * H1)
        wm |= (posn >> H1) & (posn >> 2 * H1) & (posn >> 3 * H1)

        # Horizontal: two discs on one side and one on the other
        wm |= (posn << H1) & (posn << 2 * H1) & (posn >> H1)
        wm |= (posn >> H1) & (posn >> 2 * H1) & (posn << H1)

        # Diagonal with step H1 + 1
        wm |= (posn << (H1 + 1)) & (posn << 2 * (H1 + 1)) & (posn << 3 * (H1 + 1))
        wm |= (posn << (H1 + 1)) & (posn << 2 * (H1 + 1)) & (posn >>     (H1 + 1))
        wm |= (posn << (H1 + 1)) & (posn >>     (H1 + 1)) & (posn >> 2 * (H1 + 1))
        wm |= (posn >> (H1 + 1)) & (posn >> 2 * (H1 + 1)) & (posn >> 3 * (H1 + 1))

        # Diagonal with step H1 - 1
        wm |= (posn >> (H1 - 1)) & (posn >> 2 * (H1 - 1)) & (posn >> 3 * (H1 - 1))
        wm |= (posn >> (H1 - 1)) & (posn >> 2 * (H1 - 1)) & (posn <<     (H1 - 1))
        wm |= (posn >> (H1 - 1)) & (posn <<     (H1 - 1)) & (posn << 2 * (H1 - 1))
        wm |= (posn << (H1 - 1)) & (posn << 2 * (H1 - 1)) & (posn << 3 * (H1 - 1))

        # Keep only playable cells that are still empty.
        return wm & (self.mask_utils.BOARD_MASK ^ self.mask)

    def copy(self) -> 'Board':
        """Return an independent copy that shares the same `mask_utils`."""
        return Board(self.mask_utils, self.mask, self.position, self.num_moves)

    @classmethod
    def create(cls, rows: int, cols: int, moves: List[int]) -> 'Board':
        """Build a board by replaying `moves`, given as 1-based column numbers."""
        mask_utils = MaskUtils(rows + 1, cols)
        board = cls(mask_utils, 0, 0, 0)
        for move in moves:
            board.play_col(move - 1)
        return board

    @classmethod
    def from_grid(cls, grid) -> 'Board':
        """Build a board from a 2D NumPy grid (0 = empty, 1/2 = player discs).

        The last grid row is the bottom of the board. The player to move is
        derived from the disc count: mark 1 when it is even, mark 2 when odd.

        `mask`, `position` and `num_moves` are plain Python ints, so the bit
        operations elsewhere in the class cannot overflow a fixed-width NumPy
        integer on larger boards.
        """
        rows, cols = grid.shape
        stride = rows + 1  # bits per column, including the spare top bit

        occupied = grid > 0
        num_moves = int(np.count_nonzero(occupied))
        mark = 1 + num_moves % 2

        mask_val = 0
        posn_val = 0
        for row, col in zip(*np.nonzero(occupied)):
            # Bottom grid row (row == rows - 1) maps to the column's lowest bit.
            bit = 1 << (int(col) * stride + (rows - 1 - int(row)))
            mask_val |= bit
            if grid[row, col] == mark:
                posn_val |= bit

        return cls(MaskUtils(stride, cols), mask_val, posn_val, num_moves)
    
ROWS, COLS = 6, 7
# Scratch positions given as 1-based column numbers. Each assignment
# overwrites the previous one, so only the last list is used below.
moves = [1,1,2,2,3,3]
# 64115442265757253615 (the last list below, written as a digit string)
moves = [1,2,2,1,1,2,2,1,1,2,2,1,3,7,4,7,7,5,7,7,6,7,5,6,6,5,5,6,6,5,5,6]
moves = [6,4,1,1,5,4,4,2,2,6,5,7,5,7,2,5,3,6,1,5]
board = Board.create(ROWS, COLS, moves)

# Show the mask of the next free cell in every non-full column.
board.possible_moves_mask()

17875670696968

In [4]:
# Example position for Board.from_grid: 0 = empty, 1 and 2 = the players'
# discs; the last row is the bottom of the board.
grid = np.array(
    [[0, 0, 0, 0, 0, 0, 0],
     [0, 0, 0, 0, 0, 0, 0],
     [0, 0, 0, 0, 0, 0, 0],
     [0, 0, 0, 0, 0, 0, 0],
     [0, 1, 0, 0, 0, 0, 0],
     [0, 1, 2, 0, 2, 0, 1]])

# Five discs are on the board, so from_grid treats mark 2 as the side to move.
Board.from_grid(grid)

Board(mask_utils=MaskUtils(rows=7, cols=7), mask=4398314963328, position=268451840, num_moves=5)

Board(mask_utils=MaskUtils(rows=7, cols=7), mask=4398314963328, position=268451840, num_moves=5)

In [48]:


# Helper function for score_move: calculates value of heuristic for grid
def get_heuristic(grid, mark, config):
    """Score `grid` from the point of view of player `mark`.

    `count_windows` supplies the number of windows with exactly three or four
    discs of a player (3 and 4 are fixed here, independent of
    `config.inarow`). The score is

        own threes - 100 * opponent threes + 1_000_000 * own fours

    with the opponent's mark taken as `mark % 2 + 1`.
    """
    rival = mark % 2 + 1
    threes_mine = count_windows(grid, 3, mark, config)
    fours_mine = count_windows(grid, 4, mark, config)
    threes_rival = count_windows(grid, 3, rival, config)
    return threes_mine - 1e2 * threes_rival + 1e6 * fours_mine

def get_heuristic2(board) -> int:
    """Return 0.1 for each direction in which three aligned discs exist.

    The discs examined are `board.position ^ board.mask`, i.e. those of the
    player who is not to move. The four directions are the two diagonals,
    horizontal and vertical. The result is the int 0 when no direction
    qualifies, otherwise a float.
    """
    stride = board.mask_utils.rows
    discs = board.position ^ board.mask
    return sum(
        0.1
        for shift in (stride - 1, stride, stride + 1, 1)
        if discs & (discs >> shift) & (discs >> 2 * shift)
    )


def shallow_negamax(board: Board, alpha: int, beta: int, depth: int) -> int:
    """Depth-limited negamax on a bitboard, scored for the player to move.

    Returns 0 for a full board and `(free cells + 1) // 2` when the player to
    move can complete four in a row immediately. If the best achievable score
    `(free cells - 1) // 2` does not exceed `alpha`, that bound is returned.
    At `depth == 0` the position is rated by `get_heuristic2`.

    Otherwise the children are searched; the incoming `alpha` is only used for
    the bound check above, and the running best score starts from -100_000_000.
    The search stops early once a child scores at least `beta`.
    """
    if board.is_full():
        return 0

    free_cells = board.num_slots() - board.num_moves
    if board.win_mask() & board.possible_moves_mask():
        return (free_cells + 1) // 2

    upper_bound = (free_cells - 1) // 2
    if upper_bound <= alpha:
        return upper_bound

    if depth == 0:
        return get_heuristic2(board)

    best = -100_000_000
    beta = min(beta, upper_bound)

    for move in board.possible_moves():
        child = board.copy()
        child.play_move(move)
        score = -shallow_negamax(child, -beta, -best, depth - 1)
        best = max(best, score)
        if score >= beta:
            break

    return best

ROWS, COLS = 6, 7
# Scratch positions (1-based column numbers); only the last assignment is used.
moves = [2,3,2,5,7]
moves = [3,3,4,4]
board = Board.create(ROWS, COLS, moves)

# Score every playable column with a depth-9 search. The child board is
# evaluated for the opponent, so the result is negated.
for col in board.possible_col_moves():
    b = board.copy()
    b.play_col(col)
    alpha, beta, depth = -1, 1, 9
    score = -shallow_negamax(b, alpha, beta, depth)
    print(f'[{col}] Score={score}')


[3] Score=0
[4] Score=18
[2] Score=0
[5] Score=0
[1] Score=18
[6] Score=0
[0] Score=0


In [6]:
class Solver:
    """Full-depth negamax search over `Board` positions (no depth limit)."""

    def minimax(self, board: Board, alpha: int, beta: int) -> int:
        """Return the score of `board` for the player to move.

        Returns 0 for a full board and `(free cells + 1) // 2` when the player
        to move can complete four in a row immediately. If the best achievable
        score `(free cells - 1) // 2` does not exceed `alpha`, that bound is
        returned. Otherwise every child is searched recursively; the running
        best score starts from -100_000_000 and the search stops early once a
        child scores at least `beta`.
        """
        if board.is_full():
            return 0

        free_cells = board.num_slots() - board.num_moves
        if board.win_mask() & board.possible_moves_mask():
            return (free_cells + 1) // 2

        upper_bound = (free_cells - 1) // 2
        if upper_bound <= alpha:
            return upper_bound

        best = -100_000_000
        beta = min(beta, upper_bound)

        for move in board.possible_moves():
            child = board.copy()
            child.play_move(move)
            score = -self.minimax(child, -beta, -best)
            best = max(best, score)
            if score >= beta:
                break

        return best
    
solver = Solver()
# Solves whatever `board` currently holds (defined in an earlier cell) with an
# unbounded window. The result is not assigned or displayed.
# NOTE(review): this search has no depth limit and may be slow on a board with
# few moves played; confirm the call is still wanted.
solver.minimax(board, -np.inf, np.inf)

# Late-game position given as 1-based column numbers.
moves = [6,4,1,1,5,4,4,2,2,6,5,7,5,7,2,5,3,6]
board = Board.create(ROWS, COLS, moves)

# Score every playable column; the child board is evaluated for the opponent,
# so the result is negated.
for col in board.possible_col_moves():
    b = board.copy()
    b.play_col(col)
    score = -solver.minimax(b, -1, 1)
    print(f'[{col}] Score={score}')

[3] Score=-12
[4] Score=7
[2] Score=11
[5] Score=8
[1] Score=10
[6] Score=-10
[0] Score=2


In [46]:
# Player 1 holds the bottom cells of columns 3-5 (1-based) and is to move.
moves = [3,3,4,4,5,5]
board = Board.create(ROWS, COLS, moves)
# Count the empty cells that would complete four in a row for the side to move.
board.win_mask().bit_count()

2