In [189]:
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import chess
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# Set up matplotlib: remember whether an inline (notebook) backend is active
# and, if so, import IPython's display helpers for use in later cells.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

# Turn on matplotlib's interactive mode.
plt.ion()

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

board = chess.Board()  # module-level board; chess.Board() with no arguments is the standard starting position

cuda


In [119]:
# One recorded environment step: (state, action, next_state, reward).
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])


class ReplayMemory:
    """Fixed-capacity buffer of ``Transition`` records.

    Backed by a ``deque`` with ``maxlen=capacity``, so once the buffer is
    full every new record evicts the oldest one.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises ``ValueError`` (from ``random.sample``) when fewer than
        ``batch_size`` transitions are stored.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [120]:
class MultiDimLinear(torch.nn.Linear):
    """Linear layer whose output features are reshaped to ``out_shape``.

    The layer is an ordinary ``torch.nn.Linear`` with
    ``prod(out_shape)`` output features; ``forward`` then unflattens the
    last dimension into ``out_shape``.

    Args:
        in_features: size of the last dimension of the input.
        out_shape: iterable of ints giving the trailing shape of the output.
        **kwargs: forwarded unchanged to ``torch.nn.Linear``.
    """

    def __init__(self, in_features, out_shape, **kwargs):
        out_shape = tuple(int(dim) for dim in out_shape)
        # np.prod returns a NumPy scalar; convert so ``out_features`` is a
        # plain Python int.
        super().__init__(in_features, int(np.prod(out_shape)), **kwargs)
        self.out_shape = out_shape

    def forward(self, x):
        """Map ``x`` of shape ``(*, in_features)`` to ``(*, *out_shape)``."""
        out = super().forward(x)
        # Keep every leading dimension of the input instead of assuming a
        # single batch axis (``len(x)``), so unbatched and multi-batch-dim
        # inputs reshape correctly as well.
        return out.reshape(*x.shape[:-1], *self.out_shape)

In [138]:
class DQN(nn.Module):
    """Three-layer fully connected network producing a (2, 64) output per sample.

    Two 128-unit ReLU hidden layers feed a ``MultiDimLinear`` head with
    ``out_shape=(2, 64)``; a softmax is applied over the last (size-64) axis.
    NOTE(review): the two rows are presumably the from-square and to-square
    distributions over the 64 board squares - confirm against the training
    loop. Because of the softmax the outputs are normalized scores, not
    unbounded Q-values.
    """

    def __init__(self, n_observations):
        super().__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = MultiDimLinear(in_features=128, out_shape=(2, 64))

    def forward(self, x):
        """Return a ``(len(x), 2, 64)`` tensor whose last axis sums to 1.

        Called with either a single-element batch (action selection) or a
        full batch (optimization).
        """
        hidden = F.relu(self.layer2(F.relu(self.layer1(x))))
        logits = self.layer3(hidden)
        return F.softmax(logits, dim=2)


In [199]:
# Signed integer code for every FEN piece symbol: upper-case (White) pieces
# are positive, lower-case (Black) pieces are the matching negative value.
mapped = {
    'P': 1, 'p': -1,   # pawn
    'N': 2, 'n': -2,   # knight
    'B': 3, 'b': -3,   # bishop
    'R': 4, 'r': -4,   # rook
    'Q': 5, 'q': -5,   # queen
    'K': 6, 'k': -6,   # king
}

In [216]:
def make_matrix(board)->np.ndarray: #type(board) == chess.Board()
    """Encode the piece placement of ``board`` as an integer matrix.

    The placement field of the board's FEN is walked rank by rank: a digit
    ``n`` expands to ``n`` empty squares (0) and every piece symbol is
    translated through the module-level ``mapped`` table.

    Args:
        board: a ``chess.Board`` (anything exposing ``board_fen()``).

    Returns:
        ``np.ndarray`` of dtype ``int16`` with one row per FEN rank, in FEN
        order (first row is the first rank listed in the FEN string). For a
        standard board this is 8x8.

    Raises:
        KeyError: if the placement contains a symbol missing from ``mapped``.
    """
    rows = []
    # board_fen() is exactly the piece-placement field, so no splitting of a
    # full EPD/FEN string is needed.
    for rank in board.board_fen().split("/"):
        row = []
        for symbol in rank:
            if symbol.isdigit():
                # A digit is a run of that many empty squares.
                row.extend([0] * int(symbol))
            else:
                row.append(mapped[symbol])
        rows.append(row)
    return np.array(rows, dtype=np.int16)

In [219]:
# Encode the current module-level `board` as a matrix of signed piece codes.
matrix_board: np.ndarray = make_matrix(board)


128


In [113]:
# Training hyper-parameters.
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor applied to future rewards
# EPS_START is the starting value of epsilon (exploration probability threshold)
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
#   (select_action uses EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY))
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

In [141]:

# Build the online (policy) network and the target network on `device`, then
# copy the policy weights into the target so both start out identical.
# NOTE(review): `n_observations` is not assigned in any cell visible here, so
# this cell relies on leftover kernel state - define it explicitly before this
# point so Restart & Run All works. TODO confirm the intended input size.
policy_net = DQN(n_observations).to(device)
target_net = DQN(n_observations).to(device)
target_net.load_state_dict(policy_net.state_dict())

<All keys matched successfully>

In [174]:
# AdamW (AMSGrad variant) over the policy network's parameters only.
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
# Replay buffer holding at most 10000 transitions.
memory = ReplayMemory(10000)

# Global step counter; select_action increments it and uses it to decay epsilon.
steps_done = 0

In [182]:
def select_action(state, n_squares=64):
    """Pick an action with an epsilon-greedy policy.

    Epsilon decays exponentially from ``EPS_START`` towards ``EPS_END`` as
    the global ``steps_done`` counter grows; the counter is incremented on
    every call.

    Args:
        state: input tensor for ``policy_net`` holding a single sample
            (the greedy branch flattens the result to two elements).
        n_squares: exclusive upper bound for randomly drawn indices;
            defaults to 64, the size of the network's last output axis.

    Returns:
        A ``torch.long`` tensor of shape ``(2,)`` with one index per output
        row of the network. Both branches return the same shape.

    NOTE(review): neither branch checks that the chosen pair is a legal
    move on the current board; the caller has to filter or mask.
    """
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
                    math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # Exploit: for each of the two output rows take the index with
            # the highest score along the last axis.
            return policy_net(state).max(dim=2).indices.view(2)
    # Explore: draw two uniform random indices. The previous implementation
    # called ``env.action_space.sample()``, but no ``env`` exists in this
    # notebook and its (1, 1) result did not match the greedy branch.
    return torch.randint(0, n_squares, (2,), device=device, dtype=torch.long)




  state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)


tensor([50, 27], device='cuda:0')

In [114]:
def make_move(board,move_str:str)->None:
    """Play ``move_str`` (UCI notation) on ``board`` if it is legal.

    An illegal move leaves the board untouched and prints
    ``"not legal move"``; nothing is returned either way.
    """
    candidate = chess.Move.from_uci(move_str)
    if candidate not in board.legal_moves:
        print("not legal move")
        return
    board.push(candidate)
    
def get_legal_moves(board):
    """Return the string form of every legal move on ``board``.

    The result is a 1-D NumPy array of dtype ``object`` holding one
    ``str(move)`` per entry of ``board.legal_moves``.
    """
    move_strings = list(map(str, board.legal_moves))
    return np.array(move_strings, dtype='object')
    
def state(board):
    """Return the EPD string describing the current position of ``board``."""
    epd_text = board.epd()
    return epd_text

def next_state(board,move_str):
    """Return the position reached by playing ``move_str``, without keeping the move.

    The move (UCI notation) is pushed, the resulting position is read with
    ``state(board)``, and the move is popped again so ``board`` ends up
    exactly as it was passed in.

    If the move is not legal, ``"not legal move"`` is printed and the
    current, unchanged position is returned. Nothing is popped in that case:
    popping here would remove an earlier, real move from the board (or fail
    on an empty move stack).
    """
    move = chess.Move.from_uci(move_str)
    if move not in board.legal_moves:
        print("not legal move")
        return state(board)
    board.push(move)
    try:
        return state(board)
    finally:
        # Always undo the trial move, even if reading the state raised.
        board.pop()



    

In [101]:
class ChessBoardEnv():
    """Small environment wrapper around a ``chess.Board`` with material-based rewards.

    Each side has a "reward pool" tensor. Outside of checkmate the pool is
    set to that side's material score (0.1 per point of material); the side
    that delivers checkmate gets 100 added to its pool.

    Colors are the strings ``'white'`` and ``'black'``; as in
    ``evaluate_score``, any value other than ``'white'`` is treated as black.
    """

    def __init__(self,board:"chess.Board"):
        # Material values keyed by FEN symbol (kings are not scored).
        self.black_mapped = {'p': 1, 'n': 3, 'b': 3, 'r': 5, 'q': 9}
        self.white_mapped = {'P': 1, 'N': 3, 'B': 3, 'R': 5, 'Q': 9}
        # 3.9 == 0.1 * 39, one side's material score in the starting position.
        self.white_reward_pool = torch.tensor(3.9000, dtype=torch.float32)
        self.black_reward_pool = torch.tensor(3.9000, dtype=torch.float32)
        self.board = board

    def _side_that_moved(self):
        """Color string of the side that is NOT to move on ``self.board``."""
        # board.turn is truthy when White is to move, so the side that just
        # played is the opposite one.
        return 'black' if self.board.turn else 'white'

    def evaluate_score(self, color=None):
        """Material score of ``color`` on the current board, scaled by 0.1.

        Pawn 1, knight 3, bishop 3, rook 5, queen 9. ``color`` defaults to
        the side that just moved.
        """
        if color is None:
            color = self._side_that_moved()
        values = self.white_mapped if color == 'white' else self.black_mapped
        material = sum(values.get(symbol, 0) for symbol in self.board.board_fen())
        return torch.tensor(material * 0.1, dtype=torch.float32)

    def calculate_reward(self, color=None, move=None):
        """Compute the reward for ``color``, update its pool, and return it.

        Args:
            color: ``'white'`` or ``'black'``; defaults to the side that just
                moved (the opposite of ``board.turn``).
                NOTE(review): the earlier draft derived the color from
                ``board.turn`` directly, which on checkmate names the mated
                side - confirm that rewarding the mover is the intent.
            move: accepted for signature compatibility; not used.

        Returns:
            ``tensor(100.0)`` if ``color`` has just delivered checkmate,
            otherwise ``color``'s material score.
        """
        if color is None:
            color = self._side_that_moved()
        # is_checkmate() refers to the side to move, so only the other side
        # can have delivered the mate.
        delivered_mate = self.board.is_checkmate() and color == self._side_that_moved()
        if delivered_mate:
            reward = torch.tensor(100.0, dtype=torch.float32)
            if color == 'white':
                self.white_reward_pool = self.white_reward_pool + reward
            else:
                self.black_reward_pool = self.black_reward_pool + reward
        else:
            # Game not decided for this side: reward is its current material.
            reward = self.evaluate_score(color)
            if color == 'white':
                self.white_reward_pool = reward
            else:
                self.black_reward_pool = reward
        return reward

    def reset(self):
        """Reset the board to the starting position and return its FEN string."""
        self.board.reset()
        return self.board.fen()

In [92]:
# Two independent boards for scratch experiments. Note that this rebinds the
# module-level name `board` created in the setup cell.
board = chess.Board()
board2 = chess.Board()

In [106]:
# Scratch call exercising make_move on board2.
# NOTE(review): if board2 is still in the starting position it is White's
# turn, so 'e7e6' should be rejected and "not legal move" printed - confirm
# this is the intended demonstration.
make_move(board2,'e7e6')

In [111]:
# Report whose turn it is on `board` (board.turn is truthy when White is to move).
# NOTE(review): the recorded output is "black", which a freshly created board
# would not produce - this cell presumably ran against kernel state left by
# other cells; re-check under Restart & Run All.
color = 'white' if board.turn else 'black'
print(color)

black


In [58]:

class ChessRewardFunction:
    """Material-based reward helper for chess positions.

    Each side has a "reward pool" tensor. Outside of checkmate the pool is
    set to that side's material score (0.1 per point of material); the side
    that delivers checkmate gets 100 added to its pool.

    Colors are the strings ``"white"`` and ``"black"``; any value other than
    ``"white"`` is treated as black.
    """

    def __init__(self):
        # Material values keyed by FEN symbol (kings are not scored).
        self.black_mapped = {'p': 1, 'n': 3, 'b': 3, 'r': 5, 'q': 9}
        self.white_mapped = {'P': 1, 'N': 3, 'B': 3, 'R': 5, 'Q': 9}
        # 3.9 == 0.1 * 39, one side's material score in the starting position.
        self.white_reward_pool = torch.tensor(3.9000, dtype=torch.float32)
        self.black_reward_pool = torch.tensor(3.9000, dtype=torch.float32)

    def assign_scores(self):
        """Placeholder kept from the original draft, which had no body.

        TODO: implement or remove once the intended behavior is decided.
        Currently does nothing and returns ``None``.
        """
        return None

    def calculate_reward(self, board, color):
        """Compute the reward for ``color`` on ``board``, update its pool, return it.

        Args:
            board: a ``chess.Board`` (needs ``is_checkmate()``, ``turn`` and
                ``board_fen()``).
            color: ``"white"`` or ``"black"`` - the side being rewarded.

        Returns:
            ``tensor(100.0)`` if ``color`` has just delivered checkmate,
            otherwise ``color``'s material score from ``evaluate_score``.
        """
        # is_checkmate() refers to the side to move (board.turn truthy means
        # White to move), so the mate was delivered by the other side.
        color_is_to_move = (color == 'white') == bool(board.turn)
        if board.is_checkmate() and not color_is_to_move:
            reward = torch.tensor(100.0, dtype=torch.float32)
            if color == 'white':
                self.white_reward_pool = self.white_reward_pool + reward
            else:
                self.black_reward_pool = self.black_reward_pool + reward
        else:
            # Game not decided for this side: reward is its current material.
            reward = self.evaluate_score(board, color)
            if color == 'white':
                self.white_reward_pool = reward
            else:
                self.black_reward_pool = reward
        return reward

    def evaluate_score(self, board, color):
        '''
        Return the material score of ``color`` on ``board``, scaled by 0.1.

        Pawn - 1 point
        Knight - 3 points
        Bishop - 3 points
        Rook - 5 points
        Queen - 9 points
        '''
        values = self.white_mapped if color == "white" else self.black_mapped
        # Separators, digits, kings and the other side's pieces are not in
        # the table and contribute nothing.
        material = sum(values.get(symbol, 0) for symbol in board.board_fen())
        return torch.tensor(material * 0.1, dtype=torch.float32)

    
            
        
        




In [59]:
# Instantiate the reward helper; both reward pools start at tensor(3.9).
reward = ChessRewardFunction()

tensor(3.9000)