In [1]:
import os
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

In [2]:
import gym
import gym_chess
import chess

In [3]:
def list_generator(length):
    """Return a fresh list holding `length` zeros (empty when `length` is 0 or negative)."""
    return [0 for _ in range(length)]

In [4]:
# Encoding used by board_to_int (FEN symbol -> digits):
#   lowercase FEN symbols (black pieces in FEN notation): rook (r) = 1, knight (n) = 2, bishop (b) = 3, queen (q) = 4, king (k) = 5, pawn (p) = 6
#   uppercase FEN symbols (white pieces in FEN notation): rook (R) = 11, knight (N) = 12, bishop (B) = 13, queen (Q) = 14, king (K) = 15, pawn (P) = 16
#   empty squares: a digit n becomes n zeros. Ex: 6 = 0, 0, 0, 0, 0, 0
#   rank separator '/': an earlier scheme encoded it as 9, but one number for the whole board was too long,
#   so the FEN is split at '/' and int_board is a list with 8 elements, one integer per rank
def board_to_int(board):
    if isinstance(board, list):
        real_board = board[0]
        if isinstance(real_board, int):
            return board
        else:
            board = real_board
    fen = board.board_fen()
    fen_by_rank = fen.split('/')
    int_board = []
    for rank in fen_by_rank:
        int_rank = ''
        for symbol in rank:
            #white pieces
            if symbol == 'r':
                int_rank +='1'
            elif symbol == 'n':
                int_rank += '2'
            elif symbol == 'b':
                int_rank += '3'
            elif symbol == 'q':
                int_rank += '4'
            elif symbol == 'k':
                int_rank += '5'
            elif symbol == 'p':
                int_rank += '6'
        
            #black pieces
            elif symbol == 'R':
                int_rank += '11'
            elif symbol == 'N':
                int_rank += '12'
            elif symbol == 'B':
                int_rank += '13'
            elif symbol == 'Q':
                int_rank += '14'
            elif symbol == 'K':
                int_rank += '15'
            elif symbol == 'P':
                int_rank += '16'
            
            #empty squares and lines between ranks
            else:
                for i in range(int(symbol)):
                    int_rank += '0'
        int_board.append(int(int_rank))
    return int_board

In [5]:
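# files: a = 1, b = 2, c = 3, d = 4, e = 5, f = 6, g = 7, h = 8; rank digits are kept unchanged
# promotion codes (as decoded by int_to_move_white / int_to_move_black): rook = 10, knight = 11, bishop = 12, queen = 13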
def move_to_int(move):
    """Encode a move as an integer built from its UCI string.

    The four square characters become four digits (file a..h -> 1..8, rank
    digit unchanged); an optional promotion letter becomes a two-digit suffix
    (r = 10, n = 11, b = 12, q = 13), giving a six-digit code.

    Args:
        move: an int (returned unchanged), an object exposing ``uci()``, or a
            list whose first element is one of those.

    Returns:
        The integer code, e.g. ``e2e4`` -> 5254 and ``e7e8q`` -> 575813.

    Raises:
        ValueError: if the UCI string contains a character that cannot be
            turned into digits.
    """
    if isinstance(move, int):
        return move
    if isinstance(move, list):
        move = move[0]
        if isinstance(move, int):
            return move

    file_codes = {'a': '1', 'b': '2', 'c': '3', 'd': '4',
                  'e': '5', 'f': '6', 'g': '7', 'h': '8'}
    promotion_codes = {'r': '10', 'n': '11', 'b': '12', 'q': '13'}

    pieces = []
    for position, symbol in enumerate(move.uci()):
        if position < 4:
            pieces.append(file_codes.get(symbol, symbol))
        else:
            # 'b' is both a file and the bishop promotion letter, so the role
            # of a character must be decided by its position, not its value.
            pieces.append(promotion_codes.get(symbol.lower(), symbol))
    return int(''.join(pieces))

In [6]:
def int_to_move_white(number):
    """Decode an integer move code into a ``chess.Move``.

    The code is read digit by digit: digits 1 and 3 are files (1..8 -> a..h),
    digits 2 and 4 are copied as rank characters, and for six-digit codes the
    last two digits select a promotion piece (10 = rook, 11 = knight,
    12 = bishop, 13 = queen).

    Args:
        number: the integer code; any non-int value is returned unchanged.

    Returns:
        The result of ``chess.Move.from_uci`` on the decoded string.

    Raises:
        IndexError: if the code has fewer than four digits.
        Whatever ``chess.Move.from_uci`` raises when the decoded string is not
        a valid UCI move (file digits outside 1..8 contribute no letter).
    """
    if not isinstance(number, int):
        return number

    file_letters = {'1': 'a', '2': 'b', '3': 'c', '4': 'd',
                    '5': 'e', '6': 'f', '7': 'g', '8': 'h'}
    # UCI promotion letters are lower-case for both colours.
    promotion_letters = {'10': 'r', '11': 'n', '12': 'b', '13': 'q'}

    digits = str(number)
    uci = file_letters.get(digits[0], '') + digits[1]
    # The destination file comes from the THIRD digit.
    uci += file_letters.get(digits[2], '') + digits[3]
    if len(digits) == 6:
        uci += promotion_letters.get(digits[4:], '')
    return chess.Move.from_uci(uci)

In [7]:
def int_to_move_black(number):
    """Decode an integer move code into a ``chess.Move``.

    The code is read digit by digit: digits 1 and 3 are files (1..8 -> a..h),
    digits 2 and 4 are copied as rank characters, and for six-digit codes the
    last two digits select a promotion piece (10 = rook, 11 = knight,
    12 = bishop, 13 = queen), written as a lower-case letter.

    Args:
        number: the integer code; any non-int value is returned unchanged.

    Returns:
        The result of ``chess.Move.from_uci`` on the decoded string.

    Raises:
        IndexError: if the code has fewer than four digits.
        Whatever ``chess.Move.from_uci`` raises when the decoded string is not
        a valid UCI move (file digits outside 1..8 contribute no letter).
    """
    if not isinstance(number, int):
        return number

    file_letters = {'1': 'a', '2': 'b', '3': 'c', '4': 'd',
                    '5': 'e', '6': 'f', '7': 'g', '8': 'h'}
    promotion_letters = {'10': 'r', '11': 'n', '12': 'b', '13': 'q'}

    digits = str(number)
    uci = file_letters.get(digits[0], '') + digits[1]
    # The destination file comes from the THIRD digit.
    uci += file_letters.get(digits[2], '') + digits[3]
    if len(digits) == 6:
        uci += promotion_letters.get(digits[4:], '')
    return chess.Move.from_uci(uci)

In [8]:
class ReplayBuffer:
    """Fixed-capacity store of transitions that overwrites its oldest slot when full.

    Each transition field lives in its own list of length ``memory_size``
    (pre-filled by ``list_generator``); ``memory_counter`` counts every write
    ever made and is reduced modulo the capacity to pick the slot.
    """

    def __init__(self, memory_size):
        """Allocate the five field lists for ``memory_size`` transitions."""
        self.memory_size = memory_size
        self.memory_counter = 0

        self.state_memory = list_generator(self.memory_size)
        self.new_state_memory = list_generator(self.memory_size)
        self.action_memory = list_generator(self.memory_size)
        self.reward_memory = list_generator(self.memory_size)
        self.terminal_memory = list_generator(self.memory_size)

    def store_game(self, state, action, reward, new_state, done):
        """Write one transition into the next slot, wrapping around at capacity."""
        slot = self.memory_counter % self.memory_size
        fields = (
            (self.state_memory, state),
            (self.action_memory, action),
            (self.reward_memory, reward),
            (self.new_state_memory, new_state),
            (self.terminal_memory, done),
        )
        for column, value in fields:
            column[slot] = value
        self.memory_counter += 1

    def sample_buffer(self, batch_size):
        """Draw ``batch_size`` distinct stored transitions at random.

        Returns:
            A tuple ``(states, actions, rewards, new_states, terminal)`` of
            lists that are aligned index by index.

        Raises:
            ValueError: from ``np.random.choice`` when ``batch_size`` exceeds
                the number of transitions currently stored.
        """
        filled = min(self.memory_counter, self.memory_size)
        picks = np.random.choice(filled, batch_size, replace=False)

        columns = (self.state_memory, self.action_memory, self.reward_memory,
                   self.new_state_memory, self.terminal_memory)
        states, actions, rewards, new_states, terminal = (
            [column[i] for i in picks] for column in columns
        )
        return states, actions, rewards, new_states, terminal

In [9]:
class Network(nn.Module):
    """Two-headed fully connected network producing a state value and per-action advantages.

    Two shared 128-unit ReLU layers feed a 1-unit value head and an
    ``n_actions``-unit advantage head. The module owns its Adam optimizer and
    MSE loss, and moves itself to ``cuda:0`` when CUDA is available.

    Args:
        lr: learning rate for Adam.
        n_actions: width of the advantage head.
        name: prefix of the checkpoint file name (``<name>_dqn.zip``).
        input_dims: number of input features.
        checkpoint_directory: directory holding the checkpoint file.
    """

    def __init__(self, lr, n_actions, name, input_dims, checkpoint_directory):
        super(Network, self).__init__()

        self.layer1 = nn.Linear(input_dims, 128)
        self.layer2 = nn.Linear(128, 128)
        self.value_layer = nn.Linear(128, 1)
        self.advantage_layer = nn.Linear(128, n_actions)

        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)
        self.checkpoint_directory = checkpoint_directory
        self.checkpoint_file = os.path.join(self.checkpoint_directory, name+'_dqn.zip')

    def forward(self, state):
        """Return ``(value, advantage)`` for a batch of states.

        ``value`` has one column and ``advantage`` has ``n_actions`` columns.
        """
        layer_1_output = F.relu(self.layer1(state))
        layer_2_output = F.relu(self.layer2(layer_1_output))
        value = self.value_layer(layer_2_output)
        advantage = self.advantage_layer(layer_2_output)

        return value, advantage

    def save_checkpoint(self):
        """Write the module's state dict to ``checkpoint_file``."""
        print('saving checkpoint')
        # Create the target directory on first use instead of failing inside T.save.
        if self.checkpoint_directory:
            os.makedirs(self.checkpoint_directory, exist_ok=True)
        T.save(self.state_dict(), self.checkpoint_file)

    def load_checkpoint(self):
        """Load the state dict stored in ``checkpoint_file`` into this module.

        Raises:
            FileNotFoundError: if the checkpoint file does not exist.
        """
        print('loading checkpoint')
        # map_location lets a checkpoint saved on a GPU be restored on a CPU-only host.
        self.load_state_dict(T.load(self.checkpoint_file, map_location=self.device))

In [10]:
class Agent:
    """Epsilon-greedy agent with an online network, a target network and a replay buffer.

    Args:
        discount_rate: factor applied to the bootstrapped next-state value.
        epsilon: initial probability of picking a random legal move.
        lr: learning rate passed to both networks.
        n_actions: number of outputs of the advantage head.
        input_dims: number of input features (length of the encoded board).
        memory_size: capacity of the replay buffer.
        batch_size: number of transitions sampled per learning step.
        checkpoint_directory: directory used by both networks' checkpoints.
        epsilon_min: lower bound for epsilon.
        epsilon_dec: amount subtracted from epsilon after each learning step.
        replace: number of learning steps between target-network syncs
            (``None`` or 0 disables syncing).

    NOTE(review): actions are stored as moves and converted with ``move_to_int``,
    whose codes are four- or six-digit numbers; only codes below ``n_actions``
    have a network output, so ``learn`` skips the others — confirm the intended
    action encoding.
    """

    def __init__(self, discount_rate, epsilon, lr, n_actions, input_dims, memory_size, batch_size,
                 checkpoint_directory, epsilon_min=0.01, epsilon_dec=5e-7, replace=1000):
        self.discount_rate = discount_rate
        self.epsilon = epsilon
        self.lr = lr
        self.epsilon_min = epsilon_min
        self.epsilon_dec = epsilon_dec
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.replace = replace

        self.action_space = list(range(n_actions))
        self.memory = ReplayBuffer(memory_size)

        # eval_network is trained; next_network is the periodically synced target copy.
        self.eval_network = Network(lr, n_actions, 'eval_network', input_dims, checkpoint_directory)
        self.next_network = Network(lr, n_actions, 'next_network', input_dims, checkpoint_directory)

    def store_transition(self, state, action, reward, new_state, done):
        """Record one transition in the replay buffer.

        Every transition is stored, not only the one that ends a game, so the
        buffer holds whole games rather than a single row per game.
        """
        self.memory.store_game(state, action, reward, new_state, done)

    def choose_action(self, observation):
        """Pick a move for ``observation`` with an epsilon-greedy policy.

        Returns:
            A move object, or the raw int index of the network's best output
            when that index cannot be decoded into a move (callers treat an
            int as an illegal move).
        """
        if np.random.random() > self.epsilon:
            state = T.FloatTensor([board_to_int(observation)]).to(self.eval_network.device)
            with T.no_grad():  # action selection needs no gradient bookkeeping
                _, advantage = self.eval_network.forward(state)
            action = T.argmax(advantage).item()
            decode = int_to_move_white if observation.turn else int_to_move_black
            try:
                return decode(action)
            except (ValueError, IndexError):
                # Too few digits, or a string python-chess rejects.
                return action

        # Prefer the observed board's own legal moves; fall back to the
        # notebook-level `env` for observations that do not expose them.
        legal_source = observation if hasattr(observation, 'legal_moves') else env
        legal_moves = list(legal_source.legal_moves)
        return legal_moves[np.random.randint(len(legal_moves))]

    def decrement_epsilon(self):
        """Lower epsilon by ``epsilon_dec`` without going below ``epsilon_min``."""
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon - self.epsilon_dec, self.epsilon_min)

    def replace_target_network(self):
        """Copy the online weights into the target network every ``replace`` learning steps."""
        if self.replace and self.learn_step_counter % self.replace == 0:
            self.next_network.load_state_dict(self.eval_network.state_dict())

    def save_models(self):
        """Checkpoint both networks."""
        self.eval_network.save_checkpoint()
        self.next_network.save_checkpoint()

    def load_models(self):
        """Restore both networks from their checkpoints."""
        self.eval_network.load_checkpoint()
        self.next_network.load_checkpoint()

    @staticmethod
    def _q_values(network, states):
        """Combine a network's two heads into Q-values: V + (A - mean of A over actions)."""
        value, advantage = network.forward(states)
        return value + (advantage - advantage.mean(dim=1, keepdim=True))

    def learn(self):
        """Run one gradient step on a sampled batch.

        Does nothing until the buffer has seen ``batch_size`` transitions, or
        when no sampled action code maps onto a network output. The target is
        ``reward + discount_rate * Q_target(s', argmax_a Q_online(s', a))`` with
        the bootstrap term dropped for terminal transitions.
        """
        if self.memory.memory_counter < self.batch_size:
            return

        state, action, reward, new_state, done = self.memory.sample_buffer(self.batch_size)
        device = self.eval_network.device

        states = T.tensor([board_to_int(item) for item in state],
                          dtype=T.float32, device=device)
        new_states = T.tensor([board_to_int(item) for item in new_state],
                              dtype=T.float32, device=device)
        actions = T.tensor([move_to_int(item) for item in action],
                           dtype=T.long, device=device)
        # Rewards may arrive wrapped in a one-element list.
        rewards = T.tensor([item[0] if isinstance(item, list) else item for item in reward],
                           dtype=T.float32, device=device)
        dones = T.tensor([bool(item) for item in done], dtype=T.bool, device=device)

        self.eval_network.optimizer.zero_grad()
        self.replace_target_network()

        # Outputs are used directly (never re-wrapped in a new tensor) so the
        # loss stays connected to the network parameters.
        q_all = self._q_values(self.eval_network, states)
        n_outputs = q_all.shape[1]

        # Action codes without a matching network output cannot be trained on.
        valid = (actions >= 0) & (actions < n_outputs)
        if not bool(valid.any()):
            return
        safe_actions = actions.clamp(min=0, max=n_outputs - 1)
        q_pred = q_all.gather(1, safe_actions.unsqueeze(1)).squeeze(1)

        with T.no_grad():
            q_next = self._q_values(self.next_network, new_states)
            q_eval = self._q_values(self.eval_network, new_states)
            # The online network picks the action, the target network scores it.
            max_actions = T.argmax(q_eval, dim=1)
            bootstrap = q_next.gather(1, max_actions.unsqueeze(1)).squeeze(1)
            # Terminal transitions keep their reward but get no bootstrap term.
            bootstrap = bootstrap.masked_fill(dones, 0.0)
            q_target = rewards + self.discount_rate * bootstrap

        loss = self.eval_network.loss(q_pred[valid], q_target[valid])
        loss.backward()
        self.eval_network.optimizer.step()
        self.learn_step_counter += 1

        self.decrement_epsilon()

In [11]:
# Training configuration: the chess environment shared by the cells below, the number of
# training games to play, and whether to restore the networks from their checkpoints first.
env = gym.make('Chess-v0')
num_games = 20
load_checkpoint = True

In [12]:
# Checkpoint location: can be overridden with the CHESS_CHECKPOINT_DIR environment variable so the
# notebook is not tied to one machine; the default is the path originally hard-coded here.
checkpoint_directory = os.environ.get(
    'CHESS_CHECKPOINT_DIR',
    r'C:\Users\cghat\Documents\Reinforcement Learning Project\ChessFolder')

# NOTE(review): move_to_int encodes ordinary moves as four-digit codes up to 8888, so the
# 64**2 = 4096 network outputs cannot represent moves that start on files e-h — confirm the
# intended action encoding before changing n_actions (existing checkpoints depend on it).
agent = Agent(discount_rate=0.95, epsilon=1.0, lr=5e-4, n_actions=64**2, input_dims=8, memory_size=1000000,
              batch_size=64, checkpoint_directory=checkpoint_directory)

In [13]:
# Restore previously saved weights when requested. On a first run there is nothing to restore,
# so a missing checkpoint file is reported and training starts from the fresh networks.
if load_checkpoint:
    try:
        agent.load_models()
    except FileNotFoundError:
        print('no checkpoint found, starting from freshly initialised networks')

loading checkpoint
loading checkpoint


In [14]:
# Training loop: the agent plays both sides of every game. A move that cannot be decoded
# (choose_action returns an int) or that env.step rejects with ValueError ends the game with a
# large penalty; every transition is stored and followed by one learning step.
illegal_move_reward = -1000000

for i in range(num_games):
    print(i)
    done = 0
    observation = board_to_int(env.reset())

    while done != 1:
        # The encoded start position is swapped for a fresh chess.Board so that the rest of
        # the loop always works with a board object.
        if observation == [12345321, 66666666, 0, 0, 0, 0, 1616161616161616, 1112131415131211]:
            observation = chess.Board()
        action = agent.choose_action(observation)

        illegal = isinstance(action, int)
        if not illegal:
            # Only env.step is guarded: a ValueError raised while storing or learning must
            # not be mistaken for an illegal move.
            try:
                new_observation, reward, done, info = env.step(action)
            except ValueError:
                illegal = True
        if illegal:
            new_observation, reward, done = observation, illegal_move_reward, 1

        agent.store_transition(observation, action, reward, new_observation, int(done))
        agent.learn()
        observation = new_observation

    # NOTE(review): with num_games = 20 this saves only after game index 10, so the last
    # nine games are never checkpointed — confirm that is intended.
    if i > 0 and i % 10 == 0:
        agent.save_models()

0
1
2
3
4
5
6
7
8
9
10
saving checkpoint
saving checkpoint
11
12
13
14
15
16
17
18
19


In [15]:
#AI is white
def play_AI_white():
    """Play one interactive game with the agent as white and a human as black.

    Loads the agent's checkpoints, resets the environment and alternates
    between the agent's move (printed) and a UCI move typed by the user until
    the environment reports the game as done. Every transition is stored in
    the agent's replay buffer.

    Raises:
        ValueError: 'Agent made an invalid move' when the agent returns an
            undecodable (int) action or the environment rejects its move;
            'User made an invalid move' when the typed text is not valid UCI
            or the environment rejects the move.
    """
    agent.load_models()
    observation = board_to_int(env.reset())
    done = 0

    while done != 1:
        if observation == [12345321, 66666666, 0, 0, 0, 0, 1616161616161616, 1112131415131211]:
            observation = chess.Board()

        if observation.turn:
            mover = 'Agent'
            action = agent.choose_action(observation)
            print(action)
            # choose_action hands back the raw int when it cannot build a move.
            if isinstance(action, int):
                raise ValueError('Agent made an invalid move')
        else:
            mover = 'User'
            try:
                action = chess.Move.from_uci(input('Black\'s move: '))
            except ValueError as err:
                raise ValueError('User made an invalid move') from err

        # Only the environment step is guarded, so errors raised while storing
        # the transition are not misreported as invalid moves.
        try:
            new_observation, reward, done, info = env.step(action)
        except ValueError as err:
            raise ValueError(mover + ' made an invalid move') from err
        agent.store_transition(observation, action, reward, new_observation, int(done))
        observation = new_observation

In [16]:
#AI is black
def play_AI_black():
    """Play one interactive game with a human as white and the agent as black.

    Loads the agent's checkpoints, resets the environment and alternates
    between a UCI move typed by the user and the agent's move (printed) until
    the environment reports the game as done. Every transition is stored in
    the agent's replay buffer.

    Raises:
        ValueError: 'User made an invalid move' when the typed text is not
            valid UCI or the environment rejects the move; 'Agent made an
            invalid move' when the agent returns an undecodable (int) action
            or the environment rejects its move.
    """
    agent.load_models()
    observation = board_to_int(env.reset())
    done = 0

    while done != 1:
        if observation == [12345321, 66666666, 0, 0, 0, 0, 1616161616161616, 1112131415131211]:
            observation = chess.Board()

        if observation.turn:
            mover = 'User'
            try:
                action = chess.Move.from_uci(input('White\'s move: '))
            except ValueError as err:
                raise ValueError('User made an invalid move') from err
        else:
            mover = 'Agent'
            action = agent.choose_action(observation)
            print(action)
            # choose_action hands back the raw int when it cannot build a move.
            if isinstance(action, int):
                raise ValueError('Agent made an invalid move')

        # Only the environment step is guarded, so errors raised while storing
        # the transition are not misreported as invalid moves.
        try:
            new_observation, reward, done, info = env.step(action)
        except ValueError as err:
            raise ValueError(mover + ' made an invalid move') from err
        agent.store_transition(observation, action, reward, new_observation, int(done))
        observation = new_observation

In [17]:
# Play one interactive game against the agent, typing moves in UCI notation (e.g. c2c4).
# Call play_AI_white() instead to let the agent take the white pieces.
play_AI_black()

loading checkpoint
loading checkpoint
White's move: c2c4
e7e6
White's move: g2g3
h7h6
White's move: f1g2
g8f6
White's move: d2d3
f6d5
White's move: c4d5
b8a6
White's move: d5e6
d7d6
White's move: e6f7
e8d7
White's move: g1f3
g7g6
White's move: g2h3
d7c6
White's move: d1c2
a6c5
White's move: f3d4
c6d5
White's move: c2c4
d5e5
White's move: c1f4
e5f6
White's move: h3c8
b7b5
White's move: c4b5
c5d3
White's move: e2d3
g6g5
White's move: b5f5
f6g7
White's move: d4e6
