<a href="https://colab.research.google.com/github/anarlavrenov/n1/blob/main/n1_tln_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; force_remount=True is passed so the
# call also works when the drive is already mounted in this session.
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [None]:
import torch

# Run on the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"

In [None]:
import sys
def init_packages() -> None:
  """Make the project's helper modules importable.

  Appends the project directory to ``sys.path``. The call is idempotent:
  re-running the cell does not add the same entry a second time.
  """
  functions_path = "/PATH_TO_YOUR_PROJECT"
  if functions_path not in sys.path:
    sys.path.append(functions_path)

init_packages()

In [None]:
# import locale
# locale.getpreferredencoding = lambda: "UTF-8"

!pip install datasets --quiet

from datasets import load_dataset
import pandas as pd
from typing import Tuple, List

def create_dataset(n_train_samples: int, n_valid_samples: int) -> Tuple[pd.DataFrame, pd.DataFrame]:
  """Load the OPUS-100 en-uk pairs and split them into train/validation frames.

  Args:
    n_train_samples: number of leading rows used for training.
    n_valid_samples: number of rows taken right after the training rows.

  Returns:
    ``(train_df, valid_df)`` - two DataFrames built from the dataset's
    "translation" records. The validation frame keeps its original row
    labels (starting at ``n_train_samples``).
  """
  dataset = load_dataset("Helsinki-NLP/opus-100", "en-uk", split="train[:30%]")

  # Materialise the translation column once and slice it twice,
  # instead of building the same DataFrame for each split.
  pairs_df = pd.DataFrame(dataset["translation"])
  valid_end = n_train_samples + n_valid_samples

  train_df = pairs_df.iloc[:n_train_samples]
  valid_df = pairs_df.iloc[n_train_samples:valid_end]

  return train_df, valid_df

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/510.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━[0m [32m337.9/510.5 kB[0m [31m9.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m510.5/510.5 kB[0m [31m11.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m18.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m194.1/194.1 kB[0m [31m25.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m19.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import re

def preprocess_text(row: str) -> str:
  """Clean one sentence before tokenization.

  Steps, in order:
    1. drop URLs (``http(s)://...`` and ``www.`` hosts);
    2. drop the characters ``/``, ``(`` and ``)``;
    3. collapse every whitespace run to a single space and trim the ends.

  Whitespace is normalised last so that removing a bracket or slash that
  stood between two spaces cannot leave a double space behind.

  Args:
    row: raw sentence.

  Returns:
    The cleaned sentence.
  """
  row = re.sub(r'https?:\/\/\S+|www\.[a-zA-Z0-9\-\.]+\.[a-zA-Z]+', '', row)

  for unwanted in ("/", ")", "("):
    row = row.replace(unwanted, "")

  return re.sub(r'\s+', ' ', row).strip()

In [None]:
from utils import PositionalEncoding
import math

class Encoder(torch.nn.Module):
  """Source-side stack: embedding -> positional encoding -> GLU gate -> TransformerEncoder.

  Args:
    num_layers: number of stacked ``TransformerEncoderLayer`` blocks.
    d_model: embedding / model width.
    nhead: number of attention heads per layer.
    dff: width of each layer's feed-forward sublayer.
    ntokens_src: number of rows in the source embedding table.
    dropout: dropout rate passed to the positional encoding and the layers.
  """

  def __init__(self, num_layers: int, d_model: int, nhead: int,
               dff: int, ntokens_src: int, dropout: float = 0.5):
    super().__init__()

    # Row 0 of the table is declared as the padding row.
    self.embedding = torch.nn.Embedding(num_embeddings=ntokens_src,
                                        embedding_dim=d_model,
                                        padding_idx=0)

    # PositionalEncoding is a project-local helper (imported from utils).
    self.pos_encoding = PositionalEncoding(d_model=d_model,
                                           dropout=dropout)

    encoder_layer = torch.nn.TransformerEncoderLayer(d_model=d_model,
                                                     nhead=nhead,
                                                     dim_feedforward=dff,
                                                     dropout=dropout,
                                                     norm_first=True)

    self.encoder = torch.nn.TransformerEncoder(encoder_layer=encoder_layer,
                                               num_layers=num_layers)

    self.d_model = d_model

    # Projects to 2 * d_model so that GLU can halve it back to d_model.
    self.linear_glu = torch.nn.Linear(in_features=d_model,
                                      out_features=d_model * 2)

  def forward(self, src: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
    """Encode a batch of source token ids.

    Args:
      src: token ids laid out as (src_seq_len, batch_size).
      mask: attention mask forwarded to the TransformerEncoder. When omitted,
        a square subsequent mask of size ``len(src)`` is generated.

    Returns:
      Tensor of shape (src_seq_len, batch_size, d_model).
    """
    # src -> src_seq_len, batch_size
    src = self.embedding(src) * math.sqrt(self.d_model)
    src = self.pos_encoding(src)

    src = torch.nn.functional.glu(self.linear_glu(src), dim=-1)  # apply GLU

    if mask is None:
      # NOTE(review): this default is a causal mask, so source positions cannot
      # attend to later source positions - confirm that is intended.
      # The mask follows the input's device rather than a notebook-level global,
      # so the module keeps working wherever it has been moved.
      mask = torch.nn.Transformer.generate_square_subsequent_mask(sz=len(src)).to(src.device)

    encoder_output = self.encoder(src, mask)

    return encoder_output  # -> Tensor shape: src_seq_len, batch_size, d_model


class Decoder(torch.nn.Module):
  """Target-side stack: embedding -> positional encoding -> GLU gate ->
  TransformerDecoder -> linear projection onto the target vocabulary.

  Args:
    num_layers: number of stacked ``TransformerDecoderLayer`` blocks.
    d_model: embedding / model width.
    nhead: number of attention heads per layer.
    dff: width of each layer's feed-forward sublayer.
    ntokens_tgt: number of rows in the target embedding table and the size
      of the output projection.
    dropout: dropout rate passed to the positional encoding and the layers.
  """

  def __init__(self, num_layers: int, d_model: int, nhead: int,
               dff: int, ntokens_tgt: int, dropout: float = 0.5):
    super().__init__()

    # Row 0 of the table is declared as the padding row.
    self.embedding = torch.nn.Embedding(num_embeddings=ntokens_tgt,
                                        embedding_dim=d_model,
                                        padding_idx=0)

    # PositionalEncoding is a project-local helper (imported from utils).
    self.pos_encoding = PositionalEncoding(d_model=d_model,
                                           dropout=dropout)

    decoder_layer = torch.nn.TransformerDecoderLayer(d_model=d_model,
                                                     nhead=nhead,
                                                     dim_feedforward=dff,
                                                     dropout=dropout,
                                                     norm_first=True)

    self.decoder = torch.nn.TransformerDecoder(decoder_layer=decoder_layer,
                                               num_layers=num_layers)

    self.fc = torch.nn.Linear(in_features=d_model,
                              out_features=ntokens_tgt)

    self.d_model = d_model

    # Projects to 2 * d_model so that GLU can halve it back to d_model.
    self.linear_glu = torch.nn.Linear(in_features=d_model,
                                      out_features=d_model * 2)

  def forward(self, tgt: torch.Tensor, memory: torch.Tensor,
              tgt_mask: torch.Tensor = None, memory_mask: torch.Tensor = None):
    """Decode target token ids against the encoder memory.

    Args:
      tgt: target token ids laid out as (tgt_seq_len, batch_size).
      memory: encoder output, (src_seq_len, batch_size, d_model).
      tgt_mask: self-attention mask; defaults to a square subsequent
        (causal) mask of size ``len(tgt)``.
      memory_mask: despite its name, this is forwarded as
        ``memory_key_padding_mask``; it defaults to an all-zero
        (batch_size, src_seq_len) tensor.

    Returns:
      Logits of shape (tgt_seq_len, batch_size, ntokens_tgt).
    """
    tgt = self.embedding(tgt) * math.sqrt(self.d_model)
    tgt = self.pos_encoding(tgt)

    tgt = torch.nn.functional.glu(self.linear_glu(tgt), dim=-1)  # apply GLU

    # Default masks are created on the devices of the tensors they accompany
    # instead of relying on a notebook-level `device` global.
    if tgt_mask is None:
      tgt_mask = torch.nn.Transformer.generate_square_subsequent_mask(len(tgt)).to(tgt.device)

    if memory_mask is None:
      memory_mask = torch.zeros((tgt.size(1), memory.size(0)), device=memory.device)

    decoder_output = self.decoder(tgt, memory,
                                  tgt_mask=tgt_mask, memory_key_padding_mask=memory_mask)

    output = self.fc(decoder_output)  # -> Tensor shape: tgt_seq_len, batch_size, ntokens

    return output


class Transformer(torch.nn.Module):
  """Encoder-decoder model composed of the notebook's Encoder and Decoder.

  Args:
    num_layers_encoder: number of encoder layers.
    num_layers_decoder: number of decoder layers.
    d_model: embedding / model width shared by both stacks.
    nhead: number of attention heads.
    dff: feed-forward width.
    ntokens_src: source vocabulary size (embedding rows).
    ntokens_tgt: target vocabulary size (embedding rows and output logits).
    dropout: dropout rate, forwarded to both stacks.
  """

  def __init__(self, num_layers_encoder: int, num_layers_decoder: int, d_model: int, nhead: int,
               dff: int, ntokens_src: int, ntokens_tgt: int, dropout: float = 0.5):
    super().__init__()

    # `dropout` used to be accepted here but never passed on, so both stacks
    # always ran with their own default rate.
    self.encoder = Encoder(num_layers_encoder, d_model, nhead, dff, ntokens_src,
                           dropout=dropout)
    self.decoder = Decoder(num_layers_decoder, d_model, nhead, dff, ntokens_tgt,
                           dropout=dropout)

  def forward(self, src: torch.Tensor, tgt: torch.Tensor):
    """Encode `src`, then decode `tgt` against the resulting memory.

    Both sub-modules are called with their default masks.

    Returns:
      Whatever the decoder returns for ``(tgt, memory)``.
    """
    memory = self.encoder(src)
    decoder_output = self.decoder(tgt, memory)

    return decoder_output

In [None]:
# Split sizes for the training and validation frames
N_TRAIN_SAMPLES = 150000
N_VALID_SAMPLES = 1000

train_df, valid_df = create_dataset(n_train_samples=N_TRAIN_SAMPLES,
                                    n_valid_samples=N_VALID_SAMPLES)

In [None]:
# Upper-case the language column names in both frames
_COLUMN_NAMES = {"en": "EN", "uk": "UK"}

train_df, valid_df = (frame.rename(columns=_COLUMN_NAMES)
                      for frame in (train_df, valid_df))

In [None]:
# Cap text lengths at the 90% quantile of word counts to guard against outliers

import numpy as np

# Length limits: the 90% quantile of whitespace-separated word counts,
# measured on the (still unfiltered) training frame.
maxlen_uk = int(np.quantile(train_df["UK"].str.split().str.len(), q=0.9))
maxlen_en = int(np.quantile(train_df["EN"].str.split().str.len(), q=0.9))


def _within_limits(frame):
  """Keep the rows whose UK and EN word counts are both strictly below the limits."""
  uk_short = frame["UK"].str.split().str.len() < maxlen_uk
  en_short = frame["EN"].str.split().str.len() < maxlen_en
  return frame[uk_short & en_short]


train_df = _within_limits(train_df)
valid_df = _within_limits(valid_df)

In [None]:
# Clean both language columns of both frames with preprocess_text
for frame in (train_df, valid_df):
  for column in ("EN", "UK"):
    frame[column] = [preprocess_text(text) for text in frame[column]]

In [None]:
# Show the filtered training-frame shape together with the two length limits.
# NOTE(review): the recorded output reports 392780 rows, which is more than the
# n_train_samples=150000 requested above - it appears to come from an earlier
# run with different settings; re-execute to refresh it.
train_df.shape, maxlen_uk, maxlen_en

((392780, 2), 16, 19)

In [None]:
# Build the text tokenization functions

# !python -m spacy download uk_core_news_trf
# !python -m spacy download en_core_web_trf

import spacy
import torchtext
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from typing import Callable


def tokenize(input_data: List[str], nlp) -> Tuple[Callable[[str], List[str]], "torchtext.vocab.Vocab"]:
  """Build a tokenizer function and a vocabulary from a collection of texts.

  Args:
    input_data: iterable of sentences used to build the vocabulary.
    nlp: object exposing a ``tokenizer(text)`` callable that yields tokens
      with a ``.text`` attribute (a loaded spaCy pipeline in this notebook).

  Returns:
    ``(tokenizer, vocab)`` where ``tokenizer`` maps a string to a list of
    token strings and ``vocab`` maps tokens to ids.

  The vocabulary reserves index 0 for ``<pad>`` and index 1 for ``<unk>``.
  The rest of the notebook treats id 0 as padding (embedding ``padding_idx``,
  the pad value in ``pad_sequences``, the loss ``ignore_index``), so unknown
  tokens must not share that id.
  """

  def tokenizer(text: str) -> List[str]:
    return [tok.text for tok in nlp.tokenizer(text)]

  vocab = build_vocab_from_iterator(map(tokenizer, input_data),
                                    specials=["<pad>", "<unk>"])
  # Out-of-vocabulary tokens resolve to <unk>, never to the padding id.
  vocab.set_default_index(vocab["<unk>"])

  return tokenizer, vocab

spacy.prefer_gpu()

# spaCy pipeline names per language; each pipeline is loaded right before its
# vocabulary is built (the download commands above are commented out, so the
# models are expected to be installed already).
_SPACY_MODELS = {"UK": "uk_core_news_trf", "EN": "en_core_web_trf"}

tokenizer_uk, vocab_uk = tokenize(train_df["UK"], nlp=spacy.load(_SPACY_MODELS["UK"]))
tokenizer_en, vocab_en = tokenize(train_df["EN"], nlp=spacy.load(_SPACY_MODELS["EN"]))



In [None]:
# Build the PyTorch dataset

class DataWrapper(torch.utils.data.Dataset):
  """Paired UK/EN dataset of fixed-length token-id rows.

  Each sentence is tokenized, mapped to vocabulary ids, wrapped in start/end
  ids (``len(vocab)`` and ``len(vocab) + 1``) and padded with 0 or truncated
  so that every row has ``maxlen + 2`` entries.
  """

  def __init__(self, uk: List[str], en: List[str]):
    super().__init__()

    self.uk = uk
    self.en = en

    self.uk_ = self._encode(self.uk, tokenizer_uk, vocab_uk, maxlen_uk)
    self.en_ = self._encode(self.en, tokenizer_en, vocab_en, maxlen_en)

  def _encode(self, texts, tokenizer, vocab, max_len: int):
    """Turn `texts` into an array with one padded id row per sentence."""
    bos = [len(vocab)]
    eos = [len(vocab) + 1]

    rows = []
    for text in texts:
      ids = vocab(tokenizer(text))
      rows.append(self.pad_sequences(ids, max_len, bos, eos))

    return np.asarray(rows)

  def __len__(self):
    return len(self.uk_)

  def __getitem__(self, index: int):
    return self.uk_[index], self.en_[index]

  def pad_sequences(self, seq, max_len: int, start_token, end_token):
    """Return start + (seq cut to max_len) + end, followed by zero padding."""
    body = seq[:max_len]
    filler = [0] * (max_len - len(body))

    return start_token + body + end_token + filler

In [None]:
# Wrap the cleaned UK/EN columns of each split in a DataWrapper
train_dataset = DataWrapper(uk=train_df["UK"], en=train_df["EN"])
valid_dataset = DataWrapper(uk=valid_df["UK"], en=valid_df["EN"])

In [None]:
# Build the PyTorch data loaders

# Options shared by both loaders.
# NOTE(review): drop_last=True also applies to validation, so an incomplete
# final validation batch is discarded - confirm that is intended.
_loader_options = {"num_workers": 2, "drop_last": True}

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128,
                                           shuffle=True, **_loader_options)

valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=16,
                                           shuffle=False, **_loader_options)

In [None]:
# Initialise the transformer

num_layers_encoder = 2
num_layers_decoder = 2
d_model = 256
nhead = 8
dff = 512
# +2 leaves room for the start and end ids appended after the vocabulary
ntokens_src = len(vocab_uk) + 2
ntokens_tgt = len(vocab_en) + 2
dropout = 0.5

model = Transformer(num_layers_encoder, num_layers_decoder,
                    d_model, nhead, dff, ntokens_src, ntokens_tgt, dropout=dropout)

# Xavier-initialise every weight matrix (parameters with more than one dim)
for param in model.parameters():
  if param.dim() > 1:
    torch.nn.init.xavier_uniform_(param)

# The loop above also overwrites the embedding rows reserved by padding_idx,
# which torch.nn.Embedding had initialised to zeros; restore them.
with torch.no_grad():
  for module in model.modules():
    if isinstance(module, torch.nn.Embedding) and module.padding_idx is not None:
      module.weight[module.padding_idx].fill_(0)

model = model.to(device)

In [None]:
# Pull ONE batch so that src and tgt are aligned sentence pairs. Calling
# next(iter(train_loader)) twice builds two iterators over a shuffled loader
# and would pair the sources of one batch with the targets of another.
src, tgt = next(iter(train_loader))
src = src.long().to(device)
tgt = tgt.long().to(device)

In [None]:
# Shape sanity check. `dec_res` is computed here so that the cell runs on a
# fresh kernel, and the vocabulary size is taken from `ntokens_tgt`.
with torch.no_grad():
  dec_res = model(src.permute(1, 0), tgt.permute(1, 0))

print(f" Вихідний розмір прогноза трансформеру: {dec_res.shape}, Початковий розмір таргету: {tgt.permute(1, 0).shape} \n"
      f" Такий розмір повинен мати pred: {dec_res.view(-1, ntokens_tgt).shape} "
      f"і таргет: {tgt.permute(1, 0).reshape(-1).shape} для функциї CrossEntropy")

 Выход прогноза модели имеет размерность: torch.Size([21, 128, 143400]), изначальная размерность таргета: torch.Size([21, 128]) 
 Такую размерность должен иметь pred: torch.Size([2688, 143400]) и таргет: torch.Size([2688]) для функции CrossEntropy


In [None]:
# `lr` was previously set to 0.1 but never passed to the optimizer, so
# training actually ran with AdamW's default of 1e-3. The value is now the
# one really used and is passed explicitly; the effective rate is unchanged.
lr = 1e-3

# Id 0 is the padding id and is excluded from the loss.
criterion = torch.nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
# Multiply the learning rate by 0.9 after every scheduler step.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.9)

In [None]:
# Inference function used once the model is trained

def summarize(string: str, model: torch.nn.Module,
              repetition_penalty: float = 1.2) -> torch.Tensor:
  """Greedily decode an English token-id sequence for a Ukrainian sentence.

  Args:
    string: source sentence.
    model: callable as ``model(src, tgt)`` with (seq_len, batch) id tensors,
      returning logits of shape (tgt_len, batch, vocab).
    repetition_penalty: factor applied to the logits of ids that were already
      generated, from the third decoding step onwards.

  Returns:
    1-D tensor of generated ids, beginning with the EN start id. Decoding
    stops at the EN end id (not included) or after ``maxlen_en`` steps.

  Side effects:
    Puts ``model`` into eval mode.
  """
  model.eval()

  start_token_uk = [len(vocab_uk)]
  end_token_uk = [len(vocab_uk) + 1]

  start_token_en = [len(vocab_en)]
  end_token_en = [len(vocab_en) + 1]

  # Tokenize the whole sentence exactly as DataWrapper does for training.
  # Tokenizing word by word and keeping only the first sub-token silently
  # dropped everything the tokenizer split off (e.g. punctuation).
  source_ids = start_token_uk + vocab_uk(tokenizer_uk(string)) + end_token_uk

  string = torch.IntTensor(source_ids).unsqueeze(0).to(device)
  output = torch.IntTensor(start_token_en).unsqueeze(0).to(device)

  with torch.no_grad():

    for i in range(maxlen_en):

      prediction = model(string.permute(1, 0), output.permute(1, 0))

      # Keep only the logits of the newest position
      prediction = prediction[-1:, :, :]

      if i > 1:
        # Repetition penalty: an already-used id must become LESS likely.
        # Dividing a negative logit would raise it, so negative logits are
        # multiplied by the penalty and non-negative ones are divided.
        for token_id in set(output.squeeze(0).tolist()):
          logit = prediction[0, 0, token_id]
          if logit < 0:
            prediction[0, 0, token_id] = logit * repetition_penalty
          else:
            prediction[0, 0, token_id] = logit / repetition_penalty

      predicted_id = torch.argmax(prediction, dim=-1)

      if predicted_id[0] == end_token_en[0]:
        return output.squeeze(0)

      output = torch.cat([output, predicted_id.permute(1, 0)], dim=-1)

    return output.squeeze(0)

In [None]:
# Training and validation functions for the model

from tqdm import tqdm

def train(loader: torch.utils.data.DataLoader) -> float:
  """Run one training epoch over `loader` and return the mean batch loss.

  Uses the notebook-level ``model``, ``optimizer``, ``criterion`` and
  ``device``. Each batch is ``(src, tgt)`` with shape (batch, seq_len); the
  decoder input is ``tgt`` without its last position and the loss target is
  ``tgt`` without its first position (teacher forcing).

  Args:
    loader: iterable of ``(src, tgt)`` batches.

  Returns:
    Mean loss over the processed batches, or ``nan`` when the loader yields
    no batches (instead of raising ZeroDivisionError).
  """
  model.train()

  total_loss = 0.0
  num_batches = 0

  for batch in tqdm(loader):

    optimizer.zero_grad()

    src, tgt = batch[0].to(device), batch[1].to(device)

    # Model expects (seq_len, batch): shift the target and transpose
    tgt_inp = tgt[:, :-1].permute(1, 0)
    tgt_real = tgt[:, 1:].permute(1, 0)

    outputs = model(src.permute(1, 0), tgt_inp)
    # Flatten to (positions, vocab); the vocab size is read from the logits
    # themselves rather than from a notebook global.
    loss = criterion(outputs.reshape(-1, outputs.size(-1)), tgt_real.reshape(-1))

    total_loss += loss.item()
    num_batches += 1

    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.7)
    optimizer.step()

  if num_batches == 0:
    return float("nan")

  return total_loss / num_batches


def eval_(loader: torch.utils.data.DataLoader) -> float:
  """Evaluate the model on `loader` and print one sample translation.

  Uses the notebook-level ``model``, ``criterion``, ``device``,
  ``summarize``, ``valid_df`` and ``vocab_en``. Batches are handled exactly
  as in ``train`` but without gradient tracking or parameter updates.

  Args:
    loader: iterable of ``(src, tgt)`` batches.

  Returns:
    Mean loss over the processed batches, or ``nan`` when the loader yields
    no batches (instead of raising ZeroDivisionError).

  Side effects:
    Puts ``model`` into eval mode and prints the decoded translation of
    validation row 15.
  """
  model.eval()

  total_loss = 0.0
  num_batches = 0

  with torch.no_grad():

    for batch in tqdm(loader):

      src, tgt = batch[0].to(device), batch[1].to(device)

      tgt_inp = tgt[:, :-1].permute(1, 0)
      tgt_real = tgt[:, 1:].permute(1, 0)

      outputs = model(src.permute(1, 0), tgt_inp)
      loss = criterion(outputs.reshape(-1, outputs.size(-1)), tgt_real.reshape(-1))

      total_loss += loss.item()
      num_batches += 1

  res = summarize(valid_df["UK"].iloc[15], model=model)

  # Print the model's current inference result for this epoch.
  # The id -> token list is fetched once; ids beyond the vocabulary (the
  # start/end markers) have no string form and are skipped.
  itos = vocab_en.get_itos()
  print(" ".join(itos[token_id] for token_id in res[1:].tolist() if token_id < len(itos)))

  if num_batches == 0:
    return float("nan")

  return total_loss / num_batches

In [None]:
# Run the training loop for the transformer model

epochs = 5

for epoch in range(epochs):
  # Tuple elements are evaluated left to right: train first, then validate
  loss, valid_loss = train(train_loader), eval_(valid_loader)
  print(f"epoch: {epoch + 1} | loss: {loss:.3f} | valid_loss: {valid_loss:.3f}")

  # Decay the learning rate once per epoch
  scheduler.step()

100%|██████████| 3068/3068 [04:29<00:00, 11.36it/s]
100%|██████████| 27/27 [00:00<00:00, 56.83it/s]


The external operation and implementation of the educational programmes shall be carried out in the main principles of those
epoch: 1 | loss: 4.464 | valid_loss: 3.345


100%|██████████| 3068/3068 [04:30<00:00, 11.33it/s]
100%|██████████| 27/27 [00:00<00:00, 56.62it/s]


The operation and implementation of the National Register shall be carried out with the basic principles .
epoch: 2 | loss: 3.122 | valid_loss: 2.865


100%|██████████| 3068/3068 [04:30<00:00, 11.34it/s]
100%|██████████| 27/27 [00:00<00:00, 56.25it/s]


The management and implementation of the National programmes shall be carried out with such basic principles .
epoch: 3 | loss: 2.535 | valid_loss: 2.715


100%|██████████| 3068/3068 [04:30<00:00, 11.35it/s]
100%|██████████| 27/27 [00:00<00:00, 55.57it/s]


Formation and implementation of the National Programme shall be performed with the following basic principles :
epoch: 4 | loss: 2.160 | valid_loss: 2.667


100%|██████████| 3068/3068 [04:30<00:00, 11.36it/s]
100%|██████████| 27/27 [00:00<00:00, 57.29it/s]


Formation and implementation of the National Programme shall be carried out with such principles .
epoch: 5 | loss: 1.896 | valid_loss: 2.656


In [None]:
# Sanity-check inference on a single validation sentence

res = summarize(valid_df["UK"].iloc[330], model=model)
itos_en = vocab_en.get_itos()

# Skip the leading start id and map the remaining ids back to tokens
" ".join(itos_en[word] for word in res[1:])

'Documents confirming his parents who supported him wash their rivals .'

In [None]:
# Reference English sentence for the same validation row (330) that was
# translated in the previous cell, for side-by-side comparison.
valid_df["EN"].iloc[330]

'documents confirming their legal succession;'

In [None]:
# Save the results

import dill

# NOTE(review): the first call pickles the whole model object; only the
# optimizer is stored as a state_dict.
torch.save(model, "/YOUR_PROJECT_PATH/model.pth")
torch.save(optimizer.state_dict(), "/YOUR_PROJECT_PATH/optimizer_state_dict.pth")

# Both vocabularies are serialised with dill, one file each
for vocab_path, vocab_obj in (("/YOUR_PROJECT_PATH/vocab_en.pkl", vocab_en),
                              ("/YOUR_PROJECT_PATH/vocab_uk.pkl", vocab_uk)):
  with open(vocab_path, "wb") as f:
    dill.dump(vocab_obj, f)