In [2]:
import numpy as np
import gym
import random
import matplotlib.pyplot as plt
from random import choice
from tqdm.notebook import tqdm
from kaggle_environments import evaluate, make
import pdb

# Create ConnectX Environment

In [3]:
from kaggle_environments import evaluate, make, utils

env = make("connectx", debug=True)
env.render()

+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+



# SparseQTable Implementation

In [6]:
env.configuration

{'episodeSteps': 1000,
 'agentExec': 'LOCAL',
 'agentTimeout': 10,
 'actTimeout': 5,
 'runTimeout': 600,
 'columns': 7,
 'rows': 6,
 'inarow': 4,
 'timeout': 5}

In [7]:
class SparseQTable():
    def __init__(self, n_actions):
        self.n_actions = n_actions
        self.table = dict()
        self.fine_tune = False
        
    def __getitem__(self, key):
        if hasattr(self.table, key): return self.table[key]
        elif not self.fine_tune:
            self.table[key] = np.zeros((self.n_actions))
            return self.table[key]
        else:
            return np.zeros((self.n_actions))
    def __setitem__(self, idx, val):
        self[idx] = val
    
    def __repr__(self):
        return str(self.table)

In [8]:
def get_state_key(state): return str(state.board)[1:-1].replace(', ', '')

In [9]:
def get_rand_choice(cols, state): return random.choice([c for c in range(cols) if state.board[c] == 0])

In [10]:
states = []

In [11]:
def my_agent(state, configuration):
    state_key = get_state_key(state)
    if hasattr(q_table, state_key):
        c = np.argmax(q_table[state_key])
        if state.board[c] == 0: return action
        else: return get_rand_choice(configuration.columns, state)
    else:
        return get_rand_choice(configuration.columns, state)
        
    
    

In [12]:
cols = env.configuration.columns
q_table = SparseQTable(cols)

In [None]:
alpha = 0.1
gamma = 0.6
epsilon = 0.99
min_epsilon = 0.1
epsilon_decay_rate = 0.9

# Play as first position against random agent.
trainer = env.train([None, "negamax"])

for i in tqdm(range(10000)):
    state = trainer.reset()
    done = False
    epsilon = max(min_epsilon, epsilon * epsilon_decay_rate)
    while not done:
        state_key = get_state_key(state)
        
        if random.uniform(0,1) < epsilon:
            action = get_rand_choice(cols, state)
        else:
            c = np.argmax(q_table[state_key])
            action = c if state.board[c] == 0 else get_rand_choice(cols, state)
            
        next_state, reward, done, info = trainer.step(int(action))
        
        if done:
            if reward == 0:
                reward = -20
            elif reward == 1:
                reward = 20
        else:
            reward = -0.05
            
        next_state_key = get_state_key(next_state)
        q = q_table[state_key][action]
        next_q = np.max(q_table[next_state_key])
        q_table[state_key][action] = (1-alpha)*q + alpha*(reward + gamma * next_q)
        state = next_state

HBox(children=(IntProgress(value=0, max=10000), HTML(value='')))

In [17]:
q_table.fine_tune = True
for i in tqdm(range(10000)):
    state = trainer.reset()
    while not env.done:
        state_key = get_state_key(state)
        if random.uniform(0,1) < epsilon:
            c = np.argmax(q_table[state_key])
            action = c if state.board[c] == 0 else get_rand_choice(cols, state)
        else:
            action = get_rand_choice(cols, state)

        next_state, reward, done, info = trainer.step(int(action))
        
        if reward == 0:
            reward = -20
        elif reward == 1:
            reward = 20
        else:
            reward = -0.05
        next_state_key = get_state_key(next_state)
        q = q_table[state_key][action]
        next_q = np.max(q_table[next_state_key])
        q_table[state_key][action] = (1-alpha)*q + alpha*(reward + gamma * next_q)
        state = next_state

HBox(children=(IntProgress(value=0, max=10000), HTML(value='')))




In [26]:
q_table

{'000000000000000000000000000000000000000000': array([ 0.,  0.,  0.,  0., -2.,  0.,  0.]), '000000000000000000000000000000000000120000': array([ 0., -2.,  0.,  0.,  0.,  0.,  0.]), '000000000000000000000000000000200001120000': array([ 0.,  0.,  0.,  0.,  0., -2.,  0.]), '000000000000000000000000000021200001120000': array([-2.,  0.,  0.,  0.,  0.,  0.,  0.]), '000000000000000000000100000021200001122000': array([ 0., -2.,  0.,  0.,  0.,  0.,  0.]), '000000000000000000000112000021200001122000': array([ 0.,  0.,  0.,  0.,  0.,  0., -2.]), '000000000000000000000112000021220001122001': array([0., 2., 0., 0., 0., 0., 0.]), '000000000000000100000112000021220001122001': array([0., 0., 0., 0., 0., 0., 0.]), '000000000000000000000000000000000000002100': array([ 0.,  0.,  0.,  0.,  0., -2.,  0.]), '000000000000000000000000000000000000002112': array([ 0.,  0.,  0.,  0.,  0.,  0., -2.]), '000000000000000000000000000000002000012112': array([ 0., -2.,  0.,  0.,  0.,  0.,  0.]), '0000000000000000000000

# Evaluate your Agent

In [37]:
def mean_reward(rewards):
    return sum(r[0] for r in rewards) / float(len(rewards))

# Run multiple episodes to estimate its performance.
print("Random Agent vs Random Agent:", mean_reward(evaluate("connectx", ["random", "random"], num_episodes=10)))
print("My Agent vs Negamax:", mean_reward(evaluate("connectx", [my_agent, "negamax"], num_episodes=10)))

Random Agent vs Random Agent: 0.2
My Agent vs Negamax: -0.8


# Play your Agent
Click on any column to place a checker there ("manually select action").

In [44]:
# "None" represents which agent you'll manually play as (first or second player).
env.play([None, my_agent], width=500, height=450)

# Write Submission File



In [13]:
import inspect
import os

def write_agent_to_file(function, file):
    with open(file, "a" if os.path.exists(file) else "w") as f:
        f.write(inspect.getsource(function))
        print(function, "written to", file)

write_agent_to_file(my_agent, "submission.py")

<function my_agent at 0x10b60e8c8> written to submission.py


In [14]:
!cat submission.py

def my_agent(observation, configuration):
    from random import choice
    return choice([c for c in range(configuration.columns) if observation.board[c] == 0])


# Validate Submission
Play your submission against itself.  This is the first episode the competition will run to weed out erroneous agents.

Why validate? This roughly verifies that your submission is fully encapsulated and can be run remotely.

In [16]:
# Note: Stdout replacement is a temporary workaround.
import sys
out = sys.stdout
submission = utils.read_file("./submission.py")
agent = utils.get_last_callable(submission)
sys.stdout = out

env = make("connectx", debug=True)
env.run([agent, agent])
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")


Success!


# Submit to Competition

1. Commit this kernel.
2. View the commited version.
3. Go to "Data" section and find submission.py file.
4. Click "Submit to Competition"
5. Go to [My Submissions](https://kaggle.com/c/connectx/submissions) to view your score and episodes being played.