# Leitura e filtro das bases

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Treinando o transformer do HuggingFace

[Tutorial](https://huggingface.co/blog/how-to-train)

## Treinando um novo tokenizer

In [2]:
linhas = ['1', '2', '3', '4', '5', '6', '7', '8']
colunas = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']

pecas = ['N', 'B', 'R', 'Q', 'K']
variacoes = ['+', '#']

casas = []

for coluna in colunas:
    for linha in linhas:
        casas.append(coluna + linha)

# Movimentos comuns

movimentos_comuns = []
movimentos_comuns_com_indicador = []

movimentos_comuns_captura = []
movimentos_comuns_captura_com_indicador = []

for peca in pecas:
    for casa in casas:
        movimentos_comuns.append(peca + casa) # movimentos comuns

        movimentos_comuns_captura.append(peca + 'x' + casa) # movimentos com captura

        if peca in ('N', 'R'):
            for indicador in linhas + colunas:
                movimentos_comuns_com_indicador.append(peca + indicador + casa)
                movimentos_comuns_captura_com_indicador.append(peca + indicador + 'x' + casa)

# Movimentos peoes

movimentos_peoes = casas.copy()
movimentos_peoes_com_captura = []
movimentos_peoes_promocao = []

for coluna in colunas:
    for casa in casas:
        movimentos_peoes_com_captura.append(coluna + 'x' + casa)

for movimento_peao in movimentos_peoes + movimentos_peoes_com_captura:
    if movimento_peao[-1] in ('1', '8'):
        for promocao in pecas:
            if promocao != 'K':
                movimentos_peoes_promocao.append(movimento_peao + '=' + promocao)

print(f'{len(movimentos_comuns)} movimentos comuns')
print(f'{len(movimentos_comuns_com_indicador)} movimentos comuns com indicador')
print(f'{len(movimentos_comuns_captura)} movimentos comuns com captura')
print(f'{len(movimentos_comuns_captura_com_indicador)} movimentos comuns com captura com indicador')
print(f'{len(movimentos_peoes)} movimentos de peões')
print(f'{len(movimentos_peoes_com_captura)} movimentos de peões com captura')
print(f'{len(movimentos_peoes_promocao)} movimentos de peões com promoção')

roques = ['O-O-O', 'O-O']

todos_movimentos = movimentos_comuns + movimentos_comuns_captura + movimentos_comuns_captura_com_indicador +\
 movimentos_comuns_com_indicador + movimentos_peoes + movimentos_peoes_com_captura + movimentos_peoes_promocao + roques

cheques = [movimento + '+' for movimento in todos_movimentos]
mates = [movimento + '#' for movimento in todos_movimentos]

especiais = ['[UNK]', '[CLS]', '[SEP]', '[PAD]', '[MASK]']

todos_movimentos = especiais + todos_movimentos + cheques + mates

print(f'{len(todos_movimentos)} movimentos mapeados')

chess_voc = dict(zip(todos_movimentos, range(len(todos_movimentos))))

320 movimentos comuns
2048 movimentos comuns com indicador
320 movimentos comuns com captura
2048 movimentos comuns com captura com indicador
64 movimentos de peões
512 movimentos de peões com captura
576 movimentos de peões com promoção
17675 movimentos mapeados


In [3]:
# We won't need TensorFlow here
!pip uninstall -y tensorflow
# Install `transformers` from master
!pip install git+https://github.com/huggingface/transformers
!pip list | grep -E 'transformers|tokenizers'

Found existing installation: tensorflow 2.7.0
Uninstalling tensorflow-2.7.0:
  Successfully uninstalled tensorflow-2.7.0
Collecting git+https://github.com/huggingface/transformers
  Cloning https://github.com/huggingface/transformers to /tmp/pip-req-build-wxgfw4dt
  Running command git clone -q https://github.com/huggingface/transformers /tmp/pip-req-build-wxgfw4dt
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
Collecting tokenizers!=0.11.3,>=0.10.1
  Downloading tokenizers-0.11.4-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (6.8 MB)
[K     |████████████████████████████████| 6.8 MB 9.6 MB/s 
[?25hCollecting sacremoses
  Downloading sacremoses-0.0.47-py2.py3-none-any.whl (895 kB)
[K     |████████████████████████████████| 895 kB 58.8 MB/s 
[?25hCollecting huggingface-hub<1.0,>=0.1.0
  Downloading huggingface_hub-0.4.0-py3-none-any.whl (67 kB)
[K     |██████

In [4]:
from tokenizers import Tokenizer, normalizers, pre_tokenizers
from tokenizers.models import WordLevel
from tokenizers.pre_tokenizers import Digits, WhitespaceSplit
from tokenizers.processors import TemplateProcessing
from tokenizers.trainers import WordLevelTrainer

tokenizer = Tokenizer(WordLevel(vocab=chess_voc, unk_token="[UNK]"))

tokenizer.pre_tokenizer = WhitespaceSplit()

In [5]:
tokenizer.encode('e4 e5')

Encoding(num_tokens=2, attributes=[ids, type_ids, tokens, offsets, attention_mask, special_tokens_mask, overflowing])

In [6]:
tokenizer.encode('e4 e5 Nf3 f6 Nxe5 fxe5 Qh5+ g6 Qxe5+ Qe7').tokens

['e4', 'e5', 'Nf3', 'f6', 'Nxe5', 'fxe5', 'Qh5+', 'g6', 'Qxe5+', 'Qe7']

In [7]:
from transformers import GPT2Config, TFGPT2LMHeadModel, GPT2Tokenizer, GPT2LMHeadModel, PreTrainedTokenizerFast
# loading tokenizer from the saved model path
fast_tokenizer = PreTrainedTokenizerFast(tokenizer_object=tokenizer)

fast_tokenizer.add_special_tokens({'pad_token': '[PAD]'})

0

In [8]:
config = GPT2Config(
  vocab_size=fast_tokenizer.vocab_size,
  bos_token_id=fast_tokenizer.bos_token_id,
  eos_token_id=fast_tokenizer.eos_token_id
)
# creating the model
model = GPT2LMHeadModel(config)

model.num_parameters()
# => 57 million parameters

99416832

In [9]:
# Check that we have a GPU
!nvidia-smi

Mon Jan 31 22:11:49 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.46       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   37C    P0    26W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [10]:
# Check that PyTorch sees it
import torch
torch.cuda.is_available()

True

In [11]:
from torch.utils.data import Dataset
import os

class LineByLineTextDataset(Dataset):
    """
    This will be superseded by a framework-agnostic approach soon.
    """

    def __init__(self, tokenizer, file_path: str, block_size: int, initial_position: bool=True):
        if os.path.isfile(file_path) is False:
            raise ValueError(f"Input file path {file_path} not found")
        # Here, we do not cache the features, operating under the assumption
        # that we will soon use fast multithreaded tokenizers from the
        # `tokenizers` repo everywhere =)

        self.tokenizer = tokenizer
        self.block_size = block_size
        self.initial_position = initial_position

        with open(file_path, encoding="utf-8") as f:
            self.lines = [line for line in f.read().splitlines() if (len(line) > 0 and not line.isspace())]

    def __len__(self):
        return len(self.lines)

    def __getitem__(self, i):

        if self.initial_position:
            start = 'a2 a7 b2 b7 c2 c7 d2 d7 e2 e7 f2 f7 g2 g7 h2 h7 Ra1 Ra8 Nb1 Nb8 Bc1 Bc8 Qd1 Qd8 Ke1 Ke8 Bf1 Bf8 Ng1 Ng8 Rh1 Rh8 '
        else:
            start = ''

        encoding = self.tokenizer.encode(start + self.lines[i], add_special_tokens=True, truncation=True, max_length=self.block_size)

        encoding = {"input_ids": torch.tensor(encoding, dtype=torch.long)}

        return encoding

In [12]:
%%time

dataset = LineByLineTextDataset(
    tokenizer=fast_tokenizer,
    file_path="drive/MyDrive/chess_dataset/chess_files/games_train.txt",
    block_size=256,
)

# Without initial position

# dataset = LineByLineTextDataset(
#     tokenizer=fast_tokenizer,
#     file_path="drive/MyDrive/chess_dataset/chess_files/games_train.txt",
#     block_size=256,
#     initial_position=False,
# )

CPU times: user 1.84 s, sys: 1.35 s, total: 3.19 s
Wall time: 8.16 s


In [13]:
%%time

dataset_valid = LineByLineTextDataset(
    tokenizer=fast_tokenizer,
    file_path="drive/MyDrive/chess_dataset/chess_files/games_valid.txt",
    block_size=256,
)


# Without initial position

# dataset_valid = LineByLineTextDataset(
#     tokenizer=fast_tokenizer,
#     file_path="drive/MyDrive/chess_dataset/chess_files/games_valid.txt",
#     block_size=256,
#     initial_position=False,
# )

CPU times: user 218 ms, sys: 95.2 ms, total: 314 ms
Wall time: 1.16 s


In [None]:
from transformers import DataCollatorForLanguageModeling

data_collator = DataCollatorForLanguageModeling(
    tokenizer=fast_tokenizer, mlm=False
)

In [None]:
from transformers import Trainer, TrainingArguments

training_args = TrainingArguments(
    output_dir="./chess_model_gpt2_v0",
    overwrite_output_dir=True,
    num_train_epochs=1,
    per_gpu_train_batch_size=16,
    per_device_eval_batch_size=16,
    save_steps=10_000,
    save_total_limit=2,
    prediction_loss_only=True,
    evaluation_strategy='steps',
    eval_steps=40000,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
    eval_dataset=dataset_valid,
    data_collator=data_collator,
)

In [None]:
%%time
trainer.train()

Using deprecated `--per_gpu_train_batch_size` argument which will be removed in a future version. Using `--per_device_train_batch_size` is preferred.
Using deprecated `--per_gpu_train_batch_size` argument which will be removed in a future version. Using `--per_device_train_batch_size` is preferred.
***** Running training *****
  Num examples = 2246434
  Num Epochs = 1
  Instantaneous batch size per device = 8
  Total train batch size (w. parallel, distributed & accumulation) = 16
  Gradient Accumulation steps = 1
  Total optimization steps = 140403
Using deprecated `--per_gpu_train_batch_size` argument which will be removed in a future version. Using `--per_device_train_batch_size` is preferred.


Step,Training Loss,Validation Loss
40000,1.7374,1.656804
80000,1.5659,1.491542
120000,1.4863,1.413581


Saving model checkpoint to ./chess_model_gpt2_v0/checkpoint-10000
Configuration saved in ./chess_model_gpt2_v0/checkpoint-10000/config.json
Model weights saved in ./chess_model_gpt2_v0/checkpoint-10000/pytorch_model.bin
Saving model checkpoint to ./chess_model_gpt2_v0/checkpoint-20000
Configuration saved in ./chess_model_gpt2_v0/checkpoint-20000/config.json
Model weights saved in ./chess_model_gpt2_v0/checkpoint-20000/pytorch_model.bin
Saving model checkpoint to ./chess_model_gpt2_v0/checkpoint-30000
Configuration saved in ./chess_model_gpt2_v0/checkpoint-30000/config.json
Model weights saved in ./chess_model_gpt2_v0/checkpoint-30000/pytorch_model.bin
Deleting older checkpoint [chess_model_gpt2_v0/checkpoint-10000] due to args.save_total_limit
***** Running Evaluation *****
  Num examples = 249621
  Batch size = 16
Saving model checkpoint to ./chess_model_gpt2_v0/checkpoint-40000
Configuration saved in ./chess_model_gpt2_v0/checkpoint-40000/config.json
Model weights saved in ./chess_mo

CPU times: user 15h 47min 48s, sys: 33min 18s, total: 16h 21min 6s
Wall time: 16h 16min 13s


TrainOutput(global_step=140403, training_loss=1.7069447675529046, metrics={'train_runtime': 58573.464, 'train_samples_per_second': 38.352, 'train_steps_per_second': 2.397, 'total_flos': 2.03888558900736e+17, 'train_loss': 1.7069447675529046, 'epoch': 1.0})

In [None]:
# Save the model without the starts

# trainer.save_model('drive/MyDrive/gpt2_chess')

In [None]:
trainer.save_model("drive/MyDrive/gpt2_chess_with_starts_")

Saving model checkpoint to drive/MyDrive/gpt2_chess_with_starts
Configuration saved in drive/MyDrive/gpt2_chess_with_starts/config.json
Model weights saved in drive/MyDrive/gpt2_chess_with_starts/pytorch_model.bin


In [None]:
# Load model withou the starts

# model = GPT2LMHeadModel.from_pretrained('drive/MyDrive/gpt2_chess').to('cuda:0')

In [14]:
model = GPT2LMHeadModel.from_pretrained('drive/MyDrive/gpt2_chess_with_starts').to('cuda:0')

In [15]:
from transformers import top_k_top_p_filtering
import torch
from torch.nn import functional as F

class ChessGPT2Player:
    def __init__(self, model, tokenizer, device='cuda:0'):
        self.model = model
        self.tokenizer = tokenizer
        self._device = device
        self.ilegal_moves = []

    def next_move(self, preview_moves, legal_moves=[]):
        inputs = self.tokenizer.encode(preview_moves, return_tensors='pt').to(self._device)

        legal_moves_ids = [fast_tokenizer.encode(move)[0] for move in legal_moves]

        # Get logits from last layer
        last_layer_logits = self.model(inputs).logits[:, -1, :]

        legal_move = legal_moves_ids[0]
        legal_move_prob = -torch.inf

        for move in legal_moves_ids:
            if last_layer_logits[0][move].item() >= legal_move_prob:
                legal_move = move
                legal_move_prob = last_layer_logits[0][move].item()

        top_logits = top_k_top_p_filtering(last_layer_logits, top_k=last_layer_logits.shape[1], top_p=1.0)

        # Softmax the logits into probabilities
        probabilities = F.softmax(top_logits, dim=-1)

        # Generate next token
        generated_next_token = torch.multinomial(probabilities, num_samples=1)
        generated = torch.cat([inputs, generated_next_token], dim=-1)

        generated_legal = torch.cat([inputs, torch.Tensor([legal_move]).unsqueeze(-1).to(self._device)], dim=-1)

        # Get result
        result_string = tokenizer.decode(generated.tolist()[0]).split(' ')[-1]

        # Get result
        result_string_legal = tokenizer.decode(generated_legal.long().tolist()[0]).split(' ')[-1]

        if result_string_legal != result_string:
            print(f'Ilegal move registered: {result_string}')
            self.ilegal_moves.append((preview_moves, result_string))

        return result_string_legal

In [16]:
chess_player = ChessGPT2Player(model, fast_tokenizer)

# Validando através do jogo

In [17]:
!pip install chess

Collecting chess
  Downloading chess-1.8.0-py3-none-any.whl (147 kB)
[?25l[K     |██▎                             | 10 kB 22.6 MB/s eta 0:00:01[K     |████▌                           | 20 kB 18.9 MB/s eta 0:00:01[K     |██████▊                         | 30 kB 14.9 MB/s eta 0:00:01[K     |█████████                       | 40 kB 13.0 MB/s eta 0:00:01[K     |███████████▏                    | 51 kB 7.7 MB/s eta 0:00:01[K     |█████████████▍                  | 61 kB 7.7 MB/s eta 0:00:01[K     |███████████████▋                | 71 kB 8.5 MB/s eta 0:00:01[K     |█████████████████▉              | 81 kB 9.4 MB/s eta 0:00:01[K     |████████████████████            | 92 kB 9.2 MB/s eta 0:00:01[K     |██████████████████████▎         | 102 kB 7.5 MB/s eta 0:00:01[K     |████████████████████████▌       | 112 kB 7.5 MB/s eta 0:00:01[K     |██████████████████████████▊     | 122 kB 7.5 MB/s eta 0:00:01[K     |█████████████████████████████   | 133 kB 7.5 MB/s eta 0:00:01[K   

In [22]:
class HumanPlayer:
    def __init__(self):
        pass

    def next_move(self, preview_moves='', legal_moves=[]):

        while True:
            move = input('Digite o seu movimento: \n')

            if move not in legal_moves:
                print('Movimento inválido')
            else:
                break
                
        return move

In [19]:
import random

class RandomPlayer:
    def __init__(self):
        pass

    def next_move(self, preview_moves='', legal_moves=[]):
        return random.choice(legal_moves)

In [20]:
import chess
from IPython.display import display, HTML, clear_output
import time
import random

class ChessGame:
    def __init__(self, player_white, player_black, do_first_move=False, first_moves=['e4'], trained_with_initial_positions=True):
        if trained_with_initial_positions:
            self.game_repr = 'a2 a7 b2 b7 c2 c7 d2 d7 e2 e7 f2 f7 g2 g7 h2 h7 Ra1 Ra8 Nb1 Nb8 Bc1 Bc8 Qd1 Qd8 Ke1 Ke8 Bf1 Bf8 Ng1 Ng8 Rh1 Rh8'
        else:
            self.game_repr = ''
        self.player_white = player_white
        self.player_black = player_black
        self.board = chess.Board()

        if do_first_move:
            for move in first_moves:
                self.board.push_san(move)
            
            self.game_repr += ' ' + ' '.join(first_moves)


    def start(self, visual='svg', pause=0.1):

        try:
            while not self.board.is_game_over(claim_draw=True):
                if self.board.turn == chess.WHITE:
                    san = self.player_white.next_move(self.game_repr, self.get_legal_moves())
                else:
                    san = self.player_black.next_move(self.game_repr, self.get_legal_moves())

                if self.game_repr == '':
                    self.game_repr += san
                else:
                    self.game_repr += ' ' + san

                self.board.push_san(san)
                board_stop = display_board(self.board)
                html = "<b>Move %s %s, Play '%s':</b><br/>%s" % (
                        len(self.board.move_stack), cor(self.board.turn), san, board_stop)
                if visual is not None:
                    if visual == "svg":
                        clear_output(wait=True)
                    display(HTML(html))
                    if visual == "svg":
                        time.sleep(pause)
        except KeyboardInterrupt:
            msg = "Game interrupted!"
            return (None, msg, self.board)

        result = None
        if self.board.is_checkmate():
            msg = "checkmate: " + cor(not self.board.turn) + " wins!"
            result = not self.board.turn
        elif self.board.is_stalemate():
            msg = "draw: stalemate"
        elif self.board.is_fivefold_repetition():
            msg = "draw: 5-fold repetition"
        elif self.board.is_insufficient_material():
            msg = "draw: insufficient material"
        elif self.board.can_claim_draw():
            msg = "draw: claim"
        if visual is not None:
            print(msg)
        return (result, msg, self.board)

    def get_legal_moves(self):
        legal_moves = self.board.legal_moves.__repr__()

        legal_moves = legal_moves[legal_moves.find('(')+1:legal_moves.find(')')]

        legal_moves = legal_moves.replace(' ', '').split(',')

        return legal_moves
    

def display_board(board, use_svg=True):
    if use_svg:
        return board._repr_svg_()
    else:
        return "<pre>" + str(board) + "</pre>"

def cor(jogador):
    return 'White' if jogador == chess.WHITE else 'Black'

In [23]:
# Play against computer

human_player = HumanPlayer()

chess_game = ChessGame(human_player, chess_player, trained_with_initial_positions=True)

chess_game.start()

checkmate: White wins!


(True,
 'checkmate: White wins!',
 Board('3R2k1/5pp1/p6p/1p2p3/4B3/8/Pr3PPP/6K1 b - - 0 27'))

In [25]:
chess_game.game_repr

'a2 a7 b2 b7 c2 c7 d2 d7 e2 e7 f2 f7 g2 g7 h2 h7 Ra1 Ra8 Nb1 Nb8 Bc1 Bc8 Qd1 Qd8 Ke1 Ke8 Bf1 Bf8 Ng1 Ng8 Rh1 Rh8 e4 c5 Nf3 d6 d4 cxd4 Nxd4 Nf6 Nc3 a6 Be2 e5 Nb3 Be7 O-O O-O Be3 Be6 Qd2 Nbd7 Rfd1 Qc7 Nd5 Bxd5 exd5 b5 Na5 Nb6 Nc6 Nbxd5 Nxe7+ Qxe7 Bg5 Rfd8 Qxd5 h6 Bxf6 Qxf6 Rd2 Rac8 Rad1 Rc5 Qf3 Qxf3 Bxf3 Rdc8 Rxd6 Rxc2 Be4 Rxb2 Rd8+ Rxd8 Rxd8#'

In [26]:
# Computer agains random

random_player = RandomPlayer()

chess_game = ChessGame(chess_player, random_player, do_first_move=True)

chess_game.start()

checkmate: White wins!


(True,
 'checkmate: White wins!',
 Board('r1bqk1B1/3pPQ2/np5B/3Np3/6pp/8/PPP1NPPP/2KR3R b q - 2 16'))

In [27]:
chess_game.game_repr

'a2 a7 b2 b7 c2 c7 d2 d7 e2 e7 f2 f7 g2 g7 h2 h7 Ra1 Ra8 Nb1 Nb8 Bc1 Bc8 Qd1 Qd8 Ke1 Ke8 Bf1 Bf8 Ng1 Ng8 Rh1 Rh8 e4 h5 d4 f6 Bc4 g6 Bxg8 Rh6 Bxh6 b6 Qf3 c5 dxc5 e6 cxb6 axb6 Nc3 Be7 Nge2 f5 exf5 g5 O-O-O e5 Nd5 h4 f6 g4 fxe7 Na6 Qf7#'