In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = "0"

import tensorflow as tf
from tensorflow import keras
import keras_nlp
import nltk
import pandas as pd
import numpy as np
import transformers
import pathlib

tf.config.list_physical_devices()

  from .autonotebook import tqdm as notebook_tqdm


[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]

In [2]:
import torch

# Overview
- considering using the following notebook as a basis for testing off-the-shelf translation models and attempt to create my own or modify an existing.
- the following was inspired by my speaking to my grandparents and uncle about my book sending app. Realizing they (and other non-english speakers) may find it handy to have the option to at least attempt translating the book for them, as well as, my recent exposure to more Natural Language Processing literature.

### off-the-shelf model

In [8]:
!nvidia-smi

Sun Mar  5 23:54:28 2023       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 531.18                 Driver Version: 531.18       CUDA Version: 12.1     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                      TCC/WDDM | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf            Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  NVIDIA GeForce RTX 3070 Ti    WDDM | 00000000:04:00.0  On |                  N/A |
| 61%   49C    P0               83W / 310W|   1026MiB /  8192MiB |      2%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [9]:
print(torch.cuda.is_available())

True


In [10]:
# Load the pre-trained translation model
model_name = "Helsinki-NLP/opus-mt-en-es"
model = transformers.pipeline("translation_en_to_es", model=model_name)

# Fine-tune the model on a specific domain
fine_tuned_model = transformers.pipeline(
    "translation_en_to_es", 
    model=model_name, 
    tokenizer=model.tokenizer, 
    device=1    # device 1 --> gpu
)

RuntimeError: CUDA error: invalid device ordinal
CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.

In [None]:
# # Train the fine-tuned model
# domain_data = [...] # Load domain-specific data
# for data in domain_data:
#     source_text = data["source_text"]
#     target_text = data["target_text"]
#     fine_tuned_model(source_text)

In [None]:
# Use the fine-tuned model to translate text
text_to_translate = "Hello, how are you?"
translation = fine_tuned_model(text_to_translate)
print(translation)

In [None]:
from tensorflow import keras
from tensorflow_text.tools.wordpiece_vocab import (
    bert_vocab_from_dataset as bert_vocab,
)

In [None]:
BATCH_SIZE = 64
EPOCHS = 5  # This should be at least 10 for convergence
MAX_SEQUENCE_LENGTH = 60
ENG_VOCAB_SIZE = 15000
SPA_VOCAB_SIZE = 17000

EMBED_DIM = 50
INTERMEDIATE_DIM = 20480
NUM_HEADS = 8

In [None]:
# dataset making methods for normal WordPieceToken processing
def preprocess_batch(eng, spa):
    batch_size = tf.shape(spa)[0]

    print(f"batch_size: {batch_size}")

    eng = eng_tokenizer(eng)
    spa = spa_tokenizer(spa)

    # Pad `eng` to `MAX_SEQUENCE_LENGTH`.
    eng_start_end_packer = keras_nlp.layers.StartEndPacker(
        sequence_length=MAX_SEQUENCE_LENGTH,
        pad_value=eng_tokenizer.token_to_id("[PAD]"),
    )
    eng = eng_start_end_packer(eng)

    # Add special tokens (`"[START]"` and `"[END]"`) to `spa` and pad it as well.
    spa_start_end_packer = keras_nlp.layers.StartEndPacker(
        sequence_length=MAX_SEQUENCE_LENGTH + 1,
        start_value=spa_tokenizer.token_to_id("[START]"),
        end_value=spa_tokenizer.token_to_id("[END]"),
        pad_value=spa_tokenizer.token_to_id("[PAD]"),
    )
    spa = spa_start_end_packer(spa)

    print(f"english start packer: {eng[0]}")
    print(f"english end packer: {eng[-1]}")
    print(f"spanish start packer: {spa[0]}")
    print(f"spanish end packer: {spa[-1]}")

    return (
        {
            "encoder_inputs": eng,
            "decoder_inputs": spa[:, :-1],
        },
        spa[:, 1:],
    )


def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.map(preprocess_batch, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset.shuffle(2048).prefetch(16).cache()



def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")

In [None]:
import pathlib

text_file = keras.utils.get_file(
    fname="spa-eng.zip",
    origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
text_file = pathlib.Path(text_file).parent / "spa-eng" / "spa.txt"

text_file_opus = keras.utils.get_file(
    fname="spa-eng-opus.gz",
    origin="https://opus.nlpl.eu/download.php?f=Books/v1/tmx/en-es.tmx.gz",
    untar=True,     # must use untar because we are dealing with .gz
)
text_file_opus = pathlib.Path(text_file_opus).parent / "spa-eng-text_file_opus" / "spa-eng-opus.txt"

In [None]:
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    eng, spa = line.split("\t")
    eng = eng.lower()
    spa = spa.lower()
    text_pairs.append((eng, spa))


from translate.storage.tmx import tmxfile

with open(r"C:\Users\bllen\.keras\datasets\en-es.tmx", 'rb') as fin:
    tmx_file = tmxfile(fin, 'en', 'ar')

text_pairs_opus = []
for node in tmx_file.unit_iter():
    print(node.source, node.target)
    eng, spa = node.source.lower().replace('"',""), node.target.lower().replace('"',"")
    text_pairs_opus.append((eng, spa))

In [None]:
import random

for _ in range(5):
    print(random.choice(text_pairs))


for _ in range(5):
    print(random.choice(text_pairs_opus))

In [None]:
# splitting into training/validation/test

random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))                                       # validation set is 15%
num_train_samples = len(text_pairs) - 2 * num_val_samples                           # train set is 70%
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")


print(f"------------------- opus dataset --------------------------")


random.shuffle(text_pairs_opus)
num_val_samples = int(0.15 * len(text_pairs_opus))                                       # validation set is 15%
num_train_samples = len(text_pairs_opus) - 2 * num_val_samples                           # train set is 70%
train_pairs_opus = text_pairs_opus[:num_train_samples]
val_pairs_opus = text_pairs_opus[num_train_samples : num_train_samples + num_val_samples]
test_pairs_opus = text_pairs_opus[num_train_samples + num_val_samples :]

print(f"{len(text_pairs_opus)} total pairs")
print(f"{len(train_pairs_opus)} training pairs")
print(f"{len(val_pairs_opus)} validation pairs")
print(f"{len(test_pairs_opus)} test pairs")

In [None]:
def train_word_piece(text_samples, vocab_size, reserved_tokens):
    word_piece_ds = tf.data.Dataset.from_tensor_slices(text_samples)
    vocab = keras_nlp.tokenizers.compute_word_piece_vocabulary(
        word_piece_ds.batch(1000).prefetch(2),
        vocabulary_size=vocab_size,
        reserved_tokens=reserved_tokens,
    )
    return vocab

In [None]:
reserved_tokens = ["[PAD]", "[UNK]", "[START]", "[END]"]

eng_samples = [text_pair[0] for text_pair in train_pairs]                       # getting english part of tuples
eng_vocab = train_word_piece(eng_samples, ENG_VOCAB_SIZE, reserved_tokens)

spa_samples = [text_pair[1] for text_pair in train_pairs]
spa_vocab = train_word_piece(spa_samples, SPA_VOCAB_SIZE, reserved_tokens)      # getting spanish part of tuples

eng_samples_opus = [text_pair[0] for text_pair in train_pairs_opus]                       # getting english part of tuples
eng_vocab_opus = train_word_piece(eng_samples_opus, ENG_VOCAB_SIZE, reserved_tokens)

spa_samples_opus = [text_pair[1] for text_pair in train_pairs_opus]
spa_vocab_opus = train_word_piece(spa_samples_opus, SPA_VOCAB_SIZE, reserved_tokens)      # getting spanish part of tuples

In [None]:
print("English Tokens: ", eng_vocab[100:110])
print("Spanish Tokens: ", spa_vocab[100:110])

print(f"--------------------------opus----------------------------")
print("English Tokens: ", eng_vocab_opus[100:110])
print("Spanish Tokens: ", spa_vocab_opus[100:110])

In [None]:
eng_tokenizer = keras_nlp.tokenizers.WordPieceTokenizer(vocabulary=eng_vocab, lowercase=False)
spa_tokenizer = keras_nlp.tokenizers.WordPieceTokenizer(vocabulary=spa_vocab, lowercase=False)

eng_tokenizer_opus = keras_nlp.tokenizers.WordPieceTokenizer(vocabulary=eng_vocab_opus, lowercase=False)
spa_tokenizer_opus = keras_nlp.tokenizers.WordPieceTokenizer(vocabulary=spa_vocab_opus, lowercase=False)

In [None]:
# trying to use word vectorization instead of simple workPieceOtkenizer
eng_vectorization = keras.layers.TextVectorization(max_tokens=ENG_VOCAB_SIZE, output_mode="int", output_sequence_length=MAX_SEQUENCE_LENGTH)
spa_vectorization = keras.layers.TextVectorization(max_tokens=SPA_VOCAB_SIZE, output_mode="int", output_sequence_length=MAX_SEQUENCE_LENGTH + 1, standardize=custom_standardization)
train_eng_texts = [pair[0] for pair in train_pairs]
train_spa_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(train_eng_texts)                    # NOTE: MUST CALL EITHER ... adapt() to derive vocab set ... or ... supply a dataset
spa_vectorization.adapt(train_spa_texts)

In [None]:
# format dataset methods for 
def format_dataset_vectorization(eng, spa):
    eng = eng_vectorization(eng)
    spa = spa_vectorization(spa)
    enc_dec_dict = {"encoder_inputs": eng, "decoder_inputs": spa[:, :-1],}
    dec_right_shifted = spa[:, 1:]
    return (enc_dec_dict, dec_right_shifted)


def make_dataset_vectorization(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.map(format_dataset_vectorization, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset.shuffle(2048).prefetch(16).cache()

In [None]:
# checking tokens by detokenizing

eng_input_ex = text_pairs[0][0]                         # first element of first tuple in list
eng_tokens_ex = eng_tokenizer.tokenize(eng_input_ex)
print("English sentence: ", eng_input_ex)
print("Tokens: ", eng_tokens_ex)
print(f"Recovered text after detokenizing: {eng_tokenizer.detokenize(eng_tokens_ex)}")

print()

spa_input_ex = text_pairs[0][1]
spa_tokens_ex = spa_tokenizer.tokenize(spa_input_ex)
print("Spanish sentence: ", spa_input_ex)
print("Tokens: ", spa_tokens_ex)
print(f"Recovered text after detokenizing: {spa_tokenizer.detokenize(spa_tokens_ex)}")

print(f"tf.data.AUTOTUNE: {tf.data.AUTOTUNE}")


print("-----------------opus--------------------")


eng_input_ex = text_pairs_opus[0][0]                         # first element of first tuple in list
eng_tokens_ex = eng_tokenizer_opus.tokenize(eng_input_ex)
print("English sentence: ", eng_input_ex)
print("Tokens: ", eng_tokens_ex)
print(f"Recovered text after detokenizing: {eng_tokenizer_opus.detokenize(eng_tokens_ex)}")

print()

spa_input_ex = text_pairs_opus[0][1]
spa_tokens_ex = spa_tokenizer_opus.tokenize(spa_input_ex)
print("Spanish sentence: ", spa_input_ex)
print("Tokens: ", spa_tokens_ex)
print(f"Recovered text after detokenizing: {spa_tokenizer_opus.detokenize(spa_tokens_ex)}")

In [None]:
train_ds = make_dataset_vectorization(train_pairs)
val_ds = make_dataset_vectorization(val_pairs)

In [None]:
train_ds_opus = make_dataset(train_pairs_opus)
val_ds_opus = make_dataset(val_pairs_opus)

In [None]:
train_ds = make_dataset_vectorization(train_pairs)
val_ds = make_dataset_vectorization(val_pairs)

train_ds_opus = make_dataset_vectorization(train_pairs_opus)
val_ds_opus = make_dataset_vectorization(val_pairs_opus)

In [None]:
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

for inputs, targets in train_ds_opus.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

In [None]:
# Encoder
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")

x = keras_nlp.layers.TokenAndPositionEmbedding(
    vocabulary_size=ENG_VOCAB_SIZE,
    sequence_length=MAX_SEQUENCE_LENGTH,
    embedding_dim=EMBED_DIM,
    mask_zero=True,
)(encoder_inputs)

encoder_outputs = keras_nlp.layers.TransformerEncoder(intermediate_dim=INTERMEDIATE_DIM, num_heads=NUM_HEADS)(inputs=x)
encoder = keras.Model(encoder_inputs, encoder_outputs)


# Decoder
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, EMBED_DIM), name="decoder_state_inputs")

x = keras_nlp.layers.TokenAndPositionEmbedding(
    vocabulary_size=SPA_VOCAB_SIZE,
    sequence_length=MAX_SEQUENCE_LENGTH,
    embedding_dim=EMBED_DIM,
    mask_zero=True,
)(decoder_inputs)

x = keras_nlp.layers.TransformerDecoder(intermediate_dim=INTERMEDIATE_DIM, num_heads=NUM_HEADS)(decoder_sequence=x, encoder_sequence=encoded_seq_inputs)
x = keras.layers.Dropout(0.4)(x)
decoder_outputs = keras.layers.Dense(SPA_VOCAB_SIZE, activation="softmax")(x)
decoder = keras.Model(
    [
        decoder_inputs,
        encoded_seq_inputs,
    ],
    decoder_outputs,
)
decoder_outputs = decoder([decoder_inputs, encoder_outputs])

transformer = keras.Model(
    [encoder_inputs, decoder_inputs],
    decoder_outputs,
    name="transformer",
)

In [None]:
transformer.summary()
transformer.compile(
    "Adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer.fit(train_ds, epochs=EPOCHS, validation_data=val_ds)

In [None]:
def decode_sequences(input_sentences):
    batch_size = tf.shape(input_sentences)[0]

    # Tokenize the encoder input.
    encoder_input_tokens = eng_tokenizer(input_sentences).to_tensor(
        shape=(None, MAX_SEQUENCE_LENGTH)
    )

    # Define a function that outputs the next token's probability given the
    # input sequence.
    def token_probability_fn(decoder_input_tokens):
        return transformer([encoder_input_tokens, decoder_input_tokens])[:, -1, :]

    # Set the prompt to the "[START]" token.
    prompt = tf.fill((batch_size, 1), spa_tokenizer.token_to_id("[START]"))

    generated_tokens = keras_nlp.utils.top_p_search(
        token_probability_fn,
        prompt,
        p=0.1,
        max_length=40,
        end_token_id=spa_tokenizer.token_to_id("[END]"),
    )
    generated_sentences = spa_tokenizer.detokenize(generated_tokens)
    return generated_sentences


test_eng_texts = [pair[0] for pair in test_pairs]
for i in range(2):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequences(tf.constant([input_sentence]))
    translated = translated.numpy()[0].decode("utf-8")
    translated = (
        translated.replace("[PAD]", "")
        .replace("[START]", "")
        .replace("[END]", "")
        .strip()
    )
    print(f"** Example {i} **")
    print(input_sentence)
    print(translated)
    print()

In [None]:
transformer_opus = keras.Model(
    [encoder_inputs, decoder_inputs],
    decoder_outputs,
    name="transformer",
)
transformer_opus.compile(
    "Adadelta", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer_opus.summary()
transformer_opus.fit(train_ds_opus, epochs=EPOCHS, validation_data=val_ds_opus)         # this did pretty poorly

In [None]:
test_eng_texts = [pair[0] for pair in test_pairs]
for i in range(2):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequences(tf.constant([input_sentence]))
    translated = translated.numpy()[0].decode("utf-8")
    translated = (
        translated.replace("[PAD]", "")
        .replace("[START]", "")
        .replace("[END]", "")
        .strip()
    )
    print(f"** Example {i} **")
    print(input_sentence)
    print(translated)
    print()

In [None]:
rouge_1 = keras_nlp.metrics.RougeN(order=1)
rouge_2 = keras_nlp.metrics.RougeN(order=2)

for test_pair in test_pairs[:30]:
    input_sentence = test_pair[0]
    reference_sentence = test_pair[1]

    translated_sentence = decode_sequences(tf.constant([input_sentence]))
    translated_sentence = translated_sentence.numpy()[0].decode("utf-8")
    translated_sentence = (
        translated_sentence.replace("[PAD]", "")
        .replace("[START]", "")
        .replace("[END]", "")
        .strip()
    )

    rouge_1(reference_sentence, translated_sentence)
    rouge_2(reference_sentence, translated_sentence)

print("ROUGE-1 Score: ", rouge_1.result())
print("ROUGE-2 Score: ", rouge_2.result())

### giving custom transformer architecture a try

In [None]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

In [None]:
text_file = keras.utils.get_file(
    fname="spa-eng.zip",
    origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
text_file = pathlib.Path(text_file).parent / "spa-eng" / "spa.txt"

In [None]:
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    eng, spa = line.split("\t")
    spa = "[start] " + spa + " [end]"
    text_pairs.append((eng, spa))

In [None]:
for _ in range(5):
    print(random.choice(text_pairs))

In [None]:
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

In [None]:
# NOTE: i should try turning each punctuation character into its own token
# ... "use split function to the TextVectorization layer"

strip_chars = string.punctuation + "¿"              # this is a string of punctuation of which we will use in regex to remove punctuation
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")          # NOTE: we don't want to strip "[" or "]" - they are used for positional embedding monickers


# PARAMETERS
vocab_size = 15000
sequence_length = 20
batch_size = 64


eng_vectorization = TextVectorization(
    max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length,
)
spa_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_eng_texts = [pair[0] for pair in train_pairs]
train_spa_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(train_eng_texts)                    # NOTE: MUST CALL EITHER ... adapt() to derive vocab set ... or ... supply a dataset
spa_vectorization.adapt(train_spa_texts)

In [None]:
def format_dataset(eng, spa):
    eng = eng_vectorization(eng)
    spa = spa_vectorization(spa)
    enc_dec_dict = {"encoder_inputs": eng, "decoder_inputs": spa[:, :-1],}
    dec_right_shifted = spa[:, 1:]
    return (enc_dec_dict, dec_right_shifted)


def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.shuffle(2048).prefetch(16).cache()


train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [None]:
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

In [None]:
from tensorflow.keras import layers

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)


class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)


class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="gelu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

In [None]:
embed_dim = 256
latent_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

In [None]:
epochs = 5  # This should be at least 30 for convergence

transformer.summary()
transformer.compile("rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)

In [None]:
# save the model
transformer.save('first_custom_transformer_eng_to_spa.h5')

In [None]:
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    tokenized_input_sentence = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = spa_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token

        if sampled_token == "[end]":
            break
    return decoded_sentence

In [None]:

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(30):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(f"input_sentence: {input_sentence} | translated: {translated}")

### attempting to use Transformers model "translation_en_to_es" as basis for lighter similarly architectured model

In [6]:
import torch

config = transformers.AutoConfig.from_pretrained('Helsinki-NLP/opus-mt-en-es')
model = transformers.TFAutoModelForSeq2SeqLM.from_pretrained("Helsinki-NLP/opus-mt-en-es")
model.summary()

All model checkpoint layers were used when initializing TFMarianMTModel.

All the layers of TFMarianMTModel were initialized from the model checkpoint at Helsinki-NLP/opus-mt-en-es.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFMarianMTModel for predictions without further training.


Model: "tf_marian_mt_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 model (TFMarianMainLayer)   multiple                  77943296  
                                                                 
 final_logits_bias (BiasLaye  multiple                 65001     
 r)                                                              
                                                                 
Total params: 78,008,297
Trainable params: 77,943,296
Non-trainable params: 65,001
_________________________________________________________________


In [7]:
model.config()

TypeError: 'MarianConfig' object is not callable