In [1]:
import random as rd
import numpy as np

In [2]:
from IPython.display import HTML
import random

def hide_toggle(for_next=False):
    """Return an HTML snippet with a link that shows/hides a code cell's input.

    With ``for_next=False`` the link toggles the input of the currently
    selected cell. With ``for_next=True`` it toggles the input of the cell
    after it and additionally emits JS that hides the current cell's input.
    """
    selected = """$('div.cell.code_cell.rendered.selected')"""

    if for_next:
        selector = selected + '.next()'
        link_label = 'Toggle show/hide' + ' next cell'
        # JS that hides the input of the cell holding the toggle itself.
        hide_own_input = selected + '.find("div.input").hide();'
    else:
        selector = selected
        link_label = 'Toggle show/hide'
        hide_own_input = ''

    # Random suffix so several toggles in one page get distinct JS functions.
    func_name = 'code_toggle_{}'.format(str(random.randint(1,2**64)))

    template = """
        <script>
            function {f_name}() {{
                {cell_selector}.find('div.input').toggle();
            }}

            {js_hide_current}
        </script>

        <a href="javascript:{f_name}()">{toggle_text}</a>
    """

    return HTML(template.format(
        f_name=func_name,
        cell_selector=selector,
        js_hide_current=hide_own_input,
        toggle_text=link_label,
    ))

hide_toggle()

In [3]:
from kaggle_environments import make, evaluate

# Create the Connect-X environment used by the cells below (debug mode on).
env = make('connectx', debug=True)

# Show the agents that ship with the environment.
print(list(env.agents))

Loading environment football failed: No module named 'gfootball'
['random', 'negamax']


In [4]:
env.run(['random', 'random'])

env.render(mode='ipython')

In [5]:
def agent_random(obs, config):
    """Pick uniformly at random among columns whose top cell is empty."""
    open_columns = []
    for col in range(config.columns):
        # The first `config.columns` entries of the flat board are the top row.
        if obs.board[col] == 0:
            open_columns.append(col)
    return random.choice(open_columns)

def agent_middle(obs, config):
    """Always play the middle column (no check that the column still has room)."""
    middle_column = config.columns // 2
    return middle_column


def agent_leftmost(obs, config):
    """Play the leftmost column whose top cell is still empty."""
    open_columns = []
    for col in range(config.columns):
        if obs.board[col] == 0:
            open_columns.append(col)
    # Columns were collected left to right, so the first one is the leftmost.
    return open_columns[0]

In [6]:
env.run([agent_random, agent_random])

env.render(mode='ipython')

In [7]:
# hide
def get_win_percentages(agent1, agent2, n_rounds=100):
    """Play `n_rounds` games between two agents and print a summary.

    Roughly half of the games are started by `agent1` and the rest by
    `agent2`. Prints each agent's win rate (rounded to two decimals) and how
    many games each agent lost through an invalid play.
    """
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    games_agent1_first = n_rounds//2
    games_agent2_first = n_rounds - games_agent1_first

    # Agent 1 goes first (roughly) half the time
    outcomes = evaluate("connectx", [agent1, agent2], config, [], games_agent1_first)
    # Agent 2 goes first (roughly) half the time; swap each reward pair back
    # so index 0 always refers to agent1.
    swapped = evaluate("connectx", [agent2, agent1], config, [], games_agent2_first)
    outcomes += [[b, a] for [a, b] in swapped]

    total_games = len(outcomes)
    agent1_wins = outcomes.count([1,-1])
    agent2_wins = outcomes.count([-1,1])
    print("Agent 1 Win Percentage:", np.round(agent1_wins/total_games, 2))
    print("Agent 2 Win Percentage:", np.round(agent2_wins/total_games, 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))
    
hide_toggle()

In [8]:
get_win_percentages(agent1=agent_leftmost, agent2=agent_random)

Agent 1 Win Percentage: 0.82
Agent 2 Win Percentage: 0.18
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


In [9]:
get_win_percentages(agent1=agent_random, agent2=agent_random)

Agent 1 Win Percentage: 0.52
Agent 2 Win Percentage: 0.48
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


## Heuristic

In [10]:
def score_move(grid, col, mark, config):
    """Return the heuristic value of the board after `mark` plays in `col`."""
    return get_heuristic(drop_piece(grid, col, mark, config), mark, config)

def drop_piece(grid, col, mark, config):
    """Return a copy of `grid` with `mark` placed in the lowest empty cell of `col`.

    The input grid is not modified. If the column has no empty cell the top
    cell (row 0) is overwritten, so callers should only pass playable columns.
    """
    board = grid.copy()
    landing_row = config.rows - 1
    # Walk upward from the bottom until an empty cell (or the top row) is reached.
    while landing_row > 0 and board[landing_row][col] != 0:
        landing_row -= 1
    board[landing_row][col] = mark
    return board

def get_heuristic(grid, mark, config):
    """Score `grid` from the point of view of player `mark`.

    Own three-disc windows count +1 each, own four-disc windows +1e6 each,
    and the opponent's three-disc windows -1e2 each.
    """
    opponent = mark%2+1  # marks are 1 and 2, so this maps 1 -> 2 and 2 -> 1
    own_threes = count_windows(grid, 3, mark, config)
    own_fours = count_windows(grid, 4, mark, config)
    opponent_threes = count_windows(grid, 3, opponent, config)
    return own_threes + 1e6*own_fours - 1e2*opponent_threes

def check_window(window, num_discs, piece, config):
    """Tell whether `window` holds exactly `num_discs` of `piece` and is otherwise empty.

    `window` is a list of `config.inarow` cell values; 0 denotes an empty cell.
    """
    if window.count(piece) != num_discs:
        return False
    return window.count(0) == config.inarow - num_discs

def count_windows(grid, num_discs, piece, config):
    """Count the windows of length `config.inarow` accepted by `check_window`.

    Scans every horizontal, vertical, positive-diagonal (down-right) and
    negative-diagonal (up-right) window of the 2-D NumPy `grid` and returns
    how many contain exactly `num_discs` discs of `piece` with the remaining
    cells empty.
    """
    span = config.inarow
    num_windows = 0
    # horizontal
    for row in range(config.rows):
        for col in range(config.columns - (span - 1)):
            window = list(grid[row, col:col+span])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # vertical
    for row in range(config.rows - (span - 1)):
        for col in range(config.columns):
            window = list(grid[row:row+span, col])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # positive diagonal: row and column both increase
    for row in range(config.rows - (span - 1)):
        for col in range(config.columns - (span - 1)):
            window = list(grid[range(row, row+span), range(col, col+span)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # negative diagonal: row decreases while column increases. The start row
    # must be at least span-1, otherwise the row indices become negative and
    # NumPy silently wraps them around to the bottom of the board.
    for row in range(span - 1, config.rows):
        for col in range(config.columns - (span - 1)):
            window = list(grid[range(row, row-span, -1), range(col, col+span)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    return num_windows

In [11]:
def agent(obs, config):
    """One-step lookahead agent: play a column with the best `score_move` value.

    Ties between equally scored columns are broken at random.
    """
    playable = []
    for c in range(config.columns):
        if obs.board[c] == 0:
            playable.append(c)
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)

    scores = {}
    for col in playable:
        scores[col] = score_move(grid, col, obs.mark, config)

    # `default` keeps the empty-board-of-moves case flowing into random.choice.
    best_score = max(scores.values(), default=None)
    best_columns = [col for col, value in scores.items() if value == best_score]
    return random.choice(best_columns)

In [12]:
env.run([agent, 'random'])
# env.render(mode='ipython')

[[{'action': 0,
   'reward': 0,
   'info': {},
   'observation': {'board': [0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0],
    'mark': 1},
   'status': 'ACTIVE'},
  {'action': 0,
   'reward': 0,
   'info': {},
   'observation': {'mark': 2},
   'status': 'INACTIVE'}],
 [{'action': 1,
   'reward': 0,
   'info': {},
   'observation': {'board': [0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     0,
     1,
     0,
     0,
     0,
     0,
     0],
    'mark': 1},
   'status': 'INACTIVE'

In [13]:
get_win_percentages(agent, 'random')

Agent 1 Win Percentage: 0.97
Agent 2 Win Percentage: 0.03
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


In [14]:
def my_agent(obs, config):
    """Self-contained one-step lookahead Connect-X agent.

    Every playable column is scored by dropping the agent's disc into it and
    evaluating the resulting board with `get_heuristic`; one of the
    best-scoring columns is returned (ties broken at random).

    All helpers and imports live inside the function so that its source can
    be written to a standalone submission file.

    Args:
        obs: observation exposing `board` (flat, row-major list of cells,
            0 = empty) and `mark` (this agent's disc value, 1 or 2).
        config: configuration exposing `rows`, `columns` and `inarow`.

    Returns:
        The index of the chosen column.
    """
    import numpy as np
    # `random.choice` is used below, so the module must be importable under
    # that name from inside the function (not only as a notebook global).
    import random

    def score_move(grid, col, mark, config):
        next_grid = drop_piece(grid, col, mark, config)
        score = get_heuristic(next_grid, mark, config)
        return score

    def drop_piece(grid, col, mark, config):
        # Place `mark` in the lowest empty cell of `col` on a copy of the grid.
        next_grid = grid.copy()
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = mark
        return next_grid

    def get_heuristic(grid, mark, config):
        num_threes = count_windows(grid, 3, mark, config)
        num_fours = count_windows(grid, 4, mark, config)
        num_threes_opp = count_windows(grid, 3, mark%2+1, config)
        score = num_threes + 1e6*num_fours - 1e2*num_threes_opp
        return score

    def check_window(window, num_discs, piece, config):
        return (window.count(piece) == num_discs
               and window.count(0) == config.inarow - num_discs)

    def count_windows(grid, num_discs, piece, config):
        num_windows = 0
        # horizontal
        for row in range(config.rows):
            for col in range(config.columns - (config.inarow - 1)):
                window = list(grid[row, col:col+config.inarow])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        # vertical
        for row in range(config.rows - (config.inarow - 1)):
            for col in range(config.columns):
                window = list(grid[row:row+config.inarow, col])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        # positive diagonal: row and column both increase
        for row in range(config.rows - (config.inarow - 1)):
            for col in range(config.columns - (config.inarow - 1)):
                window = list(grid[range(row, row+config.inarow),
                                   range(col, col+config.inarow)])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        # negative diagonal: row decreases while column increases; starting
        # at row inarow-1 keeps the row indices non-negative (negative
        # indices would wrap around to the bottom of the board).
        for row in range(config.inarow - 1, config.rows):
            for col in range(config.columns - (config.inarow - 1)):
                window = list(grid[range(row, row-config.inarow, -1),
                                   range(col, col+config.inarow)])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        return num_windows

    valid_moves = [c for c in range(config.columns)
                  if obs.board[c] == 0]
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    scores = dict(zip(valid_moves,
                      [score_move(grid, col, obs.mark, config)
                      for col in valid_moves]))
    # Compute the best score once instead of once per candidate column.
    best_score = max(scores.values(), default=None)
    max_cols = [key for key in scores.keys()
               if scores[key] == best_score]
    return random.choice(max_cols)

## N-Step Lookahead

In [7]:
# N_STEPS (set inside n_step_agent) controls how deep to make the game tree: higher values take longer to run!

def n_step_agent(obs, config):
    """Self-contained minimax Connect-X agent with an N_STEPS-deep lookahead.

    Each playable column is scored by dropping the agent's disc into it and
    running minimax over the remaining N_STEPS - 1 plies, evaluating leaves
    with `get_heuristic`. One of the best-scoring columns is returned (ties
    broken at random).

    All helpers and imports live inside the function so that its source can
    be written to a standalone submission file.

    Args:
        obs: observation exposing `board` (flat, row-major list of cells,
            0 = empty) and `mark` (this agent's disc value, 1 or 2).
        config: configuration exposing `rows`, `columns` and `inarow`.

    Returns:
        The index of the chosen column.
    """
    import random
    import numpy as np

    # How deep to make the game tree: higher values take longer to run!
    N_STEPS = 2

    def drop_piece(grid, col, mark, config):
        # Place `mark` in the lowest empty cell of `col` on a copy of the grid.
        next_grid = grid.copy()
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = mark
        return next_grid

    def iter_windows(grid, config):
        # Yield every window of `config.inarow` cells as a list, in the order
        # horizontal, vertical, positive diagonal, negative diagonal.
        span = config.inarow
        # horizontal
        for row in range(config.rows):
            for col in range(config.columns-(span-1)):
                yield list(grid[row, col:col+span])
        # vertical
        for row in range(config.rows-(span-1)):
            for col in range(config.columns):
                yield list(grid[row:row+span, col])
        # positive diagonal
        for row in range(config.rows-(span-1)):
            for col in range(config.columns-(span-1)):
                yield list(grid[range(row, row+span), range(col, col+span)])
        # negative diagonal
        for row in range(span-1, config.rows):
            for col in range(config.columns-(span-1)):
                yield list(grid[range(row, row-span, -1), range(col, col+span)])

    def check_window(window, num_discs, piece, config):
        return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)

    def count_windows(grid, num_discs, piece, config):
        # Number of windows with exactly `num_discs` of `piece`, rest empty.
        return sum(1 for window in iter_windows(grid, config)
                   if check_window(window, num_discs, piece, config))

    def get_heuristic(grid, mark, config):
        num_threes = count_windows(grid, 3, mark, config)
        num_fours = count_windows(grid, 4, mark, config)
        num_threes_opp = count_windows(grid, 3, mark%2+1, config)
        num_fours_opp = count_windows(grid, 4, mark%2+1, config)
        score = num_threes - 1e3*num_threes_opp - 1e6*num_fours_opp + 1e10*num_fours
        return score

    def score_move(grid, col, mark, config, nsteps):
        # The agent's own move consumes one ply; the opponent moves next.
        next_grid = drop_piece(grid, col, mark, config)
        score = minimax(next_grid, nsteps-1, False, mark, config)
        return score

    def is_terminal_window(window, config):
        return window.count(1) == config.inarow or window.count(2) == config.inarow

    def is_terminal_node(grid, config):
        # The game is over when the top row is full or either player has a
        # complete window.
        if list(grid[0, :]).count(0) == 0:
            return True
        return any(is_terminal_window(window, config)
                   for window in iter_windows(grid, config))

    # Minimax implementation
    def minimax(node, depth, maximizingPlayer, mark, config):
        if depth == 0 or is_terminal_node(node, config):
            return get_heuristic(node, mark, config)
        valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
        if maximizingPlayer:
            # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
            value = -np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark, config)
                value = max(value, minimax(child, depth-1, False, mark, config))
            return value
        else:
            value = np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark%2+1, config)
                value = min(value, minimax(child, depth-1, True, mark, config))
            return value

    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    scores = dict(zip(valid_moves, [score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves]))
    # Compute the best score once instead of once per candidate column.
    best_score = max(scores.values(), default=None)
    max_cols = [key for key in scores.keys() if scores[key] == best_score]
    return random.choice(max_cols)

In [64]:
import time
# Wall-clock timing of one full game: n_step_agent against itself.
startTime = time.time()

env.run([n_step_agent, n_step_agent])
env.render(mode='ipython')

# The measured span includes rendering, not only the game itself.
executionTime = (time.time() - startTime)
print('Execution time in seconds: ' + str(executionTime))

Execution time in seconds: 22.06244468688965


In [60]:
import time
# Wall-clock timing of one full game: my_agent (one-step) against n_step_agent.
startTime = time.time()

env.run([my_agent, n_step_agent])
env.render(mode='ipython')

# The measured span includes rendering, not only the game itself.
executionTime = (time.time() - startTime)
print('Execution time in seconds: ' + str(executionTime))

Execution time in seconds: 13.899564027786255


## Submission File

In [8]:
import inspect
import os

def write_agent_to_file(function, file):
    """Append the source code of `function` to `file` and report it on stdout.

    The file is created when it does not exist yet; when it already exists
    the source is added at the end, so repeated calls accumulate definitions.
    """
    # Append mode already creates a missing file, so no existence check is needed.
    with open(file, "a") as handle:
        source_code = inspect.getsource(function)
        handle.write(source_code)
        print(function, "written to", file)

# Export the minimax agent's source as a standalone submission file.
# NOTE: write_agent_to_file appends when the file already exists, so
# re-running this cell adds another copy of the function to the file.
filename = './submissions/2_step_agent.py'
write_agent_to_file(n_step_agent, filename)

<function n_step_agent at 0x7f96040426a8> written to ./submissions/2_step_agent.py


In [62]:
get_win_percentages(my_agent, 'random')

Agent 1 Win Percentage: 0.94
Agent 2 Win Percentage: 0.06
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


## Validate submission file

In [5]:
# Load the written submission file, take its last callable as the agent, and
# check that a self-play game runs to completion.
import sys
from kaggle_environments import utils, agent
from kaggle_environments import make, evaluate

env = make('connectx', debug=True)

# stdout is saved here and re-assigned below; nothing in this cell
# redirects it in between.
out = sys.stdout
submission = utils.read_file('./submissions/2_step_agent.py')
# NOTE(review): this rebinds `agent` from the imported kaggle_environments
# module to the loaded callable, and also replaces the heuristic `agent`
# function defined earlier in this notebook; later cells that use `agent`
# get the submission agent instead.
agent = agent.get_last_callable(submission)
agent1 = agent
sys.stdout = out

env = make("connectx", debug=True)
env.run([agent1, agent1])
# Both players must end in status "DONE" for the run to count as a success.
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

Success!


In [1]:
ls submissions/

2_step_agent.py  heuristic-agent.py  q-learning.py
3_step_agent.py  heuristic.py        submission.py


In [11]:
env.run([agent, agent])
env.render(mode='ipython')

In [8]:
# Same validation as for the 2-step submission, but for the 4-step file.
# NOTE(review): the recorded output below this cell shows "Timeout:" and
# "Failed...", so this run did not complete successfully.
import sys
from kaggle_environments import utils, agent
from kaggle_environments import make, evaluate

env = make('connectx', debug=True)

# stdout is saved here and re-assigned below; nothing in this cell
# redirects it in between.
out = sys.stdout
submission = utils.read_file('./submissions/4_step_agent.py')
# NOTE(review): rebinds `agent` from the imported module to the loaded
# callable, shadowing any earlier `agent` defined in the notebook.
agent = agent.get_last_callable(submission)
agent4 = agent
sys.stdout = out

env = make("connectx", debug=True)
env.run([agent4, agent4])
# Both players must end in status "DONE" for the run to count as a success.
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

Timeout: 
Failed...


In [10]:
# NOTE(review): `agent2` is not assigned anywhere in the visible notebook
# (only `agent1` and `agent4` are); on a fresh kernel this presumably raises
# NameError — confirm which agent was intended.
env.run([agent1, agent2])
env.render(mode='ipython')

In [6]:
agent1

<function n_step_agent(obs, config)>