# Imports:

In [1]:
from kaggle_environments import make, evaluate, agent, utils
import numpy as np
import random
import sys

Loading environment lux_ai_s2 failed: No module named 'vec_noise'


# Human Agent:
This is to let a user play against an agent

In [2]:
def human_agent(obs, config):
    import numpy as np
    
    board = np.asarray(obs.board).reshape(config.rows, config.columns)
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    
#     print(board)
    while True:
        move = int(input(board))
        if move in valid_moves:
            return move

# Agent:

We will list all things we need in our agents to make it more organized

- **Parameters**:
    - obs:
        - `obs.board` - the game board (a Python list with one item for each grid location)
        - `obs.mark` - the piece assigned to the agent (either 1 or 2)
    - config
        - `config.columns` - number of columns in the game board (7 for Connect Four)
        - `config.rows` - number of rows in the game board (6 for Connect Four)
        - `config.inarow` - number of pieces a player needs to get in a row in order to win (4 for Connect Four)

- **Packages (Imports)**: numpy / random

- **Variables**: SEARCH_DEPTH (Constant to limit the minimax search depth)

- **Methods**:
    - `get_next_board` - method to get the resulting board when dropping a piece at a column.
    - `check_window` - check if a given window (list) has a freq(mark) == disk-count, and all other list elements are 0, if yes, it returns true, and returns false otherwise.
    - `count_horizontal` - count the horizontal windows that meets condition of `check_window` for given disk-count
    - `count_vertical` - count the vertical windows that meets condition of `check_window` for given disk-count
    - `count_diagonal` - count the diagonal windows that meets condition of `check_window` for given disk-count
    - `count_antidiagonal` - count the anti-diagonal windows that meets condition of `check_window` for given disk-count
    - `count_matches` - finds the count of matches for all kind of windows (horizontal / vertical / diagonal / anti-diagonal) to a given disk-count
    - `is_leaf / is_terminal` - check if the given state (board) is terminal (game ends on this board)
    - `evaluate` - evaluates the given state (board) according to a heuristic fuction
    - `minimax` - runs the minimax algorithim with alpha-beta pruning
    - `get_best_move` - acts as a main method

In [3]:
def my_agent(obs, config):
    import numpy as np
    import random
    import sys
    
    SEARCH_DEPTH = 5
    MOD = int(1e9) + 7
    memo = {}
    freq = {}
    
    def add_undermod(a, b):
        return (a + b) % MOD
    
    def hash_board(board, config):
        index = 0
        ret = 0
        for row in range(config.rows):
            for col in range(config.columns):
                if board[row][col] == 1:
                    ret = add_undermod(ret, pow(5, index, MOD))
                elif board[row][col] == 2:
                    ret = add_undermod(ret, pow(13, index, MOD))
                index += 1
        return ret
                    
    def get_next_board(board, col, mark, config):
        next_board = board.copy()
        for row in range(config.rows - 1, -1, -1):
            if next_board[row][col] == 0:
                next_board[row][col] = mark
                return next_board
        assert False, "The given move is invalid, the column is full!"
    
    def check_window(window, disk_count, mark, config):
        return window.count(mark) == disk_count and window.count(0) == config.inarow - disk_count
    
    def count_horizontal(board, disk_count, mark, config):
        ret = 0
        for row in range(config.rows):
            for col in range(config.columns - config.inarow + 1):
                window = list(board[row, col : col + config.inarow])
                if check_window(window, disk_count, mark, config):
                    ret += 1
        return ret
    
    def count_vertical(board, disk_count, mark, config):
        ret = 0
        for row in range(config.rows - config.inarow + 1):
            for col in range(config.columns):
                window = list(board[row : row + config.inarow, col])
                if check_window(window, disk_count, mark, config):
                    ret += 1
        return ret
    
    def count_diagonal(board, disk_count, mark, config):
        ret = 0
        for row in range(config.rows - config.inarow + 1):
            for col in range(config.columns - config.inarow + 1):
                window = list(board[range(row, row + config.inarow), range(col, col + config.inarow)])
                if check_window(window, disk_count, mark, config):
                    ret += 1
        return ret
    
    def count_antidiagonal(board, disk_count, mark, config):
        ret = 0
        for row in range(config.rows - config.inarow + 1):
            for col in range(config.columns - config.inarow + 1):
                window = list(board[range(row + config.inarow - 1, row - 1, -1), range(col, col + config.inarow)])
                if check_window(window, disk_count, mark, config):
                    ret += 1
        return ret
    
    def count_matches(board, disk_count, mark, config):
        return count_horizontal(board, disk_count, mark, config) + \
                count_vertical(board, disk_count, mark, config) + \
                count_diagonal(board, disk_count, mark, config) + \
                count_antidiagonal(board, disk_count, mark, config)
    
    def is_terminal(board, config):
        if list(board[0]).count(0) == 0:
            return True
        for mark in [1, 2]:
            if count_matches(board, config.inarow, mark, config) > 0:
                return True
        return False
    
    def evaluate(board, agent_mark, config):
        a = count_matches(board, 3, agent_mark, config)
        b = count_matches(board, 4, agent_mark, config)
        c = count_matches(board, 3, agent_mark % 2 + 1, config)
        d = count_matches(board, 4, agent_mark % 2 + 1, config)
#         original_stdout = sys.stdout
#         with open('debug.txt', 'a') as f:
#             sys.stdout = f
#             for y in board:
#                 for x in y:
#                     print(x, end = ' ')
#             print()
#             print(a, b, c, d)
#             sys.stdout = original_stdout
        return a + b * 1e6 - c * 1e2 - d * 1e4
    
    def minimax(board, depth, agent_mark, is_maximizing, alpha, beta, config):
        if depth == 0 or is_terminal(board, config):
            return evaluate(board, obs.mark, config)
        
        hash = hash_board(board, config)
        if hash in memo:
            freq[hash] += 1
            return memo[hash]
        
        valid_moves = [col for col in range(config.columns) if board[0][col] == 0]
        ret = 0
        if is_maximizing:
            ret = -np.Inf
            for col in valid_moves:
                next_board = get_next_board(board, col, agent_mark % 2 + 1, config)
                ret = max(ret, minimax(next_board, depth - 1, agent_mark, False, alpha, beta, config))
                if ret >= beta:
                    return ret
                alpha = max(alpha, ret)
        else:
            ret = np.Inf
            for col in valid_moves:
                next_board = get_next_board(board, col, agent_mark, config)
                ret = min(ret, minimax(next_board, depth - 1, agent_mark, True, alpha, beta, config))
                if alpha >= ret:
                    return ret
                beta = min(beta, ret)
        
        memo[hash] = ret
        freq[hash] = 1
        return ret
    
    # make sure about this method tmw
    def get_best_move(obs, config):
        valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
        board = np.asarray(obs.board).reshape(config.rows, config.columns)
        scores = [minimax(get_next_board(board, col, obs.mark, config), SEARCH_DEPTH - 1, obs.mark % 2 + 1, False, -1e18, 1e18, config) for col in valid_moves]
        max_score = max(scores)
        best_moves = [valid_moves[index] for index in range(len(valid_moves)) if scores[index] == max_score]
        dis = [abs(3 - index) for index in best_moves]
        minDis = min(dis)
        for i in range(len(best_moves)):
            if minDis == dis[i]:
                return best_moves[i]
#         return random.choice(best_moves)
    
    return get_best_move(obs, config)

# Create a submission file

In [4]:
import inspect
import os

def write_agent_to_file(function, file):
    with open(file, "w") as f:
        f.write(inspect.getsource(function))
        print(function, "written to", file)

write_agent_to_file(my_agent, "submission.py")

<function my_agent at 0x7f37f40eb370> written to submission.py


# Validate your submission file

In [6]:
import sys
from kaggle_environments import utils, agent

out = sys.stdout
submission = utils.read_file("./submission.py")
agent = agent.get_last_callable(submission, path=submission)
sys.stdout = out

env = make("connectx", debug=True)
env.run([agent, agent])
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

Success!


# Testing and play the game

In [None]:
env = make("connectx", debug=True)
# env.run([my_agent, 'random'])
env.run(['negamax', my_agent])
# env.run([my_agent, human_agent])
env.render(mode="ipython")

0 0 0 0 0 0 0 0 0 0 2 0 0 0 0 0 0 1 1 0 0 0 0 0 1 1 0 2 0 0 0 1 2 0 2 0 1 0 2 1 0 2

# Get Win Percentage

In [108]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    # Agent 1 goes first (roughly) half the time          
    outcomes = evaluate("connectx", [agent1, agent2], config, [], n_rounds//2)
    # Agent 2 goes first (roughly) half the time      
    outcomes += [[b,a] for [a,b] in evaluate("connectx", [agent2, agent1], config, [], n_rounds-n_rounds//2)]
    print("Agent 1 Win Percentage:", np.round(outcomes.count([1,-1])/len(outcomes), 2))
    print("Agent 2 Win Percentage:", np.round(outcomes.count([-1,1])/len(outcomes), 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))

In [113]:
get_win_percentages(agent1=my_agent, agent2="random", n_rounds=20)

Agent 1 Win Percentage: 1.0
Agent 2 Win Percentage: 0.0
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


In [160]:
get_win_percentages(agent1=my_agent, agent2="negamax", n_rounds=20)

Agent 1 Win Percentage: 0.85
Agent 2 Win Percentage: 0.0
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0
