In [4]:
import warnings
warnings.filterwarnings('ignore')
import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pathlib
import shutil
import einops
import typing
import random
import textwrap
from typing import Any, Tuple
import tensorflow as tf
import keras
import keras_nlp
#import tensorflow_text as tf_text
from Constants import *

In [5]:
# List the files in the dataset directory.
# NOTE(review): `dataset_dir` is not defined in this notebook; presumably it
# comes from `from Constants import *` - confirm.
os.listdir(dataset_dir)

['spa.txt', '_about.txt']

In [6]:
def load_data(path):
  """Read a tab-separated parallel corpus and return its two text columns.

  Each line is expected to hold ``target<TAB>context`` optionally followed by
  further tab-separated fields (e.g. an attribution column), which are
  ignored. Lines with fewer than two fields (such as blank lines) are skipped
  instead of raising.

  Args:
    path: a ``pathlib.Path``-like object exposing ``read_text``.

  Returns:
    A ``(target, context)`` tuple of NumPy arrays holding the first and the
    second column respectively, in file order.
  """
  text = path.read_text(encoding = 'utf-8')

  targets, contexts = [], []
  for line in text.splitlines():
    fields = line.split('\t')
    # Skip blank or malformed lines rather than failing on unpacking.
    if len(fields) < 2:
      continue
    targets.append(fields[0])
    contexts.append(fields[1])

  return np.array(targets), np.array(contexts)

In [11]:
# Load the corpus and preview the last sentence pair, wrapped for readability.
# NOTE(review): `data_file` is not defined in this notebook; presumably it
# comes from `from Constants import *` - confirm.
target_raw, context_raw = load_data(data_file)
# NOTE(review): this first print shows the *context* sentence (the model
# input), although its label reads "Expected output".
print("Expected output: \n", '\n'.join(textwrap.wrap(context_raw[-1])))
print()
print("Expected output: \n", '\n'.join(textwrap.wrap(target_raw[-1])))

Expected output: 
 Un día, me desperté y vi que Dios me había puesto pelo en la cara. Me
lo afeité. Al día siguiente, vi que Dios me lo había vuelto a poner en
la cara, así que me lo afeité otra vez. Al tercer día, cuando vi que
Dios me había puesto pelo en la cara de nuevo, decidí que Dios se
saliera con la suya. Por eso tengo barba.

Expected output: 
 One day, I woke up to find that God had put hair on my face. I shaved
it off. The next day, I found that God had put it back on my face, so
I shaved it off again. On the third day, when I found that God had put
hair back on my face again, I decided to let God have his way. That's
why I have a beard.


In [None]:
# Shuffle buffer covers every sentence pair.
BUFFER_SIZE = len(context_raw)

# Boolean mask: each pair is assigned to the training split when its uniform
# draw is below 0.8; the remaining pairs form the validation split.
# NOTE(review): no random seed is set here, so the split changes between runs.
train = np.random.uniform(size = (len(target_raw), )) < 0.8

# Training pairs as batched (context, target) string tensors.
# NOTE(review): BATCH_SIZE is presumably provided by `Constants` - confirm.
raw_train_data = (
    tf.data.Dataset
    .from_tensor_slices((context_raw[train], target_raw[train]))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

# Validation pairs, built the same way from the complement of the mask.
raw_val_data = (
    tf.data.Dataset
    .from_tensor_slices((context_raw[~train], target_raw[~train]))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

In [None]:
# Show the first five context/target strings of one training batch. The loop
# variables stay bound afterwards and are reused by later cells.
for string_example_context, string_example_target in raw_train_data.take(1):
    print(string_example_context[:5])
    print()
    print(string_example_target[:5])
    # Redundant with take(1), which already yields a single batch.
    break

In [None]:
# Compare the raw bytes of a sample sentence with its NFKD-normalized form.
# NOTE(review): `tf_text` is only brought in by the commented-out import of
# `tensorflow_text` in the import cell; confirm it is available at runtime.
sample_text = tf.constant("¿Todavía está en casa?")

print(sample_text.numpy())
print(tf_text.normalize_utf8(sample_text, "NFKD").numpy())

##### Preprocessing steps include: 
- Splitting accented characters.
- Keep only spaces, the letters a–z and selected punctuation (. ? ! , ¿).
- Add spaces around punctuation marks.
- Strip whitespace.
- Add start and end tokens around sentences.

In [None]:
def text_preprocessing(text):
    """Standardize raw text before tokenization.

    Applies, in order: NFKD normalization, lower-casing, removal of every
    character other than space, a-z and ``. ? ! , ¿``, padding of those
    punctuation marks with spaces, whitespace stripping, and wrapping in
    ``[START]`` / ``[END]`` markers.

    Args:
        text: a string tensor (scalar or batched).

    Returns:
        The standardized string tensor.
    """
    # NOTE(review): `tf_text` relies on the `tensorflow_text` import that is
    # commented out in the import cell - confirm it is in scope.
    lowered = tf.strings.lower(tf_text.normalize_utf8(text, "NFKD"))
    filtered = tf.strings.regex_replace(lowered, '[^ a-z.?!,¿]', '')
    spaced = tf.strings.regex_replace(filtered, '[.?!,¿]', r' \0 ')
    trimmed = tf.strings.strip(spaced)

    return tf.strings.join(['[START]', trimmed, '[END]'], separator = ' ')

In [None]:
# Print the sample sentence before and after standardization.
print(sample_text.numpy().decode())
print(text_preprocessing(sample_text).numpy().decode())

In [None]:
# Vectorizer for the context (input) sentences: standardizes with
# text_preprocessing, caps the vocabulary at MAX_VOCAB_SIZE and returns
# ragged token-id tensors.
context_processor = keras.TextVectorization(
    standardize = text_preprocessing,
    max_tokens = MAX_VOCAB_SIZE,
    ragged = True
)

In [None]:
# Build the context vocabulary from the context half of the training pairs.
context_processor.adapt(raw_train_data.map(lambda context, target: context))

In [None]:
# First ten entries of the learned context vocabulary.
context_processor.get_vocabulary()[:10]

In [None]:
# Vectorizer for the target (output) sentences, configured like the context
# vectorizer.
target_processor = keras.TextVectorization(
    standardize = text_preprocessing,
    max_tokens = MAX_VOCAB_SIZE,
    ragged = True
)

In [None]:
# Build the target vocabulary from the target half of the training pairs.
target_processor.adapt(raw_train_data.map(lambda context, target: target))

In [None]:
# First ten entries of the learned target vocabulary.
target_processor.get_vocabulary()[:10]

In [None]:
# Tokenize the example context batch and display its first three rows.
sample_tokens = context_processor(string_example_context)
sample_tokens[:3, :]

In [None]:
# Map the ids of the first tokenized sentence back to vocabulary strings.
context_vocab = np.array(context_processor.get_vocabulary())
tokens = context_vocab[sample_tokens[0].numpy()]
' '.join(tokens)

In [None]:
# Left panel: the token ids after converting the ragged batch to a dense tensor.
plt.subplot(1, 2, 1)
plt.pcolormesh(sample_tokens.to_tensor())
plt.title("Token ID's")

# Right panel: boolean map of the non-zero entries of that dense tensor.
plt.subplot(1, 2, 2)
plt.pcolormesh(sample_tokens.to_tensor() != 0)
plt.title("Mask")

In [None]:
def data_processor(context, target):
    """Turn a batch of raw string pairs into model inputs and labels.

    Args:
        context: batch of context strings.
        target: batch of target strings.

    Returns:
        ``((context_ids, target_input), target_output)`` where ``target_input``
        is the tokenized target without its last token and ``target_output``
        is the tokenized target without its first token, all as dense tensors.
    """
    context_ids = context_processor(context).to_tensor()
    target_ids = target_processor(target)

    # Shift by one position: the decoder sees tokens [0, n-1) and is trained
    # to emit tokens [1, n).
    decoder_input = target_ids[:, : -1].to_tensor()
    decoder_labels = target_ids[:, 1 :].to_tensor()

    features = (context_ids, decoder_input)
    return features, decoder_labels

# Tokenized training and validation datasets, mapped in parallel.
train_set = raw_train_data.map(data_processor, tf.data.AUTOTUNE)
val_set = raw_val_data.map(data_processor, tf.data.AUTOTUNE)

In [None]:
# Print the first ten ids of the first example in one tokenized batch:
# context ids, decoder input ids, then decoder label ids. The loop variables
# stay bound afterwards and are reused by later cells.
for (sample_context_token, sample_target_input), sample_target_output in train_set.take(1):
    print(sample_context_token[0, :10].numpy())
    print()
    print(sample_target_input[0, :10].numpy())
    print(sample_target_output[0, :10].numpy())

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Encode a batch of context token ids into a sequence of vectors.

    The layer embeds the token ids and runs them through a bidirectional GRU
    whose forward and backward outputs are summed.

    Args:
        text_processor: adapted text vectorizer for the context language; it
            supplies the vocabulary size and tokenizes raw strings in
            ``convert_input``.
        units: embedding size and GRU width.
    """

    def __init__(self, text_processor, units):
        super().__init__()
        self.text_processor = text_processor
        self.vocab_size = text_processor.vocabulary_size()
        self.units = units

        # Converting tokens to vectors; id 0 is treated as padding.
        self.embedding = tf.keras.layers.Embedding(self.vocab_size,
                                                   units,
                                                   mask_zero = True)

        # Processing vectors sequentially in both directions.
        self.rnn = tf.keras.layers.Bidirectional(
            merge_mode = "sum",
            layer = tf.keras.layers.GRU(units,
                                        return_sequences = True,
                                        recurrent_initializer = "glorot_uniform"))

    def call(self, x):
        """Return the encoded sequence for a dense batch of token ids."""
        # NOTE(review): ShapeChecker is not defined in this notebook;
        # presumably it comes from `Constants` - confirm.
        shape_checker = ShapeChecker()
        shape_checker(x, 'batch s')

        # Embedding layer looks up a vector for every token
        x = self.embedding(x)
        shape_checker(x, "batch s units")

        # GRU processes embeddings
        x = self.rnn(x)
        shape_checker(x, "batch s units")

        return x

    def convert_input(self, texts):
        """Tokenize raw text (a single string or a batch) and encode it."""
        texts = tf.convert_to_tensor(texts)
        # Promote a scalar string to a batch of one.
        if len(texts.shape) == 0:
            texts = texts[tf.newaxis]
        context = self.text_processor(texts).to_tensor()
        context = self(context)

        return context

In [None]:
# Encode input sequence

encoder = Encoder(context_processor, UNITS)
sample_context = encoder(sample_context_token)

print(f"Context tokens, shape (batch, s): {sample_context_token.shape}")
# The encoder output carries a trailing feature axis of size `units`.
print(f"Encoder output, shape (batch, s, units): {sample_context.shape}")

In [None]:
class CrossAttention(tf.keras.layers.Layer):
    """Single-head attention from a target sequence over the encoded context.

    The attention output is added to the query (residual connection) and
    layer-normalized. The attention weights of the most recent call are kept
    in ``last_attention_weights`` for plotting.

    Args:
        units: key dimension of the attention layer.
        **kwargs: forwarded to ``tf.keras.layers.MultiHeadAttention``.
    """

    def __init__(self, units, **kwargs):
        super().__init__()
        self.multi_head_attention = tf.keras.layers.MultiHeadAttention(key_dim = units,
                                                                       num_heads = 1,
                                                                       **kwargs)
        self.norm_layer = tf.keras.layers.LayerNormalization()
        self.add = tf.keras.layers.Add()

    def call(self, x, context):
        """Attend from ``x`` (queries) over ``context`` (values)."""
        # NOTE(review): ShapeChecker is not defined in this notebook;
        # presumably it comes from `Constants` - confirm.
        shape_checker = ShapeChecker()

        shape_checker(x, "batch t units")
        shape_checker(context, 'batch s units')

        attention_output, attention_score = self.multi_head_attention(query = x,
                                                                      value = context,
                                                                      return_attention_scores = True)

        shape_checker(x, "batch t units")
        # Scores come back with one axis per attention head.
        shape_checker(attention_score, "batch heads t s")

        # Cache attention score (averaged over heads) for plotting later
        attention_score = tf.reduce_mean(attention_score, axis = 1)
        shape_checker(attention_score, "batch t s")
        self.last_attention_weights = attention_score

        x = self.add([x, attention_output])
        x = self.norm_layer(x)

        return x
    
# Try the attention layer on the sample batch.
attention_layer = CrossAttention(UNITS)

# Encoded tokens: a stand-alone embedding for the target ids, used only to
# produce queries for this check.
embedding = tf.keras.layers.Embedding(target_processor.vocabulary_size(),
                                     output_dim = UNITS,
                                     mask_zero = True)

sample_embedded_target = embedding(sample_target_input)

result = attention_layer(sample_embedded_target, sample_context)

print(f"Context sequence, shape (batch, s, units): {sample_context.shape}")
print(f"Target sequence, shape (batch, t, units): {sample_embedded_target.shape}")
print(f"Attention result, shape (batch, t, units): {result.shape}")
print(f"Attention weights, shape (batch, t, s): {attention_layer.last_attention_weights.shape}")

In [None]:
# Sum the cached attention weights of the first example over the last axis.
attention_layer.last_attention_weights[0].numpy().sum(axis =- 1)

In [None]:
# Compare the attention weights of the first target position with the
# non-zero map of the context token ids.
attention_weights = attention_layer.last_attention_weights
mask = (sample_context_token != 0).numpy()

# Left panel: weights at target position 0, zeroed where the context id is 0.
plt.subplot(1, 2, 1)
plt.pcolormesh(mask * attention_weights[:, 0, :])
plt.title("Attention Weights")

# Right panel: the non-zero map itself.
plt.subplot(1, 2, 2)
plt.pcolormesh(mask)
plt.title("mask")

In [None]:
class Decoder(tf.keras.layers.Layer):
    """Attention decoder that predicts target-token logits.

    Only construction lives in the class body; ``call`` and the generation
    helpers are attached afterwards through ``add_method``.

    Args:
        text_processor: adapted text vectorizer for the target language.
        units: embedding size, GRU width and attention key dimension.
    """

    @classmethod
    def add_method(cls, fun):
        """Decorator that attaches ``fun`` to the class under its own name."""
        setattr(cls, fun.__name__, fun)

        return fun

    def __init__(self, text_processor, units):
        super().__init__()
        self.text_processor = text_processor
        self.vocab_size = text_processor.vocabulary_size()
        self.word_to_id = tf.keras.layers.StringLookup(vocabulary = text_processor.get_vocabulary(),
                                                       mask_token = '',
                                                       oov_token = '[UNK]')
        # The inverse lookup needs the vocabulary itself, not its size.
        self.id_to_word = tf.keras.layers.StringLookup(vocabulary = text_processor.get_vocabulary(),
                                                       mask_token = '',
                                                       oov_token = '[UNK]',
                                                       invert = True)
        # Both markers are token ids, so both go through word_to_id.
        self.start_token = self.word_to_id('[START]')
        self.end_token = self.word_to_id('[END]')

        self.units = units

        # Embedding layer converts ids to vectors
        self.embedding = tf.keras.layers.Embedding(self.vocab_size,
                                                   units,
                                                   mask_zero = True)

        # RNN tracks generated sequences. Stored as `rnn` because the methods
        # attached via add_method read `self.rnn`.
        self.rnn = tf.keras.layers.GRU(units,
                                       return_sequences = True,
                                       return_state = True,
                                       recurrent_initializer = 'glorot_uniform')

        # RNN output becomes query for attention layer
        self.attention = CrossAttention(units)

        # Fully connected layer produces logits for each output token
        self.output_layer = tf.keras.layers.Dense(self.vocab_size)

In [None]:
@Decoder.add_method
def call(self, context, x, state = None, return_state = False):
    """Compute next-token logits for every target position.

    Args:
        context: encoder output for the batch.
        x: batch of target token ids.
        state: optional initial state for the GRU.
        return_state: when true, also return the final GRU state.

    Returns:
        ``logits``, or ``(logits, state)`` when ``return_state`` is true.
    """
    # NOTE(review): ShapeChecker is presumably provided by `Constants`.
    check = ShapeChecker()
    check(x, 'batch t')
    check(context, 'batch s units')

    # ids -> vectors
    embedded = self.embedding(x)
    check(embedded, 'batch t units')

    # track the sequence generated so far
    rnn_output, state = self.rnn(embedded, initial_state = state)
    check(rnn_output, 'batch t units')

    # RNN output queries the encoded context; keep the weights for plotting
    attended = self.attention(rnn_output, context)
    self.last_attention_weights = self.attention.last_attention_weights
    check(attended, 'batch t units')
    check(self.last_attention_weights, 'batch t s')

    logits = self.output_layer(attended)
    check(logits, 'batch t target_vocab_size')

    return (logits, state) if return_state else logits

In [None]:
# Instantiate the decoder for the target vocabulary.
decoder = Decoder(target_processor, UNITS)

In [None]:
# Run the decoder on the encoder output computed earlier (`sample_context`).
logits = decoder(sample_context, sample_target_input)

print(f'Encoder output shape: (batch, s, units) {sample_context.shape}')
print(f"Input target tokens shape: (batch, t) {sample_target_input.shape}")
print(f"logits shape: (batch, t, target_vocabulary_size) {logits.shape}")

In [None]:
@Decoder.add_method
def get_initial_state(self, context):
    """Build the starting values for a generation loop.

    Args:
        context: encoder output; only its batch dimension is used.

    Returns:
        ``(start_tokens, done, state)``: a ``(batch, 1)`` tensor filled with
        the ``[START]`` id, a ``(batch, 1)`` all-false boolean tensor, and
        the initial GRU state.
    """
    batch_size = tf.shape(context)[0]
    start_tokens = tf.fill([batch_size, 1], self.start_token)
    done = tf.zeros([batch_size, 1], dtype = tf.bool)
    embedded = self.embedding(start_tokens)

    return start_tokens, done, self.rnn.get_initial_state(embedded)[0]

In [None]:
@Decoder.add_method
def tokens_to_text(self, tokens):
    """Convert token ids back into space-joined text.

    Args:
        tokens: integer tensor of token ids whose last axis is the sequence.

    Returns:
        String tensor with the last axis joined by spaces and a leading
        ``[START]`` / trailing ``[END]`` marker removed.
    """
    words = self.id_to_word(tokens)
    result = tf.strings.reduce_join(words, axis = -1, separator = ' ')
    # Raw strings keep the regex escapes intact; the leading pattern allows
    # zero or more spaces so a marker at position 0 is stripped as well.
    result = tf.strings.regex_replace(result, r'^ *\[START\] *', '')
    result = tf.strings.regex_replace(result, r' *\[END\] *$', '')

    return result

In [None]:
@Decoder.add_method
def get_next_token(self, context, next_token, done, state, temperature = 0.0):
    """Advance generation by one token.

    Args:
        context: encoder output for the batch.
        next_token: the most recently produced token ids.
        done: boolean tensor marking sequences that already ended.
        state: current GRU state.
        temperature: ``0.0`` selects the arg-max token; any other value
            samples from the logits divided by it.

    Returns:
        ``(next_token, done, state)`` updated for this step.
    """
    logits, state = self(context,
                         next_token,
                         state = state,
                         return_state = True)

    if temperature != 0.0:
        # Sample from the temperature-scaled distribution of the last step.
        next_token = tf.random.categorical(logits[:, -1, :] / temperature,
                                           num_samples = 1)
    else:
        # Greedy choice.
        next_token = tf.argmax(logits, axis = -1)

    # A sequence is finished as soon as it emits the end token ...
    done = done | (next_token == self.end_token)
    # ... and from then on it only yields the padding id 0.
    padding = tf.constant(0, dtype = tf.int64)
    next_token = tf.where(done, padding, next_token)

    return next_token, done, state

In [None]:
# Generation loop: sample ten tokens per sequence from the untrained decoder,
# conditioning on the encoder output computed earlier (`sample_context`).

next_token, done, state = decoder.get_initial_state(sample_context)
tokens = []

for n in range(10):
    next_token, done, state = decoder.get_next_token(sample_context,
                                                     next_token,
                                                     done,
                                                     state,
                                                     temperature = 1.0)
    tokens.append(next_token)

# Stack the per-step (batch, 1) tokens into (batch, t)
tokens = tf.concat(tokens, axis = -1)

result = decoder.tokens_to_text(tokens)
result[:3].numpy()

In [None]:
class Translator(tf.keras.Model):
    """Encoder-decoder translation model.

    Args:
        units: width shared by the encoder and the decoder.
        context_processor: text vectorizer for the context language.
        target_processor: text vectorizer for the target language.
    """

    @classmethod
    def add_method(cls, fun):
        """Decorator that attaches ``fun`` to the class under its own name."""
        setattr(cls, fun.__name__, fun)
        return fun

    def __init__(self, units, context_processor, target_processor):
        super().__init__()
        # Sub-layers: encoder first, then decoder.
        self.encoder = Encoder(context_processor, units)
        self.decoder = Decoder(target_processor, units)

    def call(self, inputs):
        """Return logits for an ``(context_ids, target_input_ids)`` pair."""
        context_ids, target_ids = inputs
        encoded = self.encoder(context_ids)
        logits = self.decoder(encoded, target_ids)

        # Remove the mask attribute from the logits when it is present.
        try:
            del logits._keras_mask
        except AttributeError:
            pass

        return logits

In [None]:
# Build the model and run it once on the sample batch.
model = Translator(UNITS, context_processor, target_processor)
logits = model((sample_context_token, sample_target_input))

print(f"Context tokens shape: {sample_context_token.shape}")
# Print the shape, not the whole tensor.
print(f"Target tokens shape: {sample_target_input.shape}")
print(f"logits shape: {logits.shape}")

In [None]:
def masked_loss(y_true, y_predicted):
    """Mean sparse categorical cross-entropy over non-zero labels.

    Args:
        y_true: integer label ids; positions equal to 0 are ignored.
        y_predicted: unnormalized logits.

    Returns:
        Sum of the per-position losses at non-zero labels divided by the
        number of non-zero labels.
    """
    cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits = True,
        reduction = 'none')
    per_token = cross_entropy(y_true, y_predicted)

    # 1 where the label is a real token, 0 where it is id 0.
    keep = tf.cast(y_true != 0, per_token.dtype)
    per_token = per_token * keep

    total = tf.reduce_sum(per_token)
    count = tf.reduce_sum(keep)
    return total / count

In [None]:
def masked_accuracy(y_true, y_predicted):
    """Token accuracy over non-zero labels.

    Args:
        y_true: integer label ids; positions equal to 0 are ignored.
        y_predicted: logits whose last axis ranges over the vocabulary.

    Returns:
        Number of correct predictions at non-zero labels divided by the
        number of non-zero labels.
    """
    y_predicted = tf.argmax(y_predicted, axis = -1)
    y_predicted = tf.cast(y_predicted, y_true.dtype)

    match = tf.cast(y_true == y_predicted, tf.float32)
    mask = tf.cast(y_true != 0, tf.float32)
    # Without this, a predicted 0 at a position labelled 0 counts as a hit
    # while the denominator excludes it, inflating the metric.
    match *= mask

    return tf.reduce_sum(match) / tf.reduce_sum(mask)

In [None]:
# Train with Adam on the masked loss and track both masked metrics.
model.compile(optimizer = 'adam',
              loss = masked_loss,
              metrics = [masked_accuracy, masked_loss])

In [None]:
# Reference values for a uniform guess over the target vocabulary:
# loss = log(vocabulary size), accuracy = 1 / vocabulary size.
vocabulary_size = 1.0 * target_processor.vocabulary_size()

{"expected_loss" : tf.math.log(vocabulary_size).numpy(),
 "expected_accuracy" : 1 / vocabulary_size}

In [None]:
# Evaluate the model on 20 validation batches and return the metrics as a dict.
model.evaluate(val_set,
               steps = 20,
               return_dict = True)

In [None]:
# `train_set.repeat()` never ends, so an epoch length must be given
# explicitly; each "epoch" is 100 training batches followed by 20
# validation batches, with early stopping after 3 epochs without improvement.
history = model.fit(train_set.repeat(),
                    validation_data = val_set,
                    steps_per_epoch = 100,
                    validation_steps = 20,
                    callbacks = [keras.callbacks.EarlyStopping(patience = 3)],
                    epochs = 100
                    )

In [None]:
# Keep only the metrics dict. `getattr` makes the cell safe to re-run:
# once `history` is already the dict it is left unchanged.
history = getattr(history, 'history', history)

In [None]:
# Per-epoch loss curves.
val_loss = history['val_loss']
loss = history['loss']

# Per-epoch masked-accuracy curves.
val_accuracy = history["val_masked_accuracy"]
accuracy = history["masked_accuracy"]

In [None]:
# Training vs. validation accuracy per epoch.
plt.plot(accuracy, label = "Accuracy")
plt.plot(val_accuracy, label = "Validation Accuracy")
plt.ylim([0, max(plt.ylim())])
plt.xlabel("Epoch #")
plt.ylabel("Accuracy/token")
plt.legend(loc = 'upper left')
plt.title("Accuracy")
plt.show()

In [None]:
# Training vs. validation loss per epoch, with the y-axis starting at 0.
plt.plot(loss, label = "Loss")
plt.plot(val_loss, label = "Validation Loss")
plt.ylim([0, max(plt.ylim())])
plt.xlabel("Epoch #")
plt.ylabel("token")
plt.legend(loc = 'upper right')
plt.title("Loss")
plt.show()

#### Text to text translation
- Process input text
- Generate next token
- Store generated tokens
- Stack tokens and attention weights

In [None]:
#@title
@Translator.add_method
def translate(self, texts, *, max_length = 50, temperature = 0.0):
    """Translate raw context text with a Python generation loop.

    Args:
        texts: a string or a batch of strings in the context language.
        max_length: maximum number of tokens generated per sequence.
        temperature: ``0.0`` for greedy decoding, otherwise the sampling
            temperature passed to the decoder.

    Returns:
        String tensor with one translation per input. The attention weights
        of every step are stored in ``self.last_attention_weights``.
    """
    context = self.encoder.convert_input(texts)

    tokens = []
    attention_weights = []
    next_token, done, state = self.decoder.get_initial_state(context)

    for _ in range(max_length):
        next_token, done, state = self.decoder.get_next_token(context,
                                                              next_token,
                                                              done,
                                                              state,
                                                              temperature)

        tokens.append(next_token)
        attention_weights.append(self.decoder.last_attention_weights)

        # Stop early once every sequence has finished; only possible in
        # eager mode, where `done` has a concrete value.
        if tf.executing_eagerly() and tf.reduce_all(done):
            break

    # Stack the per-step results along the time axis.
    tokens = tf.concat(tokens, axis = -1)
    self.last_attention_weights = tf.concat(attention_weights, axis = 1)
    result = self.decoder.tokens_to_text(tokens)

    return result

In [None]:
@Translator.add_method
def attention_plot(self, text, **kwargs):
    """Translate one sentence and plot its attention weights.

    Args:
        text: a single Python string in the context language.
        **kwargs: forwarded to ``translate``.

    Raises:
        AssertionError: if ``text`` is not a ``str``.
    """
    assert isinstance(text, str)
    output = self.translate([text], **kwargs)
    output = output[0].numpy().decode()

    # Weights of the only example in the batch.
    attention = self.last_attention_weights[0]

    # Tick labels: standardized input tokens on x ...
    context = text_preprocessing(text)
    context = context.numpy().decode().split()

    # ... and standardized output tokens, minus the first marker, on y.
    output = text_preprocessing(output)
    output = output.numpy().decode().split()[1:]

    fig = plt.figure(figsize = (12, 8))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap = 'viridis', vmin = 0.0)

    fontdict = {"fontsize" : 13}

    ax.set_xticklabels([''] + context , fontdict = fontdict, rotation = 90)
    ax.set_yticklabels([''] + output, fontdict = fontdict)

    # One tick per token.
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    ax.set_xlabel("Input text")
    ax.set_ylabel("Output text")

In [None]:
# Attention map for a short question.
model.attention_plot("¿Ningun esta en casa?")

In [None]:
%%time

# Attention map for a second short sentence (timed by the cell magic).
model.attention_plot("Esto es mi vida")

In [None]:
# Attention map for a third short sentence.
model.attention_plot("Tratar de descubrir")

In [2]:
import textwrap

In [9]:
# Use the last context sentence of the corpus as a long input and print its
# reference translation, wrapped for readability.
long_text = context_raw[-1]

print("Expected output: \n", '\n'.join(textwrap.wrap(target_raw[-1])))

Expected output: 
 One day, I woke up to find that God had put hair on my face. I shaved
it off. The next day, I found that God had put it back on my face, so
I shaved it off again. On the third day, when I found that God had put
hair back on my face again, I decided to let God have his way. That's
why I have a beard.


In [None]:
# Attention map for the long input selected above.
model.attention_plot(long_text)

In [None]:
# Sample sentences with their intended English meaning.
samples = ["Hace mucho calor aqui.",                # Its very hot here.
           "Quiero aprender a hablar espanol.",     # I want to learn to speak spanish.
           "El cuarto esta sucio."]                 # The room is dirty.

In [None]:
%%time

# Translate each sample on its own (batch of one) and print the result.
for sample in samples:
    print(model.translate([sample])[0].numpy().decode())
    
print()

##### Save and export model

In [None]:
class Export(tf.Module):
    """Module exposing ``model.translate`` as a traced function.

    Args:
        model: object providing a ``translate`` method.
    """

    def __init__(self, model):
        self.model = model

    @tf.function(input_signature = [tf.TensorSpec(shape = [None], dtype = tf.string)])
    def translate(self, inputs):
        """Translate a 1-D batch of strings via the wrapped model."""
        translated = self.model.translate(inputs)
        return translated

In [None]:
# Wrap the trained model for export.
translator_model = Export(model)

In [None]:
# First call on the samples; the result is discarded.
_ = translator_model.translate(tf.constant(samples))

In [None]:
%%time

# Translate the three samples with the wrapped model and print each result.
result = translator_model.translate(tf.constant(samples))

print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()

In [None]:
%%time

# Save the wrapped model to the 'translator' directory, exposing `translate`
# as the serving signature.
tf.saved_model.save(translator_model,
                    'translator',
                    signatures = {'serving_default' : translator_model.translate})

In [None]:
%%time

# Reload the saved model and call it once; the result is discarded.
loaded_model = tf.saved_model.load('translator')
_ = loaded_model.translate(tf.constant(samples))

In [None]:
%%time

# Translate the three samples with the reloaded model and print each result.
result = loaded_model.translate(tf.constant(samples))

print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()

##### Using a dynamic loop

- This is faster than the eager-execution implementation above.

In [None]:
@Translator.add_method
def translate(self, texts, *, max_length = 500, temperature = tf.constant(0.0)):
    """Translate raw context text with a ``tf.range`` generation loop.

    This definition replaces the earlier ``translate`` on ``Translator``.
    Generated tokens are collected in a ``tf.TensorArray``.

    Args:
        texts: a string or a batch of strings in the context language.
        max_length: maximum number of tokens generated per sequence.
        temperature: ``0.0`` for greedy decoding, otherwise the sampling
            temperature passed to the decoder.

    Returns:
        String tensor with one translation per input.
    """
    # NOTE(review): ShapeChecker is not defined in this notebook;
    # presumably it comes from `Constants` - confirm.
    shape_checker = ShapeChecker()
    context = self.encoder.convert_input(texts)
    shape_checker(context, 'batch s units')

    next_token, done, state = self.decoder.get_initial_state(context)

    # Grows as tokens are written.
    tokens = tf.TensorArray(tf.int64, size = 1, dynamic_size = True)

    for t in tf.range(max_length):
        next_token, done, state = self.decoder.get_next_token(context,
                                                              next_token,
                                                              done,
                                                              state,
                                                              temperature)
        shape_checker(next_token, 'batch t1')

        tokens = tokens.write(t, next_token)

        # Stop as soon as every sequence has finished.
        if tf.reduce_all(done):
            break

    # (t, batch, 1) -> (batch, t)
    tokens = tokens.stack()
    shape_checker(tokens, 't batch t1')
    tokens = einops.rearrange(tokens, 't batch 1 -> batch t')
    shape_checker(tokens, 'batch t')

    # Joining the tokens collapses the time axis: one string per example.
    text = self.decoder.tokens_to_text(tokens)
    shape_checker(text, 'batch')

    return text

In [None]:
%%time

# Translate the three samples with the redefined `translate` and print each
# result.
result = model.translate(samples)

print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()

In [None]:
class Export(tf.Module):
    """Module exposing ``model.translate`` as a traced function.

    Args:
        model: object providing a ``translate`` method.
    """

    def __init__(self, model):
        self.model = model

    @tf.function(input_signature = [tf.TensorSpec(dtype = tf.string, shape = [None])])
    def translate(self, inputs):
        """Translate a 1-D batch of strings via the wrapped model."""
        return self.model.translate(inputs)

In [None]:
# Wrap the model (now using the redefined `translate`) for export.
dyn_model = Export(model)

In [None]:
%%time

# First call on the samples; the result is discarded.
_ = dyn_model.translate(samples)

In [None]:
%%time

# Translate the three samples with the wrapped model and print each result.
result = dyn_model.translate(samples)

print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()

In [None]:
%%time

# Save the wrapped model to the 'dynamic_translator' directory, exposing
# `translate` as the serving signature.
tf.saved_model.save(dyn_model,
                    'dynamic_translator',
                    signatures = {'serving_default' : dyn_model.translate})

In [None]:
%%time

# Reload the saved model and call it once; the result is discarded.
load_dyn_model = tf.saved_model.load('dynamic_translator')
_ = load_dyn_model.translate(tf.constant(samples))

In [None]:
%%time

# Translate the three samples with the reloaded model and print each result.
result = load_dyn_model.translate(tf.constant(samples))

print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()