In [1]:
%%capture
!pip install datasets
!pip install tokenizers
!pip install torchmetrics

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torch.optim.lr_scheduler import LambdaLR
import torchtext.datasets as datasets
import math

import warnings
from tqdm import tqdm
import os
from pathlib import Path

from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace

import torchmetrics

In [3]:
class NormalizationLayer(nn.Module):
    """Normalise the last dimension, then apply a learnable scale and shift.

    Args:
        features: size of the last dimension of the input.
        eps: constant added to the standard deviation before dividing.
    """

    def __init__(self, features: int, eps:float=10**-6) -> None:
        super().__init__()
        self.eps = eps
        # Learnable multiplicative gain, initialised to ones.
        self.alpha = nn.Parameter(torch.ones(features))
        # Learnable additive offset, initialised to zeros.
        self.bias = nn.Parameter(torch.zeros(features))

    def forward(self, x):
        """Return ``alpha * (x - mean) / (std + eps) + bias`` along the last axis."""
        centered = x - x.mean(dim=-1, keepdim=True)
        # eps is added to the standard deviation itself (not to the variance).
        spread = x.std(dim=-1, keepdim=True) + self.eps
        return self.alpha * centered / spread + self.bias

class FeedForwardBlock(nn.Module):
    """Two linear layers with a ReLU and dropout in between.

    Args:
        d_model: input and output feature size.
        d_hidden: feature size of the intermediate layer.
        dropout: dropout probability applied after the ReLU.
    """

    def __init__(self, d_model: int, d_hidden: int, dropout: float) -> None:
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_hidden)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_hidden, d_model)

    def forward(self, x):
        """Apply linear1 -> ReLU -> dropout -> linear2 to the last dimension of ``x``."""
        hidden = torch.relu(self.linear1(x))
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

class InputEmbBlock(nn.Module):
    """Token embedding whose output is multiplied by ``sqrt(d_model)``.

    Args:
        d_model: embedding size.
        vocab_size: number of rows in the embedding table.
    """

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Look up the embeddings for the ids in ``x`` and scale them."""
        scale = math.sqrt(self.d_model)
        vectors = self.embedding(x)
        return vectors * scale

class PositionalEncodingBlock(nn.Module):
    """Add fixed sinusoidal positional encodings to the input and apply dropout.

    The table is computed once in ``__init__`` and stored as the buffer ``pe``
    of shape ``(1, seq_len, d_model)``.

    Args:
        d_model: feature size of the encodings; both even and odd values work.
        seq_len: number of positions for which encodings are precomputed.
        dropout: dropout probability applied after the addition.
    """

    def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)
        # Encoding table of shape (seq_len, d_model).
        pe = torch.zeros(seq_len, d_model)
        # Position indices as a column vector of shape (seq_len, 1).
        pos = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        # One frequency per even feature index: ceil(d_model / 2) values.
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = pos * div_term  # (seq_len, ceil(d_model / 2))
        # Sine goes to the even feature indices.
        pe[:, 0::2] = torch.sin(angles)
        # Cosine goes to the odd feature indices. For an odd d_model there is
        # one odd column fewer than even columns, so the last frequency is dropped.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Add a leading batch dimension: (1, seq_len, d_model).
        pe = pe.unsqueeze(0)
        # Register as a buffer so it is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encodings of the first ``x.shape[1]`` positions to ``x``, then apply dropout."""
        # A buffer carries no gradient, so no explicit requires_grad_(False) is needed.
        x = x + self.pe[:, :x.shape[1], :]  # (batch, seq_len, d_model)
        return self.dropout(x)

class ResConnection(nn.Module):
    """Residual wrapper computing ``x + dropout(sublayer(norm(x)))``.

    Args:
        features: feature size passed to the internal ``NormalizationLayer``.
        dropout: dropout probability applied to the sublayer output.
    """

    def __init__(self, features: int, dropout: float) -> None:
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = NormalizationLayer(features)

    def forward(self, x, sublayer):
        """Normalise ``x``, run ``sublayer`` on it, apply dropout and add the result to ``x``."""
        normalized = self.norm(x)
        branch = self.dropout(sublayer(normalized))
        return x + branch

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with bias-free projections.

    Args:
        d_model: feature size of the inputs and the output.
        heads_count: number of attention heads; must divide ``d_model``.
        dropout: dropout probability applied to the attention weights.

    After every ``forward`` call the attention weights of that call are kept
    in ``self.attention_scores``.
    """

    def __init__(self, d_model: int, heads_count: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.heads_count = heads_count  # number of heads
        assert d_model % heads_count == 0, "d_model не делится на heads_count"

        # Per-head feature size.
        self.d_k = d_model // heads_count
        self.w_query = nn.Linear(d_model, d_model, bias=False)  # Wq
        self.w_key = nn.Linear(d_model, d_model, bias=False)  # Wk
        self.w_value = nn.Linear(d_model, d_model, bias=False)  # Wv
        self.w_o = nn.Linear(d_model, d_model, bias=False)  # Wo
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        """Compute scaled dot-product attention.

        Positions where ``mask == 0`` receive a score of ``-1e9`` before the
        softmax. ``mask`` and ``dropout`` may each be ``None`` to skip that step.

        Returns:
            A tuple ``(weights @ value, weights)``.
        """
        scale = math.sqrt(query.shape[-1])
        # (batch, heads_count, seq_len, d_k) --> (batch, heads_count, seq_len, seq_len)
        scores = torch.matmul(query, key.transpose(-2, -1)) / scale
        if mask is not None:
            scores.masked_fill_(mask == 0, -1e9)
        weights = scores.softmax(dim=-1)
        if dropout is not None:
            weights = dropout(weights)
        # (batch, heads_count, seq_len, seq_len) --> (batch, heads_count, seq_len, d_k)
        return torch.matmul(weights, value), weights

    def _split_heads(self, projected):
        """Reshape (batch, seq_len, d_model) into (batch, heads_count, seq_len, d_k)."""
        batch, length = projected.shape[0], projected.shape[1]
        return projected.view(batch, length, self.heads_count, self.d_k).transpose(1, 2)

    def forward(self, q, k, v, mask):
        """Project ``q``, ``k``, ``v``, attend per head, merge the heads and apply ``w_o``."""
        query = self._split_heads(self.w_query(q))
        key = self._split_heads(self.w_key(k))
        value = self._split_heads(self.w_value(v))

        per_head, self.attention_scores = MultiHeadAttention.attention(
            query, key, value, mask, self.dropout
        )

        # Merge heads:
        # (batch, heads_count, seq_len, d_k) --> (batch, seq_len, heads_count, d_k) --> (batch, seq_len, d_model)
        batch = per_head.shape[0]
        merged = per_head.transpose(1, 2).contiguous().view(batch, -1, self.heads_count * self.d_k)
        return self.w_o(merged)

class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each inside a ``ResConnection``.

    Args:
        features: feature size handed to both residual connections.
        self_attention_block: attention module called with the same tensor as q, k and v.
        feed_forward_block: position-wise feed-forward module.
        dropout: dropout probability of the residual connections.
    """

    def __init__(self, features: int, self_attention_block: MultiHeadAttention, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        wrappers = [ResConnection(features, dropout) for _ in range(2)]
        self.residual_connections = nn.ModuleList(wrappers)

    def forward(self, x, src_mask):
        """Run ``x`` through the attention sublayer (with ``src_mask``) and the feed-forward sublayer."""
        attention_residual, feed_forward_residual = self.residual_connections

        def self_attend(normed):
            return self.self_attention_block(normed, normed, normed, src_mask)

        x = attention_residual(x, self_attend)
        return feed_forward_residual(x, self.feed_forward_block)

class Encoder(nn.Module):
    """Apply a list of encoder layers in order, then a final ``NormalizationLayer``.

    Args:
        features: feature size of the final normalisation.
        layers: modules called as ``layer(x, mask)``.
    """

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = NormalizationLayer(features)

    def forward(self, x, mask):
        """Feed ``x`` through every layer with ``mask`` and normalise the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return self.norm(hidden)

class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention and feed-forward, each inside a ``ResConnection``.

    Args:
        features: feature size handed to the three residual connections.
        self_attention_block: attention over the decoder input itself.
        cross_attention_block: attention whose keys and values are the encoder output.
        feed_forward_block: position-wise feed-forward module.
        dropout: dropout probability of the residual connections.
    """

    def __init__(self, features: int, self_attention_block: MultiHeadAttention, cross_attention_block: MultiHeadAttention, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        wrappers = [ResConnection(features, dropout) for _ in range(3)]
        self.residual_connections = nn.ModuleList(wrappers)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Apply the three sublayers in order.

        ``tgt_mask`` is used for the self-attention and ``src_mask`` for the
        cross-attention over ``encoder_output``.
        """
        self_residual, cross_residual, feed_forward_residual = self.residual_connections

        def self_attend(normed):
            return self.self_attention_block(normed, normed, normed, tgt_mask)

        def cross_attend(normed):
            return self.cross_attention_block(normed, encoder_output, encoder_output, src_mask)

        x = self_residual(x, self_attend)
        x = cross_residual(x, cross_attend)
        return feed_forward_residual(x, self.feed_forward_block)

class Decoder(nn.Module):
    """Apply a list of decoder layers in order, then a final ``NormalizationLayer``.

    Args:
        features: feature size of the final normalisation.
        layers: modules called as ``layer(x, encoder_output, src_mask, tgt_mask)``.
    """

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = NormalizationLayer(features)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Feed ``x`` through every layer and normalise the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        return self.norm(hidden)

class ProjectionLayer(nn.Module):
    """Linear map from model features to vocabulary-sized logits.

    Args:
        d_model: input feature size.
        vocab_size: output feature size.
    """

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``self.proj(x)``; the result is a tensor, not ``None``."""
        # (batch, seq_len, d_model) --> (batch, seq_len, vocab_size)
        return self.proj(x)

class Transformer(nn.Module):
    """Container that wires embeddings, positional encodings, encoder, decoder and projection.

    The model is driven through ``encode``, ``decode`` and ``project``; no
    ``forward`` method is defined in this class.
    """

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbBlock, tgt_embed: InputEmbBlock, src_pos: PositionalEncodingBlock, tgt_pos: PositionalEncodingBlock, projection_layer: ProjectionLayer) -> None:
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.projection_layer = projection_layer

    def encode(self, src, src_mask):
        """Embed ``src``, add positional encodings and run the encoder."""
        embedded = self.src_pos(self.src_embed(src))  # (batch, seq_len, d_model)
        return self.encoder(embedded, src_mask)

    def decode(self, encoder_output: torch.Tensor, src_mask: torch.Tensor, tgt: torch.Tensor, tgt_mask: torch.Tensor):
        """Embed ``tgt``, add positional encodings and run the decoder against ``encoder_output``."""
        embedded = self.tgt_pos(self.tgt_embed(tgt))  # (batch, seq_len, d_model)
        return self.decoder(embedded, encoder_output, src_mask, tgt_mask)

    def project(self, x):
        """Apply the projection layer to ``x``."""
        # (batch, seq_len, vocab_size)
        return self.projection_layer(x)

def build_transformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int, tgt_seq_len: int, d_model: int=512, N: int=6, h: int=8, dropout: float=0.1, d_ff: int=2048) -> Transformer:
    """Assemble a ``Transformer`` and Xavier-initialise its weight matrices.

    Args:
        src_vocab_size: size of the source embedding table.
        tgt_vocab_size: size of the target embedding table and of the projection output.
        src_seq_len: number of positions precomputed for the source positional encoding.
        tgt_seq_len: number of positions precomputed for the target positional encoding.
        d_model: model feature size.
        N: number of encoder blocks and of decoder blocks.
        h: number of attention heads.
        dropout: dropout probability used throughout.
        d_ff: hidden size of the feed-forward blocks.

    Returns:
        The constructed ``Transformer``.
    """
    # Embeddings and positional encodings for both sides.
    src_embed = InputEmbBlock(d_model, src_vocab_size)
    tgt_embed = InputEmbBlock(d_model, tgt_vocab_size)
    src_pos = PositionalEncodingBlock(d_model, src_seq_len, dropout)
    tgt_pos = PositionalEncodingBlock(d_model, tgt_seq_len, dropout)

    # Encoder stack: every block gets its own attention and feed-forward modules.
    encoder_stack = nn.ModuleList([
        EncoderBlock(
            d_model,
            MultiHeadAttention(d_model, h, dropout),
            FeedForwardBlock(d_model, d_ff, dropout),
            dropout,
        )
        for _ in range(N)
    ])

    # Decoder stack: self-attention, cross-attention and feed-forward per block.
    decoder_stack = nn.ModuleList([
        DecoderBlock(
            d_model,
            MultiHeadAttention(d_model, h, dropout),
            MultiHeadAttention(d_model, h, dropout),
            FeedForwardBlock(d_model, d_ff, dropout),
            dropout,
        )
        for _ in range(N)
    ])

    model = Transformer(
        Encoder(d_model, encoder_stack),
        Decoder(d_model, decoder_stack),
        src_embed,
        tgt_embed,
        src_pos,
        tgt_pos,
        ProjectionLayer(d_model, tgt_vocab_size),
    )

    # Re-initialise every parameter with more than one dimension.
    for weight in model.parameters():
        if weight.dim() <= 1:
            continue
        nn.init.xavier_uniform_(weight)
    return model

In [4]:
class TranslatorDataset(Dataset):
    """Turn translation pairs into fixed-length tensors for the transformer.

    Each row of ``ds`` is read as ``ds[idx]['translation'][lang]``.

    Args:
        ds: indexable collection of rows with a ``'translation'`` mapping.
        tok_src: source tokenizer (needs ``encode(text).ids`` and ``token_to_id``).
        tok_tgt: target tokenizer with the same interface.
        seq_len: length every returned sequence is padded to.
        src_lang: key of the source sentence inside ``'translation'``.
        tgt_lang: key of the target sentence inside ``'translation'``.
    """

    def __init__(self, ds, tok_src, tok_tgt, seq_len, src_lang="en", tgt_lang="ru"):
        super().__init__()
        self.seq_len = seq_len
        self.ds = ds
        self.tok_src = tok_src
        self.tok_tgt = tok_tgt
        self.src_lang = src_lang
        self.tgt_lang = tgt_lang
        # Target-side special tokens (attribute names kept for existing callers).
        self.sos = torch.tensor([tok_tgt.token_to_id("[SOS]")], dtype=torch.int64)
        self.eos = torch.tensor([tok_tgt.token_to_id("[EOS]")], dtype=torch.int64)
        self.pad = torch.tensor([tok_tgt.token_to_id("[PAD]")], dtype=torch.int64)
        # Source-side special tokens: the encoder input is embedded with the
        # source vocabulary, so its markers must come from the source tokenizer.
        self.src_sos = torch.tensor([tok_src.token_to_id("[SOS]")], dtype=torch.int64)
        self.src_eos = torch.tensor([tok_src.token_to_id("[EOS]")], dtype=torch.int64)
        self.src_pad = torch.tensor([tok_src.token_to_id("[PAD]")], dtype=torch.int64)

    def __len__(self):
        return len(self.ds)

    def __getitem__(self, idx):
        """Build the encoder/decoder inputs, masks and label for row ``idx``.

        Raises:
            ValueError: if a sentence plus its special tokens exceeds ``seq_len``.
        """
        # Fetch the row once; both sentences live in the same mapping.
        pair = self.ds[idx]['translation']
        src_text = pair[self.src_lang]
        tgt_text = pair[self.tgt_lang]
        # Convert the text to token ids.
        enc_ids = torch.tensor(self.tok_src.encode(src_text).ids, dtype=torch.int64)
        dec_ids = torch.tensor(self.tok_tgt.encode(tgt_text).ids, dtype=torch.int64)
        # The encoder input carries SOS and EOS; the decoder input only SOS
        # and the label only EOS, hence the different padding counts.
        enc_num_padd = self.seq_len - enc_ids.numel() - 2
        dec_num_padd = self.seq_len - dec_ids.numel() - 1
        if enc_num_padd < 0 or dec_num_padd < 0:
            raise ValueError("Предложение слишком длинное")
        enc_padding = self.src_pad.repeat(enc_num_padd)
        dec_padding = self.pad.repeat(dec_num_padd)

        encoder_input = torch.cat([self.src_sos, enc_ids, self.src_eos, enc_padding], dim=0)
        decoder_input = torch.cat([self.sos, dec_ids, dec_padding], dim=0)
        label = torch.cat([dec_ids, self.eos, dec_padding], dim=0)
        return {
            "encoder_input": encoder_input,  # (seq_len)
            "decoder_input": decoder_input,  # (seq_len)
            "encoder_mask": (encoder_input != self.src_pad).unsqueeze(0).unsqueeze(0).int(), # (1, 1, seq_len)
            "decoder_mask": (decoder_input != self.pad).unsqueeze(0).int() & causal_mask(decoder_input.size(0)), # (1, seq_len) & (1, seq_len, seq_len)
            "label": label,  # (seq_len)
            "src_text": src_text,
            "tgt_text": tgt_text,
        }

def causal_mask(size):
    """Return a boolean ``(1, size, size)`` mask that is True on and below the diagonal."""
    lower_triangle = torch.tril(torch.ones((1, size, size)))
    return lower_triangle.bool()

def get_all_sentences(ds, lang):
    """Lazily yield ``row['translation'][lang]`` for every row of ``ds``."""
    yield from (record['translation'][lang] for record in ds)

def build_tokenizer(config, ds, lang):
    """Train a word-level tokenizer on the ``lang`` sentences of ``ds`` and save it.

    The output path is ``config['tokenizer_file']`` formatted with ``lang``.
    The tokenizer is trained on every call; an existing file is overwritten.

    Returns:
        The trained tokenizer.
    """
    destination = Path(config['tokenizer_file'].format(lang))

    word_level = Tokenizer(WordLevel(unk_token="[UNK]"))
    word_level.pre_tokenizer = Whitespace()
    word_level.train_from_iterator(
        get_all_sentences(ds, lang),
        trainer=WordLevelTrainer(
            special_tokens=["[UNK]", "[PAD]", "[SOS]", "[EOS]"],
            min_frequency=2,
        ),
    )

    word_level.save(str(destination))
    return word_level

def get_dataset(config):
    """Load the en-ru ``opus_books`` data, build tokenizers and return the data loaders.

    Config keys read here: ``'data_count'``, ``'seq_len'``, ``'batch_size'``
    and, optionally, ``'seed'``. ``build_tokenizer`` additionally reads
    ``'tokenizer_file'``.

    Returns:
        ``(train_dataloader, val_dataloader, tok_src, tok_tgt)``.
    """
    corpus = load_dataset("opus_books", "en-ru", split='train')
    # Never request more rows than the split contains.
    sample_count = min(config['data_count'], len(corpus))
    corpus = corpus.select(range(sample_count))
    corpus.to_json("data.json")

    # Build the tokenizers.
    tok_src = build_tokenizer(config, corpus, "en")
    tok_tgt = build_tokenizer(config, corpus, "ru")

    # 90% for training, 10% for validation.
    train_size = int(0.9 * len(corpus))
    val_size = len(corpus) - train_size
    # An optional config['seed'] makes the split reproducible; without it the
    # global torch RNG is used, as before.
    split_kwargs = {}
    if config.get('seed') is not None:
        split_kwargs['generator'] = torch.Generator().manual_seed(config['seed'])
    train_subset, val_subset = random_split(corpus, [train_size, val_size], **split_kwargs)

    train_ds = TranslatorDataset(train_subset, tok_src, tok_tgt, config['seq_len'])
    val_ds = TranslatorDataset(val_subset, tok_src, tok_tgt, config['seq_len'])

    train_dataloader = DataLoader(train_ds, batch_size=config['batch_size'], shuffle=True)
    # Validation uses batches of one sentence pair.
    val_dataloader = DataLoader(val_ds, batch_size=1, shuffle=False)

    return train_dataloader, val_dataloader, tok_src, tok_tgt


In [5]:
def get_weights_file_path(config, epoch: str):
    """Return the checkpoint path ``opus_books_weights/<basename><epoch>.pt`` as a string.

    Args:
        config: mapping that may contain ``'model_basename'``; when the key is
            missing, ``'model'`` is used.
        epoch: epoch identifier appended to the basename (formatted with ``str``).
    """
    model_folder = 'opus_books_weights'
    basename = config.get('model_basename', 'model')
    model_filename = f"{basename}{epoch}.pt"
    return str(Path('.') / model_folder / model_filename)


def get_decode(model, source, src_mask, tok_src, tok_tgt, max_len, device):
    """Greedily decode one sequence, picking the highest-scoring token at each step.

    The encoder runs once. Decoding starts from ``[SOS]`` and stops when the
    sequence reaches ``max_len`` tokens or ``[EOS]`` has been appended.
    ``tok_src`` is accepted but not used.

    Returns:
        A 1-D tensor of the generated ids, including the leading ``[SOS]``.
    """
    # Encode once and reuse the result for every decoding step.
    memory = model.encode(source, src_mask)
    eos_id = tok_tgt.token_to_id('[EOS]')
    # The running decoder input starts with the SOS token only.
    generated = torch.full(
        (1, 1), tok_tgt.token_to_id('[SOS]'), dtype=source.dtype, device=device
    )
    while generated.size(1) != max_len:
        # Causal mask covering the tokens generated so far.
        step_mask = causal_mask(generated.size(1)).type_as(src_mask).to(device)
        hidden = model.decode(memory, src_mask, generated, step_mask)
        # Only the last position is projected to pick the next token.
        logits = model.project(hidden[:, -1])
        token = torch.max(logits, dim=1).indices.item()
        appended = torch.full((1, 1), token, dtype=source.dtype, device=device)
        generated = torch.cat([generated, appended], dim=1)
        if token == eos_id:
            break
    return generated.squeeze(0)

def calculate_metrics(predicted, expected):
    """Print the character error rate, word error rate and BLEU score of ``predicted`` against ``expected``."""
    scorers = (
        ('validation cer', 'CharErrorRate'),
        ('validation wer', 'WordErrorRate'),
        ('validation BLEU', 'BLEUScore'),
    )
    for label, metric_name in scorers:
        # Each metric is built, evaluated and reported before the next one starts.
        metric = getattr(torchmetrics, metric_name)()
        score = metric(predicted, expected)
        print(label, score.item())


def make_test(model, validation_ds, tok_src, tok_tgt, max_len, device, num_examples=4):
    """Decode validation batches, print each translation and report the metrics.

    The model is switched to eval mode and gradients are disabled. The loop
    stops after ``num_examples`` batches; only the first text of each batch
    is printed and scored.
    """
    model.eval()
    expected, predicted = [], []

    with torch.no_grad():
        for seen, batch in enumerate(validation_ds, start=1):
            encoder_input = batch["encoder_input"].to(device)  # (b, seq_len)
            encoder_mask = batch["encoder_mask"].to(device)  # (b, 1, 1, seq_len)

            token_ids = get_decode(model, encoder_input, encoder_mask, tok_src, tok_tgt, max_len, device)

            src_text = batch["src_text"][0]
            tgt_text = batch["tgt_text"][0]
            translation = tok_tgt.decode(token_ids.detach().cpu().numpy())

            expected.append(tgt_text)
            predicted.append(translation)

            print(f"{f'Текст English : ':>12}{src_text}")
            print(f"{f'Текст Russian: ':>12}{tgt_text}")
            print(f"{f'Прогноз трансформера: ':>12}{translation}")
            if seen == num_examples:
                print('-------------------------------------------------')
                break

    calculate_metrics(predicted, expected)

def train_model(config):
    """Train the transformer, validating and saving a checkpoint after every epoch.

    Config keys read here: ``'seq_len'``, ``'d_model'``, ``'lr'`` and
    ``'num_epochs'``; ``get_dataset`` reads further keys from the same mapping.
    Checkpoints are written to ``opus_books_weights/model<epoch>.pt``.
    """
    # Pick the device.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("Using device:", device)
    device = torch.device(device)
    weights_dir = Path('opus_books_weights')
    weights_dir.mkdir(parents=True, exist_ok=True)

    train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt = get_dataset(config)
    # The target vocabulary size is constant, so look it up once.
    tgt_vocab_size = tokenizer_tgt.get_vocab_size()
    model = build_transformer(tokenizer_src.get_vocab_size(), tgt_vocab_size, config["seq_len"], config['seq_len'], d_model=config['d_model'])
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'], eps=1e-9)
    # Labels are target-vocabulary ids, so the ignored padding id must come
    # from the target tokenizer.
    loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer_tgt.token_to_id('[PAD]'), label_smoothing=0.1).to(device)

    for epoch in range(0, config['num_epochs']):
        torch.cuda.empty_cache()
        model.train()
        batch_iterator = tqdm(train_dataloader, desc=f"Processing Epoch {epoch:02d}")
        for batch in batch_iterator:
            encoder_input = batch['encoder_input'].to(device) # (B, seq_len)
            decoder_input = batch['decoder_input'].to(device) # (B, seq_len)
            encoder_mask = batch['encoder_mask'].to(device) # (B, 1, 1, seq_len)
            decoder_mask = batch['decoder_mask'].to(device) # (B, 1, seq_len, seq_len)
            # Run the tensors through the encoder, the decoder and the projection layer.
            encoder_output = model.encode(encoder_input, encoder_mask) # (B, seq_len, d_model)
            decoder_output = model.decode(encoder_output, encoder_mask, decoder_input, decoder_mask) # (B, seq_len, d_model)
            proj_output = model.project(decoder_output) # (B, seq_len, vocab_size)
            # Cross-entropy between the flattened logits and the flattened labels.
            label = batch['label'].to(device) # (B, seq_len)
            loss = loss_fn(proj_output.view(-1, tgt_vocab_size), label.view(-1))
            batch_iterator.set_postfix({"loss": f"{loss.item():6.3f}"})

            loss.backward()
            optimizer.step()
            optimizer.zero_grad(set_to_none=True)
        # Validate at the end of every epoch.
        make_test(model, val_dataloader, tokenizer_src, tokenizer_tgt, config['seq_len'], device)

        # Save the model at the end of every epoch.
        model_filename = str(weights_dir / f"model{epoch}.pt")
        torch.save({'model_state_dict': model.state_dict()}, model_filename)


In [6]:
# Run settings consumed by train_model and the helpers it calls.
config = dict(
    batch_size=8,                           # batch size of the training DataLoader
    num_epochs=20,                          # number of passes over the training data
    lr=10**-4,                              # Adam learning rate
    seq_len=350,                            # padded length for source and target sequences
    d_model=512,                            # model feature size
    data_count=15000,                       # number of dataset rows selected
    tokenizer_file="tokenizer_{0}.json",    # "{0}" is replaced by the language code
)
train_model(config)

Using device: cuda


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading readme:   0%|          | 0.00/25.2k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/2.92M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/17496 [00:00<?, ? examples/s]

Creating json from Arrow format:   0%|          | 0/15 [00:00<?, ?ba/s]

Processing Epoch 00:  26%|██▌       | 435/1688 [02:45<07:57,  2.62it/s, loss=5.899]


KeyboardInterrupt: 