In [56]:
import chess
import chess.pgn
import numpy as np
from copy import deepcopy
import random
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable

In [57]:
class Value_Network(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.input_shape = 384
        self.action_space = 1
        self.hidden1 = 1024
        self.hidden2 = 512
        
        self.fc1 = nn.Linear(self.input_shape, self.hidden1)
        self.fc2 = nn.Linear(self.hidden1, self.hidden2)
        self.fc3 = nn.Linear(self.hidden2, self.action_space)

        self.optimizer = optim.Adam(self.parameters(), lr=0.001)
        self.loss = nn.MSELoss()
        self.to("cpu")
    
    def forward(self, x):
        x = x.flatten()
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)

        return x

value_network = Value_Network()
board = torch.Tensor(np.zeros((6, 8, 8)))
# print(network.forward(board))

In [58]:
class Policy_Network(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.input_shape = 49
        self.action_space = 1
        self.hidden1 = 1024
        self.hidden2 = 8*8*76
        
        self.fc0 = nn.Conv3d(6, 1, kernel_size=(2, 2, 1), padding=0)
        self.fc1 = nn.Linear(self.input_shape, self.hidden1)
        self.fc2 = nn.Linear(self.hidden1, self.hidden2)

        self.optimizer = optim.Adam(self.parameters(), lr=0.001)
        self.loss = nn.MSELoss()
        self.to("cpu")
    
    def forward(self, x):
        x = x.reshape(1, 6, 8, 8, 1)
        x = F.relu(self.fc0(x))
        x = x.flatten()
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = x.reshape(1, 8, 8, 76)

        return x

policy_network = Policy_Network()
board = torch.Tensor(np.zeros((6,8,8)))
board = board.unsqueeze(0)
# print(network.forward(board))

In [59]:
class Creating_Data():
    def __init__(self):
        self.pieces = ["P", "R", "N", "B", "Q", "K"]

    def fen_to_array(self, fen):
        numpy_board = np.zeros((8, 8))
        row = 0
        column = 0

        for char in range(len(fen)):
            number = 0
            if fen[char] == " " or row == 8:
                break
            if fen[char] == "/":
                row += 1
                column = -1

            if fen[char].isalpha():
                character = fen[char]
                adjusted = character.upper()

                number = self.pieces.index(adjusted) + 1
                if character.islower(): number += 8
            
            if fen[char].isnumeric():
                if int(fen[char]) != 8:
                    column += int(fen[char]) - 1

                    if column > 7:
                        row += 1
                        column = -1

            numpy_board[row][column] = number
            column += 1
        
        return numpy_board

    def fen_to_color(self, fen):
        for char in range(len(fen)):
            if fen[char] == " ":
                if fen[char+1] == "w":
                    return 0
                if fen[char+1] == "b":
                    return 1

    def fen_to_movenumber(self, fen):
        counter = 0
        for char in range(len(fen)):
            if fen[char] == " ":
                counter += 1
            
            if counter == 5:
                return int(fen[char:len(fen)])
            

In [60]:
import chess

board = chess.Board()
for move in board.legal_moves: # letters are columns, numbers are rows
    move_s, move_e = str(move)[0:2], str(move)[2:4]
    
    print(move)
    print(move_s)
    
    
    
    break

g1h3
g1


In [61]:
class Probability_Matrix():
    def __init__(self):
        self.creating_data = Creating_Data()

    def stack_generation(self, board, player):
        # make the 8x8x6 stack thats passed in
        numpy_board = self.creating_data.fen_to_array(board.fen())
        player = np.full((8, 8), player)
        
        white_king_castling = int(board.has_kingside_castling_rights(color=chess.WHITE))
        white_king_castling = np.full((8, 8), white_king_castling)
        white_queen_castling = int(board.has_queenside_castling_rights(color=chess.WHITE))
        white_queen_castling = np.full((8, 8), white_queen_castling)
        black_king_castling = int(board.has_kingside_castling_rights(color=chess.BLACK))
        black_king_castling = np.full((8, 8), black_king_castling)
        black_queen_castling = int(board.has_queenside_castling_rights(color=chess.BLACK))
        black_queen_castling = np.full((8, 8), black_queen_castling)
        
        stack = []
        stack.extend([deepcopy(numpy_board), deepcopy(player)])
        stack.extend([deepcopy(white_king_castling), deepcopy(white_queen_castling)])
        stack.extend([deepcopy(black_king_castling), deepcopy(black_queen_castling)])
        stack = np.stack(stack)
        return stack
    
    def move_value(self, move, stack, board):        
        # network will output 8x8x76
        # figure out the value of the corresponding move played by the child
        # stack is returned and move given and we want to find value
        
        stack = stack
        stack = stack.squeeze(0)
        move_s = str(move)[0:2]
        move_e = str(move)[2:4]
        
        file_s, rank_s = int(ord(move_s[0]) - 97), int(move_s[1])-1
        file_e, rank_e = int(ord(move_e[0]) - 97), int(move_e[1])-1

        square_s = chess.parse_square(chess.square_name(chess.square(file_s, rank_s))) # converts file rank into square type
        square_e = chess.parse_square(chess.square_name(chess.square(file_e, rank_e))) # converts file rank into square type
        piece = str(board.piece_at(square_s)).lower() # piece at square location
        distance = chess.square_distance(square_s, square_e)
        
        if piece == "n":
            if rank_s - rank_e == -2: # up the board
                if file_s - file_e == 1: # left
                    return stack[rank_s][file_s][56]
                if file_s - file_e == -1: # right
                    return stack[rank_s][file_s][57]
            if rank_s - rank_e == 2: # down the board
                if file_s - file_e == 1: # left
                    return stack[rank_s][file_s][58] 
                if file_s - file_e == -1: # right
                    return stack[rank_s][file_s][59] 
            if file_s - file_e == -2: # right
                if rank_s - rank_e == 1:
                    return stack[rank_s][file_s][60] 
                if rank_s - rank_e == -1:
                    return stack[rank_s][file_s][61] 
            if file_s - file_e == 2: # left
                if rank_s - rank_e == 1:
                    return stack[rank_s][file_s][62] 
                if rank_s - rank_e == -1:
                    return stack[rank_s][file_s][63] 
                
        if piece == "p":
            if len(str(move)) == 5:
                if rank_s - rank_e == -1: # capture up
                    if file_s - final_e == -1: # capture left
                        if move[4] == "r": # promote to rook
                            return stack[rank_s][file_s][64]
                        if move[4] == "b": # promote to bishop
                            return stack[rank_s][file_s][65] 
                        if move[4] == "n": # promote to knight
                            return stack[rank_s][file_s][66] 
                    if file_s - final_e == 1: # capture right
                        if move[4] == "r": # promote to rook
                            return stack[rank_s][file_s][67] 
                        if move[4] == "b": # promote to bishop
                            return stack[rank_s][file_s][68] 
                        if move[4] == "n": # promote to knight
                            return stack[rank_s][file_s][69] 
                if rank_s - rank_e == 1: # capture down
                    if file_s - final_e == -1: # capture left
                        if move[4] == "r": # promote to rook
                            return stack[rank_s][file_s][70] 
                        if move[4] == "b": # promote to bishop
                            return stack[rank_s][file_s][71] 
                        if move[4] == "n": # promote to knight
                            return stack[rank_s][file_s][72] 
                    if file_s - final_e == 1: # capture right
                        if move[4] == "r": # promote to rook
                            return stack[rank_s][file_s][73] 
                        if move[4] == "b": # promote to bishop
                            return stack[rank_s][file_s][74] 
                        if move[4] == "n": # promote to knight
                            return stack[rank_s][file_s][75] 
                    
        if rank_s > rank_e and file_s - file_e == 0: # N
            return stack[rank_s][file_s][distance - 1]
        if rank_s > rank_e and file_s > file_e: # NE
            return stack[rank_s][file_s][7 * 1 + distance - 1] 
        if rank_s == rank_e and file_s < file_e: # E
            return stack[rank_s][file_s][7 * 2 + distance - 1] 
        if rank_s < rank_e and file_s < file_e: # SE
            return stack[rank_s][file_s][7 * 3 + distance - 1]
        if rank_s < rank_e and file_s - file_e == 0: # S
            return stack[rank_s][file_s][7 * 4 + distance - 1] 
        if rank_s < rank_e and file_s < file_e: # SW
            return stack[rank_s][file_s][7 * 5 + distance - 1] 
        if rank_s == rank_e and file_s > file_e: # W
            return stack[rank_s][file_s][7 * 6 + distance - 1] 
        if rank_s > rank_e and file_s < file_e: # NW
            return stack[rank_s][file_s][7 * 7 + distance - 1]
        return 0
               
    def fill_stack(self, stack, move, value, board):
        # fill up the array based on what the calculated values are
        # given a stack, move, and value
        
        stack = stack
        
        move_s = str(move)[0:2]
        move_e = str(move)[2:4]
        
        file_s, rank_s = int(ord(move_s[0]) - 97), int(move_s[1])-1
        file_e, rank_e = int(ord(move_e[0]) - 97), int(move_e[1])-1

        square_s = chess.parse_square(chess.square_name(chess.square(file_s, rank_s))) # converts file rank into square type
        square_e = chess.parse_square(chess.square_name(chess.square(file_e, rank_e))) # converts file rank into square type
        piece = str(board.piece_at(square_s)).lower() # piece at square location
        distance = chess.square_distance(square_s, square_e)
        
        if piece == "n":
            if rank_s - rank_e == -2: # up the board
                if file_s - file_e == 1: # left
                    stack[rank_s][file_s][56] = value
                if file_s - file_e == -1: # right
                    stack[rank_s][file_s][57] = value
            if rank_s - rank_e == 2: # down the board
                if file_s - file_e == 1: # left
                    stack[rank_s][file_s][58] = value
                if file_s - file_e == -1: # right
                    stack[rank_s][file_s][59] = value
            if file_s - file_e == -2: # right
                if rank_s - rank_e == 1:
                    stack[rank_s][file_s][60] = value
                if rank_s - rank_e == -1:
                    stack[rank_s][file_s][61] = value
            if file_s - file_e == 2: # left
                if rank_s - rank_e == 1:
                    stack[rank_s][file_s][62] = value
                if rank_s - rank_e == -1:
                    stack[rank_s][file_s][63] = value
                
        if piece == "p":
            if len(str(move)) == 5:
                if rank_s - rank_e == -1: # capture up
                    if file_s - final_e == -1: # capture left
                        if move[4] == "r": # promote to rook
                            stack[rank_s][file_s][64] = value
                        if move[4] == "b": # promote to bishop
                            stack[rank_s][file_s][65] = value
                        if move[4] == "n": # promote to knight
                            stack[rank_s][file_s][66] = value
                    if file_s - final_e == 1: # capture right
                        if move[4] == "r": # promote to rook
                            stack[rank_s][file_s][67] = value
                        if move[4] == "b": # promote to bishop
                            stack[rank_s][file_s][68] = value
                        if move[4] == "n": # promote to knight
                            stack[rank_s][file_s][69] = value
                if rank_s - rank_e == 1: # capture down
                    if file_s - final_e == -1: # capture left
                        if move[4] == "r": # promote to rook
                            stack[rank_s][file_s][70] = value
                        if move[4] == "b": # promote to bishop
                            stack[rank_s][file_s][71] = value
                        if move[4] == "n": # promote to knight
                            stack[rank_s][file_s][72] = value
                    if file_s - final_e == 1: # capture right
                        if move[4] == "r": # promote to rook
                            stack[rank_s][file_s][73] = value
                        if move[4] == "b": # promote to bishop
                            stack[rank_s][file_s][74] = value
                        if move[4] == "n": # promote to knight
                            stack[rank_s][file_s][75] = value
                    
        if rank_s > rank_e and file_s - file_e == 0: # N
            stack[rank_s][file_s][distance - 1] = value
        if rank_s > rank_e and file_s > file_e: # NE
            stack[rank_s][file_s][7 * 1 + distance - 1] = value
        if rank_s == rank_e and file_s < file_e: # E
            stack[rank_s][file_s][7 * 2 + distance - 1] = value
        if rank_s < rank_e and file_s < file_e: # SE
            stack[rank_s][file_s][7 * 3 + distance - 1] = value
        if rank_s < rank_e and file_s - file_e == 0: # S
            stack[rank_s][file_s][7 * 4 + distance - 1] = value 
        if rank_s < rank_e and file_s < file_e: # SW
            stack[rank_s][file_s][7 * 5 + distance - 1] = value 
        if rank_s == rank_e and file_s > file_e: # W
            stack[rank_s][file_s][7 * 6 + distance - 1] = value 
        if rank_s > rank_e and file_s < file_e: # NW
             stack[rank_s][file_s][7 * 7 + distance - 1] = value
        
        return stack

probability_matrix = Probability_Matrix()

board = chess.Board()
player = 1

stack = probability_matrix.stack_generation(board, player)
print(stack.shape)

(6, 8, 8)


In [62]:
MEM_SIZE = 1000
BATCH_SIZE = 10

class ReplayBuffer:
    def __init__(self):
        self.mem_count = 0
        
        self.value_states = np.zeros((MEM_SIZE, *(6, 8, 8)),dtype=np.float32)
        self.values = np.zeros(MEM_SIZE, dtype=np.int64)
        
        self.policy_states = np.zeros((MEM_SIZE, *(8, 8, 76)),dtype=np.float32)
        self.policys = np.zeros((MEM_SIZE, *(8, 8, 76)),dtype=np.float32)
        
    
    def add_value(self, value_state, value):
        mem_index = self.mem_count % MEM_SIZE
        
        self.value_states[mem_index]  = value_state
        self.values[mem_index] = value

        self.mem_count += 1
    
    def add_policy(self, policy_state, policy):
        mem_index = self.mem_count % MEM_SIZE
        
        self.policy_states[mem_index]  = policy_state.detach().numpy()
        self.policys[mem_index] = policy
        
        self.mem_count += 1
    
    def sample(self):
        MEM_MAX = min(self.mem_count, MEM_SIZE)
        batch_indices = np.random.choice(MEM_MAX, BATCH_SIZE, replace=True)
        
        value_states  = self.value_states[batch_indices]
        values = self.values[batch_indices]
        policy_before = self.policy_states[batch_indices]
        policy_computed = self.policys[batch_indices]

        return value_states, values, policy_before, policy_computed

replay_buffer = ReplayBuffer()

In [63]:
class Node:
    def __init__(self, parent, state):
        self.parent = parent
        self.state = state
        self.player = None
        self.children = None
        self.move = None

        self.value = 0
        self.prob = 0
        self.visits = 0
        self.exploration_constant = 2
        self.probability_matrix =  Probability_Matrix()
    
    def choose_node(self):
        best_ucb = float('-inf')
        best_node = None

        for child in self.children:
            if child.visits > 0:

                if child.prob == 0:
                    # run network on stack to get prob -> given board find prob of move
                    # run policy network and then store                    
                    board = self.probability_matrix.stack_generation(self.state, self.player)
                    board = torch.Tensor(board)
                    board = board.unsqueeze(0)
                    
                    stack = policy_network.forward(board)
                    child.prob = self.probability_matrix.move_value(child.move, stack, self.state)                    

                ucb = child.value/child.visits + child.prob/child.visits
            else:
                ucb = float('inf')

            if ucb > best_ucb:
                best_ucb = ucb
                best_node = child

        return best_node
    
    def create_children(self):
        list_of_children = []

        legal_moves = self.state.legal_moves

        for move in legal_moves:
            temporary_board = deepcopy(self.state)
            temporary_board.push_san(str(move))

            temporary_node = Node(self, deepcopy(temporary_board))
            temporary_node.player = 3 - self.player
            temporary_node.move = move
            list_of_children.append(temporary_node)
        
        self.children = list_of_children
        
board = chess.Board()
node = Node(None, board)
node.player = 2

node.create_children()
"""
for child in node.children:
    print("")
    print(child.state)
"""

'\nfor child in node.children:\n    print("")\n    print(child.state)\n'

In [64]:
class MCTS:
    def __init__(self):
        self.search_length = 10
        self.simulation_length = 10
        self.simulation_depth = 10
        
        self.probability_matrix = Probability_Matrix()
    
    def search(self, state, player):
        starting_node = Node(None, state)
        starting_node.player = 3 - player
        starting_node.visits = 1
        starting_node.create_children()
        starting_node.state = state

        for i in range(self.search_length):
            new_node = self.selection(starting_node)

            for i in range(self.simulation_length):
                new_node.value += self.simulation(new_node)
            
            self.backpropogation(new_node, new_node.value)
        
        if player == 1:
            best_action_value = float("-inf")
        else:
            best_action_value = float("inf")
        
        stack = np.zeros((8, 8, 76))
        input_stack = probability_matrix.stack_generation(state, 3-player)
        output_stack = policy_network.forward(torch.Tensor(input_stack))
        
        # network stack/output
        
        for child in starting_node.children:
            if child.visits != 0:
                value = child.value/child.visits
                # this value needs to be put into an array that gets trained later
                stack = probability_matrix.fill_stack(stack, child.move, value, starting_node.state)
                if player == 1:
                    if value > best_action_value:
                        state = child.state
                        move = child.move
                        best_action_value = value
                else:
                    if value < best_action_value:
                        state = child.state
                        move = child.move
                        best_action_value = value
                    
        replay_buffer.add_policy(output_stack, stack)   
        
        return state, value, move
            
    def selection(self, node):
        while not node.state.is_checkmate():
            if node.children == None:
                if node.visits == 0:
                    return node

                node.create_children()
                return node.children[0]
            
            else:
                node = node.choose_node()
        
        return node

    def simulation(self, node):
        for i in range(self.simulation_depth):
            if node.state.is_checkmate():
                stack = torch.Tensor(probability_matrix.stack_generation(node.state, node.player))
                score = value_network.forward(stack)
                
                return score

            possible_boards = []
            legal_moves = node.state.legal_moves

            for move in legal_moves:
                temporary_board = deepcopy(node.state)
                temporary_board.push_san(str(move))
                possible_boards.append(temporary_board)
            
            state = possible_boards[random.randint(0, len(possible_boards)-1)]
            player = 3 - node.player
            node = Node(self, deepcopy(state))
            node.player = player
            
        stack = torch.Tensor(probability_matrix.stack_generation(node.state, node.player))
        score = value_network.forward(stack)
            
        return score

    def backpropogation(self, node, score):
        # set past node scores to value net average

        while node.parent != None:
            node.visits += 1
            node.value += score
            node = node.parent

#mcts = MCTS()
#print("hi")
#mcts.search(chess.Board(), 2)

In [65]:
# code to retrain the networks
class Learn:
    def retrain(self):
        value_states, values, policy_before, policy_computed = replay_buffer.sample()
        
        value_states = torch.Tensor(value_states)
        values = torch.Tensor(values)
        policy_before = torch.Tensor(policy_before)
        policy_computed = torch.Tensor(policy_computed)
        
        value_states_values = []
        for value_state in value_states:
            value_states_values.append(value_network.forward(value_state))
        value_states_values = torch.Tensor(value_states_values)
        
        value_loss = ((values - value_states_values)**2).mean()
        policy_loss = ((policy_before - policy_computed)**2).mean()
        print(value_loss, policy_loss)
        
        value_loss = Variable(value_loss, requires_grad = True)
        policy_loss = Variable(policy_loss, requires_grad = True)
        
        
        policy_network.optimizer.zero_grad()
        policy_loss.backward()
        policy_network.optimizer.step()
            
        value_network.optimizer.zero_grad()
        value_loss.backward()
        value_network.optimizer.step()
        
        return value_states, values, policy_before, policy_computed 

learn = Learn()

In [None]:
mcts = MCTS()
probability_matrix = Probability_Matrix()

max_moves = 30
episodes = 1000
w_wins, b_wins, draws = 0, 0, 0

for episode in range(episodes):
    state = chess.Board()
    player = 2

    move_counter = 1
    boards = []
    moves = ""
    game = chess.pgn.Game()
    game.headers["Event"] = "Example"
    
    print(f"episode {episode} white wins {w_wins} black wins {b_wins} draws {draws}")
    while not state.is_checkmate():
        state, value, move = mcts.search(state, player)
        print("--")
        print(state)
        print(move, move_counter)
        moves += str(move)
        boards.append(probability_matrix.stack_generation(state, player))
        player = 3 - player
        move_counter += 1


        if move_counter == max_moves:
            break
    
    print(moves)
    value = 0
    if node.state.is_checkmate():
        score = node.state.result()
        p1_score, p2_score = int(score[0]), int(score[2])

        if p1_score == 1:
            value = 1
            w_wins += 1
        if p2_score == 1:
            value = -1
            b_wins += 1
    if value == 0:
        draws += 1

    for board in boards:
        replay_buffer.add_value(board, value)

    learn.retrain()



episode 0 white wins 0 black wins 0 draws 0
--
r n b q k b n r
p p p p p p p p
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . P . .
P P P P P . P P
R N B Q K B N R
f2f3 1
--
r n b q k b . r
p p p p p p p p
. . . . . . . n
. . . . . . . .
. . . . . . . .
. . . . . P . .
P P P P P . P P
R N B Q K B N R
g8h6 2
--
r n b q k b . r
p p p p p p p p
. . . . . . . n
. . . . . . . .
. . . . . . . .
. . P . . P . .
P P . P P . P P
R N B Q K B N R
c2c3 3
--
r n b q k b . r
p p p p p . p p
. . . . . p . n
. . . . . . . .
. . . . . . . .
. . P . . P . .
P P . P P . P P
R N B Q K B N R
f7f6 4
--
r n b q k b . r
p p p p p . p p
. . . . . p . n
. . . . . . . .
. . . . . . . .
. . P . . P . P
P P . P P . P .
R N B Q K B N R
h2h3 5
--
r n b q k b r .
p p p p p . p p
. . . . . p . n
. . . . . . . .
. . . . . . . .
. . P . . P . P
P P . P P . P .
R N B Q K B N R
h8g8 6
--
r n b q k b r .
p p p p p . p p
. . . . . p . n
. . . . . . . .
. . P . . . . .
. . . . . P . P
P P . P P . P .
R N B Q K B N

In [None]:
# mcts search done now we need to retrain the network -> create dataset - lets retrain the network and then clear the memory
# policy network needs to be retrained and value net
# value net over - expected/actually played
# policy net over the value distributions of every move that is calculated vs what the network outputs
# create dataset of policy -> assign legal move to place in array

# once collected enough - retrain the network, and clear the memory
# the array has to be a few moves back too for value net

# ----
# record policy net into memory - what it outputs vs what mcts calculates each move value to be
# once reached memory capacity, retrain the network 

In [None]:
"""
todo

time to retrain networks!

policy net!
    fill in array with values
    save to memory
value net!!
    trace back the final outcome of the game to each state - put in memory
    
"""