In [1]:
%load_ext line_profiler
import line_profiler

In [2]:
%load_ext Cython
from Cython.Compiler.Options import directive_scopes, directive_types
directive_types['linetrace'] = True
directive_types['binding'] = True

In [3]:
import os 
import importlib
import sys
import tensorflow as tf
import itertools
import numpy as np
import numba as nb
from math import sqrt, log, exp
from numpy import unravel_index
from random import choice, random, sample
np.random.seed(1337)  # for reproducibility
from keras.models import Sequential, Model, load_model
import keras.backend as K
import matplotlib.pyplot as plt
K.set_image_dim_ordering('th')
import time

Using TensorFlow backend.


In [4]:
class Ataxx:
    def __init__(self, board=None):
        if board is None:                  # if there is no initialization given
            self.data = np.zeros((7, 7), dtype=np.int8)   # then generate a board with starting init, and black(-1) takes first turn
            self.data[0, 0] = -1           
            self.data[6, 6] = -1
            self.data[0, 6] = 1
            self.data[6, 0] = 1
        else:
            self.data = board.copy()
            
    def reset(self, board=None):
        if board is None:
            self.data = np.zeros((7, 7), dtype=np.int8)
            self.data[0, 0] = -1           
            self.data[6, 6] = -1
            self.data[0, 6] = 1
            self.data[6, 0] = 1
        else:
            self.data = board.copy()
        
    def get_feature_map(self, turn, move):
        out = np.zeros((6, 9, 9), dtype=np.int8)
        # define 1 edge
        
        # edge
        for j in range(9):
            for k in range(9):
                if j == 0 or j == 8 or k == 0 or k == 8:
                    out[0, j, k] = 1
         
        # my pieces
        for j in range(9):
            for k in range(9):
                if j > 0 and j < 8 and k > 0 and k < 8:
                    if self.data[j-1, k-1] == turn:
                        out[1, j, k] = 1
        
        # op pieces
        for j in range(9):
            for k in range(9):
                if j > 0 and j < 8 and k > 0 and k < 8:
                    if self.data[j-1, k-1] == -turn:
                        out[2, j, k] = 1
         
        # last move
        if not move is None:               
            out[3, move[0][0]+1, move[0][1]+1] = 1
            out[4, move[1][0]+1, move[1][1]+1] = 1
            
        # whose first
        if turn == -1:
            for j in range(9):
                for k in range(9):
                    out[5, j, k] = 1
        return np.array(out)
    
    def plot(self, is_next_move=False, turn=None):                        # plot the board
        image = self.data.copy()
        if is_next_move:
            if turn not in [-1, 1]:
                raise ValueError("Turn must be -1 or 1, or Must input a turn for next moves")
            else:
                next_moves = self.get_moves(turn)
                if len(next_moves) == 0:
                    raise ValueError("Game is over already")
                next_pos = list(zip(*next_moves))[1]
                for pos in next_pos:
                    image[pos] = turn / 2
        plt.imshow(image, cmap='gray')
        plt.xticks(range(7), range(7))
        plt.yticks(range(7), range(7))
        plt.show()
        
    def get_greedy_move(self, turn, moves=None):
        best_score = -50
        # get all possible moves if not provided
        if moves is None:
            moves, corr_dict = self.get_moves(turn)
            for item in corr_dict:
                moves.append(item)
        
        if len(moves) == 0:
            raise ValueError('No Possible Moves')
        
        best_moves = []
        # calculate greedy move
        for (x0, y0), (x1, y1) in moves:
            tmp_score = 0
            if abs(x0-x1) <= 1 and abs(y0-y1) <= 1:
                tmp_score += 1
            for dr in range(-1, 2):
                for dc in range(-1, 2):
                    try:
                        if x1+dr >= 0 and y1+dc >= 0:
                            tmp_score += self.data[x1+dr, y1+dc] == -turn
                    except:
                        pass
            if tmp_score > best_score:
                best_moves = [((x0, y0), (x1, y1))]
                best_score = tmp_score
            elif tmp_score == best_score:
                best_moves.append(((x0, y0), (x1, y1)))
        return choice(best_moves)
                
    def is_valid(self, turn, pos, get_pos=False):
        r = pos[0]
        c = pos[1]
        if self.data[r, c] != 0:
            if not get_pos:
                return False
            else:
                return
        else:
            for dr in range(-2, 3):
                for dc in range(-2, 3):
                    new_r = r+dr
                    new_c = c+dc
                    if new_r >= 0 and new_c >= 0 and new_r < 7 and new_c < 7 and self.data[new_r, new_c] == turn:
                        if not get_pos:
                            return True
                        else:
                            yield new_r, new_c, dr, dc
            if not get_pos:
                return False
        
    def get_moves(self, turn):
        action_mask = np.zeros(792, dtype=np.int8)
        next_moves = []
        corr_dict = {}
        for r in range(7):
            for c in range(7):
                has_duplicate_move = False      # move within the radius of one of another friendly piece is called
                for new_r, new_c, dr, dc in self.is_valid(turn, (r, c), True): # duplicate move
                    if new_r >= 0 and new_c >= 0 and new_r < 7 and new_c < 7 and self.data[new_r, new_c] == turn:
                        if abs(dr) <= 1 and abs(dc) <=1:
                            if has_duplicate_move: 
                                cur_move = ((new_r, new_c), (r, c))
                                corr_dict[cur_move] = dup_move
                            elif self.data[new_r, new_c] == turn:
                                dup_move = ((new_r, new_c), (r, c))
                                next_moves.append(dup_move) 
                                has_duplicate_move = True
                        elif self.data[new_r, new_c] == turn:
                            cur_move = ((new_r, new_c), (r, c))
                            next_moves.append(cur_move) 
                        else:
                            continue

        return next_moves, corr_dict
        
    def move_to(self, turn, pos0, pos1):
        x0 = pos0[0]
        y0 = pos0[1]
        x1 = pos1[0]
        y1 = pos1[1]
        
        if not self.is_valid(turn, pos1):
            raise ValueError("This move: " + str((pos0, pos1)) + " of turn: " + str(turn) + " is invalid") 
        elif self.data[x0, y0] != turn:
            raise ValueError("The starting position is not your piece")
        else:
            self.data[x1, y1] = turn
            if abs(x0 - x1) > 1 or abs(y0 - y1) > 1:   # jump move
                self.data[x0, y0] = 0

            for dr in range(-1, 2):                  # infection mode!!!!
                for dc in range(-1, 2):
                    if x1+dr >= 0 and y1+dc >= 0 and x1+dr < 7 and y1+dc < 7:
                        if self.data[x1+dr, y1+dc] == -turn:  # convert any piece of the opponent to 'turn'
                            self.data[x1+dr, y1+dc] = turn
    
    def evaluate(self, turn, this_turn, max_score=1, min_score=0.001):
        turn_no=0
        op_no=0
        for r in range(7):
            for c in range(7):
                if self.data[r, c] == turn:
                    turn_no += 1
                elif self.data[r, c] == -turn:
                    op_no += 1
        if len(self.get_moves(this_turn)[0]) == 0:# if one of them can no longer move, count and end
            if turn_no > op_no:
                return max_score
            else:
                return -max_score
        else:
            value = turn_no - op_no
        return value * min_score
    
    @staticmethod    
    def get_manual_q(turn, board):
        '''consider linear growth of win prob with regard to n_diff
        when diff >= 10, the slope grow a bit
        when diff >= 35, consider win prob close to 1 or -1
        ''' 
        turn_no = 0
        op_no = 0
        max1=0.9
        max2=0.95
        # get no diff of turns
        for r in range(7):
            for c in range(7):
                if board[r, c] == turn:
                    turn_no += 1
                elif board[r, c] == -turn:
                    op_no += 1
        diff = turn_no - op_no
        if abs(diff) > 30:
            return diff / abs(diff)
        else:
            return diff / 30
        
        # ignore the rest for now
        sign = diff
        diff = abs(diff)
        if diff < 35:
            diff = (diff / 35) ** 2 * max1
        else:
            diff = max2

        if sign < 0:
            return -diff
        else:
            return diff

In [5]:
%%cython 
import numpy as np
cimport numpy as np

def is_valid(np.int8_t[:, :] board, turn, pos):
    cdef int dr, dc, r = pos[0], c = pos[1], new_r, new_c
    if board[r, c] != 0:
        return False
    else:
        for dr in range(-2, 3):
            for dc in range(-2, 3):
                new_r = r+dr
                new_c = c+dc
                if new_r >= 0 and new_c >= 0 and new_r < 7 and new_c < 7 and board[new_r, new_c] == turn:
                    return True
        return False 

def get_moves(np.int8_t[:, :] board, turn):
    cdef int r, c, dr, dc, new_r, new_c
    next_moves = []
    for r in range(7):
        for c in range(7):
            has_duplicate_move = False      # move within the radius of one of another friendly piece is called
            if is_valid(board, turn, (r, c)): # duplicate move
                for dr in range(-2, 3):
                    for dc in range(-2, 3):
                        new_r = r+dr
                        new_c = c+dc
                        if new_r >= 0 and new_c >= 0 and new_r < 7 and new_c < 7 and board[new_r, new_c] == turn:
                            if abs(dr) <= 1 and abs(dc) <=1:
                                if not has_duplicate_move and board[new_r, new_c] == turn:
                                    dup_move = ((new_r, new_c), (r, c))
                                    next_moves.append(dup_move) 
                                    has_duplicate_move = True
                            elif board[new_r, new_c] == turn:
                                cur_move = ((new_r, new_c), (r, c))
                                next_moves.append(cur_move) 
                            else:
                                continue
    return next_moves

def get_moves_with_mask(np.int8_t[:, :] board, turn, dict policy_dict):
    cdef int r, c, dr, dc, new_r, new_c
    cdef np.int8_t[:] action_mask = np.zeros(792, dtype=np.int8)
    next_moves = []
    for r in range(7):
        for c in range(7):
            has_duplicate_move = False      # move within the radius of one of another friendly piece is called
            if is_valid(board, turn, (r, c)): # duplicate move
                for dr in range(-2, 3):
                    for dc in range(-2, 3):
                        new_r = r+dr
                        new_c = c+dc
                        if new_r >= 0 and new_c >= 0 and new_r < 7 and new_c < 7 and board[new_r, new_c] == turn:
                            if abs(dr) <= 1 and abs(dc) <=1:
                                cur_move = ((new_r, new_c), (r, c))
                                if not has_duplicate_move:
                                    next_moves.append(cur_move) 
                                    has_duplicate_move = True
                            elif board[new_r, new_c] == turn:
                                cur_move = ((new_r, new_c), (r, c))
                                next_moves.append(cur_move) 
                            action_mask[policy_dict[cur_move]] = 1
    return next_moves, np.array(action_mask)

In [6]:
class PolicyValueNetwork():
    def __init__(self, model_name):
        self._sess = tf.Session(config=tf.ConfigProto(intra_op_parallelism_threads=24))
        K.set_session(self._sess)
        
        self._model = load_model(model_name)
        print("successfully loaded the model")
           
    def predict(self, feature_map, action_mask):        
        return self._sess.run(self._model.outputs, feed_dict={self._model.inputs[0]: feature_map.reshape(-1, 6, 9, 9), \
                                self._model.inputs[1]: action_mask.reshape(-1, 792), K.learning_phase(): 0})

In [7]:
'''These methods are for normal Min max'''
class MinMaxNormal():
    def __init__(self):
        return
    
    @staticmethod
    def evaluate(board, turn):
        turn_no = 0
        op_no = 0
        # get no diff of turns
        for r in range(7):
            for c in range(7):
                if board[r, c] == turn:
                    turn_no += 1
                elif board[r, c] == -turn:
                    op_no += 1
        return (turn_no - op_no)

    @staticmethod
    def move_to(board, turn, pos0, pos1):
        x0 = pos0[0]
        y0 = pos0[1]
        x1 = pos1[0]
        y1 = pos1[1]

        board = board.copy()
        board[x1, y1] = turn
        if abs(x0 - x1) > 1 or abs(y0 - y1) > 1:   # jump move
            board[x0, y0] = 0

        for dr in range(-1, 2):                  # infection mode!!!!
            for dc in range(-1, 2):
                if x1+dr >= 0 and y1+dc >= 0 and x1+dr < 7 and y1+dc < 7:
                    if board[x1+dr, y1+dc] == -turn:  # convert any piece of the opponent to 'turn'
                        board[x1+dr, y1+dc] = turn
        return board

    def min_max(self, board, turn, target_turn, depth, alpha=-100, beta=100, is_max=True, is_root=True):
        '''A recursive alpha beta pruning min_max function
        return: board evaluation, chosen move
        NB. for board evaluation, if the searching was pruned, it will return 100 for a minimizer and -100 for a maximizer'''
        if is_root:
            best_moves = []
        else:
            best_move = ((0, 0), (0, 0))
    
        next_moves = get_moves(board, turn)
        
        if depth == 0 or len(next_moves) == 0: # start to do pruning and selecting once the recursion reaches the end
            result = self.evaluate(board, target_turn)
            return result, None
        else:
            if is_max:
                alpha = -100
            else:
                beta = 100

            for move in next_moves:
                result, _ = self.min_max(self.move_to(board, turn, move[0], move[1]), \
                                    -turn, target_turn, depth-1, alpha, beta, not is_max, False)
                # prun the searching tree or update alpha and beta respectively
                if is_max:
                    if result >= beta:
                        return 100, None
                    elif result > alpha:
                        alpha = result
                        if is_root:
                            best_moves = [move]
                        else:
                            best_move = move
                    elif result == alpha and is_root:
                        best_moves.append(move)
                else:
                    if result <= alpha:
                        return -100, None
                    elif result < beta:
                        beta = result
                        if is_root:
                            best_moves = [move]
                        else:
                            best_move = move
                    elif result == beta and is_root:
                        best_moves.append(move)
            if is_max:
                if is_root:
                    return alpha, choice(best_moves)
                else:
                    return alpha, best_move
            else:
                if is_root:
                    return beta, choice(best_moves)
                else:
                    return beta, best_move

In [8]:
class MinMaxQ():
    def __init__(self, model_name="AtaxxZero.h5"):
        self._evaluator = PolicyValueNetwork(model_name)
        self._policy_dict = self.get_policy_dict()
        
    @staticmethod
    def get_policy_dict():
        '''Get the relation between policy no. and policy'''
        index=0
        policy_dict = {}
        for r in range(7):
            for c in range(7):
                for dr in range(-2, 3):
                    for dc in range(-2, 3):
                        new_r = r + dr
                        new_c = c + dc
                        if (dr != 0 or dc != 0) and (new_r < 7 and new_r >= 0) and (new_c < 7 and new_c >= 0):
                            policy_dict[((r, c), (new_r, new_c))] = index
                            index += 1
        return policy_dict

    @staticmethod
    def get_feature_map(board, turn, pre_move):
        out = np.zeros((6, 9, 9), dtype=np.int8)
        # define 1 edge

        # edge
        out[0, 0, :] = 1
        out[0, 8, :] = 1
        out[0, :, 0] = 1
        out[0, :, 8] = 1

        # my pieces
        out[1, 1:8, 1:8] = (board == turn)

        # op pieces
        out[2, 1:8, 1:8] = (board == -turn)
        
        # last move
        if not pre_move is None:               
            out[3, pre_move[0][0]+1, pre_move[0][1]+1] = 1
            out[4, pre_move[1][0]+1, pre_move[1][1]+1] = 1

        # whose first
        if turn == -1:
            out[5, ...] = 1
        return out
    
    def evaluate(self, feature_map, action_mask, turn, target_turn):
        result = self._evaluator.predict(feature_map, action_mask)
        p = result[0][0]
        if turn == target_turn:
            q = result[1][0][0]
        else:
            q = -result[1][0][0]
        return q
    
    @staticmethod
    def move_to(board, turn, pos0, pos1):
        x0 = pos0[0]
        y0 = pos0[1]
        x1 = pos1[0]
        y1 = pos1[1]

        board = board.copy()
        board[x1, y1] = turn
        if abs(x0 - x1) > 1 or abs(y0 - y1) > 1:   # jump move
            board[x0, y0] = 0

        for dr in range(-1, 2):                  # infection mode!!!!
            for dc in range(-1, 2):
                if x1+dr >= 0 and y1+dc >= 0 and x1+dr < 7 and y1+dc < 7:
                    if board[x1+dr, y1+dc] == -turn:  # convert any piece of the opponent to 'turn'
                        board[x1+dr, y1+dc] = turn
        return board
    
    def min_max(self, board, turn, target_turn, depth=3, alpha=-100, beta=100, is_max=True, is_root=True, \
                pre_move=None, t_lim=1):
        '''A recursive alpha beta pruning min_max function
        return: board evaluation, chosen move
        NB. for board evaluation, if the searching was pruned, it will return 100 for a minimizer and -100 for a maximizer'''
        if is_root:
            best_moves = []
            self._start = time.time()
        else:
            best_move = ((0, 0), (0, 0))
        
        # get next moves
        next_moves, action_mask = get_moves_with_mask(board, turn, self._policy_dict)
        # stop searching if the game is over
        if len(next_moves) == 0:
            diff = (board == target_turn).sum() - (board == -target_turn).sum()
            if diff > 0:
                return 1, None
            elif diff < 0:
                return -1, None
            else:
                turn_no = (board == turn).sum()
                if turn == -target_turn:
                    turn_no = 49 - turn_no # set turn_no to represent the number of target turn pieces
                if turn_no >= 45:
                    return 1, None
                else:
                    return -1, None
        elif depth == 0: # do cluster evaluation of all possible next boards
            feature_map = self.get_feature_map(board, turn, pre_move)
            q = self.evaluate(feature_map, action_mask, turn, target_turn)
            return q, None
        else:
            # generate move corresponding p list
            if is_max:
                alpha = -100
            else:
                beta = 100
                
            for move in next_moves:
                # do searching
                result, _ = self.min_max(self.move_to(board, turn, move[0], move[1]), \
                                    -turn, target_turn, depth-1, alpha, beta, not is_max, False, move, t_lim)
                # record all the move and its alpha value
                if is_root:
                    try:
                        move_value.append((move, result))
                    except:
                        move_value = [(move, result)]
                # prun the searching tree or update alpha and beta respectively
                if is_max:
                    if result >= beta:
                        return 100, None
                    elif result > alpha:
                        alpha = result
                        best_move = move
                else:
                    if result <= alpha:
                        return -100, None
                    elif result < beta:
                        beta = result
                        best_move = move

                if time.time() - self._start >= t_lim:
                    break
                    
            if is_root: # incorporate ramdom characteristic
                move_value = sorted(move_value, key=lambda x: x[1], reverse=True)
                max_value = alpha
                for move, value in move_value:
                    if value >= max_value - 0.1 * abs(max_value):
                        try:
                            best_move.append(move)
                        except:
                            best_move = [move]
                    else:
                        break
                # print(len(best_move))
                best_move = choice(best_move)

            if is_max:
                return alpha, best_move
            else:
                return beta, best_move

In [9]:
class MinMaxQCluster():
    def __init__(self, model_name="AtaxxZero.h5"):
        self._evaluator = PolicyValueNetwork(model_name)
        self._policy_dict = self.get_policy_dict()
        
    @staticmethod
    def get_policy_dict():
        '''Get the relation between policy no. and policy'''
        index=0
        policy_dict = {}
        for r in range(7):
            for c in range(7):
                for dr in range(-2, 3):
                    for dc in range(-2, 3):
                        new_r = r + dr
                        new_c = c + dc
                        if (dr != 0 or dc != 0) and (new_r < 7 and new_r >= 0) and (new_c < 7 and new_c >= 0):
                            policy_dict[((r, c), (new_r, new_c))] = index
                            index += 1
        return policy_dict

    @staticmethod
    def get_feature_map(board, turn, pre_move):
        out = np.zeros((6, 9, 9), dtype=np.int8)
        # define 1 edge

        # edge
        out[0, 0, :] = 1
        out[0, 8, :] = 1
        out[0, :, 0] = 1
        out[0, :, 8] = 1

        # my pieces
        out[1, 1:8, 1:8] = (board == turn)

        # op pieces
        out[2, 1:8, 1:8] = (board == -turn)
        
        # last move
        if not pre_move is None:               
            out[3, pre_move[0][0]+1, pre_move[0][1]+1] = 1
            out[4, pre_move[1][0]+1, pre_move[1][1]+1] = 1

        # whose first
        if turn == -1:
            out[5, ...] = 1
        return out
    
    @staticmethod 
    def judge(board, target_turn):
        '''evaluate the board with rules'''
        diff = (board == target_turn).sum() - (board == -target_turn).sum()
        if diff > 0:
            return 1
        elif diff < 0:
            return -1
        else:
            turn_no = (board == turn).sum()
            if turn == -target_turn:
                turn_no = 49 - turn_no # set turn_no to represent the number of target turn pieces
            if turn_no >= 45:
                return 1
            else:
                return -1
    
    @staticmethod
    def move_to(board, turn, pos0, pos1):
        x0 = pos0[0]
        y0 = pos0[1]
        x1 = pos1[0]
        y1 = pos1[1]

        board = board.copy()
        board[x1, y1] = turn
        if abs(x0 - x1) > 1 or abs(y0 - y1) > 1:   # jump move
            board[x0, y0] = 0

        for dr in range(-1, 2):                  # infection mode!!!!
            for dc in range(-1, 2):
                if x1+dr >= 0 and y1+dc >= 0 and x1+dr < 7 and y1+dc < 7:
                    if board[x1+dr, y1+dc] == -turn:  # convert any piece of the opponent to 'turn'
                        board[x1+dr, y1+dc] = turn
        return board
    
    def min_max(self, board, turn, target_turn, depth=3, alpha=-100, beta=100, is_max=True, is_root=True, \
                pre_move=None, t_lim=1):
        '''A recursive alpha beta pruning min_max function
        return: board evaluation, chosen move
        NB. for board evaluation, if the searching was pruned, it will return 100 for a minimizer and -100 for a maximizer'''
        if is_root:
            best_moves = []
            self._start = time.time()
        else:
            best_move = ((0, 0), (0, 0))
        
        # get next moves
        next_moves, action_mask = get_moves_with_mask(board, turn, self._policy_dict)
        # stop searching if the game is over
        if len(next_moves) == 0:
            return self.judge(board, target_turn), None
        elif depth == 1 and not is_root: # do cluster evaluation of all possible next boards
            new_feature_map = []
            new_action_mask = []
            # prepare all the input for a 
            for move in next_moves:
                temp_board = self.move_to(board, turn, move[0], move[1])
                temp_moves, temp_action_mask = get_moves_with_mask(temp_board, -turn, self._policy_dict)
                if len(temp_moves) == 0:
                    result = self.judge(temp_board, target_turn) == 1
                    if result and is_max:
                        return 1, None
                    elif not result and not is_max:
                        return -1, None
                    else:
                        continue
                new_feature_map.append(self.get_feature_map(temp_board, -turn, move))
                new_action_mask.append(temp_action_mask)
                
            feature_map = np.concatenate(new_feature_map, axis=0)
            action_mask = np.concatenate(new_action_mask, axis=0)
            q = self._evaluator.predict(feature_map, action_mask)[1][:, 0]
            return q.min(), None 
        elif depth == 1 and is_root:
            new_feature_map = []
            new_action_mask = []
            move_index_value = np.zeros(len(next_moves))
            # prepare all the input for a 
            counter = 0
            for move in next_moves:
                temp_board = self.move_to(board, turn, move[0], move[1])
                temp_moves, temp_action_mask = get_moves_with_mask(temp_board, -turn, self._policy_dict)
                if len(temp_moves) == 0:
                    result = self.judge(temp_board, -turn)
                    move_index_value[counter] = result
                    if result == -1:
                        return -result, move
                new_feature_map.append(self.get_feature_map(temp_board, -turn, move))
                new_action_mask.append(temp_action_mask)
                counter += 1
                
            feature_map = np.concatenate(new_feature_map, axis=0)
            action_mask = np.concatenate(new_action_mask, axis=0)
            q = self._evaluator.predict(feature_map, action_mask)[1][:, 0]
            for i in range(len(next_moves)):
                if move_index_value[i] == 0:
                    move_index_value[i] = q[i]
            arg_min = move_index_value.argmin()
            return -move_index_value[arg_min], next_moves[arg_min]
        else:
            # generate move corresponding p list
            if is_max:
                alpha = -100
            else:
                beta = 100
                
            for move in next_moves:
                # do searching
                result, _ = self.min_max(self.move_to(board, turn, move[0], move[1]), \
                                    -turn, target_turn, depth-1, alpha, beta, not is_max, False, move, t_lim)
                # record all the move and its alpha value
                if is_root:
                    try:
                        move_value.append((move, result))
                    except:
                        move_value = [(move, result)]
                # prun the searching tree or update alpha and beta respectively
                if is_max:
                    if result >= beta:
                        return 100, None
                    elif result > alpha:
                        alpha = result
                        best_move = move
                else:
                    if result <= alpha:
                        return -100, None
                    elif result < beta:
                        beta = result
                        best_move = move

                if time.time() - self._start >= t_lim:
                    break
                    
            if is_root: # incorporate ramdom characteristic
                move_value = sorted(move_value, key=lambda x: x[1], reverse=True)
                max_value = alpha
                for move, value in move_value:
                    if value >= max_value - 0.1 * abs(max_value):
                        try:
                            best_move.append(move)
                        except:
                            best_move = [move]
                    else:
                        break
                # print(len(best_move))
                best_move = choice(best_move)

            if is_max:
                return alpha, best_move
            else:
                return beta, best_move

In [10]:
class MinMaxZero():
    def __init__(self, p_thresh, c_thresh, model_name="AtaxxZero.h5"):
        self._evaluator = PolicyValueNetwork(model_name)
        self._policy_dict = self.get_policy_dict()
        self._p_thresh = p_thresh
        self._c_thresh = c_thresh
        
    @staticmethod
    def get_policy_dict():
        '''Get the relation between policy no. and policy'''
        index=0
        policy_dict = {}
        for r in range(7):
            for c in range(7):
                for dr in range(-2, 3):
                    for dc in range(-2, 3):
                        new_r = r + dr
                        new_c = c + dc
                        if (dr != 0 or dc != 0) and (new_r < 7 and new_r >= 0) and (new_c < 7 and new_c >= 0):
                            policy_dict[((r, c), (new_r, new_c))] = index
                            index += 1
        return policy_dict

    @staticmethod
    def get_feature_map(board, turn, pre_move):
        out = np.zeros((6, 9, 9), dtype=np.int8)
        # define 1 edge

        # edge
        out[0, 0, :] = 1
        out[0, 8, :] = 1
        out[0, :, 0] = 1
        out[0, :, 8] = 1

        # my pieces
        out[1, 1:8, 1:8] = (board == turn)

        # op pieces
        out[2, 1:8, 1:8] = (board == -turn)
        
        # last move
        if not pre_move is None:               
            out[3, pre_move[0][0]+1, pre_move[0][1]+1] = 1
            out[4, pre_move[1][0]+1, pre_move[1][1]+1] = 1

        # whose first
        if turn == -1:
            out[5, ...] = 1
        return out
    
    def evaluate(self, feature_map, action_mask, turn, target_turn):
        result = self._evaluator.predict(feature_map, action_mask)
        p = result[0][0]
        if turn == target_turn:
            q = result[1][0][0]
        else:
            q = -result[1][0][0]
        return p, q
    
    @staticmethod
    def normal_evaluate(board, target_turn):
        return (board == target_turn).sum() - (board == -target_turn).sum()
    
    @staticmethod 
    def judge(board, end_turn, target_turn):
        '''evaluate the board with rules'''
        diff = (2 * (board == end_turn).sum() - 49) 
        if target_turn != end_turn:
            diff = -diff
        if diff > 0:
            return 1
        elif diff <= 0:
            return -1
    
    @staticmethod
    def move_to(board, turn, pos0, pos1):
        x0 = pos0[0]
        y0 = pos0[1]
        x1 = pos1[0]
        y1 = pos1[1]

        board = board.copy()
        board[x1, y1] = turn
        if abs(x0 - x1) > 1 or abs(y0 - y1) > 1:   # jump move
            board[x0, y0] = 0

        for dr in range(-1, 2):                  # infection mode!!!!
            for dc in range(-1, 2):
                if x1+dr >= 0 and y1+dc >= 0 and x1+dr < 7 and y1+dc < 7:
                    if board[x1+dr, y1+dc] == -turn:  # convert any piece of the opponent to 'turn'
                        board[x1+dr, y1+dc] = turn
        return board
    
    def min_max(self, board, turn, target_turn, is_normal=False, depth=3, alpha=-100, beta=100, is_max=True, is_root=True, \
                pre_move=None, t_lim=1):
        '''A recursive alpha beta pruning min_max function
        return: board evaluation, chosen move
        NB. for board evaluation, if the searching was pruned, it will return 100 for a minimizer and -100 for a maximizer'''
        if is_root:
            best_moves = []
            self._start = time.time()
        else:
            best_move = ((0, 0), (0, 0))
        
        # get next moves
        next_moves, action_mask = get_moves_with_mask(board, turn, self._policy_dict)
        # stop searching if the game is over
        if len(next_moves) == 0:
            return self.judge(board, turn, target_turn), None
        elif not is_normal: # otherwise calculate p and q and do the NN pruned minmax searching
            feature_map = self.get_feature_map(board, turn, pre_move)
            p, q = self.evaluate(feature_map, action_mask, turn, target_turn)
            
        if depth == 0:
            if not is_normal: # once the recursion reaches the end, return the leaf node value
                return q, None
            else:
                return self.normal_evaluate(board, target_turn), None
        else:
            # generate move corresponding p list
            move_prob = []
            all_prob = 0.0
            if not is_normal:
                for move in next_moves:
                    prob = p[self._policy_dict[move]]
                    move_prob.append((move, prob))
                    all_prob += prob
                move_prob = sorted(move_prob, key=lambda x: x[1], reverse=True)
            else:
                move_prob = [(move, 1) for move in next_moves]

            if is_max:
                alpha = -100
            else:
                beta = 100

            if not is_normal:
                sum_prob = 0.0
                counter = 0
                counter_thresh = len(move_prob) / self._c_thresh
                prob_thresh = all_prob * self._p_thresh
            # display_move_prob(move_prob)
            for move, prob in move_prob:
                if not is_normal:
                    sum_prob += prob
                    counter += 1
                # do searching
                result, _ = self.min_max(self.move_to(board, turn, move[0], move[1]), \
                                    -turn, target_turn, is_normal, depth-1, alpha, beta, not is_max, False, move, t_lim)
                # record all the move and its alpha value
                if is_root:
                    try:
                        move_value.append((move, result))
                    except:
                        move_value = [(move, result)]
                # prun the searching tree or update alpha and beta respectively
                if is_max:
                    if result >= beta:
                        return 100, None
                    elif result > alpha:
                        alpha = result
                        best_move = move
                else:
                    if result <= alpha:
                        return -100, None
                    elif result < beta:
                        beta = result
                        best_move = move

                if time.time() - self._start >= t_lim:
                    break
                elif not is_normal and sum_prob >= prob_thresh and counter >= counter_thresh:
                    break
                    
            if is_root: # incorporate ramdom characteristic
                move_value = sorted(move_value, key=lambda x: x[1], reverse=True)
                max_value = alpha
                for move, value in move_value:
                    if value >= max_value - 0.1 * abs(max_value):
                        try:
                            best_move.append(move)
                        except:
                            best_move = [move]
                    else:
                        break
                # print(len(best_move))
                best_move = choice(best_move)

            if is_max:
                return alpha, best_move
            else:
                return beta, best_move

In [11]:
def actor(mm_obj, board, turn, depth, pre_move):
    try:
        _, best_move = mm_obj.min_max(board, turn, turn, depth=depth, pre_move=pre_move, t_lim=6)
    except:
        _, best_move = mm_obj.min_max(board, turn, turn, depth=depth)
    return best_move

In [12]:
def actor_compare(minmax_common, minmax_normal, minmax_zero, C_D, N_D, Z_D, common_steps, rounds, verbose=False):
    zero_win = 0
    time_zero = []
    time_normal = []
    for r in range(rounds):
        print("Round:", r)
        steps = 0
        a = Ataxx()
        turn = -1
        zero_turn = choice([-1, 1])
        if verbose:
            print("zero chose color", zero_turn)
        best_move = None
        while True:
            steps += 1
            if verbose:
                a.plot()
            if steps > common_steps:
                if turn != zero_turn:
                    s = time.time()
                    best_move = actor(minmax_normal, a.data, turn, N_D, best_move)
                    span = time.time() - s
                    time_normal.append(span)
                else:
                    s = time.time()
                    best_move = actor(minmax_zero, a.data, turn, Z_D, best_move)
                    span = time.time() - s
                    time_zero.append(span)
            else:
                best_move = actor(minmax_common, a.data, turn, C_D, best_move)
            a.move_to(turn, best_move[0], best_move[1])
            turn = -turn
            result = a.evaluate(zero_turn, turn)
            if result == 1:
                print("minmax zero win!!")
                zero_win += 1
                break
            elif result == -1:
                print("minmax normal win!!")
                break
            elif steps > 300:
                print("too many steps, give up round")
                zero_win = 0
                break
    print("In the previous ", rounds, " rounds, zero win ratio is: ", float(zero_win) / float(rounds))
    time_zero = np.array(time_zero)
    time_normal = np.array(time_normal)
    print("On average, for minmax normal with depth", N_D, \
          ", each move takes time: ", time_normal.mean(),\
          "max time elapsed:", time_normal.max())
    print("On average, for minmax zero with depth", Z_D, \
          ", each move takes time: ", time_zero.mean(),\
          "max time elapsed:", time_zero.max())
    return float(zero_win) / float(rounds)

In [23]:
#minmax_normal = MinMaxQ("AtaxxZero_91.2p_82.4q.h5")
#minmax_normal = MinMaxNormal()
minmax_normal = MinMaxZero(0.9, 100, "AtaxxZero_91.2p_82.4q.h5")
minmax_zero = MinMaxZero(0.9, 100, "AtaxxZero_91.2p_82.4q.h5")

verbose = False
rounds = 100
N_D = 2
Z_D = 2
C_S = 30

for common_steps in range(5, 12, 1):
    zero_win = 0
    time_zero = []
    time_normal = []
    for r in range(rounds):
        print("Round:", r)
        steps = 0
        a = Ataxx()
        turn = -1
        zero_turn = choice([-1, 1])
        if verbose:
            print("zero chose color", zero_turn)
        best_move = None
        while True:
            steps += 1
            if verbose:
                a.plot()
            if turn != zero_turn:
                s = time.time()
                _, best_move = minmax_normal.min_max(a.data, turn, turn, steps<=4, N_D+(steps<=4), best_move)
                span = time.time() - s
                time_normal.append(span)
            else:
                s = time.time()
                _, best_move = minmax_zero.min_max(a.data, turn, turn, steps<=common_steps, Z_D+(steps<=common_steps), best_move)
                span = time.time() - s
                time_zero.append(span)
            a.move_to(turn, best_move[0], best_move[1])
            turn = -turn
            result = a.evaluate(zero_turn, turn)
            if result == 1:
                print("minmax zero win!!")
                zero_win += 1
                break
            elif result == -1:
                print("minmax normal win!!")
                break
            elif steps > 300:
                print("too many steps, give up round")
                zero_win = 0
                break
    print("common_steps taken are: ", common_steps)
    print("In the previous ", rounds, " rounds, zero win ratio is: ", float(zero_win) / float(rounds))
    time_zero = np.array(time_zero)
    time_normal = np.array(time_normal)
    print("On average, for minmax normal with depth", N_D, \
          ", each move takes time: ", time_normal.mean(),\
          "max time elapsed:", time_normal.max())
    print("On average, for minmax zero with depth", Z_D, \
          ", each move takes time: ", time_zero.mean(),\
          "max time elapsed:", time_zero.max())

successfully loaded the model
successfully loaded the model
Round: 0
minmax zero win!!
Round: 1
minmax normal win!!
Round: 2
minmax zero win!!
Round: 3
minmax normal win!!
Round: 4
minmax zero win!!
Round: 5
minmax zero win!!
Round: 6
minmax zero win!!
Round: 7
minmax zero win!!
Round: 8
minmax zero win!!
Round: 9
minmax zero win!!
Round: 10
minmax normal win!!
Round: 11
minmax zero win!!
Round: 12
minmax zero win!!
Round: 13
minmax zero win!!
Round: 14
minmax zero win!!
Round: 15
minmax normal win!!
Round: 16
minmax zero win!!
Round: 17
minmax normal win!!
Round: 18
minmax zero win!!
Round: 19
minmax zero win!!
Round: 20
minmax zero win!!
Round: 21
minmax zero win!!
Round: 22
minmax zero win!!
Round: 23
minmax normal win!!
Round: 24
minmax zero win!!
Round: 25
minmax zero win!!
Round: 26
minmax zero win!!
Round: 27
minmax zero win!!
Round: 28
minmax normal win!!
Round: 29
minmax zero win!!
Round: 30
minmax zero win!!
Round: 31
minmax zero win!!
Round: 32
minmax normal win!!
Round: 33


Round: 62
minmax normal win!!
Round: 63
minmax zero win!!
Round: 64
minmax normal win!!
Round: 65
minmax zero win!!
Round: 66
minmax zero win!!
Round: 67
minmax normal win!!
Round: 68
minmax normal win!!
Round: 69
minmax normal win!!
Round: 70
minmax zero win!!
Round: 71
minmax zero win!!
Round: 72
minmax normal win!!
Round: 73
minmax zero win!!
Round: 74
minmax zero win!!
Round: 75
minmax zero win!!
Round: 76
minmax normal win!!
Round: 77
minmax zero win!!
Round: 78
minmax normal win!!
Round: 79
minmax normal win!!
Round: 80
minmax zero win!!
Round: 81
minmax zero win!!
Round: 82
minmax normal win!!
Round: 83
minmax normal win!!
Round: 84
minmax normal win!!
Round: 85
minmax zero win!!
Round: 86
minmax zero win!!
Round: 87
minmax normal win!!
Round: 88
minmax normal win!!
Round: 89
minmax normal win!!
Round: 90
minmax zero win!!
Round: 91
minmax zero win!!
Round: 92
minmax normal win!!
Round: 93
minmax zero win!!
Round: 94
minmax normal win!!
Round: 95
minmax normal win!!
Round: 96
mi

minmax zero win!!
Round: 14
minmax zero win!!
Round: 15
minmax normal win!!
Round: 16
minmax zero win!!
Round: 17
minmax zero win!!
Round: 18
minmax zero win!!
Round: 19
minmax normal win!!
Round: 20
minmax zero win!!
Round: 21
minmax zero win!!
Round: 22
minmax zero win!!
Round: 23
minmax zero win!!
Round: 24
minmax normal win!!
Round: 25
minmax normal win!!
Round: 26
minmax zero win!!
Round: 27
minmax normal win!!
Round: 28
minmax normal win!!
Round: 29
minmax zero win!!
Round: 30
minmax zero win!!
Round: 31
minmax zero win!!
Round: 32
minmax normal win!!
Round: 33
minmax zero win!!
Round: 34
minmax zero win!!
Round: 35
minmax zero win!!
Round: 36
minmax zero win!!
Round: 37
minmax zero win!!
Round: 38
minmax normal win!!
Round: 39
minmax zero win!!
Round: 40
minmax zero win!!
Round: 41
minmax zero win!!
Round: 42
minmax zero win!!
Round: 43
minmax zero win!!
Round: 44
minmax zero win!!
Round: 45
minmax zero win!!
Round: 46
minmax zero win!!
Round: 47
minmax zero win!!
Round: 48
minm

In [28]:
a = [(1, 1), (2, 2)]
sum()

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_11 (InputLayer)           (None, 6, 9, 9)      0                                            
__________________________________________________________________________________________________
conv2d_26 (Conv2D)              (None, 64, 7, 7)     3520        input_11[0][0]                   
__________________________________________________________________________________________________
batch_normalization_26 (BatchNo (None, 64, 7, 7)     256         conv2d_26[0][0]                  
__________________________________________________________________________________________________
activation_31 (Activation)      (None, 64, 7, 7)     0           batch_normalization_26[0][0]     
__________________________________________________________________________________________________
conv2d_27 

## In the following cell, I compared two settings for ataxxzero, and find that it is great if we let AtaxxZero after doing normal minmax for 30 or so steps
1. in the setting where no common mimmax searching init steps were taken, the win ratio of minmaxZeroQ with dep 2 against minmax_normal with dep 3 is 77%
2. in the other setting where normal searching common init steps are taken the win ratio of minmaxZeroQ increased to 92%

__The conclusion that relying on AtaxxZero in the start of the game is not useful, as the Q returned is very small, and therefore inaccuracy were amplified in terms of the order of moves. (a similar error may lead to the result that a worse move have a larger Q returned)__

In [None]:
#minmax_common = MinMaxQ("AtaxxZero_91.2p_82.4q.h5")
#minmax_common = MinMaxNormal()
minmax_common = None
#minmax_normal = MinMaxZero(1, 100, "AtaxxZero_91.2p_82.4q.h5")
minmax_normal = MinMaxQ("AtaxxZero_91.2p_82.4q.h5")
#minmax_normal = MinMaxNormal()
minmax_zero = MinMaxZero(0.9, 100, "AtaxxZero_91.2p_82.4q.h5")
#minmax_zero = MinMaxQ("AtaxxZero_91.2p_82.4q.h5")
#minmax_zero = MinMaxQCluster("AtaxxZero_91.2p_82.4q.h5")
#actor_compare(minmax_common, minmax_normal, minmax_zero, 3, 3, 2, 30, 100, verbose=False)
actor_compare(minmax_common, minmax_normal, minmax_zero, 3, 3, 2, 0, 100, verbose=False)

In [None]:
#minmax_common = MinMaxQ("AtaxxZero_91.2p_82.4q.h5")
minmax_common = MinMaxNormal()
#minmax_normal = MinMaxZero(1, 100, "AtaxxZero_91.2p_82.4q.h5")
minmax_normal = MinMaxQ("AtaxxZero_91.2p_82.4q.h5")
#minmax_normal = MinMaxNormal()
minmax_zero = MinMaxZero(0.9, 100, "AtaxxZero_91.2p_82.4q.h5")
#minmax_zero = MinMaxQ("AtaxxZero_91.2p_82.4q.h5")
#minmax_zero = MinMaxQCluster("AtaxxZero_91.2p_82.4q.h5")
actor_compare(minmax_common, minmax_normal, minmax_zero, 3, 2, 2, 30, 100, verbose=False)
actor_compare(minmax_common, minmax_normal, minmax_zero, 3, 2, 2, 0, 100, verbose=False)

In [27]:
a = PolicyValueNetwork("AtaxxZero_91.2p_82.4q.h5")

successfully loaded the model


In [28]:
def loop_predict(a, N):
    f = np.zeros((1, 6, 9, 9))
    m = np.ones((1, 792))
    for i in range(N):
        a.predict(f, m)

def cluster_predict(a, N):
    f = np.zeros((N, 6, 9, 9))
    m = np.ones((N, 792))
    a.predict(f, m)

In [31]:
%time loop_predict(a, 10)
%time cluster_predict(a, 10)

CPU times: user 34.8 ms, sys: 11.2 ms, total: 46 ms
Wall time: 16.6 ms
CPU times: user 24.3 ms, sys: 2.33 ms, total: 26.6 ms
Wall time: 5.16 ms


In [41]:
#minmax_normal = MinMaxZero(1, 100, "AtaxxZero_91.2p_82.4q.h5")
minmax_normal = MinMaxQ("AtaxxZero_91.2p_82.4q.h5")
#minmax_normal = MinMaxNormal()
result = []
xs = []
for x in np.linspace(0.3, 1, 15):
    print("p_thresh is:", x)
    minmax_zero = MinMaxZero(x, 100, "AtaxxZero_91.2p_82.4q.h5")
    result.append(actor_compare(minmax_normal, minmax_zero, 2, 2, 100, verbose=False))
    xs.append(x)
    
plt.plot(xs, result)
plt.show()

successfully loaded the model
p_thresh is: 0.3
successfully loaded the model
Round: 0


KeyboardInterrupt: 

# Attempting parallelization

In [65]:
from multiprocessing import Pool

def actor_compare_bunch(normal_para, zero_para, N_D, Z_D, rounds):
    minmax_normal = MinMaxZero(*normal_para)
    minmax_zero = MinMaxZero(*zero_para)
    zero_win = 0
    for r in range(rounds):
        print("Round:", r)
        steps = 0
        a = Ataxx()
        turn = -1
        zero_turn = choice([-1, 1])
        if verbose:
            print("zero chose color", zero_turn)
        best_move = None
        while True:
            steps += 1
            if verbose:
                a.plot()
            if turn != zero_turn:
                s = time.time()
                best_move = actor(minmax_normal, a.data, turn, N_D, best_move)
                span = time.time() - s
                time_normal.append(span)
            else:
                s = time.time()
                best_move = actor(minmax_zero, a.data, turn, Z_D, best_move)
                span = time.time() - s
                time_zero.append(span)
            a.move_to(turn, best_move[0], best_move[1])
            turn = -turn
            result = a.evaluate(zero_turn, turn)
            if result == 1:
                print("minmax zero win!!")
                zero_win += 1
                break
            elif result == -1:
                print("minmax normal win!!")
                break
    return float(zero_win) / float(rounds)

def actor_compare_parallel(minmax_normal, minmax_zero, N_D, Z_D, rounds):
    zero_win = 0
    bunch = (rounds / 24)
    rounds = 24 * bunch
    para = [(minmax_normal, minmax_zero, N_D, Z_D, bunch)] * 24
    with Pool(processes=24) as pool:
        result = pool.starmap(actor_compare_bunch, para)
    print("In the previous ", rounds, " rounds, zero win ratio is: ", sum(result) / float(rounds))

In [66]:
normal_para = (1, 100, "AtaxxZero_91.2p_82.4q.h5")
zero_para = (0.9, 100, "AtaxxZero_91.2p_82.4q.h5")

In [67]:
minmax_normal = MinMaxZero(1, 100, "AtaxxZero_91.2p_82.4q.h5")
# minmax_normal = MinMaxNormal()
minmax_zero = MinMaxZero(0.9, 100, "AtaxxZero_91.2p_82.4q.h5")
actor_compare_parallel(normal_para, zero_para, 2, 2, 100)
actor_compare(minmax_normal, minmax_zero, 2, 2, 100, verbose=False)

successfully loaded the model
successfully loaded the model


KeyboardInterrupt: 

In [25]:
a = Ataxx()
%lprun -f minmax_zero.min_max _, best_move = minmax_zero.min_max(a.data, -1, -1, depth=6, pre_move=None, t_lim=100)