In [None]:
# Run configuration. Every name below is read as a module-level global by later cells.
run = 'V015'  # run identifier; the training/evaluation cells append a grid index to form `version`
learner = 'valid_moves' # 'valid_moves': the opponent double-checks that its move is valid; 'self_learner': the opponent has to learn by itself which moves are valid
model_type = 'cnn'  # 'cnn' or 'dense' -- the two values the training/evaluation cells dispatch on
train = True  # run the training cell
evaluation = True  # run the evaluation cell
starting = True  # True: the learning agent moves first (mark 1); False: the opponent moves first
callbacks = False  # truthy: the training cell builds Keras callbacks and training passes them to fit

In [None]:
import sys
# NOTE(review): machine-specific absolute path to one user's conda environment.
# Presumably unnecessary when the kernel already runs inside that environment -- confirm and remove.
sys.path.append(r'c:\users\richa\.conda\envs\connectx_37\lib\site-packages')

# disable infos and warnings
import warnings
# Ignore FutureWarnings whose message begins with "Passing".
warnings.filterwarnings("ignore", message=r"Passing", category=FutureWarning)
import tensorflow.python.util.deprecation as deprecation
# Private TensorFlow flag; setting it to False turns off TensorFlow's deprecation messages.
deprecation._PRINT_DEPRECATION_WARNINGS = False

In [None]:
import random
import numpy as np
import matplotlib.pyplot as plt

import gym
from kaggle_environments import evaluate, make, utils

In [None]:
from keras.models import Model, Sequential
from keras.layers import Dense, Dropout, MaxPooling2D, Activation, Flatten, Input, Conv2D, BatchNormalization
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, EarlyStopping

In [None]:
from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

In [None]:
# setup connect x environment
# NOTE: `env` is rebound by the training and evaluation cells further down.
env = make("connectx", debug=True)
# show the entries of env.agents (presumably the built-in agent names such as "random" / "negamax")
print(list(env.agents))

In [None]:
# Play one demonstration game between the two agents named below and render it inline.
env.run(["negamax", "random"])
env.render(mode="ipython")

In [None]:
def my_agent(observation, config):
    """Return the column with the highest network output for the current board.

    Reads the module-level ``model``. The chosen column is not checked for
    validity, so it may already be full.
    """
    flat_board = np.asarray(observation.board)
    # shape the flat board as (batch=1, channel=1, rows, columns) for the network
    net_input = flat_board.reshape(1, 1, config.rows, config.columns)
    scores = model.predict(net_input)
    return int(np.argmax(scores))

In [None]:
def my_agent_val(observation, config):
    """Return the network's preferred column, or a random playable one if that column is full.

    Reads the module-level ``model``.
    """
    # a column is playable while its cell in the first board row is still 0
    open_columns = [col for col in range(config.columns) if observation.board[col] == 0]

    # shape the flat board as (batch=1, channel=1, rows, columns) for the network
    net_input = np.asarray(observation.board).reshape(1, 1, config.rows, config.columns)
    preferred = int(np.argmax(model.predict(net_input)))

    if preferred not in open_columns:
        return random.choice(open_columns)
    return preferred

In [None]:
class ConnectFour(gym.Env):
    """Gym wrapper around the Kaggle ConnectX trainer with a shaped reward.

    The wrapped trainer plays the opponent's moves. This class reshapes the
    flat board into a (rows, columns) array and replaces the trainer's reward:

    * ``max_reward`` when the trainer reports a reward of 1 (the agent won),
    * ``-max_reward`` when the episode ends in any other way,
    * ``min_reward`` (and the episode is ended) when the agent picks a full column,
    * otherwise the sum of seven directional neighbourhood rewards
      (see ``check_grid_reward``).
    """

    def __init__(self, learner, starting):
        """Create the ConnectX trainer and the gym spaces.

        Args:
            learner: 'self_learner' (opponent is ``my_agent``) or 'valid_moves'
                (opponent is ``my_agent_val``).
            starting: truthy if the learning agent moves first (mark 1);
                otherwise the opponent moves first and the agent uses mark 2.

        Raises:
            ValueError: if ``learner`` is neither of the two supported names.
        """
        # initialize environment with opponent agent to play against
        if learner == 'self_learner':
            agent2 = my_agent
        elif learner == 'valid_moves':
            agent2 = my_agent_val
        else:
            # previously fell through and failed below with an unbound ``agent2``
            raise ValueError(f"unknown learner {learner!r}; expected 'self_learner' or 'valid_moves'")

        ks_env = make("connectx", debug=True)
        if starting:
            self.env = ks_env.train([None, agent2])
            self.mark = 1
            self.opp_mark = 2
        else:
            self.env = ks_env.train([agent2, None])
            self.mark = 2
            self.opp_mark = 1
        self.rows = ks_env.configuration.rows
        self.columns = ks_env.configuration.columns

        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = gym.spaces.Discrete(self.columns)
        # ``np.int`` was only an alias of the builtin ``int`` and no longer exists in
        # NumPy >= 1.24; the builtin gives the same dtype on every NumPy version.
        self.observation_space = gym.spaces.Box(low=0, high=2,
                                                shape=(1, self.rows, self.columns), dtype=int)

        # Tuple corresponding to the min and max possible rewards
        self.min_reward = -30  # penalization for an invalid move
        self.max_reward = 10
        self.reward_range = (self.min_reward, self.max_reward)

    # ---- grid functions ---------------------------------------------------
    # Each check_* method looks at the two cells next to the played piece in one
    # direction and rewards extending an own line or blocking an opponent line.

    @staticmethod
    def _line_reward(grid, row, col, d_row, d_col, mark, opp_mark):
        """Reward for the two cells adjacent to (row, col) along (d_row, d_col).

        Returns:
            1    if both cells are on the board and both hold ``mark`` or both
                 hold ``opp_mark``;
            1/3  if only the nearest cell is on the board and holds ``mark``
                 (or the pair rule above does not apply);
            1/42 otherwise, including when the nearest cell is off the board.
        """
        n_rows, n_cols = grid.shape
        row, col = int(row), int(col)

        def cell(distance):
            r = row + distance * d_row
            c = col + distance * d_col
            # explicit bounds check: a negative index must not wrap around to the
            # opposite edge of the board
            if 0 <= r < n_rows and 0 <= c < n_cols:
                return grid[r, c]
            return None  # off the board

        near, far = cell(1), cell(2)
        reward = 1/42
        if near is not None and near == mark:
            reward = 1/3
        if near is not None and far is not None and near == far and (near == mark or near == opp_mark):
            reward = 1
        return reward

    def check_left(self, grid, row, action, mark, opp_mark):
        """Reward for the two cells to the left of the played piece."""
        return self._line_reward(grid, row, action, 0, -1, mark, opp_mark)

    def check_right(self, grid, row, action, mark, opp_mark):
        """Reward for the two cells to the right of the played piece."""
        return self._line_reward(grid, row, action, 0, 1, mark, opp_mark)

    def check_bottom(self, grid, row, action, mark, opp_mark):
        """Reward for the two cells directly below the played piece."""
        return self._line_reward(grid, row, action, 1, 0, mark, opp_mark)

    def check_bottom_left(self, grid, row, action, mark, opp_mark):
        """Reward for the two cells diagonally below-left of the played piece.

        The single-neighbour case is now honoured for a piece in column 1 as
        well; it used to be skipped because the guard tested ``action-2 >= 0``.
        """
        return self._line_reward(grid, row, action, 1, -1, mark, opp_mark)

    def check_bottom_right(self, grid, row, action, mark, opp_mark):
        """Reward for the two cells diagonally below-right of the played piece."""
        return self._line_reward(grid, row, action, 1, 1, mark, opp_mark)

    def check_top_left(self, grid, row, action, mark, opp_mark):
        """Reward for the two cells diagonally above-left of the played piece."""
        return self._line_reward(grid, row, action, -1, -1, mark, opp_mark)

    def check_top_right(self, grid, row, action, mark, opp_mark):
        """Reward for the two cells diagonally above-right of the played piece."""
        return self._line_reward(grid, row, action, -1, 1, mark, opp_mark)

    def check_grid_reward(self, action, mark, opp_mark):
        """Sum the seven directional rewards around the agent's newest piece.

        The piece is located as the topmost cell holding ``mark`` in column
        ``action`` of the current observation.
        """
        grid = np.asarray(self.obs['board']).reshape(self.rows, self.columns)
        own_rows = np.where(grid[:, action] == mark)[0]  # rows of own marks, top first
        if own_rows.size == 0:
            # Not expected: this is only called after the agent played into this
            # column. Return the neutral baseline instead of indexing with ``False``.
            return 7 * (1/42)
        action_row = int(own_rows[0])

        # same order of summation as before, so float results are unchanged
        checks = (self.check_left, self.check_right, self.check_bottom,
                  self.check_bottom_left, self.check_bottom_right,
                  self.check_top_right, self.check_top_left)
        grid_reward = 0
        for check in checks:
            grid_reward = grid_reward + check(grid, action_row, action, mark, opp_mark)
        return grid_reward

    # ---- setting functions ------------------------------------------------
    def reset(self):
        """Start a new game and return the board as a (rows, columns) array."""
        self.obs = self.env.reset()
        return np.array(self.obs['board']).reshape(self.rows, self.columns)

    def change_reward(self, old_reward, done, action):
        """Translate the trainer's reward into the shaped reward."""
        if old_reward == 1:  # The agent won the game
            return self.max_reward
        elif done:
            # The episode ended without a win for the agent.
            # NOTE(review): presumably this also covers a drawn game, which is
            # then penalised like a loss -- confirm this is intended.
            return self.max_reward * (-1)
        else:  # reward depends on the surroundings of the played piece
            return self.check_grid_reward(action, self.mark, self.opp_mark)

    def step(self, action):
        """Play ``action`` and return ``(board, reward, done, info)``.

        Choosing a full column is not forwarded to the trainer: the episode is
        ended with ``min_reward`` and the board is returned unchanged.
        """
        column = int(action)
        if self.obs['board'][column] == 0:  # top cell empty -> valid move
            self.obs, old_reward, done, info = self.env.step(column)
            reward = self.change_reward(old_reward, done, column)
        else:  # End the game and penalize agent --> not valid move
            reward, done, info = self.min_reward, True, {}
        board = np.array(self.obs['board']).reshape(self.rows, self.columns)
        return board, reward, done, info
    

In [None]:
class DQN_AGENT:
    """Builds, trains and reloads a keras-rl DQN agent for the ConnectFour environment.

    Typical order of use: ``build_conv_model`` or ``build_dense_model``, optionally
    ``build_callbacks``, then ``build_agent`` and ``train_agent``.
    """

    def __init__(self, env, model_type,
                 target_model_update, lr, num_steps, policy, memory_limit, window_length, steps_warmup):
        """Store the hyper-parameters and derive the output file paths.

        Args:
            env: environment providing ``observation_space.shape`` and ``action_space.n``.
            model_type: label used in the output file names.
            target_model_update: forwarded to ``DQNAgent``.
            lr: learning rate of the Adam optimizer.
            num_steps: number of training steps passed to ``fit``.
            policy: exploration policy, either a policy class (instantiated in
                ``build_agent``) or a ready instance; ``None`` selects ``BoltzmannQPolicy``.
            memory_limit: ``limit`` of the ``SequentialMemory``.
            window_length: ``window_length`` of the ``SequentialMemory``.
            steps_warmup: forwarded to ``DQNAgent`` as ``nb_steps_warmup``.

        Note:
            The file names also contain the module-level global ``version``,
            which must be set before an instance is created.
        """
        self.env = env
        self.states = self.env.observation_space.shape
        self.num_actions = self.env.action_space.n
        self.target_model_update = target_model_update
        self.lr = lr
        self.optimizer = Adam(self.lr)
        self.num_steps = num_steps
        self._policy_arg = policy  # previously accepted but silently ignored
        self.memory_limit = memory_limit
        self.window_length = window_length
        self.steps_warmup = steps_warmup
        self.callbacks = None  # filled by build_callbacks()
        file_stem = model_type + '_model_' + version
        self.checkpoint_path = 'log/checkpoints/' + file_stem + '.hdf5'
        self.weights_path = 'log/weights/' + file_stem + '.hdf5'
        self.plot_path = 'log/plots/' + file_stem

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the directory that will contain ``path`` if it does not exist yet."""
        import os  # local import keeps this class self-contained
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    @staticmethod
    def _chunk_means(values, chunk_size):
        """Mean of each consecutive ``chunk_size``-long slice of ``values`` (last one may be shorter)."""
        return [np.mean(values[start:start + chunk_size])
                for start in range(0, len(values), chunk_size)]

    def build_conv_model(self):
        """Build the convolutional Q-network, store it in ``self.model`` and return it."""
        conv_kwargs = dict(kernel_size=5, strides=(3, 3), padding='same',
                           data_format='channels_first', activation='tanh')
        input_layer = Input(shape=(self.states))
        x = Conv2D(filters=16, **conv_kwargs)(input_layer)
        x = BatchNormalization(axis=1)(x)
        x = Conv2D(filters=8, **conv_kwargs)(x)
        x = BatchNormalization(axis=1)(x)
        x = Conv2D(filters=8, **conv_kwargs)(x)
        x = BatchNormalization(axis=1)(x)
        x = Conv2D(filters=16, **conv_kwargs)(x)

        x = Flatten()(x)
        # one output per action, no activation
        output_layer = Dense(self.num_actions)(x)

        self.model = Model(inputs=input_layer, outputs=output_layer)
        return self.model

    def build_dense_model(self):
        """Build the fully connected Q-network, store it in ``self.model`` and return it."""
        input_layer = Input(shape=(self.states))
        x = Flatten()(input_layer)
        x = Dense(32, activation='relu')(x)
        x = Dense(16, activation='relu')(x)
        output_layer = Dense(self.num_actions, activation='linear')(x)
        self.model = Model(inputs=input_layer, outputs=output_layer)
        # summary() prints by itself and returns None; wrapping it in print() added a stray "None"
        self.model.summary()
        return self.model

    def build_callbacks(self):
        """Create the training callbacks, store them in ``self.callbacks`` and return them.

        Only a best-weights checkpoint on ``episode_reward`` is used; early
        stopping is intentionally not part of the list.
        """
        self._ensure_parent_dir(self.checkpoint_path)
        checkpoint = ModelCheckpoint(filepath=self.checkpoint_path, save_weights_only=True,
                                     monitor='episode_reward', mode='max',
                                     save_best_only=True, verbose=1)
        self.callbacks = [checkpoint]
        return self.callbacks

    def build_agent(self):
        """Create policy, replay memory and the ``DQNAgent`` around ``self.model``."""
        policy = self._policy_arg if self._policy_arg is not None else BoltzmannQPolicy
        # accept both a policy class and an already constructed policy object
        self.policy = policy() if isinstance(policy, type) else policy
        self.memory = SequentialMemory(limit=self.memory_limit, window_length=self.window_length)
        self.dqn = DQNAgent(model=self.model, memory=self.memory, policy=self.policy,
                            nb_actions=self.num_actions, nb_steps_warmup=self.steps_warmup,
                            target_model_update=self.target_model_update)
        return self.dqn

    def train_agent(self, verbose):
        """Compile and train the agent, save its weights and plot the episode rewards.

        Callbacks are passed to ``fit`` only if ``build_callbacks`` was called
        before. Two figures are written next to ``self.plot_path``: the reward
        per episode and the mean reward per block of 100 episodes.

        Returns:
            The history object returned by ``fit``.
        """
        self.dqn.compile(self.optimizer, metrics=['mae'])
        fit_kwargs = dict(nb_steps=self.num_steps, visualize=False, verbose=verbose)
        if self.callbacks:
            fit_kwargs['callbacks'] = self.callbacks
        self.history = self.dqn.fit(self.env, **fit_kwargs)

        self._ensure_parent_dir(self.weights_path)
        self.dqn.save_weights(self.weights_path, overwrite=True)

        episode_rewards = self.history.history['episode_reward']

        # plot rewards of each game
        plt.plot(self.history.epoch, episode_rewards)
        plt.title('Reward over the Games played')
        plt.xlabel('Epochs')
        plt.ylabel('Reward')
        # save before show: once the figure has been shown, savefig would write an empty image
        self._ensure_parent_dir(self.plot_path + '_games_reward.png')
        plt.savefig(self.plot_path + '_games_reward.png')
        plt.show()

        # report this instance's own hyper-parameters rather than same-named notebook globals
        print(f'target model update: {self.target_model_update}\
                learing rate: {self.lr}\
                memory limit: {self.memory_limit}')

        # plot mean reward per block of 100 games (a full 100, not 99, per block)
        mean_rewards = self._chunk_means(episode_rewards, 100)
        plt.plot(list(range(len(mean_rewards))), mean_rewards)
        plt.title('Average Reward over the last 100 Games played')
        plt.xlabel('Number of 100 Games')
        plt.ylabel('Average Reward')
        plt.savefig(self.plot_path + '_avg_reward.png')
        plt.show()

        return self.history

    def load_weights(self):
        """Load the weights saved by ``train_agent`` into ``self.model`` and return the model."""
        self.model.load_weights(self.weights_path)
        return self.model
    

In [None]:
# This agent randomly chooses a column that is not yet full.
def random_agent(observation, config):
    """Return a random column whose cell in the first board row is still empty."""
    open_columns = []
    for col in range(config.columns):
        if observation.board[col] == 0:
            open_columns.append(col)
    return random.choice(open_columns)

In [None]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    """Play ``n_rounds`` ConnectX games between two agents and print a summary.

    ``agent1`` moves first in ``n_rounds // 2`` games and ``agent2`` in the
    rest. Printed are both win rates (rounded to two decimals) and how often
    each agent's result pair marks an invalid play.
    """
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}

    games_agent1_first = n_rounds // 2
    games_agent2_first = n_rounds - games_agent1_first

    outcomes = evaluate("connectx", [agent1, agent2], config, [], games_agent1_first)

    # results of these games come back in (agent2, agent1) order; swap each pair
    reversed_games = evaluate("connectx", [agent2, agent1], config, [], games_agent2_first)
    outcomes += [[b, a] for [a, b] in reversed_games]

    total = len(outcomes)
    agent1_wins = outcomes.count([1, -1])
    agent2_wins = outcomes.count([-1, 1])
    print("Agent 1 Win Percentage:", np.round(agent1_wins/total, 2))
    print("Agent 2 Win Percentage:", np.round(agent2_wins/total, 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))

In [None]:
"""Train Agent"""
if train:
    env = ConnectFour(learner, starting)

    # Hyper-parameter sets. The lists are zipped, so set i takes the i-th entry of
    # every list (two sets in total, not the full cross product).
    target_model_updates = [0.01, 0.001]
    lrs = [0.01, 0.001]
    memory_limits = [1_000, 10_000]
    num_opp_updates = [5, 10]
    parameter_sets = zip(target_model_updates, lrs, memory_limits, num_opp_updates)

    # NOTE: `version`, `model`, `target_model_update`, `lr`, `memory_limit` and
    # `callbacks` are read as globals by other cells, so their names must stay.
    for i, (target_model_update, lr, memory_limit, num_opp_update) in enumerate(parameter_sets):
        version = run + '_' + str(i)

        # one trainer per hyper-parameter set
        DQN = DQN_AGENT(env, model_type,
                        target_model_update=target_model_update, lr=lr, num_steps=99_800, policy=BoltzmannQPolicy,
                        memory_limit=memory_limit, window_length=1, steps_warmup=100)

        # each round rebuilds the network that the opponent function also uses
        for update_opp in range(num_opp_update):
            print(f'opponent has now been updatet: {update_opp} times')

            # first round: freshly initialised network;
            # later rounds: fresh network that continues from the saved weights
            if model_type == 'cnn':
                model = DQN.build_conv_model()
            elif model_type == 'dense':
                model = DQN.build_dense_model()
            if update_opp > 0 and model_type in ('cnn', 'dense'):
                DQN.load_weights()

            if callbacks:
                callbacks = DQN.build_callbacks()

            # build agent and start training
            dqn = DQN.build_agent()
            history = DQN.train_agent(verbose=0)

In [None]:
"""Evaluate Agent"""
if evaluation:
    env = ConnectFour(learner, starting)

    # set values from best run of grid search
    target_model_update, lr, memory_limit = 0.001, 0.001, 10_000
    version = run + '_' + '1'

    DQN = DQN_AGENT(env, model_type,
                    target_model_update=target_model_update, lr=lr, num_steps=1, policy=BoltzmannQPolicy,
                    memory_limit=memory_limit, window_length=1, steps_warmup=1)

    # rebuild the architecture, then fill it with the trained weights
    if model_type == 'cnn':
        model = DQN.build_conv_model()
    elif model_type == 'dense':
        model = DQN.build_dense_model()

    dqn = DQN.build_agent()
    model = DQN.load_weights()

    # Create the game environment (replaces the gym wrapper bound to `env` above)
    env = make("connectx", debug=True)

    # initialize agent
    if learner == 'self_learner':
        agent = my_agent
    elif learner == 'valid_moves':
        agent = my_agent_val

    # play one game against the random agent and show it
    env.run([agent, 'random'])
    env.render(mode="ipython")

    # evaluate performance over n games: (headline, first agent, second agent, rounds)
    if starting:
        matchups = (
            ('win percentages against random agent when starting', agent, 'random', 100),
            ('\nwin percentages against negamax when starting', agent, 'negamax', 50),
        )
    else:
        matchups = (
            ('win percentages against random agent when going second', 'random', agent, 100),
            ('\nwin percentages against negamax when going second', 'negamax', agent, 50),
        )
    for headline, first_agent, second_agent, rounds in matchups:
        print(headline)
        get_win_percentages(first_agent, second_agent, n_rounds=rounds)

In [None]:
# play against my agent (the human takes the first seat, `my_agent` the second)
# NOTE(review): this always uses `my_agent`, which does no valid-move check, regardless of the
# `learner` setting. It also needs `env` to be the Kaggle environment and `model` to be
# loaded by an earlier cell -- confirm both before running.
env.play([None, my_agent], width=500, height=450)