<a href="https://colab.research.google.com/github/ann04ka/Labs/blob/main/notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import keras.ops as ops

In [2]:
from tensorflow.keras import layers, Model
import tensorflow as tf
import numpy as np
import os
import random
import re
import string

In [4]:
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip

--2025-06-04 04:02:35--  http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.195.207, 74.125.20.207, 108.177.98.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.195.207|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2638744 (2.5M) [application/zip]
Saving to: ‘spa-eng.zip’


2025-06-04 04:02:35 (212 MB/s) - ‘spa-eng.zip’ saved [2638744/2638744]



In [5]:
text_file = "spa-eng/spa.txt"
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    english, spanish = line.split("\t")
    spanish = "[start] " + spanish + " [end]"
    text_pairs.append((english, spanish))

import random
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

In [14]:
import tensorflow as tf
import string
import re
from tensorflow import keras
from tensorflow.keras import layers

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")

vocab_size = 15000
sequence_length = 20

source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)

batch_size = 64

def format_dataset(eng, spa):
    eng = source_vectorization(eng)
    spa = target_vectorization(spa)
    return ({
        "english": eng,
        "spanish": spa[:, :-1],
    }, spa[:, 1:])

def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    return dataset.shuffle(2048).prefetch(16).cache()

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [19]:
from tensorflow import keras
from tensorflow.keras import layers

embed_dim = 256
latent_dim = 1024

source = keras.Input(shape=(None,), dtype="int64", name="english")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
encoded_source = layers.Bidirectional(
    layers.GRU(latent_dim), merge_mode="sum")(x)

past_target = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)
seq2seq_rnn = keras.Model([source, past_target], target_next_step)

seq2seq_rnn.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
seq2seq_rnn.fit(train_ds, epochs=1, validation_data=val_ds)

[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m216s[0m 160ms/step - accuracy: 0.1588 - loss: 5.2638 - val_accuracy: 0.1578 - val_loss: 3.8747


<keras.src.callbacks.history.History at 0x7e38536e30d0>

In [20]:
embed_dim = 256
latent_dim = 1024

from tensorflow.keras.models import Sequential
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import InputLayer

rnn_model = Sequential([
    InputLayer(input_shape=(sequence_length,), dtype='int64', name='english'),

    layers.Embedding(vocab_size, embed_dim, input_length=sequence_length),

    layers.Bidirectional(layers.GRU(latent_dim, return_sequences=False)),

    layers.Dropout(0.5),

    layers.RepeatVector(sequence_length),

    layers.GRU(latent_dim * 2, return_sequences=True),

    layers.Dropout(0.5),

    layers.Dense(vocab_size, activation="softmax")
])

rnn_model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)



In [21]:
rnn_model.fit(train_ds, epochs=1, validation_data=train_ds)

Expected: english
Received: inputs=['Tensor(shape=(None, None))']


[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m595s[0m 454ms/step - accuracy: 0.6700 - loss: 2.3377 - val_accuracy: 0.7221 - val_loss: 1.6060


<keras.src.callbacks.history.History at 0x7e3878776850>

In [22]:
rnn_model.save("rnn_model.keras")

# from tensorflow.keras.models import load_model
# seq2seq_rnn = load_model("/content/seq2seq_rnn.keras")

In [45]:
def build_rnn_attention_model():
    encoder_inputs = layers.Input(shape=(None,), dtype="int64", name="english")
    x = layers.Embedding(vocab_size, embed_dim)(encoder_inputs)
    encoder_gru = layers.GRU(latent_dim, return_sequences=True, return_state=True)
    encoder_outputs, encoder_state = encoder_gru(x)

    decoder_inputs = layers.Input(shape=(None,), dtype="int64", name="spanish")
    y = layers.Embedding(vocab_size, embed_dim)(decoder_inputs)
    decoder_gru = layers.GRU(latent_dim, return_sequences=True, return_state=True)
    decoder_outputs, _ = decoder_gru(y, initial_state=encoder_state)

    # Сохраняем attention layer как отдельный слой
    attention_layer = layers.Attention(name="attention")
    attention_output = attention_layer([encoder_outputs, decoder_outputs])

    merged = layers.Concatenate()([decoder_outputs, attention_output])
    merged = layers.Dropout(0.5)(merged)
    final_output = layers.Dense(vocab_size, activation="softmax")(merged)

    model = Model([encoder_inputs, decoder_inputs], final_output)
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

    # Вспомогательная модель для получения attention weights
    attention_model = Model(
        [encoder_inputs, decoder_inputs],
        [final_output, attention_output]
    )

    return model, attention_model

# Построение модели
seq2seq_rnn_with_attention, attention_model = build_rnn_attention_model()

In [35]:
rnn_with_attention_model.fit(train_ds, epochs=1, validation_data=train_ds)

OperatorNotAllowedInGraphError: Exception encountered when calling GRU.call().

[1mIterating over a symbolic `tf.Tensor` is not allowed. You can attempt the following resolutions to the problem: If you are running in Graph mode, use Eager execution mode or decorate this function with @tf.function. If you are using AutoGraph, you can try decorating this function with @tf.function. If that does not work, then you may be using an unsupported feature or your source code may not be visible to AutoGraph. See https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/autograph/g3doc/reference/limitations.md#access-to-source-code for more information.[0m

Arguments received by GRU.call():
  • sequences=tf.Tensor(shape=(None, None, 256), dtype=float32)
  • initial_state=None
  • mask=None
  • training=True

In [27]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.position_embeddings = layers.Embedding(input_dim=sequence_length, output_dim=embed_dim)
    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_pos = self.position_embeddings(positions)
        return embedded_tokens + embedded_pos

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = tf.keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim)
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attn_output = self.attention(inputs, inputs, attention_mask=mask)
        out1 = self.layernorm_1(inputs + attn_output)
        out2 = self.layernorm_2(out1 + self.dense_proj(out1))
        return out2

class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.attention_1 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = tf.keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim)
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = causal_mask
        attn_output_1 = self.attention_1(inputs, inputs, attention_mask=padding_mask)
        out1 = self.layernorm_1(inputs + attn_output_1)
        attn_output_2 = self.attention_2(query=out1, value=encoder_outputs, key=encoder_outputs)
        out2 = self.layernorm_2(out1 + attn_output_2)
        out3 = self.layernorm_3(out2 + self.dense_proj(out2))
        return out3
    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, seq_length = input_shape[0], input_shape[1]
        i = tf.range(seq_length)[:, tf.newaxis]
        j = tf.range(seq_length)
        mask = tf.cast(i >= j, dtype="int32")
        causal_mask = tf.broadcast_to(mask, shape=[batch_size, seq_length, seq_length])
        return causal_mask

encoder_inputs = layers.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
x = TransformerEncoder(embed_dim, 2048, 8)(x)
encoder_outputs = x

decoder_inputs = layers.Input(shape=(None,), dtype="int64", name="spanish")
y = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
y = TransformerDecoder(embed_dim, 2048, 8)(y, encoder_outputs)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(y)

transformer = Model([encoder_inputs, decoder_inputs], decoder_outputs)
transformer.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

In [28]:
transformer.fit(train_ds, epochs=1, validation_data=train_ds)

[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m110s[0m 74ms/step - accuracy: 0.6974 - loss: 2.3272 - val_accuracy: 0.7437 - val_loss: 1.5188


<keras.src.callbacks.history.History at 0x7e38782a5610>

In [29]:
transformer.save("transformer.keras")

In [30]:
from nltk.translate.bleu_score import sentence_bleu
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))

def decode_sequence(model, input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(sequence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])[:, :-1]
        predictions = model.predict([tokenized_input_sentence, tokenized_target_sentence], verbose=0)
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_word = spa_index_lookup[sampled_token_index]
        if sampled_word == "[end]" or len(decoded_sentence.split()) > sequence_length:
            break
        decoded_sentence += " " + sampled_word
    return decoded_sentence.replace("[start] ", "").replace(" [end]", "")

def evaluate_model(model, pairs, n_samples=10):
    total_bleu = 0
    for _ in range(n_samples):
        input_sentence, true_sentence = random.choice(pairs)
        pred_sentence = decode_sequence(model, input_sentence)
        reference = [true_sentence.replace("[start] ", "").replace(" [end]", "").split()]
        candidate = pred_sentence.split()
        bleu = sentence_bleu(reference, candidate)
        total_bleu += bleu
    return total_bleu / n_samples

In [33]:
# rnn_model.fit(train_ds, validation_data=val_ds, epochs=1)
# seq2seq_rnn_with_attention.fit(train_ds, validation_data=val_ds, epochs=1)
# transformer.fit(train_ds, validation_data=val_ds, epochs=1)

print("BLEU RNN:", evaluate_model(rnn_model, test_pairs))
# print("BLEU RNN+Attention:", evaluate_model(seq2seq_rnn_with_attention, test_pairs))
print("BLEU Transformer:", evaluate_model(transformer, test_pairs))

BLEU RNN: 6.260842608366016e-232
BLEU Transformer: 4.5293359739455395e-232


In [13]:
train_ds

<CacheDataset element_spec=({'english': TensorSpec(shape=(None, None), dtype=tf.int64, name=None), 'spanish': TensorSpec(shape=(None, None), dtype=tf.int64, name=None)}, TensorSpec(shape=(None, None), dtype=tf.int64, name=None))>

In [39]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from nltk.translate.bleu_score import sentence_bleu

spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
eng_vocab = source_vectorization.get_vocabulary()
eng_index_lookup = dict(zip(range(len(eng_vocab)), eng_vocab))

In [40]:
def visualize_attention(model, input_sentence):
    tokenized_input = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    all_att_weights = []
    generated_words = []

    for i in range(20):  # максимум 20 шагов
        tokenized_target = target_vectorization([decoded_sentence])[:, :-1]
        output, att_weights = model.predict([tokenized_input, tokenized_target], verbose=0)

        sampled_token_idx = np.argmax(output[0, i, :])
        sampled_word = spa_index_lookup[sampled_token_idx]

        if sampled_word == "[end]" or len(generated_words) >= 20:
            break

        decoded_sentence += " " + sampled_word
        generated_words.append(sampled_word)
        all_att_weights.append(att_weights[0][i])

    input_words = input_sentence.split()

    fig, ax = plt.subplots(figsize=(10, 6))
    sns.heatmap(
        np.array(all_att_weights),
        xticklabels=input_words,
        yticklabels=generated_words,
        cmap="Blues",
        annot=True,
        fmt=".2f",
        ax=ax
    )
    ax.set_xlabel("Input Words (English)")
    ax.set_ylabel("Generated Translation (Spanish)")
    ax.set_title("Word Alignment via Attention")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

In [44]:
attention_model = Model(
    inputs=[source, past_target],
    outputs=[rnn_model.output, att_layer_output]
)

AttributeError: The layer sequential_6 has never been called and thus has no defined output.

In [38]:
example_sentence, _ = random.choice(test_pairs)
visualize_attention(rnn_model, example_sentence)

ValueError: not enough values to unpack (expected 2, got 1)

In [41]:
def evaluate_model_by_length(model, pairs, max_len_groups=[10, 20]):
    results = {}
    short, medium, long = [], [], []

    for pair in pairs:
        src, tgt = pair
        length = len(src.split())
        if length <= max_len_groups[0]:
            short.append(pair)
        elif length <= max_len_groups[1]:
            medium.append(pair)
        else:
            long.append(pair)

    def get_bleu(group):
        return evaluate_model(model, group, n_samples=10)

    results["short"] = get_bleu(short) if short else 0
    results["medium"] = get_bleu(medium) if medium else 0
    results["long"] = get_bleu(long) if long else 0

    return results

In [42]:
print("Оценка по длине:")
for name, model in [
    ("RNN", rnn_model),
    # ("RNN+Attention", seq2seq_rnn_with_attention),
    ("Transformer", transformer)
]:
    print(f"\n{name}:")
    scores = evaluate_model_by_length(model, test_pairs)
    print(f"Short: {scores['short']:.4f}")
    print(f"Medium: {scores['medium']:.4f}")
    print(f"Long: {scores['long']:.4f}")

Оценка по длине:

RNN:


The hypothesis contains 0 counts of 2-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 3-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 4-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()


Short: 0.0000
Medium: 0.0000
Long: 0.0000

Transformer:
Short: 0.0000
Medium: 0.0000
Long: 0.0000


In [43]:
def generate_long_sentences(n=5, repeat_times=3):
    new_pairs = []
    for _ in range(n):
        pair = random.choice(train_pairs)
        src, tgt = pair
        src = " ".join(src.split() * repeat_times)
        tgt = " ".join(tgt.replace("[start] ", "").replace(" [end]", "").split() * repeat_times)
        new_pairs.append((src, "[start] " + tgt + " [end]"))
    return new_pairs

long_test_pairs = test_pairs + generate_long_sentences(n=10, repeat_times=3)