# Цель
Написать трансформер с нуля для перевода с английского на русский


# Dependencies

In [None]:
%%capture

# !apt-get install unzip
# !pip install nltk
!pip install keras --upgrade
!pip install tokenizers
!pip install torch==2.0.1
!pip install tokenizers matplotlib scikit-learn

In [None]:
import os
os.environ["KERAS_BACKEND"] = "torch"

import keras
import torch
print(f"{keras.__version__ = }")
print(f"{torch.__version__ = }")

keras.__version__ = '3.1.1'
torch.__version__ = '2.0.1+cu117'


In [None]:
import tensorflow as tf
import keras
from tokenizers import BertWordPieceTokenizer

from tokenizers import Tokenizer
from tokenizers.models import WordPiece
from tokenizers.pre_tokenizers import Whitespace
from tokenizers import normalizers
from tokenizers.normalizers import Lowercase
from tokenizers.trainers import WordPieceTrainer
from tokenizers import decoders

import os
import re
import pandas as pd
import numpy as np
import nltk
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedShuffleSplit, train_test_split
from string import punctuation
from collections import Counter
import matplotlib.pyplot as plt
%matplotlib inline

##preparation

In [None]:
%%capture

!wget https://data.statmt.org/opus-100-corpus/v1.0/supervised/en-ru/opus.en-ru-train.ru
!wget https://data.statmt.org/opus-100-corpus/v1.0/supervised/en-ru/opus.en-ru-train.en
!wget https://data.statmt.org/opus-100-corpus/v1.0/supervised/en-ru/opus.en-ru-test.ru
!wget https://data.statmt.org/opus-100-corpus/v1.0/supervised/en-ru/opus.en-ru-test.en

In [None]:
text = open('opus.en-ru-train.ru').read().replace('\xa0', ' ')
f = open('opus.en-ru-train.ru', 'w')
f.write(text)
f.close()

In [None]:
en_sents = open('opus.en-ru-train.en').read().lower().splitlines()
ru_sents = open('opus.en-ru-train.ru').read().lower().splitlines()

In [None]:
tokenizer_en = Tokenizer(WordPiece(), )
tokenizer_en.normalizer = normalizers.Sequence([Lowercase()])
tokenizer_en.pre_tokenizer = Whitespace()

trainer_en = WordPieceTrainer(
          vocab_size=30000, special_tokens=["[UNK]", "[PAD]"])
tokenizer_en.train(files=["opus.en-ru-train.en"], trainer=trainer_en )

tokenizer_ru = Tokenizer(WordPiece(), )
tokenizer_ru.normalizer = normalizers.Sequence([Lowercase()])
tokenizer_ru.pre_tokenizer = Whitespace()

trainer_ru = WordPieceTrainer(
          vocab_size=30000, special_tokens=["[UNK]", "[PAD]", "[START]", "[END]", ])
tokenizer_ru.train(files=["opus.en-ru-train.ru"], trainer=trainer_ru )

In [None]:
tokenizer_en.decoder = decoders.WordPiece()
tokenizer_ru.decoder = decoders.WordPiece()

In [None]:
tokenizer_en.save('tokenizer_en')
tokenizer_ru.save('tokenizer_ru')

In [None]:
tokenizer_en = Tokenizer.from_file("tokenizer_en")
tokenizer_ru = Tokenizer.from_file("tokenizer_ru")

In [None]:
def encode(text, tokenizer, target=False):
    if target:
        return [tokenizer.token_to_id('[START]')] + tokenizer.encode(text).ids + \
                [tokenizer.token_to_id('[END]')]
    else:
        return tokenizer.encode(text).ids


In [None]:
X_en = [encode(t, tokenizer_en) for t in en_sents]
X_ru = [encode(t, tokenizer_ru, True) for t in ru_sents]

In [None]:
max_len_en = np.max([len(x) for x in X_en])
max_len_ru = np.max([len(x) for x in X_ru])

In [None]:
max_len_en, max_len_ru

(17863, 19389)

In [None]:
# обратите внимание, что в seq2seq длины могут быть разными
max_len_en = int(np.percentile([len(x) for x in X_en], 95))
max_len_ru = int(np.percentile([len(x) for x in X_ru], 95))

In [None]:
PAD_IDX = tokenizer_ru.token_to_id('[PAD]')
PAD_IDX

1

In [None]:
X_en = keras.preprocessing.sequence.pad_sequences(
              X_en, maxlen=max_len_en, padding='post', value=PAD_IDX)

X_ru_out = keras.preprocessing.sequence.pad_sequences(
              [x[1:] for x in X_ru], maxlen=max_len_ru-1, padding='post',
              value=PAD_IDX)

X_ru_dec = keras.preprocessing.sequence.pad_sequences(
              [x[:-1] for x in X_ru], maxlen=max_len_ru-1,
              padding='post', value=PAD_IDX)

In [None]:
X_en.shape, X_ru_out.shape

((1000000, 45), (1000000, 47))

In [None]:
(X_en_train, X_en_valid,
X_ru_dec_train, X_ru_dec_valid,
X_ru_out_train, X_ru_out_valid) = train_test_split(X_en,
                                                  X_ru_dec,
                                                  X_ru_out,
                                                  test_size=0.05)

##transformer code

In [None]:
def scaled_dot_product_attention(query, key, value, mask):
    # Считаем скалярное произведение между запросом (query) и ключом (key), транспонируя ключ
    matmul_qk = keras.ops.matmul(query, keras.ops.transpose(key, axes=[0, 1, 3, 2]))

    # Получаем глубину (размерность) ключа и преобразуем ее во float
    depth = keras.ops.cast(key.shape[-1], torch.float32)

    # Делим результат скалярного произведения на квадратный корень из глубины
    # Это делается для уменьшения влияния больших значений и стабилизации градиентов во время обучения
    logits = matmul_qk / keras.ops.sqrt(depth)

    # Если есть маска, применяем ее к логитам, чтобы обнулить нежелательные значения
    if mask is not None:
        logits += (mask * -1e9)

    # Применяем функцию softmax для получения весов внимания
    attention_weights = keras.ops.softmax(logits, axis=-1)

    # Умножаем веса внимания на значения (value) для получения итогового результата
    output = keras.ops.matmul(attention_weights, value)

    return output

In [None]:
class MultiHeadAttention(keras.layers.Layer):

    def __init__(self, d_model, num_heads, name="multi_head_attention"):
        super(MultiHeadAttention, self).__init__(name=name)
        self.num_heads = num_heads  # количество голов для внимания
        self.d_model = d_model  # размерность вектора модели

        # Убеждаемся, что размерность модели делится нацело на количество голов
        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads  # размерность каждой головы

        # Создаем полносвязные слои для запроса, ключа и значения
        self.query_dense = keras.layers.Dense(units=d_model)
        self.key_dense = keras.layers.Dense(units=d_model)
        self.value_dense = keras.layers.Dense(units=d_model)

        # Создаем последний полносвязный слой
        self.dense = keras.layers.Dense(units=d_model)

    def split_heads(self, inputs, batch_size):
        # Разделяем входные данные на головы
        inputs = keras.ops.reshape(
            inputs, newshape=(batch_size, -1, self.num_heads, self.depth))
        return keras.ops.transpose(inputs, axes=[0, 2, 1, 3])

    def call(self, inputs):
        query, key, value, mask = inputs['query'], inputs['key'], inputs[
            'value'], inputs.get('mask', None)
        batch_size = query.shape[0]

        # Пропускаем запрос, ключ и значение через соответствующие полносвязные слои
        query = self.query_dense(query)
        key = self.key_dense(key)
        value = self.value_dense(value)

        # Разделяем запрос, ключ и значение на головы
        # то есть просто разрезаем вектора на num_heads частей
        # и сравниваем все части между собой
        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        # Выполняем механизм внимания с масштабированным скалярным произведением
        scaled_attention = scaled_dot_product_attention(query, key, value, mask)

        scaled_attention = keras.ops.transpose(scaled_attention, axes=[0, 2, 1, 3])

        # Объединяем головы вместе (склеиваем векторы в один)
        concat_attention = keras.ops.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model))

        # Пропускаем объединенное внимание через дополнительный полносвязный слой
        # Он просто добавляет сложности нашей модели
        outputs = self.dense(concat_attention)

        return outputs


In [None]:
def create_padding_mask(x):
    mask = keras.ops.cast(keras.ops.equal(x, PAD_IDX), torch.float32)
    # (batch_size, 1, 1, sequence length)
    return mask[:, None, None, :]

In [None]:
def create_look_ahead_mask(x):
    # эта функция немножко сложная, но суть у нее достаточно простая
    # нужно создать треугольную маску, с помощью которой мы закроем
    # для каждого токена все последующие токены
    seq_len = x.shape[1]
    ones_mask = keras.ops.ones((1, seq_len, seq_len), dtype="int32")
    row_index = keras.ops.cumsum(ones_mask, axis=-2)
    col_index = keras.ops.cumsum(ones_mask, axis=-1)
    look_ahead_mask = ~ keras.ops.greater_equal(row_index, col_index)
    padding_mask = create_padding_mask(x)
    return keras.ops.maximum(look_ahead_mask, padding_mask)

In [None]:
class PositionalEncoding(keras.layers.Layer):

    def __init__(self, position, d_model):
        super(PositionalEncoding, self).__init__()
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, position, i, d_model):
        angles = 1 / keras.ops.power(10000, (2 * (i // 2)) / d_model)
        return keras.ops.multiply(position, angles)

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            position=torch.arange(0, position, dtype=torch.float32)[:, None],
            i=torch.arange(0, d_model, dtype=torch.float32)[None, :],
            d_model=d_model)
        sines = keras.ops.sin(angle_rads[:, 0::2])
        cosines = keras.ops.cos(angle_rads[:, 1::2])

        pos_encoding = keras.ops.concatenate([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[None, ...]
        return keras.ops.cast(pos_encoding, torch.float32)

    def call(self, inputs):
        return inputs + self.pos_encoding[:, :inputs.shape[1], :]

In [None]:
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
    inputs = keras.Input(shape=(None, d_model), name="inputs")
    padding_mask = keras.Input(shape=(1, 1, None), name="padding_mask")

    #call_mha
    attention = MultiHeadAttention(
      d_model, num_heads, name="attention")({
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': padding_mask
      })
    attention = keras.layers.Dropout(rate=dropout)(attention)
    attention = keras.layers.LayerNormalization(
      epsilon=1e-6)(inputs + attention)

    outputs = keras.layers.Dense(units=units, activation='relu')(attention)
    outputs = keras.layers.Dense(units=d_model)(outputs)
    outputs = keras.layers.Dropout(rate=dropout)(outputs)
    outputs = keras.layers.LayerNormalization(
      epsilon=1e-6)(attention + outputs)

    return keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)

In [None]:
def encoder(vocab_size,
            num_layers,
            units,
            d_model,
            num_heads,
            dropout,
            max_len,
            name="encoder"):
    inputs = keras.Input(shape=(max_len,), name="inputs")
    padding_mask = keras.Input(shape=(1, 1, None), name="padding_mask")

    embeddings = keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= keras.ops.sqrt(keras.ops.cast(d_model, torch.float32))
    embeddings = PositionalEncoding(max_len, d_model)(embeddings)

    #inputs (они тут называются outputs но это просто такой нейминг,
    # этот параметр передается в encoder_layer первым и encoder_layer будет считать его inputs)
    # outputs он тут называется для удобства, так как он будет перезаписываться
    # чтобы на вход следующему блоку подавать уже не эмбединги,
    # а то что получится как результат предыдущего блока
    outputs = keras.layers.Dropout(rate=dropout)(embeddings)

    for i in range(num_layers):
        outputs = encoder_layer(
            units=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout,
            name="encoder_layer_{}".format(i),
        )([outputs, padding_mask])

    return keras.Model(inputs=[inputs, padding_mask], outputs=outputs, name=name)

In [None]:
def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
    inputs = keras.Input(shape=(None, d_model), name="inputs")
    enc_outputs = keras.Input(shape=(None, d_model), name="encoder_outputs")
    look_ahead_mask = keras.Input(
      shape=(1, None, None), name="look_ahead_mask")
    padding_mask = keras.Input(shape=(1, 1, None), name='padding_mask')

    attention1 = MultiHeadAttention(
      d_model, num_heads, name="attention_1")(inputs={
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': look_ahead_mask
      })
    attention1 = keras.layers.LayerNormalization(
      epsilon=1e-6)(attention1 + inputs)

    attention2 = MultiHeadAttention(
      d_model, num_heads, name="attention_2")(inputs={
          'query': attention1,
          'key': enc_outputs,
          'value': enc_outputs,
          'mask': padding_mask
      })
    attention2 = keras.layers.Dropout(rate=dropout)(attention2)
    attention2 = keras.layers.LayerNormalization(
      epsilon=1e-6)(attention2 + attention1)

    outputs = keras.layers.Dense(units=units, activation='relu')(attention2)
    outputs = keras.layers.Dense(units=d_model)(outputs)
    outputs = keras.layers.Dropout(rate=dropout)(outputs)
    outputs = keras.layers.LayerNormalization(
      epsilon=1e-6)(outputs + attention2)

    return keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)

In [None]:
def decoder(vocab_size,
            num_layers,
            units,
            d_model,
            num_heads,
            dropout,
            max_len,
            name='decoder'):
    inputs = keras.Input(shape=(max_len,), name='inputs')
    enc_outputs = keras.Input(shape=(None, d_model), name='encoder_outputs')
    look_ahead_mask = keras.Input(
      shape=(1, None, None), name='look_ahead_mask')
    padding_mask = keras.Input(shape=(1, 1, None), name='padding_mask')

    embeddings = keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= keras.ops.sqrt(keras.ops.cast(d_model, torch.float32))
    embeddings = PositionalEncoding(max_len, d_model)(embeddings)

    outputs = keras.layers.Dropout(rate=dropout)(embeddings)

    for i in range(num_layers):
        outputs = decoder_layer(
            units=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout,
            name='decoder_layer_{}'.format(i),
        )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])

    return keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)

In [None]:
def transformer(vocab_size,
                num_layers,
                units,
                d_model,
                num_heads,
                dropout,
                max_len,
                name="transformer"):
    inputs = keras.Input(shape=(max_len[0],), name="inputs")
    dec_inputs = keras.Input(shape=(max_len[1]-1,), name="dec_inputs")

    enc_padding_mask = keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None),
      name='enc_padding_mask')(inputs)

    look_ahead_mask = keras.layers.Lambda(
      create_look_ahead_mask,
      output_shape=(1, None, None),
      name='look_ahead_mask')(dec_inputs)

    dec_padding_mask = keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None),
      name='dec_padding_mask')(inputs)

    enc_outputs = encoder(
                          vocab_size=vocab_size[0],
                          num_layers=num_layers,
                          units=units,
                          d_model=d_model,
                          num_heads=num_heads,
                          dropout=dropout,
                          max_len=max_len[0],
                        )(inputs=[inputs, enc_padding_mask])

    dec_outputs = decoder(
                          vocab_size=vocab_size[1],
                          num_layers=num_layers,
                          units=units,
                          d_model=d_model,
                          num_heads=num_heads,
                          dropout=dropout,
                          max_len=max_len[1]-1,
                        )(inputs=[dec_inputs, enc_outputs,
                                  look_ahead_mask, dec_padding_mask])

    outputs = keras.layers.Dense(units=vocab_size[1], name="outputs")(dec_outputs)

    return keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)

In [None]:
L  = keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none',)

def loss_function(y_true, y_pred):
    loss = L(y_true, y_pred)

    mask = keras.ops.cast(keras.ops.not_equal(y_true, PAD_IDX), torch.float32)
    loss = keras.ops.multiply(loss, mask)

    return keras.ops.mean(loss)

##training

In [None]:
keras.backend.clear_session()

# small model
NUM_LAYERS = 2
D_MODEL = 256
NUM_HEADS = 8

UNITS = 512
DROPOUT = 0.1


# average model
# NUM_LAYERS = 6
# D_MODEL = 512
# NUM_HEADS = 8
# UNITS = 2048
# DROPOUT = 0.1


model = transformer(
    vocab_size=(tokenizer_en.get_vocab_size(),tokenizer_ru.get_vocab_size()),
    num_layers=NUM_LAYERS,
    units=UNITS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT,
    max_len=[max_len_en, max_len_ru])


optimizer = keras.optimizers.Adam(
    0.001, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

def accuracy(y_true, y_pred):
    return keras.metrics.sparse_categorical_accuracy(y_true, y_pred)


model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])
checkpoint = keras.callbacks.ModelCheckpoint('model_ruen.weights.h5',
                                            monitor='val_loss',
                                            verbose=1,
                                            save_weights_only=True,
                                            save_best_only=True,
                                            mode='min',
                                            save_freq='epoch')

In [None]:
import gc
torch.cuda.empty_cache()
gc.collect()

0

In [None]:
# model.load_weights('model_ruen.weights.h5')

In [None]:
model.fit((X_en_train, X_ru_dec_train), X_ru_out_train,
             validation_data=((X_en_valid, X_ru_dec_valid), X_ru_out_valid),
             batch_size=200,
             epochs=1,
             callbacks=[checkpoint]
             )

[1m4750/4750[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 344ms/step - accuracy: 0.0904 - loss: 1.6003
Epoch 1: val_loss improved from inf to 0.99517, saving model to model_ruen.weights.h5
[1m4750/4750[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1682s[0m 354ms/step - accuracy: 0.0904 - loss: 1.6003 - val_accuracy: 0.1474 - val_loss: 0.9952


<keras.src.callbacks.history.History at 0x7fe53fe3f2e0>

## translation

In [None]:
@torch.no_grad()
def translate_optim(texts: list): # я попытался оптимизировать, но так и не понял как генерировать несколько предложений

    input_ids = []

    for text in texts: # закодируем каждый текст
      input_ids.append(
          encode(text, tokenizer_en, target=False)
      )

    input_ids = keras.ops.cast(keras.preprocessing.sequence.pad_sequences(
                                      [input_ids], maxlen=max_len_en, padding='post',
                                       # важно не забыть паддинг с нужным id
                                       value=PAD_IDX), torch.int32)



    output_ids = [tokenizer_ru.token_to_id('[START]') ]

    preds = model((input_ids, keras.ops.cast(keras.preprocessing.sequence.pad_sequences(
                                      [output_ids], maxlen=max_len_ru-1, padding='post',
                                       # важно не забыть паддинг с нужным id
                                       value=PAD_IDX), torch.int32))).cpu().numpy()


    translated = []

    while True:
        if len(output_ids) >= max_len_ru:
          break
        # можно занизить скор тэга UNK чтобы он никогда не генерировался
        preds[:, :, tokenizer_ru.token_to_id('[UNK]')] = -100

        output_ids.append(preds.argmax(2)[0][-1])
        preds = model((input_ids, keras.ops.cast(keras.preprocessing.sequence.pad_sequences(
                                      [output_ids], maxlen=max_len_ru-1, padding='post',
                                       # важно не забыть паддинг с нужным id
                                       value=PAD_IDX), torch.int32))).cpu().numpy()

    translated.append(tokenizer_ru.decode(output_ids[1:],))

    return translated


In [None]:
@torch.no_grad()
def translate(text):
    input_ids = encode(text, tokenizer_en, target=False)
    input_ids = keras.ops.cast(keras.preprocessing.sequence.pad_sequences(
                                      [input_ids], maxlen=max_len_en, padding='post',
                                       # важно не забыть паддинг с нужным id
                                       value=PAD_IDX), torch.int32)


    output_ids = [tokenizer_ru.token_to_id('[START]') ]

    pred = model((input_ids, keras.ops.cast(keras.preprocessing.sequence.pad_sequences(
                                      [output_ids], maxlen=max_len_ru-1, padding='post',
                                       # важно не забыть паддинг с нужным id
                                       value=PAD_IDX), torch.int32))).cpu().numpy()

    while pred.argmax(2)[0][-1] not in [tokenizer_ru.token_to_id('[END]')]:
        if len(output_ids) >= max_len_ru:
            break
        # можно занизить скор тэга UNK чтобы он никогда не генерировался
        pred[:, :, tokenizer_ru.token_to_id('[UNK]')] = -100

        output_ids.append(pred.argmax(2)[0][-1])
        pred = model((input_ids, keras.ops.cast(keras.preprocessing.sequence.pad_sequences(
                                      [output_ids], maxlen=max_len_ru-1, padding='post',
                                       # важно не забыть паддинг с нужным id
                                       value=PAD_IDX), torch.int32))).cpu().numpy()

    return tokenizer_ru.decode(output_ids[1:], )


In [None]:
translate("Transformer")

'mod'

In [None]:
translate("can you translate this sentence?")

'можете перевести это??'

In [None]:
translate("please translate this sentence into russian")

'пожалуйста перевести эту эту предложение в предложение'