# Install kaggle-environments

In [None]:
# 1. Enable Internet in the Kernel (Settings side pane)

# 2. The curl cache may need to be purged if v0.1.6 cannot be found (uncomment the line below if needed).
# !curl -X PURGE https://pypi.org/simple/kaggle-environments

# ConnectX environment was defined in v0.1.6
!pip install 'kaggle-environments>=0.1.6'

# Create ConnectX Environment

In [None]:
from kaggle_environments import evaluate, make, utils

# Build the "connectx" environment with debug=True; every later cell
# (training, evaluation, rendering) reuses this single `env` object.
env = make("connectx", debug=True)
# Render the freshly created environment.
env.render()

In [None]:
import random
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import os # for creating directories

# Create an Agent

To create the submission, an agent function should be fully encapsulated (no external dependencies).  

When your agent is being evaluated against others, it will not have access to the Kaggle docker image.  Only the following can be imported: Python Standard Library Modules, gym, numpy, scipy, pytorch (1.3.1, cpu only), and more may be added later.



In [None]:
def count_pos(board, n_col, player, C):
    """Count the lines on which ``player`` reaches ``C`` pieces in a row.

    The board is a flat, row-major sequence: the cell at row ``i`` and
    column ``j`` is ``board[i * n_col + j]``.  Every row, every column and
    every diagonal in both directions is scanned.  A streak is counted once,
    at the moment it reaches exactly ``C`` cells, so a longer unbroken streak
    still contributes 1.

    Args:
        board: Flat sequence of cell values.
        n_col: Number of columns of the board.
        player: Cell value that marks the player's pieces.
        C: Streak length to look for.

    Returns:
        Total number of streaks of length ``C`` found over all scanned lines.
    """
    n_row = len(board) // n_col

    def streaks(cells):
        """Count how often a run of ``player`` cells reaches length C along one line."""
        hits = 0
        run = 0
        for i, j in cells:
            if board[i * n_col + j] == player:
                run += 1
            else:
                run = 0
            if run == C:
                hits += 1
        return hits

    result = 0

    # rows
    for i in range(n_row):
        result += streaks((i, j) for j in range(n_col))

    # columns
    for j in range(n_col):
        result += streaks((i, j) for i in range(n_row))

    # "\" diagonals: j - i is constant along each one.  The difference runs
    # from -(n_row - 1) (bottom-left corner) to n_col - 1 (top-right corner);
    # the negative values cover the diagonals that start below the top row.
    for diff in range(-(n_row - 1), n_col):
        result += streaks(
            (i, i + diff) for i in range(n_row) if 0 <= i + diff < n_col
        )

    # "/" diagonals: i + j is constant along each one, from 0 (top-left
    # corner) to n_row + n_col - 2 (bottom-right corner).
    for total in range(n_row + n_col - 1):
        result += streaks(
            (i, total - i) for i in range(n_row) if 0 <= total - i < n_col
        )

    return result

In [None]:
def player_won(board, n_col, player):
    """Return True when ``player`` has at least one four-in-a-row on ``board``.

    ``board`` is the flat board and ``n_col`` its column count; both are
    passed unchanged to ``count_pos`` with a streak length of 4.
    """
    four_in_a_row = count_pos(board, n_col, player, 4)
    return four_in_a_row >= 1

In [None]:
def make_move(board, n_col, pos, player):
    """Return the board that results from ``player`` dropping a piece in column ``pos``.

    The input board is never modified; the move is applied to a copy.

    Args:
        board: Flat, row-major board supporting ``.copy()`` (list or array);
            a cell value of 0 means empty.
        n_col: Number of columns of the board.
        pos: Column index to drop the piece into.
        player: Value written into the cell that receives the piece.

    Returns:
        A new board with the piece placed in the lowest empty cell of the
        column.  If the column is full, an error line is printed and the
        original ``board`` object is returned unchanged.
    """
    n_row = len(board) // n_col
    new_board = board.copy()
    # The last row of the flat board is the bottom of the grid, so scan the
    # column from the bottom upwards and fill the first empty cell.
    for row in range(n_row - 1, -1, -1):
        idx = n_col * row + pos
        if new_board[idx] == 0:
            new_board[idx] = player
            return new_board
    print("Error placing {} on {}".format(pos, board))
    return board

In [None]:
class DQNAgent:
    """Deep Q-learning agent for ConnectX backed by a small dense Keras network.

    Transitions of the game in progress are collected in ``current_memory``.
    ``replay_game`` trains on them once the game is over and then moves them
    into the long-term ``memory`` that ``replay`` samples from.
    """

    def __init__(self, state_size, action_size):
        """Set the hyper-parameters, create both buffers and build the network.

        Args:
            state_size: Length of the flattened board fed to the network.
            action_size: Number of network outputs; ``act`` also uses it as
                the number of board columns.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=5000)  # long-term experience buffer, oldest entries drop out
        self.current_memory = deque(maxlen=5000)  # transitions of the game currently being played
        self.gamma = 0.95  # discount rate applied to future rewards
        self.epsilon = 1  # exploration rate: probability of acting randomly
        self.epsilon_decay = 0.995  # multiplicative decay applied after each training pass
        self.epsilon_min = 0.01  # floor for the exploration rate
        self.learning_rate = 0.001  # optimizer step size
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the network that approximates the Q-value function."""
        model = Sequential()
        model.add(Dense(42, input_dim=self.state_size, activation='relu'))  # 1st hidden layer; states as input
        model.add(Dense(24, activation='relu'))  # 2nd hidden layer
        model.add(Dense(24, activation='relu'))  # 3rd hidden layer
        model.add(Dense(self.action_size, activation='linear'))  # one output per possible action
        # NOTE(review): newer Keras releases name this argument `learning_rate`
        # instead of `lr` -- confirm against the installed Keras version.
        model.compile(loss='mse', optimizer=Adam(lr=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done, n_moves):
        """Append one transition of the current game to ``current_memory``.

        ``n_moves`` is the move number within the game; ``act`` stores -1
        there for the penalty sample it records when the network proposes a
        full column.
        """
        self.current_memory.append((state, action, reward, next_state, done, n_moves))

    def act(self, state):
        """Choose the column to play for the board in ``state``.

        Args:
            state: Array whose first row ``state[0]`` is the flat board; the
                agent's own pieces are treated as 1 and the opponent's as 2.

        Returns:
            Tuple ``(column, reward)``: reward is 10 for an immediately
            winning move, 2 for a move that blocks an immediate opponent win,
            and 0 for a random or network-chosen move.
        """
        n_col = self.action_size
        # a column is playable while its top cell is still empty
        valid_moves = [col for col in range(n_col) if state[0][col] == 0]

        # Hand-written heuristics first, to speed up training.
        # 1) Take an immediately winning move.
        for m in valid_moves:
            new_board = make_move(state[0], n_col, m, 1)
            if player_won(new_board, n_col, 1):
                return m, 10

        # 2) Otherwise occupy a cell with which the opponent would win.
        for m in valid_moves:
            new_board = make_move(state[0], n_col, m, 2)
            if player_won(new_board, n_col, 2):
                return m, 2

        # exploration: with probability epsilon pick a random playable column
        if np.random.rand() <= self.epsilon:
            return random.choice(valid_moves), 0

        # exploitation: pick the column with the highest predicted Q-value
        act_values = self.model.predict(state)
        predicted_action = int(np.argmax(act_values[0]))
        if predicted_action not in valid_moves:
            # Record a penalty sample so the network learns not to propose
            # a full column (n_moves == -1 marks it for replay_game).
            self.remember(state, predicted_action, -20, state, True, -1)

            # Fall back to the best Q-value among the playable columns by
            # pushing full columns far down before taking the argmax.
            valid_filter = [0 if state[0][col] == 0 else -1000 for col in range(self.action_size)]
            predicted_action = int(np.argmax(act_values[0] + valid_filter))

        return predicted_action, 0

    def replay(self, batch_size):
        """Train on a random minibatch drawn from the long-term memory.

        At most ``batch_size`` transitions are used; when the memory holds
        fewer, all of them are sampled instead of raising.  The exploration
        rate is decayed afterwards.
        """
        sample_size = min(batch_size, len(self.memory))
        minibatch = random.sample(self.memory, sample_size)
        for state, action, reward, next_state, done, _ in minibatch:
            target = reward  # terminal transition: the target is just the reward
            if not done:
                # non-terminal: reward plus the discounted best predicted future Q-value
                target = (reward + self.gamma *
                          np.amax(self.model.predict(next_state)[0]))
            target_f = self.model.predict(state)
            target_f[0][action] = target  # only the Q-value of the action taken is corrected
            self.model.fit(state, target_f, epochs=1, verbose=0)

        self._decay_epsilon()

    def replay_game(self, reward, total_moves):
        """Train on every transition of the finished game, then archive them.

        Regular moves are trained toward the final game ``reward`` discounted
        by ``gamma`` once for every move between that move and the end of the
        game.  Invalid-move penalty samples (``n_moves == -1``) are trained
        toward the penalty stored with them, so the outcome of the game can
        never turn the penalty into a positive target.

        Args:
            reward: Final reward of the game.
            total_moves: Number of moves the agent made in the game.
        """
        for state, action, state_reward, next_state, done, n_moves in self.current_memory:
            target_f = self.model.predict(state)
            if n_moves == -1:
                target = state_reward
            else:
                target = reward * pow(self.gamma, total_moves - n_moves)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

        self._decay_epsilon()

        # add to total memory and clear current memory
        self.memory.extend(self.current_memory)
        self.current_memory.clear()

    def _decay_epsilon(self):
        """Apply one step of exploration decay without dropping below ``epsilon_min``."""
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def load(self, name):
        """Load network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to the file ``name``."""
        self.model.save_weights(name)


# Debug/Train your Agent

In [None]:
# state_size is one value per board cell (columns * rows); action_size is
# one network output per column.
my_agent = DQNAgent(env.configuration.columns * env.configuration.rows, env.configuration.columns)

In [None]:
# Train as the first player against the built-in "negamax" opponent.
trainer = env.train([None, "negamax"])
debug = False

batch_size = 32


def _board_to_state(obs):
    """Flatten the observed board into the [1, state_size] array the agent consumes."""
    return np.reshape(obs.board.copy(), [1, my_agent.state_size])


print(env.configuration)
total = 0
for i in range(300):
    observation = trainer.reset()
    n_moves = 0
    # Play one complete game with the current agent, recording every
    # transition; the network is trained from the outcome afterwards.
    while not env.done:
        prev_state = _board_to_state(observation)
        my_action, my_reward = my_agent.act(prev_state)
        if debug:
            print("My Action", my_action, "reward ", my_reward)

        observation, reward, done, info = trainer.step(my_action)
        if debug:
            env.render(mode="ipython", width=200, height=180, header=False, controls=False)

        next_state = _board_to_state(observation)
        n_moves += 1
        # On the final step the environment reward, scaled by 10, replaces
        # the heuristic reward returned by act().
        my_reward = reward * 10 if done else my_reward
        my_agent.remember(prev_state, my_action, my_reward, next_state, done, n_moves)

    my_agent.replay_game(my_reward, n_moves)
    env.render()

    # Running average of the environment reward, reported every 10 games.
    total += reward
    if (i + 1) % 10 == 0:
        print("Game ", i, " Average reward: ", total / (i + 1))


# Evaluate your Agent

In [None]:
def game_agent(observation, configuration):
    """Adapter for the environment: return the column chosen by the trained agent.

    The observed board is reshaped to [1, state_size] and handed to the
    module-level ``my_agent``; only the action part of what ``act`` returns
    is used.  ``configuration`` is accepted but not read.
    """
    board = observation.board.copy()
    state = np.reshape(board, [1, my_agent.state_size])
    action, _ = my_agent.act(state)
    return action

def agent_random(obs, config):
    """Return a uniformly random column among those whose top cell is empty.

    ``obs.board`` is the flat board and ``config.columns`` the number of
    columns; the first ``config.columns`` entries are the top row.
    """
    open_columns = []
    for col in range(config.columns):
        if obs.board[col] == 0:
            open_columns.append(col)
    return random.choice(open_columns)

# Play a single game with game_agent in the first seat and agent_random
# in the second.
env.run([game_agent, agent_random])

# Show the finished game in the notebook ("ipython" render mode).
env.render(mode="ipython")

In [None]:
def mean_reward(rewards):
    """Average the first entry of every element of ``rewards``.

    Each element is indexed with ``[0]``; the sum of those values is divided
    by the number of elements.
    """
    first_entries = [episode[0] for episode in rewards]
    episode_count = float(len(rewards))
    return sum(first_entries) / episode_count

# Run 50 episodes against each opponent to estimate performance.
# mean_reward averages r[0], i.e. the reward of the first listed agent (game_agent).
print("My Agent vs Random Agent:", mean_reward(evaluate("connectx", [game_agent, "random"], num_episodes=50)))
print("My Agent vs Negamax Agent:", mean_reward(evaluate("connectx", [game_agent, "negamax"], num_episodes=50)))

# Play your Agent
Click on any column to place a checker there ("manually select action").

In [None]:
# "None" represents which agent you'll manually play as (first or second player).
# env.play([None, "negamax"], width=500, height=450)

# Write Submission File



In [None]:
# Persist the trained network weights (DQNAgent.save forwards to model.save_weights).
my_agent.save("model_mc_v0.h5")

In [None]:
import inspect
import os

def write_agent_to_file(function, file):
    """Append the source code of ``function`` to ``file`` and report it on stdout.

    The file is created when it does not exist yet; when it does, the source
    is added after the existing content, so repeated calls accumulate.
    """
    mode = "a" if os.path.exists(file) else "w"
    with open(file, mode) as out:
        source = inspect.getsource(function)
        out.write(source)
        print(function, "written to", file)

# NOTE(review): only the source text of game_agent is written. That function
# refers to the notebook globals `my_agent` and `np`, which are not written to
# submission.py -- confirm the file can run on its own before submitting.
write_agent_to_file(game_agent, "submission.py")

# Validate Submission
Play your submission against itself.  This is the first episode the competition will run to weed out erroneous agents.

Why validate? This roughly verifies that your submission is fully encapsulated and can be run remotely.

In [None]:
# Note: Stdout replacement is a temporary workaround.
#import sys
#out = sys.stdout
#submission = utils.read_file("/kaggle/working/submission.py")
#agent = utils.get_last_callable(submission)
#sys.stdout = out

#env = make("connectx", debug=True)
#env.run([agent, agent])
#print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

# Submit to Competition

1. Commit this kernel.
2. View the committed version.
3. Go to "Data" section and find submission.py file.
4. Click "Submit to Competition"
5. Go to [My Submissions](https://kaggle.com/c/connectx/submissions) to view your score and episodes being played.