Derived from a public notebook by https://www.kaggle.com/mrgeislinger

In [None]:
# Notebook-wide switch: later cells only run their evaluation / step-through
# code when this is truthy (`if debug:` and `while debug and not env.done:`).
debug = True

# Install kaggle-environments

# Create ConnectX Environment

In [None]:
import random as rd
import pandas as pd
import json
import numpy as np
import time

!pip install 'kaggle-environments>=0.1.6'
from kaggle_environments import evaluate, make, utils
# Since utils.get_last_callable moved to agent.get_last_callable
# See https://github.com/Kaggle/kaggle-environments/blob/e4a5651a3a0775b823fc27fe2c24b55cbd340420/kaggle_environments/agent.py#L37
from kaggle_environments import agent as kaggle_env_agent

In [None]:
# Build the ConnectX environment that the single-game test cell resets and runs.
env = make("connectx", debug=True)
#env.render()

In [None]:
# Check version of tensorflow
!pip install 'tensorflow==1.15.0'
import tensorflow as tf
#tf.__version__
!apt-get update
!apt-get install -y cmake libopenmpi-dev python3-dev zlib1g-dev
!pip install "stable-baselines[mpi]==2.9.0"
from gym import spaces
#For Trained Agent
from stable_baselines import PPO1 
from stable_baselines.common.policies import CnnPolicy

In [None]:
#@title Trained Agent
def trained_agent(obs, config, model=None, debug=False):
    """Choose a column with a trained PPO1 model, falling back to a random legal move.

    Args:
        obs: observation supporting ``obs['board']``; the board is a flat,
            row-major sequence in which 0 marks an empty cell.
        config: configuration exposing ``rows`` and ``columns``.
        model: object with a ``predict(array)`` method returning
            ``(column, state)``.  When ``None`` the model saved at
            ``/content/trained.zip`` is loaded once and reused on later calls.
        debug: print the raw prediction and the elapsed time when true.

    Returns:
        int: the predicted column if its top cell is empty, otherwise a
        randomly chosen column whose top cell is empty.

    Raises:
        IndexError: if the prediction is unusable and no column is open.
    """
    # The notebook binds the random module as `rd`, so import it locally to
    # keep this function self-contained.
    import random

    start = time.time()

    if model is None:
        # Load the saved model once and cache it on the function object;
        # re-reading the archive on every move is needlessly slow.
        model = getattr(trained_agent, "_cached_model", None)
        if model is None:
            # `PPO1` is the name imported by the notebook
            # (`from stable_baselines import PPO1`); the bare module name
            # `stable_baselines` is never bound.
            model = PPO1.load('/content/trained.zip', env=None, verbose=0)
            trained_agent._cached_model = model

    board = obs['board']

    # Use the trained model to select a column.  The board is presented as a
    # rows x columns x 1 array (6 x 7 x 1 for the default configuration).
    predicted, _ = model.predict(
        np.array(board).reshape(config.rows, config.columns, 1)
    )
    col = int(predicted)

    # A column is playable when it exists and its top-row cell is still empty.
    is_valid = 0 <= col < config.columns and board[col] == 0

    if debug:
        print("\nTrained model predicted column:", predicted)
        print("Time taken =", time.time() - start)

    if is_valid:
        return col

    # If not valid, select a random playable column instead.
    open_columns = [c for c in range(config.columns) if board[c] == 0]
    return random.choice(open_columns)

# Create an Agent

To create the submission, an agent function should be fully encapsulated (no external dependencies).  

When your agent is being evaluated against others, it will not have access to the Kaggle docker image.  Only the following can be imported: Python Standard Library Modules, gym, numpy, scipy, pytorch (1.3.1, cpu only), and more may be added later.



In [None]:
def debug_agent(obs, config, N_STEPS=2, debug=False):
    """Placeholder agent: ignores every argument and always returns True.

    A later cell in this notebook defines another ``debug_agent``, which
    replaces this one once that cell has been run.
    """
    placeholder_result = True
    return placeholder_result

In [15]:
#@ title debug_agent
def debug_agent(obs, config, N_STEPS=2, debug=False, df=None):
    '''Pick a ConnectX column using depth-limited minimax with alpha-beta pruning.

    The agent first looks for a move that ends the game at once: its own
    winning move is preferred, then a block of the opponent's winning move.
    Otherwise every playable column is scored with an alpha-beta search over
    a window-counting heuristic, and central columns are preferred among the
    best-scoring ones.

    Args:
        obs: observation exposing ``board`` (flat, row-major list in which 0
            is an empty cell) and ``mark`` (this agent's piece).  ``step`` is
            also read when ``df`` is given.
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.
        N_STEPS: kept for interface compatibility; the search depth is always
            recomputed from the number of empty cells.
        debug: print progress and timing information when true.
        df: optional pandas DataFrame used as a move log.  Row ``glob_it``
            (a notebook-level global) receives ``[choice, step, mark, *board]``
            and ``glob_it`` is then advanced by one.

    Returns:
        The index of the chosen column.
    '''
    # Imports stay inside the function so the agent is self-contained.
    import numpy as np
    import random
    import time

    # Notebook-level row cursor into `df`; only touched when `df` is given.
    global glob_it

    # constants (given by game)
    ROWS = config.rows
    COLUMNS = config.columns
    CNCTX = config.inarow
    ## coefficients of the heuristic
    A = 2      # my windows two discs short of a win
    B = 20     # my windows one disc short of a win
    C = 200    # my completed windows
    D = -1     # opponent windows two discs short
    E = -10    # opponent windows one disc short
    F = -100   # opponent completed windows

    # Vary lookahead depth according to the state of play: the fewer empty
    # cells remain, the deeper the search (2 plies early on, 7 at the end).
    empties = obs.board.count(0)
    band = (ROWS * COLUMNS) // 7
    N_STEPS = 7
    for depth, multiple in ((2, 6), (3, 5), (4, 4), (5, 3), (6, 2)):
        if empties >= multiple * band:
            N_STEPS = depth
            break

    if debug:
        if obs.board.count(1) == 0:
            print(f'"configuration":{config}')
        print(f'\n###### Player {obs.mark} Turn {obs.board.count(obs.mark):02} ######')
        print('Board:', np.reshape(obs.board, (ROWS, COLUMNS)))
        print(f'Using {N_STEPS} step lookahead')

    # Gets board at next step if agent drops piece in selected column.
    # Callers only pass columns whose top cell is empty.
    def drop_piece(grid, col, mark):
        next_grid = grid.copy()
        for row in range(ROWS-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = mark
        return next_grid

    # True when the window holds exactly num_discs of `piece` and is otherwise empty
    def check_window(window, num_discs, piece):
        return (window.count(piece) == num_discs and window.count(0) == CNCTX-num_discs)

    # Counts the CNCTX-long windows (all four directions) that satisfy check_window
    def count_windows(grid, num_discs, piece):
        num_windows = 0
        # horizontal
        for row in range(ROWS):
            for col in range(COLUMNS-(CNCTX-1)):
                window = list(grid[row, col:col+CNCTX])
                if check_window(window, num_discs, piece):
                    num_windows += 1
        # vertical
        for row in range(ROWS-(CNCTX-1)):
            for col in range(COLUMNS):
                window = list(grid[row:row+CNCTX, col])
                if check_window(window, num_discs, piece):
                    num_windows += 1
        # positive diagonal
        for row in range(ROWS-(CNCTX-1)):
            for col in range(COLUMNS-(CNCTX-1)):
                window = list(grid[range(row, row+CNCTX), range(col, col+CNCTX)])
                if check_window(window, num_discs, piece):
                    num_windows += 1
        # negative diagonal
        for row in range(CNCTX-1, ROWS):
            for col in range(COLUMNS-(CNCTX-1)):
                window = list(grid[range(row, row-CNCTX, -1), range(col, col+CNCTX)])
                if check_window(window, num_discs, piece):
                    num_windows += 1
        return num_windows

    # True when either side has a completed window or the top row is full.
    # Uses CNCTX (not a literal 4) so non-default `inarow` settings work.
    def check_terminal(grid, mark):
        num_fours = count_windows(grid, CNCTX, mark)
        num_fours_opp = count_windows(grid, CNCTX, mark%2+1)
        return (num_fours != 0) or (num_fours_opp != 0) or (list(grid[0, :]).count(0) == 0)

    # Helper function for alphabeta: calculates value of heuristic for grid
    def get_score(grid, mark):
        opp = mark%2+1
        num_fours = count_windows(grid, CNCTX, mark)         #C
        num_fours_opp = count_windows(grid, CNCTX, opp)      #F
        num_twos = count_windows(grid, CNCTX-2, mark)        #A
        num_threes = count_windows(grid, CNCTX-1, mark)      #B
        num_twos_opp = count_windows(grid, CNCTX-2, opp)     #D
        num_threes_opp = count_windows(grid, CNCTX-1, opp)   #E
        score = A*num_twos + B*num_threes + C*num_fours + D*num_twos_opp + E*num_threes_opp + F*num_fours_opp
        is_terminal = (num_fours != 0) or (num_fours_opp != 0) or (list(grid[0, :]).count(0) == 0)
        return score, is_terminal

    # Minimax with alphabeta pruning; `mark` is always the searching agent's piece
    def alphabeta(node, depth, alpha, beta, maximizingPlayer, mark):
        node_score, is_terminal = get_score(node, mark)
        if depth == 0 or is_terminal:
            return node_score

        valid_moves = [c for c in range(COLUMNS) if node[0][c] == 0]
        if maximizingPlayer:
            value = -np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark)
                value = max(value, alphabeta(child, depth-1, alpha, beta, False, mark))
                alpha = max(alpha, value)
                if alpha >= beta:
                    break
            return value

        else: #minimizing player
            value = np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark%2+1)
                value = min(value, alphabeta(child, depth-1, alpha, beta, True, mark))
                beta = min(beta, value)
                if alpha >= beta:
                    break
            return value

    # Uses alphabeta pruning to calculate value
    # of dropping piece in selected column
    def score_move(grid, col, mark, depth):
        go = time.time()
        next_grid = drop_piece(grid, col, mark)
        # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0
        score = alphabeta(next_grid, depth-1, -np.inf, np.inf, False, mark)
        if debug:
            summary_stats = {
                'column': col,
                'score': score,
                'column time': round(time.time() - go, 5),
                'time_left': round(2.0 - (time.time() - choice_time), 5)
            }
            print(f'"summary_stats":{summary_stats}')
        return score

    #########################
    # Agent makes selection #
    #########################

    # Get list of valid moves
    valid_moves = [c for c in range(COLUMNS) if obs.board[c] == 0]

    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(ROWS, COLUMNS)

    # Do a quick pass to see if there is a terminal node on the surface.
    # Every column is checked for the agent's own finishing move BEFORE any
    # block is considered; otherwise a block in a low-numbered column would be
    # chosen even though a winning move exists in a higher-numbered one.
    choice_time = time.time()
    opp_mark = obs.mark%2+1
    quick_pick = False
    for col in valid_moves:
        if check_terminal(drop_piece(grid, col, obs.mark), obs.mark):
            choice = col
            quick_pick = True
            break
    if not quick_pick:
        for col in valid_moves:
            if check_terminal(drop_piece(grid, col, opp_mark), opp_mark):
                choice = col
                quick_pick = True
                break
    if quick_pick and debug:
        print("Column {} is terminal.".format(choice))

    if not quick_pick:
        # Use the heuristic to assign a score to each possible board in the next step
        scores = {col: score_move(grid, col, obs.mark, N_STEPS) for col in valid_moves}

        # Get a list of columns (moves) that maximize the heuristic
        best_score = max(scores.values())
        max_cols = [col for col, value in scores.items() if value == best_score]

        # Select column in order of preference: centre, then its right and
        # left neighbours (3, 4, 2 on a 7-column board), else any best column.
        centre = COLUMNS // 2
        choice = next((pref for pref in (centre, centre+1, centre-1) if pref in max_cols), None)
        if choice is None:
            choice = random.choice(max_cols)

    if debug:
        print("Chosen column:", choice)
        print("Choice took:", round(time.time()-choice_time,5))

    if df is not None:
        # Skip logging rather than crash mid-game once the log frame is full.
        if glob_it < len(df):
            df.iloc[glob_it] = [choice, obs.step, obs.mark] + list(obs.board)
            glob_it += 1
        elif debug:
            print("Move log is full; move not recorded.")

    return choice


# Test your Agent

In [9]:
#from test_agent_v4 import my_agent as test_agent_v4
from experimental_agent_v8 import my_agent as pruner
#from heuristic_v8 import my_agent as heuristic
#from test_agent_v9 import my_agent as test_agent
from quick_look_v0 import my_agent as quick_pick

In [None]:
#@title subagent
def sub_agent(obs, config, N_STEPS=2, debug=False):
    '''Shallow alpha-beta ConnectX agent (2 or 3 plies of lookahead).

    The agent first looks for a move that ends the game at once: its own
    winning move is preferred, then a block of the opponent's winning move.
    Otherwise every playable column is scored with an alpha-beta search over
    a window-counting heuristic, and central columns are preferred among the
    best-scoring ones.

    Args:
        obs: observation exposing ``board`` (flat, row-major list in which 0
            is an empty cell) and ``mark`` (this agent's piece).
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.
        N_STEPS: kept for interface compatibility; the search depth is always
            recomputed from the number of empty cells.
        debug: print progress information when true.

    Returns:
        The index of the chosen column.
    '''
    # Imports stay inside the function so the agent is self-contained.
    import numpy as np
    import random
    import time

    ########################### Regular pruner ################
    # constants (given by game)
    ROWS = config.rows
    COLUMNS = config.columns
    CNCTX = config.inarow
    ## coefficients of the heuristic
    A = 2      # my windows two discs short of a win
    B = 20     # my windows one disc short of a win
    C = 200    # my completed windows
    D = -1     # opponent windows two discs short
    E = -10    # opponent windows one disc short
    F = -100   # opponent completed windows

    # vary lookahead depth according to state of play:
    # deeper search after half the board is filled
    N_STEPS = 2 if obs.board.count(0) >= ROWS*COLUMNS//2 else 3

    if debug:
        if obs.board.count(1) == 0:
            print(f'"configuration":{config}')
        print(f'\n###### Agent Turn {obs.board.count(1):02} ######')
        print(f'Using {N_STEPS} step lookahead')

    # Gets board at next step if agent drops piece in selected column.
    # Callers only pass columns whose top cell is empty.
    def drop_piece(grid, col, mark):
        next_grid = grid.copy()
        for row in range(ROWS-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = mark
        return next_grid

    # True when the window holds exactly num_discs of `piece` and is otherwise empty
    def check_window(window, num_discs, piece):
        return (window.count(piece) == num_discs and window.count(0) == CNCTX-num_discs)

    # Counts the CNCTX-long windows (all four directions) that satisfy check_window
    def count_windows(grid, num_discs, piece):
        num_windows = 0
        # horizontal
        for row in range(ROWS):
            for col in range(COLUMNS-(CNCTX-1)):
                window = list(grid[row, col:col+CNCTX])
                if check_window(window, num_discs, piece):
                    num_windows += 1
        # vertical
        for row in range(ROWS-(CNCTX-1)):
            for col in range(COLUMNS):
                window = list(grid[row:row+CNCTX, col])
                if check_window(window, num_discs, piece):
                    num_windows += 1
        # positive diagonal
        for row in range(ROWS-(CNCTX-1)):
            for col in range(COLUMNS-(CNCTX-1)):
                window = list(grid[range(row, row+CNCTX), range(col, col+CNCTX)])
                if check_window(window, num_discs, piece):
                    num_windows += 1
        # negative diagonal
        for row in range(CNCTX-1, ROWS):
            for col in range(COLUMNS-(CNCTX-1)):
                window = list(grid[range(row, row-CNCTX, -1), range(col, col+CNCTX)])
                if check_window(window, num_discs, piece):
                    num_windows += 1
        return num_windows

    # Helper function for alphabeta: calculates value of heuristic for grid.
    # Completed windows are detected with CNCTX (not a literal 4) so
    # non-default `inarow` settings work.
    def get_score(grid, mark):
        opp = mark%2+1
        num_fours = count_windows(grid, CNCTX, mark)      #C
        num_fours_opp = count_windows(grid, CNCTX, opp)   #F
        is_terminal = (num_fours != 0) or (num_fours_opp != 0) or (list(grid[0, :]).count(0) == 0)
        if is_terminal:
            # Terminal boards are scored on completed windows only.
            return C*num_fours + F*num_fours_opp, is_terminal
        num_twos = count_windows(grid, CNCTX-2, mark)        #A
        num_threes = count_windows(grid, CNCTX-1, mark)      #B
        num_twos_opp = count_windows(grid, CNCTX-2, opp)     #D
        num_threes_opp = count_windows(grid, CNCTX-1, opp)   #E
        score = A*num_twos + B*num_threes + C*num_fours + D*num_twos_opp + E*num_threes_opp + F*num_fours_opp
        return score, is_terminal

    # Minimax with alphabeta pruning; `mark` is always the searching agent's piece
    def alphabeta(node, depth, alpha, beta, maximizingPlayer, mark):
        node_score, is_terminal = get_score(node, mark)
        if depth == 0 or is_terminal:
            return node_score

        valid_moves = [c for c in range(COLUMNS) if node[0][c] == 0]
        if maximizingPlayer:
            value = -np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark)
                value = max(value, alphabeta(child, depth-1, alpha, beta, False, mark))
                alpha = max(alpha, value)
                if alpha >= beta:
                    break
            return value

        else: #minimizing player
            value = np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark%2+1)
                value = min(value, alphabeta(child, depth-1, alpha, beta, True, mark))
                beta = min(beta, value)
                if alpha >= beta:
                    break
            return value

    # Uses alphabeta pruning to calculate value
    # of dropping piece in selected column
    def score_move(grid, col, mark, depth):
        next_grid = drop_piece(grid, col, mark)
        # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0
        score = alphabeta(next_grid, depth-1, -np.inf, np.inf, False, mark)
        if debug:
            summary_stats = {
                'column': col,
                'score': score,
            }
            print(f'"summary_stats":{summary_stats}')
        return score

    #########################
    # Agent makes selection #
    #########################

    # Get list of valid moves
    valid_moves = [c for c in range(COLUMNS) if obs.board[c] == 0]

    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(ROWS, COLUMNS)

    # Do a quick pass at depth zero to see if there is a terminal node.
    # Every column is checked for the agent's own finishing move BEFORE any
    # block is considered; otherwise a block in a low-numbered column would be
    # chosen even though a winning move exists in a higher-numbered one.
    opp_mark = obs.mark%2+1
    quick_pick = False
    for col in valid_moves:
        _, self_is_terminal = get_score(drop_piece(grid, col, obs.mark), obs.mark)
        if self_is_terminal:
            choice = col
            quick_pick = True
            break
    if not quick_pick:
        for col in valid_moves:
            _, opp_is_terminal = get_score(drop_piece(grid, col, opp_mark), opp_mark)
            if opp_is_terminal:
                choice = col
                quick_pick = True
                break
    if quick_pick and debug:
        print("Column {} is terminal.".format(choice))

    if not quick_pick:
        # Use the heuristic to assign a score to each possible board in the next step
        scores = {col: score_move(grid, col, obs.mark, N_STEPS) for col in valid_moves}

        # Get a list of columns (moves) that maximize the heuristic
        best_score = max(scores.values())
        max_cols = [col for col, value in scores.items() if value == best_score]

        # Select column in order of preference: centre, then its right and
        # left neighbours (3, 4, 2 on a 7-column board), else any best column.
        centre = COLUMNS // 2
        choice = next((pref for pref in (centre, centre+1, centre-1) if pref in max_cols), None)
        if choice is None:
            choice = random.choice(max_cols)

    if debug:
        print("Chosen column:", choice)
    return choice


In [None]:
# Pre-allocated move log: 100 rows x 45 columns
# (choice, step, mark, then b0..b41 — one column per cell of a 42-cell board).
gp = np.zeros((100,45))
# Row cursor into the log; debug_agent declares it `global`, writes one row per
# move at this index and then increments it.  Re-run this cell to start a new log.
glob_it = 0
gameplay_df = pd.DataFrame(gp, columns=["choice"]+["step"]+["mark"]+['b' + str(x) for x in range(42)],dtype='int64')
gameplay_df

In [None]:
START_TIME = time.time()

debug = True
test_run = debug # Set debug to True to test
# agent1 is the agent under test ("b1ue agent" in the messages below); every
# move it makes is also logged into gameplay_df.
agent1 = lambda x,y: debug_agent(x,y, debug=True, df=gameplay_df)
#agent2 = lambda x,y: dba(x,y,debug=False)
agent2 = pruner#test_agent

# One game is played per run of this cell, so a plain `if` replaces the
# original `while` loop, which broke out on every branch.
if test_run:
    env.reset()
    # Randomise which seat the agent under test takes.
    debug_agent_first = rd.choice([True, False])
    if debug_agent_first:
        env.run([agent1, agent2])
        print("Agent order: [debug_agent, opponent]") 
    else:
        env.run([agent2, agent1])
        print("Agent order: [opponent, debug_agent]")

    # Read the result from the final per-seat rewards.  Inferring it from the
    # parity of len(env.steps) ignores which seat agent1 took, and treats a
    # win on the very last move of a full board as a tie.
    my_seat = 0 if debug_agent_first else 1
    my_reward = env.state[my_seat].reward
    opp_reward = env.state[1 - my_seat].reward

    # Don't count ties as losses
    # NOTE(review): a reward of None is assumed to mean that seat errored or
    # played an invalid move — confirm against the kaggle-environments version in use.
    if my_reward is None:
        print('--- b1ue agent lost ---')
    elif opp_reward is None:
        print('+++ b1ue agent won +++')
    elif my_reward == opp_reward:
        print('tie')
    elif my_reward > opp_reward:
        print('+++ b1ue agent won +++')
    else:
        print('--- b1ue agent lost ---')

print('\n###### Game Over ######')
print(f'Game time: {round(time.time()-START_TIME,3)}')
#env.render(mode="ipython", width=500, height=450)

In [None]:
# Display `env` inline in the notebook (presumably the game most recently run on it).
env.render(mode="ipython", width=244, height=250)

# Evaluate your Agent

In [None]:
def get_win_percentages(agent1, agent2, n_rounds=10):
    """Play ``n_rounds`` games between two agents and print a summary.

    Roughly half of the games are played with ``agent1`` moving first and the
    rest with ``agent2`` moving first.  Results of the second batch are
    swapped so that every outcome is ordered ``[agent1, agent2]``.

    Args:
        agent1: first agent, in any form accepted by ``evaluate``.
        agent2: second agent, in any form accepted by ``evaluate``.
        n_rounds: total number of games to play.

    Returns:
        list: one ``[agent1_reward, agent2_reward]`` pair per game played.
    """
    import numpy as np
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    first_batch = n_rounds // 2
    # Agent 1 goes first (roughly) half the time
    outcomes = evaluate("connectx", [agent1, agent2], config, [], first_batch)
    # Agent 2 goes first (roughly) half the time; swap each pair back into
    # [agent1, agent2] order.
    outcomes += [[b,a] for [a,b] in evaluate("connectx", [agent2, agent1], config, [], n_rounds - first_batch)]

    total = len(outcomes)
    if total == 0:
        # Nothing was played: report that instead of dividing by zero.
        print("No games were played; win percentages are undefined.")
        return outcomes

    print("Agent 1 Win Percentage:", np.round(outcomes.count([1,-1])/total, 3))
    print("Agent 2 Win Percentage:", np.round(outcomes.count([-1,1])/total, 3))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))
    return outcomes

In [None]:
import time
num_episodes = 12
start = time.time()
# Uses whatever `agent1` / `agent2` are currently bound in the kernel
# (the single-game test cell assigns both).
outcomes = get_win_percentages(agent1, agent2, num_episodes)
end = time.time()
# Wall-clock total and the average per game.
print ("Total time:",round(end-start,3),"\tAvg game time:",round((end-start)/num_episodes,3))


In [None]:
import time
num_episodes = 111
start = time.time()
# NOTE(review): `test_agent` and `test_agent_v4` are only imported in
# commented-out lines of this notebook, so on a fresh kernel this call raises
# NameError — restore those imports or pass agents that are actually defined.
outcomes = get_win_percentages(test_agent, test_agent_v4, num_episodes)
end = time.time()
# Wall-clock total and the average per game.
print ("Total time:",round(end-start,3),"\tAvg game time:",round((end-start)/num_episodes,3))




*   Agent 1 Win Percentage: 0.48
*   Agent 2 Win Percentage: 0.52
*   Number of Invalid Plays by Agent 1: 0
*   Number of Invalid Plays by Agent 2: 0
*   Total time: 224.808 	Avg game time: 6.812



In [None]:
def mean_reward(rewards):
    """Return the average of the first entry of every reward pair.

    Args:
        rewards: non-empty sequence of indexable items; only index 0 of each
            item is read.

    Returns:
        float: the sum of the first entries divided by the number of items.
    """
    running_total = 0
    for episode in rewards:
        running_total = running_total + episode[0]
    episode_count = float(len(rewards))
    return running_total / episode_count

# Run multiple episodes to estimate its performance.
num_episodes = 33
# NOTE(review): `test_agent` and `test_agent_v4` are only imported in
# commented-out lines of this notebook; on a fresh kernel these two
# assignments raise NameError until those imports are restored.
agent1 = test_agent
agent2 = test_agent_v4   #  "negamax"
if debug:
    # The episode count must be passed by keyword: evaluate()'s third
    # positional parameter is the environment configuration, not the count.
    # Each printed value is the mean reward of the agent listed first.
    print("Debug Agent goes first:", mean_reward(evaluate("connectx", [agent1, agent2], num_episodes=num_episodes)))
    print("Opp Agent goes first:", mean_reward(evaluate("connectx", [agent2, agent1], num_episodes=num_episodes)))

# Step through your Agent
Click on any column to place a checker there ("manually select action").

In [None]:
# Play as first position against the built-in "negamax" agent.
# The `None` seat is the one driven manually through `trainer.step` below.
trainer = env.train([None, "negamax"])

observation = trainer.reset()

# Step the game one move at a time, using whatever `agent1` is currently
# bound to, and redraw the board after every move.
while debug and not env.done:
    my_action = agent1(observation, env.configuration)
    print("My Action", my_action)
    observation, reward, done, info = trainer.step(my_action)
    env.render(mode="ipython", width=100, height=90, header=False, controls=False)
env.render()

# Write Submission File



In [None]:
import inspect
import os

def write_agent_to_file(function, file):
    """Write the source code of ``function`` to ``file``.

    The source is appended when ``file`` already exists; otherwise the file
    is created.  A confirmation line is printed after the write.

    Args:
        function: object whose source ``inspect.getsource`` can retrieve.
        file: path of the file to write to.
    """
    if os.path.exists(file):
        open_mode = "a"
    else:
        open_mode = "w"
    with open(file, open_mode) as handle:
        source_text = inspect.getsource(function)
        handle.write(source_text)
        print(function, "written to", file)

In [None]:
submission_file = 'submission.py'
# NOTE(review): no function named `my_agent` is defined in the visible cells of
# this notebook (the agents defined here are trained_agent, debug_agent and
# sub_agent; the imported `my_agent`s are aliased to other names) — confirm
# which agent should be exported before running this cell.
write_agent_to_file(my_agent, submission_file)

# Validate Submission
Play your submission against itself.  This is the first episode the competition will run to weed out erroneous agents.

Why validate? This roughly verifies that your submission is fully encapsulated and can be run remotely.

In [None]:
# Note: Stdout replacement is a temporary workaround.
import sys
# Remember the current stdout so it can be put back once the submission is loaded.
out = sys.stdout
# Read the generated file back in and extract the agent from its source text
# (get_last_callable presumably picks the last callable defined there — confirm).
submission = utils.read_file("submission.py")
agent = kaggle_env_agent.get_last_callable(submission)
sys.stdout = out

In [None]:
# Fresh environment (rebinds the notebook-level `env`) for a self-play game
# of the loaded submission.
env = make("connectx", debug=True)
env.run([agent, agent])
# The run counts as a success only when both seats finished with status "DONE".
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

# Submit to Competition

1. Save this kernel.
2. View the committed version.
3. Go to "Data" section and find submission.py file.
4. Click "Submit to Competition"
5. Go to [My Submissions](https://kaggle.com/c/connectx/submissions) to view your score and episodes being played.