In [1]:
text_file = "spa-eng/spa.txt" 
with open(text_file) as f:
        lines = f.read().split("\n")[:-1]
        text_pairs = []
        for line in lines:
            english, spanish = line.split("\t") 
            spanish = "[start] " + spanish + " [end]" 
            text_pairs.append((english, spanish))
        

In [None]:
import random
print(random.choice(text_pairs))

In [None]:
#train validation split
num_samples = len(text_pairs)
random.shuffle(text_pairs)
num_val_samples = int(0.15*num_samples)
num_train_samples = num_samples - 2 * num_val_samples
train_samples = text_pairs[:num_train_samples]
val_samples = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_samples = text_pairs[num_train_samples + num_val_samples:]

usually we strip all the punctuation in both english and spanish texts , but we have inserted [start] and [end] special tokens which are special tokens and hence [] should not be stripped. also an additional character in spanish is also there, which needs to be stripped
In certain applications, puncuations are also added as tokens.

In [None]:
import tensorflow as tf
import re
import string

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(text):
    lowercase = tf.strings.lower(text)
    return tf.strings.regex_replace(lowercase, f"[{re.escape(strip_chars)}]","")


In [None]:
from tensorflow.keras import layers

In [None]:
vocab_size = 15000 # top 15000 tokens in each language
sequence_length = 20 # pick top 20 words in sentence

source_vectorization = layers.TextVectorization(max_tokens = vocab_size, output_mode = "int", output_sequence_length = sequence_length)
target_vectorization = layers.TextVectorization(max_tokens = vocab_size, output_mode = "int", output_sequence_length = sequence_length+1, standardize = custom_standardization)
# if we don't pass standardize, it will do default standardization -> remove punctuations + lowercase
# for target, we need to remove [] from character strip and also spanish symbol is to stripped too
train_english_texts = [pair[0] for pair in train_samples]
train_spanish_texts = [pair[1] for pair in train_samples]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)

In [None]:
batch_size = 64
def format_dataset(eng,spa):
    eng = source_vectorization(eng)
    spa = target_vectorization(spa)
    return ({"english":eng, "spanish":spa[:,:-1]},spa[:,1:])

In [None]:
def make_dataset(pairs):
    eng_text, spa_text = zip(*pairs)
    eng_text = list(eng_text)
    spa_text = list(spa_text)
    dataset = tf.data.Dataset.from_tensor_slices((eng_text,spa_text))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls = 4)
    return dataset.shuffle(2048).prefetch(16).cache()

In [None]:
train_ds = make_dataset(train_samples)
val_ds = make_dataset(val_samples)

In [None]:
for input,targets in train_ds.take(1):
    print("the shape of english text is ", input['english'].shape)
    print("the shape of spanish text is ", input['spanish'].shape)
    print("the shape of target text is ", targets.shape)

Dataset is ready. first we ll try out the data on recurrent network based sequence-sequence model

RNNs specially LSTMs and their variants like GRUs were SOTA models for NLP tasks. 
for this machine translation task , we could use RNNs with sequence=True.
The requirement is that input sequence_length = output_sequence length, this can be managed by padding source or target sequence. another disadvantage is that for prediction of token N, you get to look at tokens 0...N-1, but for translation having access to tokens after N of the source sequence can be beneficial.

In [None]:
from tensorflow import keras

In [None]:
#a potential model

inputs = keras.Input(shape=(sequence_length,), dtype="int64")
x = layers.Embedding(input_dim=vocab_size, output_dim=128)(inputs)
x = layers.LSTM(32, return_sequences=True)(x)
outputs = layers.Dense(vocab_size, activation="softmax")(x)
model = keras.Model(inputs, outputs)

In [None]:
model.summary()

we see that the sequence length is maintained by the model with every pass through

one solution is that the RNN parses through the source sequence and upon the end of the sequence,it produces a output vector or the internal state vector in the end could be used which has encoded the input sequence. Then this vector can be used as input to the decoder, which will produce the token N+1 given tokens 0...N and the learned vector at the end of encoder.

In [None]:
embed_dim = 256
latent_dim = 1024

source = keras.Input(shape=(None,), dtype="int64", name="english") # the english source sentence goes here
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True) (source)
encoded_source = layers.Bidirectional(layers.GRU(latent_dim), merge_mode="sum") (x)


In [None]:
past_target = keras.Input(shape=(None,), dtype="int64",name="spanish") # past spanish tokens in decoder
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True) (past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x , initial_state = encoded_source)
x  = layers.Dropout(0.5) (x)
model2= keras.Model(past_target, x)
print(model2.summary())

target_next_step = layers.Dense(vocab_size, activation="softmax")(x) 
seq2seq_rnn = keras.Model([source, past_target], target_next_step)

In [None]:
seq2seq_rnn.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])

In [None]:
seq2seq_rnn.fit(train_ds, validation_data = val_ds, epochs=15)

translation of an english sentence given a seed token [start] -> inference

In [None]:
import numpy as np
spanish_vocab = target_vectorization.get_vocabulary()
spanish_index_lookup = dict(zip(range(len(spanish_vocab)),spanish_vocab))
max_decoded_sequence_length = 20


In [None]:
def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    target_sentence = "[start]"
    for i in range(max_decoded_sequence_length):
        tokenized_target_sentence = target_vectorization([target_sentence])
        next_token_prediction = seq2seq_rnn.predict([tokenized_input_sentence, tokenized_target_sentence])
        next_sample_token_index = np.argmax(next_token_prediction[0,i,:])
        next_sample_token = spanish_index_lookup[next_sample_token_index]
        target_sentence += " " + next_sample_token
        if next_sample_token == "[end]":
            break
    return target_sentence
        

In [None]:
test_eng_texts = [pair[0] for pair in test_samples]

In [None]:
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

Though an accuracy of 64 percent is claimed, upon fitting the model, only 28 percent accuracy is obtained. a deeper investigation is needed
and also, BLEU is a more reliable metric compared to accuracy for seq2seq translation.

Drawbacks of rnn approach:
the entirety of source sequence representation is to be held in encoder -> very little flexibility especially when translating long ,complex sequences
rnns forget context with increase in number of tokens to remember. by the time we reach 100th token, the rnn has very little information about the zeroth token.thus it fails when we use it for long documents.
this paved way for transformer architecture [self attention + position embedding]

Transformer for sequence translation:
transformer decoder is very similar to transformer encoder except that there is a communication between exit block of transformer encoder and attention block of decoder.

in a decoder, the queries are the target sentence representations while the source sequence representations are the keys an values, This way, for every token in sequence, there is a communication between target and source [unlike RNNs].

Decoder : given.0...N tokens, predict the N+1 token.

Transformer by default will look at all tokens in a sequence to calculate the key-pair scores and update value vector. but, during inference, you will have access to 0, n tokens and anything beyond. So , during training , if we have access to tokens beyond n, it willuse that info to have perfect training accuracy, but when used during inference, it will be useless spitting non-sense because it has been trained to predict given future tokens. So, causal mask [mask which filters out tokens N+1:end] will have to be added to architecture.

In [None]:
class TransformerDecoder(layers.Layer):
    def __init__(self,embed_dim,dense_dim,num_heads,**kwargs):
        super().__init__(**kwargs)
        self.dense_dim = dense_dim
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.attention1 = layers.MultiHeadAttention(num_heads=num_heads,key_dim=embed_dim)
        self.attention2 = layers.MultiHeadAttention(num_heads = num_heads,key_dim=embed_dim)
        self.dense_proj = keras.Sequential([layers.Dense(dense_dim,activation='relu'), layers.Dense(embed_dim),])
        self.layer_norm1 = layers.LayerNormalization()
        self.layer_norm2 = layers.LayerNormalization()
        self.layer_norm3= layers.LayerNormalization()
        self.supports_masking = True
    
    def get_config(self):
        config = super().get_config
        config.update({
            "dense_dim":self.dense_dim,
            "embed_dim":self.embed_dim,
            "num_heads":self.num_heads
        })
        return config
        
        

In [1]:
#get causal attetion mask - understanding 
#idea : mask one half of inputs so that future tokens are not seen during training
batch_size_causal = 64
seq_length_causal = 20
i = tf.range(sequence_length)[:, tf.newaxis]
print(i.shape)
j = tf.range(sequence_length)
mask = tf.cast(i >= j, dtype="int32")
#print(mask)
mask = tf.reshape(mask, (1, seq_length_causal, seq_length_causal))
print(mask.shape)
print(tf.expand_dims(batch_size_causal, -1).shape)


NameError: name 'tf' is not defined