In [94]:
import random
import numpy as np

def drop_piece(grid, col, mark, config):
    """Return a copy of ``grid`` with ``mark`` dropped into column ``col``.

    The input grid is left untouched; the piece lands in the lowest empty
    cell (value 0) of the column.

    Args:
        grid: board supporting ``.copy()`` and ``grid[row][col]`` access
            (the notebook passes a 2-D NumPy array).
        col: index of the column to play.
        mark: value written into the landing cell.
        config: configuration exposing ``rows``.

    Returns:
        The new board with the piece placed.

    Raises:
        ValueError: if the column has no empty cell. Previously the top cell
            of a full column was silently overwritten.
    """
    next_grid = grid.copy()
    # Scan from the bottom row upwards; the first empty cell is the landing spot.
    for row in range(config.rows - 1, -1, -1):
        if next_grid[row][col] == 0:
            next_grid[row][col] = mark
            return next_grid
    raise ValueError(f"column {col} is full")

def check_window(window, n_disks, mark, config):
    """Tell whether ``window`` holds exactly ``n_disks`` of ``mark`` and is otherwise empty.

    Args:
        window: list of cell values of length ``config.inarow``.
        n_disks: required number of cells equal to ``mark``.
        mark: the piece value to look for.
        config: configuration exposing ``inarow``.

    Returns:
        True when the window has ``n_disks`` cells equal to ``mark`` and
        ``config.inarow - n_disks`` cells equal to 0, otherwise False.
    """
    own_cells = window.count(mark)
    if own_cells != n_disks:
        return False
    empty_cells = window.count(0)
    return empty_cells == config.inarow - n_disks

def count_windows(grid, n_disks, mark, config):
    """Count the ``inarow``-long windows that pass ``check_window`` for ``mark``.

    Every horizontal, vertical and diagonal run of ``config.inarow`` cells is
    examined.

    Args:
        grid: board as a 2-D NumPy array.
        n_disks: number of ``mark`` cells a window must contain.
        mark: the piece value being counted.
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        The number of matching windows.
    """
    span = config.inarow
    row_starts = range(config.rows - span + 1)
    col_starts = range(config.columns - span + 1)

    windows = []
    # horizontal
    windows += [list(grid[r, c:c + span])
                for r in range(config.rows) for c in col_starts]
    # vertical
    windows += [list(grid[r:r + span, c])
                for r in row_starts for c in range(config.columns)]
    # positive diagonal
    windows += [list(grid[range(r, r + span), range(c, c + span)])
                for r in row_starts for c in col_starts]
    # negative diagonal
    windows += [list(grid[range(r, r - span, -1), range(c, c + span)])
                for r in range(span - 1, config.rows) for c in col_starts]

    return sum(1 for cells in windows if check_window(cells, n_disks, mark, config))

# Helper function for minimax: calculates value of heuristic for grid
def get_heuristic(grid, mark, config):
    """Score ``grid`` from the point of view of ``mark``.

    The window sizes follow ``config.inarow`` instead of the hard-coded 3 and
    4, so a completed line is scored as a win for any ``inarow``. The result
    is unchanged for the default ``inarow == 4``.

    Args:
        grid: board as a 2-D NumPy array.
        mark: the piece (1 or 2) the score is computed for.
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        A float: +1 per own almost-complete window, -1e2 per opponent
        almost-complete window, -1e4 per opponent complete window and
        +1e6 per own complete window.
    """
    opponent = mark % 2 + 1
    win_len = config.inarow
    # "Almost complete": one piece short of a win, the remaining cell empty.
    near_len = config.inarow - 1

    num_near = count_windows(grid, near_len, mark, config)
    num_wins = count_windows(grid, win_len, mark, config)
    num_near_opp = count_windows(grid, near_len, opponent, config)
    num_wins_opp = count_windows(grid, win_len, opponent, config)
    score = num_near - 1e2*num_near_opp - 1e4*num_wins_opp + 1e6*num_wins
    return score

In [95]:
def minimax(node, depth, maximizingPlayer, mark, config):
    """Return the depth-limited minimax value of ``node`` from ``mark``'s point of view.

    Args:
        node: board as a 2-D NumPy array (0 marks an empty cell).
        depth: remaining plies to search; at 0 the heuristic is returned.
        maximizingPlayer: True when ``mark`` moves next, False when the
            opponent (``mark % 2 + 1``) moves next.
        mark: the piece (1 or 2) whose score is maximized.
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        The heuristic value reached when both sides play the best move
        according to ``get_heuristic`` within the search depth.
    """
    # Check depth first so the terminal scan is skipped at leaf nodes.
    if depth == 0 or is_terminal_node(node, config):
        return get_heuristic(node, mark, config)

    # A column is playable while its top cell is empty.
    valid_moves = [col for col in range(config.columns) if node[0, col] == 0]

    if maximizingPlayer:
        # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
        value = -np.inf
        for move in valid_moves:
            child = drop_piece(node, move, mark, config)
            value = max(value, minimax(child, depth-1, False, mark, config))
        return value

    value = np.inf
    for move in valid_moves:
        child = drop_piece(node, move, mark%2+1, config)
        value = min(value, minimax(child, depth-1, True, mark, config))
    return value

# Uses minimax to calculate value of dropping piece in selected column
def score_move(grid, col, mark, config, nsteps):
    """Score playing ``col`` for ``mark`` with an ``nsteps``-ply minimax look-ahead.

    The piece is dropped first, then the remaining ``nsteps - 1`` plies are
    searched starting with the opponent's reply.
    """
    board_after_move = drop_piece(grid, col, mark, config)
    return minimax(board_after_move, nsteps - 1, False, mark, config)

# Helper function for minimax: checks if agent or opponent has four in a row in the window
def is_terminal_window(window, config):
    """Return True when ``window`` contains ``config.inarow`` cells of player 1 or of player 2."""
    needed = config.inarow
    for player in (1, 2):
        if window.count(player) == needed:
            return True
    return False

# Helper function for minimax: checks if game has ended
def is_terminal_node(grid, config):
    """Return True when the board is full or some window is won by either player.

    Args:
        grid: board as a 2-D NumPy array (0 marks an empty cell).
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.
    """
    # Draw: the top row has no empty cell left.
    if 0 not in list(grid[0, :]):
        return True

    span = config.inarow
    row_starts = range(config.rows - (span - 1))
    col_starts = range(config.columns - (span - 1))

    def candidate_windows():
        # horizontal
        for r in range(config.rows):
            for c in col_starts:
                yield list(grid[r, c:c + span])
        # vertical
        for r in row_starts:
            for c in range(config.columns):
                yield list(grid[r:r + span, c])
        # positive diagonal
        for r in row_starts:
            for c in col_starts:
                yield list(grid[range(r, r + span), range(c, c + span)])
        # negative diagonal
        for r in range(span - 1, config.rows):
            for c in col_starts:
                yield list(grid[range(r, r - span, -1), range(c, c + span)])

    # Win: stop at the first window filled by a single player.
    return any(is_terminal_window(cells, config) for cells in candidate_windows())

In [96]:
# Look-ahead depth that the agents in this notebook pass to their score_move* helper as `nsteps`.
N_STEPS = 3

def agent(obs, config):
    """Pick a column by scoring every playable column with ``score_move``.

    Args:
        obs: observation exposing ``board`` (flat sequence, 0 = empty) and ``mark``.
        config: configuration exposing ``rows`` and ``columns``.

    Returns:
        One of the best-scoring columns, chosen at random among ties.
    """
    # The first `columns` entries of the flat board are the top row.
    playable = [c for c in range(config.columns) if obs.board[c] == 0]
    board = np.asarray(obs.board).reshape(config.rows, config.columns)
    score_by_col = {c: score_move(board, c, obs.mark, config, N_STEPS) for c in playable}
    best = max(score_by_col.values())
    best_cols = [c for c, s in score_by_col.items() if s == best]
    return random.choice(best_cols)

from kaggle_environments import make, evaluate

# Create a ConnectX environment and play one episode: `agent` vs. the "random" agent.
env = make("connectx")
env.run([agent, "random"])

[[{'action': 0,
   'reward': 0,
   'info': {},
   'observation': {'remainingOverageTime': 60,
    'step': 0,
    'board': [0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0],
    'mark': 1},
   'status': 'ACTIVE'},
  {'action': 0,
   'reward': 0,
   'info': {},
   'observation': {'remainingOverageTime': 60, 'mark': 2},
   'status': 'INACTIVE'}],
 [{'action': 1,
   'reward': 0,
   'info': {},
   'observation': {'remainingOverageTime': 60,
    'step': 1,
    'board': [0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0

In [97]:
# Fresh ConnectX environment for the manual experiments below.
env = make("connectx")

In [98]:
# Keep the environment's configuration so the helper functions can be called by hand.
config = env.configuration

In [99]:
# All-zero (empty) board with the configured number of rows and columns.
grid = np.zeros((config.rows, config.columns))

In [100]:
# Hand-built test position (replaces the empty grid): 0 = empty, 1/2 = player marks.
# The last row is the bottom of the board (drop_piece fills from the highest row index).
grid = np.asarray([
    [0,0,0,0,0,0,0],
    [0,0,0,0,0,0,0],
    [0,0,0,0,0,0,0],
    [0,0,0,0,0,0,0],
    [0,0,0,0,0,0,0],
    [2,2,2,1,2,0,0]
])

In [101]:
# Minimax value of the test position for player 1 (to move), searching 3 plies.
minimax(grid, 3, True, 1, config)

0.0

In [102]:
# Play one episode of `agent` vs. the "random" agent and render it in the notebook.
env.run([agent, "random"])
env.render(mode="ipython")

In [103]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    """Play ``n_rounds`` ConnectX games between two agents and print the results.

    Roughly half of the games are played with ``agent1`` moving first and the
    rest with ``agent2`` moving first. Prints each agent's win percentage and
    the number of invalid plays; returns nothing.
    """
    # Use default Connect Four setup
    setup = {'rows': 6, 'columns': 7, 'inarow': 4}
    games_agent1_first = n_rounds // 2
    games_agent2_first = n_rounds - games_agent1_first

    # Agent 1 goes first (roughly) half the time
    outcomes = evaluate("connectx", [agent1, agent2], setup, [], games_agent1_first)
    # Agent 2 goes first (roughly) half the time; swap each pair back to [agent1, agent2] order
    swapped = evaluate("connectx", [agent2, agent1], setup, [], games_agent2_first)
    outcomes += [[reward1, reward2] for [reward2, reward1] in swapped]

    total = len(outcomes)
    agent1_wins = outcomes.count([1,-1])
    agent2_wins = outcomes.count([-1,1])
    print("Agent 1 Win Percentage:", np.round(agent1_wins/total, 2))
    print("Agent 2 Win Percentage:", np.round(agent2_wins/total, 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))

In [104]:
# Evaluate `agent` against the "random" agent over 15 games.
# NOTE(review): the saved output shows this run ended in KeyboardInterrupt, so no result was recorded.
get_win_percentages(agent1=agent, agent2="random", n_rounds=15)

KeyboardInterrupt: 

In [None]:
def minimax(node, depth, maximizingPlayer, mark, config):
    """Return the depth-limited minimax value of ``node`` from ``mark``'s point of view.

    NOTE(review): this cell redefines a ``minimax`` that already exists earlier
    in the notebook with the same logic.

    Args:
        node: board as a 2-D NumPy array (0 marks an empty cell).
        depth: remaining plies to search; at 0 the heuristic is returned.
        maximizingPlayer: True when ``mark`` moves next, False when the
            opponent (``mark % 2 + 1``) moves next.
        mark: the piece (1 or 2) whose score is maximized.
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        The heuristic value reached when both sides play the best move
        according to ``get_heuristic`` within the search depth.
    """
    # Check depth first so the terminal scan is skipped at leaf nodes.
    if depth == 0 or is_terminal_node(node, config):
        return get_heuristic(node, mark, config)

    # A column is playable while its top cell is empty.
    valid_moves = [col for col in range(config.columns) if node[0, col] == 0]

    if maximizingPlayer:
        # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
        value = -np.inf
        for move in valid_moves:
            child = drop_piece(node, move, mark, config)
            value = max(value, minimax(child, depth-1, False, mark, config))
        return value

    value = np.inf
    for move in valid_moves:
        child = drop_piece(node, move, mark%2+1, config)
        value = min(value, minimax(child, depth-1, True, mark, config))
    return value


In [140]:
def alphabeta(node, depth, alpha, beta, maximizingPlayer, mark, config):
    """Depth-limited minimax with alpha-beta pruning, scored from ``mark``'s point of view.

    Args:
        node: board as a 2-D NumPy array (0 marks an empty cell).
        depth: remaining plies to search; at 0 the heuristic is returned.
        alpha: best value the maximizing side is already assured of.
        beta: best value the minimizing side is already assured of.
        maximizingPlayer: True when ``mark`` moves next, False when the
            opponent (``mark % 2 + 1``) moves next.
        mark: the piece (1 or 2) whose score is maximized.
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        The node's value. When called with ``alpha=-inf`` and ``beta=inf``
        this is the same value plain minimax would return.
    """
    # Check depth first so the terminal scan is skipped at leaf nodes.
    if depth == 0 or is_terminal_node(node, config):
        return get_heuristic(node, mark, config)

    # A column is playable while its top cell is empty.
    valid_moves = [col for col in range(config.columns) if node[0, col] == 0]

    if maximizingPlayer:
        # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
        value = -np.inf
        for move in valid_moves:
            child = drop_piece(node, move, mark, config)
            value = max(value, alphabeta(child, depth-1, alpha, beta, False, mark, config))
            # Cut off on equality too: the minimizing parent cannot gain from this
            # node once it is at least beta, so the remaining moves are irrelevant.
            if value >= beta:
                break
            alpha = max(alpha, value)
        return value

    value = np.inf
    for move in valid_moves:
        child = drop_piece(node, move, mark%2+1, config)
        value = min(value, alphabeta(child, depth-1, alpha, beta, True, mark, config))
        # Symmetric cutoff for the minimizing side.
        if value <= alpha:
            break
        beta = min(beta, value)
    return value

In [141]:
# Uses minimax to calculate value of dropping piece in selected column
def score_move_minimax(grid, col, mark, config, nsteps):
    """Score playing ``col`` for ``mark`` using plain minimax over ``nsteps`` plies.

    The piece is dropped first; the search then continues for ``nsteps - 1``
    plies starting with the opponent's reply.
    """
    board_after_move = drop_piece(grid, col, mark, config)
    return minimax(board_after_move, nsteps - 1, False, mark, config)

# Uses minimax with alphabeta pruning to calculate value of dropping piece in selected column
def score_move_alphabeta(grid, col, mark, config, nsteps):
    """Score playing ``col`` for ``mark`` using alpha-beta search over ``nsteps`` plies.

    Args:
        grid: board as a 2-D NumPy array.
        col: column to play.
        mark: the piece (1 or 2) making the move.
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.
        nsteps: total look-ahead in plies, counting this move.

    Returns:
        The value of the position after the move, from ``mark``'s point of view.
    """
    next_grid = drop_piece(grid, col, mark, config)
    # The opponent replies next, so the search starts at a minimizing node with a
    # full (-inf, inf) window. np.inf is used because np.Inf was removed in NumPy 2.0.
    score = alphabeta(next_grid, nsteps-1, -np.inf, np.inf, False, mark, config)
    return score

In [142]:
def agent_minimax(obs, config):
    """Pick a column by scoring every playable column with ``score_move_minimax``.

    Args:
        obs: observation exposing ``board`` (flat sequence, 0 = empty) and ``mark``.
        config: configuration exposing ``rows`` and ``columns``.

    Returns:
        One of the best-scoring columns, chosen at random among ties.
    """
    # The first `columns` entries of the flat board are the top row.
    playable = [c for c in range(config.columns) if obs.board[c] == 0]
    board = np.asarray(obs.board).reshape(config.rows, config.columns)
    score_by_col = {}
    for c in playable:
        score_by_col[c] = score_move_minimax(board, c, obs.mark, config, N_STEPS)
    best = max(score_by_col.values())
    best_cols = [c for c in score_by_col if score_by_col[c] == best]
    return random.choice(best_cols)

In [143]:
def agent_alphabeta(obs, config):
    """Pick a column by scoring every playable column with ``score_move_alphabeta``.

    Args:
        obs: observation exposing ``board`` (flat sequence, 0 = empty) and ``mark``.
        config: configuration exposing ``rows`` and ``columns``.

    Returns:
        One of the best-scoring columns, chosen at random among ties.
    """
    # The first `columns` entries of the flat board are the top row.
    playable = [c for c in range(config.columns) if obs.board[c] == 0]
    board = np.asarray(obs.board).reshape(config.rows, config.columns)
    score_by_col = {}
    for c in playable:
        score_by_col[c] = score_move_alphabeta(board, c, obs.mark, config, N_STEPS)
    best = max(score_by_col.values())
    best_cols = [c for c in score_by_col if score_by_col[c] == best]
    return random.choice(best_cols)

In [123]:
# Fresh ConnectX environment for the agent comparisons below.
env = make("connectx")

In [117]:
# One rendered episode: plain minimax agent vs. the "random" agent.
env.run(agents=[agent_minimax, "random"])
env.render(mode="ipython")

In [125]:
# One rendered episode: alpha-beta agent vs. the "random" agent.
env.run(agents=[agent_alphabeta, "random"])
env.render(mode="ipython")

In [127]:
# One rendered episode: plain minimax agent (first) vs. alpha-beta agent (second).
env.run(agents=[agent_minimax, agent_alphabeta])
env.render(mode="ipython")

In [129]:
# One rendered episode: plain minimax agent playing against itself.
env.run(agents=[agent_minimax, agent_minimax])
env.render(mode="ipython")


In [145]:
# Head-to-head over 10 games: alpha-beta agent vs. plain minimax agent.
get_win_percentages(agent1=agent_alphabeta, agent2=agent_minimax, n_rounds=10)

Agent 1 Win Percentage: 0.5
Agent 2 Win Percentage: 0.5
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


In [146]:
# Self-play over 50 games: alpha-beta agent against itself.
get_win_percentages(agent1=agent_alphabeta, agent2=agent_alphabeta, n_rounds=50)

Agent 1 Win Percentage: 0.54
Agent 2 Win Percentage: 0.38
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


In [147]:
# Self-play over 10 games: plain minimax agent against itself.
get_win_percentages(agent1=agent_minimax, agent2=agent_minimax, n_rounds=10)

Agent 1 Win Percentage: 0.5
Agent 2 Win Percentage: 0.5
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


## Final alpha-beta agent definition

In [8]:
def agent(obs, config):
    """Choose a column for the current turn with a depth-limited alpha-beta search.

    All imports, constants and helpers live inside the function body, so the
    function does not depend on any other notebook cell.

    Args:
        obs: observation exposing ``board`` (flat, row-major sequence where 0
            is an empty cell) and ``mark`` (this agent's piece, 1 or 2).
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        The index of the chosen column. Ties between equally scored columns
        are broken at random.
    """
    import random
    import numpy as np

    # Look-ahead depth in plies, counting the agent's own move.
    N_STEPS = 3

    def drop_piece(grid, col, mark, config):
        """Return a copy of ``grid`` with ``mark`` dropped into column ``col``."""
        next_grid = grid.copy()
        # Scan from the bottom row upwards; the first empty cell is the landing spot.
        for row in range(config.rows - 1, -1, -1):
            if next_grid[row][col] == 0:
                next_grid[row][col] = mark
                return next_grid
        # A full column used to overwrite its top cell silently.
        raise ValueError(f"column {col} is full")

    def iter_windows(grid, config):
        """Yield every horizontal, vertical and diagonal run of ``inarow`` cells as a list."""
        span = config.inarow
        row_starts = range(config.rows - span + 1)
        col_starts = range(config.columns - span + 1)

        # horizontal
        for row in range(config.rows):
            for col in col_starts:
                yield list(grid[row, col:col + span])
        # vertical
        for row in row_starts:
            for col in range(config.columns):
                yield list(grid[row:row + span, col])
        # positive diagonal
        for row in row_starts:
            for col in col_starts:
                yield list(grid[range(row, row + span), range(col, col + span)])
        # negative diagonal
        for row in range(span - 1, config.rows):
            for col in col_starts:
                yield list(grid[range(row, row - span, -1), range(col, col + span)])

    def check_window(window, n_disks, mark, config):
        """True when ``window`` has exactly ``n_disks`` of ``mark`` and is otherwise empty."""
        return window.count(mark) == n_disks and window.count(0) == config.inarow - n_disks

    def count_windows(windows, n_disks, mark, config):
        """Count the already-extracted ``windows`` that pass ``check_window``."""
        return sum(1 for window in windows if check_window(window, n_disks, mark, config))

    # Helper function for minimax: calculates value of heuristic for grid
    def get_heuristic(grid, mark, config):
        """Score ``grid`` for ``mark``: own (near-)wins count for, the opponent's against."""
        opponent = mark % 2 + 1
        win_len = config.inarow
        # One piece short of a win; equals the original's 3 when inarow is 4.
        near_len = config.inarow - 1
        # Extract the windows once and reuse them for all four counts.
        windows = list(iter_windows(grid, config))
        num_near = count_windows(windows, near_len, mark, config)
        num_wins = count_windows(windows, win_len, mark, config)
        num_near_opp = count_windows(windows, near_len, opponent, config)
        num_wins_opp = count_windows(windows, win_len, opponent, config)
        return num_near - 1e2*num_near_opp - 1e4*num_wins_opp + 1e6*num_wins

    # Helper function for minimax: checks if agent or opponent has four in a row in the window
    def is_terminal_window(window, config):
        return window.count(1) == config.inarow or window.count(2) == config.inarow

    # Helper function for minimax: checks if game has ended
    def is_terminal_node(grid, config):
        # Draw: the top row has no empty cell left.
        if list(grid[0, :]).count(0) == 0:
            return True
        # Win: some window is filled by a single player.
        return any(is_terminal_window(window, config) for window in iter_windows(grid, config))

    def alphabeta(node, depth, alpha, beta, maximizingPlayer, mark, config):
        """Depth-limited minimax with alpha-beta pruning, scored for ``mark``."""
        # Check depth first so the terminal scan is skipped at leaf nodes.
        if depth == 0 or is_terminal_node(node, config):
            return get_heuristic(node, mark, config)

        # A column is playable while its top cell is empty.
        valid_moves = [col for col in range(config.columns) if node[0, col] == 0]

        if maximizingPlayer:
            # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
            value = -np.inf
            for move in valid_moves:
                child = drop_piece(node, move, mark, config)
                value = max(value, alphabeta(child, depth-1, alpha, beta, False, mark, config))
                # Cutting off on equality prunes more without changing the root value.
                if value >= beta:
                    break
                alpha = max(alpha, value)
            return value

        value = np.inf
        for move in valid_moves:
            child = drop_piece(node, move, mark%2+1, config)
            value = min(value, alphabeta(child, depth-1, alpha, beta, True, mark, config))
            if value <= alpha:
                break
            beta = min(beta, value)
        return value

    # Uses minimax with alphabeta pruning to calculate value of dropping piece in selected column
    def score_move_alphabeta(grid, col, mark, config, nsteps):
        next_grid = drop_piece(grid, col, mark, config)
        # The opponent replies next, so start at a minimizing node with a full window.
        return alphabeta(next_grid, nsteps-1, -np.inf, np.inf, False, mark, config)

    # The first `columns` entries of the flat board are the top row.
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    scores = {move: score_move_alphabeta(grid, move, obs.mark, config, N_STEPS)
              for move in valid_moves}
    max_score = max(scores.values())
    max_moves = [move for move, score in scores.items() if score == max_score]
    return random.choice(max_moves)

In [2]:
# Evaluate the final agent against the "random" agent over 25 games.
# NOTE(review): the saved output is a NameError. get_win_percentages is defined in an
# earlier cell that had not been run in this kernel session, so run that cell first.
get_win_percentages(agent1=agent, agent2="random", n_rounds=25)

NameError: name 'get_win_percentages' is not defined

In [7]:
from kaggle_environments import make
# One rendered episode: final agent vs. the "random" agent.
env = make("connectx")
env.run(agents=[agent, "random"])
env.render(mode="ipython")

In [1]:
def my_agent(obs, config):
    """Choose a column for the current turn with a depth-limited alpha-beta search.

    All imports, constants and helpers live inside the function body, so the
    function does not depend on any other notebook cell.

    Args:
        obs: observation exposing ``board`` (flat, row-major sequence where 0
            is an empty cell) and ``mark`` (this agent's piece, 1 or 2).
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        The index of the chosen column. Ties between equally scored columns
        are broken at random.
    """
    import random
    import numpy as np

    # Look-ahead depth in plies, counting the agent's own move.
    N_STEPS = 3

    def drop_piece(grid, col, mark, config):
        """Return a copy of ``grid`` with ``mark`` dropped into column ``col``."""
        next_grid = grid.copy()
        # Scan from the bottom row upwards; the first empty cell is the landing spot.
        for row in range(config.rows - 1, -1, -1):
            if next_grid[row][col] == 0:
                next_grid[row][col] = mark
                return next_grid
        # A full column used to overwrite its top cell silently.
        raise ValueError(f"column {col} is full")

    def iter_windows(grid, config):
        """Yield every horizontal, vertical and diagonal run of ``inarow`` cells as a list."""
        span = config.inarow
        row_starts = range(config.rows - span + 1)
        col_starts = range(config.columns - span + 1)

        # horizontal
        for row in range(config.rows):
            for col in col_starts:
                yield list(grid[row, col:col + span])
        # vertical
        for row in row_starts:
            for col in range(config.columns):
                yield list(grid[row:row + span, col])
        # positive diagonal
        for row in row_starts:
            for col in col_starts:
                yield list(grid[range(row, row + span), range(col, col + span)])
        # negative diagonal
        for row in range(span - 1, config.rows):
            for col in col_starts:
                yield list(grid[range(row, row - span, -1), range(col, col + span)])

    def check_window(window, n_disks, mark, config):
        """True when ``window`` has exactly ``n_disks`` of ``mark`` and is otherwise empty."""
        return window.count(mark) == n_disks and window.count(0) == config.inarow - n_disks

    def count_windows(windows, n_disks, mark, config):
        """Count the already-extracted ``windows`` that pass ``check_window``."""
        return sum(1 for window in windows if check_window(window, n_disks, mark, config))

    # Helper function for minimax: calculates value of heuristic for grid
    def get_heuristic(grid, mark, config):
        """Score ``grid`` for ``mark``: own (near-)wins count for, the opponent's against."""
        opponent = mark % 2 + 1
        win_len = config.inarow
        # One piece short of a win; equals the original's 3 when inarow is 4.
        near_len = config.inarow - 1
        # Extract the windows once and reuse them for all four counts.
        windows = list(iter_windows(grid, config))
        num_near = count_windows(windows, near_len, mark, config)
        num_wins = count_windows(windows, win_len, mark, config)
        num_near_opp = count_windows(windows, near_len, opponent, config)
        num_wins_opp = count_windows(windows, win_len, opponent, config)
        return num_near - 1e2*num_near_opp - 1e4*num_wins_opp + 1e6*num_wins

    # Helper function for minimax: checks if agent or opponent has four in a row in the window
    def is_terminal_window(window, config):
        return window.count(1) == config.inarow or window.count(2) == config.inarow

    # Helper function for minimax: checks if game has ended
    def is_terminal_node(grid, config):
        # Draw: the top row has no empty cell left.
        if list(grid[0, :]).count(0) == 0:
            return True
        # Win: some window is filled by a single player.
        return any(is_terminal_window(window, config) for window in iter_windows(grid, config))

    def alphabeta(node, depth, alpha, beta, maximizingPlayer, mark, config):
        """Depth-limited minimax with alpha-beta pruning, scored for ``mark``."""
        # Check depth first so the terminal scan is skipped at leaf nodes.
        if depth == 0 or is_terminal_node(node, config):
            return get_heuristic(node, mark, config)

        # A column is playable while its top cell is empty.
        valid_moves = [col for col in range(config.columns) if node[0, col] == 0]

        if maximizingPlayer:
            # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
            value = -np.inf
            for move in valid_moves:
                child = drop_piece(node, move, mark, config)
                value = max(value, alphabeta(child, depth-1, alpha, beta, False, mark, config))
                # Cutting off on equality prunes more without changing the root value.
                if value >= beta:
                    break
                alpha = max(alpha, value)
            return value

        value = np.inf
        for move in valid_moves:
            child = drop_piece(node, move, mark%2+1, config)
            value = min(value, alphabeta(child, depth-1, alpha, beta, True, mark, config))
            if value <= alpha:
                break
            beta = min(beta, value)
        return value

    # Uses minimax with alphabeta pruning to calculate value of dropping piece in selected column
    def score_move_alphabeta(grid, col, mark, config, nsteps):
        next_grid = drop_piece(grid, col, mark, config)
        # The opponent replies next, so start at a minimizing node with a full window.
        return alphabeta(next_grid, nsteps-1, -np.inf, np.inf, False, mark, config)

    # The first `columns` entries of the flat board are the top row.
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    scores = {move: score_move_alphabeta(grid, move, obs.mark, config, N_STEPS)
              for move in valid_moves}
    max_score = max(scores.values())
    max_moves = [move for move, score in scores.items() if score == max_score]
    return random.choice(max_moves)

In [3]:
from kaggle_environments import make

# One rendered episode: the self-contained agent playing against itself.
env = make("connectx")
env.run(agents=[my_agent, my_agent])
env.render(mode="ipython")