In [15]:
import re
import random
import string
import numpy as np
import tensorflow as tf
from tensorflow import keras

In [3]:
with open("ukr.txt", "r", encoding="utf-8") as f:
  lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
  eng, ukr, _ = line.split("\t")
  ukr = "[start] " + ukr + " [end]"
  text_pairs.append((eng, ukr))

In [4]:
for _ in range(5):
  print(random.choice(text_pairs))

("I worked yesterday, but Tom didn't.", '[start] Я вчора працював, а Том ні. [end]')
('There are alternatives.', '[start] Альтернативи є. [end]')
("Aren't you sad?", '[start] Хіба вам не сумно? [end]')
("Have you told Tom I'm here?", '[start] Ви сказали Тому, що я тут? [end]')
('Do what I say.', '[start] Роби, що я кажу. [end]')


In [5]:
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

158174 total pairs
110722 training pairs
23726 validation pairs
23726 test pairs


In [6]:
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

vocab_size = 15000
sequence_length = 20
batch_size = 64

def custom_standardization(input_string):
  lowercase = tf.strings.lower(input_string)
  return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")

eng_vectorization = keras.layers.TextVectorization(
  max_tokens=vocab_size,
  output_mode="int",
  output_sequence_length=sequence_length,
)
ukr_vectorization = keras.layers.TextVectorization(
  max_tokens=vocab_size,
  output_mode="int",
  output_sequence_length=sequence_length + 1,
  standardize=custom_standardization,
)

train_eng_texts = [pair[0] for pair in train_pairs]
train_ukr_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(train_eng_texts)
ukr_vectorization.adapt(train_ukr_texts)

In [7]:
def format_dataset(eng, ukr):
  eng = eng_vectorization(eng)
  ukr = ukr_vectorization(ukr)
  return ({ "encoder_inputs": eng, "decoder_inputs": ukr[:, :-1]}, ukr[:, 1:])

def make_dataset(pairs):
  eng_texts, ukr_texts = zip(*pairs)
  eng_texts = list(eng_texts)
  ukr_texts = list(ukr_texts)
  dataset = tf.data.Dataset.from_tensor_slices((eng_texts, ukr_texts))
  dataset = dataset.batch(batch_size)
  dataset = dataset.map(format_dataset)
  return dataset.cache().shuffle(2048).prefetch(16)

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [8]:
for inputs, targets in train_ds.take(1):
  print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
  print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
  print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


In [9]:
class TransformerEncoderLayer(keras.layers.Layer):
  def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
    super(TransformerEncoderLayer, self).__init__()
    self.attention = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    self.dense_proj = tf.keras.Sequential([keras.layers.Dense(ff_dim, activation="relu"), keras.layers.Dense(embed_dim)])
    self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
    self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
    self.dropout1 = keras.layers.Dropout(rate)
    self.dropout2 = keras.layers.Dropout(rate)

  def call(self, inputs, training):
    attn_output = self.attention(inputs, inputs)
    attn_output = self.dropout1(attn_output, training=training)
    out1 = self.layernorm1(inputs + attn_output)
    ffn_output = self.dense_proj(out1)
    ffn_output = self.dropout2(ffn_output, training=training)
    return self.layernorm2(out1 + ffn_output)

In [10]:
class TransformerDecoderLayer(keras.layers.Layer):
  def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
    super(TransformerDecoderLayer, self).__init__()
    self.attention1 = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    self.attention2 = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    self.dense_proj = tf.keras.Sequential([keras.layers.Dense(ff_dim, activation="relu"), keras.layers.Dense(embed_dim)])
    self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
    self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
    self.layernorm3 = keras.layers.LayerNormalization(epsilon=1e-6)
    self.dropout1 = keras.layers.Dropout(rate)
    self.dropout2 = keras.layers.Dropout(rate)
    self.dropout3 = keras.layers.Dropout(rate)

  def call(self, inputs, enc_output, training):
    attn_output1 = self.attention1(inputs, inputs)
    attn_output1 = self.dropout1(attn_output1, training=training)
    out1 = self.layernorm1(inputs + attn_output1)
    attn_output2 = self.attention2(out1, enc_output)
    attn_output2 = self.dropout2(attn_output2, training=training)
    out2 = self.layernorm2(out1 + attn_output2)
    ffn_output = self.dense_proj(out2)
    ffn_output = self.dropout3(ffn_output, training=training)
    return self.layernorm3(out2 + ffn_output)

In [11]:
embed_dim = 256
latent_dim = 2048
num_heads = 8

encoder_inputs = keras.layers.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = keras.layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoderLayer(embed_dim, num_heads, latent_dim)(x)

decoder_inputs = keras.layers.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.layers.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = keras.layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)(decoder_inputs)
x = TransformerDecoderLayer(embed_dim, num_heads, latent_dim)(x, encoder_outputs)
decoder_outputs = keras.layers.Dense(vocab_size, activation="softmax")(x)

transformer = keras.models.Model([encoder_inputs, decoder_inputs], decoder_outputs, name="transformer")

In [14]:
epochs = 3

transformer.compile("rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)



<keras.src.callbacks.History at 0x214839fe920>

In [16]:
ukr_vocab = ukr_vectorization.get_vocabulary()
ukr_index_lookup = dict(zip(range(len(ukr_vocab)), ukr_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
  tokenized_input_sentence = eng_vectorization([input_sentence])
  decoded_sentence = "[start]"
  for i in range(max_decoded_sentence_length):
    tokenized_target_sentence = ukr_vectorization([decoded_sentence])[:, :-1]
    predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])
    sampled_token_index = np.argmax(predictions[0, i, :])
    sampled_token = ukr_index_lookup[sampled_token_index]
    decoded_sentence += " " + sampled_token
    if sampled_token == "[end]":
      break
  return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(5):
  input_sentence = random.choice(test_eng_texts)
  translated = decode_sequence(input_sentence)
  print(f'INPUT: {input_sentence}')
  print(f'OUTPUT: {translated}')

INPUT: He is always happy.OUTPUT: [start] Він веселий [end]INPUT: Nobody saw anything.OUTPUT: [start] Ніхто бачив нічого [end]INPUT: Clean the room.OUTPUT: [start] Чиста кімната [end]INPUT: Who's in the kitchen?OUTPUT: [start] Хто в кухні [end]INPUT: I've come to pick Tom up.OUTPUT: [start] Я повернувся Том зверху [end]