#Lib

In [None]:
# Установим свежую версию TensorFlow для поддержки слоя `tf.keras.layers.MultiHeadAttention`.
!apt install --allow-change-held-packages libcudnn8=8.1.0.77-1+cuda11.2
!pip uninstall -y -q tensorflow keras tensorflow-estimator tensorflow-text
!pip install protobuf~=3.20.3
!pip install -q tensorflow_datasets
!pip install -q -U tensorflow-text tensorflow

#Import

In [None]:
# Фреймворк Tensorflow
import tensorflow_datasets as tfds
import tensorflow as tf
import tensorflow_text as text

# Расщепление выборки
from sklearn.model_selection import train_test_split

# Токенизатор
from tensorflow_text.tools.wordpiece_vocab import bert_vocab_from_dataset as bert_vocab

# Линейная алгебра
import numpy as np

# Вывод графиков
import matplotlib.pyplot as plt

# Регулярные выражения
import re

# Файловая система
import pathlib

# Отключим мешаюшие предупреждения
import warnings
warnings.filterwarnings("ignore")

#Dataset

In [None]:
examples, metadata = tfds.load('web_questions', with_info=True, as_supervised=False)

In [None]:
# Преобразуем датасет в список для удобного разделения
train_examples = list(examples['train'])

# Разделяем данные на обучающую и валидационную выборки (80% обучающая и 20% валидационная)
train_examples, val_examples = train_test_split(train_examples, test_size=0.2, random_state=42)

In [None]:
print('Размер обучающей выборки: ', len(train_examples))
print('Размер валидационной выборки: ', len(val_examples))

In [None]:
# Выводим несколько примеров из обучающей выборки
for example in train_examples[:1]:
    print('Пример:')
    print('Вопрос:', example['question'].numpy().decode('utf-8'))
    print('Ответ:', example['answers'].numpy())

#Tokinizer

In [None]:
VOCAB_SIZE = 8000
# Параметры токенизатора (lower_case - приводим к нижнему регистру)
bert_tokenizer_params=dict(lower_case=True)

# Определяем токены, с которыми работает токенизатор
# [START] - начало строки
# [END]   - конец строки
# [UNK]   - неизвестное слово
# [PAD]   - используется для выравнивания длин всех предложений
reserved_tokens=["[PAD]", "[UNK]", "[START]", "[END]"]

bert_vocab_args = dict(
    # Желаемый размер словаря
    vocab_size = VOCAB_SIZE,
    # Токены включаемые в словарь
    reserved_tokens=reserved_tokens,
    # Аргументы для `text.BertTokenizer`
    bert_tokenizer_params=bert_tokenizer_params,
    # Аргументы для `wordpiece_vocab.wordpiece_tokenizer_learner_lib.learn`
    learn_params={}, # не используем, но определяем чтобы не было ошибок
)

In [None]:
train_questions = [example['question'].numpy().decode('utf-8') for example in train_examples]

In [None]:
bert_vocab_args = dict(
    vocab_size=VOCAB_SIZE,
    reserved_tokens=reserved_tokens,
    bert_tokenizer_params=bert_tokenizer_params,
    learn_params={},
)

In [None]:
# Создаем словарь для вопросов
%%time
question_vocab = bert_vocab.bert_vocab_from_dataset(
    tf.data.Dataset.from_tensor_slices(train_questions).batch(1000).prefetch(2),
    **bert_vocab_args
)

In [None]:
print("Словарь для вопросов:")
print(question_vocab[:10])
print(question_vocab[100:110])
print(question_vocab[1000:1010])
print(question_vocab[-10:])

In [None]:
def write_vocab_file(filepath, vocab):
  with open(filepath, 'w') as f:
    for token in vocab:
      print(token, file=f)

In [None]:
# Сохраняем словарь в файл
write_vocab_file('question_vocab.txt', question_vocab)

In [None]:
# Ответы могут быть представлены списком байтовых строк, извлекаем первый ответ из списка
train_answers = [' '.join([ans.decode('utf-8') for ans in example['answers'].numpy()]) for example in train_examples]

In [None]:
# Создаем словарь для ответов
%%time
answer_vocab = bert_vocab.bert_vocab_from_dataset(
    tf.data.Dataset.from_tensor_slices(train_answers).batch(1000).prefetch(2),
    **bert_vocab_args
)

In [None]:
print("Словарь для ответов:")
print(answer_vocab[:10])
print(answer_vocab[100:110])
print(answer_vocab[1000:1010])
print(answer_vocab[-10:])

In [None]:
write_vocab_file('answer_vocab.txt', answer_vocab)

In [None]:
!ls *.txt

# Download tokinizer

In [None]:
# Загружаем токенизаторы на основе словарей вопросов и ответов
question_tokenizer = text.BertTokenizer('question_vocab.txt', **bert_tokenizer_params)
answer_tokenizer = text.BertTokenizer('answer_vocab.txt', **bert_tokenizer_params)

In [None]:
print('Отберем строки для примера:')
# Проходим по первым трем примерам из списка
for example in train_examples[:3]:  # Отбираем три примера
    answers = [ans.decode('utf-8') for ans in example['answers'].numpy()]  # Декодируем байтовые строки
    print(answers)

In [None]:
print('Отберем строки для примера:')
# Отбираем вопросы и ответы для примера
for example in train_examples[:3]:  # Отбираем первые 3 примера
    question = example['question'].numpy().decode('utf-8')
    answers = [ans.decode('utf-8') for ans in example['answers'].numpy()]

print('Вопрос:', question)
print('Ответы:', answers)

In [None]:
# Токенизируем вопрос
question_tokens = question_tokenizer.tokenize([question])
question_tokens = question_tokens.merge_dims(-2, -1)  # Объединяем оси для создания последовательности токенов

print('Токенизированный вопрос:', question_tokens.to_list())

In [None]:
# Токенизируем ответы
answer_tokens = answer_tokenizer.tokenize([answers[0]])  # Токенизируем первый ответ
answer_tokens = answer_tokens.merge_dims(-2, -1)

print('Токенизированный ответ:', answer_tokens.to_list())

In [None]:
# Обратное преобразование (детокенизация) для проверки
detokenized_question = question_tokenizer.detokenize(question_tokens)
detokenized_answer = answer_tokenizer.detokenize(answer_tokens)

print('Детокенизированный вопрос:', tf.strings.reduce_join(detokenized_question, separator=' ', axis=-1).numpy())
print('Детокенизированный ответ:', tf.strings.reduce_join(detokenized_answer, separator=' ', axis=-1).numpy())

# Len

In [None]:
lengths = []

batch_size = 1024  # Определяем размер батча
num_batches = len(train_examples) // batch_size  # Количество полных батчей

# Цикл по батчам
for i in range(num_batches + 1):  # Обрабатываем все батчи
    batch = train_examples[i * batch_size : (i + 1) * batch_size]  # Формируем батч

    # Извлекаем вопросы и ответы для текущего батча
    questions = [ex['question'].numpy().decode('utf-8') for ex in batch]
    answers = [' '.join([ans.numpy().decode('utf-8') for ans in ex['answers']]) for ex in batch]

    # Токенизируем вопросы
    question_tokens = question_tokenizer.tokenize(questions)
    lengths.append(question_tokens.row_lengths())

    # Токенизируем ответы
    answer_tokens = answer_tokenizer.tokenize(answers)
    lengths.append(answer_tokens.row_lengths())

In [None]:
# Объединяем длины всех токенов
all_lengths = np.concatenate(lengths)

# Строим гистограмму распределения длин
plt.hist(all_lengths, np.linspace(0, 500, 101))
plt.ylim(plt.ylim())

# Максимальная длина токенов
max_length = max(all_lengths)
plt.plot([max_length, max_length], plt.ylim())
plt.title(f'Максимальное количество токенов в примере: {max_length}');

In [None]:
MAX_TOKENS=80

# START END ADD

In [None]:
# Получаем индексы токенов [START] и [END]
START = tf.argmax(tf.constant(reserved_tokens) == "[START]")
END = tf.argmax(tf.constant(reserved_tokens) == "[END]")

def add_start_end(ragged):
    count = ragged.bounding_shape()[0]  # Количество предложений в батче
    starts = tf.fill([count, 1], START)  # Токены [START]
    ends = tf.fill([count, 1], END)      # Токены [END]
    return tf.concat([starts, ragged, ends], axis=1)  # Добавляем токены в начало и конец

In [None]:
for example in train_examples[:3]:  # Пример обработки первых 3 примеров
    # Извлекаем и токенизируем вопросы и ответы
    question = example['question'].numpy().decode('utf-8')
    answers = [ans.decode('utf-8') for ans in example['answers'].numpy()]

    question_tokens = question_tokenizer.tokenize([question])
    answer_tokens = answer_tokenizer.tokenize([answers[0]])

    # Приводим к двумерной форме, если это необходимо
    question_tokens = question_tokens.merge_dims(-2, -1)
    answer_tokens = answer_tokens.merge_dims(-2, -1)

    # Добавляем токены начала и конца
    question_tokens_with_start_end = add_start_end(question_tokens)
    answer_tokens_with_start_end = add_start_end(answer_tokens)

    # Детокенизация
    detokenized_question = question_tokenizer.detokenize(question_tokens_with_start_end)
    detokenized_answer = answer_tokenizer.detokenize(answer_tokens_with_start_end)

    # Объединяем токены в строку для примера
    final_question = tf.strings.reduce_join(detokenized_question, separator=' ', axis=-1)
    final_answer = tf.strings.reduce_join(detokenized_answer, separator=' ', axis=-1)

    print("Пример вопроса с токенами [START] и [END]:")
    print(final_question.numpy())  # Выведет массив строк

    print("Пример ответа с токенами [START] и [END]:")
    print(final_answer.numpy())  # Выведет массив строк

# Cleanup Text

In [None]:
def cleanup_text(reserved_tokens, token_txt):
  # Удаление токенов, кроме "[UNK]".
  # Поиск зарезервированных токенов кроме [UNK]
  bad_tokens = [re.escape(tok) for tok in reserved_tokens if tok != "[UNK]"]
  # "Плохие" токены для регулярки объединяем знаком ИЛИ (|)
  bad_token_re = "|".join(bad_tokens)

  # Ищем в строке регулярку
  bad_cells = tf.strings.regex_full_match(token_txt, bad_token_re)
  # Отсеиваем из исходной строки все найденные включения "плохих" токенов
  result = tf.ragged.boolean_mask(token_txt, ~bad_cells)

  # Сцепление строк.
  result = tf.strings.reduce_join(result, separator=' ', axis=-1)

  return result

In [None]:
# Отберем строки для примера:
for example in train_examples[:3]:  # Берем первые 3 примера
    question = example['question'].numpy().decode('utf-8')
    answers = [ans.decode('utf-8') for ans in example['answers'].numpy()]

    print(question)  # Вывод вопроса
    print(answers)   # Вывод ответов

In [None]:
# Детокенизация вопросов и ответов
detokenized_question = question_tokenizer.detokenize(question_tokens)
detokenized_answer = answer_tokenizer.detokenize(answer_tokens)

In [None]:
detokenized_question

In [None]:
cleanup_text(reserved_tokens, detokenized_question).numpy()

# Custom Tokinizer

In [None]:
class CustomTokenizer(tf.Module):
  def __init__(self, reserved_tokens, vocab_path):
    # Определяем токенизатор
    self.tokenizer = text.BertTokenizer(vocab_path, lower_case=True)
    # Зарезервированные токены
    self._reserved_tokens = reserved_tokens
    # Путь к файлу словаря
    self._vocab_path = tf.saved_model.Asset(vocab_path)
    # Читаем из файла словарь и делим по строкам
    vocab = pathlib.Path(vocab_path).read_text().splitlines()
    self.vocab = tf.Variable(vocab)

    # Для экспорта класса необходимо создать так называемые сигнатуры,
    # чтобы tensorflow понимал с какими данными он работает

    # Сигнатура для tokenize (работает с пакетами строк).
    self.tokenize.get_concrete_function(
        tf.TensorSpec(shape=[None], dtype=tf.string))

    # Сигнатура для `detokenize` и `lookup`
    # Могут работать как с `Tensors`, так и `RaggedTensors`
    # с тензорами формы [batch, tokens]
    self.detokenize.get_concrete_function(
        tf.TensorSpec(shape=[None, None], dtype=tf.int64))
    self.detokenize.get_concrete_function(
          tf.RaggedTensorSpec(shape=[None, None], dtype=tf.int64))

    self.lookup.get_concrete_function(
        tf.TensorSpec(shape=[None, None], dtype=tf.int64))
    self.lookup.get_concrete_function(
          tf.RaggedTensorSpec(shape=[None, None], dtype=tf.int64))

    # Методы `get_*` не имеют аргументов
    self.get_vocab_size.get_concrete_function()
    self.get_vocab_path.get_concrete_function()
    self.get_reserved_tokens.get_concrete_function()

    # После определения сигнатур можно определить и сами методы класса

  @tf.function
  def tokenize(self, strings):
    enc = self.tokenizer.tokenize(strings)
    # Объединяем оси `word` и `word-piece` (как в примере выше)
    enc = enc.merge_dims(-2,-1)
    enc = add_start_end(enc)
    return enc

  @tf.function
  def detokenize(self, tokenized):
    words = self.tokenizer.detokenize(tokenized)
    return cleanup_text(self._reserved_tokens, words) # очищаем перед выводом

  @tf.function
  def lookup(self, token_ids):
    return tf.gather(self.vocab, token_ids) # возвращаем явное соответствие словаря токенам

  @tf.function
  def get_vocab_size(self):
    return tf.shape(self.vocab)[0] # определяем длину словаря по нулевому индексу формы

  @tf.function
  def get_vocab_path(self):
    return self._vocab_path # получение пути к файлу словаря

  @tf.function
  def get_reserved_tokens(self):
    return tf.constant(self._reserved_tokens) # получение списка зарезервированных токенов


In [None]:
# Создание модуля TensorFlow с токенизаторами для вопросов и ответов
tokenizers = tf.Module()
tokenizers.question = CustomTokenizer(reserved_tokens, 'question_vocab.txt')
tokenizers.answer = CustomTokenizer(reserved_tokens, 'answer_vocab.txt')

In [None]:
# Сохранение токенизаторов
model_name = 'webquestions_converter'
tf.saved_model.save(tokenizers, model_name)

In [None]:
# Пример токенизации вопросов
encoded_questions = tokenizers.question.tokenize([example['question'].numpy().decode('utf-8') for example in train_examples[:3]])

In [None]:
# Вывод токенизированных вопросов
for row in encoded_questions.to_list():
    print(row)

In [None]:
# Детокенизация токенов вопросов
round_trip_questions = tokenizers.question.detokenize(encoded_questions)
for line in round_trip_questions.numpy():
    print(line.decode('utf-8'))  # Выводим восстановленные вопросы

In [None]:
# Поиск токенов для вопросов (возвращает текстовое представление токенов)
tokens_questions = tokenizers.question.lookup(encoded_questions)
tokens_questions

# Pre Dataset

In [None]:
def prepare_batch(questions, answers):
    # Токенизируем вопросы
    questions = tokenizers.question.tokenize(questions)   # Токенизация вопросов
    questions = questions[:, :MAX_TOKENS]                 # Обрезаем по длине MAX_TOKENS
    questions = questions.to_tensor()                     # Преобразуем в тензор

    # Токенизируем ответы
    answers = tokenizers.answer.tokenize(answers)
    answers = answers[:, :(MAX_TOKENS+1)]                 # Обрезаем до MAX_TOKENS+1 для учета [END]

    # Разделение на вход декодера и метки
    answer_inputs = answers[:, :-1].to_tensor()           # Убираем [END] для входа декодера
    answer_labels = answers[:, 1:].to_tensor()            # Убираем [START] для меток

    return (questions, answer_inputs), answer_labels


In [None]:
# Размер буфера в памяти при подготовке датасета
BUFFER_SIZE = 20000

# Размер пакета
BATCH_SIZE = 64

In [None]:
def make_batches(ds):
    return (
        ds
        .shuffle(BUFFER_SIZE)                     # Перемешиваем данные
        .batch(BATCH_SIZE)                        # Делим датасет на пакеты
        .map(prepare_batch, num_parallel_calls=tf.data.AUTOTUNE)  # Применяем функцию prepare_batch
        .prefetch(buffer_size=tf.data.AUTOTUNE))  # Загружаем данные заранее

In [None]:
# Преобразуем train_examples и val_examples в два списка: вопросов и ответов
train_questions = [ex['question'].numpy().decode('utf-8') for ex in train_examples]
train_answers = [ex['answers'][0].numpy().decode('utf-8') for ex in train_examples]

val_questions = [ex['question'].numpy().decode('utf-8') for ex in val_examples]
val_answers = [ex['answers'][0].numpy().decode('utf-8') for ex in val_examples]

# Создаем датасеты для вопросов и ответов
train_dataset = tf.data.Dataset.from_tensor_slices((train_questions, train_answers))
val_dataset = tf.data.Dataset.from_tensor_slices((val_questions, val_answers))

# Корректная функция для пакетов с разбиением на вопросы и ответы
def make_batches(ds):
    return (
        ds
        .shuffle(BUFFER_SIZE)                     # Перемешиваем данные
        .batch(BATCH_SIZE)                        # Делим датасет на пакеты
        .map(lambda q, a: prepare_batch(q, a), num_parallel_calls=tf.data.AUTOTUNE)  # Применяем функцию prepare_batch
        .prefetch(buffer_size=tf.data.AUTOTUNE))  # Загружаем данные заранее

# Создание пакетов данных
train_batches = make_batches(train_dataset)
val_batches = make_batches(val_dataset)

# Извлечение одного пакета данных для проверки
for (questions, answers), answer_labels in train_batches.take(1):
    break

# Вывод размеров тензоров
print(questions.shape)     # Размер вопросов
print(answers.shape)       # Размер ответов
print(answer_labels.shape) # Размер меток для ответов


In [None]:
# length - порядковый номер слова в фразе
# depth - размер пространства эмбеддинга
def positional_encoding(length, depth):
  depth = depth/2

  positions = np.arange(length)[:, np.newaxis]     # форма (seq, 1)
  depths = np.arange(depth)[np.newaxis, :]/depth   # форма (1, depth)

  angle_rates = 1 / (10000**depths)         # форма (1, depth)
  angle_rads = positions * angle_rates      # форма (pos, depth)

  pos_encoding = np.concatenate(
      [np.sin(angle_rads), np.cos(angle_rads)],
      axis=-1)

  return tf.cast(pos_encoding, dtype=tf.float32)  # указываем тип возвращаемых данных

In [None]:
# Вычислим позиционное кодирование для вопросов и ответов
pos_encoding_questions = positional_encoding(length=2048, depth=512)
pos_encoding_answers = positional_encoding(length=2048, depth=512)

# Проверим форму
print("Позиционное кодирование для вопросов:", pos_encoding_questions.shape)
print("Позиционное кодирование для ответов:", pos_encoding_answers.shape)

# Positional Embedding

In [None]:
class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.d_model = d_model
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)  # Эмбеддинги токенов
        self.pos_encoding = positional_encoding(length=MAX_TOKENS, depth=d_model)         # Позиционное кодирование

    def compute_mask(self, *args, **kwargs):
        return self.embedding.compute_mask(*args, **kwargs)

    def call(self, x):
        length = tf.shape(x)[1]
        x = self.embedding(x)

        # Масштабирование эмбеддингов
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        # Добавление позиционного кодирования
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        return x

In [None]:
# Создание слоя позиционного эмбеддинга для вопросов и ответов
embed_question = PositionalEmbedding(vocab_size=tokenizers.question.get_vocab_size().numpy(), d_model=512)
embed_answer = PositionalEmbedding(vocab_size=tokenizers.answer.get_vocab_size().numpy(), d_model=512)

# Применение слоев эмбеддинга к токенизированным вопросам и ответам
question_emb = embed_question(questions)  # Эмбеддинги для вопросов
answer_emb = embed_answer(answers)        # Эмбеддинги для ответов

In [None]:
question_emb._keras_mask

# Base Attention

In [None]:
class BaseAttention(tf.keras.layers.Layer):
  def __init__(self, **kwargs):
    super().__init__()
    self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
    self.layernorm = tf.keras.layers.LayerNormalization()
    self.add = tf.keras.layers.Add()

# CrossAttention

In [None]:
class CrossAttention(BaseAttention):
  def call(self, x, context):
    # Пропускаем сигнал через многоголовое внимание
    attn_output, attn_scores = self.mha(
        query=x,                        # запрос
        key=context,                    # ключ
        value=context,                  # значение
        return_attention_scores=True)   # возвращаем оценки внимания

    # Запоминаем оценки на будущее
    self.last_attn_scores = attn_scores

    # Добавляем остаточную связь и нормализацию
    x = self.add([x, attn_output])
    x = self.layernorm(x)

    return x

In [None]:
# При инициализации через базовый класс передаем в слой tf.keras.layers.MultiHeadAttention
# параметры num_heads - число голов, key_dim - размерность ключа
sample_ca = CrossAttention(num_heads=2, key_dim=512)

print(answer_emb.shape)
print(question_emb.shape)
print(sample_ca(answer_emb, question_emb).shape)

# GlobalSelfAttention

In [None]:
class GlobalSelfAttention(BaseAttention):
  def call(self, x):
    # Пропускаем сигнал через многоголовое внимание
    attn_output = self.mha(
        query=x,  # запрос
        value=x,  # ключ
        key=x)    # значение

    # Добавляем остаточную связь и нормализацию
    x = self.add([x, attn_output])
    x = self.layernorm(x)
    return x

In [None]:
sample_gsa = GlobalSelfAttention(num_heads=2, key_dim=512)

print(answer_emb.shape)
print(sample_gsa(answer_emb).shape)

#Casual Attention

In [None]:
class CausalSelfAttention(BaseAttention):
  def call(self, x):
    attn_output = self.mha(
        query=x,
        value=x,
        key=x,
        use_causal_mask = True)  # отличается от GlobalSelfAttention одним аргументом
    x = self.add([x, attn_output])
    x = self.layernorm(x)
    return x

In [None]:
sample_csa = CausalSelfAttention(num_heads=2, key_dim=512)

print(question_emb.shape)
print(sample_csa(question_emb).shape)

In [None]:
out1 = sample_csa(embed_answer(answers[:, :3]))
out2 = sample_csa(embed_answer(answers))[:, :3]

tf.reduce_max(abs(out1 - out2)).numpy()

# Feed Forward

In [None]:
class FeedForward(tf.keras.layers.Layer):
  def __init__(self, d_model, dff, dropout_rate=0.1):
    super().__init__()
    self.seq = tf.keras.Sequential([
      tf.keras.layers.Dense(dff, activation='relu'),
      tf.keras.layers.Dense(d_model),
      tf.keras.layers.Dropout(dropout_rate)
    ])
    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    x = self.add([x, self.seq(x)])
    x = self.layer_norm(x)
    return x

In [None]:
sample_ffn = FeedForward(512, 2048)

print(answer_emb.shape)
print(sample_ffn(answer_emb).shape)

# Encoder Layer

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
  def __init__(self,*, d_model, num_heads, dff, dropout_rate=0.1):
    super().__init__()

    self.self_attention = GlobalSelfAttention(
        num_heads=num_heads,  # число голов
        key_dim=d_model,      # размерность ключа
        dropout=dropout_rate) # уровень регуляризации

    self.ffn = FeedForward(d_model, dff) # число нейронов во втором и первом Dense слое, соответственно

  def call(self, x):
    x = self.self_attention(x)
    x = self.ffn(x)
    return x

In [None]:
sample_encoder_layer = EncoderLayer(d_model=512, num_heads=8, dff=2048)

print(question_emb.shape)
print(sample_encoder_layer(question_emb).shape)

In [None]:
class Encoder(tf.keras.layers.Layer):
  def __init__(self, *, num_layers, d_model, num_heads,
               dff, vocab_size, dropout_rate=0.1):
    super().__init__()

    # Инициируем переменные внутри класса
    self.d_model = d_model
    self.num_layers = num_layers

    # Создаем объект класса позиционного кодирования
    self.pos_embedding = PositionalEmbedding(
        vocab_size=vocab_size, d_model=d_model)

    # Создаем объект класса для слоя кодировщика
    self.enc_layers = [
        EncoderLayer(d_model=d_model,
                     num_heads=num_heads,
                     dff=dff,
                     dropout_rate=dropout_rate)
        for _ in range(num_layers)]

    # Создаем объект класса для слоя регуляризации
    self.dropout = tf.keras.layers.Dropout(dropout_rate)

  def call(self, x):
    # Форма x токена: (batch, seq_len)
    # Прогоняем последовательность токенов через слой позиционного кодирования
    x = self.pos_embedding(x)  # форма на выходе (batch_size, seq_len, d_model)

    # Прогоняем последовательность токенов через слой регуляризации
    x = self.dropout(x)

    # Прогоняем последовательность токенов через num_layers слоев кодировщика
    for i in range(self.num_layers):
      x = self.enc_layers[i](x)

    return x  # форма на выходе (batch_size, seq_len, d_model)

# Decoder Layer

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
  def __init__(self,
               *,
               d_model,
               num_heads,
               dff,
               dropout_rate=0.1):
    super(DecoderLayer, self).__init__()

    # Слой внимания с причинно-следственной связью
    self.causal_self_attention = CausalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    # Слой с кросс-вниманием
    self.cross_attention = CrossAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    # Слой прямого распространения
    self.ffn = FeedForward(d_model, dff)

  def call(self, x, context):
    # Пропускаем последовательность токенов через:
    # Каузальный слой внимания
    x = self.causal_self_attention(x=x)
    # Слой кросс-внимания и контекстным вектором из кодировщика
    x = self.cross_attention(x=x, context=context)

    # Запомним оценки внимания на будущее
    self.last_attn_scores = self.cross_attention.last_attn_scores
    # Через слой прямого распространения
    x = self.ffn(x)  # Форма `(batch_size, seq_len, d_model)`.
    return x

# Decoder

In [None]:
class Decoder(tf.keras.layers.Layer):
  def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size,
               dropout_rate=0.1):
    super(Decoder, self).__init__()

    # Инициируем переменные внутри класса
    self.d_model = d_model
    self.num_layers = num_layers

    # Создаем объект класса позиционного кодирования
    self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                             d_model=d_model)
    # Создаем объект класса для слоя регуляризации
    self.dropout = tf.keras.layers.Dropout(dropout_rate)

    # Создаем сразу стек слоев декодировщиков с помощью генератора списков по числу слоев
    self.dec_layers = [
        DecoderLayer(d_model=d_model, num_heads=num_heads,
                     dff=dff, dropout_rate=dropout_rate)
        for _ in range(num_layers)]

    # Сбрасываем оценки внимания
    self.last_attn_scores = None

  def call(self, x, context):
    # Подаем на вход последовательность токенов x формой (batch, target_seq_len)

    # Пропускаем через слой позиционного кодирования (и конечно же эмбеддинг)
    x = self.pos_embedding(x)  # форма на выходе (batch_size, target_seq_len, d_model)

    # Регуляризация
    x = self.dropout(x)

    # Прогоняем через num_layers слоев декодировщиков
    for i in range(self.num_layers):
      x  = self.dec_layers[i](x, context)

    # Сохраняем оценки внимания из последнего слоя
    self.last_attn_scores = self.dec_layers[-1].last_attn_scores

    # Форма x на выходе (batch_size, target_seq_len, d_model)
    return x

# Transformer

In [None]:
class Transformer(tf.keras.Model):
  def __init__(self, *, num_layers, d_model, num_heads, dff,
               input_vocab_size, target_vocab_size, dropout_rate=0.1):
    super().__init__()
    # Кодировщик
    self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                           num_heads=num_heads, dff=dff,
                           vocab_size=input_vocab_size,
                           dropout_rate=dropout_rate)
    # Декодировщик
    self.decoder = Decoder(num_layers=num_layers, d_model=d_model,
                           num_heads=num_heads, dff=dff,
                           vocab_size=target_vocab_size,
                           dropout_rate=dropout_rate)
    # Конечный слой
    self.final_layer = tf.keras.layers.Dense(target_vocab_size)

  def call(self, inputs):
    # Чтобы использовать метод `.fit` для обучения модели, необходимо передать
    # все входные данные в первом аргументе
    context, x  = inputs

    # Передаем контекст в кодировщик
    context = self.encoder(context)  # форма выходных данных (batch_size, context_len, d_model)

    # Передаем контекст и целевой вектор в декодировщик
    x = self.decoder(x, context)  # форма выходных данных (batch_size, target_len, d_model)

    # Прогоняем выходные данные через финальный слой
    logits = self.final_layer(x)  # форма выходных данных (batch_size, target_len, target_vocab_size)

    try:
      # После прохождения данных через все слои необходимо удалить
      # маску, чтобы она не масштабировала, потери и метрики
      # Обработчик ошибок позволяет избежать исключений при повторной попытке удаления
      del logits._keras_mask
    except AttributeError: # отлавливаем ошибку отсутствия аттрибута
      pass

    # Возвращаем наши логиты
    return logits

In [None]:
num_layers = 4
d_model = 128
dff = 512
num_heads = 8
dropout_rate = 0.1

EPOCHS = 10

In [None]:
# Инициализация модели трансформера для WebQuestions
transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=tokenizers.question.get_vocab_size().numpy(),
    target_vocab_size=tokenizers.answer.get_vocab_size().numpy(),
    dropout_rate=dropout_rate)

In [None]:
# Входные данные: вопросы (input) и ответы (target)
questions = tf.random.uniform((64, 29), maxval=tokenizers.question.get_vocab_size().numpy(), dtype=tf.int32)  # Пример вопросов
answers = tf.random.uniform((64, 29), maxval=tokenizers.answer.get_vocab_size().numpy(), dtype=tf.int32)  # Пример ответов

# Прогон через модель
output = transformer((questions, answers))

# Вывод формы входных данных и выходных данных модели
print(questions.shape)  # Ожидаемая форма: (batch_size, seq_len)
print(answers.shape)    # Ожидаемая форма: (batch_size, seq_len)
print(output.shape)     # Ожидаемая форма: (batch_size, seq_len, target_vocab_size)

# Вывод оценок внимания из последнего слоя декодера
attn_scores = transformer.decoder.dec_layers[-1].last_attn_scores
print(attn_scores.shape)  # Ожидаемая форма: (batch_size, num_heads, target_seq_len, input_seq_len)

# Optimizer

In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  def __init__(self, d_model, warmup_steps=4000):
    super().__init__()

    self.d_model = d_model
    self.d_model = tf.cast(self.d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    step = tf.cast(step, dtype=tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

In [None]:
learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)

In [None]:
# Количество батчей для обучения
num_batches = 0
for (batch, (_,_)) in enumerate(train_batches):
  num_batches = batch
print(num_batches)

In [None]:
plt.plot(learning_rate(tf.range(num_batches*EPOCHS, dtype=tf.float32)))
plt.ylabel('Скорость обучения')
plt.xlabel('Шаг обучения')

In [None]:
plt.plot(learning_rate(tf.range(num_batches*EPOCHS*50, dtype=tf.float32)))
plt.ylabel('Скорость обучения')
plt.xlabel('Шаг обучения')

In [None]:
# Функция потерь с учетом маски
def masked_loss(label, pred):
  # Задаем маску, где метки не равны 0
  mask = label != 0
  # Определяем функцию потерь
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
  loss = loss_object(label, pred)

  # Важно чтобы mask и loss имели одинаковый тип данных
  mask = tf.cast(mask, dtype=loss.dtype)
  # Наложение маски на loss
  loss *= mask

  # Масштабирование потерь на маску
  loss = tf.reduce_sum(loss)/tf.reduce_sum(mask)
  return loss

# Функция точности с учетом маски
def masked_accuracy(label, pred):
  pred = tf.argmax(pred, axis=2)
  label = tf.cast(label, pred.dtype)
  # Оценка совпадения метки и предсказания
  match = label == pred
  # Задаем маску, где метки не равны 0
  mask = label != 0

  # Логическое И
  match = match & mask

  # Преобразуем к одному типу и масштабирование совпадений на маску
  match = tf.cast(match, dtype=tf.float32)
  mask = tf.cast(mask, dtype=tf.float32)
  return tf.reduce_sum(match)/tf.reduce_sum(mask)

In [None]:
# Компиляция модели с использованием настроек потерь, оптимизатора и метрики
transformer.compile(
    loss=masked_loss,
    optimizer=optimizer,
    metrics=[masked_accuracy])


In [None]:
# Обучение модели на тренировочных данных с валидацией на валидационных данных
transformer.fit(train_batches,
                epochs=15,
                validation_data=val_batches)

# Inference

In [None]:
class QABotTranslator(tf.Module):
    def __init__(self, tokenizers, transformer):
        self.tokenizers = tokenizers  # Токенизаторы для вопросов и ответов
        self.transformer = transformer  # Модель трансформера

    def __call__(self, question, max_length=MAX_TOKENS):
        # Проверяем, что вопрос является тензором
        assert isinstance(question, tf.Tensor)
        if len(question.shape) == 0:
            question = question[tf.newaxis]

        # Токенизация вопроса
        question = self.tokenizers.question.tokenize(question).to_tensor()
        encoder_input = question

        # Инициализация с токена [START] для генерации ответа
        start_end = self.tokenizers.answer.tokenize([''])[0]
        start = start_end[0][tf.newaxis]
        end = start_end[1][tf.newaxis]

        # Используем tf.TensorArray для динамического цикла генерации ответа
        output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
        output_array = output_array.write(0, start)

        for i in tf.range(max_length):
            # На каждом шаге используем ранее сгенерированные токены для генерации следующего токена
            output = tf.transpose(output_array.stack())
            predictions = self.transformer([encoder_input, output], training=False)

            # Получаем последний предсказанный токен
            predictions = predictions[:, -1:, :]
            predicted_id = tf.argmax(predictions, axis=-1)

            # Добавляем предсказанный токен к выходной последовательности
            output_array = output_array.write(i + 1, predicted_id[0])

            # Прерываем цикл, если сгенерирован токен [END]
            if predicted_id == end:
                break

        # Преобразование сгенерированных токенов в текст
        output = tf.transpose(output_array.stack())
        text = self.tokenizers.answer.detokenize(output)[0]

        # Преобразуем токены в человекочитаемый формат
        tokens = self.tokenizers.answer.lookup(output)[0]

        # Вычисляем внимание на последнем шаге
        self.transformer([encoder_input, output[:, :-1]], training=False)
        attention_weights = self.transformer.decoder.last_attn_scores

        return text, tokens, attention_weights

In [None]:
# Пример использования чат-бота для вопрос-ответ с WebQuestions
qa_translator = QABotTranslator(tokenizers=tokenizers, transformer=transformer)

In [None]:
def get_answer(question):
    """
    Функция принимает вопрос в виде строки и возвращает ответ, сгенерированный моделью.
    """
    # Преобразуем вопрос в тензор
    question_tensor = tf.constant(question)

    # Получаем предсказанный ответ
    translated_text, translated_tokens, attention_weights = qa_translator(question_tensor)

    # Возвращаем ответ
    return translated_text.numpy().decode('utf-8')

# Пример вызова функции
question = "Who is the president of the USA?"
answer = get_answer(question)

print(f"Вопрос: {question}")
print(f"Ответ: {answer}")


In [None]:
question = "where is angola located?"
answer = get_answer(question)

print(f"Вопрос: {question}")
print(f"Ответ: {answer}")