In [1]:
%reload_ext Cython

In [2]:
import typing
import copy
import pdb
from IPython.display import clear_output
import numpy as np
import time
from colorama import Fore
import sys
import cProfile
from multiprocessing import Pool
import chess 
import boardlib
import chess.uci
import tensorflow as tf

In [3]:
all_squares = [[i,j] for i in range(8) for j in range(8)]
all_moves = [i+j for i in all_squares for j in all_squares]
pieces = ["Empty","Rook","Knight","Bishop","Queen","King","Pawn","Pawn","King","Queen","Bishop","Knight","Rook"]

piece_owner = [0,1,1,1,1,1,1,-1,-1,-1,-1,-1,-1]
list_letters = ['a','b','c','d','e','f','g','h']
list_numbers = list(map(str,list((range(1,9)))))
legal_outputs = ["pawn_move","pawn_move,en-passant","king_move",'king_move,castling,kingside','king_move,castling,queenside','en-passant','']
en_passant = ["c7c5","a2a3","c5c4","b2b4","c4b3"]
zero_line = [0 for _ in range(8)]
move_zero_arr = [0 for i in range(8*8)]
flattened_zero_arr = np.zeros((8,8))
flattened_board = np.zeros((8,8,7),dtype=int)
rights_zero_arr = np.zeros((8,8,2),dtype=int)

In [4]:
#%%cython
#Creates a new board
def create_board(): 
    #pdb.set_trace()
    val = 0
    board = np.array([[val for j in range(8)] for i in range(8)])
    for i in range(8):
        board[1,i] = 6
        board[6,i] = -6
        
    board[0,4] = 5
    board[7,4] =  -5

    board[0,1] = 2
    board[7,1] = -2

    board[0,6] = 2
    board[7,6] = -2

    board[0,2] = 3
    board[7,2] = -3

    board[0,5] = 3 
    board[7,5] = -3

    board[0,3] = 4
    board[7,3] = -4

    board[0,4] = 5
    board[7,4] = -5
    
    board[0,0] = 1
    board[7,0] = -1 
    
    board[0,7] = 1
    board[7,7] = -1
    return [board]

def create_rights():
    rights = np.array([[0 for j in range(8)] for i in range(8)])
    rights[0,4] = 1
    rights[7,4] = -1
    
    rights[0,0] = 1
    rights[7,0] = -1
    rights[0,7] = 1
    rights[7,7] = -1
    return [rights]

def create_player_positions():
    positions_1 = []
    for i in range(8):
        positions_1.append([0,i])
        positions_1.append([1,i])
    
    positions_2 = []
    for i in range(8):
        positions_2.append([6,i])
        positions_2.append([7,i])
        
    return [[positions_1] + [positions_2]]
    
def create_chess_status():
    return [False]

def create_king_positions():
    king_positions = [[0,4],[7,4]]
    return [king_positions]

def get_from_square_player(i,j,board):
    val = board[i,j]
    return val / abs(val)


def pp_board(board):
    for i in range(8):
        print(Fore.BLACK + list_numbers[7-i],end="\t")
        for j in range(8):
            player = piece_owner[board[i,j]]
            if player == 0:
                print(Fore.BLACK + "_",end="\t")
            else: 
                piece = pieces[board[i,j]]
                if player== 1:
                    print(Fore.BLUE + piece,end="\t")
                else:
                    print(Fore.BLACK + piece,end="\t")
        print("\n")
    print("",end="\t")
    for i in list_letters:
        print(Fore.BLACK + i,end="\t")
    sys.stdout.flush()

In [5]:
class ChessBoard:
    def __init__(self,player):
        self.board = create_board()
        self.rights = create_rights()
        self.king_positions = create_king_positions()
        self.player_positions = create_player_positions()
        self.chess_status = create_chess_status()
        self.player = player
        self.move_status = ""
        self.capture_status = []
    
    def make_move(self,move):
        self.board ,self.move_status ,self.rights ,self.player ,self.king_positions ,self.player_positions ,self.chess_status,self.capture_status = boardlib.make_move(self.board,
                                                                                                                                          move,
                                                                                                                                          self.rights,
                                                                                                                                          self.player,
                                                                                                                                          self.king_positions,
                                                                                                                                          self.player_positions,
                                                                                                                                          self.chess_status)
    def unmake_move(self):
        self.board,self.move_status,self.rights,self.player,self.king_positions,self.player_positions,self.chess_status = boardlib.unmake_move(self.board,
                                                                                                                                            self.rights,
                                                                                                                                            self.player,
                                                                                                                                            self.king_positions,
                                                                                                                                            self.player_positions,
                                                                                                                                            self.chess_status)
    def one_hot_encode_state(self):
        one_hot_board = self.one_hot_encode_board()
        one_hot_rights_arr = self.one_hot_encode_rights()
        return np.append(one_hot_board,one_hot_rights_arr)
    
    def one_hot_encode_rights(self):
        rights_zero_arr_cp = np.zeros((8,8,2),dtype=int)
        val = self.rights[-1]
        for i in range(8):
            for j in range(8):
                owner = val[i,j]
                if owner == 0:
                    continue
                rights_zero_arr_cp[i,j,owner] = 1
        return rights_zero_arr_cp.flatten()
    def one_hot_encode_board(self):   
        array = self.board[-1]
        flattened_board_copy = np.zeros((8,8,13),dtype=int) 
        for i in range(8):
            for j in range(8):
                piece_index = int(abs(array[i,j] ))
                sign = boardlib.c_reduce_to_sign(piece_index)
                offset = 0
                if sign == -1:
                    offset = 6
                flattened_board_copy[i,j,int(abs(piece_index))+offset] = sign
        
        return flattened_board_copy.flatten()
    
    def one_hot_encode_move(self,move):
        x_1,y_1,x_2,y_2 = boardlib.algebraic_to_arr_indices(move)
        array_from = copy.deepcopy(move_zero_arr)
        array_to = copy.deepcopy(move_zero_arr)
        index_from = x_1 * 8 + y_1
        index_to = x_2 * 8 + y_2   
        array_from[index_from]  = 1
        array_to[index_to]  = 1   
        return array_from,array_to
 
    def show_board(self):
        pp_board(self.board[-1])
    
    def generate_legal_moves(self):
        return board.generate_all_legal_moves(self.board,self.rights,self.player,
                                        self.king_positions,self.player_positions,
                                        self.chess_status,board.all_moves)
        

In [6]:
#b = ChessBoard(1)
#b.one_hot_encode_state().shape

In [7]:
#Credit to Hvass Laboratories
input_height = 32
input_width = 30
input_size = input_height * input_width
num_squares = 8 *8 
x = tf.placeholder(tf.float32,[None,input_size])
y_true = tf.placeholder(tf.float32,[None,2,num_squares])
y_true_cls = tf.argmax(y_true,axis=2)
filter_size1 = 5
num_filters1 = 16

filter_size2 = 5
num_filters2 = 36


fc_size = 128
learning_rate = 1e-4

In [8]:
#credit to Hvass Laboratories

def new_weights(shape):
    return tf.Variable(tf.truncated_normal(shape,stddev=0.05))

def new_biases(length):
    return tf.Variable(tf.constant(0.5,shape=[length]))

def new_conv_layer(input,              # The previous layer.
                   num_input_channels, # Num. channels in prev. layer.
                   filter_size,        # Width and height of each filter.
                   num_filters,        # Number of filters.
                   use_pooling=True):  # Use 2x2 max-pooling.
    shape = [filter_size,filter_size,num_input_channels,num_filters]
    weights = new_weights(shape=shape)
    biases = new_biases(length=num_filters)
    
    layer = tf.nn.conv2d(input=input,
                         filter=weights,
                         strides=[1,1,1,1],
                         padding='SAME')
    layer += biases
    
    if use_pooling:
        layer = tf.nn.max_pool(value=layer,
                     ksize=[1,2,2,1],
                     strides=[1,2,2,1],padding='SAME')
    layer = tf.nn.relu(layer) 
    return layer,weights

def flatten_layer(layer):
    
    layer_shape = layer.get_shape()
    
    num_features = layer_shape[1:4].num_elements()
    
    layer_falt = tf.reshape(layer,[-1,num_features])
    
    return layer_falt, num_features

def new_fc_layer(input,num_inputs,num_outputs,use_relu=True):
    #num_inputs column size needed to process input, num_outputs, are the amount of columns needed, are the amount of results calculated, each sent out by their channel.
    weights = new_weights(shape=[num_inputs,num_outputs])
    biases = new_biases(length=num_outputs)
    
    layer = tf.matmul(input,weights) + biases
    
    if use_relu:
        layer = tf.nn.relu(layer)
    
    return layer

In [9]:
#%%cython
def curate_pgn_string_from_file(s):
    s = s.replace("\n"," ")
    s = s.replace("+","")
    s = s.replace("Q","")
    s = s.replace("N","")
    s = s.replace("-","")
    s_list = s.split(' ')
    s_list
    list_remove = [s_list[i] for i in range(len(s_list)) if i%3 == 0]
    for i in list_remove:
        s_list.remove(i)
    try:
        s_list.remove("")
    except:
        s_list = s_list
    return s_list

def curate_pgn_string(s):
    f = ["\n","+","Q","N","-","R","x","B","K","#"]
    for i in f:
        s = s.replace(i,"")
    s_list = s.split(' ')
    try:
        s_list.remove("")
    except:
        s_list = s_list
    return s_list
#path = "../extracted/two.txt"
#f = open(path,"r")
#s = f.read()
#curated = curate_pgn_string_from_file(s)

In [10]:
## Test cell. Here are the functions that test the engine

def perft(b,depth):
    if depth == 0:
        return 1
    nodes = 0
    moves = b.generate_legal_moves()
    
    for i in moves:
        b.make_move(i)
        nodes = nodes + perft(b,depth-1)
        b.unmake_move()
    return nodes

def perft_pool(b,depth):
    if depth == 0:
        return 1
    nodes = 0
    moves = b.generate_legal_moves()
    board_list = []
    for i in moves:
        b.make_move(i)
        board_list.append((copy.deepcopy(b),depth-1))
        #nodes = nodes + perft(b,depth-1)
        b.unmake_move()
    p = Pool()
    retvals = p.starmap_async(perft,board_list)
    #print(retvals.get())
    return sum(retvals.get()) 

def perft_test(depth):
    b = ChessBoard(-1)
    start = time.time()
    val = perft_pool(b,depth)
    end = time.time() 
    return val,end-start

def wrong_moves(correct_board,my_board, depth,path):
    if depth== 0:
        return 1

    nodes = 0
    #pdb.set_trace()
    correct_legal_moves = list(map(correct_board.lan,correct_board.legal_moves))
    my_legal_moves = my_board.generate_legal_moves()
    concated_moves = ""
    for i in correct_legal_moves:
        concated_moves = concated_moves + i + ' '
    correct_legal_moves = curate_pgn_string(concated_moves)
    
    false_positive = ["-" + i for i in my_legal_moves if i not in correct_legal_moves]
    false_negative = ["+" + i for i in correct_legal_moves if i not in my_legal_moves]
    
    correct_legal_moves.sort()
    my_legal_moves.sort()
    if correct_legal_moves != my_legal_moves:
        print("error")
    errors = len(false_positive)+ len(false_negative)
    if errors == 0:
        for i in my_legal_moves:
            path_temp = path + " " + i
            move = chess.Move.from_uci(i)
            
            correct_board.push(move)
            my_board.make_move(i)
            
            nodes = nodes + wrong_moves(correct_board,my_board,depth-1,path_temp)
            
            correct_board.pop()
            my_board.unmake_move()
    else:
        print(path + "\n" + str(false_positive) + str(false_negative) + "\n")
        return 1
    return nodes

In [11]:
#%%cython
def play_game_until_error(curated_pgn):
    b = ChessBoard(-1)
    count = 0
    for move in curated_pgn:
        #if count == 3:
            #pdb.set_trace()
        b.make_move(move)
        
#        b,s,r,p,k,player_pos,chess= make_move(b,move,r,p,k,player_pos,chess)
        count += 1
        if not(b.move_status in legal_outputs):#== "castling,k" or s == "castling,q" or s == "" or s == "en-passant"):
            print(count,s)
            print(move)
            board.show_board()
            #pp_board(b[-1])
            break
    success = (b.move_status in legal_outputs)# == "castling,k" or s == "castling,q" or s == "" or s == "en-passant")
    return count, move,success,s
#play_game_until_error(curated)

In [12]:
#%%cython
def start_game(player,moves=[],watching=False,fast_play=False):
    #pdb.set_trace()
    b = ChessBoard(player)
    if fast_play == True:
        for i in moves:
            b.make_move(i)
    count = 0
    do = "m"
    i = 0
    prev = "m"
    stop = False
    b.show_board()
    s = ""
    while not stop:
        do = input()
#        if do == "":
#            do = prev
#            
        if watching:
            if i == len(moves):
                stop = True
                
            if do == "m":
                b.make_move(moves[i])
                prev == "m"
                
                print(moves[i])
            
            elif do == "u":
                b.unmake_move()
                if s == "n":
                    continue
                i = i-2
                prev = "u"
                
                print(moves[i])
            elif do == "exit":
                stop = True
            else:
                print("wrong input")
                continue
        else:  
            if do == "u":
                b.unmake_move()
            
            else:
                b.make_move(do)
                #pdb.set_trace()
                if not(s in legal_outputs):
                    print(s)
                    print(len(s))
                    print("wrong input")
                    sys.stdout.flush()


                    continue

        #clear_output()
        b.show_board()

        sys.stdout.flush()
        count += 1
        i = i + 1
    clear_output()
    b.show_board()

In [13]:
l = ["a2a4","h7h6","a4a5","h6h5","a5a6","h5h4","a6b7","h4h3","b7a8q","h3g2","a8a7","g2f1"]


#cProfile.run('perft_test(3)')
#perft_test(3)
#wrong_moves(chess.Board(),ChessBoard(-1),4,"")
#perft_2(chess.Board(),3)
#start_game(-1,l,watching=True,fast_play=False)

In [14]:
def record_stockfish_game():
    board = chess.Board()
    engine = chess.uci.popen_engine("/usr/local/bin/stockfish")
    engine.uci()
    engine.author
    moves = []
    playing = True
    while playing:
        engine.position(board)
        move = engine.go()[0]
        if move == None:
            playing = False
        else:
            board.push(move)
        moves.append(str(move))
    return moves

def save_recording(l,path):
    file = open(path,"a")
    length = len(l)
    counter = 0
    for i in l:
        file.write("%s " % i )
        counter = counter + 1
    file.close()

def add_game_to_file(path):
    moves = record_stockfish_game()
    save_recording(moves,path)
    
    
def file_to_training_data(path):
    train_x = []
    train_y = []
    file = open(path,"r")
    file_string = file.read()
    file_string = file_string.replace("\n","")
    file_string = file_string.replace("q","")   
    file_array = file_string.split(" ")
    board = ChessBoard(-1)
    for i in file_array:
        if i == "None":
            board = ChessBoard(-1)
            
            continue
        if i == "":
            break
        
        train_x.append(board.one_hot_encode_state())
        board.make_move(i)
        train_y.append(board.one_hot_encode_move(i))
    return train_x,train_y

training_data = file_to_training_data("games.txt")
#training_data = add_game_to_file("data.txt")   

In [15]:
train_x = np.asarray(training_data[0])
train_y = np.asarray(training_data[1])
training_data_reshaped = tf.reshape(train_x,[-1,input_height,input_width,1])
training_data_reshaped_casted = tf.cast(training_data_reshaped,tf.float32)



In [16]:
class TrainingData:
    def __init__(self,train_x,train_y):
        self.train_x = np.asarray(train_x)
        self.train_y = np.asarray(train_y)
        self.x_shape = self.train_x.shape
        self.y_shape = self.train_y.shape       
        self.x_dtype = self.train_x.dtype
        self.y_dtype = self.train_y.dtype
        self.index  = 0
        self.len = len(self.train_x)
        if len(self.train_x) != len(self.train_y):
            raise Exception("x and y are not same length")
    
    def next_batch(self,size):
        if self.index + size >= self.len:
            print("size is larger than the remaining batch")
            return np.zeros(self.x_shape,dtype=self.x_dtype),np.zeros(self.y_shape,dtype=self.y_dtype)
        else:
            retval_x = self.train_x[self.index:self.index+size]
            retval_y = self.train_y[self.index:self.index+size]       
            self.index = self.index + size
            return retval_x,retval_y
        

In [17]:
x_input = tf.reshape(x,[-1,input_height,input_width,1])
layer_conv1, weights_conv1 = new_conv_layer(input=x_input,
                                           num_input_channels=1,
                                           filter_size=filter_size1,
                                           num_filters=num_filters1,
                                           use_pooling=False)

layer_conv2,weights_conv2 = new_conv_layer(input=layer_conv1,
                                           num_input_channels=num_filters1,
                                           filter_size=filter_size2,
                                           num_filters=num_filters2,
                                           use_pooling=False)

layer_flat,num_features = flatten_layer(layer_conv2)

layer_fc1 = new_fc_layer(input=layer_flat,
                        num_inputs=num_features,
                        num_outputs=fc_size,
                        use_relu=True)
layer_fc2 = new_fc_layer(input=layer_fc1,
                        num_inputs = fc_size,
                        num_outputs=num_squares*2,
                        use_relu=True)
layer_fc2_reshaped = tf.reshape(layer_fc2,[-1,2,num_squares])
#layer_fc2_reshaped

y_pred = tf.nn.softmax(layer_fc2_reshaped)
y_pred
y_pred_cls = tf.argmax(y_pred,axis=2)
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=layer_fc2_reshaped,
                                                       labels=y_true)

cost = tf.reduce_mean(cross_entropy)

optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

correct_prediction = tf.equal(y_pred_cls,y_true_cls)

accuracy = tf.reduce_mean(tf.cast(correct_prediction,tf.float32))

In [18]:
session = tf.Session()
session.run(tf.global_variables_initializer())

In [22]:
def optimize(iterations,train_x,train_y,batch_size):
    start_time = time.time()
    train_data = TrainingData(train_x,train_y)

    for i in range(iterations):
        x_batch,y_batch = train_data.next_batch(batch_size)
        
        feed_dict_train = {x:x_batch,y_true:y_batch}
        
        session.run(optimizer,feed_dict=feed_dict_train)
        
        acc = session.run(accuracy, feed_dict=feed_dict_train)
        print("Optimization Iteration "+ str(i) + "Training Accuracy "+str(acc))
    end_time = time.time()
    print("Time used " + str(end_time-start_time))
        



In [23]:
optimize(100,train_x,train_y,18)

Optimization Iteration 0Training Accuracy 0.0277778
Optimization Iteration 1Training Accuracy 0.0277778
Optimization Iteration 2Training Accuracy 0.0
Optimization Iteration 3Training Accuracy 0.0
Optimization Iteration 4Training Accuracy 0.0833333
Optimization Iteration 5Training Accuracy 0.0
Optimization Iteration 6Training Accuracy 0.0555556
Optimization Iteration 7Training Accuracy 0.0555556
Optimization Iteration 8Training Accuracy 0.0277778
Optimization Iteration 9Training Accuracy 0.0
Optimization Iteration 10Training Accuracy 0.0277778
Optimization Iteration 11Training Accuracy 0.0277778
Optimization Iteration 12Training Accuracy 0.0277778
Optimization Iteration 13Training Accuracy 0.0
Optimization Iteration 14Training Accuracy 0.0
Optimization Iteration 15Training Accuracy 0.0
Optimization Iteration 16Training Accuracy 0.0
Optimization Iteration 17Training Accuracy 0.0277778
Optimization Iteration 18Training Accuracy 0.0277778
Optimization Iteration 19Training Accuracy 0.055555

In [21]:
def test_one_hot_encoding():
    b = ChessBoard(-1)
    b_board = b.board[-1]
    b_rights = b.rights[-1]
    one_hot = b.one_hot_encode_board()
    for i in range(8):
        if i not in [0,1,6,7]:
            continue
        for j in range(8):
            index = i * 8*13  + j *13 
            offset = 0
            if index < 0:
                offset = 6
            index = index + int(abs(b_board[i,j]))
            print(one_hot[index],int(abs(b_board[i,j]))+offset)