<a href="https://colab.research.google.com/github/LittleH0rst/Notebooks/blob/master/running_AlphaZero_Ninemensmorris.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount the user's Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# util

In [None]:
"""
Author: Surag or other contributor to the repo
https://github.com/suragnair/alpha-zero-general
"""
class AverageMeter(object):
    """Running (weighted) average of a stream of values.

    Adapted from https://github.com/pytorch/examples/blob/master/imagenet/main.py

    Attributes:
        val:   the value passed to the most recent ``update`` call
        sum:   weighted sum of all values seen so far
        count: total weight seen so far
        avg:   ``sum / count``
    """

    def __init__(self):
        # All statistics start at zero.
        self.val = self.avg = self.sum = self.count = 0

    def __repr__(self):
        # Scientific notation with two decimals, e.g. '3.50e+00'.
        return '{:.2e}'.format(self.avg)

    def update(self, val, n=1):
        """Record ``val`` with weight ``n`` and refresh the running average."""
        total = self.sum + val * n
        seen = self.count + n
        self.val, self.sum, self.count = val, total, seen
        self.avg = total / seen


class dotdict(dict):
    """Dictionary whose keys can also be read as attributes (``d.key``).

    ``d.name`` returns ``d['name']``. A name that is not a key raises
    ``AttributeError`` (not ``KeyError``), which is what ``hasattr``,
    ``getattr(obj, name, default)`` and the ``copy`` module rely on when they
    probe an object for optional attributes.
    """

    def __getattr__(self, name):
        # __getattr__ is only consulted after normal attribute lookup failed,
        # so real dict methods are never shadowed by this.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None

# mcts

In [None]:
"""
Author: Surag or other contributor to the repo
https://github.com/suragnair/alpha-zero-general
"""
import logging
import math

import numpy as np

# Tiny constant added under the square root in MCTS.search for edges that have
# no Q-value yet, so their exploration term is non-zero while Ns[s] is still 0.
EPS = 1e-8

# Module-level logger; MCTS.search uses it to report fully masked policies.
log = logging.getLogger(__name__)


class MCTS():
    """
    This class handles the MCTS tree.
    """

    def __init__(self, game, nnet, args):
        self.game = game
        self.nnet = nnet
        self.args = args
        self.Qsa = {}  # stores Q values for s,a (as defined in the paper)
        self.Nsa = {}  # stores #times edge s,a was visited
        self.Ns = {}  # stores #times board s was visited
        self.Ps = {}  # stores initial policy (returned by neural net)

        self.Es = {}  # stores game.getGameEnded ended for board s
        self.Vs = {}  # stores game.getValidMoves for board s

    def getActionProb(self, canonicalBoard, temp=1):
        """
        This function performs numMCTSSims simulations of MCTS starting from
        canonicalBoard.

        Returns:
            probs: a policy vector where the probability of the ith action is
                   proportional to Nsa[(s,a)]**(1./temp)
        """
        for i in range(self.args.numMCTSSims):
            self.search(canonicalBoard)

        s = self.game.stringRepresentation(canonicalBoard)
        counts = [self.Nsa[(s, a)] if (s, a) in self.Nsa else 0 for a in range(self.game.getActionSize())]

        if temp == 0:
            bestAs = np.array(np.argwhere(counts == np.max(counts))).flatten()
            bestA = np.random.choice(bestAs)
            probs = [0] * len(counts)
            probs[bestA] = 1
            return probs

        counts = [x ** (1. / temp) for x in counts]
        counts_sum = float(sum(counts))
        probs = [x / counts_sum for x in counts]
        return probs

    def search(self, canonicalBoard):
        """
        This function performs one iteration of MCTS. It is recursively called
        till a leaf node is found. The action chosen at each node is one that
        has the maximum upper confidence bound as in the paper.

        Once a leaf node is found, the neural network is called to return an
        initial policy P and a value v for the state. This value is propagated
        up the search path. In case the leaf node is a terminal state, the
        outcome is propagated up the search path. The values of Ns, Nsa, Qsa are
        updated.

        NOTE: the return values are the negative of the value of the current
        state. This is done since v is in [-1,1] and if v is the value of a
        state for the current player, then its value is -v for the other player.

        Input:
            canonicalBoard: board in canonical form; every game call below is
                            made with player 1

        Returns:
            v: the negative of the value of the current canonicalBoard
        """

        # All caches are keyed by the string form of the board.
        s = self.game.stringRepresentation(canonicalBoard)

        # The game-ended result is computed once per state and then reused.
        if s not in self.Es:
            self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
        if self.Es[s] != 0:
            # terminal node
            return -self.Es[s]

        if s not in self.Ps:
            # leaf node: expand it with the network's policy and value
            self.Ps[s], v = self.nnet.predict(canonicalBoard)
            valids = self.game.getValidMoves(canonicalBoard, 1)
            self.Ps[s] = self.Ps[s] * valids  # masking invalid moves
            sum_Ps_s = np.sum(self.Ps[s])
            if sum_Ps_s > 0:
                self.Ps[s] /= sum_Ps_s  # renormalize
            else:
                # if all valid moves were masked make all valid moves equally probable

                # NB! All valid moves may be masked if either your NNet architecture is insufficient or you have overfitting or something else.
                # If you get dozens or hundreds of these messages you should pay attention to your NNet and/or training process.
                log.error("All valid moves were masked, doing a workaround.")
                self.Ps[s] = self.Ps[s] + valids
                self.Ps[s] /= np.sum(self.Ps[s])

            # Cache the valid-move mask and start the visit counter for s.
            self.Vs[s] = valids
            self.Ns[s] = 0
            return -v

        valids = self.Vs[s]
        cur_best = -float('inf')
        # Stays -1 if no action is valid; that case is not handled separately.
        best_act = -1

        # pick the action with the highest upper confidence bound
        for a in range(self.game.getActionSize()):
            if valids[a]:
                if (s, a) in self.Qsa:
                    # visited edge: Q plus exploration bonus scaled by the prior
                    u = self.Qsa[(s, a)] + self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s]) / (
                            1 + self.Nsa[(s, a)])
                else:
                    # unvisited edge: no Q term; EPS keeps the bonus non-zero when Ns[s] == 0
                    u = self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s] + EPS)  # Q = 0 ?

                # strict '>' keeps the lowest-indexed action on ties
                if u > cur_best:
                    cur_best = u
                    best_act = a

        a = best_act
        # Apply the action, then re-express the result from the next player's view.
        next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
        next_s = self.game.getCanonicalForm(next_s, next_player)

        # The recursive call returns the negated value of next_s, as described
        # in the docstring above.
        v = self.search(next_s)

        if (s, a) in self.Qsa:
            # incremental mean over all visits of this edge
            self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[(s, a)] + v) / (self.Nsa[(s, a)] + 1)
            self.Nsa[(s, a)] += 1

        else:
            # first visit of this edge
            self.Qsa[(s, a)] = v
            self.Nsa[(s, a)] = 1

        self.Ns[s] += 1
        return -v

# ninemensmorris logic

In [None]:
'''
Author: Jonas Jakob
Created: May 31, 2023

Implementation of the NineMensMorris Game Logic
'''

class Board():

    """
    State of a Nine Men's Morris game, held in ``self.pieces`` as a 6x6
    integer array.

    Layout of ``pieces``:

    - Rows 0-3, read row by row, are the 24 board positions (0..23).
    - ``pieces[4][0]`` counts the pieces placed so far (0..18).
    - ``pieces[4][1]`` counts the moves made since a piece was last removed.
    - All other cells stay 0.

    Board logic:

    A position holds 1 for player one, -1 for player two and 0 if it is
    empty (for the canonical board the current player's pieces are always
    shown as 1 and the opponent's as -1). The 24 positions form three rings:

        [0,0,0,0,0,0,0,0,    -> outer ring  (positions 0..7)
        0,0,0,0,0,0,0,0,     -> middle ring (positions 8..15)
        0,0,0,0,0,0,0,0]     -> inner ring  (positions 16..23)

    Inside a ring, even positions are corners and odd positions are the
    midpoints of the sides; midpoints i, i+8 and i+16 are connected across
    the rings.

    Locations:

    Locations are given as the position index 0..23.

    Actions:

    An action is a tuple of the form
        (piece_location, move_location, remove_piece)
    where piece_location and remove_piece are the string 'none' when no
    piece is moved from a position or no piece is removed.
    """

    """
    6x6 configuration
    24 spots for pieces
    1 spot to count the placed pieces
    1 spot to count the current moves without mills

    -> need to be in the board itself, since only the board is
    """
    def __init__(self):
        "Set up initial board configuration."
        self.n = 6
        self.pieces = np.zeros((6,6), dtype=int)

    """
    currently not used
    """
    def __getitem__(self, index):
      return self.pieces[index]


    """
    returns a vector of ones and zeros, marking all the legal moves for the
    current board state
    """
    def get_legal_move_vector(self, player, all_moves):
        """
        Input:
            player: current player (1 or -1)
            all_moves: list with all possible moves

        Returns:
            legal_move_vector: vector of length = all_moves with ones and zeros
        """
        legal_moves = self.get_legal_moves(player)
        legal_move_vector = [0] * len(all_moves)

        for move in legal_moves:
          index = all_moves.index(move)
          legal_move_vector[index] = 1
        return legal_move_vector

    """
    Transforms the array form of the NineMensMorris board into a Image, that
    can be used as Input for the Neural Network
    """
    def arrayToImage(self, array, placements_and_moves):
        """
        Input:
            array: list with all 24 board positions
            placements_and_moves: Tuple containing the placed pieces in phase
            zero and the current number of moves without a mill

        Returns:
            legal_move_vector: vector of length = all_moves with ones and zeros
        """
        board_image = np.zeros((6,6), dtype=int)
        boardx = 0
        boardy = 0
        count_placements, current_moves = placements_and_moves
        assert(len(array) == 24)
        assert(0 <= count_placements <= 18)
        index = 0
        while index < 24:

          board_image[boardx][boardy] = np.copy(array[index])
          if boardy == 5:
            boardx += 1
            boardy = 0
          else:
            boardy += 1
          index += 1


        board_image[4][0] = count_placements
        board_image[4][1] = current_moves
        assert(0 <= board_image[4][0] <= 18)

        return board_image

    """
    Transforms the Image form used in the training of the Neural Network into an
    Array of the board and a Tuple containing the placed pieces in phase zero
    and the current number of moves without a mill.
    """
    def piecesToArray(self):
        """
        Returns:
            re_board: list with all 24 board positions
            placements_and_moves: Tuple containing the placed pieces in phase
            zero and the current number of moves without a mill
        """
        re_board = []
        re_board.extend(self.pieces[0])
        re_board.extend(self.pieces[1])
        re_board.extend(self.pieces[2])
        re_board.extend(self.pieces[3])


        assert(0 <= self.pieces[4][0] <= 18)
        assert(len(re_board) == 24)
        placements_and_moves = (self.pieces[4][0], self.pieces[4][1])

        return (re_board, placements_and_moves)

    """
    Gets the current game phase for the current player, then calls the
    right method to retrieve the legal moves for the specific game phase, board
    and player. Returns a list
    """
    def get_legal_moves(self, player):
        """
        Input:
            player: current player (1 or -1)

        Returns:
            legal_move_vector: list with all the move Tuples that are legal for
            the current board state
        """
        game_phase = self.get_game_phase(player)
        assert(0 <= game_phase <= 2)
        if game_phase == 0:
            return list(self.get_legal_moves_0(player))

        elif game_phase == 1:
            return list(self.get_legal_moves_1(player))
        elif game_phase == 2:
            return list(self.get_legal_moves_2(player))

    """
    Looks at the board, given the current player and identifies the
    phase of the game for the player.
    """
    def get_game_phase(self, player):

        array, placements_and_moves = self.piecesToArray()
        assert(0 <= placements_and_moves[0] <= 18)

        if placements_and_moves[0] < 18:
            return 0
        elif len(self.get_player_pieces(player)) <= 3:
            return 2
        else:
            return 1

    """
    looks at the board, given the current player and returns the
    locations of the players pieces in a list.
    """
    def get_player_pieces(self, player):

        board, placements = self.piecesToArray()
        locations = []

        index = 0
        while index < len(board):
            if board[index] == player:
                locations.append(index)
            index += 1
        if locations == []:
          return []
        else:
          return list(locations)

    """
    looks at the board and returns the indices for all empty
    positions in a list.
    """
    def get_empty_positions(self):

        board, placements = self.piecesToArray()
        assert(0 <= placements[0] <= 18)
        assert(len(board) == 24)

        locations = []

        index = 0
        while index < len(board):
            if board[index] == 0:
                locations.append(index)
            index += 1

        return list(locations)

    """
    identifies possible mills, checking if any of the moves on the current board would
    form a mill (results in a different marking for the list of all moves)
    move_locations => Array of Tuples (origin, move)
    Each check makes sure, that the origin of the move, isnt one of the pieces in the
    potentially new mill
    """
    def get_possible_mills(self, move_locations, player):

        board, placements = self.piecesToArray()
        assert(0 <= placements[0] <= 18)
        assert(len(board) == 24)
        move_forms_mill = []

        for move in move_locations:
            if (move != None) and (move[1] < 24) and (move[1] >= 0) :
                if (move[1] % 2) == 0: #move is in a corner
                    if (move[1] % 8) == 0: # move is in the top left corner of a ring
                        if (([move[1] + 7] == player) and (board[move[1] + 6] == player) and
                           (move[1] + 7 != move[0]) and (move[1] + 6 != move[0])): #check down
                            move_forms_mill.append(move)
                        if ((board[move[1] + 1] == player) and (board[move[1] + 2] == player) and
                           (move[1] + 1 != move[0]) and (move[1] + 2 != move[0])): #check right
                            move_forms_mill.append(move)
                    elif move in [6,14,22]: #move is in the bottom left corner of a ring
                        if ((board[move[1] + 1] == player) and (board[move[1] - 6] == player) and
                           (move[1] + 1 != move[0])and (move[1] - 6 != move[0])): #check up
                            move_forms_mill.append(move)
                        if ((board[move[1] - 1] == player) and (board[move[1] - 2] == player) and
                           (move[1] - 1 != move[0]) and (move[1] - 2 != move[0])): #check right
                            move_forms_mill.append(move)
                    elif move in [2,10,18,4,12,20]: #move is in the bottom or top right corner of a ring
                        if ((board[move[1] + 1] == player) and (board[move[1] + 2] == player) and
                           (move[1] + 1 != move[0]) and (move[1] + 2 != move[0])): #check down/ left
                            move_forms_mill.append(move)
                        if ((board[move[1] - 1] == player) and (board[move[1] - 2] == player) and
                           (move[1] - 1 != move[0]) and (move[1] - 2 != move[0])): #check left/ up
                            move_forms_mill.append(move)

                else: #move is in the middle of a row
                    if move[1] in [1,3,5,7]: #outer ring
                        if move[1] == 7:
                            if ((board[move[1] - 7] == player) and (board[move[1] - 1] == player) and
                               (move[1] - 7 != move[0]) and (move[1] - 1 != move[0])): #check ring
                                move_forms_mill.append(move)
                        else:
                            if ((board[move[1] - 1] == player) and (board[move[1] + 1] == player) and
                               (move[1] - 1 != move[0]) and (move[1] + 1 != move[0])): #check ring
                                move_forms_mill.append(move)
                        if ((board[move[1] + 8] == player) and (board[move[1] + 16] == player) and
                           (move[1] + 8 != move[0]) and (move[1] + 16 != move[0])): #check intersections
                                move_forms_mill.append(move)

                    elif move[1] in [9,11,13,15]: #middle ring
                        if move[1] == 15:
                            if ((board[move[1] - 7] == player) and (board[move[1] - 1] == player) and
                               (move[1] - 7 != move[0]) and (move[1] - 1 != move[0])): #check ring
                                move_forms_mill.append(move)
                        else:
                            if ((board[move[1] - 1] == player) and (board[move[1] + 1] == player) and
                               (move[1] - 1 != move[0]) and (move[1] + 1 != move[0])): #check ring
                                move_forms_mill.append(move)
                        if ((board[move[1] + 8] == player) and (board[move[1] - 8] == player) and
                           (move[1] + 8 != move[0]) and (move[1] - 8 != move[0])): #check intersections
                                move_forms_mill.append(move)

                    elif move[1] in [17,19,21,23]: #inner ring
                        if move[1] == 23:
                            if ((board[move[1] - 7] == player) and (board[move[1] - 1] == player) and
                               (move[1] - 7 != move[0]) and (move[1] - 1 != move[0])): #check ring
                                move_forms_mill.append(move)
                        else:
                            if ((board[move[1] - 1] == player) and (board[move[1] + 1] == player) and
                               (move[1] - 1 != move[0]) and (move[1] + 1 != move[0])): #check ring
                                move_forms_mill.append(move)
                        if ((board[move[1] - 8] == player) and (board[move[1] - 16] == player) and
                           (move[1] - 8 != move[0]) and (move[1] - 16 != move[0])): #check intersections
                                move_forms_mill.append(move)

        return list(move_forms_mill)

    """
    Looks at the board and returns all current mills for a given player, in tuples of their coordinates
    IDEA: maybe not in tuples, but in a set of coordinates
    """
    def check_for_mills(self, player):

        current_mills = []
        board, placements = self.piecesToArray()
        assert(0 <= placements[0] <= 18)
        assert(len(board) == 24)

        index = 0

        while index < 23: #check rings
            if (index in [6,14,22]):
              if (board[index] == board[index + 1] == board[index - 6] == player):
                current_mills.append((index, index + 1, index - 6))
            elif (board[index] == board[index + 1] == board[index + 2] == player):
              current_mills.append((index, index + 1, index + 2))

            index += 2

        index = 1

        while index < 8: #check intersections
            if (board[index] == board[index + 8] == board[index + 16] == player):
              current_mills.append((index, index + 8, index + 16))

            index += 2

        return list(current_mills)

    """
    given a position, this method returns a tuple with all the neighboring positions
    """
    def get_neighbours(self, position):

        if position != None:
                if (position % 2) == 0: #position is in a corner

                    if (position % 8) == 0: # position is in the top left corner of a ring
                        return (position + 1, position + 7)

                    else: #position is in top right, or bottom corners
                        return (position - 1, position + 1)

                else: #position is in a intersection
                    if position in [1,3,5,7]: #outer ring
                        if position == 7:
                            return (0, 6, 15)
                        else:
                            return (position - 1, position + 1, position + 8)


                    elif position in [9,11,13,15]: #middle ring
                        if position == 15:
                            return (7, 8, 14, 23)
                        else:
                            return (position - 8, position - 1, position + 1, position + 8)

                    elif position in [17,19,21,23]: #outer ring
                        if position == 23:
                            return (15, 16, 22)
                        else:
                            return (position - 8, position - 1, position + 1)


        return

    """
    Looks at the board, given the current player and returns a list
    with the locations of all pieces outside mills for the current
    player
    """
    def get_pieces_outside_mills(self, player):

        all_pieces = self.get_player_pieces(player)

        mills = self.check_for_mills(player)

        remaining_pieces = self.get_player_pieces(player)

        for piece in all_pieces:
            if len(mills) != 0:
                for mill in mills:
                    if piece in mill and piece in remaining_pieces:
                        remaining_pieces.remove(piece)


        return list(remaining_pieces)

    """
    Looks at the board, given the current player and identifies all
    legal moves for the current gamestate, given that the player is
    in Phase 0
    """
    def get_legal_moves_0(self, player):

        #get enemy pieces that can be taken if a mill is formed
        enemies_outside_mills = self.get_pieces_outside_mills(-player)
        if len(enemies_outside_mills) > 0:
            enemies_to_take = enemies_outside_mills
        else:
            enemies_to_take = self.get_player_pieces(-player)


        #get empty positions, they represent all possible move locations for phase zero
        empty_locations = []
        for position in self.get_empty_positions():
            empty_locations.append(('none',position))

        #get moves -> for each move_location, check if a mill is formed (check row(s))
        mill_moves = self.get_possible_mills(empty_locations, player)


        #generate action tuples
        moves = []

        for move in empty_locations:
            if move in mill_moves:
                for enemy in enemies_to_take:
                    moves.append(('none',move[1],enemy))
            else:
                moves.append(('none',move[1],'none'))


        return list(moves)


    """
    Looks at the board, given the current player and identifies all
    legal moves for the current gamestate, given that the player is
    in Phase 1
    """
    def get_legal_moves_1(self, player):

        moves = []
        board, placements = self.piecesToArray()
        assert(placements[0] == 18)
        assert(len(board) == 24)

        #get enemy pieces that can be taken if a mill is formed
        enemies_outside_mills = self.get_pieces_outside_mills(-player)
        if len(enemies_outside_mills) > 0:
            enemies_to_take = enemies_outside_mills
        else:
            enemies_to_take = self.get_player_pieces(-player)

        #get the current players pieces that will be moved
        current_positions = self.get_player_pieces(player)

        #creating the first part of the moves
        part_moves = []

        for position in current_positions:
            neighbours = self.get_neighbours(position)
            index = 0
            while index < len(neighbours):
                if board[neighbours[index]] == 0:
                    part_moves.append((position, neighbours[index]))
                index += 1

        #finding the part moves that create mills, then pairing them accordingly with enemy pieces to beat
        #get moves -> for each move_location, check if a mill is formed (check row(s))
        mill_moves = self.get_possible_mills(part_moves, player)

        for move in part_moves:
            if move in mill_moves:
                for enemy in enemies_to_take:
                    moves.append((move[0],move[1],enemy))
            else:
                moves.append((move[0],move[1],'none'))



        return list(moves)


    """
    Looks at the board, given the current player and identifies all
    legal moves for the current gamestate, given that the player is
    in Phase 2
    """
    def get_legal_moves_2(self, player):

        moves = []

        #get enemy pieces that can be taken if a mill is formed
        enemies_outside_mills = self.get_pieces_outside_mills(-player)
        if len(enemies_outside_mills) > 0:
            enemies_to_take = enemies_outside_mills
        else:
            enemies_to_take = self.get_player_pieces(-player)

        #get the current players pieces that will be moved
        current_positions = self.get_player_pieces(player)

        #creating the first part of the moves
        part_moves = []

        empty_locations = self.get_empty_positions()

        #pair the locations of current positions with all empty locations on the board
        for position in current_positions:
            for location in empty_locations:
                part_moves.append((position, location))

        #finding the part moves that create mills, then pairing them accordingly with enemy pieces to beat
        #get moves -> for each move_location, check if a mill is formed (check row(s))
        mill_moves = self.get_possible_mills(part_moves, player)

        for move in part_moves:
            if move in mill_moves:
                for enemy in enemies_to_take:
                    moves.append((move[0],move[1],enemy))
            else:
                moves.append((move[0],move[1],'none'))

        return list(moves)

    """
    Method makes sense, works as intended. Checked July 19th
    """
    def has_legal_moves(self, player):
        if (len(self.get_legal_moves(player)) > 0):
            return True
        return False

    '''
    rotates the board three times, each time creating a pair of the rotated
    board and the rotated policy array
    IDEA one: Reconstruct the current player through the policy array pi and
    regenerate a the array through all the legal moves for the board state
    IDEA two: Find rules to create a rotation vector to swap the positions in
    the right way
    NEW IDEA: Use simple Vector addition for the board rotations, but generate
    a lookup vektor for the policy vector, by generating all moves
    '''
    def get_board_rotations(self, pi, all_moves, policy_rotation_vector):

        #vector to rotate the board 90 degrees -> move each ring by two positions
        rot90_vector = [2,2,2,2,2,2,-6,-6,2,2,2,2,2,2,-6,-6,2,2,2,2,2,2,-6,-6]

        old_board, placements = self.piecesToArray()
        new_board = np.zeros((24), dtype = int)
        new_pi = np.zeros((len(all_moves)), dtype = int)

        rotated_results = []

        #rotates the board 3 times
        for i in range(3):
            index = 0
            while index < 24:
                new_board[index+rot90_vector[index]]= np.copy(old_board[index])
                index+=1

            index = 0
            while index < len(all_moves):
                new_pi[policy_rotation_vector[index]] = np.copy(pi[index])
                index += 1

            rotated_results += [(self.arrayToImage(new_board, placements),new_pi)]
            #print("rotating")
            #print(old_board)
            old_board = np.copy(new_board)
            #print(new_board)
            pi = np.copy(new_pi)

            i+=1

        return rotated_results



    def execute_move(self, player, move_index, all_moves):

        move = all_moves[move_index]
        assert(len(move)==3) #move is a tuple of length 3
        board, placements = self.piecesToArray()
        assert(0 <= placements[0] <= 18)
        assert(len(board) == 24)

        #log.info('TRYING TO EXEC MOVE %s', move)
        #log.info('with BOARD')
        #log.info('%s ______  %s ______ %s ', board[0], board[1], board[2])
        #log.info('|          |          | ')
        #log.info('|  %s____  %s ____ %s | ', board[8], board[9], board[10])
        #log.info('|  |       |       |  | ')
        #log.info('|  |  %s__ %s __%s |  | ', board[16], board[17], board[18])
        #log.info('|  |  |         |  |  | ')
        #log.info('%s-%s-%s        %s-%s-%s', board[7], board[15], board[23], board[19], board[11], board[3])
        #log.info('|  |  |         |  |  | ')
        #log.info('|  |  %s__ %s __%s |  | ', board[22], board[21], board[20])
        #log.info('|  |       |       |  | ')
        #log.info('|  %s_____ %s _____%s | ', board[14], board[13], board[12])
        #log.info('|          |          | ', )
        #log.info('%s _______ %s ______ %s ', board[6], board[5], board[4])

        count_placements, current_moves = placements
        if self.get_game_phase(player) == 0:
          count_placements += 1
        if move[0] != 'none':
          board[move[0]] = 0
        if move[2] != 'none':
          board[move[2]] = 0
          current_moves = 0
        elif move[2] == 'none':
          current_moves += 1
        board[move[1]] = player
        if current_moves > 50:
          print(current_moves)

        placements = (count_placements, current_moves)

        image = self.arrayToImage(board, placements)
        self.pieces = np.copy(image)







# game abstract

In [None]:
class Game():
    """
    Base class describing the interface a game has to provide. To add a
    game, subclass this class and implement every method below. The
    interface fits games that are two-player, adversarial and turn-based.

    Player one is represented by 1 and player two by -1.

    See othello/OthelloGame.py for an example implementation.

    Every method of this base class is a placeholder that returns None.
    """
    def __init__(self):
        """Nothing to set up in the base class."""

    def getInitBoard(self):
        """
        Returns:
            startBoard: a representation of the starting board (ideally in
                        the form that is fed to the neural network)
        """

    def getBoardSize(self):
        """
        Returns:
            (x,y): a tuple with the board dimensions
        """

    def getActionSize(self):
        """
        Returns:
            actionSize: the number of all possible actions
        """

    def getNextState(self, board, player, action):
        """
        Input:
            board: current board
            player: current player (1 or -1)
            action: action taken by the current player

        Returns:
            nextBoard: the board after the action has been applied
            nextPlayer: the player who moves next (should be -player)
        """

    def getValidMoves(self, board, player):
        """
        Input:
            board: current board
            player: current player

        Returns:
            validMoves: a binary vector of length self.getActionSize() with 1
                        for every move that is valid from the current board
                        and player, and 0 for every invalid move
        """

    def getGameEnded(self, board, player):
        """
        Input:
            board: current board
            player: current player (1 or -1)

        Returns:
            r: 0 if the game has not ended, 1 if player won, -1 if player
               lost, and a small non-zero value for a draw.
        """

    def getCanonicalForm(self, board, player):
        """
        Input:
            board: current board
            player: current player (1 or -1)

        Returns:
            canonicalBoard: the canonical form of the board, which should be
                            independent of the player. In chess, for example,
                            the canonical form can be the board seen from
                            white's point of view: for white the board is
                            returned as is, for black the colors are inverted
                            before it is returned.
        """

    def getSymmetries(self, board, pi):
        """
        Input:
            board: current board
            pi: policy vector of size self.getActionSize()

        Returns:
            symmForms: a list of (board,pi) tuples, each a symmetrical form
                       of the board together with the matching pi vector.
                       Used when training the neural network from examples.
        """

    def stringRepresentation(self, board):
        """
        Input:
            board: current board

        Returns:
            boardString: a quick conversion of the board to a string.
                         MCTS needs it for hashing.
        """


# Nine Men's Morris game

In [None]:
from __future__ import print_function
import sys
sys.path.append('..')
import numpy as np
import copy

class NineMensMorrisGame(Game):
    """
    Nine Men's Morris adapter for the ``Game`` interface used by MCTS/Coach.

    An action is an index into ``self.all_moves``, a fixed, ordered list of
    ``(origin, destination, enemy)`` tuples. ``origin`` and ``enemy`` are
    either a point index in ``range(24)`` or the string ``"none"``.

    Boards are 6x6 integer arrays. Rows 0-3 are the entries that get
    multiplied by the player in the canonical form; ``board[4][0]`` and
    ``board[4][1]`` are two counters (named ``count_placements`` and
    ``current_moves`` by the original code) that are copied unchanged.
    All rule logic is delegated to ``Board``.
    """

    square_content = {
        -1: "B",
        +0: "-",
        +1: "W"
    }

    # Number of distinct point indices used to enumerate the move tuples.
    _NUM_POINTS = 24
    # Points for which a rotation subtracts 6 instead of adding 2, i.e. the
    # last two indices of each run of eight (0-7, 8-15, 16-23).
    _WRAPPING_POINTS = frozenset((6, 7, 14, 15, 22, 23))

    def __init__(self):
        self.n = 5
        # The order of all_moves defines the action indices; it must stay
        # stable so that saved policies keep their meaning.
        self.all_moves = self.get_all_moves()
        self.policy_rotation_vector = self.get_policy_roation90()
        self.MAX_MOVES_WITHOUT_MILL = 200

    def get_all_moves(self):
        """Return every move tuple: placement moves first, then slide/jump moves."""
        moves = self.get_all_moves_phase_zero() + self.get_all_moves_phase_one_and_two()
        return list(moves)

    def get_policy_roation90(self):
        """
        Build the index permutation induced by ``rotate`` on ``all_moves``.

        Returns:
            A list ``r`` of length ``len(self.all_moves)`` where ``r[i]`` is
            the index of the rotated version of ``self.all_moves[i]``.
        """
        # A dict lookup replaces list.index, which rescanned the whole move
        # list for every move. setdefault keeps the first occurrence, which is
        # what list.index would have returned.
        index_of = {}
        for position, move in enumerate(self.all_moves):
            index_of.setdefault(move, position)
        return [index_of[self.rotate(move)] for move in self.all_moves]

    def _rotate_point(self, point):
        """Rotate a single point index; ``'none'`` is passed through unchanged."""
        if point == 'none':
            return 'none'
        if point in self._WRAPPING_POINTS:
            return point - 6
        return point + 2

    def rotate(self, move):
        """
        Rotate an ``(origin, destination, enemy)`` move tuple by one step.

        Every point index is advanced by 2, wrapping around within its run of
        eight indices; ``'none'`` entries stay ``'none'``.
        """
        origin, destination, enemy = move[0], move[1], move[2]
        return (self._rotate_point(origin),
                self._rotate_point(destination),
                self._rotate_point(enemy))

    def get_all_moves_phase_zero(self):
        """
        Enumerate moves without an origin: ``("none", destination, enemy)``.

        For each destination the variant with ``enemy == "none"`` comes
        first, followed by one variant per other point as ``enemy``.
        """
        moves = []
        for destination in range(self._NUM_POINTS):
            moves.append(("none", destination, "none"))
            for enemy in range(self._NUM_POINTS):
                if enemy != destination:
                    moves.append(("none", destination, enemy))
        return moves

    def get_all_moves_phase_one_and_two(self):
        """
        Enumerate moves with an origin: ``(origin, destination, enemy)``.

        For each ordered pair of distinct points the variant with
        ``enemy == "none"`` comes first, followed by one variant per point
        that is neither the origin nor the destination.
        """
        moves = []
        for origin in range(self._NUM_POINTS):
            for destination in range(self._NUM_POINTS):
                if destination == origin:
                    continue
                moves.append((origin, destination, "none"))
                for enemy in range(self._NUM_POINTS):
                    if enemy != destination and enemy != origin:
                        moves.append((origin, destination, enemy))
        return moves

    def _load_board(self, board):
        """Wrap a copy of ``board`` in a fresh ``Board`` so callers' arrays are not mutated."""
        wrapped = Board()
        wrapped.pieces = np.copy(board)
        return wrapped

    def getInitBoard(self):
        """Return the initial board as a numpy array."""
        return np.array(Board().pieces)

    def getBoardSize(self):
        """Return the board dimensions as an ``(a, b)`` tuple."""
        return (6, 6)

    def getActionSize(self):
        """Return the number of actions, i.e. ``len(self.all_moves)``."""
        return len(self.all_moves)

    def getNextState(self, board, player, move):
        """
        Apply ``move`` for ``player`` and return ``(next_board, -player)``.

        ``move`` is passed to ``Board.execute_move`` together with
        ``self.all_moves``; the action must be a valid move.
        """
        b = self._load_board(board)
        b.execute_move(player, move, self.all_moves)
        return (b.pieces, -player)

    def getValidMoves(self, board, player):
        """Return the legal-move vector from ``Board.get_legal_move_vector`` as a numpy array."""
        b = self._load_board(board)
        valid_moves = b.get_legal_move_vector(player, self.all_moves)
        return np.array(valid_moves)

    def getGameEnded(self, board, player):
        """
        Report the outcome relative to ``player``.

        Returns:
            0 if the game has not ended, 1 if ``player`` won, -1 if
            ``player`` lost, and 0.0001 for a draw.
        """
        assert(not isinstance(board, str))
        b = self._load_board(board)

        # NOTE(review): the draw limit is the literal 50;
        # self.MAX_MOVES_WITHOUT_MILL (200) is not consulted here. Confirm
        # which value is intended before changing either.
        if b.pieces[4][1] >= 50:
            return 0.0001
        if not b.has_legal_moves(player):
            return -1
        if not b.has_legal_moves(-player):
            return 1
        if len(b.get_player_pieces(player)) < 3 and b.pieces[4][0] == 18:
            return -1
        if len(b.get_player_pieces(-player)) < 3 and b.pieces[4][0] == 18:
            return 1
        # Both players have legal moves at this point, so the game goes on.
        return 0

    def getCanonicalForm(self, board, player):
        """
        Return the board from the perspective of ``player``.

        Rows 0-3 are multiplied element-wise by ``player``; the two counters
        in ``board[4][0]`` and ``board[4][1]`` are copied unchanged and every
        other entry of the returned 6x6 integer array is zero.
        """
        canonical = np.zeros((6, 6), dtype=int)
        canonical[:4, :6] = np.asarray(board)[:4, :6] * player
        canonical[4][0] = board[4][0]
        canonical[4][1] = board[4][1]
        return canonical

    def getSymmetries(self, board, pi):
        """
        Return the rotated (board, pi) pairs computed by ``Board.get_board_rotations``.

        ``pi`` must have one entry per move in ``self.all_moves``.
        """
        assert(len(pi) == len(self.all_moves))
        b = self._load_board(board)
        results = b.get_board_rotations(pi, self.all_moves, self.policy_rotation_vector)
        return results

    def stringRepresentation(self, board):
        """
        Serialize the board to a comma-separated string.

        The string holds the 24 entries of rows 0-3 in row-major order,
        followed by ``board[4][0]`` and ``board[4][1]``.
        """
        cells = [str(board[row][col]) for row in range(4) for col in range(6)]
        cells.append(str(board[4][0]))
        cells.append(str(board[4][1]))
        return ",".join(cells)

    def stringRepresentationReadable(self, board):
        """Return the same string as ``stringRepresentation``."""
        return self.stringRepresentation(board)


# Abstract neural network

In [None]:
class NeuralNet():
    """
    This class specifies the base NeuralNet class. To define your own neural
    network, subclass this class and implement the functions below. The neural
    network does not consider the current player, and instead only deals with
    the canonical form of the board.

    Every method here is a placeholder that does nothing and returns None.

    See othello/NNet.py for an example implementation.
    """

    def __init__(self, game):
        """Placeholder constructor; ``game`` is accepted but not used here."""
        pass

    def train(self, examples):
        """
        This function trains the neural network with examples obtained from
        self-play.

        Input:
            examples: a list of training examples, where each example is of form
                      (board, pi, v). pi is the MCTS informed policy vector for
                      the given board, and v is its value. The board in each
                      example is in its canonical form.
        """
        pass

    def predict(self, board):
        """
        Evaluate a single board.

        Input:
            board: current board in its canonical form.

        Returns:
            pi: a policy vector for the current board - a numpy array of length
                game.getActionSize
            v: a float in [-1,1] that gives the value of the current board
        """
        pass

    def save_checkpoint(self, folder, filename):
        """
        Saves the current neural network (with its parameters) in
        folder/filename
        """
        pass

    def load_checkpoint(self, folder, filename):
        """
        Loads parameters of the neural network from folder/filename
        """
        pass


# Nine Men's Morris neural network

In [None]:
pip install tensorflow



keras

In [None]:
import sys
sys.path.append('..')

import argparse
import tensorflow as tf

class NOTACTIVENineMensMorrisNNet():
    """
    Keras model with a policy head and a value head on a shared trunk.

    The trunk is four 3x3 convolutions (two with 'same' padding, two with
    'valid' padding) followed by two dense layers of 1024 and 512 units.
    The compiled model is available as ``self.model``.
    """

    def __init__(self, game, args):
        # game params
        self.board_x, self.board_y = game.getBoardSize()
        self.action_size = game.getActionSize()
        self.args = args

        layers = tf.keras.layers

        def conv_block(tensor, padding):
            # Conv2D -> BatchNorm over the channel axis -> ReLU
            convolved = layers.Conv2D(args.num_channels, 3, padding=padding, use_bias=False)(tensor)
            normalized = layers.BatchNormalization(axis=3)(convolved)
            return layers.Activation('relu')(normalized)

        def dense_block(tensor, units):
            # Dense -> BatchNorm -> ReLU -> Dropout
            projected = layers.Dense(units, use_bias=False)(tensor)
            normalized = layers.BatchNormalization(axis=1)(projected)
            activated = layers.Activation('relu')(normalized)
            return layers.Dropout(args.dropout)(activated)

        # Neural Net
        # s: batch_size x board_x x board_y
        self.input_boards = tf.keras.Input(shape=(self.board_x, self.board_y))

        # batch_size x board_x x board_y x 1
        x_image = layers.Reshape((self.board_x, self.board_y, 1))(self.input_boards)
        # batch_size x board_x x board_y x num_channels
        h_conv1 = conv_block(x_image, 'same')
        h_conv2 = conv_block(h_conv1, 'same')
        # batch_size x (board_x-2) x (board_y-2) x num_channels
        h_conv3 = conv_block(h_conv2, 'valid')
        # batch_size x (board_x-4) x (board_y-4) x num_channels
        h_conv4 = conv_block(h_conv3, 'valid')
        h_conv4_flat = layers.Flatten()(h_conv4)

        s_fc1 = dense_block(h_conv4_flat, 1024)  # batch_size x 1024
        s_fc2 = dense_block(s_fc1, 512)          # batch_size x 512

        # batch_size x self.action_size
        self.pi = layers.Dense(self.action_size, activation='softmax', name='pi')(s_fc2)
        # batch_size x 1
        self.v = layers.Dense(1, activation='tanh', name='v')(s_fc2)

        self.model = tf.keras.Model(inputs=self.input_boards, outputs=[self.pi, self.v])
        self.model.compile(
            loss=['categorical_crossentropy', 'mean_squared_error'],
            optimizer=tf.keras.optimizers.Adam(args.lr),
        )



pytorch

In [None]:
import sys
sys.path.append('..')

import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class NineMensMorrisNNet(nn.Module):
    """
    PyTorch network with a policy head and a value head on a shared trunk.

    The trunk is four 3x3 convolutions (the first two padded, the last two
    unpadded, so each spatial dimension shrinks by 4 in total) followed by
    two fully connected layers of 1024 and 512 units.
    """

    def __init__(self, game, args):
        super().__init__()

        # game params
        self.board_x, self.board_y = game.getBoardSize()
        self.action_size = game.getActionSize()
        self.args = args

        channels = args.num_channels
        flat_features = channels * (self.board_x - 4) * (self.board_y - 4)

        # Layers are created in the same order as before so that parameter
        # ordering and seeded initialization are unaffected.
        self.conv1 = nn.Conv2d(1, channels, 3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(channels, channels, 3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(channels, channels, 3, stride=1)
        self.conv4 = nn.Conv2d(channels, channels, 3, stride=1)

        self.bn1 = nn.BatchNorm2d(channels)
        self.bn2 = nn.BatchNorm2d(channels)
        self.bn3 = nn.BatchNorm2d(channels)
        self.bn4 = nn.BatchNorm2d(channels)

        self.fc1 = nn.Linear(flat_features, 1024)
        self.fc_bn1 = nn.BatchNorm1d(1024)

        self.fc2 = nn.Linear(1024, 512)
        self.fc_bn2 = nn.BatchNorm1d(512)

        self.fc3 = nn.Linear(512, self.action_size)  # policy head

        self.fc4 = nn.Linear(512, 1)                 # value head

    def forward(self, s):
        """
        Run the network on a batch of boards.

        Args:
            s: tensor viewable as (batch_size, board_x, board_y).

        Returns:
            A pair ``(log_pi, v)``: log-softmax policy of shape
            (batch_size, action_size) and tanh value of shape (batch_size, 1).
        """
        drop_p = self.args.dropout

        # batch_size x 1 x board_x x board_y
        hidden = s.view(-1, 1, self.board_x, self.board_y)

        conv_stack = (
            (self.conv1, self.bn1),  # -> batch_size x num_channels x board_x x board_y
            (self.conv2, self.bn2),  # -> batch_size x num_channels x board_x x board_y
            (self.conv3, self.bn3),  # -> batch_size x num_channels x (board_x-2) x (board_y-2)
            (self.conv4, self.bn4),  # -> batch_size x num_channels x (board_x-4) x (board_y-4)
        )
        for conv, norm in conv_stack:
            hidden = F.relu(norm(conv(hidden)))

        hidden = hidden.view(-1, self.args.num_channels * (self.board_x - 4) * (self.board_y - 4))

        dense_stack = (
            (self.fc1, self.fc_bn1),  # -> batch_size x 1024
            (self.fc2, self.fc_bn2),  # -> batch_size x 512
        )
        for dense, norm in dense_stack:
            hidden = F.dropout(F.relu(norm(dense(hidden))), p=drop_p, training=self.training)

        policy_logits = self.fc3(hidden)  # batch_size x action_size
        value = self.fc4(hidden)          # batch_size x 1

        return F.log_softmax(policy_logits, dim=1), torch.tanh(value)



# Neural network wrapper

keras

In [None]:
import argparse
import os
import shutil
import time
import random
import numpy as np
import math
import sys
sys.path.append('../..')

import argparse



# args = dotdict({
#     'lr': 0.001,
#     'dropout': 0.3,
#     'epochs': 10,
#     'batch_size': 64,
#     'cuda': False,
#     'num_channels': 512,
# })

class NOTACTIVENNetWrapper(NeuralNet):
    """
    Keras-backed implementation of the ``NeuralNet`` interface.

    Reads hyper-parameters from the module-level ``args`` and expects
    ``self.nnet`` to expose a compiled Keras model as ``self.nnet.model``.
    """

    def __init__(self, game):
        self.nnet = NineMensMorrisNNet(game, args)
        self.board_x, self.board_y = game.getBoardSize()
        self.action_size = game.getActionSize()

    def train(self, examples):
        """
        Fit the model on self-play examples.

        examples: list of examples, each example is of form (board, pi, v)
        """
        input_boards, target_pis, target_vs = list(zip(*examples))
        input_boards = np.asarray(input_boards)
        target_pis = np.asarray(target_pis)
        target_vs = np.asarray(target_vs)
        self.nnet.model.fit(x = input_boards, y = [target_pis, target_vs], batch_size = args.batch_size, epochs = args.epochs)

    def predict(self, board):
        """
        Evaluate one board and return ``(pi, v)`` for it.

        board: np array with board
        """
        # timing
        start = time.time()

        # preparing input: add a leading batch axis of size 1
        board = board[np.newaxis, :, :]

        # run
        pi, v = self.nnet.model.predict(board, verbose=False)

        print('PREDICTION TIME TAKEN : {0:03f}'.format(time.time()-start))
        return pi[0], v[0]

    def save_checkpoint(self, folder, filename):
        """
        Save the model weights to ``folder/<stem>.h5``.

        ``<stem>`` is the part of ``filename`` before its first dot. The
        folder is created when it does not exist yet.
        """
        # change extension
        filename = filename.split(".")[0] + ".h5"

        filepath = os.path.join(folder, filename)
        # Create the containing folder, not a directory named like the
        # weights file, otherwise save_weights cannot write to filepath.
        if not os.path.exists(folder):
            print("Checkpoint Directory does not exist! Making directory {}".format(folder))
            os.makedirs(folder, exist_ok=True)
        else:
            print("Checkpoint Directory exists! ")
        self.nnet.model.save_weights(filepath)

    def load_checkpoint(self, folder, filename):
        """
        Load model weights from ``folder/<stem>.h5``.

        Raises:
            FileNotFoundError: if no weights file exists at that location.
        """
        # change extension
        filename = filename.split(".")[0] + ".h5"

        filepath = os.path.join(folder, filename)
        if not os.path.exists(filepath):
            # Earlier versions concatenated folder and filename without a
            # separator; keep accepting files stored under that name.
            legacy_path = folder + filename
            if not os.path.exists(legacy_path):
                raise FileNotFoundError("No model in path {}".format(filepath))
            filepath = legacy_path

        self.nnet.model.load_weights(filepath)


pytorch

In [None]:

import os
import sys
import time

import numpy as np
from tqdm import tqdm

sys.path.append('../../')

import torch
import torch.optim as optim

# args = dotdict({
#     'lr': 0.001,
#     'dropout': 0.3,
#     'epochs': 10,
#     'batch_size': 64,
#     'cuda': torch.cuda.is_available(),
#     'num_channels': 512,
# })


class NNetWrapper(NeuralNet):
    """
    PyTorch-backed implementation of the ``NeuralNet`` interface.

    Hyper-parameters (``lr``, ``epochs``, ``batch_size``, ``cuda``, ...) are
    read from the module-level ``args``.
    """

    def __init__(self, game):
        self.nnet = NineMensMorrisNNet(game, args)
        self.board_x, self.board_y = game.getBoardSize()
        self.action_size = game.getActionSize()

        if args.cuda:
            self.nnet.to("cuda:0")

    def train(self, examples):
        """
        Train the network for ``args.epochs`` epochs.

        Each epoch runs ``len(examples) // args.batch_size`` steps; every step
        draws ``args.batch_size`` examples at random (with replacement).

        examples: list of examples, each example is of form (board, pi, v)
        """
        # Honor the configured learning rate; fall back to Adam's default
        # when args does not define one.
        optimizer = optim.Adam(self.nnet.parameters(), lr=args.get('lr', 0.001))

        for epoch in range(args.epochs):
            print('EPOCH ::: ' + str(epoch + 1))
            self.nnet.train()
            pi_losses = AverageMeter()
            v_losses = AverageMeter()

            batch_count = int(len(examples) / args.batch_size)

            t = tqdm(range(batch_count), desc='Training Net')
            for _ in t:
                sample_ids = np.random.randint(len(examples), size=args.batch_size)
                boards, pis, vs = list(zip(*[examples[i] for i in sample_ids]))
                boards = torch.FloatTensor(np.array(boards).astype(np.float64))
                target_pis = torch.FloatTensor(np.array(pis))
                target_vs = torch.FloatTensor(np.array(vs).astype(np.float64))

                # move the batch to the GPU when enabled
                if args.cuda:
                    boards, target_pis, target_vs = boards.contiguous().cuda(), target_pis.contiguous().cuda(), target_vs.contiguous().cuda()

                # compute output
                out_pi, out_v = self.nnet(boards)
                l_pi = self.loss_pi(target_pis, out_pi)
                l_v = self.loss_v(target_vs, out_v)
                total_loss = l_pi + l_v

                # record loss
                pi_losses.update(l_pi.item(), boards.size(0))
                v_losses.update(l_v.item(), boards.size(0))
                t.set_postfix(Loss_pi=pi_losses, Loss_v=v_losses)

                # compute gradient and do the optimizer step
                optimizer.zero_grad()
                total_loss.backward()
                optimizer.step()

    def predict(self, board):
        """
        Evaluate one board.

        board: np array with board

        Returns:
            ``(pi, v)`` as numpy arrays: the policy probabilities (the
            exponential of the network's log-softmax output) and the value.
        """
        # preparing input
        board = torch.FloatTensor(board.astype(np.float64))
        if args.cuda:
            board = board.contiguous().cuda()
        board = board.view(1, self.board_x, self.board_y)
        self.nnet.eval()
        with torch.no_grad():
            pi, v = self.nnet(board)

        return torch.exp(pi).data.cpu().numpy()[0], v.data.cpu().numpy()[0]

    def loss_pi(self, targets, outputs):
        """Cross-entropy between target policies and log-probability outputs, averaged over the batch."""
        return -torch.sum(targets * outputs) / targets.size()[0]

    def loss_v(self, targets, outputs):
        """Mean squared error between target values and the flattened value outputs."""
        return torch.sum((targets - outputs.view(-1)) ** 2) / targets.size()[0]

    def save_checkpoint(self, folder, filename):
        """
        Save the network's state dict to ``folder/filename``.

        The folder is created when it does not exist yet.
        """
        filepath = os.path.join(folder, filename)
        if not os.path.exists(folder):
            print("Checkpoint Directory does not exist! Making directory {}".format(folder))
            os.makedirs(folder, exist_ok=True)
        else:
            print("Checkpoint Directory exists! ")
        print("saving to path '%s'" % filepath)
        torch.save({
            'state_dict': self.nnet.state_dict(),
        }, filepath)

    def load_checkpoint(self, folder, filename):
        """
        Load the network's state dict from ``folder/filename``.

        Raises:
            FileNotFoundError: if no checkpoint exists at that location.
        """
        # https://github.com/pytorch/examples/blob/master/imagenet/main.py#L98
        filepath = os.path.join(folder, filename)
        print("Trying to load checkpoint")
        if not os.path.exists(filepath):
            # Earlier versions concatenated folder and filename without a
            # separator; keep accepting checkpoints stored under that name.
            legacy_path = folder + filename
            if not os.path.exists(legacy_path):
                raise FileNotFoundError("No model in path {}".format(filepath))
            filepath = legacy_path
        map_location = None if args.cuda else 'cpu'
        checkpoint = torch.load(filepath, map_location=map_location)
        self.nnet.load_state_dict(checkpoint['state_dict'])



# Arena

In [None]:
import logging

from tqdm import tqdm

log = logging.getLogger(__name__)


class Arena():
    """
    Pits two agents against each other on a given game.
    """

    def __init__(self, player1, player2, game, display=None):
        """
        Input:
            player 1,2: two functions that take a board as input and return
                        an action
            game: Game object
            display: a function that takes a board as input and prints it.
                     Required for verbose mode.
        """
        self.player1, self.player2 = player1, player2
        self.game = game
        self.display = display

    def playGame(self, verbose=False):
        """
        Play a single game, with player1 moving first.

        Returns:
            either
                winner: player who won the game (1 if player1, -1 if player2)
            or
                draw result returned from the game that is neither 1, -1, nor 0.
        """
        game = self.game
        agent_for = {1: self.player1, -1: self.player2}
        side = 1
        state = game.getInitBoard()
        turn = 0

        while game.getGameEnded(state, side) == 0:
            turn += 1
            if verbose:
                assert self.display
                print("Turn ", str(turn), "Player ", str(side))
                self.display(state)

            # The agent always sees the board from its own perspective.
            action = agent_for[side](game.getCanonicalForm(state, side))
            valids = game.getValidMoves(game.getCanonicalForm(state, side), 1)

            if valids[action] == 0:
                log.error(f'Action {action} is not valid!')
                log.debug(f'valids = {valids}')
                assert valids[action] > 0

            state, side = game.getNextState(state, side, action)

        if verbose:
            assert self.display
            print("Game over: Turn ", str(turn), "Result ", str(game.getGameEnded(state, 1)))
            self.display(state)

        # Convert the result for the side to move into player1's perspective.
        return side * game.getGameEnded(state, side)

    def _playSeries(self, count, label, verbose):
        """Play ``count`` games; return (#results == 1, #results == -1, #other)."""
        firstMoverWins = 0
        secondMoverWins = 0
        undecided = 0
        for _ in tqdm(range(count), desc=label):
            outcome = self.playGame(verbose=verbose)
            if outcome == 1:
                firstMoverWins += 1
            elif outcome == -1:
                secondMoverWins += 1
            else:
                undecided += 1
        return firstMoverWins, secondMoverWins, undecided

    def playGames(self, num, verbose=False):
        """
        Plays num games in which player1 starts num/2 games and player2 starts
        num/2 games. The two players stay swapped on this Arena afterwards.

        Returns:
            oneWon: games won by player1
            twoWon: games won by player2
            draws:  games won by nobody
        """
        half = int(num / 2)

        oneWon, twoWon, draws = self._playSeries(half, "Arena.playGames (1)", verbose)

        # Second half: the original player2 now moves first.
        self.player1, self.player2 = self.player2, self.player1

        swappedFirst, swappedSecond, swappedDraws = self._playSeries(half, "Arena.playGames (2)", verbose)
        oneWon += swappedSecond
        twoWon += swappedFirst
        draws += swappedDraws

        return oneWon, twoWon, draws


# Coach

In [None]:
import logging
import os
import sys
from collections import deque
from pickle import Pickler, Unpickler
from random import shuffle

import numpy as np
from tqdm import tqdm

log = logging.getLogger(__name__)


class Coach():
    """
    Runs the self-play and learning loop, using the functions defined in
    Game and NeuralNet.
    """

    def __init__(self, game, nnet, args):
        self.game = game
        self.nnet = nnet
        self.args = args
        # the competitor network: a second instance of the same wrapper class
        self.pnet = self.nnet.__class__(self.game)
        self.mcts = MCTS(self.game, self.nnet, self.args)
        # one entry per iteration, at most args.numItersForTrainExamplesHistory entries
        self.trainExamplesHistory = []
        # can be overridden in loadTrainExamples()
        self.skipFirstSelfPlay = False

    def executeEpisode(self):
        """
        Play one game of self-play, starting with player 1.

        Every turn contributes one training example per symmetry of the
        current canonical board. Once the game ends, each example receives
        the final result from the point of view of the player who was to move
        when the example was recorded.

        temp=1 is used while episodeStep < tempThreshold, temp=0 afterwards.

        Returns:
            trainExamples: a list of (canonicalBoard, pi, v) tuples. pi is the
                           MCTS informed policy vector; v is the game result
                           for the player to move on that board.
        """
        recorded = []
        board = self.game.getInitBoard()
        self.curPlayer = 1
        step = 0

        while True:
            step += 1
            canonical = self.game.getCanonicalForm(board, self.curPlayer)
            temp = 1 if step < self.args.tempThreshold else 0

            pi = self.mcts.getActionProb(canonical, temp=temp)
            for symBoard, symPi in self.game.getSymmetries(canonical, pi):
                recorded.append([symBoard, self.curPlayer, symPi, None])

            action = np.random.choice(len(pi), p=pi)
            board, self.curPlayer = self.game.getNextState(board, self.curPlayer, action)

            r = self.game.getGameEnded(board, self.curPlayer)
            if r == 0:
                continue

            # r is relative to self.curPlayer; flip the sign for examples
            # recorded while the other player was to move.
            return [(x[0], x[2], r * ((-1) ** (x[1] != self.curPlayer))) for x in recorded]

    def _runSelfPlay(self):
        """Play args.numEps episodes with a fresh search tree each and collect their examples."""
        collected = deque([], maxlen=self.args.maxlenOfQueue)
        for _ in tqdm(range(self.args.numEps), desc="Self Play"):
            self.mcts = MCTS(self.game, self.nnet, self.args)  # reset search tree
            collected += self.executeEpisode()
        return collected

    def learn(self):
        """
        Performs numIters iterations with numEps episodes of self-play in each
        iteration. After every iteration, it retrains the neural network with
        the examples in trainExamplesHistory (each iteration's examples are
        capped at maxlenOfQueue). It then pits the new neural network against
        the old one and accepts it only if it wins >= updateThreshold fraction
        of the decided games.
        """
        for iteration in range(1, self.args.numIters + 1):
            log.info(f'Starting Iter #{iteration} ...')

            # The first self-play round is skipped when examples were loaded.
            if iteration > 1 or not self.skipFirstSelfPlay:
                self.trainExamplesHistory.append(self._runSelfPlay())

            if len(self.trainExamplesHistory) > self.args.numItersForTrainExamplesHistory:
                log.warning(
                    f"Removing the oldest entry in trainExamples. len(trainExamplesHistory) = {len(self.trainExamplesHistory)}")
                self.trainExamplesHistory.pop(0)

            # backup history to a file
            # NB! the examples were collected using the model from the previous iteration, so (i-1)
            self.saveTrainExamples(iteration - 1)

            # flatten and shuffle examples before training
            trainExamples = [example for batch in self.trainExamplesHistory for example in batch]
            shuffle(trainExamples)

            # training new network, keeping a copy of the old one
            self.nnet.save_checkpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            self.pnet.load_checkpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            pmcts = MCTS(self.game, self.pnet, self.args)

            self.nnet.train(trainExamples)
            nmcts = MCTS(self.game, self.nnet, self.args)

            log.info('PITTING AGAINST PREVIOUS VERSION')
            arena = Arena(lambda x: np.argmax(pmcts.getActionProb(x, temp=0)),
                          lambda x: np.argmax(nmcts.getActionProb(x, temp=0)), self.game)
            pwins, nwins, draws = arena.playGames(self.args.arenaCompare)

            log.info('NEW/PREV WINS : %d / %d ; DRAWS : %d' % (nwins, pwins, draws))
            decided = pwins + nwins
            accepted = decided != 0 and float(nwins) / decided >= self.args.updateThreshold
            if accepted:
                log.info('ACCEPTING NEW MODEL')
                self.nnet.save_checkpoint(folder=self.args.checkpoint, filename=self.getCheckpointFile(iteration))
                self.nnet.save_checkpoint(folder=self.args.checkpoint, filename='best.pth.tar')
            else:
                log.info('REJECTING NEW MODEL')
                self.nnet.load_checkpoint(folder=self.args.checkpoint, filename='temp.pth.tar')

    def getCheckpointFile(self, iteration):
        """Return the checkpoint file name for the given iteration."""
        return f'checkpoint_{iteration!s}.pth.tar'

    def saveTrainExamples(self, iteration):
        """Pickle trainExamplesHistory to '<checkpoint file>.examples' in args.checkpoint."""
        folder = self.args.checkpoint
        log.warning('trying to save')
        if not os.path.exists(folder):
            os.makedirs(folder)
        target = os.path.join(folder, self.getCheckpointFile(iteration) + ".examples")
        with open(target, "wb+") as handle:
            Pickler(handle).dump(self.trainExamplesHistory)

    def loadTrainExamples(self):
        """
        Load trainExamplesHistory from the '.examples' file next to the model
        named by args.load_folder_file.

        If the file is missing, the user is asked whether to continue; any
        answer other than "y" exits the process.
        """
        log.warning('trying to load examples')
        folder, modelName = self.args.load_folder_file[0], self.args.load_folder_file[1]
        examplesFile = os.path.join(folder, modelName) + ".examples"

        if not os.path.isfile(examplesFile):
            log.warning(f'File "{examplesFile}" with trainExamples not found!')
            answer = input("Continue? [y|n]")
            if answer != "y":
                sys.exit()
            return

        log.info("File with trainExamples found. Loading it...")
        # NOTE: unpickling executes arbitrary code from the file; only load
        # example files that you created yourself.
        with open(examplesFile, "rb") as handle:
            self.trainExamplesHistory = Unpickler(handle).load()
        log.info('Loading done!')

        # examples based on the model were already collected (loaded)
        self.skipFirstSelfPlay = True


install log package

In [None]:
pip install coloredlogs



# Main

In [None]:
import logging
import coloredlogs
#NOTE -> TO SWITCH BETWEEN KERAS AND PYTORCH, CHANGE NAMES FROM NNET AND NNETWRAPPER
log = logging.getLogger(__name__)

coloredlogs.install(level='INFO')  # Change this to DEBUG to see more info.

# Run configuration shared by Coach, MCTS and the network wrapper.
args = dotdict({
    # --- self-play / evaluation loop ---
    'numIters': 10,             # Number of learning iterations. default 1000 -> takes too long
    'numEps': 100,              # Number of complete self-play games to simulate during a new iteration. default 100
    'tempThreshold': 12,        # Self-play uses temp=1 while the episode step is below this value, temp=0 afterwards. default 15
    'updateThreshold': 0.6,     # During arena playoff, new neural net will be accepted if threshold or more of games are won. default 0.6
    'maxlenOfQueue': 200000,    # Maximum number of training examples kept per iteration. default 200000
    'numMCTSSims': 25,          # Number of MCTS simulations per move. default 25
    'arenaCompare': 40,         # Number of games to play during arena play to determine if new net will be accepted. default 40
    'cpuct': 1,                 # default 1

    # --- checkpointing ---
    'checkpoint': '/content/drive/My Drive/Bachelorarbeit/training/checkpoint',
    'load_model': True,
    # (folder, file name) of the model that is loaded when load_model is True
    'load_folder_file': ('/content/drive/My Drive/Bachelorarbeit/training/20it/','22it.pth.tar'),
    # Number of most recent iterations whose examples are kept for training
    'numItersForTrainExamplesHistory': 20,

    # --- network / training ---
    'lr': 0.005, # default 0.001
    'dropout': 0.3,
    'epochs': 15, # default 10 -> try 15 or 20
    'batch_size': 64,
    # Replace with False to force CPU execution.
    'cuda': torch.cuda.is_available(),
    'num_channels': 512,

})


def main():
    """Set up the game, network and coach, then start the learning loop.

    Steps, driven by the module-level ``args``:

    1. Instantiate ``NineMensMorrisGame`` and wrap it in ``NNetWrapper``.
    2. If ``args.load_model`` is truthy, restore the network from the
       ``(folder, filename)`` pair stored in ``args.load_folder_file``.
    3. Create the ``Coach`` and, when a model was loaded, also ask it to
       load the stored training examples.
    4. Call ``Coach.learn()``.

    Takes no arguments and returns ``None``; progress is reported through
    the module-level logger.
    """
    # Local import: only needed to render the checkpoint path for logging.
    import os

    log.info('Loading %s...', NineMensMorrisGame.__name__)
    game = NineMensMorrisGame()

    log.info('Loading %s...', NNetWrapper.__name__)
    nnet = NNetWrapper(game)
    log.info('cuda available "%s"', torch.cuda.is_available())

    if args.load_model:
        folder = args.load_folder_file[0]
        filename = args.load_folder_file[1]
        # os.path.join avoids the doubled separator ("20it//22it.pth.tar")
        # that a hard-coded "%s/%s" produces when the folder ends in "/".
        log.info('Loading checkpoint "%s"...', os.path.join(folder, filename))
        nnet.load_checkpoint(folder, filename)
    else:
        log.warning('Not loading a checkpoint!')

    log.info('Loading the Coach...')
    coach = Coach(game, nnet, args)

    if args.load_model:
        # Training examples are only restored together with a loaded model.
        log.info("Loading 'trainExamples' from file...")
        coach.loadTrainExamples()

    log.info('Starting the learning process 🎉')
    coach.learn()


# Start the training run only when this code is executed as the entry point,
# not when it is imported as a module.
if __name__ == "__main__":
    main()


2023-07-22 13:43:36 0f17dae9fd9c __main__[3690] INFO Loading NineMensMorrisGame...
2023-07-22 13:43:37 0f17dae9fd9c __main__[3690] INFO Loading NNetWrapper...
2023-07-22 13:43:37 0f17dae9fd9c __main__[3690] INFO cuda available "True"
2023-07-22 13:43:37 0f17dae9fd9c __main__[3690] INFO Loading checkpoint "/content/drive/My Drive/Bachelorarbeit/training/20it//22it.pth.tar"...
2023-07-22 13:43:38 0f17dae9fd9c __main__[3690] INFO Loading the Coach...
2023-07-22 13:43:38 0f17dae9fd9c __main__[3690] INFO Loading 'trainExamples' from file...
2023-07-22 13:43:38 0f17dae9fd9c __main__[3690] INFO File with trainExamples found. Loading it...


Trying to load checkpoint


2023-07-22 13:44:15 0f17dae9fd9c __main__[3690] INFO Loading done!
2023-07-22 13:44:15 0f17dae9fd9c __main__[3690] INFO Starting the learning process 🎉
2023-07-22 13:44:15 0f17dae9fd9c __main__[3690] INFO Starting Iter #1 ...


Checkpoint Directory exists! 
saving to path '%s /content/drive/My Drive/Bachelorarbeit/training/checkpointtemp.pth.tar
Trying to load checkpoint
EPOCH ::: 1


Training Net: 100%|██████████| 2214/2214 [00:27<00:00, 81.64it/s, Loss_pi=1.50e+00, Loss_v=1.58e-01] 


EPOCH ::: 2


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 107.40it/s, Loss_pi=1.36e+00, Loss_v=1.46e-01]


EPOCH ::: 3


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 107.80it/s, Loss_pi=1.25e+00, Loss_v=1.40e-01]


EPOCH ::: 4


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 106.92it/s, Loss_pi=1.14e+00, Loss_v=1.35e-01]


EPOCH ::: 5


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 107.05it/s, Loss_pi=1.06e+00, Loss_v=1.29e-01]


EPOCH ::: 6


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 109.71it/s, Loss_pi=9.81e-01, Loss_v=1.27e-01]


EPOCH ::: 7


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 110.26it/s, Loss_pi=9.09e-01, Loss_v=1.23e-01]


EPOCH ::: 8


Training Net: 100%|██████████| 2214/2214 [00:19<00:00, 110.74it/s, Loss_pi=8.47e-01, Loss_v=1.19e-01]


EPOCH ::: 9


Training Net: 100%|██████████| 2214/2214 [00:19<00:00, 110.79it/s, Loss_pi=7.89e-01, Loss_v=1.17e-01]


EPOCH ::: 10


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 109.98it/s, Loss_pi=7.40e-01, Loss_v=1.15e-01]


EPOCH ::: 11


Training Net: 100%|██████████| 2214/2214 [00:19<00:00, 110.95it/s, Loss_pi=7.00e-01, Loss_v=1.12e-01]


EPOCH ::: 12


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 109.70it/s, Loss_pi=6.45e-01, Loss_v=1.11e-01]


EPOCH ::: 13


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 110.15it/s, Loss_pi=6.14e-01, Loss_v=1.09e-01]


EPOCH ::: 14


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 110.37it/s, Loss_pi=5.82e-01, Loss_v=1.09e-01]


EPOCH ::: 15


Training Net: 100%|██████████| 2214/2214 [00:20<00:00, 110.55it/s, Loss_pi=5.51e-01, Loss_v=1.08e-01]
2023-07-22 13:49:47 0f17dae9fd9c __main__[3690] INFO PITTING AGAINST PREVIOUS VERSION
Arena.playGames (1): 100%|██████████| 20/20 [08:22<00:00, 25.11s/it]
Arena.playGames (2): 100%|██████████| 20/20 [07:53<00:00, 23.69s/it]
2023-07-22 14:06:03 0f17dae9fd9c __main__[3690] INFO NEW/PREV WINS : 3 / 26 ; DRAWS : 11
2023-07-22 14:06:03 0f17dae9fd9c __main__[3690] INFO REJECTING NEW MODEL
2023-07-22 14:06:03 0f17dae9fd9c __main__[3690] INFO Starting Iter #2 ...


Trying to load checkpoint


Self Play: 100%|██████████| 100/100 [55:15<00:00, 33.15s/it]


Checkpoint Directory exists! 
saving to path '%s /content/drive/My Drive/Bachelorarbeit/training/checkpointtemp.pth.tar
Trying to load checkpoint
EPOCH ::: 1


Training Net: 100%|██████████| 2666/2666 [00:25<00:00, 103.81it/s, Loss_pi=2.12e+00, Loss_v=2.28e-01]


EPOCH ::: 2


Training Net: 100%|██████████| 2666/2666 [00:25<00:00, 105.00it/s, Loss_pi=1.90e+00, Loss_v=1.98e-01]


EPOCH ::: 3


Training Net: 100%|██████████| 2666/2666 [00:25<00:00, 104.83it/s, Loss_pi=1.73e+00, Loss_v=1.81e-01]


EPOCH ::: 4


Training Net: 100%|██████████| 2666/2666 [00:25<00:00, 104.32it/s, Loss_pi=1.59e+00, Loss_v=1.71e-01]


EPOCH ::: 5


Training Net: 100%|██████████| 2666/2666 [00:25<00:00, 103.79it/s, Loss_pi=1.47e+00, Loss_v=1.61e-01]


EPOCH ::: 6


Training Net: 100%|██████████| 2666/2666 [00:25<00:00, 105.64it/s, Loss_pi=1.36e+00, Loss_v=1.54e-01]


EPOCH ::: 7


Training Net: 100%|██████████| 2666/2666 [00:24<00:00, 107.65it/s, Loss_pi=1.26e+00, Loss_v=1.51e-01]


EPOCH ::: 8


Training Net: 100%|██████████| 2666/2666 [00:24<00:00, 107.15it/s, Loss_pi=1.16e+00, Loss_v=1.44e-01]


EPOCH ::: 9


Training Net: 100%|██████████| 2666/2666 [00:24<00:00, 107.82it/s, Loss_pi=1.09e+00, Loss_v=1.42e-01]


EPOCH ::: 10


Training Net: 100%|██████████| 2666/2666 [00:24<00:00, 107.40it/s, Loss_pi=1.01e+00, Loss_v=1.37e-01]


EPOCH ::: 11


Training Net: 100%|██████████| 2666/2666 [00:24<00:00, 107.61it/s, Loss_pi=9.50e-01, Loss_v=1.34e-01]


EPOCH ::: 12


Training Net: 100%|██████████| 2666/2666 [00:24<00:00, 107.87it/s, Loss_pi=8.96e-01, Loss_v=1.31e-01]


EPOCH ::: 13


Training Net: 100%|██████████| 2666/2666 [00:24<00:00, 107.24it/s, Loss_pi=8.39e-01, Loss_v=1.29e-01]


EPOCH ::: 14


Training Net: 100%|██████████| 2666/2666 [00:25<00:00, 105.36it/s, Loss_pi=7.88e-01, Loss_v=1.25e-01]


EPOCH ::: 15


Training Net: 100%|██████████| 2666/2666 [00:25<00:00, 106.62it/s, Loss_pi=7.53e-01, Loss_v=1.24e-01]
2023-07-22 15:08:00 0f17dae9fd9c __main__[3690] INFO PITTING AGAINST PREVIOUS VERSION
Arena.playGames (1): 100%|██████████| 20/20 [06:46<00:00, 20.32s/it]
Arena.playGames (2): 100%|██████████| 20/20 [08:55<00:00, 26.76s/it]
2023-07-22 15:23:42 0f17dae9fd9c __main__[3690] INFO NEW/PREV WINS : 7 / 15 ; DRAWS : 18
2023-07-22 15:23:42 0f17dae9fd9c __main__[3690] INFO REJECTING NEW MODEL
2023-07-22 15:23:42 0f17dae9fd9c __main__[3690] INFO Starting Iter #3 ...


Trying to load checkpoint


Self Play: 100%|██████████| 100/100 [56:38<00:00, 33.99s/it]


Checkpoint Directory exists! 
saving to path '%s /content/drive/My Drive/Bachelorarbeit/training/checkpointtemp.pth.tar
Trying to load checkpoint
EPOCH ::: 1


Training Net: 100%|██████████| 3117/3117 [00:30<00:00, 103.40it/s, Loss_pi=2.45e+00, Loss_v=2.70e-01]


EPOCH ::: 2


Training Net: 100%|██████████| 3117/3117 [00:30<00:00, 102.56it/s, Loss_pi=2.19e+00, Loss_v=2.32e-01]


EPOCH ::: 3


Training Net: 100%|██████████| 3117/3117 [00:30<00:00, 103.00it/s, Loss_pi=2.01e+00, Loss_v=2.10e-01]


EPOCH ::: 4


Training Net: 100%|██████████| 3117/3117 [00:30<00:00, 103.43it/s, Loss_pi=1.86e+00, Loss_v=1.96e-01]


EPOCH ::: 5


Training Net: 100%|██████████| 3117/3117 [00:30<00:00, 103.79it/s, Loss_pi=1.72e+00, Loss_v=1.85e-01]


EPOCH ::: 6


Training Net: 100%|██████████| 3117/3117 [00:30<00:00, 103.89it/s, Loss_pi=1.59e+00, Loss_v=1.75e-01]


EPOCH ::: 7


Training Net: 100%|██████████| 3117/3117 [00:29<00:00, 104.37it/s, Loss_pi=1.48e+00, Loss_v=1.68e-01]


EPOCH ::: 8


Training Net: 100%|██████████| 3117/3117 [00:29<00:00, 104.43it/s, Loss_pi=1.39e+00, Loss_v=1.63e-01]


EPOCH ::: 9


Training Net: 100%|██████████| 3117/3117 [00:29<00:00, 104.71it/s, Loss_pi=1.30e+00, Loss_v=1.58e-01]


EPOCH ::: 10


Training Net: 100%|██████████| 3117/3117 [00:29<00:00, 106.04it/s, Loss_pi=1.21e+00, Loss_v=1.52e-01]


EPOCH ::: 11


Training Net: 100%|██████████| 3117/3117 [00:29<00:00, 106.65it/s, Loss_pi=1.14e+00, Loss_v=1.49e-01]


EPOCH ::: 12


Training Net: 100%|██████████| 3117/3117 [00:29<00:00, 105.83it/s, Loss_pi=1.08e+00, Loss_v=1.47e-01]


EPOCH ::: 13


Training Net: 100%|██████████| 3117/3117 [00:29<00:00, 106.25it/s, Loss_pi=1.00e+00, Loss_v=1.42e-01]


EPOCH ::: 14


Training Net: 100%|██████████| 3117/3117 [00:29<00:00, 103.95it/s, Loss_pi=9.51e-01, Loss_v=1.40e-01]


EPOCH ::: 15


Training Net: 100%|██████████| 3117/3117 [00:30<00:00, 101.24it/s, Loss_pi=8.96e-01, Loss_v=1.38e-01]
2023-07-22 16:28:19 0f17dae9fd9c __main__[3690] INFO PITTING AGAINST PREVIOUS VERSION
Arena.playGames (1): 100%|██████████| 20/20 [08:06<00:00, 24.31s/it]
Arena.playGames (2): 100%|██████████| 20/20 [07:50<00:00, 23.53s/it]
2023-07-22 16:44:16 0f17dae9fd9c __main__[3690] INFO NEW/PREV WINS : 10 / 16 ; DRAWS : 14
2023-07-22 16:44:16 0f17dae9fd9c __main__[3690] INFO REJECTING NEW MODEL
2023-07-22 16:44:16 0f17dae9fd9c __main__[3690] INFO Starting Iter #4 ...


Trying to load checkpoint


Self Play: 100%|██████████| 100/100 [55:56<00:00, 33.56s/it]


Checkpoint Directory exists! 
saving to path '%s /content/drive/My Drive/Bachelorarbeit/training/checkpointtemp.pth.tar
Trying to load checkpoint
EPOCH ::: 1


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.24it/s, Loss_pi=2.66e+00, Loss_v=3.03e-01]


EPOCH ::: 2


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.35it/s, Loss_pi=2.37e+00, Loss_v=2.60e-01]


EPOCH ::: 3


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.28it/s, Loss_pi=2.19e+00, Loss_v=2.30e-01]


EPOCH ::: 4


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.34it/s, Loss_pi=2.03e+00, Loss_v=2.16e-01]


EPOCH ::: 5


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.81it/s, Loss_pi=1.89e+00, Loss_v=2.02e-01]


EPOCH ::: 6


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 103.66it/s, Loss_pi=1.77e+00, Loss_v=1.89e-01]


EPOCH ::: 7


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.27it/s, Loss_pi=1.65e+00, Loss_v=1.83e-01]


EPOCH ::: 8


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.66it/s, Loss_pi=1.54e+00, Loss_v=1.76e-01]


EPOCH ::: 9


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.56it/s, Loss_pi=1.45e+00, Loss_v=1.68e-01]


EPOCH ::: 10


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 103.17it/s, Loss_pi=1.35e+00, Loss_v=1.65e-01]


EPOCH ::: 11


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.69it/s, Loss_pi=1.28e+00, Loss_v=1.60e-01]


EPOCH ::: 12


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.89it/s, Loss_pi=1.21e+00, Loss_v=1.57e-01]


EPOCH ::: 13


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 102.89it/s, Loss_pi=1.14e+00, Loss_v=1.53e-01]


EPOCH ::: 14


Training Net: 100%|██████████| 3564/3564 [00:34<00:00, 103.13it/s, Loss_pi=1.08e+00, Loss_v=1.51e-01]


EPOCH ::: 15


Training Net: 100%|██████████| 3564/3564 [00:35<00:00, 101.55it/s, Loss_pi=1.02e+00, Loss_v=1.47e-01]
2023-07-22 17:49:25 0f17dae9fd9c __main__[3690] INFO PITTING AGAINST PREVIOUS VERSION
Arena.playGames (1): 100%|██████████| 20/20 [07:40<00:00, 23.00s/it]
Arena.playGames (2):  25%|██▌       | 5/20 [02:27<07:41, 30.74s/it]