In [106]:
import chess
from tensorflow.keras.preprocessing.text import Tokenizer
import numpy as np

In [104]:
def fen_to_board(fen):
    board = []
    for row in fen.split('/'):
        brow = []
        for c in row:
            if c == ' ':
                break
            elif c in '12345678':
                brow.extend( ['--'] * int(c) )
            elif c == 'p':
                brow.append( 'bp' )
            elif c == 'P':
                brow.append( 'wp' )
            elif c > 'Z':
                brow.append( 'b'+c.upper() )
            else:
                brow.append( 'w'+c )

        board.append( brow )
    return board

In [188]:
class Envi():
    def __init__(self,intial_fen):
        if intial_fen == "":
            self.board = chess.Board()
        else:
            self.board = chess.Board(intial_fen)
        fen=self.board.fen()
        text=fen_to_board(fen)
        self.tokenizer=Tokenizer()
        self.tokenizer.fit_on_texts(text)

    def step(self,action):
        self.board.push(action)
        self.brd=self.state()
        if self.board.is_checkmate():
            return 10000,self.brd,1
        elif self.board.is_check():
            return 100,self.brd,0
        else:
            return 0,self.brd,0
        
    def state(self):
        fen=self.board.fen()
        text=fen_to_board(fen)
        seq=self.tokenizer.texts_to_sequences(text)
        self.brd=np.array(seq).reshape(64)
        return self.brd
        
    def legal(self):
        self.pmoves=[x for x in self.board.legal_moves]
        return self.pmoves
    
    def reset(self):
        self.board.reset()
        return self.board.fen()

In [200]:
import tensorflow as tf
import numpy as np
import utils
import time
import random
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.losses import MSE
from tensorflow.keras.optimizers import Adam
from collections import deque, namedtuple

In [128]:
MEMORY_SIZE = 100_000     # size of memory buffer
GAMMA = 0.995             # discount factor
ALPHA = 1e-3              # learning rate  
NUM_STEPS_FOR_UPDATE = 4  # perform a learning update every C time steps

In [168]:
q_network = Sequential([
    Input(shape=(64,)),                      
    Dense(units=64, activation='relu'),            
    Dense(units=64, activation='relu'),            
    Dense(units=1, activation='linear'),
    ])

target_q_network = Sequential([
    Input(shape=(64,)),                      
    Dense(units=64, activation='relu'),            
    Dense(units=64, activation='relu'),            
    Dense(units=1, activation='linear'),
    ])

optimizer =Adam(learning_rate=ALPHA)

In [169]:
q_network.summary()

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_18 (Dense)            (None, 64)                4160      
                                                                 
 dense_19 (Dense)            (None, 64)                4160      
                                                                 
 dense_20 (Dense)            (None, 1)                 65        
                                                                 
Total params: 8,385
Trainable params: 8,385
Non-trainable params: 0
_________________________________________________________________


In [134]:
experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

In [135]:
def compute_loss(experiences, gamma, q_network, target_q_network):

    states, actions, rewards, next_states, done_vals = experiences
    max_qsa = tf.reduce_max(target_q_network(next_states), axis=-1)
    y_targets =rewards + (gamma * max_qsa * (1 - done_vals))
    q_values = q_network(states)
    q_values = tf.gather_nd(q_values, tf.stack([tf.range(q_values.shape[0]),tf.cast(actions, tf.int32)], axis=1))    
    loss = MSE(y_targets, q_values)
    
    return loss

In [136]:
@tf.function
def agent_learn(experiences, gamma):
    with tf.GradientTape() as tape:
        loss = compute_loss(experiences, gamma, q_network, target_q_network)

    gradients = tape.gradient(loss, q_network.trainable_variables)
    optimizer.apply_gradients(zip(gradients, q_network.trainable_variables))

    utils.update_target_network(q_network, target_q_network)

In [205]:
num_games=1
num_moves=1000

total_point_history = []

num_p_av=100
epsilon=1.0

target_update_interval=100

memory_buffer=deque(maxlen=MEMORY_SIZE)

target_q_network.set_weights(q_network.get_weights())

In [206]:
def getaction(env,epsilon):
    q = []
    for i in env.legal():
        new_state=env.step(i)[1]
        state_qn = np.expand_dims(new_state, axis=0)
        env.board.pop()
        q.append(q_network(state_qn))
    if np.random.rand() < epsilon:
        return env.legal()[np.random.randint(len(env.legal()))]
    else:
        return env.legal()[np.argmax(q)]
    

In [207]:
def qupdate(t):
    if t % target_update_interval == 0:
        return True
    else:
        return False
        

In [220]:
def get_experiences(memory_buffer):
    batch=random.sample(memory_buffer,10)
    batch=tf.convert_to_tensor(batch)
    return batch

In [221]:
def get_new_eps(eps):
    return eps * 0.99

In [222]:
for i in range(num_games):
    env=Envi("")
    state=env.state()
    total_points=0

    for t in range(1,num_moves):
        action=getaction(env,epsilon)
        next_state,reward,done=env.step(action)
        memory_buffer.append((state,action,reward,next_state,done))

        update=qupdate(t)

        if update:
            experiences=get_experiences(memory_buffer)

            agent_learn(experiences,GAMMA)

            env.board

        state=next_state
        total_points+=reward

        if done:
            break
    
    total_point_history.append(total_points)
    av_latest_points=np.mean(total_point_history[-100:])

    epsilon=get_new_eps(epsilon)

    print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}", end="")

    if (i+1) % num_p_av == 0:
        print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}")

    if av_latest_points >= 200.0:
        print(f"\n\nEnvironment solved in {i+1} episodes!")
        q_network.save('chs-v1.h5')
        break

ValueError: Can't convert Python sequence with mixed types to Tensor.