In [None]:
# Colab setup: install the reinforcement-learning and chess libraries used below.
# NOTE(review): `chess` and `python-chess` look like the same library published
# under two PyPI names -- confirm and drop one; consider pinning versions.
! pip install keras-rl2
! pip install chess
! pip install python-chess

In [None]:
import numpy as np

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten,\
     Input,BatchNormalization, Dropout, Conv2D, Add, Reshape
from tensorflow.keras.optimizers import Adam

from rl.agents.dqn import DQNAgent
from rl.policy import EpsGreedyQPolicy
from rl.memory import SequentialMemory
from rl.policy import Policy

import chess
import chess.engine
from sys import platform
import os

# import sys
# sys.path.insert(0, '../../bot_model/')
# from  model888 import get_model888

import tensorflow as tf
tf.compat.v1.experimental.output_all_intermediates(True)

from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

In [None]:
# Mount Google Drive (Colab only). Later cells load the model definition,
# the Stockfish binary and the saved weights from drive/MyDrive/Data/Chess.
from google.colab import drive
drive.mount('/content/drive/')

In [None]:
from drive.MyDrive.Data.Chess.model888 import get_model888

In [None]:
# %%
# Make the Stockfish binary stored on Drive executable, then start it as a UCI
# engine. `engine` is a module-level global used by find_move().
# NOTE(review): the exit status of chmod is ignored, and no cell shown here
# calls engine.quit() -- confirm the engine process is shut down somewhere.
os.system('chmod +x drive/MyDrive/Data/Chess/stockfish_14.1_linux_x64')
engine = chess.engine.SimpleEngine.popen_uci(r"drive/MyDrive/Data/Chess/stockfish_14.1_linux_x64")


In [None]:
# # %%
# if platform == "linux" or platform == "linux2":
#     os.system('chmod +x ../stockfish/stockfish_14.1_linux_x64')
#     engine = chess.engine.SimpleEngine.popen_uci(r"../stockfish/stockfish_14.1_linux_x64")
# elif platform == "win32":
#     engine = chess.engine.SimpleEngine.popen_uci(r"../stockfish/stockfish_14.1_win_32bit.exe")

In [None]:
def find_move(env, time_limit=0.05):
    """Ask the module-level Stockfish `engine` for a move in the current position.

    # Arguments
        env: object whose `.env` attribute is the `chess.Board` to analyse
            (a `ChessEnv` in this notebook).
        time_limit (float): thinking time in seconds handed to
            `chess.engine.Limit`; defaults to the previously hard-coded 0.05.

    # Returns
        The `chess.Move` chosen by the engine.
    """
    result = engine.play(env.env, chess.engine.Limit(time=time_limit))
    return result.move

In [None]:
# Shape of one observation handed to the network.
STATE_SHAPE = (8, 8, 8)
# One action per (from_square, to_square) pair on a 64-square board.
NB_ACTIONS = 64 * 64

In [None]:
def convert(x,square,map):
    """Mark in `x` where the 8x8 grid `square` holds `map` or `-map`.

    Cells of `square` equal to `map` are written as 1 into `x`, cells equal
    to `-map` are written as -1; every other cell of `x` is left untouched.
    `x` is modified in place and nothing is returned.
    """
    for row in range(8):
        source_row = square[row]
        target_row = x[row]
        for col in range(8):
            if source_row[col] == map:
                target_row[col] = 1
                continue
            if source_row[col] == -map:
                target_row[col] = -1
                 

In [None]:
class ChessEnv:
    '''
    Chess environment (reset/step interface) in which a learning agent plays
    against Stockfish.

    After every move the board is replaced by ``chess.Board.mirror()``, so the
    position is always stored from the point of view of the side to move.

    state: ndarray of shape (8, 8, 8) built by ``get_state``; row 0 is rank 8
        planes 0-5: +1 / -1 where ``convert_board_to_int`` holds +v / -v for
                    v = point[plane] (pawn, knight, bishop, rook, queen, king)
        plane 6:    -1 on the from-squares and +1 on the to-squares of all
                    legal moves (later writes win)
        plane 7:    -1 / +1 on the from / to square of the latest move
    action: int in [0, 4096), encoded as from_square * 64 + to_square
    reward: number, see ``step``
    '''

    # Signed piece values used to turn the board into integers.
    mapped = {
        'P': 10,     # White Pawn
        'p': -10,    # Black Pawn
        'N': 20,     # White Knight
        'n': -20,    # Black Knight
        'B': 30,     # White Bishop
        'b': -30,    # Black Bishop
        'R': 40,     # White Rook
        'r': -40,    # Black Rook
        'Q': 50,     # White Queen
        'q': -50,    # Black Queen
        'K': 900,    # White King
        'k': -900    # Black King
    }
    # Absolute piece values; index i corresponds to state plane i.
    point = [10, 20, 30, 40, 50, 900]
    state = None
    model = None
    neg_r_each_step = -1

    def __init__(self, model: Sequential, ReplayMem: SequentialMemory, neg_r_each_step = -1, stockfishMem = True) -> None:
        """Create the environment and immediately reset it.

        # Arguments
            model: network used only by ``reset`` to print Q-value statistics.
            ReplayMem: replay memory that ``ifStockfishTurn`` appends to.
            neg_r_each_step: reward added on every step (default -1).
            stockfishMem: when True, an illegal action makes ``step`` store a
                Stockfish-played transition in ``ReplayMem``.
        """
        self.env = chess.Board()
        self.ReplayMem = ReplayMem
        self.model = model
        self.lastest_move = [0, 0]
        self.state = self.reset()
        self.neg_r_each_step = neg_r_each_step
        self.stockfishMem = stockfishMem

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _mirror_square(square):
        """Return `square` reflected top-to-bottom (rank r becomes rank 7 - r)."""
        return 8 * (7 - square // 8) + square % 8

    def _mirror_board(self):
        """Mirror the board and keep `lastest_move` in the mirrored coordinates."""
        self.env = self.env.mirror()
        self.lastest_move[:] = [self._mirror_square(sq) for sq in self.lastest_move]

    def _push_and_mirror(self, move):
        """Play `move`, remember it as the latest move, then hand the turn over."""
        self.env.push(move)
        played = self.env.move_stack[-1]
        self.lastest_move[0] = played.from_square
        self.lastest_move[1] = played.to_square
        self._mirror_board()

    def _capture_value(self, state, move):
        """Return the value of the piece on `move`'s target square in `state`, or 0."""
        row, col = 7 - move.to_square // 8, move.to_square % 8
        occupied_planes = np.where(state[:6, row, col] != 0)[0]
        if occupied_planes.size == 0:
            return 0
        return self.point[occupied_planes[0]]

    # ------------------------------------------------------------ game queries

    def is_draw(self):
        """Return True (and print the reason) if the position is drawn."""
        # NOTE(review): the board is replaced by Board.mirror() after every
        # move; if mirror() drops the move stack, the repetition check below
        # can presumably never fire -- confirm against python-chess.
        checks = (
            (self.env.is_stalemate, "statlemate"),
            (self.env.is_fivefold_repetition, "fivefold repetition"),
            (self.env.is_fifty_moves, "50 moves"),
            (self.env.is_insufficient_material, "Insufficient Material"),
        )
        for check, reason in checks:
            if check():
                print(reason)
                return True
        return False

    def is_checkmate(self):
        """Return True if the side to move is checkmated."""
        return self.env.is_checkmate()

    def convert_board_to_int(self):
        """Return the board as an 8x8 float array of signed piece values.

        The piece-placement field of the EPD string is expanded square by
        square: piece letters become their `mapped` value, digits become that
        many zeros. Row 0 is the first rank listed in the EPD string.
        """
        placement = self.env.epd().split(" ", 1)[0]
        values = []
        for symbol in placement:
            if symbol == "/":
                continue
            if symbol in self.mapped:
                values.append(self.mapped[symbol])
            else:
                values.extend([0] * int(symbol))
        return np.array(values, dtype=float).reshape((8, 8))

    def get_state(self) -> np.ndarray:
        """Build the (8, 8, 8) observation for the current board (see class doc)."""
        square = self.convert_board_to_int()
        x = np.zeros([8, 8, 8])
        for i in range(6):
            convert(x[i], square, self.point[i])
        for move in self.legal_moves():
            a, b = move.from_square, move.to_square
            x[6][7 - a // 8][a % 8] = -1
            x[6][7 - b // 8][b % 8] = 1
        a, b = self.lastest_move
        # a == b means "no latest move"; plane 7 then stays all zeros.
        if a != b:
            x[7][7 - a // 8][a % 8] = -1
            x[7][7 - b // 8][b % 8] = 1
        return x

    def legal_moves(self):
        """Return the legal moves of the current position as a list."""
        return list(self.env.legal_moves)

    def legal_moves_encoded(self):
        """Return the legal moves encoded as from_square * 64 + to_square."""
        return np.array([
            move.from_square * 64 + move.to_square
            for move in self.env.legal_moves
        ])

    def encodeMove(self, move_uci:str):
        """Encode a UCI move string as from_square * 64 + to_square.

        Only the first four characters are used, so a trailing promotion
        letter (e.g. ``e7e8q``) is ignored instead of breaking the parse.
        """
        a, b = chess.parse_square(move_uci[:2]), chess.parse_square(move_uci[2:4])
        return a * 64 + b

    def decodeMove(self, move_int:int):
        """Return the legal move for an encoded action.

        Delegates to ``chess.Board.find_move``, which raises if no matching
        legal move exists.
        """
        a, b = move_int//64, move_int%64
        move = self.env.find_move(from_square= a,to_square= b)
        return move

    def render(self):
        """Print the board using unicode chess pieces."""
        print(self.env.unicode())

    # ---------------------------------------------------------------- episode

    def reset(self):
        """Start a new game after 0-3 random plies and return the observation.

        Positions without legal moves are re-sampled. If Black is to move the
        board is mirrored. Also prints a few order statistics of the model's
        Q-values for the start state.
        """
        while True:
            self.env = chess.Board()
            num_sample_steps = np.random.randint(0, 4)
            for _ in range(num_sample_steps):
                candidates = self.legal_moves()
                if not candidates:
                    break
                self.env.push(np.random.choice(candidates))
            if self.legal_moves():
                break

        if self.env.move_stack:
            last = self.env.move_stack[-1]
            self.lastest_move[0] = last.from_square
            self.lastest_move[1] = last.to_square
        else:
            self.lastest_move[0] = 0
            self.lastest_move[1] = 0
        if not self.env.turn:
            self._mirror_board()

        self.state =  self.get_state()

        # Debug output; the fixed indices assume a network with 4096 outputs.
        Q_val = np.sort(self.model.predict(self.state.reshape((1, 1) + STATE_SHAPE)).reshape(-1, ))
        print('Val', num_sample_steps, ':', Q_val[0],  Q_val[-1], Q_val[4050], Q_val[4000], Q_val[3000],Q_val[2000])
        return self.state

    def ifStockfishTurn(self) -> None:
        """Let Stockfish play the agent's move and store the transition.

        Stockfish moves for the agent, then (unless the game ended) answers
        for the opponent. The reward is computed as in ``step`` and the tuple
        (state before the move, encoded Stockfish move, reward, done) is
        appended to ``ReplayMem``. ``self.state`` itself is not updated.
        """
        stf_move = find_move(self)
        reward = self.neg_r_each_step + self._capture_value(self.state, stf_move)
        self._push_and_mirror(stf_move)
        pseudo_state = self.get_state()

        done = False
        if self.is_checkmate():
            reward += 900
            done = True
        elif self.is_draw():
            reward += 300
            done = True
        else:
            # opponent's turn
            reply = find_move(self)
            reward -= self._capture_value(pseudo_state, reply)
            self._push_and_mirror(reply)

            if self.is_checkmate():
                reward -= 900
                done = True
            elif self.is_draw():
                reward += 300
                done = True

        self.ReplayMem.append(self.state, stf_move.from_square * 64 + stf_move.to_square, reward, done)

    def step(self, action: int):
        """Play `action` for the agent, let Stockfish answer, return the result.

        Reward:
            * illegal action: -5000 and the episode ends (if `stockfishMem`
              is set, a Stockfish-played transition is stored first);
            * otherwise `neg_r_each_step` plus the value of the piece on the
              agent's target square, minus the value of the piece on
              Stockfish's target square;
            * +900 if the agent checkmates, -900 if it is checkmated,
              +300 for a draw.

        # Returns
            (state, reward, done, info) with an empty info dict.
        """
        try:
            move = self.decodeMove(action)
        except (ValueError, IndexError):
            # Not a legal move in this position (or squares outside the board).
            print('wrong_move')

            # add in memory action if it was stockfish
            if self.stockfishMem:
                self.ifStockfishTurn()

            return self.state, -5000, True, {}

        reward = self.neg_r_each_step + self._capture_value(self.state, move)
        self._push_and_mirror(move)
        self.state = self.get_state()

        if self.is_checkmate():
            print('Win')
            return self.state, reward + 900, True, {}
        if self.is_draw():
            return self.state, reward + 300, True, {}

        # opponent's turn
        reply = find_move(self)
        reward -= self._capture_value(self.state, reply)
        self._push_and_mirror(reply)
        self.state = self.get_state()

        done = False
        if self.is_checkmate():
            reward -= 900
            done = True
            print('Lose')
        elif self.is_draw():
            reward += 300
            done = True

        return self.state, reward, done, {}


In [None]:
class LegalMovesPolicy(Policy):
    """Epsilon-greedy policy that is aware of the legal chess moves.

    - If the position has no legal move, a uniformly random action is taken.
    - With probability `eps` the policy explores: with probability
      `randomPer` any action (legal or not) is drawn uniformly, otherwise a
      uniformly random legal move is drawn.
    - Otherwise it exploits: with probability `perLegal` the legal action
      with the highest Q-value is taken, otherwise the plain argmax over all
      actions (which may be illegal).
    """
    def __init__(self, env: ChessEnv, eps=.1 , randomPer = .4, perLegal = 0.5):
        super().__init__()
        self.eps = eps
        self.env = env
        self.randomPer = randomPer
        self.perLegal = perLegal

    def select_action(self, q_values):
        """Return the selected action

        # Arguments
            q_values (np.ndarray): List of the estimations of Q for each action

        # Returns
            Selection action
        """
        assert q_values.ndim == 1
        nb_actions = q_values.shape[0]

        if len(self.env.legal_moves_encoded()) == 0:
            print("not legal_movel")
            return np.random.randint(0, nb_actions)

        if np.random.uniform() < self.eps:
            if np.random.uniform() < self.randomPer:
                return np.random.randint(0, nb_actions)
            return np.random.choice(self.env.legal_moves_encoded())

        if np.random.uniform() < self.perLegal:
            board = self.env.env
            # Walk the actions from highest to lowest Q-value and stop at the
            # first legal one, instead of probing every action.
            for act in np.argsort(q_values)[::-1]:
                try:
                    board.find_move(from_square=act // 64, to_square=act % 64)
                except (ValueError, IndexError):
                    continue
                return act

        # Plain greedy choice; also the fallback if no legal action was found.
        return np.argmax(q_values)

In [None]:
# Replay memory (up to 10000 transitions, window_length=1) and the Q-network.
# NOTE(review): get_model888 is imported from Drive; presumably it returns a
# Keras model mapping STATE_SHAPE observations to NB_ACTIONS Q-values -- confirm.
memory = SequentialMemory(limit=10000, window_length=1)

model = get_model888(STATE_SHAPE, NB_ACTIONS)
model.summary()

In [None]:
#NOTE: stockfishMem=False -> no Stockfish-played transition is stored after an illegal action.
# Constructing the environment calls reset(), which runs model.predict once and prints Q-value statistics.
env = ChessEnv(model, memory, neg_r_each_step=-1, stockfishMem= False)

In [None]:
#NOTE: resume from the checkpoint on Drive (the training cell copies superbot_888.h5 there after each round).
# model.load_weights('superbot_888.h5')
model.load_weights('drive/MyDrive/Data/Chess/superbot_888.h5')


In [None]:
# # Finally, we configure and compile our agent. You can use every built-in Keras optimizer and
# # even the metrics!
# memory = SequentialMemory(limit=10000, window_length=1)
for i in range (10):
  policy = LegalMovesPolicy(env, 0.1, 0.1, 0.99)
  dqn = DQNAgent(model=model, nb_actions=NB_ACTIONS, memory=memory, batch_size = 16, gamma = 0.5, 
                #  enable_double_dqn = True, 
                target_model_update=1e-2, policy=policy, nb_steps_warmup = 500)
  dqn.compile(Adam(lr=1e-1), metrics=['mae'])

  his = dqn.fit(env, nb_steps=7500, visualize=False, verbose=2)
  
  #NOTE
  model.save('superbot_888.h5')
  !cp -r superbot_888.h5 /content/drive/MyDrive/Data/Chess
