In [1]:
import random
import string
import re
import numpy as np
import tensorflow as tf
import keras
from tensorflow.keras.layers import MultiHeadAttention

2024-12-28 09:09:22.973886: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-28 09:09:23.239538: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1735376963.337267     391 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1735376963.365220     391 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-12-28 09:09:23.610493: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [2]:
with open('ukr.txt', 'r', encoding='utf-8') as f:
    lines = f.read().split('\n')[:-1]

print("Data from ukr.txt")
for _ in range(3):
    print(random.choice(lines))

text_pairs = []
for line in lines:
    eng, ukr, _ = line.split('\t')
    ukr = '[s] ' + ukr + ' [e]'
    text_pairs.append((eng, ukr))

print("\nSample from text_pairs")
for t in range(5):
    print(random.choice(text_pairs))

random.shuffle(text_pairs)
text_pairs = text_pairs[:50000]
num_val = int(0.15 * len(text_pairs))
num_train = len(text_pairs) - 2 * num_val
train_pairs = text_pairs[:num_train]
val_pairs = text_pairs[num_train: num_train + num_val]
test_pairs = text_pairs[num_train + num_val:]

Data from ukr.txt
Stop next to the school.	Зупиніть біля школи.	CC-BY 2.0 (France) Attribution: tatoeba.org #1572093 (fanty) & #6451672 (deniko)
Tom slipped on the ice and fell.	Том послизнувся на кризі та впав.	CC-BY 2.0 (France) Attribution: tatoeba.org #6076668 (CK) & #5812016 (deniko)
Tom is being bullied by Mary.	Мері цькує Тома.	CC-BY 2.0 (France) Attribution: tatoeba.org #1398212 (Spamster) & #6289146 (deniko)

Sample from text_pairs
('Do you have a map of Australia?', '[s] У тебе є мапа Австралії? [e]')
('Do you like my T-shirt?', '[s] Тобі подобається моя футболка? [e]')
('I lit three candles.', '[s] Я запалив три свічки. [e]')
('Tom drank a cup of coffee.', '[s] Том випив чашку кави. [e]')
('Bye, Tom.', '[s] Бувайте, Томе. [e]')


In [3]:
strip_chars = string.punctuation.replace('[', '')
strip_chars = strip_chars.replace(']', '')

vocabulary_size = 15000
sequence_length = 20
batch_size = 64

def ukr_standardization(input_string):
    return tf.strings.regex_replace(tf.strings.lower(input_string), '[%s]' % re.escape(strip_chars), '')

eng_vector = keras.layers.TextVectorization(
    max_tokens=vocabulary_size,
    output_mode='int',
    output_sequence_length=sequence_length,
)
ukr_vector = keras.layers.TextVectorization(
    max_tokens=vocabulary_size,
    output_mode='int',
    output_sequence_length=sequence_length + 1,
    standardize=ukr_standardization,
)
train_eng = [pair[0] for pair in train_pairs]
train_ukr = [pair[1] for pair in train_pairs]
eng_vector.adapt(train_eng)
ukr_vector.adapt(train_ukr)

for v in [eng_vector, ukr_vector]:
    print(len(v.get_vocabulary()))
    print(v.get_vocabulary()[10:20])

I0000 00:00:1735376968.045228     391 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 9517 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4070 SUPER, pci bus id: 0000:01:00.0, compute capability: 8.9


6417
[np.str_('do'), np.str_('im'), np.str_('have'), np.str_('dont'), np.str_('me'), np.str_('was'), np.str_('he'), np.str_('mary'), np.str_('in'), np.str_('it')]
15000
[np.str_('на'), np.str_('Мері'), np.str_('ти'), np.str_('Тома'), np.str_('я'), np.str_('з'), np.str_('у'), np.str_('Це'), np.str_('У'), np.str_('в')]


In [4]:
def format_dataset(eng, ukr):
    eng = eng_vector(eng)
    ukr = ukr_vector(ukr)
    return ({"encoder_inputs": eng, "decoder_inputs": ukr[:, :-1]}, ukr[:, 1:])

def make_dataset(pairs):
    eng_texts, ukr_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    ukr_texts = list(ukr_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, ukr_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.cache().shuffle(2048).prefetch(16)

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [5]:
for inputs, targets in train_ds.take(1):
    print(f'encoder inputs shape: {inputs["encoder_inputs"].shape}')
    print(f'decoder inputs shape: {inputs["decoder_inputs"].shape}')
    print(f"targets shape: {targets.shape}")

encoder inputs shape: (64, 20)
decoder inputs shape: (64, 20)
targets shape: (64, 20)


2024-12-28 09:09:29.764436: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


In [6]:
# Генерація позиційного кодування
def get_positional_encoding(seq_len, embed_dim):
    position = tf.range(seq_len, dtype=tf.float32)[:, tf.newaxis]
    div_term = tf.exp(tf.range(0, embed_dim, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / embed_dim))
    
    sinusoidal = tf.concat([tf.sin(position * div_term), tf.cos(position * div_term)], axis=-1)
    return sinusoidal

# Оновлення класів TransformerEncoder та TransformerDecoder з додаванням позиційного кодування
class TransformerEncoder(keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, latent_dim):
        super(TransformerEncoder, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.latent_dim = latent_dim
        self.attention = MultiHeadAttention(num_heads, embed_dim)  # Assuming you have this implemented

    def call(self, inputs, training=False):
        seq_len = tf.shape(inputs)[1]
        pos_encoding = get_positional_encoding(seq_len, self.embed_dim)
        inputs = inputs + pos_encoding  # Add positional encoding to the input
        attn_output = self.attention(inputs, inputs)  # Example attention mechanism
        return attn_output

class TransformerDecoder(keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerDecoder, self).__init__()
        self.attention1 = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.attention2 = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = tf.keras.Sequential([keras.layers.Dense(ff_dim, activation="relu"), keras.layers.Dense(embed_dim)])
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)
        self.dropout3 = keras.layers.Dropout(rate)

    def call(self, inputs, enc_output, training=False):
        seq_len = tf.shape(inputs)[1]
        pos_encoding = get_positional_encoding(seq_len, inputs.shape[-1])
        inputs = inputs + pos_encoding
        attn_output1 = self.attention1(inputs, inputs)
        attn_output1 = self.dropout1(attn_output1, training=training)
        out1 = self.layernorm1(inputs + attn_output1)
        attn_output2 = self.attention2(out1, enc_output)
        attn_output2 = self.dropout2(attn_output2, training=training)
        out2 = self.layernorm2(out1 + attn_output2)
        ffn_output = self.dense_proj(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        return self.layernorm3(out2 + ffn_output)

In [7]:
embed_dim = 256
latent_dim = 2048
num_heads = 8

encoder_inputs = keras.layers.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = keras.layers.Embedding(input_dim=vocabulary_size, output_dim=embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, num_heads, latent_dim)(x, training=True)

decoder_inputs = keras.layers.Input(shape=(None,), dtype="int64", name="decoder_inputs")
x = keras.layers.Embedding(input_dim=vocabulary_size, output_dim=embed_dim)(decoder_inputs)

x = TransformerDecoder(embed_dim, num_heads, latent_dim)(x, encoder_outputs, training=True)
decoder_outputs = keras.layers.Dense(vocabulary_size, activation="softmax")(x)

transformer = keras.models.Model([encoder_inputs, decoder_inputs], decoder_outputs, name="transformer")

In [8]:
epochs = 30

transformer.compile("rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)

Epoch 1/30


I0000 00:00:1735376972.353505     476 service.cc:148] XLA service 0x7ff2c405c1a0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1735376972.353941     476 service.cc:156]   StreamExecutor device (0): NVIDIA GeForce RTX 4070 SUPER, Compute Capability 8.9
2024-12-28 09:09:32.438020: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
W0000 00:00:1735376972.540279     476 assert_op.cc:38] Ignoring Assert operator compile_loss/sparse_categorical_crossentropy/SparseSoftmaxCrossEntropyWithLogits/assert_equal_1/Assert/Assert
I0000 00:00:1735376972.892994     476 cuda_dnn.cc:529] Loaded cuDNN version 90300



































[1m  2/547[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m44s[0m 82ms/step - accuracy: 0.1855 - loss: 8.9735       

I0000 00:00:1735376989.398262     476 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m214/547[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m9s[0m 27ms/step - accuracy: 0.6929 - loss: 3.2297 

W0000 00:00:1735376995.365197     477 assert_op.cc:38] Ignoring Assert operator compile_loss/sparse_categorical_crossentropy/SparseSoftmaxCrossEntropyWithLogits/assert_equal_1/Assert/Assert



































[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 55ms/step - accuracy: 0.7194 - loss: 2.5472  

W0000 00:00:1735377020.064139     480 assert_op.cc:38] Ignoring Assert operator compile_loss/sparse_categorical_crossentropy/SparseSoftmaxCrossEntropyWithLogits/assert_equal_1/Assert/Assert
W0000 00:00:1735377020.918270     477 assert_op.cc:38] Ignoring Assert operator compile_loss/sparse_categorical_crossentropy/SparseSoftmaxCrossEntropyWithLogits/assert_equal_1/Assert/Assert









[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m54s[0m 64ms/step - accuracy: 0.7195 - loss: 2.5461 - val_accuracy: 0.7726 - val_loss: 1.5606
Epoch 2/30
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 29ms/step - accuracy: 0.7773 - loss: 1.5848 - val_accuracy: 0.8030 - val_loss: 1.2990
Epoch 3/30
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 26ms/step - accuracy: 0.8098 - loss: 1.3334 - val_accuracy: 0.8758 - val_loss: 0.8267
Epoch 4/30
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 29ms/step - accuracy: 0.8824 - loss: 0.8511 - val_accuracy: 0.9179 - val_loss: 0.5769
Epoch 5/30
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 27ms/step - accuracy: 0.9051 - loss: 0.6849 - val_accuracy: 0.9336 - val_loss: 0.4433
Epoch 6/30
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 28ms/step - accuracy: 0.9239 - loss: 0.5155 - val_accuracy: 0.9436 - val_loss: 0.3715
Epoch 7/30
[1m547/547[0m 

<keras.src.callbacks.history.History at 0x7ff3b2be9580>

In [9]:
ukr_vocab = ukr_vector.get_vocabulary()
ukr_index_lookup = dict(zip(range(len(ukr_vocab)), ukr_vocab))
max_decoded_sentence_length = 20  

def decode_sequence(input_sentence):
    # Токенізація
    tokenized_input_sentence = eng_vector([input_sentence])
    
    decoded_sentence = "[s]" 
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = ukr_vector([decoded_sentence])[:, :-1]
        
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        sampled_token_index = np.argmax(predictions[0, i, :])  
        sampled_token = ukr_index_lookup[sampled_token_index]  
        
        decoded_sentence += " " + sampled_token 
        
        if sampled_token == "[e]":
            break
    
    return decoded_sentence


test_eng_texts = [pair[0] for pair in test_pairs]

print(f'Результат:\n{"-"*50}')
for _ in range(10):
    input_sentence = random.choice(test_eng_texts)  
    translated = decode_sequence(input_sentence)  
    print(f'INPUT: {input_sentence}\nOUTPUT: {translated}\n{"-"*50}')

Результат:
--------------------------------------------------
INPUT: Would you please open the window?
OUTPUT: [s] Неважливо післязавтра договір невдячним присмак захворюванням олівця павутиння павутиння вдиху недобре розуміли розуміли всіма церкви дітьми    
--------------------------------------------------
INPUT: I didn't notice it.
OUTPUT: [s] знала післязавтра кішок невдячним огиди твариною твариною рублів баксів підтримки недобре недобре розуміли шостому році році усі церкви цю 
--------------------------------------------------
INPUT: Tom told me to run.
OUTPUT: [s] Неважливо післязавтра друже невдячним фотоальбом рублів рублів баксів баксів недобре недобре недобре худобу худобу хворіють дітьми    
--------------------------------------------------
INPUT: What to do next is the question.
OUTPUT: [s] Дах пожежі невдячним фотоальбом фотоальбом твариною твариною випічки випічки недобре недобре ходьби худобу коми хворіють дітьми    
--------------------------------------------------