In [8]:
!pip install "kaggle-environments"
from kaggle_environments import evaluate, make, utils
import random
import math
import time
import numpy as np
import torch
import torch.nn as nn
import torch



In [9]:
# Create the ConnectX environment with debug=True and print its configuration.
env = make("connectx", debug=True)
configuration = env.configuration
print(configuration)

{'episodeSteps': 1000, 'actTimeout': 2, 'runTimeout': 1200, 'columns': 7, 'rows': 6, 'inarow': 4, 'agentTimeout': 60, 'timeout': 2}


In [10]:
def MCTS_agent(observation, configuration):
    global current_state  # D'aquesta forma podem eliminar l'arbre

    init_time = time.time()
    EMPTY = 0
    T_max = configuration.timeout - 0.34  # Temps per moviment
    Cp_default = 1

    def play(board, column, mark, config):
        """
        Drop a piece of `mark` into `column`, mutating `board` in place.

        The board is a flat, row-major list. The piece is written to the empty
        cell of the column that has the largest row index. A ValueError is
        raised when the column has no empty cell.
        (Provided by the Kaggle environment.)
        """
        width = config.columns
        height = config.rows
        empty_rows = (r for r in range(height) if board[column + (r * width)] == EMPTY)
        landing_row = max(empty_rows)
        board[column + (landing_row * width)] = mark

    def is_win(board, column, mark, config):
        """
        Tell whether the piece of `mark` most recently placed in `column` wins the game.
        (Provided by the Kaggle environment.)

        The reference cell is the one with the smallest row index in `column`
        that holds `mark`. From there, neighbouring cells holding `mark` are
        counted along each line; the move wins when a line contains at least
        `config.inarow - 1` such neighbours.
        """
        n_cols = config.columns
        n_rows = config.rows
        needed = config.inarow - 1  # neighbours required besides the reference piece
        top = min(r for r in range(n_rows) if board[column + (r * n_cols)] == mark)

        def run_length(d_row, d_col):
            """Count consecutive `mark` cells from the reference cell in one direction, capped at `needed`."""
            length = 0
            r = top + d_row
            c = column + d_col
            while (
                    length < needed
                    and 0 <= r < n_rows
                    and 0 <= c < n_cols
                    and board[c + (r * n_cols)] == mark
            ):
                length += 1
                r += d_row
                c += d_col
            return length

        lines = (
            ((1, 0),),            # vertical (only towards larger row indices)
            ((0, 1), (0, -1)),    # horizontal
            ((-1, -1), (1, 1)),   # left diagonal
            ((-1, 1), (1, -1)),   # right diagonal
        )
        return any(
            sum(run_length(d_row, d_col) for d_row, d_col in line) >= needed
            for line in lines
        )

    def is_tie(board):
        """
        Return True when the board has no empty cell left.
        """
        return all(cell != EMPTY for cell in board)

    def check_finish_and_score(board, column, mark, config):
        """
        Report whether the game is over after `mark` played in `column`.

        Returns a `(finished, score)` tuple: `(True, 1)` when the move wins,
        `(True, 0.5)` when the board is full, and `(False, None)` otherwise.
        """
        if is_win(board, column, mark, config):
            outcome = (True, 1)
        elif is_tie(board):
            outcome = (True, 0.5)
        else:
            outcome = (False, None)
        return outcome

    def uct_score(node_total_score, node_total_visits, parent_total_visits, Cp=Cp_default):
        """
        UCB1 score of a node: mean score plus an exploration bonus weighted by `Cp`.

        A node that has never been visited scores infinity.
        """
        if node_total_visits != 0:
            mean_score = node_total_score / node_total_visits
            exploration = Cp * math.sqrt(
                2 * math.log(parent_total_visits) / node_total_visits)
            return mean_score + exploration
        return math.inf

    def opponent_mark(mark):
        """
        Return the mark of the other player (1 -> 2, 2 -> 1).
        """
        marks_total = 3  # the two player marks, 1 and 2, add up to 3
        return marks_total - mark

    def opponent_score(score):
        """
        Convert a score to the other player's point of view.
        Used when propagating results up the tree (backpropagation).
        """
        flipped = 1 - score
        return flipped

    def random_action(board, config):
        """Return a random legal column, i.e. one whose row-0 cell is still empty."""
        legal_columns = [col for col in range(config.columns) if board[col] == EMPTY]
        return random.choice(legal_columns)

    def default_policy_simulation(board, mark, config):
        """
        Play random moves until the game ends, starting with `mark` to move.

        The starting position is assumed not to be terminal. The given board is
        left untouched (the playout runs on a copy).
        Returns the final result from the point of view of the starting `mark`.
        """
        playout = board.copy()
        mover = mark
        while True:
            column = random_action(playout, config)
            play(playout, column, mover, config)
            finished, score = check_finish_and_score(playout, column, mover, config)
            if finished:
                break
            mover = opponent_mark(mover)
        # `score` belongs to whoever made the last move; flip it if that was the opponent.
        return score if mover == mark else opponent_score(score)
    
    def find_action_taken_by_opponent(new_board, old_board, config):
        """
        Infer the column the opponent just played by comparing two boards.
        Used to recycle the search tree between moves.

        Returns the column of the single cell that differs between `old_board`
        and `new_board`. Returns -1 when the boards do not differ by exactly one
        cell (identical boards, boards of different sizes, or an unrelated
        position such as the start of a new game), so that a caller never
        mistakes an unrelated position for the result of one move.
        """
        if len(new_board) != len(old_board):
            return -1
        changed = [i for i, (new, old) in enumerate(zip(new_board, old_board)) if new != old]
        if len(changed) != 1:
            return -1
        return changed[0] % config.columns

    class Node:
        """
        A node of the game tree.

        Stores a copy of the board, the mark of the player who moves next from
        this position, the accumulated score and visit count, and the moves
        that have not been expanded into children yet.
        """
        def __init__(self, board, mark, config, parent=None, is_terminal=False, terminal_score=None, action_taken=None):
            # Position and links in the tree
            self.board = board.copy()
            self.mark = mark
            self.config = config
            self.parent = parent
            self.children = []
            self.action_taken = action_taken  # column that led from the parent to this node
            # Terminal information
            self.is_terminal = is_terminal
            self.terminal_score = terminal_score
            # Search statistics
            self.node_total_score = 0
            self.node_total_visits = 0
            # Legal moves, and the subset that has no child node yet
            legal_columns = [col for col in range(config.columns) if self.board[col] == EMPTY]
            self.available_moves = legal_columns
            self.expandable_moves = list(legal_columns)

        def is_expandable(self):
            """
            Tell whether this node still has moves without a child node.
            """
            if self.is_terminal:
                return False
            return len(self.expandable_moves) > 0

        def expand_and_simulate_child(self):
            """
            Expand one random unexplored move and run a simulation from it.
            Covers expansion + simulation + backpropagation.
            """
            column = random.choice(self.expandable_moves)
            child_board = self.board.copy()
            play(child_board, column, self.mark, self.config)
            finished, final_score = check_finish_and_score(child_board, column, self.mark, self.config)
            child = Node(child_board, opponent_mark(self.mark), self.config,
                         parent=self,
                         is_terminal=finished,
                         terminal_score=final_score,
                         action_taken=column)
            self.children.append(child)
            child.backpropagate(child.simulate())
            self.expandable_moves.remove(column)

        def choose_strongest_child(self, Cp):
            """
            Pick the child with the highest UCB1 score (selection stage).
            Ties go to the child that was created first.
            """
            parent_visits = self.node_total_visits
            return max(
                self.children,
                key=lambda child: uct_score(child.node_total_score,
                                            child.node_total_visits,
                                            parent_visits,
                                            Cp),
            )

        def choose_play_child(self):
            """
            Pick the child with the highest total score.
            Ties go to the child that was created first.
            """
            return max(self.children, key=lambda child: child.node_total_score)

        def tree_single_run(self):
            """
            One iteration of the four MCTS stages, starting at this node.
            """
            node = self
            while True:
                if node.is_terminal:
                    node.backpropagate(node.terminal_score)
                    return
                if node.is_expandable():
                    node.expand_and_simulate_child()
                    return
                # Fully expanded: descend into the most promising child.
                node = node.choose_strongest_child(Cp_default)

        def simulate(self):
            """
            Evaluate this node.
            A terminal node returns its stored score, which belongs to the player
            who made the last move. Otherwise a random playout is run for the
            player to move and its result is flipped to the opponent's view.
            """
            if not self.is_terminal:
                playout_score = default_policy_simulation(self.board, self.mark, self.config)
                return opponent_score(playout_score)
            return self.terminal_score

        def backpropagate(self, simulation_score):
            """
            Add the score and one visit to this node and to every ancestor,
            flipping the score at each level.
            """
            node = self
            score = simulation_score
            while True:
                node.node_total_score += score
                node.node_total_visits += 1
                if node.parent is None:
                    break
                score = opponent_score(score)
                node = node.parent

        def choose_child_via_action(self, action):
            """
            Return the child reached by playing `action`, or None if there is none.
            """
            matching = (child for child in self.children if child.action_taken == action)
            return next(matching, None)

    board = observation.board
    mark = observation.mark
    
    # Si l'estat actual existeix, ho reciclam basat en les accions de l'oponent
    try:  
        current_state = current_state.choose_child_via_action(
            find_action_taken_by_opponent(board, current_state.board, configuration))
        current_state.parent = None  # Feim l'estat actual el node arrel
        
    except:  # Nova partida o qualsevol excepció
        current_state = Node(board, mark,  # Aquest estat és considerat després del darrer moviment de l'oponent
                              configuration, parent=None, is_terminal=False, terminal_score=None, action_taken=None)
   
    # Feim iteracions fins que arribam al temps límit
    while time.time() - init_time <= T_max:
        current_state.tree_single_run()
        
    current_state = current_state.choose_play_child()
    return current_state.action_taken

In [11]:
env.reset()
# Drop any search tree left over from a previous game so the agents start from scratch.
try:
    del current_state
except NameError:
    # No tree has been created yet; nothing to clean up.
    pass

env.run([MCTS_agent, MCTS_agent])
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

Success!


In [12]:
# Each side moves first in (roughly) half of the games.
def evaluate_agents(agent, config=None, rounds=20):
    """
    Play `rounds` games of MCTS_agent against `agent` and print the MCTS win percentage.

    Args:
        agent: the opponent, passed straight to `evaluate` (e.g. an agent name or a callable).
        config: environment configuration; defaults to a 6x7 board with 4 in a row.
        rounds: total number of games. MCTS_agent moves first in `rounds // 2`
            of them and second in the rest.

    Raises:
        ValueError: if `rounds` is smaller than 1.
    """
    if config is None:
        # Built per call so the default dictionary is never shared between calls.
        config = {'rows': 6, 'columns': 7, 'inarow': 4}
    if rounds < 1:
        raise ValueError("rounds must be at least 1")

    games_mcts_first = rounds // 2
    outcomes = evaluate(
        environment="connectx",
        agents=[MCTS_agent, agent],
        configuration=config,
        num_episodes=games_mcts_first
    )

    # Swap each reward pair so that the MCTS reward always comes first.
    outcomes += [
        [b, a] for [a, b] in evaluate(
            environment="connectx",
            agents=[agent, MCTS_agent],
            configuration=config,
            num_episodes=rounds - games_mcts_first
        )
    ]

    wins = outcomes.count([1, -1])
    # Scale to a percentage before rounding; rounding first and multiplying
    # afterwards can print artefacts such as 56.99999999999999.
    win_percentage = np.round(100 * wins / len(outcomes))
    print("Victories agent MCTS vs agent {}: {}%".format(agent, win_percentage))

In [7]:
# Benchmark the MCTS agent against the agents named "random" and "negamax".
evaluate_agents("random", rounds=10)
evaluate_agents("negamax", rounds=6)

Victories agent MCTS vs agent random: 100.0%
Victories agent MCTS vs agent negamax: 100.0%
