## **0. Preliminary Settings**

First of all, we need to clone the repository to get access to the code and use utility functions inside the notebook

In [None]:
# clone the project repository into the current working directory (IPython shell escape)
!git clone https://github.com/mazzio97/DeepComedy.git

# root folder of the cloned repository; the paths used below are built by appending to it
project_path = 'DeepComedy/'

This folder is then added to the system path so that the modules can be used inside the notebook

In [None]:
import sys

# expose the modules under '<project_path>src' to the import machinery
sys.path.append(f'{project_path}src')

Finally, the *Divine Comedy* is loaded and stored in a variable

In [None]:
# read the whole poem at once, decoding the file as ISO-8859-1
with open(f'{project_path}res/divine_comedy.txt', mode='r', encoding='ISO-8859-1') as comedy_file:
  divine_comedy = comedy_file.read()

# preview the beginning and the end of the loaded text
print(divine_comedy[:231], '\n\n[...]\n\n', divine_comedy[-266:], sep='\n')

Also, we set seeds for Python, Numpy/Keras and Tensorflow to guarantee the maximal level of reproducibility

> Though, the results could still differ a little bit due to other randomized routines called during the execution and the inner stochasticity introduced by parallel computing

In [None]:
import random
import numpy as np
import tensorflow as tf

# fix the seed of each random number generator used in the notebook:
# Python's `random`, NumPy's global generator and TensorFlow's global generator
random.seed(0)
np.random.seed(0)
tf.random.set_seed(0)

## **1. Data Processing**

### ***1.1 Text Mark***

We use the provided function `mark` to map the original *Divine Comedy* into a marked version containing:

* a marker both at the beginning and at the end of each *cantica*

* a marker both at the beginning and at the end of each *canto*

* a marker between each pair of consecutive *tercets*

In [None]:
from text_processing.markers import mark

# build the marked version of the poem with the project's `mark` function
divine_comedy_marked = mark(divine_comedy)
# preview the beginning and the end of the marked text
print(divine_comedy_marked[:260])
print('\n\n[...]\n\n')
print(divine_comedy_marked[-319:])

### ***1.2 Extracting the Verses***

We want to build a dataset in which the input sequence represents a piece of the *Divine Comedy* going from verse *i* to verse *i+n* and the target sequence represents a piece of the *Divine Comedy* going from verse *i* to verse *i+n+1*, thus we need at first to split the dataset and get a list of verses

In [None]:
# one list entry per line of the marked text (marker lines included)
divine_comedy_split = divine_comedy_marked.split('\n')

# show the first 20 entries, numbered from 01
for verse_number, verse in enumerate(divine_comedy_split[:20], start=1):
  print(f'{verse_number:02} --> {verse}')

In [None]:
# total number of entries obtained by splitting the marked text on newlines
print(len(divine_comedy_split))

### ***1.3 Building the Dataset***

As we know the rhyming scheme of the *Divine Comedy*, we know that we will need at least the last *3* verses (*3 actual verses or 2 actual verses + 1 marker verse to indicate the end of the tercet*) to predict a correct next one, so we set `seq_length = 3`

> Differently from single-token models, here we have a lower amount of samples and a greater variability (indeed, the dataset is less dense), thus we can choose a `step_length` of *1* and a larger `train_val_split`

In [None]:
# number of verses in each input sequence (the target contains one verse more)
seq_length = 3
# stride, in verses, between two consecutive samples
step_length = 1
# number of samples per batch
batch_size = 64
# fraction of the samples assigned to the training set
train_val_split = 0.7

# number of complete windows of `seq_length + 1` verses obtained by sliding over the list of
# verses with stride `step_length`: floor((N - (seq_length + 1)) / step_length) + 1.
# For `step_length = 1` this equals `N - seq_length`; the `max` guards against a text that is
# shorter than a single window.
tot_samples = max(0, (len(divine_comedy_split) - seq_length - 1) // step_length + 1)
train_samples = round(tot_samples * train_val_split)

print('Train Samples:', train_samples)
print('  Val Samples:', tot_samples - train_samples)

Now, we map the list of verses into a dataset taking *4* verses at a time, and splitting them into an input string of the first *3* verses and a target string containing all the *4* verses taken into consideration

In [None]:
from tensorflow.data import Dataset
from tensorflow.strings import reduce_join

def split_input_target(chunk):
  """Turn a window of verses into an (input, target) pair of strings.

  Args:
    chunk: string tensor holding the verses of one window.

  Returns:
    A tuple `(input_text, target_text)`: the input joins all the verses but the last one,
    the target joins all of them; in both cases verses are separated by a newline and a
    trailing newline is appended.
  """
  input_text = reduce_join(chunk[:-1], separator='\n') + '\n'
  target_text = reduce_join(chunk, separator='\n') + '\n'
  return input_text, target_text

# slide a window of `seq_length + 1` verses over the text, moving `step_length` verses per step
dataset = Dataset.from_tensor_slices(divine_comedy_split)
dataset = dataset.window(seq_length + 1, step_length, drop_remainder=True)
# each window is itself a dataset: batching it yields a single tensor with all its verses
dataset = dataset.flat_map(lambda window: window.batch(seq_length + 1))
# the samples are shuffled once and for all: by default `shuffle` draws a new order every
# time the dataset is iterated, which would make any position-based split (take/skip) of
# this dataset differ from one iteration to the next
dataset = dataset.map(split_input_target).shuffle(tot_samples, seed=0, reshuffle_each_iteration=False)

Finally, we encode each block of the comedy using the provided `word_tokenizer` to tokenize the text into words, including punctuation

> Some special tokens are reserved to the markers

In [None]:
from text_processing.tokenizers import word_tokenizer

# build the tokenizer from the original (unmarked) text
tokenizer = word_tokenizer(divine_comedy)
print(tokenizer.vocab_size, 'tokens:')
print()
# show the first 40 tokens, printing the newline token in its escaped form
for token in tokenizer.tokens[:40]:
  shown_token = '\\n' if token == '\n' else token
  print(f"'{shown_token}'")

In [None]:
# both maxima start from 2, i.e. the room needed by the start/end tokens alone
max_inp = 2
max_tar = 2

# a single pass over the dataset keeps track of the longest encoded input and target,
# each of them counted together with its start/end tokens
for inp, tar in dataset:
  max_inp = max(max_inp, len(tokenizer.encode(inp.numpy())) + 2)
  max_tar = max(max_tar, len(tokenizer.encode(tar.numpy())) + 2)

print('Max  Input:', max_inp)
print('Max Target:', max_tar)

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

def encode_dataset(input_dataset, target_dataset):
  """Encode one (input, target) pair of strings into two fixed-length id sequences.

  Each string is tokenized, wrapped between the start token (`tokenizer.vocab_size`) and
  the end token (`tokenizer.vocab_size + 1`) and post-padded up to `max_inp` (input) or
  `max_tar` (target). The encoding runs in Python, hence it is wrapped in a
  `tf.py_function` so that it can be used inside `Dataset.map`.

  Args:
    input_dataset: string tensor with the input text of the sample.
    target_dataset: string tensor with the target text of the sample.

  Returns:
    The two `tf.int64` tensors produced by the wrapped function.
  """
  start_token = tokenizer.vocab_size
  end_token = tokenizer.vocab_size + 1

  def encode_sample(inp, tar):
    def to_padded_ids(text, max_len):
      ids = [start_token] + tokenizer.encode(text.numpy()) + [end_token]
      return pad_sequences([ids], maxlen=max_len, padding='post')[0]

    return to_padded_ids(inp, max_inp), to_padded_ids(tar, max_tar)

  return tf.py_function(encode_sample, [input_dataset, target_dataset], [tf.int64, tf.int64])

# NOTE: `take`/`skip` yield disjoint subsets only if `dataset` produces its elements in the
# same order every time it is iterated (for a shuffled dataset, this requires the shuffle
# to be created with `reshuffle_each_iteration=False`)

# the training set is made up of the first `train_samples` samples; the encoded samples are
# cached so that the Python-side tokenization runs only during the first pass
train_dataset = dataset.take(train_samples).map(encode_dataset)
train_dataset = train_dataset.cache()
train_dataset = train_dataset.batch(batch_size, drop_remainder=True)
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)

# the validation set is made up of the remaining samples, i.e. those that come after the
# training ones (taking them from the head of the dataset would reuse training samples)
val_dataset = dataset.skip(train_samples).map(encode_dataset)
val_dataset = val_dataset.cache()
val_dataset = val_dataset.batch(batch_size, drop_remainder=True)
val_dataset = val_dataset.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
# inspect the first sample of the first training batch
# (the loop variables are deliberately not called `input`, as that would shadow the
# builtin `input` function for the rest of the notebook)
for sample_inputs, sample_targets in train_dataset.take(1):
  sample_input = sample_inputs.numpy()[0]
  sample_target = sample_targets.numpy()[0]

  print(f'Input  Shape: {sample_input.shape}')
  print(f'Target Shape: {sample_target.shape}')
  print()

  # before decoding, the padding (0) and the start/end tokens (>= vocab_size) are dropped
  print('INPUT:\n')
  print(sample_input)
  print(tokenizer.decode([token for token in sample_input if 0 < token < tokenizer.vocab_size]))
  print('\n\n---------------------\n\n')
  print('TARGET:\n')
  print(sample_target)
  print(tokenizer.decode([token for token in sample_target if 0 < token < tokenizer.vocab_size]))

## **2. Model**

### ***2.1 Architecture***

The model is the combination of two different models:

* an *Encoder*, made up of an *Embedding* and a *GRU* layer, which takes the input string and returns both the output and the hidden state of the *GRU*

* a *Decoder*, made up of an *Embedding* and a *GRU* layer as well as an *Attention* layer, which takes in input the latest decoded token (initially, this is the start token), the latest hidden state (initially, this is the hidden state of the encoder) and the output of the encoder, and returns the hidden state of the *GRU* and the output processed through a *Dense* layer

> The variable parameters of the model are:
> * the dimension of the *Embedding* layer
> * the number of units of the *GRU* layer
> * the kind of *Attention* layer, which can be either:
>   - *Additive* (https://arxiv.org/pdf/1409.0473.pdf, Bahdanau et al.)
>   -  *Multiplicative* (https://arxiv.org/pdf/1508.04025.pdf, Luong et al.)
> * the dropout rate 

In [None]:
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Embedding, GRU, Reshape, Attention, AdditiveAttention, Concatenate, Dense
from tensorflow.keras.utils import plot_model

# output dimension of the Embedding layers of both the encoder and the decoder
embedding_dim = 256
# number of units of the GRU layers of both the encoder and the decoder
gru_units = 1024
# kind of attention used by the decoder: 'ADD' (AdditiveAttention) or 'MUL' (Attention)
attention = 'ADD'
# dropout rate passed to the GRU layers
dropout = 0.2

#### *2.1.1 Encoder*

In [None]:
# the encoder receives a whole (padded) input sequence of `max_inp` token ids
encoder_input = Input((max_inp,), name='encoder_input')
# the vocabulary is extended by two entries to make room for the start/end tokens
encoder_embedding = Embedding(tokenizer.vocab_size + 2, embedding_dim, name='encoder_embedding')(encoder_input)
# with both `return_sequences` and `return_state` set, the layer returns the output at each
# timestep as well as the final hidden state
encoder_gru = GRU(
    gru_units, return_sequences=True, return_state=True, stateful=False,
    dropout=dropout, recurrent_initializer='glorot_uniform', name='encoder_gru'
)(encoder_embedding)

encoder = Model(encoder_input, encoder_gru, name='Encoder')

# `display` is provided by the notebook environment
display(plot_model(encoder, show_shapes=True, show_layer_names=False, rankdir='LR'))
print()
encoder.summary()

#### *2.1.2 Decoder*

In [None]:
# the decoder works on one token at a time: it receives the latest token, the latest hidden
# state and the whole output of the encoder
decoder_input = Input((1,), name='decoder_input')
hidden_state = Input((gru_units,), name='hidden_state')
encoder_output = Input((max_inp, gru_units), name='encoder_output')
# the hidden state is given a time axis of length one so that it can be used as the query
# of the attention layer
expanded_hidden = Reshape((1, gru_units), name='expanded_hidden')(hidden_state)

# the attention layer uses the hidden state as query and the encoder output as value
if attention == 'ADD':
  decoder_attention = AdditiveAttention(name='decoder_attention')([expanded_hidden, encoder_output])
elif attention == 'MUL':
  decoder_attention = Attention(name='decoder_attention')([expanded_hidden, encoder_output])
else:
  # fail immediately with a clear message rather than with a NameError a few lines below
  raise ValueError(f"Unknown attention kind {attention!r}: expected 'ADD' or 'MUL'")

# the vocabulary is extended by two entries to make room for the start/end tokens
decoder_embedding = Embedding(tokenizer.vocab_size + 2, embedding_dim, name='decoder_embedding')(decoder_input)
# the attention context and the embedded token are concatenated and fed to the GRU
decoder_concatenate = Concatenate(name='decoder_concatenate')((decoder_attention, decoder_embedding))
decoder_output, decoder_hidden = GRU(
    gru_units, return_sequences=True, return_state=True, stateful=False,
    dropout=dropout, recurrent_initializer='glorot_uniform', name='decoder_gru'
)(decoder_concatenate)
# the time axis of length one is dropped before the final projection over the vocabulary
shrunk_output = Reshape((gru_units,), name='shrunk_output')(decoder_output)
# no activation here: the model returns logits
output = Dense(tokenizer.vocab_size + 2, name='output')(shrunk_output)

decoder = Model([decoder_input, hidden_state, encoder_output], [output, decoder_hidden], name='Decoder')

# `display` is provided by the notebook environment
display(plot_model(decoder, show_shapes=True, show_layer_names=False))
print()
decoder.summary()

### ***2.2 Training***

We need to write a custom training loop as, during the decoding phase, we will need to add one token at a time as well as setting the correct input state, then we will proceed with the training phase, storing every `epochs_interval` epochs the weights of the model in a file that indicates the values of its parameters

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy

# per-epoch values of the loss and the accuracy, filled in by the training loop
history = {'train loss': [], 'train acc': [], 'val loss': [], 'val acc': []}

optimizer = Adam()
# the loss is computed on logits and is not reduced, so that `loss_function` can mask the
# padded positions before averaging
loss_object = SparseCategoricalCrossentropy(from_logits=True, reduction='none')
# NOTE: Keras metric objects are stateful, i.e. each call accumulates into a running result
# until the state of the metric is reset
acc_object = SparseCategoricalAccuracy()

def loss_function(real, pred):
  """Compute the cross-entropy between `real` and `pred`, ignoring the padded positions.

  Args:
    real: tensor of target token ids, where 0 marks a padded position.
    pred: tensor of predicted logits.

  Returns:
    The mean of the per-element losses, in which the elements whose target is 0
    contribute a zero loss (they are still counted in the mean).
  """
  per_element_loss = loss_object(real, pred)
  # 1 where the target is an actual token, 0 where it is padding
  not_padding = tf.cast(tf.math.not_equal(real, 0), dtype=per_element_loss.dtype)
  return tf.reduce_mean(per_element_loss * not_padding)

@tf.function
def train_step(inp, tar):
  """Run one optimization step on a batch, decoding the target with teacher forcing.

  Args:
    inp: batch of encoded input sequences.
    tar: batch of encoded target sequences.

  Returns:
    A tuple `(loss, accuracy)` with the sum of the per-token losses and accuracies of the
    batch, each divided by the length of the target sequences.
  """
  loss = 0
  acc = 0
  with tf.GradientTape() as tape:
    # first of all, we get the outputs from the encoder (`training=True` is passed explicitly,
    # otherwise the dropout of the GRU layers would not be applied in a custom training loop)
    encoder_output, hidden_state = encoder(inp, training=True)

    # the decoder input is initially made up of the start token only
    decoder_input = tf.expand_dims([tokenizer.vocab_size] * batch_size, 1)

    # for each token in the target
    for t in range(1, tar.shape[1]):
      # we get both the output and the hidden state of the decoder (this last one will be used in the next prediction)
      predictions, hidden_state = decoder([decoder_input, hidden_state, encoder_output], training=True)

      # then, we use the teacher forcing technique by feeding the target as the next input
      decoder_input = tf.expand_dims(tar[:, t], 1)

      # finally, we update the total loss and accuracy; the accuracy is computed with the
      # stateless function, as a stateful metric object would return a running average over
      # all the previous calls (training and validation ones alike)
      loss += loss_function(tar[:, t], predictions)
      acc += tf.reduce_mean(tf.keras.metrics.sparse_categorical_accuracy(tar[:, t], predictions))

  # we apply the backpropagation both to the encoder and decoder variables
  variables = encoder.trainable_variables + decoder.trainable_variables
  gradients = tape.gradient(loss, variables)
  optimizer.apply_gradients(zip(gradients, variables))

  # and finally, we return the loss and the accuracy averaged over the target length
  return loss / int(tar.shape[1]), acc / int(tar.shape[1])

@tf.function
def val_step(inp, tar):
  """Evaluate the model on a batch, decoding the target with teacher forcing.

  No gradient is computed and no variable is updated.

  Args:
    inp: batch of encoded input sequences.
    tar: batch of encoded target sequences.

  Returns:
    A tuple `(loss, accuracy)` with the sum of the per-token losses and accuracies of the
    batch, each divided by the length of the target sequences.
  """
  loss = 0
  acc = 0

  # the models are run in inference mode (no dropout) and, as nothing is differentiated
  # here, no gradient tape is needed
  encoder_output, hidden_state = encoder(inp, training=False)

  # the decoder input is initially made up of the start token only
  decoder_input = tf.expand_dims([tokenizer.vocab_size] * batch_size, 1)

  for t in range(1, tar.shape[1]):
    predictions, hidden_state = decoder([decoder_input, hidden_state, encoder_output], training=False)
    decoder_input = tf.expand_dims(tar[:, t], 1)
    loss += loss_function(tar[:, t], predictions)
    # the accuracy is computed with the stateless function, as a stateful metric object
    # would return a running average over all the previous calls
    acc += tf.reduce_mean(tf.keras.metrics.sparse_categorical_accuracy(tar[:, t], predictions))

  return loss / int(tar.shape[1]), acc / int(tar.shape[1])

In [None]:
import time
from utils.checkpoint import restore_checkpoint, save_checkpoint

# the signatures encode the values of all the parameters, so that the checkpoints of
# different configurations are kept apart
encoder_checkpoint_signature = 'Encoder seq_{} stp_{} btc_{} tvs_{} emb_{} unt_{} att_{} drp_{} epc_'.format(
    seq_length, step_length, batch_size, train_val_split,
    embedding_dim, gru_units, attention, dropout
)
decoder_checkpoint_signature = 'Decoder seq_{} stp_{} btc_{} tvs_{} emb_{} unt_{} att_{} drp_{} epc_'.format(
    seq_length, step_length, batch_size, train_val_split,
    embedding_dim, gru_units, attention, dropout
)
checkpoint_directory = 'ckpt/'

# both models are restored outside of any `assert`: assertions are stripped when Python
# runs with -O, in which case the decoder would silently not be restored
# (presumably `restore_checkpoint` returns the epoch to resume from -- confirm in utils.checkpoint)
initial_epoch = restore_checkpoint(encoder, checkpoint_directory, encoder_checkpoint_signature, verbose=True)
decoder_initial_epoch = restore_checkpoint(decoder, checkpoint_directory, decoder_checkpoint_signature, verbose=False)
if decoder_initial_epoch != initial_epoch:
  raise RuntimeError(
      f'Encoder and decoder checkpoints are out of sync (epoch {initial_epoch} vs {decoder_initial_epoch})'
  )

# total number of epochs to reach
epochs = 100
# interval (in epochs) passed to `save_checkpoint`
epochs_interval = 10
# number of batches between two progress messages
batches_interval = 20

# training resumes from the restored epoch and goes on up to `epochs`
for epoch in range(initial_epoch, epochs):
  start = time.time()
  print(f'Starting Epoch {epoch+1}/{epochs}')

  # --- training pass: losses and accuracies of the batches are summed up ---
  total_loss, total_acc = 0, 0
  for batch, (inp, tar) in enumerate(train_dataset):
    batch_loss, batch_acc = train_step(inp, tar)
    total_loss = total_loss + batch_loss
    total_acc = total_acc + batch_acc
    # a progress message is printed every `batches_interval` batches
    if (batch + 1) % batches_interval != 0:
      continue
    print(f'  > Batch {batch+1}', end=' \t\t ')
    print(f'- train_loss: {batch_loss:.4f} - train_acc: {batch_acc:.4f}')

  # `batch` holds the index of the last batch, hence `batch + 1` is the number of batches
  history['train loss'].append(total_loss / (batch + 1))
  history['train acc'].append(total_acc / (batch + 1))

  # --- validation pass ---
  total_loss, total_acc = 0, 0
  for batch, (inp, tar) in enumerate(val_dataset):
    batch_loss, batch_acc = val_step(inp, tar)
    total_loss = total_loss + batch_loss
    total_acc = total_acc + batch_acc

  history['val loss'].append(total_loss / (batch + 1))
  history['val acc'].append(total_acc / (batch + 1))

  # --- summary of the epoch ---
  elapsed = time.time() - start
  print(f'Ending Epoch {epoch+1}/{epochs}', end=' \t ')
  print(f'- train_loss: {history["train loss"][-1]:.4f} - train_acc: {history["train acc"][-1]:.4f}', end=' ')
  print(f'- val_loss: {history["val loss"][-1]:.4f} - val_acc: {history["val acc"][-1]:.4f}')
  print(f'Elapsed Time {elapsed:.2f}s\n')

  # the weights of both models are handed to `save_checkpoint` together with `epochs_interval`
  save_checkpoint(encoder, epoch, checkpoint_directory, encoder_checkpoint_signature, epochs_interval, verbose=True)
  save_checkpoint(decoder, epoch, checkpoint_directory, decoder_checkpoint_signature, epochs_interval, verbose=False)

Here's a graphical representation of the improvement of the model, with respect both to the loss and the accuracy, across the epochs

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# the plot is driven by what has actually been recorded in `history`, so that it keeps
# working when the training has been interrupted before reaching the last epoch
recorded_epochs = len(history['train loss'])

if recorded_epochs > 0:
  sns.set_style('darkgrid')
  sns.set_context('notebook')
  plt.figure(figsize=(12, 5))

  # epochs are numbered from 1, starting from the one the training was resumed at
  x = np.arange(recorded_epochs) + initial_epoch + 1

  for position, (metric, title) in enumerate([('loss', 'Loss'), ('acc', 'Accuracy')], start=1):
    plt.subplot(1, 2, position)
    for split in ('train', 'val'):
      values = history[f'{split} {metric}']
      # the x axis is cut to the number of recorded values of each curve
      plt.plot(x[:len(values)], values, label=split)
    plt.legend()
    plt.title(title)

  plt.show()

## **3. Generation**

The generation is based on the trained model and it uses a `temperature_factor` to allow some degree of randomness

> The next token is chosen among a subset of those having a probability which is at least `1 / temperature_factor` with respect to the maximal one

> It goes without saying that a higher `temperature_factor` leads to a more explorative generation, while a lower `temperature_factor` leads to a more conservative one (in particular, with `temperature_factor = 1` the generation is completely deterministic)

In [None]:
from tensorflow.nn import softmax
from text_processing.markers import unmark, MARKERS

# id of the newline token, which marks the end of a verse
newline_token = tokenizer.encode('\n')[0]

def evaluate(inp_list, max_length=30, temperature_factor=1, verbose=False):
  """Generate the tokens of the verse that follows the given ones.

  Args:
    inp_list: list of strings (the latest verses), which are joined with newlines.
    max_length: maximal number of tokens to generate.
    temperature_factor: the next token is sampled among those whose probability is at
      least `1 / temperature_factor` times the maximal one.
    verbose: whether to print each token as soon as it is generated.

  Returns:
    The list of the generated token ids; the newline token that ends the verse is not
    included.
  """
  start_token = tokenizer.vocab_size
  end_token = tokenizer.vocab_size + 1

  # the encoder input is the tokenized string obtained from the given input list surrounded by a start and an end token
  input_tokens = tokenizer.encode('\n'.join(inp_list))
  encoder_input = pad_sequences([[start_token] + input_tokens + [end_token]], maxlen=max_inp, padding='post')

  # initially, the decoder input is the start token
  decoder_input = tf.expand_dims([start_token], 0)

  # the final output of the evaluation (initially, this is an empty list)
  output = []

  # we repeat the same process of the training phase (teacher forcing) to get the correct
  # hidden state up to the final input word: only the actual input tokens are fed, as the
  # end token and the padding of the encoder input do not precede the verse to generate
  encoder_output, hidden_state = encoder(encoder_input)
  for token in input_tokens:
    _, hidden_state = decoder([decoder_input, hidden_state, encoder_output])
    decoder_input = tf.expand_dims([token], 0)

  # then we generate the remaining part
  for _ in range(max_length):
    logits, hidden_state = decoder([decoder_input, hidden_state, encoder_output])

    # we get the probabilities for the decoded token (special tokens excluded)
    probabilities = softmax(logits[0, :tokenizer.vocab_size]).numpy()

    # we take a subset of possible tokens whose probability is at least 1/temperature_factor of the maximal one
    candidates = probabilities >= probabilities.max() / temperature_factor
    indices = np.arange(tokenizer.vocab_size)[candidates]

    # we renormalize this subset using, again, a softmax activation
    probabilities = softmax(probabilities[candidates]).numpy()

    # the id is randomly chosen among the indices according to the computed probabilities
    predicted_id = np.random.choice(indices, size=1, p=probabilities)[0]

    # if the token coincides with the newline token, the generation is interrupted
    # (special tokens cannot be sampled, as they have been excluded from the candidates)
    if predicted_id == newline_token:
      break

    # otherwise the token is used as the next input and appended to the final output
    decoder_input = tf.expand_dims([predicted_id], 0)
    output.append(predicted_id)

    if verbose:
      print(tokenizer.decode([predicted_id]), end='')

  return output

def generate(
    input_string=divine_comedy_marked[:386], # first three tercets of the comedy
    max_iterations=250, end_marker=MARKERS['canto end'],
    temperature_factor=1.0, verbose=False
):
  """Generate new verses, one at a time, as a continuation of `input_string`.

  Args:
    input_string: marked text the generation starts from.
    max_iterations: maximal number of verses to generate.
    end_marker: the generation stops as soon as a generated verse equals this string.
    temperature_factor: forwarded to `evaluate`.
    verbose: forwarded to `evaluate`; a line break is also printed after each verse.

  Returns:
    The generated text only (the input string is left out), processed by `unmark`.
  """
  generated_string = input_string
  # number of trailing pieces kept after the split: 'seq_length' verses, plus one for the
  # last blank piece to be filled
  context_size = seq_length + 1

  for _ in range(max_iterations):
    context_verses = generated_string.split('\n')[-context_size:]

    # the next verse is generated as a list of tokens and then decoded
    verse_tokens = evaluate(context_verses, temperature_factor=temperature_factor, verbose=verbose)
    generated_verse = tokenizer.decode(verse_tokens)
    if verbose:
      print()

    # the end marker interrupts the generation and is not appended
    if generated_verse == end_marker:
      break
    generated_string = generated_string + generated_verse + '\n'

  # only the part that follows the input provided by the user is returned
  generated_only = generated_string[len(input_string):]
  return unmark(generated_only)

In [None]:
from metrics.store import store

# NOTE(review): `results_path` and `model_name` are not defined in the visible part of the
# notebook -- confirm that they are set before this cell is run
result_dir = results_path + model_name
# the file name encodes the values of all the parameters
result_file = (
    f'seq_{seq_length} stp_{step_length} btc_{batch_size} tvs_{train_val_split} '
    f'emb_{embedding_dim} unt_{gru_units} att_{attention} drp_{dropout}'
)

# a single sample is generated and stored for temperature 1
print('Generating Temperature 1...')
samples = [generate(temperature_factor=1)]
store(result_dir, result_file, temperature=1, sample_texts=samples, original_text=divine_comedy)

# for the other temperatures, `repetitions` samples are generated and stored together
repetitions = 5
for temperature in [2, 3, 5, 10]:
  samples = []
  for r in range(repetitions):
    print(f'Generating Temperature {temperature} (repetition {r+1})...')
    samples.append(generate(temperature_factor=temperature))
  store(result_dir, result_file, temperature, sample_texts=samples, original_text=divine_comedy)

In [None]:
# generate a text with temperature factor 3, printing the tokens while they are generated
generated_canto = generate(temperature_factor=3.0, verbose=True)

In [None]:
# print the text returned by `generate`
print(generated_canto)