Translator for German to English

Import the required libraries

In [3]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

Download the Dataset

In [4]:
#from google.colab import drive
#drive.mount('/content/drive')

file_path = './deu.txt'

Create list of input and target words/sentences

In [5]:
import csv
with open(file_path, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        pairs = list(reader)

context = np.array([pair[1] for pair in pairs])
target = np.array([pair[0] for pair in pairs])

print(context[-1])
print(target[-1])

Ohne Zweifel findet sich auf dieser Welt zu jedem Mann genau die richtige Ehefrau und umgekehrt; wenn man jedoch in Betracht zieht, dass ein Mensch nur Gelegenheit hat, mit ein paar hundert anderen bekannt zu sein, von denen ihm nur ein Dutzend oder weniger nahesteht, darunter höchstens ein oder zwei Freunde, dann erahnt man eingedenk der Millionen Einwohner dieser Welt leicht, dass seit Erschaffung ebenderselben wohl noch nie der richtige Mann der richtigen Frau begegnet ist.
Doubtless there exists in this world precisely the right woman for any given man to marry and vice versa; but when you consider that a human being has the opportunity of being acquainted with only a few hundred people, and out of the few hundred that there are but a dozen or less whom he knows intimately, and out of the dozen, one or two friends at most, it will easily be seen, when we remember the number of millions who inhabit this world, that probably, since the earth was created, the right man has never yet m

Shuffle the dataset and Create batches

In [6]:
BATCH_SIZE = 64

# Generate a boolean array 'is_train' to determine if each sample should be included in the training set.
# Each element of 'is_train' is True with a probability of 0.8, indicating inclusion in the training set.
is_train = np.random.uniform(size=(len(target),)) < 0.8

# Create a training dataset ('train_ten') using samples where 'is_train' is True.
# The dataset is constructed from tensor slices of 'context' and 'target' arrays corresponding to True values in 'is_train'.
# Shuffle the dataset with a buffer size of the length of 'context' and batch it with the specified BATCH_SIZE.
train_ten = (
    tf.data.Dataset
    .from_tensor_slices((context[is_train], target[is_train]))
    .shuffle(len(context))
    .batch(BATCH_SIZE))

# Create a test dataset ('test_ten') using samples where 'is_train' is False.
# The dataset is constructed from tensor slices of 'context' and 'target' arrays corresponding to False values in 'is_train'.
# Shuffle the dataset with a buffer size of the length of 'context' and batch it with the specified BATCH_SIZE.
test_ten = (
    tf.data.Dataset
    .from_tensor_slices((context[~is_train], target[~is_train]))
    .shuffle(len(context))
    .batch(BATCH_SIZE))


Preprocess the datatset

In [7]:
import unicodedata

# clean the data by converting to lower case, removing unwanted characters and addinf start and end token
def clean_data(text):
    # Convert the text to lowercase
    text = tf.strings.lower(text)
    # Keep only alphabets, digits, and punctuation
    text = tf.strings.regex_replace(text, '[^a-z0-9?.!,¿]', ' ')
    # Strip leading and trailing whitespaces
    text = tf.strings.strip(text)
    # Add start and end tokens to the text
    text = tf.strings.join(['[START]', text, '[END]'], separator=' ')
    return text


Tokenize and Vectorize the text data

In [8]:
max_vocab_size = 5000  # Define the maximum vocabulary size

# Define the text vectorization layer for context
context_vectorization = tf.keras.layers.TextVectorization(
    standardize=clean_data,  # Preprocessing function for standardization
    max_tokens=max_vocab_size,  # Maximum vocabulary size
    ragged=True  # Allow ragged tensors (varying-length sequences)
)
# Adapt the text vectorization layer to the training data
context_vectorization.adapt(train_ten.map(lambda context, target: context))

# Display the first 10 words from the vocabulary
context_vocab = context_vectorization.get_vocabulary()[:10]
print("Context Vocabulary:", context_vocab)

# Define the text vectorization layer for target
target_vectorization = tf.keras.layers.TextVectorization(
    standardize=clean_data,  # Preprocessing function for standardization
    max_tokens=max_vocab_size,  # Maximum vocabulary size
    ragged=True  # Allow ragged tensors (varying-length sequences)
)
# Adapt the text vectorization layer to the training data
target_vectorization.adapt(train_ten.map(lambda context, target: target))

# Display the first 10 words from the vocabulary
target_vocab = target_vectorization.get_vocabulary()[:10]
print("Target Vocabulary:", target_vocab)


Context Vocabulary: ['', '[UNK]', '[START]', '[END]', 'ich', 'tom', 'ist', 'nicht', 'das', 'du']
Target Vocabulary: ['', '[UNK]', '[START]', '[END]', 'i', 'tom', 'to', 'the', 'you', 'a']


In [9]:
def process_text(context, target):
    # Process context and target texts using the TextVectorization layers
    context = context_vectorization(context)
    target = target_vectorization(target)

    # Get inputs and outputs for the target text (teacher forcing)
    targ_in = target[:, :-1]
    targ_out = target[:, 1:]

    # Return processed data
    return (context.to_tensor(), targ_in.to_tensor()), targ_out.to_tensor()

# Map the process_text function to create training and validation datasets
train_ds = train_ten.map(process_text, tf.data.AUTOTUNE)
test_ds = test_ten.map(process_text, tf.data.AUTOTUNE)

Encoder using LSTM

In [10]:
rnn_units = 128
class Encoder(tf.keras.layers.Layer):
  def __init__(self, text_processor, units):
    super(Encoder, self).__init__()
    self.text_processor = text_processor
    self.vocab_size = text_processor.vocabulary_size()
    self.units = units
    # converts tokens to vectors
    self.embedding = tf.keras.layers.Embedding(self.vocab_size, units,
                                               mask_zero=True)
    # The RNN layer processes those vectors sequentially.
    # Initialize a Bidirectional LSTM layer
    self.rnn = tf.keras.layers.Bidirectional(
      # Parameter for specifying how the outputs of the forward and backward LSTMs are merged
      merge_mode='sum',
      # Parameter specifying the recurrent layer used for bidirectional processing
      layer=tf.keras.layers.LSTM(
        units,  # Dimensionality of the output space (number of units in LSTM cell)
        return_sequences=True, # Return the full sequence of outputs for each timestep
        recurrent_initializer='glorot_uniform'  # Initializer for the recurrent weights
    )
)
  def call(self, x):
    x = self.embedding(x)
    x = self.rnn(x)
    return x

  def input_to_tensor(self, texts):
    texts = tf.convert_to_tensor(texts)
    if len(texts.shape) == 0:
      texts = tf.convert_to_tensor(texts)[tf.newaxis]
    context = self.text_processor(texts).to_tensor()
    context = self(context)
    return context

Attention Layer

In [11]:
class Attention_Layer(tf.keras.layers.Layer):
    def __init__(self, units, **kwargs):
        super().__init__()
        # Initialize a MultiHeadAttention layer
        self.mha = tf.keras.layers.MultiHeadAttention(key_dim=units, num_heads=1, **kwargs)
        # Initialize a LayerNormalization layer
        self.layernorm = tf.keras.layers.LayerNormalization()
        # Initialize an Add layer
        self.add = tf.keras.layers.Add()

    def call(self, x, context):
        # Perform MultiHeadAttention
        attn_output, attn_scores = self.mha(
            query=x,
            value=context,
            return_attention_scores=True)
        # Calculate mean attention scores across heads
        attn_scores = tf.reduce_mean(attn_scores, axis=1)
        # Add the attention output to the input
        x = self.add([x, attn_output])
        # Apply layer normalization
        x = self.layernorm(x)
        return x


Decoder Layer using LSTM

In [12]:
class Decoder(tf.keras.layers.Layer):
    def __init__(self, text_processor, units):
        super(Decoder, self).__init__()
        # Initialize the text processor
        self.text_processor = text_processor
        # Get vocabulary size
        self.vocab_size = text_processor.vocabulary_size()
        # Initialize StringLookup layers for token conversion
        self.char_to_id = tf.keras.layers.StringLookup(
            vocabulary=text_processor.get_vocabulary(),
            mask_token='', oov_token='[UNK]')
        # convert the numerical ids back to text
        self.id_to_char = tf.keras.layers.StringLookup(
            vocabulary=text_processor.get_vocabulary(),
            mask_token='', oov_token='[UNK]',
            invert=True)

        # Define start and end tokens
        self.start_token = self.char_to_id('[START]')
        self.end_token = self.char_to_id('[END]')
        # Define the number of units
        self.units = units
        # 1. The embedding layer converts token IDs to vectors
        self.embedding = tf.keras.layers.Embedding(self.vocab_size,
                                                   units, mask_zero=True)
        # 2. The RNN keeps track of what's been generated so far.
        self.rnn = tf.keras.layers.LSTM(units,
                                         return_sequences=True,
                                         return_state=True,
                                         recurrent_initializer='glorot_uniform')
        # 3. The RNN output will be the query for the attention layer.
        self.attention = Attention_Layer(units)
        # 4. This fully connected layer produces the logits for each
        # output token.
        self.output_layer = tf.keras.layers.Dense(self.vocab_size)

    def call(self, context, x, state=None, return_state=False):
        # 1. Lookup the embeddings
        x = self.embedding(x)
        # 2. Process the target sequence.
        state = self.rnn.get_initial_state(x)
        x, *state = self.rnn(x, initial_state=state)
        # 3. Use the RNN output as the query for the attention over the context.
        x = self.attention(x, context)
        # Step 4. Generate logit predictions for the next token.
        logits = self.output_layer(x)

        if return_state:
            return logits, state
        else:
            return logits

    def get_initial_state(self, context):
        batch_size = tf.shape(context)[0]
        start_tokens = tf.fill([batch_size, 1], self.start_token)
        done = tf.zeros([batch_size, 1], dtype=tf.bool)
        embedded = self.embedding(start_tokens)
        return start_tokens, done, self.rnn.get_initial_state(embedded)[0]

    def tokens_to_text(self, tokens):
        words = self.id_to_char(tokens)
        result = tf.strings.reduce_join(words, axis=-1, separator=' ')
        result = tf.strings.regex_replace(result, '^ *\[START\] *', '')
        result = tf.strings.regex_replace(result, ' *\[END\] *$', '')
        return result

    def get_next_token(self, context, next_token, done, state):
        logits, state = self(
            context, next_token,
            state=state,
            return_state=True)

        next_token = tf.argmax(logits, axis=-1)
        # If a sequence produces an `end_token`, set it `done`
        done = done | (next_token == self.end_token)
        # Once a sequence is done it only produces 0-padding.
        next_token = tf.where(done, tf.constant(0, dtype=tf.int64), next_token)

        return next_token, done, state


Translator

In [13]:
class Translator(tf.keras.Model):
  def __init__(self, units,
               context_vectorization,
               target_vectorization):
    super().__init__()
    # Build the encoder and decoder
    encoder = Encoder(context_vectorization, units)
    decoder = Decoder(target_vectorization, units)

    self.encoder = encoder
    self.decoder = decoder

  def call(self, inputs):
    context, x = inputs
    context = self.encoder(context)
    logits = self.decoder(context, x)
    return logits

  def translate(self,
                texts, *,
                max_length=50):
    # Process the input texts
    context = self.encoder.input_to_tensor(texts)
    batch_size = tf.shape(texts)[0]

    # Setup the loop inputs
    tokens = []
    next_token, done, state = self.decoder.get_initial_state(context)

    for _ in range(max_length):
      # Generate the next token
      next_token, done, state = self.decoder.get_next_token(
          context, next_token, done,  state)

      # Collect the generated tokens
      tokens.append(next_token)

      if tf.executing_eagerly() and tf.reduce_all(done):
        break

    # Stack the lists of tokens and attention weights.
    tokens = tf.concat(tokens, axis=-1)   # t*[(batch 1)] -> (batch, t)

    result = self.decoder.tokens_to_text(tokens)
    return result

In [14]:
model = Translator(rnn_units, context_vectorization, target_vectorization)


In [15]:
def fn_loss(y_true, y_pred):
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')
    loss = loss_fn(y_true, y_pred)
    return tf.reduce_mean(loss)

In [16]:
def fn_acc(y_true, y_pred):
    y_pred = tf.argmax(y_pred, axis=-1)
    y_pred = tf.cast(y_pred, y_true.dtype)
    match = tf.cast(y_true == y_pred, tf.float32)
    total_tokens = tf.cast(tf.size(y_true), tf.float32)
    return tf.reduce_sum(match) / total_tokens

In [17]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
model.compile(optimizer=optimizer,
              loss=fn_loss,
              metrics=[fn_acc, fn_loss], run_eagerly=True)

In [18]:
model.evaluate(test_ds, steps=20, return_dict=True)



{'loss': 3.9005684852600098,
 'fn_acc': 0.00015413903747685254,
 'fn_loss': 8.516472816467285}

In [None]:
# Train the model using the `fit` method.

# Arguments:
# train_ds.repeat(): Training dataset. The `repeat()` method repeats the dataset indefinitely.
# epochs=100: Number of epochs (iterations over the entire dataset).
# steps_per_epoch=20: Number of steps (batches) to yield from the training dataset in each epoch.
# validation_data=test_ds: Validation dataset.
# validation_steps=20: Number of steps (batches) to yield from the validation dataset after each epoch.

history = model.fit(
    train_ds.repeat(),
    epochs=100,
    steps_per_epoch = 20,
    validation_data=test_ds,
    validation_steps = 20)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

Epoch 94/100
Epoch 95/100
Epoch 96/100
Epoch 97/100
Epoch 98/100
Epoch 99/100
Epoch 100/100


In [None]:
plt.plot(history.history['fn_loss'], label='loss')
plt.plot(history.history['val_fn_loss'], label='val_loss')
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch #')
plt.ylabel('CE')
plt.legend()

In [None]:
plt.plot(history.history['fn_acc'], label='accuracy')
plt.plot(history.history['val_fn_acc'], label='val_accuracy')
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch #')
plt.ylabel('CE')
plt.legend()

In [None]:
import nltk
from nltk.translate.bleu_score import sentence_bleu

result = model.translate(['Ich habe die Geduld dafür'])
predicted = result[0].numpy().decode()
actual = "I have the patience for this"
print("Predicted:", predicted)
print("Actual:", actual)

# Calculate BLEU score
bleu_score = sentence_bleu([predicted], actual)

print("BLEU Score for Reference:", bleu_score)