In [1]:
# from google.colab import auth
# auth.authenticate_user()
import sys,os,warnings
if "google.colab" in sys.modules:
    %pip install "tensorflow-text==2.13.0"
    %pip install kaleido
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
warnings.filterwarnings("ignore")
import numpy as np
import re
from typing import Literal
import tensorflow as tf
from tensorflow import keras
import tensorflow_text as tftext
import tensorflow_text.tools.wordpiece_vocab.bert_vocab_from_dataset as bert_vocab
from zipfile import ZipFile
from IPython.display import clear_output
from shutil import copytree,copy2
import requests
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
pio.templates.default = "plotly_dark"
# Outside Colab, cap how much memory TensorFlow may claim on the first GPU.
# The guard keeps the notebook runnable on CPU-only machines, where
# `list_physical_devices("GPU")` returns an empty list.
if "google.colab" not in sys.modules:
    gpus = tf.config.list_physical_devices("GPU")
    if gpus:
        try:
            tf.config.experimental.set_virtual_device_configuration(
                gpus[0],
                [tf.config.LogicalDeviceConfiguration(memory_limit=9000)]
                )
        except RuntimeError as err:
            # Raised when the device was already initialised, e.g. when this
            # cell is re-run without restarting the kernel.
            print(err)
tf.get_logger().setLevel("ERROR")
%xmode Context
clear_output()

In [2]:
# Download the English/Spanish sentence-pair archive, unpack it next to the
# notebook and read the tab-separated pairs into two parallel tuples.
with tf.device("/job:localhost"):
    url = "https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip"
    # The archive is unpacked explicitly below, so `get_file` is only asked to
    # download/cache it and hand back the path of the zip file.
    file_path = keras.utils.get_file(fname="spa-eng.zip",origin=url)
    with ZipFile(file_path,"r") as f:
        f.extractall("spa-eng")
    # Explicit UTF-8: the Spanish side contains accented characters and the
    # platform default encoding is not UTF-8 everywhere.
    with open("spa-eng/spa-eng/spa.txt","r",encoding="utf-8") as f:
        text = f.read()
clear_output()
# Each line is "<english>\t<spanish>"; transpose the pairs into two sequences.
en_text,es_text = zip(*[line.split("\t") for line in text.splitlines()])
for en,es in zip(en_text[:10],es_text[:10]):
    print(f"{en} ----> {es}")

Go. ----> Ve.
Go. ----> Vete.
Go. ----> Vaya.
Go. ----> Váyase.
Hi. ----> Hola.
Run! ----> ¡Corre!
Run. ----> Corred.
Who? ----> ¿Quién?
Fire! ----> ¡Fuego!
Fire! ----> ¡Incendio!


In [3]:
# Fetch the two vocabulary files used by the tokenizers and store them next to
# the notebook.
en_url = "https://github.com/h4ck4l1/datasets/raw/main/NLP_with_RNN_and_Attention/en_vocab.txt"
es_url = "https://github.com/h4ck4l1/datasets/raw/main/NLP_with_RNN_and_Attention/spa_vocab.txt"


def _download_bytes(url:str) -> bytes:
    """Return the body of `url`, raising on HTTP errors instead of returning an error page."""
    response = requests.get(url,timeout=60)
    # Without this check a 404/500 page would be written to disk as the vocabulary.
    response.raise_for_status()
    return response.content


en_content = _download_bytes(en_url)
es_content = _download_bytes(es_url)

with open("en_vocab.txt","wb") as f:
    f.write(en_content)

with open("spa_vocab.txt","wb") as f:
    f.write(es_content)

In [4]:
# Build one BERT word-piece tokenizer per language from the vocabulary files
# written earlier; both are configured with NFKD normalization.
en_tokenizer,es_tokenizer = (
    tftext.BertTokenizer(vocab_path,normalization_form="NFKD")
    for vocab_path in ("en_vocab.txt","spa_vocab.txt")
)

In [5]:
# Load both vocabularies as text. The encoding is given explicitly because the
# files may contain non-ASCII tokens (e.g. "¿") and the platform default
# encoding is not UTF-8 everywhere.
with open("en_vocab.txt","r",encoding="utf-8") as f:
    en_vocab = f.read()

with open("spa_vocab.txt","r",encoding="utf-8") as f:
    es_vocab = f.read()

# One vocabulary entry per line -> array whose index is the token id.
en_vocab = np.array(en_vocab.splitlines())
es_vocab = np.array(es_vocab.splitlines())
# Sentence tuples -> arrays so they can be indexed with boolean masks later.
en_text = np.array(en_text)
es_text = np.array(es_text)

In [6]:
def _special_token_id(token:str) -> tf.Tensor:
    """Return the id of `token` as a scalar int64 tensor.

    The same start/end ids are used for both languages when sentences are
    tokenized, so the token must exist in both vocabularies at the same index.
    A plain `argmax` over the match mask would silently return 0 when the
    token is absent, so the lookup is validated explicitly.

    Raises:
        ValueError: if `token` is missing from a vocabulary, or the two
            vocabularies place it at different indices.
    """
    positions = []
    for lang,vocab in (("en",en_vocab),("es",es_vocab)):
        hits = np.flatnonzero(vocab == token)
        if hits.size == 0:
            raise ValueError(f"{token} is missing from the {lang} vocabulary")
        # first occurrence, matching what argmax over the boolean mask returns
        positions.append(int(hits[0]))
    if positions[0] != positions[1]:
        raise ValueError(
            f"{token} has id {positions[0]} in the en vocabulary but {positions[1]} in the es vocabulary"
        )
    return tf.constant(positions[0],dtype=tf.int64)


start_token = _special_token_id("[START]")
end_token = _special_token_id("[END]")

In [7]:
def upstream(sentence:str,lang:Literal["en","es"]):
    """Clean raw text and turn it into a padded batch of word-piece ids.

    Args:
        sentence: a single string or a 1-D batch of strings (Python values or
            a string tensor). A single string is treated as a batch of one.
        lang: which tokenizer to use, "en" or "es".

    Returns:
        A dense int64 tensor of shape (batch, max_tokens + 2). Every row is
        `[START] tokens... [END]` followed by zero padding.
    """
    assert lang in ["en","es"],f"The provided argument for lang is not in ['en','es']"
    # Convert first so that Python strings/lists have a shape to inspect.
    sentence = tf.convert_to_tensor(sentence)
    if sentence.shape.rank == 0:
        sentence = sentence[tf.newaxis]
    bsize = tf.shape(sentence)[0]
    sentence = tftext.normalize_utf8(sentence,"NFKD")
    sentence = tf.strings.lower(sentence)
    # Keep only spaces, a-z and the listed punctuation; everything else
    # (including the combining marks produced by NFKD) is dropped.
    sentence = tf.strings.regex_replace(sentence,r"[^ a-z,.?!¿]","")
    # Surround punctuation with spaces so each mark becomes its own word.
    sentence = tf.strings.regex_replace(sentence,r"[,.?!¿]",r" \0 ")
    sentence = tf.strings.strip(sentence)
    tokenizer = en_tokenizer if lang == "en" else es_tokenizer
    # (batch, words, pieces) -> (batch, tokens); kept ragged for now.
    tokens = tokenizer.tokenize(sentence).merge_dims(-2,-1)
    starts = tf.fill(dims=[bsize,1],value=start_token)
    ends = tf.fill(dims=[bsize,1],value=end_token)
    # Add the markers while the rows are still ragged, then pad. Padding first
    # would place [END] after the pad ids of every shorter sentence.
    return tf.concat([starts,tokens,ends],axis=1).to_tensor()

In [8]:
def downstream(tokens:tf.Tensor,lang:Literal["en","es"]):
    """Turn token ids back into one space-joined string per row.

    Args:
        tokens: token ids to detokenize with the tokenizer selected by `lang`.
        lang: "en" or "es".

    Returns:
        A string tensor with one entry per row of `tokens`, built from the
        detokenized words after removing "[START]", "[END]" and "[PAD]".
    """
    assert lang in ["en","es"],f"The provided argument for lang is not in ['en','es']"
    tokenizer = en_tokenizer if lang == "en" else es_tokenizer
    words = tokenizer.detokenize(tokens)
    # Alternation of the escaped marker names; matched against whole words.
    special_pattern = "|".join(map(re.escape,("[START]","[END]","[PAD]")))
    is_special = tf.strings.regex_full_match(words,special_pattern)
    kept_words = tf.ragged.boolean_mask(words,~is_special)
    return tf.strings.reduce_join(kept_words,separator=" ",axis=1)


# escape strings ^\[START\](?:.*?\[PAD\])*?(.*?)\[END\]$

In [9]:
def preprocess(context,target):
    """Tokenize a batch of (English, Spanish) text into model inputs and labels.

    Returns `((context_ids, decoder_input), labels)` where `decoder_input` is
    the Spanish ids without their last column and `labels` is the same ids
    without their first column, i.e. the two are offset by one position.
    """
    context_ids = upstream(context,"en")
    target_ids = upstream(target,"es")
    decoder_input = target_ids[:,:-1]
    labels = target_ids[:,1:]
    return (context_ids,decoder_input),labels

In [10]:
BATCH_SIZE = 64
# Random ~80/20 split: one uniform draw per sentence pair.
all_indices = np.random.uniform(size=len(en_text))
train_indices = all_indices <= 0.8
valid_indices = all_indices > 0.8
# Count the selected examples. `len()` of a boolean mask is the size of the
# whole corpus, not the size of the split.
train_size = int(train_indices.sum())
valid_size = int(valid_indices.sum())


def _make_dataset(mask):
    """Build the batched, shuffled, tokenized, repeating pipeline for the pairs selected by `mask`."""
    return (
        tf.data.Dataset
        .from_tensor_slices((en_text[mask],es_text[mask]))
        .batch(BATCH_SIZE)
        .shuffle(len(en_text))
        .map(preprocess)
        .repeat()
        .prefetch(-1)
    )


train_ds = _make_dataset(train_indices)
valid_ds = _make_dataset(valid_indices)
# Sanity check: show the shapes of one batch and the one-token offset between
# decoder input and labels.
for (en_in,es_in),targ_in in train_ds.take(1):
    print(en_in.shape,es_in.shape,targ_in.shape)
    print(es_in[0,:10])
    print(targ_in[0,:10])

(64, 18) (64, 19) (64, 19)
tf.Tensor([   2   40   21  110 1295 3517   97  239  309  135], shape=(10,), dtype=int64)
tf.Tensor([  40   21  110 1295 3517   97  239  309  135   51], shape=(10,), dtype=int64)


In [17]:
class PositionEncoding(keras.layers.Layer):
    """Token embedding plus fixed sinusoidal position encoding.

    Args:
        vocab_size: number of rows of the token embedding table.
        length: number of positions for which encodings are precomputed.
        d_model: embedding width; must be even.
        casting: "concat" puts all sine columns before all cosine columns,
            "interleave" alternates sine (even columns) and cosine (odd columns).

    Call input is a batch of integer token ids of shape (batch, seq); the
    output has shape (batch, seq, d_model). Id 0 is masked (`mask_zero=True`).
    """

    def __init__(
        self,
        vocab_size:int,
        length:int,
        d_model:int,
        casting:Literal["concat","interleave"],
        **kwargs
        ):

        super(PositionEncoding,self).__init__(**kwargs)
        assert d_model%2==0,f"The provided d_model is not even it should be even"
        assert casting in ["concat","interleave"],f"The provided casting is not in the given values"
        self.d_model = d_model
        depth = d_model//2
        # (length, depth) table of position * 1/10000^(i/depth)
        angle_rads = np.arange(length)[:,np.newaxis] * 1/(10000**(np.arange(depth)[np.newaxis,:]/depth))
        if casting == "concat":
            table = np.concatenate([np.sin(angle_rads),np.cos(angle_rads)],axis=-1)
        else:
            # The table needs d_model columns so the sine and cosine halves
            # fit into the even/odd columns respectively.
            table = np.zeros(shape=(length,d_model))
            table[:,::2] = np.sin(angle_rads)
            table[:,1::2] = np.cos(angle_rads)
        # Stored as a float32 tensor so it can be sliced with a dynamic
        # sequence length and added to the embeddings.
        self.embed = tf.constant(table.astype(np.float32))
        self.embedding = keras.layers.Embedding(vocab_size,d_model,mask_zero=True)

    def compute_mask(self,*args,**kwargs):
        # The padding mask is whatever the token embedding derives from the ids.
        return self.embedding.compute_mask(*args,**kwargs)

    def call(self,inputs):
        seq_l = tf.shape(inputs)[1]
        # Look the ids up first; the position table is added to embeddings,
        # not to raw integer ids.
        x = self.embedding(inputs)
        # Scale by sqrt(d_model), as in the original Transformer, so the
        # embeddings are not dwarfed by the unit-scale position encoding.
        x *= tf.math.sqrt(tf.cast(self.d_model,x.dtype))
        return x + tf.cast(self.embed[tf.newaxis,:seq_l,:],x.dtype)

# PositionEncoding(5000,2048,512,"concat")

<__main__.PositionEncoding at 0x7a843509fac0>

In [20]:
class Encoder(keras.layers.Layer):
    """Stack of `num_layers` self-attention + feed-forward blocks.

    Args:
        num_layers: number of blocks in the stack.
        num_heads: attention heads per block.
        d_model: width of the block input/output; also used as `key_dim`.
        feed_forward_dense_units: hidden width of the feed-forward part.
        dropout_rate: dropout applied to the feed-forward output.

    Every block owns its attention, dense and normalization layers, so the
    blocks do not share weights. `call` maps a tensor `z` to a tensor of the
    same shape.
    """

    def __init__(
        self,
        num_layers:int,
        num_heads:int,
        d_model:int,
        feed_forward_dense_units:int,
        dropout_rate:float,
        **kwargs
    ):

        # Keras must initialise the layer before sub-layers are attached.
        super(Encoder,self).__init__(**kwargs)
        self.num_layers = num_layers

        # One independent set of weighted sub-layers per block.
        self.self_attention = [
            keras.layers.MultiHeadAttention(num_heads=num_heads,key_dim=d_model)
            for _ in range(num_layers)
        ]
        self.attention_norm = [keras.layers.LayerNormalization() for _ in range(num_layers)]
        self.ff_dense = [keras.layers.Dense(feed_forward_dense_units,"relu") for _ in range(num_layers)]
        self.scale_dense = [keras.layers.Dense(d_model) for _ in range(num_layers)]
        self.ff_norm = [keras.layers.LayerNormalization() for _ in range(num_layers)]
        # Dropout and Add hold no weights, so a single instance is reused.
        self.dropout = keras.layers.Dropout(dropout_rate)
        self.add = keras.layers.Add()


    def call(self,z):

        for i in range(self.num_layers):

            # self-attention sub-block with residual connection
            residual = z
            z = self.self_attention[i](query=z,key=z,value=z)
            z = self.attention_norm[i](self.add([residual,z]))
            # feed-forward sub-block with residual connection
            residual = z
            z = self.ff_dense[i](z)
            z = self.scale_dense[i](z)
            z = self.dropout(z)
            z = self.ff_norm[i](self.add([residual,z]))

        return z

In [23]:
class Decoder(keras.layers.Layer):
    """Stack of `num_layers` causal self-attention + cross-attention + feed-forward blocks.

    Args:
        num_layers: number of blocks in the stack.
        num_self_heads: heads of the causal self-attention in each block.
        num_cross_heads: heads of the cross-attention in each block.
        d_model: width of the block input/output; also used as `key_dim`.
        feed_forward_dense_units: hidden width of the feed-forward part.
        dropout_rate: dropout applied to the feed-forward output.

    Every block owns its attention, dense and normalization layers, so the
    blocks do not share weights. `call(z_enc, z)` attends from `z` to `z_enc`
    and returns a tensor shaped like `z`.
    """

    def __init__(
        self,
        num_layers:int,
        num_self_heads:int,
        num_cross_heads:int,
        d_model:int,
        feed_forward_dense_units:int,
        dropout_rate:float,
        **kwargs
    ):

        # Keras must initialise the layer before sub-layers are attached.
        super(Decoder,self).__init__(**kwargs)
        self.num_layers = num_layers

        # One independent set of weighted sub-layers per block.
        self.self_attention = [
            keras.layers.MultiHeadAttention(num_heads=num_self_heads,key_dim=d_model)
            for _ in range(num_layers)
        ]
        self.self_attention_norm = [keras.layers.LayerNormalization() for _ in range(num_layers)]
        self.cross_attention = [
            keras.layers.MultiHeadAttention(num_heads=num_cross_heads,key_dim=d_model)
            for _ in range(num_layers)
        ]
        self.cross_attention_norm = [keras.layers.LayerNormalization() for _ in range(num_layers)]
        self.ff_dense = [keras.layers.Dense(feed_forward_dense_units,"relu") for _ in range(num_layers)]
        self.scale_dense = [keras.layers.Dense(d_model) for _ in range(num_layers)]
        self.ff_norm = [keras.layers.LayerNormalization() for _ in range(num_layers)]
        # Dropout and Add hold no weights, so a single instance is reused.
        self.dropout = keras.layers.Dropout(dropout_rate)
        self.add = keras.layers.Add()

    def call(self,z_enc,z):

        for i in range(self.num_layers):

            # causal self-attention over the decoder sequence
            residual = z
            z = self.self_attention[i](query=z,key=z,value=z,use_causal_mask=True)
            z = self.self_attention_norm[i](self.add([residual,z]))
            # cross-attention: decoder queries, encoder keys/values
            residual = z
            z = self.cross_attention[i](query=z,key=z_enc,value=z_enc)
            z = self.cross_attention_norm[i](self.add([residual,z]))
            # feed-forward sub-block
            residual = z
            z = self.ff_dense[i](z)
            z = self.scale_dense[i](z)
            z = self.dropout(z)
            z = self.ff_norm[i](self.add([residual,z]))

        return z

In [None]:
class Transformer(keras.Model):

    def __init__(
        self,
        encoder_vocab_size:int,
        decoder_vocab_size:int,
        encoder_length:int=2048,
        decoder_length:int=2048,
        d_model:int=512,
        encoder_casting:Literal["concat","interleave"]="concat",
        decoder_casting:Literal["concat","interleave"]="concat",
        encoder_num_layers:int=8,
        encoder_num_heads:int=3,
        encoder_feed_forward_dense_units:int=2048,
        encoder_dropout_rate:float=.1,
        decoder_num_layers:int=12,
        decoder_num_self_heads:int=3,
        decoder_num_cross_heads:int=3,
        decoder_feed_forward_dense_units:int=2048,
        decoder_dropout_rate:float=.1,
        **kwargs
    ):

        self.encoder_embed = PositionEncoding(
            encoder_vocab_size,
            encoder_length,
            d_model,
            encoder_casting
            )
        self.decoder_embed = PositionEncoding(
            decoder_vocab_size,
            decoder_length,
            d_model,
            decoder_casting
            )
        self.encoder_layer = Encoder(
            encoder_num_layers,
            encoder_num_heads,
            d_model,
            encoder_feed_forward_dense_units,
            encoder_dropout_rate
            )
        self.decoder_layer = Decoder(
            decoder_num_layers,
            decoder_num_self_heads,
            decoder_num_cross_heads,
            d_model,
            decoder_feed_forward_dense_units,
            decoder_dropout_rate
            )
        self.total_out = keras.layers.Dense(decoder_vocab_size)