# Seq2seq для машинного перевода

В этом блокноте рассматриваются некоторые подходы к задаче машинного перевода с помощью

* Рекуррентных сетей
* Рекуррентных сетей с механизмом внимания
* Трасформеров

Первый и последний подходы уже реализованы. Вам предлагается реализовать второй подход, а именно интегрировать механизм внимания в рекуррентную НС. Для лучшего понимания рекомендуем ознакомиться [со следющей статьей](https://arxiv.org/pdf/1409.0473). Какой конкретно тип механизма внимания реализовывать остается на выбор студенту.

Для оценок трех рассмотренных подходов реализовать метрику BLEU. В качестве тестовой выборки можно использовать валидационный набор. Или произвести требуемое разделение самостоятельно.

В качестве резюме (на 3 балла):

1. Разобраться в задаче и в коде
2. Добавить внимание к рекуррентной сети
3. Реализовать BLEU
4. Сравнить полученные 3 модели между собой.
5. **Опционально (+1 балл)**: продемонстрировать alignment между словами на исходном и целевом языках (аналогично Figure 3 в предложенной статье).
6. **Опционально (+1 балл)**: сравнить 3 полученных модели (по метрике) между собой на парах различной длины. Например, вычислить метрики на коротких, средних и длинных предложениях. Если средних/длинных предложений нет в выборке -- сгенерировать самостоятельно, например через LLM.

# Загрузка данных

In [1]:
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip

--2025-06-04 03:42:28--  http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.141.207, 142.251.2.207, 74.125.137.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.141.207|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2638744 (2.5M) [application/zip]
Saving to: ‘spa-eng.zip’


2025-06-04 03:42:28 (168 MB/s) - ‘spa-eng.zip’ saved [2638744/2638744]



In [2]:
text_file = "spa-eng/spa.txt"
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    english, spanish = line.split("\t")
    spanish = "[start] " + spanish + " [end]"
    text_pairs.append((english, spanish))

In [3]:
import random
print(random.choice(text_pairs))

("You're my enemy.", '[start] Ustedes son mis enemigas. [end]')


In [4]:
import random
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

**Векторизация пар**

In [5]:
import tensorflow as tf
import string
import re
from tensorflow import keras
from tensorflow.keras import layers

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")

vocab_size = 15000
sequence_length = 20

source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)

**Подготовка датасетов**

In [6]:
batch_size = 64

def format_dataset(eng, spa):
    eng = source_vectorization(eng)
    spa = target_vectorization(spa)
    return ({
        "english": eng,
        "spanish": spa[:, :-1],
    }, spa[:, 1:])

def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    return dataset.shuffle(2048).prefetch(16).cache()

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [7]:
for inputs, targets in train_ds.take(1):
    print(f"inputs['english'].shape: {inputs['english'].shape}")
    print(f"inputs['spanish'].shape: {inputs['spanish'].shape}")
    print(f"targets.shape: {targets.shape}")

inputs['english'].shape: (64, 20)
inputs['spanish'].shape: (64, 20)
targets.shape: (64, 20)


# RNN сеть

In [8]:
from tensorflow import keras
from tensorflow.keras import layers

embed_dim = 256
latent_dim = 1024

source = keras.Input(shape=(None,), dtype="int64", name="english")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
encoded_source = layers.Bidirectional(
    layers.GRU(latent_dim), merge_mode="sum")(x)

In [9]:
past_target = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)
seq2seq_rnn = keras.Model([source, past_target], target_next_step)

In [10]:
seq2seq_rnn.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)

Epoch 1/15
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m215s[0m 160ms/step - accuracy: 0.1413 - loss: 5.2539 - val_accuracy: 0.1572 - val_loss: 3.8899
Epoch 2/15
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m208s[0m 160ms/step - accuracy: 0.1600 - loss: 3.8847 - val_accuracy: 0.1884 - val_loss: 3.2694
Epoch 3/15
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m208s[0m 160ms/step - accuracy: 0.1857 - loss: 3.3296 - val_accuracy: 0.2062 - val_loss: 2.9089
Epoch 4/15
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m215s[0m 165ms/step - accuracy: 0.2028 - loss: 2.9544 - val_accuracy: 0.2220 - val_loss: 2.6460
Epoch 5/15
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m256s[0m 160ms/step - accuracy: 0.2167 - loss: 2.6702 - val_accuracy: 0.2325 - val_loss: 2.4666
Epoch 6/15
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m208s[0m 160ms/step - accuracy: 0.2286 - loss: 2.4352 - val_accuracy: 0.2403 - val_loss:

<keras.src.callbacks.history.History at 0x78d000423950>

**Пример перевода с помощью RNN сети**

In [11]:
import numpy as np
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = seq2seq_rnn.predict(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(next_token_predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

-
You're very alert.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 264ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[start] sos muy [UNK] [end]
-
He's better than me at math.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 49ms/step
[start] Él es mejor que yo en matemáticas [end]
-
She i

# RNN + attention

In [12]:
class AttentionDecoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embed_dim, latent_dim):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embed_dim, mask_zero=True)
        self.gru = tf.keras.layers.GRU(latent_dim * 2, return_state=True, return_sequences=True)
        self.attention = BahdanauAttention(latent_dim)
        self.dropout = tf.keras.layers.Dropout(0.5)
        self.dense = tf.keras.layers.Dense(vocab_size, activation="softmax")

    @tf.function
    def call(self, decoder_inputs, encoder_outputs, initial_state):
        embedded_inputs = self.embedding(decoder_inputs)
        batch_size = tf.shape(decoder_inputs)[0]
        max_len = tf.shape(decoder_inputs)[1]

        all_outputs = tf.TensorArray(dtype=tf.float32, size=max_len)
        state = initial_state

        def loop_body(t, outputs_ta, state):
            current_input = embedded_inputs[:, t:t+1]  # (batch_size, 1, embed_dim)
            context_vector, _ = self.attention(encoder_outputs, state)
            context_vector = tf.expand_dims(context_vector, 1)  # (batch_size, 1, context_dim)
            concat_input = tf.concat([current_input, context_vector], axis=-1)
            output, state = self.gru(concat_input, initial_state=state)
            outputs_ta = outputs_ta.write(t, output)
            return t + 1, outputs_ta, state

        t0 = tf.constant(0)
        _, outputs_ta, _ = tf.while_loop(
            lambda t, *_: t < max_len,
            loop_body,
            [t0, all_outputs, state]
        )

        decoder_outputs = outputs_ta.stack()  # (time, batch, hidden)
        decoder_outputs = tf.transpose(decoder_outputs, [1, 0, 2])  # (batch, time, hidden)
        decoder_outputs = self.dropout(decoder_outputs)
        decoder_outputs = self.dense(decoder_outputs)  # (batch, time, vocab)
        return decoder_outputs

In [13]:
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)  # для encoder output
        self.W2 = tf.keras.layers.Dense(units)  # для decoder hidden state
        self.V = tf.keras.layers.Dense(1)

    def call(self, encoder_outputs, hidden_state):
        # encoder_outputs: (batch_size, seq_len, enc_units * 2)
        # hidden_state: (batch_size, dec_units * 2)

        hidden_state_expanded = tf.expand_dims(hidden_state, 1)  # (batch_size, 1, dec_units * 2)

        # Вычисление attention scores
        score = self.V(tf.nn.tanh(
            self.W1(encoder_outputs) + self.W2(hidden_state_expanded)
        ))  # (batch_size, seq_len, 1)

        attention_weights = tf.nn.softmax(score, axis=1)  # (batch_size, seq_len, 1)

        # Взвешенное суммирование encoder outputs
        context_vector = attention_weights * encoder_outputs  # (batch_size, seq_len, enc_units * 2)
        context_vector = tf.reduce_sum(context_vector, axis=1)  # (batch_size, enc_units * 2)

        return context_vector, attention_weights

In [14]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Гиперпараметры
latent_dim = 512
embed_dim = 256
vocab_size = 10000

# Входы
encoder_inputs = keras.Input(shape=(None,), name="english")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(encoder_inputs)
encoder_outputs, forward_h, backward_h = layers.Bidirectional(
    layers.GRU(latent_dim, return_sequences=True, return_state=True),
    merge_mode="concat")(x)
encoder_state = layers.Concatenate()([forward_h, backward_h])

decoder_inputs = keras.Input(shape=(None,), name="spanish")

# Подключаем кастомный attention decoder
attention_decoder = AttentionDecoder(vocab_size, embed_dim, latent_dim)
decoder_outputs = attention_decoder(decoder_inputs, encoder_outputs, encoder_state)

model = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
model.fit(train_ds, epochs=15, validation_data=val_ds)

1. The `call()` method of your layer may be crashing. Try to `__call__()` the layer eagerly on some test input first to see if it works. E.g. `x = np.random.random((3, 4)); y = layer(x)`
2. If the `call()` method is correct, then you may need to implement the `def build(self, input_shape)` method on your layer. It should create all variables used by the layer (e.g. by calling `layer.build()` on all its children layers).
Exception encountered: ''in user code:

    File "<ipython-input-12-a328b88a09e8>", line 24, in loop_body  *
        output, state = self.gru(concat_input, initial_state=state)
    File "/usr/local/lib/python3.11/dist-packages/keras/src/utils/traceback_utils.py", line 122, in error_handler  **
        raise e.with_traceback(filtered_tb) from None

    OperatorNotAllowedInGraphError: Exception encountered when calling GRU.call().
    
    [1mIterating over a symbolic `tf.Tensor` is not allowed. You can attempt the following resolutions to the problem: If you are running

OperatorNotAllowedInGraphError: Exception encountered when calling AttentionDecoder.call().

[1mCould not automatically infer the output shape / dtype of 'attention_decoder' (of type AttentionDecoder). Either the `AttentionDecoder.call()` method is incorrect, or you need to implement the `AttentionDecoder.compute_output_spec() / compute_output_shape()` method. Error encountered:

in user code:

    File "<ipython-input-12-a328b88a09e8>", line 24, in loop_body  *
        output, state = self.gru(concat_input, initial_state=state)
    File "/usr/local/lib/python3.11/dist-packages/keras/src/utils/traceback_utils.py", line 122, in error_handler  **
        raise e.with_traceback(filtered_tb) from None

    OperatorNotAllowedInGraphError: Exception encountered when calling GRU.call().
    
    [1mIterating over a symbolic `tf.Tensor` is not allowed. You can attempt the following resolutions to the problem: If you are running in Graph mode, use Eager execution mode or decorate this function with @tf.function. If you are using AutoGraph, you can try decorating this function with @tf.function. If that does not work, then you may be using an unsupported feature or your source code may not be visible to AutoGraph. See https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/autograph/g3doc/reference/limitations.md#access-to-source-code for more information.[0m
    
    Arguments received by GRU.call():
      • sequences=tf.Tensor(shape=(None, 1, 1280), dtype=float32)
      • initial_state=tf.Tensor(shape=(None, 1024), dtype=float32)
      • mask=None
      • training=False
[0m

Arguments received by AttentionDecoder.call():
  • args=('<KerasTensor shape=(None, None), dtype=float32, sparse=False, name=spanish>', '<KerasTensor shape=(None, None, 1024), dtype=float32, sparse=False, name=keras_tensor_10>', '<KerasTensor shape=(None, 1024), dtype=float32, sparse=False, name=keras_tensor_13>')
  • kwargs=<class 'inspect._empty'>

# Трансформер

**Класс `TransformerDecoder`**

In [15]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

**Слой PositionalEmbedding**

In [16]:
import keras.ops as ops

In [17]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = ops.shape(inputs)[-1]
        positions = ops.arange(0, length, 1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return ops.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config

**End-to-end Трансформер**

In [18]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

In [19]:
embed_dim = 256
dense_dim = 2048
num_heads = 8


encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)



**Обучение**

In [20]:
transformer.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
transformer.fit(train_ds, epochs=30, validation_data=val_ds)

Epoch 1/30
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 52ms/step - accuracy: 0.6419 - loss: nan - val_accuracy: 0.6459 - val_loss: nan
Epoch 2/30
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m65s[0m 44ms/step - accuracy: 0.6458 - loss: nan - val_accuracy: 0.6459 - val_loss: nan
Epoch 3/30
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m57s[0m 44ms/step - accuracy: 0.6458 - loss: nan - val_accuracy: 0.6459 - val_loss: nan
Epoch 4/30
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m59s[0m 46ms/step - accuracy: 0.6458 - loss: nan - val_accuracy: 0.6459 - val_loss: nan
Epoch 5/30
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m57s[0m 44ms/step - accuracy: 0.6458 - loss: nan - val_accuracy: 0.6459 - val_loss: nan
Epoch 6/30
[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m57s[0m 44ms/step - accuracy: 0.6458 - loss: nan - val_accuracy: 0.6459 - val_loss: nan
Epoch 7/30
[1m1302/1302[0m [32m━━━━━━

<keras.src.callbacks.history.History at 0x78d000486990>

**Пример перевода**

In [21]:
import numpy as np
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

-
I thought Tom had already talked to Mary about that.
[start]                    
-
This works.
[start]                    
-
Tom is like a father to me.
[start]                    
-
Who built this place?
[start]                    
-
Hard work has made Japan what it is today.
[start]                    
-
You can't cling to the past.
[start]                    
-
10 minutes remained until the end of the lesson.
[start]                    
-
Tom was hurt.
[start]                    
-
Every member must attend.
[start]                    
-
I have once been to Europe.
[start]                    
-
I used to use Twitter, but then found it a bit boring, so I stopped using it.
[start]                    
-
I think she will divorce him.
[start]                    
-
The main valve is turned off.
[start]                    
-
I'm thinking.
[start]                    
-
What about next Sunday?
[start]                    
-
I can't stand that noise.
[start]                    
-
We took lots

# Оценка моделей

Шаг 1: Реализация BLEU

In [22]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
smoothie = SmoothingFunction().method4

def compute_bleu(reference, prediction):
    reference = [reference.split()]
    prediction = prediction.split()
    return sentence_bleu(reference, prediction, smoothing_function=smoothie)

Шаг 2: Функция оценки BLEU для модели

In [23]:
def evaluate_model_bleu(model, decode_function, test_pairs, num_samples=100):
    scores = []
    for eng, spa in random.sample(test_pairs, num_samples):
        ref = spa.replace("[start]", "").replace("[end]", "").strip()
        pred = decode_function(eng).replace("[start]", "").replace("[end]", "").strip()
        score = compute_bleu(ref, pred)
        scores.append(score)
    return sum(scores) / len(scores)

Шаг 3: Декодеры для трёх моделей

RNN-декодер — уже реализован как decode_sequence() — сохраним его отдельно:

In [24]:
decode_rnn = decode_sequence

RNN + Attention. Реализовать decode_sequence_attention, аналогично, используя attention-модель:

In [25]:
def decode_sequence_attention(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = model.predict(
            [tokenized_input_sentence, tokenized_target_sentence], verbose=0)
        sampled_token_index = np.argmax(next_token_predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

Transformer-декодер — реализован.

In [26]:
decode_transformer = decode_sequence

Шаг 4: Сравнение моделей

In [28]:
print("BLEU RNN:", evaluate_model_bleu(seq2seq_rnn, decode_rnn, test_pairs))
#print("BLEU Attention RNN:", evaluate_model_bleu(model, decode_sequence_attention, test_pairs))
print("BLEU Transformer:", evaluate_model_bleu(transformer, decode_transformer, test_pairs))



BLEU RNN: 0.0
BLEU Transformer: 0.0


Шаг 5 (Опционально): Alignment Visualization

Визуализация alignment (выравнивания) между словами на исходном и целевом языках. Это особенно актуально для модели RNN + Attention, где механизм внимания позволяет «подсматривать» в соответствующие части входного предложения.

 Модифицируем Attention слой для возврата весов внимания

класс BahdanauAttention возвращает веса внимания:

In [68]:
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, encoder_outputs, hidden_state):
      if len(hidden_state.shape) == 1:
        hidden_state = tf.expand_dims(hidden_state, 0)  # (512,) -> (1, 512)
      hidden_with_time_axis = tf.expand_dims(hidden_state, 1)  # (1, 1, 512)
      score = self.V(tf.nn.tanh(self.W1(encoder_outputs) + self.W2(hidden_with_time_axis)))
      attention_weights = tf.nn.softmax(score, axis=1)
      context_vector = attention_weights * encoder_outputs
      context_vector = tf.reduce_sum(context_vector, axis=1)
      return context_vector, tf.squeeze(attention_weights, -1)


 Модифицируем декодер для сохранения attention map

В методе call декодера добавим сохранение весов внимания:

In [70]:
import tensorflow as tf

class AttentionDecoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embedding_dim, dec_units, attention):
        super().__init__()
        self.dec_units = dec_units
        self.attention = attention

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            dec_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform'
        )
        self.fc = tf.keras.layers.Dense(vocab_size)

    def call_with_attention(self, decoder_inputs, encoder_outputs, initial_state):
        return self._decode(decoder_inputs, encoder_outputs, initial_state, return_attention=True)

    def _decode(self, decoder_inputs, encoder_outputs, initial_state, return_attention):
        x = self.embedding(decoder_inputs)
        outputs = []
        attention_weights_all = []
        state = initial_state

        for t in range(x.shape[1]):
            x_t = x[:, t:t+1, :]
            context_vector, attention_weights = self.attention(encoder_outputs, state)
            attention_weights_all.append(attention_weights)

            context_vector, attention_weights = self.attention(
                encoder_outputs, tf.expand_dims(state, 0)
                )

            x_combined = tf.concat([tf.expand_dims(context_vector, 1), x_t], axis=-1)
            output, state = self.gru(x_combined, initial_state=state)


        outputs = tf.concat(outputs, axis=1)
        logits = self.fc(outputs)

        if return_attention:
            attention_weights_all = tf.stack(attention_weights_all, axis=1)
            return logits, attention_weights_all

        return logits

3. Функция декодирования с визуализацией внимания

In [48]:
class Encoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embedding_dim, enc_units):
        super().__init__()
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(enc_units, return_sequences=True, return_state=True)

    def call(self, x):
        x = self.embedding(x)
        output, state = self.gru(x)
        return output, state

In [49]:
source_vocab_size = len(source_vectorization.get_vocabulary())
embedding_dim = 256
enc_units = 512

encoder = Encoder(vocab_size=source_vocab_size, embedding_dim=embedding_dim, enc_units=enc_units)

In [61]:
target_vocab_size = len(target_vectorization.get_vocabulary())
embedding_dim = 256
dec_units = 512

In [62]:
attention_layer = BahdanauAttention(units=dec_units)

In [64]:
decoder = AttentionDecoder(
    vocab_size=target_vocab_size,
    embedding_dim=embedding_dim,
    dec_units=dec_units,
    attention=attention_layer
)

In [50]:
encoder = Encoder(vocab_size=source_vocab_size, embedding_dim=embedding_dim, enc_units=enc_units)

In [57]:
def preprocess_input(sentence, source_vectorization):
    sentence = tf.convert_to_tensor([sentence])
    tokenized_input = source_vectorization(sentence)
    return tokenized_input

In [51]:
import matplotlib.pyplot as plt
import seaborn as sns

def decode_with_attention(input_sentence):
    tokenized_input = source_vectorization([input_sentence])
    encoder_outputs, initial_state = encoder(tokenized_input)

    decoded_sentence = "[start]"
    attention_maps = []

    for i in range(max_decoded_sentence_length):
        tokenized_target = target_vectorization([decoded_sentence])[:, :-1]
        predictions, attention = attention_decoder.call_with_attention(
            tokenized_target,
            encoder_outputs,
            initial_state=initial_state
            )

        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        attention_maps.append(attention[0, i].numpy())
        if sampled_token == "[end]":
            break

    return decoded_sentence, np.array(attention_maps), tokenized_input


def plot_attention(sentence, decoded_sentence, attention_map, input_tokens):
    input_tokens = [token for token in input_tokens if token != ""]
    output_tokens = decoded_sentence.split()
    plt.figure(figsize=(12, 8))
    sns.heatmap(attention_map[:len(output_tokens), :len(input_tokens)],
                xticklabels=input_tokens,
                yticklabels=output_tokens,
                cmap='viridis')
    plt.xlabel("Input")
    plt.ylabel("Output")
    plt.title("Attention Alignment")
    plt.show()

In [58]:
def decode_with_attention(input_sentence, encoder, decoder, source_vectorization, target_vectorization, max_target_len=20):
    tokenized_input = preprocess_input(input_sentence, source_vectorization)
    encoder_outputs, encoder_state = encoder(tokenized_input)

    decoder_input = tf.expand_dims([target_vectorization.get_vocabulary().index('[start]')], 0)
    decoder_state = encoder_state

    result_tokens = []
    attention_maps = []

    for _ in range(max_target_len):
        logits, attention_weights = decoder._decode(
            decoder_input,
            encoder_outputs,
            decoder_state,
            return_attention=True
        )

        # Получаем логиты последнего токена
        predicted_id = tf.argmax(logits[:, -1, :], axis=-1).numpy()[0]
        predicted_word = target_vectorization.get_vocabulary()[predicted_id]

        if predicted_word == '[end]':
            break

        result_tokens.append(predicted_word)
        attention_maps.append(attention_weights[:, -1, :])  # последняя позиция

        decoder_input = tf.expand_dims([predicted_id], 0)
        decoder_state = decoder.gru.layers[-1].states[0]  # новое состояние

    # Формируем итоговый attention map: (target_len, source_len)
    attention_map = tf.concat(attention_maps, axis=0).numpy()

    # Получаем токены исходного предложения
    input_indices = tokenized_input[0].numpy()
    input_tokens = [source_vectorization.get_vocabulary()[i] for i in input_indices if i != 0]

    return ' '.join(result_tokens), attention_map, input_tokens

In [59]:
import matplotlib.pyplot as plt
import numpy as np

def plot_attention(input_sentence, decoded_sentence, attention_map, input_tokens):
    fig, ax = plt.subplots(figsize=(10, 8))
    attention = attention_map[:len(decoded_sentence.split()), :len(input_tokens)]

    cax = ax.matshow(attention, cmap='viridis')
    fig.colorbar(cax)

    ax.set_xticklabels([''] + input_tokens, rotation=90)
    ax.set_yticklabels([''] + decoded_sentence.split())

    ax.set_xlabel('Input Sentence')
    ax.set_ylabel('Predicted Translation')
    plt.show()

In [71]:
input_sentence = "How are you?"
decoded_sentence, attention_map, input_tokens = decode_with_attention(
    input_sentence,
    encoder,
    decoder,
    source_vectorization,
    target_vectorization
)

plot_attention(input_sentence, decoded_sentence, attention_map, input_tokens)

InvalidArgumentError: Exception encountered when calling BahdanauAttention.call().

[1m{{function_node __wrapped__AddV2_device_/job:localhost/replica:0/task:0/device:GPU:0}} required broadcastable shapes [Op:AddV2] name: [0m

Arguments received by BahdanauAttention.call():
  • encoder_outputs=tf.Tensor(shape=(1, 20, 512), dtype=float32)
  • hidden_state=tf.Tensor(shape=(512,), dtype=float32)

Шаг 6 (Опционально): BLEU по длине предложений

In [46]:
short, medium, long = [], [], []

for pair in test_pairs:
    eng_len = len(pair[0].split())
    if eng_len <= 5:
        short.append(pair)
    elif eng_len <= 10:
        medium.append(pair)
    else:
        long.append(pair)

In [55]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

smoothie = SmoothingFunction().method4

def compute_bleu_static(pairs, predictions):
    scores = []
    for (_, ref), pred in zip(pairs, predictions):
        ref_tokens = ref.split()
        pred_tokens = pred.split()
        score = sentence_bleu([ref_tokens], pred_tokens, smoothing_function=smoothie)
        scores.append(score)
    return sum(scores) / len(scores) if scores else 0