### Library Imports

In [2]:
import os
import time
import copy
import random
from IPython.display import clear_output
from types import SimpleNamespace
import concurrent.futures

### Class Definitions

In [4]:
class ConnectX:
    # { INITIALIZATION } -------------------------------------------------------------------------
    def __init__(self, inarow, columns = 0, rows = 0, board = None, turn = 1, last_checker = None):
        self.PLAYER1 = 1
        self.PLAYER2 = 2
        self.NONE = 0
        
        self.inarow = inarow
        if board:
            self.columns = len(board[0])
            self.rows = len(board)
            if board != None:
                self.board = copy.deepcopy(board)
            else:
                self.board = [[self.NONE for i in range(self.columns)] for j in range(self.rows)]
            self.turn = turn
            self.last_checker = copy.deepcopy(last_checker) if last_checker is not None else [-1, -1]
        else: 
            self.columns = columns
            self.rows = rows
            self.board = [[self.NONE for i in range(columns)] for j in range(rows)]
            self.turn = self.PLAYER1
            self.last_checker = [-1, -1]    
    def __str__(self):
        def map_value(value):
            if value == 0:
                return ' '
            elif value == 1:
                return 'X'
            elif value == 2:
                return 'O'
            else:
                return str(value)
        return '\n'.join([' '.join(map(map_value, row)) for row in self.board[::-1]])  
    # { BASIC FUNCTIONs } ------------------------------------------------------------------------
    def print(self, mode ='simplified', mark_last_checker = True, clear = True, sleep = 1, human_mode = False):
        def map_value(value):
            if value == 0:
                return '   '
            elif value == 1:
                return ' X '
            elif value == 2:
                return ' O '
            else:
                return str(value)
        if clear:
            clear_output(wait=True)
        
        display_board = [row[:] for row in self.board]

        if mark_last_checker and hasattr(self, 'last_checker') and self.last_checker:
            x, y = self.last_checker
            if display_board[x][y] == 1:
                display_board[x][y] = '[X]'
            elif display_board[x][y] == 2:
                display_board[x][y] = '[O]'
        
        if mode == 'numerical':
            board = '\n'.join([' '.join(map(str, row)) for row in display_board[::-1]])
        elif mode == 'simplified':
            board = '\n'.join([' '.join(map(map_value, row)) for row in display_board[::-1]])
        else:
            board = '\n'.join([' '.join(map(map_value, row)) for row in display_board[::-1]])
            
        if human_mode:
            column_label = ''
            for c in range(self.columns):
                column_label = column_label + '[' + str(c) + '] '
            board = column_label + '\n' + board + '\n' + column_label
        print(board)
        time.sleep(sleep)
    def clear(self):
        self.board = [[self.NONE for i in range(self.columns)] for j in range(self.rows)]
        self.last_checker = [-1, -1]
        self.turn = self.PLAYER1
    def random(self):
        total_drop_checkers = random.randint(0, self.columns * self.rows)
        for i in range(total_drop_checkers):
            self.drop_checker(random.randint(0, self.columns - 1), self.turn)
    # { AGENT MOVEMENTs } ------------------------------------------------------------------------
    def drop_checker(self, column, player = None):
        if player is None:
            player = self.turn
        for i in range(self.rows):
            if self.board[i][column] == 0:
                self.board[i][column] = player
                self.last_checker = [i, column]
                self.turn = self.PLAYER1 if self.turn == self.PLAYER2 else self.PLAYER2
                return i
        
        return -1
    def retrieve_last_checker(self):
        if self.last_checker[0] != -1 and self.last_checker[1] != -1:
            self.board[self.last_checker[0]][self.last_checker[1]] = 0
            self.last_checker = [-1, -1]
            self.turn = self.PLAYER1 if self.turn == self.PLAYER2 else self.PLAYER2
    # { BOARD STATEs } ---------------------------------------------------------------------------
    def pervious_state(self):
        pervious_state = ConnectX(inarow = self.inarow, board = self.board, turn = self.turn, last_checker = self.last_checker)
        pervious_state.retrieve_last_checker()
        return pervious_state     
    def current_state(self, score_calculation_type = 'NONE', verbose = False): 
        current_instance = ConnectX(inarow = self.inarow, board = self.board, turn = self.turn)  
        return self.calculate_instance_scores(current_instance, score_calculation_type, verbose)
    def next_states(self, score_calculation_type = 'NONE', verbose = False):
        next_states = []
        for drop_column in range(self.columns):
            next_instance = ConnectX(inarow = self.inarow, board = self.board, turn = self.turn, last_checker = self.last_checker)
            drop_row = next_instance.drop_checker(drop_column)
            #IGNORE INVALID MOVES
            if drop_row == -1:
                continue
            
            next_state = self.calculate_instance_scores(next_instance, score_calculation_type, verbose)
            next_states.append(next_state)
        if verbose:
            return None
        else:
            return next_states
   # { STATE SPACE SCORE CALCULATION } ----------------------------------------------------------
    def score(self, score_calculation_type = 'NONE'): 
        if score_calculation_type == 'last_checker_longest_connection':
            return self.last_checker_longest_connection()
        elif score_calculation_type == 'last_checker_longest_connection_remove_limited':
            return self.last_checker_longest_connection_remove_limited()
        elif score_calculation_type == 'last_checker_longest_connection_zero_to_infinity':
            return self.last_checker_longest_connection_zero_to_infinity()
        elif score_calculation_type == 'last_checker_all_connections':
            return self.last_checker_all_connections()
        elif score_calculation_type == 'last_checker_all_connections_remove_limited':
            return self.last_checker_all_connections_remove_limited()
        elif score_calculation_type == 'last_checker_all_connections_zero_to_infinity':
            return self.last_checker_all_connections_zero_to_infinity()
        elif score_calculation_type == 'alpha':
            return self.alpha()            
        elif score_calculation_type == 'beta':
            return self.beta()
        elif 'monte_carlo_random' in score_calculation_type:
            if len(score_calculation_type.split('_')) == 4:
                return self.monte_carlo_random(iteration = score_calculation_type.split('_')[3])
            else:
                return self.monte_carlo_random()
        return 0 
    def calculate_instance_scores(self, instance, score_calculation_type, verbose):
        last_checker_longest_connection = float('-inf')
        last_checker_longest_connection_remove_limited = float('-inf')
        last_checker_longest_connection_zero_to_infinity = float('-inf')
        
        last_checker_all_connections = float('-inf')
        last_checker_all_connections_remove_limited = float('-inf')
        last_checker_all_connections_zero_to_infinity = float('-inf')
        
        alpha = float('-inf')
        beta = float('-inf')
        
        monte_carlo_random = float('-inf')
        
        if score_calculation_type == 'last_checker_longest_connection':
            last_checker_longest_connection = instance.last_checker_longest_connection()
        elif score_calculation_type == 'last_checker_longest_connection_remove_limited':
            last_checker_longest_connection_remove_limited = instance.last_checker_longest_connection_remove_limited()
        elif score_calculation_type == 'last_checker_longest_connection_zero_to_infinity':
            last_checker_longest_connection_zero_to_infinity = instance.last_checker_longest_connection_zero_to_infinity()
        
        elif score_calculation_type == 'last_checker_all_connections':
            last_checker_all_connections = instance.last_checker_all_connections()
        elif score_calculation_type == 'last_checker_all_connections_remove_limited':
            last_checker_all_connections_remove_limited = instance.last_checker_all_connections_remove_limited()
        elif score_calculation_type == 'last_checker_all_connections_zero_to_infinity':
            last_checker_all_connections_zero_to_infinity = instance.last_checker_all_connections_zero_to_infinity()
            
        elif score_calculation_type == 'alpha':
            alpha = instance.alpha()
        elif score_calculation_type == 'beta':
            beta = instance.beta()
            
        elif 'monte_carlo_random' in score_calculation_type:
            if len(score_calculation_type.split('_')) == 4:
                return instance.monte_carlo_random(iteration = score_calculation_type.split('_')[3])
            else:
                return instance.monte_carlo_random()
            
        elif score_calculation_type == 'ALL':
            last_checker_longest_connection = instance.last_checker_longest_connection()
            last_checker_longest_connection_remove_limited = instance.last_checker_longest_connection_remove_limited()
            last_checker_longest_connection_zero_to_infinity = instance.last_checker_longest_connection_zero_to_infinity()
            last_checker_all_connections = instance.last_checker_all_connections()
            last_checker_all_connections_remove_limited = instance.last_checker_all_connections_remove_limited()
            last_checker_all_connections_zero_to_infinity = instance.last_checker_all_connections_zero_to_infinity()
            alpha = instance.alpha()
            beta = instance.beta()
            monte_carlo_random = instance.monte_carlo_random()
                
        calculated_instance = SimpleNamespace(
            state_instance = instance,
            
            last_checker_longest_connection = last_checker_longest_connection,
            last_checker_longest_connection_remove_limited = last_checker_longest_connection_remove_limited,
            last_checker_longest_connection_zero_to_infinity = last_checker_longest_connection_zero_to_infinity,
            
            last_checker_all_connections = last_checker_all_connections,
            last_checker_all_connections_remove_limited = last_checker_all_connections_remove_limited,
            last_checker_all_connections_zero_to_infinity = last_checker_all_connections_zero_to_infinity,
            
            alpha = alpha,
            beta = beta,
            
            monte_carlo_random = monte_carlo_random
        )
        if verbose:
            instance.print(clear=False)
            if score_calculation_type == 'last_checker_longest_connection':
                print("LAST CHECKER LONGEST CONNECTION:", last_checker_longest_connection)
            elif score_calculation_type == 'last_checker_longest_connection_remove_limited':
                print("LAST CHECKER LONGEST CONNECTION REMOVE LIMITED:", last_checker_longest_connection_remove_limited)
            elif score_calculation_type == 'last_checker_longest_connection_zero_to_infinity':
                print("LAST CHECKER LONGEST CONNECTION ZERO TO INFINITY:", last_checker_longest_connection_zero_to_infinity)
            elif score_calculation_type == 'last_checker_all_connections':
                print("LAST CHECKER ALL CONNECTIONS:", last_checker_all_connections)
            elif score_calculation_type == 'last_checker_all_connections_remove_limited':
                print("LAST CHECKER ALL CONNECTIONS REMOVE LIMITED:", last_checker_all_connections_remove_limited)
            elif score_calculation_type == 'last_checker_all_connections_zero_to_infinity':
                print("LAST CHECKER ALL CONNECTIONS ZERO TO INFINITY:", last_checker_all_connections_zero_to_infinity)
            elif score_calculation_type == 'alpha':
                print("ALPHA:", alpha)
            elif score_calculation_type == 'beta':
                print("BETA:", beta)
            elif score_calculation_type == 'monte_carlo_random':
                print("MONTE CARLO RANDOM:", monte_carlo_random)
            elif score_calculation_type == 'ALL':
                print("LAST CHECKER LONGEST CONNECTION:", last_checker_longest_connection)
                print("LAST CHECKER LONGEST CONNECTION REMOVE LIMITED:", last_checker_longest_connection_remove_limited)
                print("LAST CHECKER LONGEST CONNECTION ZERO TO INFINITY:", last_checker_longest_connection_zero_to_infinity)
                print("LAST CHECKER ALL CONNECTIONS:", last_checker_all_connections)
                print("LAST CHECKER ALL CONNECTIONS REMOVE LIMITED:", last_checker_all_connections_remove_limited)
                print("LAST CHECKER ALL CONNECTIONS ZERO TO INFINITY:", last_checker_all_connections_zero_to_infinity)
                print("ALPHA:", alpha)
                print("BETA:", beta)
                print("MONTE CARLO RANDOM:", monte_carlo_random)
        return calculated_instance
 
    def last_checker_longest_connection(self):
        def check_count_in_a_row_next_position(row, column, connection, direction):
            if row < 0 or row >= self.rows or column < 0 or column >= self.columns:
                return connection
            if self.board[row][column] != 3 - self.turn:
                return connection
            return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection + 1, direction)
        
        directions = [[0,1], [1,0], [1,1], [1,-1]]
        longest_connection = 0
        
        for direction in directions:
            connection = check_count_in_a_row_next_position(self.last_checker[0] + direction[0], self.last_checker[1] + direction[1], 1, direction)
            connection += check_count_in_a_row_next_position(self.last_checker[0] - direction[0], self.last_checker[1] - direction[1], 0, [-direction[0], -direction[1]])
            if connection >= longest_connection:
                longest_connection = connection
        return longest_connection
    def last_checker_longest_connection_remove_limited(self):
        def check_count_in_a_row_next_position(row, column, connection, space, connected, direction):
            if row < 0 or row >= self.rows or column < 0 or column >= self.columns:
                return {'connection': connection, 'space': space}
            if self.board[row][column] == self.turn:
                return {'connection': connection, 'space': space}
            if self.board[row][column] == 3 - self.turn and connected:
                return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection + 1, space + 1, True, direction)
            return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection, space + 1, False, direction)
        
        
        directions = [[0,1], [1,0], [1,1], [1,-1]]
        longest_connection = 0
        
        for direction in directions:
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] + direction[0], self.last_checker[1] + direction[1], 1, 1, True, direction)
            connection = count_connection_and_space['connection']
            space = count_connection_and_space['space']
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] - direction[0], self.last_checker[1] - direction[1], 0, 0, True, [-direction[0], -direction[1]])
            connection += count_connection_and_space['connection']
            space += count_connection_and_space['space']
            
            if space < self.inarow:
                connection = 0
            if connection >= longest_connection:
                longest_connection = connection
        return longest_connection
    def last_checker_longest_connection_zero_to_infinity(self):
        def check_count_in_a_row_next_position(row, column, connection, space, connected, direction):
            if row < 0 or row >= self.rows or column < 0 or column >= self.columns:
                return {'connection': connection, 'space': space}
            if self.board[row][column] == self.turn:
                return {'connection': connection, 'space': space}
            if self.board[row][column] == 3 - self.turn and connected:
                return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection + 1, space + 1, True, direction)
            return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection, space + 1, False, direction)
        
        directions = [[0,1], [1,0], [1,1], [1,-1]]
        longest_connection = 0
        
        for direction in directions:
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] + direction[0], self.last_checker[1] + direction[1], 1, 1, True, direction)
            connection = count_connection_and_space['connection']
            space = count_connection_and_space['space']
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] - direction[0], self.last_checker[1] - direction[1], 0, 0, True, [-direction[0], -direction[1]])
            connection += count_connection_and_space['connection']
            space += count_connection_and_space['space']
            
            if space < self.inarow:
                connection = 0
            if connection >= self.inarow:
                connection = float('inf')
                return connection
            if connection >= longest_connection:
                longest_connection = connection
        return longest_connection
    
    def last_checker_all_connections(self):
        def check_count_in_a_row_next_position(row, column, connection, direction):
            if row < 0 or row >= self.rows or column < 0 or column >= self.columns:
                return connection
            if self.board[row][column] != 3 - self.turn:
                return connection
            return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection + 1, direction)
        
        directions = [[0,1], [1,0], [1,1], [1,-1]]
        all_connection = 1
        
        for direction in directions:
            connection = check_count_in_a_row_next_position(self.last_checker[0] + direction[0], self.last_checker[1] + direction[1], 0, direction)
            connection += check_count_in_a_row_next_position(self.last_checker[0] - direction[0], self.last_checker[1] - direction[1], 0, [-direction[0], -direction[1]])
            all_connection += connection
        return all_connection
    def last_checker_all_connections_remove_limited(self):
        def check_count_in_a_row_next_position(row, column, connection, space, connected, direction):
            if row < 0 or row >= self.rows or column < 0 or column >= self.columns:
                return {'connection': connection, 'space': space}
            if self.board[row][column] == self.turn:
                return {'connection': connection, 'space': space}
            if self.board[row][column] == 3 - self.turn and connected:
                return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection + 1, space + 1, True, direction)
            return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection, space + 1, False, direction)
        
        
        directions = [[0,1], [1,0], [1,1], [1,-1]]
        all_connection = 1
        
        for direction in directions:
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] + direction[0], self.last_checker[1] + direction[1], 0, 1, True, direction)
            connection = count_connection_and_space['connection']
            space = count_connection_and_space['space']
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] - direction[0], self.last_checker[1] - direction[1], 0, 0, True, [-direction[0], -direction[1]])
            connection += count_connection_and_space['connection']
            space += count_connection_and_space['space']
            
            if space < self.inarow:
                connection = 0
            all_connection += connection
        return all_connection
    def last_checker_all_connections_zero_to_infinity(self):
        def check_count_in_a_row_next_position(row, column, connection, space, connected, direction):
            if row < 0 or row >= self.rows or column < 0 or column >= self.columns:
                return {'connection': connection, 'space': space}
            if self.board[row][column] == self.turn:
                return {'connection': connection, 'space': space}
            if self.board[row][column] == 3 - self.turn and connected:
                return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection + 1, space + 1, True, direction)
            return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection, space + 1, False, direction)
        
        directions = [[0,1], [1,0], [1,1], [1,-1]]
        all_connection = 1
        
        for direction in directions:
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] + direction[0], self.last_checker[1] + direction[1], 0, 1, True, direction)
            connection = count_connection_and_space['connection']
            space = count_connection_and_space['space']
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] - direction[0], self.last_checker[1] - direction[1], 0, 0, True, [-direction[0], -direction[1]])
            connection += count_connection_and_space['connection']
            space += count_connection_and_space['space']
            
            if space < self.inarow:
                connection = 0
            if connection >= self.inarow-1:
                connection = float('inf')
                return connection
            all_connection += connection
        return all_connection
    
    def alpha(self):
        def check_count_in_a_row_next_position(row, column, connection, space, isolated_checker, droppable, connected, direction):
            if row < 0 or row >= self.rows or column < 0 or column >= self.columns:
                return {'connection': connection, 'space': space, 'isolated_checker': isolated_checker, 'droppable': droppable}
            if self.board[row][column] == self.turn:
                return {'connection': connection, 'space': space, 'isolated_checker': isolated_checker, 'droppable': droppable}
            if self.board[row][column] == 3 - self.turn and connected:
                return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection + 1, space, isolated_checker, droppable, True, direction)
            if self.board[row][column] == 3 - self.turn and not connected:
                isolated_checker += round(1.0 / (space + 1.0), 2)
            if space == 0 and (row - 1 < 0 or self.board[row - 1][column] != 0):
                droppable = True
            return check_count_in_a_row_next_position(row + direction[0], column + direction[1], connection, space + 1, isolated_checker, droppable, False, direction)
        
        directions = [[0,1], [1,0], [1,1], [1,-1]]
        all_connection = 1
        
        for direction in directions:
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] + direction[0], self.last_checker[1] + direction[1], 0, 0, 0, False, True, direction)
            forward_connection = count_connection_and_space['connection']
            forward_space = count_connection_and_space['space']
            isolated_checker = count_connection_and_space['isolated_checker']
            foward_droppable = count_connection_and_space['droppable']
            
            count_connection_and_space = check_count_in_a_row_next_position(self.last_checker[0] - direction[0], self.last_checker[1] - direction[1], 0, 0, 0, False, True, [-direction[0], -direction[1]])
            backward_connection = count_connection_and_space['connection']
            backward_space = count_connection_and_space['space']
            isolated_checker += count_connection_and_space['isolated_checker']
            backward_droppable = count_connection_and_space['droppable']
            
            connection = 0.0 + forward_connection + backward_connection + isolated_checker
            space = forward_space + backward_space
            
            if connection >= self.inarow - 1:
                all_connection = float('inf')
                return all_connection
            if connection >= self.inarow - 2 and foward_droppable and backward_droppable:
                all_connection = float('inf')
                return all_connection
                        
            if space + connection < self.inarow - 1:
                connection = 0
            
            all_connection += connection
        return all_connection  
    def beta(self):
        monte_carlo_random_385 = self.monte_carlo_random(iteration = 385)
        
        beta = monte_carlo_random_385 / 100.0    
        return beta
    
    def monte_carlo_random(self, iteration = 385):
        agent_pool = ['RANDOM']
        total_games = iteration
        score = 0
        
        current_instance = ConnectX(inarow = self.inarow, board = self.board, turn = self.turn, last_checker= self.last_checker)
        
        agent_pool = ['RANDOM']
        total_games = iteration
        
        if current_instance.last_checker_longest_connection() >= current_instance.inarow:
            score = 100.0
            return score
        next_states = current_instance.next_states()
        for next_state in next_states:
            if next_state.state_instance.last_checker_longest_connection() >= current_instance.inarow:
                score = -100.0
                return score
        
        agent_1 = Agent(random.choice(agent_pool)) # type: ignore
        agent_2 = Agent(random.choice(agent_pool)) # type: ignore
        match_result = current_instance.multiple_games(agent_1, agent_2, total_games = total_games, new_game = False, verbose = False)
        score += match_result[3 - self.turn] / total_games * 100.0 * 1.0
        score -= match_result[self.turn] / total_games * 100.0 * 1.0
                
        return score         
    # { GAME } -----------------------------------------------------------------------------------
    def game(self, agent_1, agent_2, echo_options = 'EACH_MOVE | X', new_game = True): # [ECHO_OPTIONS: 'EACH_MOVE', 'EACH_EPISODE', 'NONE'] 
        def count_checkers():
            checkers = 0
            for r in range(self.rows):
                for c in range(self.columns):
                    if self.board[r][c] != 0:
                        checkers += 1
            return checkers
        human_mode = False
        if agent_1.type == 'HUMAN' or agent_2.type == 'HUMAN':
            human_mode = True

        if new_game:
            self.clear()
            checker_dropped = 0
        else:
            checker_dropped = count_checkers()
        configuration = SimpleNamespace(
            columns = self.columns,
            rows = self.rows,
            inarow = self.inarow
        )
        while True:
            if checker_dropped >= self.rows * self.columns:
                if echo_options == 'EACH_EPISODE':
                    self.print(human_mode = human_mode)
                if echo_options == 'EACH_EPISODE' or echo_options == 'EACH_MOVE':
                    print("[DRAW THE GAME]")
                if echo_options == 'NONE':
                    return 0
                break
            observation = SimpleNamespace(
                board = self.board,
                mark = self.turn
            )
            if self.turn == self.PLAYER1:
                drop_column = agent_1.move(observation, configuration)
            else:
                drop_column = agent_2.move(observation, configuration)
            if drop_column == -1:
                if echo_options == 'EACH_EPISODE':
                    self.print(human_mode = human_mode)
                if echo_options == 'EACH_EPISODE' or echo_options == 'EACH_MOVE':
                    print("[PLAYER", self.turn, "LOST THE GAME]")
                if echo_options == 'NONE':
                    return self.turn
                break   
            drop_row = self.drop_checker(drop_column, self.turn)
            if 'EACH_MOVE' in echo_options:
                if len(echo_options.split(' | ')) > 1:
                    self.print(sleep = int(echo_options.split(' | ')[1]), human_mode = human_mode)
                else:
                    self.print(human_mode = human_mode)
            if drop_row == -1:
                if echo_options == 'EACH_EPISODE':
                    self.print(human_mode = human_mode)
                if echo_options == 'EACH_EPISODE' or echo_options == 'EACH_MOVE':
                    print("[PLAYER", self.turn, "LOST THE GAME]")
                if echo_options == 'NONE':
                    return self.turn
                break
            checker_dropped += 1
            if self.last_checker_longest_connection() >= self.inarow:
                if echo_options == 'EACH_EPISODE':
                    self.print(human_mode = human_mode)
                if echo_options == 'EACH_EPISODE' or echo_options == 'EACH_MOVE':
                    print("[PLAYER", 3 - self.turn, "WON THE GAME]")
                if echo_options == 'NONE':
                    return 3 - self.turn
                break
    def multiple_games(self, agent_1, agent_2, total_games = 100, new_game = True, verbose = True):
        results = {1: 0, 2: 0, 0: 0}
        for i in range(total_games):
            clone_instance = ConnectX(self.inarow, board = self.board, turn = self.turn)
            result = clone_instance.game(agent_1, agent_2, echo_options='NONE', new_game = new_game)
            results[result] += 1
        if verbose:
            print("[ PLAYER 1 WINS:", round(results[1]/total_games, 2) * 100, "% PLAYER 2 WINS:", round(results[2]/total_games, 2) * 100, "% DRAWS:", round(results[0]/total_games, 2) * 100, '% ]')
            time.sleep(1)
        return results

In [5]:
class Agent:
    """Move-selection strategy for a ConnectX game.

    The strategy is chosen by the `type` string given to the constructor.
    Options are appended to the strategy name separated by ' | ':

        'HUMAN'                          ask for a column on standard input
        'RANDOM'                         random valid column
        'GREEDY'                         best score one move ahead
        'GREEDY | <score type>'
        'MINMAX'                         depth-limited min-max search
        'MINMAX | <depth>'
        'MINMAX | <score type>'
        'MINMAX | <depth> | <score type>'   (either order)
        'MONTE_CARLO'                    random play-outs per candidate move
        'MONTE_CARLO | <iterations>'
        'MINMAX_MONTE_CARLO'             min-max search using the 'beta' score

    Any other string behaves like 'RANDOM'.

    Every move method receives:
        observation: object with `.board` (list of rows; the last row is the
            top of the grid) and `.mark` (the player to move).
        configuration: object with `.columns`, `.rows` and `.inarow`.
    and returns the chosen column index, or -1 when no column is free.
    """
    def __init__(self, type): # [TYPE: see the class docstring for the accepted strings]
        self.type = type
    def move(self, observation, configuration): 
        """Dispatch to the strategy named by `self.type` and return its column."""
        if self.type == 'HUMAN':
            return self.move_human(observation, configuration)
        elif self.type == 'RANDOM':
            return self.move_random(observation, configuration)
        elif 'GREEDY' in self.type:
            greedy_type = self.type.split(' | ')
            if len(greedy_type) == 1:
                return self.move_greedy(observation, configuration)
            else:
                return self.move_greedy(observation, configuration, score_calculation_type = greedy_type[1])
        # 'MINMAX_MONTE_CARLO' contains both 'MINMAX' and 'MONTE_CARLO', so it
        # has to be tested before either of them.
        elif 'MINMAX_MONTE_CARLO' in self.type:
            return self.move_minmax_monte_carlo(observation, configuration)
        elif 'MINMAX' in self.type:
            minmax_type = self.type.split(' | ')
            if len(minmax_type) == 1:
                return self.move_minmax(observation, configuration)
            elif len(minmax_type) == 2:
                # A single option is a depth when numeric, else a score type.
                if minmax_type[1].isnumeric():
                    return self.move_minmax(observation, configuration, depth = int(minmax_type[1]))
                else:
                    return self.move_minmax(observation, configuration, score_calculation_type = minmax_type[1])
            elif len(minmax_type) == 3:
                # Two options: depth and score type, accepted in either order.
                if minmax_type[1].isnumeric():
                    return self.move_minmax(observation, configuration, depth = int(minmax_type[1]), score_calculation_type = minmax_type[2])
                else:
                    return self.move_minmax(observation, configuration, depth = int(minmax_type[2]), score_calculation_type = minmax_type[1])
            else:
                return self.move_minmax(observation, configuration)
        elif 'MONTE_CARLO' in self.type:
            monte_carlo_type = self.type.split(' | ')
            if len(monte_carlo_type) == 1:
                return self.move_monte_carlo(observation, configuration)
            elif len(monte_carlo_type) == 2:
                return self.move_monte_carlo(observation, configuration, iteration = int(monte_carlo_type[1]))
            else:
                return self.move_monte_carlo(observation, configuration)
        else:
            return self.move_random(observation, configuration)
    # { AGENT MOVEMENTs } ------------------------------------------------------------------------
    # { HUMAN }
    def move_human(self, observation, configuration):
        """Ask the user for a column until a valid one is entered.

        The prompt lists every column index as '[0][1]...'. Input that is not
        an integer, or that names a column whose top cell is occupied, is
        rejected and the prompt is shown again.

        Returns:
            The chosen column, or -1 when no column is free (same convention
            as `move_random`).
        """
        columns = configuration.columns
        board = observation.board

        valid_moves = [col for col in range(columns) if board[len(board)-1][col] == 0]
        if len(valid_moves) == 0:
            # Nothing can be played; asking again would never terminate.
            return -1

        # One label per COLUMN: the user picks a column, not a row.
        input_label = ''.join('[' + str(column) + ']' for column in range(columns))

        while True:
            user_input = input(input_label)
            try:
                chosen_column = int(user_input)
            except ValueError:
                # Not a number: ask again instead of crashing the game.
                continue
            if chosen_column in valid_moves:
                return chosen_column
    # { RANDOM AGENT }
    def move_random(self, observation, configuration):
        """Return a random column whose top cell is empty, or -1 if none is."""
        columns = configuration.columns
        board = observation.board
        
        valid_moves = [col for col in range(columns) if board[len(board)-1][col] == 0]
        if len(valid_moves) == 0:
            return -1
        return random.choice(valid_moves)
    # { GREEDY AGENT }
    def move_greedy(self, observation, configuration, score_calculation_type = 'alpha'):
        """Return the move whose resulting position has the highest score.

        Each successor from `ConnectX.next_states` is rated with
        `score(score_calculation_type)`; the first strictly best one wins.
        When no successor beats -inf a random valid column is returned.
        """
        inarow = configuration.inarow

        board = observation.board
        mark = observation.mark
        
        current_state = ConnectX(inarow = inarow, board = board, turn = mark)
        next_states = current_state.next_states(score_calculation_type = 'NONE')
        
        max_score = float('-inf')
        max_score_column = self.move_random(observation, configuration)
        
        for next_state in next_states:
            score = next_state.state_instance.score(score_calculation_type)
            if score > max_score:
                max_score = score
                # last_checker[1] is used as the column of the move that led
                # to this successor.
                max_score_column = next_state.state_instance.last_checker[1]
                
        return max_score_column
    # { MINMAX AGENT }
    def move_minmax(self, observation, configuration, depth = 3, score_calculation_type = 'alpha'):
        """Return the column chosen by a depth-limited min-max search.

        PLAYER1 maximises and the other player minimises. At every level the
        score of a successor (divided by the remaining depth) is added for
        the maximiser / subtracted for the minimiser on top of the value
        returned by the deeper search; an infinite successor score is
        propagated as +inf / -inf directly.

        Parameters:
            depth: number of plies to search. Values below 1 cannot produce
                a column from the search, so a random valid column is
                returned instead.
            score_calculation_type: forwarded to `ConnectX.score`.
        """
        def minmax_search(current_state, depth, inarow, score_calculation_type):
            if depth == 0:
                # Leaf: only the score is meaningful, the column is a placeholder.
                return { 'column': float('inf'), 'score': current_state.score(score_calculation_type)}
            
            next_player = current_state.turn
            next_states = current_state.next_states(score_calculation_type = 'NONE')
                        
            if next_player == current_state.PLAYER1:
                max_score = float('-inf')
                # Fallback when no successor improves on -inf.
                max_score_column = self.move_random(observation, configuration)
                for next_state in next_states:
                    score = minmax_search(next_state.state_instance, depth - 1, inarow, score_calculation_type)['score'] 
                    current_state_score = next_state.state_instance.score(score_calculation_type) / depth
                    if current_state_score == float('inf'):
                        score = float('inf')
                    else:
                        score += current_state_score
                    if score > max_score:
                        max_score = score
                        max_score_column = next_state.state_instance.last_checker[1]
                return { 'column': max_score_column, 'score': max_score }
            else:
                min_score = float('inf')
                # Fallback when no successor improves on +inf.
                min_score_column = self.move_random(observation, configuration)
                for next_state in next_states:
                    score = minmax_search(next_state.state_instance, depth - 1, inarow, score_calculation_type)['score'] 
                    current_state_score = next_state.state_instance.score(score_calculation_type) / depth
                    if current_state_score == float('inf'):
                        score = float('-inf')
                    else:
                        score -= current_state_score
                    if score < min_score:
                        min_score = score
                        min_score_column = next_state.state_instance.last_checker[1]
                return { 'column': min_score_column, 'score': min_score }
        
        if depth < 1:
            # The leaf placeholder (inf) must never be returned as a column.
            return self.move_random(observation, configuration)

        inarow = configuration.inarow

        board = observation.board
        mark = observation.mark
        
        current_state = ConnectX(inarow = inarow, board = board, turn = mark)
        return minmax_search(current_state, depth, inarow, score_calculation_type)['column']
    # { MONTE CARLO AGENT }
    def move_monte_carlo(self, observation, configuration, iteration = 385, single_thread = False):
        """Return the move whose successor has the best `monte_carlo_random` score.

        Parameters:
            iteration: value passed to `monte_carlo_random` for every
                successor position.
            single_thread: evaluate the successors one after another instead
                of submitting them to a thread pool.

        In the threaded mode a successor whose evaluation raises is reported
        on stdout and keeps the score 0. When no successor beats -inf a
        random valid column is returned.
        """
        def monte_carlo_thread_process(state, iterations):
            return state.state_instance.monte_carlo_random(iterations)

        inarow = configuration.inarow

        board = observation.board
        mark = observation.mark
        
        current_state = ConnectX(inarow = inarow, board = board, turn = mark)
        next_states = current_state.next_states(score_calculation_type = 'NONE')
        
        next_state_score_board = [0] * len(next_states)
        if single_thread:
            for index in range(len(next_states)):
                next_state_score_board[index] = next_states[index].state_instance.monte_carlo_random(iteration)
        else:
            with concurrent.futures.ThreadPoolExecutor() as executor:
                # Start the evaluations and remember which successor each future belongs to
                future_to_index = {executor.submit(monte_carlo_thread_process, state, iteration): index for index, state in enumerate(next_states)}
                
                for future in concurrent.futures.as_completed(future_to_index):
                    index = future_to_index[future]
                    try:
                        next_state_score_board[index] = future.result()
                    except Exception as exc:
                        print(f"State {index} generated an exception: {exc}")  

        # Both modes pick the first successor with the strictly highest score.
        max_score = float('-inf')
        max_score_column = self.move_random(observation, configuration)
        for index, score in enumerate(next_state_score_board):
            if score > max_score:
                max_score = score
                max_score_column = next_states[index].state_instance.last_checker[1]
        return max_score_column     
    # { MINMAX MONTE CARLO AGENT }
    def move_minmax_monte_carlo(self, observation, configuration, depth = 3, score_calculation_type = 'beta'):
        """Return the column chosen by a min-max search over the 'beta' score.

        Same search shape as `move_minmax`, but the running best values start
        at -1 / 1 and a successor whose depth-scaled score equals 1 ends the
        scan of that level immediately.

        Parameters:
            depth: number of plies to search. Values below 1 cannot produce
                a column from the search, so a random valid column is
                returned instead.
            score_calculation_type: forwarded to `ConnectX.score`.
        """
        def minmax_search(current_state, depth, inarow, score_calculation_type):
            if depth == 0:
                # Leaf: only the score is meaningful, the column is a placeholder.
                return { 'column': float('-inf'), 'score': current_state.score(score_calculation_type)}
            
            next_player = current_state.turn
            next_states = current_state.next_states(score_calculation_type = 'NONE')
                        
            if next_player == current_state.PLAYER1:
                max_score = -1
                max_score_column = self.move_random(observation, configuration)
                for next_state in next_states:
                    score = minmax_search(next_state.state_instance, depth - 1, inarow, score_calculation_type)['score'] 
                    current_state_score = next_state.state_instance.score(score_calculation_type) / depth
                    # NOTE(review): the score is divided by `depth` before the
                    # comparison with 1, so this shortcut only fires when the
                    # raw score equals `depth` - confirm that is intended.
                    if current_state_score == 1:
                        max_score = 1
                        max_score_column = next_state.state_instance.last_checker[1]
                        break
                    else:
                        score += current_state_score
                    if score > max_score:
                        max_score = score
                        max_score_column = next_state.state_instance.last_checker[1]
                return { 'column': max_score_column, 'score': max_score }
            else:
                min_score = 1
                min_score_column = self.move_random(observation, configuration)
                for next_state in next_states:
                    score = minmax_search(next_state.state_instance, depth - 1, inarow, score_calculation_type)['score'] 
                    current_state_score = next_state.state_instance.score(score_calculation_type) / depth
                    if current_state_score == 1:
                        min_score = -1
                        min_score_column = next_state.state_instance.last_checker[1]
                        break
                    else:
                        score -= current_state_score
                    if score < min_score:
                        min_score = score
                        min_score_column = next_state.state_instance.last_checker[1]
                return { 'column': min_score_column, 'score': min_score }
        
        if depth < 1:
            # The leaf placeholder (-inf) must never be returned as a column.
            return self.move_random(observation, configuration)

        inarow = configuration.inarow

        board = observation.board
        mark = observation.mark
        
        current_state = ConnectX(inarow = inarow, board = board, turn = mark)
        return minmax_search(current_state, depth, inarow, score_calculation_type)['column']

### Mock Competitions

In [6]:
# Mock competition: the 'MINMAX_MONTE_CARLO' agent moves as player 1 (A1)
# against the 'MONTE_CARLO' agent as player 2 (A2) on a 7x6, 4-in-a-row board.
C = ConnectX(inarow = 4, columns = 7, rows = 6)

A2 = Agent('MONTE_CARLO')
A1 = Agent('MINMAX_MONTE_CARLO')
# A2 = Agent('MONTE_CARLO')

# Single game with the board echoed after every move (disabled):
# C.game(A1, A2, echo_options = 'EACH_MOVE')
# Play 10 games and print the win/draw percentages.
C.multiple_games(A1, A2, total_games = 10, verbose = True)

In [1]:
# Mock competition with the seats swapped: 'MONTE_CARLO' moves as player 1
# (A1) against 'MINMAX_MONTE_CARLO' as player 2 (A2).
# NOTE: this cell needs the ConnectX and Agent definition cells to have been
# run first in the current kernel.
C = ConnectX(inarow = 4, columns = 7, rows = 6)

A1 = Agent('MONTE_CARLO')
A2 = Agent('MINMAX_MONTE_CARLO')
# A2 = Agent('MONTE_CARLO')

# Single game with the board echoed after every move (disabled):
# C.game(A1, A2, echo_options = 'EACH_MOVE')
# Play 12 games and print the win/draw percentages.
C.multiple_games(A1, A2, total_games = 12, verbose = True)

NameError: name 'ConnectX' is not defined

#### Analysis of Why the Last Move Was Made

In [18]:
# Take the state returned by C.pervious_state() (presumably the position
# before the last move - TODO confirm) and print every candidate successor
# together with all of its score variants.
pervious_state = C.pervious_state()
pervious_state.next_states(score_calculation_type = 'ALL', verbose = True)

 O   X       O   O       O 
 X   X       X   X       O 
 O   O  [O]  O   X       O 
 X   X   X   O   O       X 
 O   X   X   X   O       X 
 X   X   O   X   O       O 
LAST CHECKER LONGEST CONNECTION: 4
LAST CHECKER LONGEST CONNECTION REMOVE LIMITED: 4
LAST CHECKER LONGEST CONNECTION ZERO TO INFINITY: inf
LAST CHECKER ALL CONNECTIONS: 6
LAST CHECKER ALL CONNECTIONS REMOVE LIMITED: 6
LAST CHECKER ALL CONNECTIONS ZERO TO INFINITY: inf
ALPHA: inf
BETA: inf
MONTE CARLO RANDOM: 100.0
 O   X       O   O       O 
 X   X       X   X       O 
 O   O       O   X       O 
 X   X   X   O   O       X 
 O   X   X   X   O       X 
 X   X   O   X   O  [O]  O 
LAST CHECKER LONGEST CONNECTION: 3
LAST CHECKER LONGEST CONNECTION REMOVE LIMITED: 3
LAST CHECKER LONGEST CONNECTION ZERO TO INFINITY: 3
LAST CHECKER ALL CONNECTIONS: 5
LAST CHECKER ALL CONNECTIONS REMOVE LIMITED: 3
LAST CHECKER ALL CONNECTIONS ZERO TO INFINITY: 3
ALPHA: 3.0
BETA: 0.8363636363636363
MONTE CARLO RANDOM: 77.92207792207792


In [395]:
# Earlier experiments kept for reference (disabled):
#R = ConnectX(inarow = 4, columns = 7, rows = 6)
#R.random()
# R = ConnectX(4, board=[[1,1,1,2,0,0],[2,1,1,2,0,0],[2,2,2,0,0,0],[2,2,1,0,0,0],[1,1,0,0,0,0],[0,1,0,0,0,0]], turn = 2)
# 3-in-a-row game on a 5-column x 4-row board; the first row of `board` is the
# bottom of the grid and already holds one player-1 checker in column 1.
R = ConnectX(3, board=[[0,1,0,0,0],[0,0,0,0,0],[0,0,0,0,0],[0,0,0,0,0]])
# List every successor position and print all score variants for each.
states = R.next_states(score_calculation_type = 'ALL', verbose = True)

                   
                   
                   
[X]  X             
LAST CHECKER LONGEST CONNECTION: 2
LAST CHECKER LONGEST CONNECTION REMOVE LIMITED: 2
LAST CHECKER LONGEST CONNECTION ZERO TO INFINITY: 2
LAST CHECKER ALL CONNECTIONS: 2
LAST CHECKER ALL CONNECTIONS REMOVE LIMITED: 2
LAST CHECKER ALL CONNECTIONS ZERO TO INFINITY: 2
ALPHA: 2.0
                   
                   
    [X]            
     X             
LAST CHECKER LONGEST CONNECTION: 2
LAST CHECKER LONGEST CONNECTION REMOVE LIMITED: 2
LAST CHECKER LONGEST CONNECTION ZERO TO INFINITY: 2
LAST CHECKER ALL CONNECTIONS: 2
LAST CHECKER ALL CONNECTIONS REMOVE LIMITED: 2
LAST CHECKER ALL CONNECTIONS ZERO TO INFINITY: 2
ALPHA: 2.0
                   
                   
                   
     X  [X]        
LAST CHECKER LONGEST CONNECTION: 2
LAST CHECKER LONGEST CONNECTION REMOVE LIMITED: 2
LAST CHECKER LONGEST CONNECTION ZERO TO INFINITY: 2
LAST CHECKER ALL CONNECTIONS: 2
LAST CHECKER ALL CONNECTIONS REMOVE LIM

In [449]:
def monte_carlo_rollout(state, observation, configuration, agent_1, agent_2):
    """Play the game held in `state.state_instance` to its end, in place.

    `agent_1` moves whenever the side to move equals the mark found in the
    initial `observation`; `agent_2` moves otherwise. The board is printed
    after every move.

    Returns:
        the player to move when a drop is rejected (drop_checker gives -1),
        3 minus the player to move once the last checker forms a line of at
        least `configuration.inarow`, or 0 when the top row has no empty
        cell left.
    """
    n_columns = configuration.columns
    n_rows = configuration.rows
    target_length = configuration.inarow
    root_mark = observation.mark
    game = state.state_instance

    while True:
        # Fresh snapshot of the live game for the agent that is about to move.
        snapshot = SimpleNamespace(board = game.board, mark = game.turn)
        mover = agent_1 if snapshot.mark == root_mark else agent_2
        chosen_column = mover.move(snapshot, configuration)

        landed_row = game.drop_checker(chosen_column, game.turn)
        game.print()

        if landed_row == -1:
            return game.turn
        if game.last_checker_longest_connection() >= target_length:
            return 3 - game.turn
        # Draw once no cell of the top row is empty.
        if not any(game.board[n_rows - 1][column] == 0 for column in range(n_columns)):
            return 0

In [460]:
# Hand-built 6x6 mid-game position (first row of `board` is the bottom of the
# grid) with player 2 to move.
R = ConnectX(4, board=[[1,1,1,2,0,0],[2,1,1,2,0,0],[2,2,2,0,0,0],[2,2,1,0,0,0],[1,1,0,0,0,0],[0,1,0,0,0,0]], turn = 2)
# R.random()

# Both sides use the min-max agent with a search depth of 3.
A1 = Agent('MINMAX | 3')
A2 = Agent('MINMAX | 3')

# Game parameters in the shape the agents read (.columns / .rows / .inarow).
configuration = SimpleNamespace(
    columns = R.columns,
    rows = R.rows,
    inarow = R.inarow
)
# Initial observation; its mark decides which side agent A1 plays in the rollout.
observation = SimpleNamespace(
    board = R.board,
    mark = R.turn
)

# monte_carlo_rollout expects the game wrapped in an object with .state_instance.
state = SimpleNamespace(
    state_instance = R,
)

# Plays R to the end in place (R is modified) and returns the outcome code.
monte_carlo_rollout(state, observation, configuration, A1, A2)

 O   X   O             
 X   X   X             
 O   O   X             
 O   O   O  [O]        
 O   X   X   O         
 X   X   X   O       X 


2