# Connect X Project

In this game, your objective is to get a certain number of your checkers in a row horizontally, vertically, or diagonally on the game board before your opponent. When it's your turn, you “drop” one of your checkers into one of the columns at the top of the board. Then, let your opponent take their turn. This means each move may be trying to either win for you, or trying to stop your opponent from winning. The default number is four-in-a-row, but we’ll have other options to come soon.

# Import Allowed Libraries

In [2]:
import scipy
import numpy as np
import random
import time

# Board Class

In [135]:
class Board:
    """Class representing the ConnectX Board"""
    
    def __init__(self, level=None, branch=None, parent=None, state=None, n_rows=6, 
                 n_cols=7, game_type=4, win_state=None):
        
        self.n_rows = n_rows
        self.n_cols = n_cols
        self.game_type = game_type
        self.win_state = win_state
        self.level = level
        self.branch = branch
        self.parent = parent
        
        if state is None:
            self.state = np.zeros(shape=[self.n_rows, self.n_cols])
        else:
            self.state = state
    
            
    def update_board(self, column, player, copy=True):
        """
        Drop the specified player's chip into the chosen column
        
        column: 0 <= column < n_cols
        player: [1, 2]
        
        
        """

        # if already draw/win/lose, dont update the board in that column.
        if self.win_state == 0:
            print('Draw')
            return None
        elif self.win_state == 1:
            print('Player 1 Won')
            return None
        elif self.win_state == 2:
            print('Player 2 Won')
            return None        
        
        # check if all rows in the column are filled
        if self.check_full_col(column=column):
            print(f'Column {column} is full')
            return None
        
        if copy is False:
            for row, row_val in enumerate(self.state[:, column], 0):
                if row_val == 0:
#                     print(f'Player {player}\'s chip dropped onto column {column}, row {row}')
                    self.state[row, column] = player
                    break
        else: 
            state_copy = self.state.copy()
            for row, row_val in enumerate(state_copy[:, column], 0):
                if row_val == 0:
#                     print(f'Player {player}\'s chip dropped onto column {column}, row {row}')
                    state_copy[row, column] = player
                    break
            new_board = Board(state=state_copy) # generate new child board with updated state
            new_board.check_win() # check win_state of new board
            return new_board
        
        # check for win
        if self.check_win():
            self.win_state = 1 # if true, player 1 won
        else:
            self.win_state = 2 # if false, player 2 won
            
        if self.check_draw():
            self.win_state = 0 # if true, board full and draw
        
    def check_vert_line(self):
        """
        Check if there are 4 connected chips in a vertical line.
        """
        
        r_lim = self.n_rows - self.game_type  # limit for which row we check till. past the r_lim row, not possible to extend grid vertically up
        for c in range(self.n_cols):
            for r in range(r_lim + 1):
                cell = self.state[r, c]
                
                if cell == 1: # if p1 chip is in [r, c]
                    vert_group = self.state[r:r+4, c] # isolate the 4-cell rectangle extending up from the p1 chip cell
                    if np.all(vert_group == 1): # if all 4 cells have the same value (p1 chip), return win.
#                         print('Player 1 Wins')
                        self.win_state = 1
                        return None
                        
                elif cell == 2: # if p2 chip is in [r, c]
                    vert_group = self.state[r: r+4, c] # isolate the 4-cell rectangle extending up from the p1 chip cell
                    if np.all(vert_group == 2): # if all 4 cells have the same value (p1 chip), return win.
#                         print('Player 2 Wins')
                        self.win_state = 2
                        return None

#         print('No Vert Lines Found')
        
        
        
    def check_horizontal_line(self):
        """
        Check if there are 4 connected chips in a horizontal line.
        """
        c_lim = self.n_cols - self.game_type # limit for which column we check till. past the c_lim column, not possible to extend grid horizontally to the right
        for c in range(0, c_lim + 1):
            for r in range(self.n_rows):
                cell = self.state[r, c]
                
                if cell == 1: # if p1 chip is in [r, c]
                    hori_group = self.state[r, c:c+4] # isolate the 4-cell rectangle extending up from the p1 chip cell
                    if np.all(hori_group == 1): # if all 4 cells have the same value (p1 chip), return win.
#                         print('Player 1 Wins')
                        self.win_state = 1
                        return None
                        
                elif cell == 2: # if p2 chip is in [r, c]
                    hori_group = self.state[r, c:c+4] # isolate the 4-cell rectangle extending up from the p1 chip cell
                    if np.all(hori_group == 2): # if all 4 cells have the same value (p1 chip), return win.
#                         print('Player 2 Wins')
                        self.win_state = 2
                        return None

#         print('No Horizontal Lines Found')
        

    def check_diagonal_line(self):
        """
        Check if there are 4 connected chips in a diagonal line.
        """

        for c in range(self.n_cols):
            for r in range(self.n_rows):
                cell = self.state[r, c]
                
                if cell == 1:
                    # check / diagonals
                    if not (r + 4 >= self.n_rows) and not (c + 4 > self.n_cols):
                        up_right_group = []
                        for i in range(self.game_type):
                            up_right_group.append(self.state[r+i, c+i])
                        up_right_group = np.array(up_right_group)
                        
                        if np.all(up_right_group == 1):
#                             print('Player 1 Wins')
                            self.win_state = 1
                            return None
                        
                    
                    if not (r - 3 < 0) and not (c + 4 > self.n_cols):
                        down_right_group = []
                        for i in range(self.game_type):
                            down_right_group.append(self.state[r-i, c+i])
                            
                        down_right_group = np.array(down_right_group)
 
                        if np.all(down_right_group == 1):
#                             print('Player 1 Wins')
                            self.win_state = 1
                            return None
                        
                elif cell == 2:
                    # check / diagonals
                    if not (r + 3 >= self.n_rows) and not (c + 4 > self.n_cols):
                        up_right_group = []
                        for i in range(self.game_type):
                            up_right_group.append(self.state[r+i, c+i])
                        up_right_group = np.array(up_right_group)
                        
                        if np.all(up_right_group == 2):
#                             print('Player 2 Wins')
                            self.win_state = 2
                            return None
                        
                    # check \ diagonals
                    if not (r - 3 < 0) and not (c + 4 > self.n_cols):
                        down_right_group = []
                        for i in range(self.game_type):
                            down_right_group.append(self.state[r-i, c+i])
                        down_right_group = np.array(down_right_group)
                        
                        if np.all(down_right_group == 2):
#                             print('Player 2 Wins')
                            self.win_state = 2
                            return None
                        
#         print('No Diagonal Lines Found')
        
    def check_win(self):
        """
        Check if a player has won.
        """
        self.check_vert_line()
        self.check_horizontal_line()
        self.check_diagonal_line()
    
        if self.win_state == 1:
#             print('Player 1 Wins')
            return 1
        elif self.win_state == 2:
#             print('Player 2 Wins')
            return 2
        else:
            return False
        
    def check_draw(self):
        """
        Check if the board is full.
        """
        if np.all(self.state != 0):
            return True
        else:
            return False
    
    def check_full_col(self, column):
        """
        Check if a column is full
        """
        
        if np.all(self.state[:, column] != 0):
            return True
        else:
            return False

# Running Connect X Games

Now that our board class is ready, we can simulate random games by using the methods of the class. Here, we initialise a Connect 4 board and run random moves for 2 players till we have a winner.

In [4]:
board = Board()
i = 1

In [5]:
while board.win_state is None:

    if i % 2 != 0:
        col = random.randint(0, 6)
        while board.check_full_col(col): # check if chosen row is full. if so, change column
            col = random.randint(0, 6)
        board.update_board(column=col, player=1, copy=False)
        i += 1

    elif i % 2 == 0:
        while board.check_full_col(col): # check if chosen row is full. if so, change column
            col = random.randint(0, 6)
        board.update_board(column=col, player=2, copy=False)
        i += 1

    print(board.state)
    print(' ')
    time.sleep(1)

[[0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]]
 
[[0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 2.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]]
 
[[1. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 2.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]]
 
[[1. 0. 0. 0. 0. 0. 1.]
 [2. 0. 0. 0. 0. 0. 2.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]]
 
[[1. 0. 0. 1. 0. 0. 1.]
 [2. 0. 0. 0. 0. 0. 2.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]]
 
[[1. 0. 0. 1. 0. 0. 1.]
 [2. 0. 0. 2. 0. 0. 2.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]]
 
[[1. 0. 0. 1. 1. 0. 1.]
 [2. 0. 0. 2. 0. 0. 2.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.

# Minimax Implementation

In [16]:
from kaggle_environments import evaluate, make, utils

env = make("connectx", debug=True)
env.render(mode='ipython')


In [7]:
print(np.array(env.state[0]['observation']['board']).reshape([6, 7]))

[[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]]


In [8]:
print(env.state)

[{'action': 0, 'reward': 0, 'info': {}, 'observation': {'remainingOverageTime': 60, 'step': 0, 'board': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'mark': 1}, 'status': 'ACTIVE'}, {'action': 0, 'reward': 0, 'info': {}, 'observation': {'remainingOverageTime': 60, 'mark': 2}, 'status': 'INACTIVE'}]


In [155]:
# minimax algorithm referenced from
# https://medium.com/analytics-vidhya/artificial-intelligence-at-play-connect-four-minimax-algorithm-explained-3b5fc32e4a4f

# heuristic reference
# https://github.com/kupshah/Connect-Four/blob/master/player.py

INFINITY = 9999999999999999999 # yes this is infinity fuck you



def heuristic_func(current_node):
    """function to calculate the heuristic value of a node"""
    
    node = Board(state=current_node) # initialise state into a node
    heuristic = 0
#     # immediate 100 if player won; immediate -100 if opponent won
#     if node.check_win() == 1:
#         heuristic = 1000
#         return heuristic
#     elif node.check_win() == 2:
# #         print('FUCK')
#         heuristic = -1000
#         return heuristic
#     elif node.check_win() == False: # if no one won, check if board is full
#         if node.check_draw():
# #             print('draw')
#             heuristic = 0 # if game ends in draw, heuristic immediately is just 0 since neutral result to both
#             return heuristic
    
    # if board is not in win/lose/draw state, score as follows
    # award points for player 1 chips that are together
    for j in range(node.n_cols):
        for i in range(node.n_rows):

            # check vert lines
            if (i + 2 <= node.n_rows) and node.state[i][j] == node.state[i+1][j] == 1:
                heuristic += 1

            if (i + 3 <= node.n_rows) and node.state[i][j] == node.state[i+1][j] == node.state[i+2][j] == 1:
                heuristic += 5
            
            if (i + 4 <= node.n_rows) and node.state[i][j] == node.state[i+1][j] == node.state[i+2][j] == node.state[i+3][j] == 1:
                heuristic += 3000

            if (i + 2 <= node.n_rows) and node.state[i][j] == node.state[i+1][j] == 2:
                heuristic -= 1

            if (i + 3 <= node.n_rows) and node.state[i][j] == node.state[i+1][j] == node.state[i+2][j] == 2:
                heuristic -= 50
                
            if (i + 4 <= node.n_rows) and node.state[i][j] == node.state[i+1][j] == node.state[i+2][j] == node.state[i+3][j] == 2:
                heuristic -= 10000    

            # check horizontal lines
            if (j + 2 <= node.n_cols) and node.state[i][j] == node.state[i][j+1] == 1:
                heuristic += 1

            if (j + 3 <= node.n_cols) and node.state[i][j] == node.state[i][j+1] == node.state[i][j+2] == 1:
                heuristic += 5
            
            if (j + 4 <= node.n_cols) and node.state[i][j] == node.state[i][j+1] == node.state[i][j+2] == node.state[i][j+3] == 1:
                heuristic += 3000

            if (j + 2 <= node.n_cols) and node.state[i][j] == node.state[i][j+1] == 2:
                heuristic -= 1

            if (j + 3 <= node.n_cols) and node.state[i][j] == node.state[i][j+1] == node.state[i][j+2] == 2:
                heuristic -= 50
            
            if (j + 4 <= node.n_cols) and node.state[i][j] == node.state[i][j+1] == node.state[i][j+2] == node.state[i][j+3] == 2:
                heuristic -= 10000

            # check broken horizontals --_- -_--
            
            
            if (j + 4 <= node.n_cols) and (node.state[i][j] == node.state[i][j+1] == node.state[i][j+3] == 1) and (node.state[i][j+2] == 0):
                heuristic += 50
            if (j + 4 <= node.n_cols) and (node.state[i][j] == node.state[i][j+2] == node.state[i][j+3] == 1) and (node.state[i][j+1] == 0):
                heuristic += 50
            
            if (j + 4 <= node.n_cols) and (node.state[i][j] == node.state[i][j+1] == node.state[i][j+3] == 2) and (node.state[i][j+2] == 0):
                heuristic -= 50
                
            if (j + 4 <= node.n_cols) and (node.state[i][j] == node.state[i][j+2] == node.state[i][j+3] == 2) and (node.state[i][j+1] == 0):
                heuristic -= 50
                
            # check / diagonal lines
            if (i + 2 <= node.n_rows) and (j + 2 <= node.n_cols) and node.state[i][j] == node.state[i+1][j+1] == 1:
                heuristic += 1

            if (i + 3 <= node.n_rows) and (j + 3 <= node.n_cols) and node.state[i][j] == node.state[i+1][j+1] == node.state[i+2][j+2] == 1:
                heuristic += 5

            if (i + 4 <= node.n_rows) and (j + 4 <= node.n_cols) and node.state[i][j] == node.state[i+1][j+1] == node.state[i+2][j+2] == node.state[i+3][j+3] == 1:
                heuristic += 3000
                
            if (i + 2 <= node.n_rows) and (j + 2 <= node.n_cols) and node.state[i][j] == node.state[i+1][j+1] == 2:
                heuristic -= 1

            if (i + 3 <= node.n_rows) and (j + 3 <= node.n_cols) and node.state[i][j] == node.state[i+1][j+1] == node.state[i+2][j+2] == 2:
                heuristic -= 50
                
            if (i + 4 <= node.n_rows) and (j + 4 <= node.n_cols) and node.state[i][j] == node.state[i+1][j+1] == node.state[i+2][j+2] == node.state[i+3][j+3] == 2:
                heuristic -= 10000
                
#             # check broken / diagonal lines
#             if (i + 4 <= node.n_rows) and (j + 4 <= node.n_cols) and (node.state[i][j] == node.state[i+1][j+1] == node.state[i+3][j+3] == 2) and (node.state[i+2][j+2] == 0):
#                 heuristic -= 50
#             if (i + 4 <= node.n_rows) and (j + 4 <= node.n_cols) and (node.state[i][j] == node.state[i+2][j+2] == node.state[i+3][j+3] == 2) and (node.state[i+1][j+1] == 0):
#                 heuristic -= 50
                
            # check \ diagonal lines

            if (i - 1 >= 0) and (j + 2 <= node.n_cols) and node.state[i][j] == node.state[i-1][j+1] == 1:
                heuristic += 1

            if (i - 2 >= 0) and (j + 3 <= node.n_cols) and node.state[i][j] == node.state[i-1][j+1] == node.state[i-2][j+2] == 1:
                heuristic += 5
            
            if (i - 3 >= 0) and (j + 4 <= node.n_cols) and node.state[i][j] == node.state[i-1][j+1] == node.state[i-2][j+2] == node.state[i-3][j+3] == 1:
                heuristic += 3000

            if (i - 1 >= 0) and (j + 2 <= node.n_cols) and node.state[i][j] == node.state[i-1][j+1] == 2:
                heuristic -= 1

            if (i - 2 >= 0) and (j + 3 <= node.n_cols) and node.state[i][j] == node.state[i-1][j+1] == node.state[i-2][j+2] == 2:
                heuristic -= 50
            
            if (i - 3 >= 0) and (j + 4 <= node.n_cols) and node.state[i][j] == node.state[i-1][j+1] == node.state[i-2][j+2] == node.state[i-3][j+3] == 2:
                heuristic -= 10000
            
#             # check broken \ diagonal lines
#             if (i - 3 >= 0) and (j + 4 <= node.n_cols) and (node.state[i][j] == node.state[i-1][j+1] == node.state[i-3][j+3] == 2) and (node.state[i-2][j+2] == 0):
#                 heuristic -= 50
#             if (i - 3 >= 0) and (j + 4 <= node.n_cols) and (node.state[i][j] == node.state[i-2][j+2] == node.state[i-3][j+3] == 2) and (node.state[i-1][j+1] == 0):
#                 heuristic -= 50
            
    return heuristic

def minimax(current_node, depth, player):
    node = Board(state=current_node) # initialise Board with state of current node
    
    if depth == 0 or node.win_state: # if at final level of depth, or if node is draw/win/lose (i.e. terminal node)
        value = heuristic_func(node.state)
        return value, -1
    
    if player == 1: # player 1 is us. we want to maximise our heuristic score
        value = -INFINITY
        max_c = None
        for c in range(node.n_cols):
#             print(node.check_full_col(c))
            # generate the children of the node 
            if node.check_full_col(c):
                continue
            elif not node.check_full_col(c):
                child = node.update_board(column=c, player=1, copy=True)
#                 print(child.state)
                child_val, child_c = minimax(child.state, depth-1, player=2)
#                 print(child_val)
                if child_val > value: # find the child with max heuristic
                    value = child_val 
                    max_c = c
#                     print('Player 1')
#                     print(child.state)
                    print(child_val)
        return value, max_c # return smallest heuristic
    
    
    
    else: # if player 2 (opponent), they want to minimise the heuristic score.
        value = INFINITY
        min_c = None
        for c in range(node.n_cols):
            if node.check_full_col(c):
                continue
            elif not node.check_full_col(c):
                child = node.update_board(column=c, player=2, copy=True)
                child_val, child_c = minimax(child.state, depth-1, player=1)
                if child_val < value:
                    value = child_val
                    min_c = c
#                 print('Player 2')
#                 print(child.state)
#                 print(child_val)
        return value, min_c

In [156]:
def my_agent(observation, configuration):
#     from random import choice
#     return choice([c for c in range(configuration.columns) if observation.board[c] == 0])
    n_rows = configuration['rows']
    n_cols = configuration['columns']
    root_node = np.flipud(np.array(observation['board']).reshape((n_rows, n_cols)))

    player = observation.mark # whenever i receive observation, its my turn
    
    value_of_move, best_move = minimax(root_node, depth=3, player=player) # player 1 wants max heuristic
    print('final', best_move, value_of_move)
    return best_move
    
    
    

In [180]:
# Play as first position against random agent.
trainer = env.train([None, "negamax"])

observation = trainer.reset()

while not env.done:
    for c in range(7):
        print(Board(state=np.flipud(np.array(observation['board']).reshape((6, 7)))).check_full_col(c))
    my_action = my_agent(observation, env.configuration)
    print("My Action", my_action)
    observation, reward, done, info = trainer.step(my_action)
    
    # env.render(mode="ipython", width=100, height=90, header=False, controls=False)
env.render(mode='ipython')

False
False
False
False
False
False
False
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
final 0 1
My Action 0
False
False
False
False
False
False
False
1
3
7
7
7
7
6
6
3
1
3
7
51
3
7
51
3
3
7
3
7
51
2
6
50
2
6
50
0
7
51
1
2
1
7
51
1
7
1
7
51
0
6
50
0
6
50
0
51
1
1
1
51
1
51
0
50
0
50
0
1
1
1
1
1
0
0
0
1
1
1
1
1
0
0
0
1
1
1
1
1
0
1
final 0 3
My Action 0
False
False
False
False
False
False
False
7
9
3013
3012
3012
2962
2962
3012
9
3
6
9
9
8
8
-42
2
-42
2
8
1
9
7
6
8
6
8
-44
-42
0
-44
-42
0
6
8
1
3
7
53
6
51
7
-44
0
-44
1
6
1
3
7
6
6
6
6
6
1
3
7
6
6
6
6
6
1
3
7
6
6
-44
1
-44
0
7
final 0 9
My Action 0
False
False
False
False
False
False
False
9
8
10
7
8
7
8
-43
-42
6
-43
-42
6
7
8
6
8
12
14
9
11
15
8
12
8
12
14
-42
-38
-36
8
-42
-38
-36
8
8
12
14
8
6
14
7
16
6
14
6
14
-44
-36
6
-44
-36
6
6
14
6
8
7
60
6
8
57
7
9
-44
-42
6
-44
-42
7
6
8
6
8
7
10
6
8
6
8
6
8
6
8
6
8
6

In [170]:
print(np.array(observation.board).reshape((6, 7)))

[[0 0 0 0 0 0 2]
 [1 2 0 0 0 1 2]
 [2 1 2 2 0 2 1]
 [1 2 1 1 2 2 2]
 [1 1 1 2 1 1 2]
 [1 1 1 2 1 2 2]]


In [176]:
def mean_reward(rewards):
    return sum(r[0] for r in rewards) / float(len(rewards))

# Run multiple episodes to estimate its performance.
print("My Agent vs Random Agent:", mean_reward(evaluate("connectx", [my_agent, "random"], num_episodes=10)))
print("My Agent vs Negamax Agent:", mean_reward(evaluate("connectx", [my_agent, "negamax"], num_episodes=10)))

My Agent vs Random Agent: 1.0
My Agent vs Negamax Agent: 0.6


In [143]:
# "None" represents which agent you'll manually play as (first or second player).
env.play([None, "negamax"], width=500, height=450)

In [181]:
import inspect
import os

def write_agent_to_file(function, file):
    with open(file, "a" if os.path.exists(file) else "w") as f:
        f.write(inspect.getsource(function))
        print(function, "written to", file)

write_agent_to_file(my_agent, "submission.py")

<function my_agent at 0x000001A6D1EFD550> written to submission.py


In [94]:
# Note: Stdout replacement is a temporary workaround.
import sys
out = sys.stdout
submission = utils.read_file("submission.py")
agent = utils.get_last_callable(submission)
sys.stdout = out

env = make("connectx", debug=True)
env.run([agent, agent])
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

AttributeError: module 'kaggle_environments.utils' has no attribute 'get_last_callable'