In [2]:
import io
import os
import re
import time
import unicodedata

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers.experimental import preprocessing

## 1. Data preprocessing

In [3]:
# Paths (relative to the notebook) of the two text files used below:
# the text fed to the model and the text it should produce.
# NOTE(review): judging by the file name, the target is presumably the syllabified text — confirm.
input_file = "data/divina_textonly.txt"
target_file = "data/divina_syll_textonly.txt"

In [4]:
# Read both files fully into memory as UTF-8 text. They are opened in binary mode
# and decoded explicitly, and the `with` blocks make sure the file handles are
# closed as soon as the content has been read.
with open(input_file, "rb") as input_fh:
    input_text_raw = input_fh.read().decode(encoding="utf-8")
with open(target_file, "rb") as target_fh:
    target_text_raw = target_fh.read().decode(encoding="utf-8")
print("Length of input text: {} characters".format(len(input_text_raw)))
print("Length of target text: {} characters".format(len(target_text_raw)))

Length of input text: 578077 characters
Length of target text: 892871 characters


In [5]:
# Character-level vocabularies: the sorted set of distinct characters found in
# each raw text, together with their sizes.
input_vocab = sorted(set(input_text_raw))
target_vocab = sorted(set(target_text_raw))
input_vocab_size = len(input_vocab)
target_vocab_size = len(target_vocab)

In [6]:
# Report how many distinct characters each raw text contains.
print("Input vocab size: {}".format(input_vocab_size))
print("Target vocab size: {}".format(target_vocab_size))

Input vocab size: 80
Target vocab size: 81


The *preprocess* function adds the start and end symbols to each line and eliminates the empty ones.

In [7]:
def preprocess(text):
    """
    Wrap every non-blank line of `text` in the start symbol "^" and the end symbol "$".

    The text is split on newline characters, each line is stripped of surrounding
    whitespace, and lines that are empty after stripping are dropped.
    Returns the resulting lines as a list of strings, in their original order.
    """
    wrapped_lines = []
    for raw_line in text.split("\n"):
        content = raw_line.strip()
        if content:
            wrapped_lines.append("^" + content + "$")
    return wrapped_lines

# Lists of lines, each wrapped as "^...$", for the input and the target text.
input_text_prepr = preprocess(input_text_raw)
target_text_prepr = preprocess(target_text_raw)

The tokenizer encodes each line into a tensor of char-indexes and for simplicity fits only on the target's vocabulary.

In [8]:
# Character-level tokenizer: no characters are filtered out (filters="") and
# case is preserved (lower=False).
tokenizer = tf.keras.preprocessing.text.Tokenizer(filters="", char_level=True, lower=False)
# The index is built from the target lines only and then reused for the input lines.
# NOTE(review): a character that occurs only in the input would be missing from the
# tokenizer's index — confirm that the target vocabulary covers the input's.
tokenizer.fit_on_texts(target_text_prepr)

# Encode every line as a list of integer character ids.
input_text_lines_enc = tokenizer.texts_to_sequences(input_text_prepr)
target_text_lines_enc = tokenizer.texts_to_sequences(target_text_prepr)

Padding is required in order to have a non-ragged tensor to feed to the neural network.

In [9]:
def pad(x):
    """Pad the sequences in `x` with Keras' `pad_sequences`, adding the padding at the end ("post")."""
    padded = tf.keras.preprocessing.sequence.pad_sequences(x, padding="post")
    return padded

In [10]:
# Padded id arrays for the encoded input and target lines (padding added by pad()).
input_text = pad(input_text_lines_enc)
target_text = pad(target_text_lines_enc)

## 2. Training

In [11]:
input_train, input_test, target_train, target_test = train_test_split(input_text, target_text)

The dataset is created by grouping the lines in batches and by shuffling them.

Each input line is paired with its corresponding target line.

In [12]:
# Batching: the shuffle buffer is as large as the training set.
BATCH_SIZE = 64
BUFFER_SIZE = len(input_train)
steps_per_epoch = len(input_train) // BATCH_SIZE

# Model sizes.
embedding_dim = 256
units = 1024
# +1 accounts for id 0, which is used for padding
vocab_size = 1 + len(tokenizer.word_index)

# Padded line lengths of the input and target arrays.
max_length_inp = input_text.shape[1]
max_length_targ = target_text.shape[1]

# Shuffle the (input, target) pairs, then group them into full batches only.
train_pairs = tf.data.Dataset.from_tensor_slices((input_train, target_train))
dataset = train_pairs.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

The encoder and the decoder each consist of an embedding layer followed by a GRU.

The decoder takes the final hidden state of the encoder as its initial hidden state and outputs logits whose size equals the size of the target vocabulary.

In [13]:
class Encoder(tf.keras.Model):
    """Embedding layer followed by a GRU that returns its last output and its final state."""

    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super().__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.enc_units, return_state=True)

    def call(self, x, hidden=None):
        """
        Embed the id tensor `x` and run it through the GRU.

        When `hidden` is None the GRU's own initial state is used.
        Returns the pair (output, state) produced by the GRU.
        """
        embedded = self.embedding(x)

        # fall back to the GRU's default initial state when none is supplied
        initial_state = self.gru.get_initial_state(embedded) if hidden is None else hidden

        output, state = self.gru(embedded, initial_state=initial_state)
        return output, state


class Decoder(tf.keras.Model):
    """Embedding layer, GRU and a dense layer that produces one logit per vocabulary entry."""

    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super().__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            self.dec_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer="glorot_uniform",
        )
        self.fc = tf.keras.layers.Dense(vocab_size)

    def call(self, x, enc_hidden):
        """
        Run the ids in `x` through the decoder, starting from the state `enc_hidden`.

        Returns (logits, state): the logits have one row per (example, time step)
        and one column per vocabulary entry; `state` is the GRU's final state.
        """
        # ids -> embedding vectors
        embedded = self.embedding(x)

        # the GRU starts from the state handed over by the caller
        seq_output, state = self.gru(embedded, initial_state=enc_hidden)

        # merge the batch and time axes: (batch * steps, hidden_size)
        hidden_size = seq_output.shape[2]
        flat_output = tf.reshape(seq_output, (-1, hidden_size))

        # project onto the vocabulary: (batch * steps, vocab)
        logits = self.fc(flat_output)

        return logits, state

In [14]:
# Encoder and decoder are built with the same vocabulary size, embedding size,
# number of GRU units and batch size.
encoder = Encoder(vocab_size, embedding_dim, units, BATCH_SIZE)

decoder = Decoder(vocab_size, embedding_dim, units, BATCH_SIZE)

The loss is calculated using Sparse Categorical Crossentropy and the loss of the padding is masked.


In [15]:
# Adam optimizer with its default settings.
optimizer = tf.keras.optimizers.Adam()
# The model outputs raw logits (from_logits=True); reduction="none" keeps one loss
# value per example so that loss_function can mask padding before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction="none")

def loss_function(real, pred):
    """
    Cross-entropy between the target ids `real` and the logits `pred`, ignoring padding.

    The per-example losses of positions whose target id is 0 (padding) are set
    to zero, then the mean over all positions (masked ones included) is returned.
    """
    per_example = loss_object(real, pred)

    # 1 where the target is a real character, 0 where it is padding
    not_padding = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)

    return tf.reduce_mean(per_example * not_padding)

In [16]:
# Checkpoints are written inside checkpoint_dir using the file prefix "ckpt".
checkpoint_dir = "./training_checkpoints"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
# The checkpoint tracks the optimizer together with both models.
checkpoint = tf.train.Checkpoint(optimizer=optimizer, encoder=encoder, decoder=decoder)

We use teacher forcing: at each step, the ground-truth target character is fed to the decoder as its next input.

In [18]:
@tf.function
def train_step(inp, targ, enc_hidden):
    """
    Run one teacher-forced optimisation step on a batch.

    Args:
        inp: batch of padded input id sequences.
        targ: batch of padded target id sequences; position 0 holds the start symbol.
        enc_hidden: initial encoder state, or None to use the encoder's default.

    Returns:
        The summed per-step loss divided by the target length.
    """
    loss = 0
    start_id = tokenizer.word_index["^"]

    with tf.GradientTape() as tape:
        _, enc_hidden = encoder(inp, enc_hidden)

        # the decoder starts from the encoder's final state
        dec_hidden = enc_hidden

        # One start symbol per example. The batch size is read from the batch
        # itself rather than from the global BATCH_SIZE, so the step works for
        # any batch size.
        dec_input = tf.fill([tf.shape(targ)[0], 1], start_id)

        for t in range(1, targ.shape[1]):
            # feed the previous character and the running decoder state
            predictions, dec_hidden = decoder(dec_input, dec_hidden)

            loss += loss_function(targ[:, t], predictions)

            # teacher forcing: the next input is the true character at step t
            dec_input = tf.expand_dims(targ[:, t], 1)

    batch_loss = loss / int(targ.shape[1])

    variables = encoder.trainable_variables + decoder.trainable_variables

    gradients = tape.gradient(loss, variables)

    optimizer.apply_gradients(zip(gradients, variables))

    return batch_loss

In [19]:
# Number of passes over the training dataset.
EPOCHS = 20

for epoch in range(EPOCHS):
    start = time.time()

    # None lets the encoder use its default initial state for every batch
    enc_hidden = None
    total_loss = 0

    for (batch, (inp, targ)) in enumerate(dataset.take(steps_per_epoch)):
        batch_loss = train_step(inp, targ, enc_hidden)
        total_loss += batch_loss

        # progress report every 100 batches
        if batch % 100 == 0:
            print(f"Epoch {epoch+1} Batch {batch} Loss {batch_loss.numpy():.4f}")
    # save a checkpoint of the optimizer and both models every 2 epochs
    if (epoch + 1) % 2 == 0:
        checkpoint.save(file_prefix=checkpoint_prefix)

    # mean batch loss of the epoch and wall-clock time it took
    print(f"Epoch {epoch+1} Loss {total_loss/steps_per_epoch:.4f}")
    print(f"Time taken for 1 epoch {time.time()-start:.2f} sec\n")

Epoch 1 Batch 0 Loss 3.2395
Epoch 1 Batch 100 Loss 1.3291
Epoch 1 Loss 1.5451
Time taken for 1 epoch 69.52 sec

Epoch 2 Batch 0 Loss 1.2653
Epoch 2 Batch 100 Loss 1.1384
Epoch 2 Loss 1.1502
Time taken for 1 epoch 25.84 sec

Epoch 3 Batch 0 Loss 1.0949
Epoch 3 Batch 100 Loss 1.0737
Epoch 3 Loss 1.0515
Time taken for 1 epoch 25.52 sec

Epoch 4 Batch 0 Loss 1.0180
Epoch 4 Batch 100 Loss 0.9723
Epoch 4 Loss 0.9806
Time taken for 1 epoch 25.29 sec

Epoch 5 Batch 0 Loss 0.9435
Epoch 5 Batch 100 Loss 0.9448
Epoch 5 Loss 0.9251
Time taken for 1 epoch 25.42 sec

Epoch 6 Batch 0 Loss 0.8646
Epoch 6 Batch 100 Loss 0.9172
Epoch 6 Loss 0.8785
Time taken for 1 epoch 25.65 sec

Epoch 7 Batch 0 Loss 0.8393
Epoch 7 Batch 100 Loss 0.8494
Epoch 7 Loss 0.8420
Time taken for 1 epoch 25.55 sec

Epoch 8 Batch 0 Loss 0.7956
Epoch 8 Batch 100 Loss 0.8193
Epoch 8 Loss 0.8095
Time taken for 1 epoch 25.74 sec

Epoch 9 Batch 0 Loss 0.7669
Epoch 9 Batch 100 Loss 0.7755
Epoch 9 Loss 0.7767
Time taken for 1 epoch 25.

## 3. Translation and Attention Plot

We define the *evaluate* function to preprocess the sentence in input and to get the predicted ids of the translation.

The ids of the translation are obtained by applying *argmax* to the predicted logits of the decoder.

We begin with the id of the start symbol and, at each new step, we pass to the decoder the id it has just output.

The translation stops when the end symbol is reached.

In [36]:
def evaluate(sentence):
    """
    Greedily decode `sentence` with the trained encoder and decoder.

    The sentence is encoded character by character with the notebook's tokenizer,
    padded to the input length used for training and passed through the encoder.
    The decoder is then run one step at a time, starting from the start symbol
    "^" and feeding back the id it has just predicted, until the end symbol "$"
    is produced or `max_length_targ` steps have been taken.

    Returns:
        (result, sentence, attention_plot), where `result` is the predicted
        characters, each followed by a single space, and `sentence` is the
        unchanged input. The decoder in this notebook takes only
        (input, hidden state) and returns only (logits, state), so there are no
        attention weights to record: `attention_plot` is an all-zero matrix of
        shape (target length, input length), kept so that callers still receive
        three values.
    """
    attention_plot = np.zeros((target_text.shape[1], input_text.shape[1]))

    # Encode the sentence the same way the training lines were encoded
    # (characters unknown to the tokenizer are skipped instead of raising KeyError).
    inputs = tokenizer.texts_to_sequences([sentence])
    inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs, maxlen=max_length_inp, padding="post")
    inputs = tf.convert_to_tensor(inputs)

    result = ""

    hidden = [tf.zeros((1, units))]
    _, enc_hidden = encoder(inputs, hidden)

    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([tokenizer.word_index["^"]], 0)

    for _ in range(max_length_targ):
        predictions, dec_hidden = decoder(dec_input, dec_hidden)

        predicted_id = int(tf.argmax(predictions[0]).numpy())
        predicted_char = tokenizer.index_word.get(predicted_id)

        # An id without a character (e.g. the padding id 0) ends the decoding.
        if predicted_char is None:
            break

        result += predicted_char + " "

        if predicted_char == "$":
            break

        # the predicted ID is fed back into the model
        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence, attention_plot

In [33]:
def plot_attention(attention, sentence, predicted_sentence):
    """
    Show the matrix `attention` as a heat map with labelled axes.

    Args:
        attention: 2-D array; rows correspond to predicted tokens, columns to input tokens.
        sentence: sequence of labels for the columns (x axis).
        predicted_sentence: sequence of labels for the rows (y axis).

    Labels beyond the size of the matrix are ignored.
    """
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap="viridis")

    fontdict = {"fontsize": 14}

    # Never place more ticks than the matrix has rows/columns.
    n_rows, n_cols = np.shape(attention)[:2]
    x_labels = list(sentence)[:n_cols]
    y_labels = list(predicted_sentence)[:n_rows]

    # matshow centres cell (i, j) on the integer coordinates (j, i), so one tick
    # is pinned on every cell before the labels are attached.
    ax.set_xticks(range(len(x_labels)))
    ax.set_yticks(range(len(y_labels)))
    ax.set_xticklabels(x_labels, fontdict=fontdict, rotation=90)
    ax.set_yticklabels(y_labels, fontdict=fontdict)

    plt.show()

In [37]:
def translate(sentence):
    """
    Decode `sentence` with evaluate(), print the input and the prediction, and
    plot the attention matrix returned by evaluate().
    """
    result, sentence, attention_plot = evaluate(sentence)

    print("Input:", sentence)
    print("Predicted translation:", result)

    # evaluate() encodes the sentence character by character and appends one
    # space after every predicted character, so the axis labels are the
    # individual characters: every character of the input, and every second
    # character of `result`. Splitting on spaces would count words and break on
    # predicted spaces.
    input_chars = list(sentence)
    predicted_chars = list(result[::2])

    attention_plot = attention_plot[:len(predicted_chars), :len(input_chars)]
    plot_attention(attention_plot, input_chars, predicted_chars)

In [None]:
translate("^ Nel mezzo del cammin di nostra vita $")