# TensorFlow

# Processing and Loading

In [None]:
import tensorflow as tf
import re
from typing import List
import time
import string
import os
import numpy as np
from keras_nlp.tokenizers import WordPieceTokenizer
from keras_nlp.tokenizers import compute_word_piece_vocabulary
from keras_nlp.tokenizers import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
nltk.download('punkt')

# Length (in tokens) that the tokenizers below pad or truncate every sequence to.
MAX_SEQ_LEN = 32
# Wall-clock reference point for the timing report printed after tokenization.
start = time.time()
class Tokenizer_Private(Tokenizer):
    """Word-level tokenizer backed by a frequency-ranked NLTK vocabulary.

    The vocabulary maps each kept word to an integer id starting at 1, ordered
    from most to least frequent. Id 0 is returned for any word that is not in
    the vocabulary, and is rendered as ``"UNK"`` when converting ids back to
    text.

    Args:
        data: Iterable of text lines used to build the vocabulary.
        max_vocab_size: Maximum number of distinct words kept (most frequent
            first). Defaults to 10000.
    """

    def __init__(self, data, max_vocab_size: int = 10000) -> None:
        super().__init__()
        self.data = data
        self.stop_words = set(stopwords.words('english'))
        # Lower-case while counting so the vocabulary keys match what
        # ``tokenize`` looks up (it lower-cases its input as well).
        freq_dist = nltk.probability.FreqDist(
            token
            for line in self.data
            for token in nltk.tokenize.word_tokenize(line.lower())
            if self._is_vocab_token(token)
        )
        common_words = freq_dist.most_common(max_vocab_size)
        self.vocabulary = {word: idx + 1 for idx, (word, _) in enumerate(common_words)}
        # Reverse lookup (id -> word) used by ``detokenize`` / ``id_to_token``.
        self._inverse_vocabulary = {idx: word for word, idx in self.vocabulary.items()}

    def _is_vocab_token(self, token) -> bool:
        """Return True for alphanumeric tokens that are not English stop words."""
        return token.isalnum() and token not in self.stop_words

    def tokenize(self, text=None):
        """Convert ``text`` into a list of vocabulary ids.

        The text is lower-cased and split with NLTK; non-alphanumeric tokens
        and stop words are dropped, and unknown words map to 0.
        Returns an empty list when ``text`` is ``None`` or empty.
        """
        if not text:
            return []
        tokens = nltk.tokenize.word_tokenize(text.lower())
        return [self.token_to_id(token) for token in tokens if self._is_vocab_token(token)]

    def detokenize(self, inputs, *args, **kwargs):
        """Convert ids back into a single space-separated string.

        ``inputs`` may be any array-like of integer ids; it is flattened
        before decoding, so nested inputs are joined into one string.
        Ids without a vocabulary entry (including 0) become ``"UNK"``.
        """
        ids = np.asarray(inputs).ravel()
        return " ".join(self.id_to_token(idx) for idx in ids)

    def get_vocabulary(self) -> List[str]:
        """Return the vocabulary words, most frequent first.

        The word at list position ``i`` has id ``i + 1``; id 0 is not listed.
        """
        return list(self.vocabulary.keys())

    def token_to_id(self, token):
        """Return the id of ``token``, or 0 when it is not in the vocabulary."""
        return self.vocabulary.get(token, 0)

    def id_to_token(self, id):
        """Return the word for ``id``, or ``"UNK"`` when no word has that id."""
        # int() normalizes NumPy integer scalars to the plain-int dict keys.
        return self._inverse_vocabulary.get(int(id), "UNK")
class LSTM_Tokenizer:
    """Turns text lines into padded prefix sequences for next-token training.

    Args:
        data: Iterable of text lines; also used to build the vocabulary of
            the wrapped ``Tokenizer_Private``.
    """

    def __init__(self, data) -> None:
        self.data = data
        self.tokenizer = Tokenizer_Private(self.data)

    def tokenize(self):
        """Return ``(tokenizer, sequences)``.

        For every line, each prefix of its token ids (lengths 1..len) becomes
        one row. Rows are left-padded with 0 (and truncated by
        ``pad_sequences``) to ``MAX_SEQ_LEN``. An empty corpus yields an array
        with zero rows instead of raising.
        """
        prefixes = []
        for line in self.data:
            token_ids = self.tokenizer.tokenize(line)
            prefixes.extend(token_ids[:end] for end in range(1, len(token_ids) + 1))
        # pad_sequences already returns a NumPy array, so no extra wrapping is needed.
        padded = pad_sequences(prefixes, maxlen=MAX_SEQ_LEN, padding="pre")
        return self.tokenizer, padded
class Bert_Tokenizer:
    """Learns a WordPiece vocabulary from the corpus and tokenizes it.

    Args:
        data: The text lines used both to learn the vocabulary and as the
            input that gets tokenized.
    """

    def __init__(self, data):
        self.data = data

    def tokenize(self):
        """Return ``(tokenizer, token_ids)`` for the stored corpus.

        The vocabulary is learned with a target size of 30000 and the
        tokenizer is configured with ``sequence_length=MAX_SEQ_LEN``.
        """
        corpus = tf.data.Dataset.from_tensor_slices(self.data)
        learned_vocab = compute_word_piece_vocabulary(corpus, vocabulary_size=30000)
        word_piece = WordPieceTokenizer(
            vocabulary=learned_vocab,
            sequence_length=MAX_SEQ_LEN,
            strip_accents=False,
        )
        token_ids = word_piece(self.data)
        return word_piece, token_ids

class Dataset:
    """Loads text files from a directory and wraps them in a tokenizer helper.

    Args:
        dir: Directory to scan; every entry whose name ends with ``"txt"`` is
            recorded in ``self.paths`` (in ``os.listdir`` order).
    """

    # Built once: removes every ASCII punctuation character in a single pass.
    _PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)
    _TOKENIZER_NAMES = ("BERT", "LSTM")

    def __init__(self, dir):
        self.paths = [os.path.join(dir, i) for i in os.listdir(dir) if i.endswith("txt")]

    def get_data(self, tokenizer: str = "BERT", max_files: int = 10):
        """Read the corpus and return the matching tokenizer wrapper.

        Args:
            tokenizer: ``"BERT"`` or ``"LSTM"``.
            max_files: Read at most this many files from ``self.paths``.
                Fewer files are read when the directory holds fewer.

        Returns:
            A ``Bert_Tokenizer`` or ``LSTM_Tokenizer`` built from the
            preprocessed lines.

        Raises:
            ValueError: If ``tokenizer`` is not one of the supported names.
        """
        # Validate before touching the disk so a typo fails immediately.
        if tokenizer not in self._TOKENIZER_NAMES:
            raise ValueError(
                f"tokenizer must be one of {self._TOKENIZER_NAMES}, got {tokenizer!r}"
            )
        pages = []
        for path in self.paths[:max_files]:
            with open(path, "r") as f:
                # Lines consisting solely of a newline are skipped.
                pages.extend(self.preprocess(line) for line in f if line != "\n")
        if tokenizer == "BERT":
            return Bert_Tokenizer(pages)
        return LSTM_Tokenizer(pages)

    def preprocess(self, line):
        """Lower-case ``line``, trim it, and strip punctuation and newlines."""
        line = line.lower().strip()
        line = line.translate(self._PUNCTUATION_TABLE)
        return line.replace("\n", "")
    
# Build the corpus from the Gutenberg text directory, tokenize it with the
# LSTM pipeline, and report the elapsed time since `start` was recorded.
dataset=Dataset("/kaggle/input/gutenberg/txt")
tokenizer,x_data=dataset.get_data("LSTM").tokenize()
print("Time to load dataset and obtain tokens is {}".format(time.time()-start))

In [None]:
# Fetch the tokenizer's vocabulary list and show how many entries it holds.
vocabulary=tokenizer.get_vocabulary()
print(len(vocabulary))

In [None]:
# Sanity check: container type and shape of the tokenized data.
print(type(x_data))
print(x_data.shape)

# LSTM Architecture

In [None]:
import tensorflow as tf
import keras.layers as nn

class LSTM_Generator(tf.keras.Model):
    """Next-token classifier: embedding, stacked LSTMs, pooling, dense head.

    Args:
        vocab_size: Size of the embedding table and of the softmax output.
        embedding_size: Width of the token embeddings.
        hidden_size: Width of the ReLU dense layer before the output layer.
    """

    def __init__(self, vocab_size: int, embedding_size: int = 128, hidden_size: int = 1024):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.drop = nn.Dropout(0.1)
        # Despite the names, `forward` is a plain LSTM and `backward` is a
        # bidirectional LSTM applied on top of it.
        self.forward = nn.LSTM(512, return_sequences=True)
        self.backward = nn.Bidirectional(nn.LSTM(320, return_sequences=True))
        self.pool = nn.GlobalMaxPooling1D()
        self.linear1 = nn.Dense(hidden_size, activation="relu")
        self.linear2 = nn.Dense(vocab_size, activation="softmax")

    def call(self, x: tf.Tensor) -> tf.Tensor:
        """Run ``x`` through every layer in order and return the softmax output."""
        pipeline = (
            self.embedding,
            self.drop,
            self.forward,
            self.backward,
            self.pool,
            self.linear1,
            self.linear2,
        )
        for layer in pipeline:
            x = layer(x)
        return x
        
class get_lstm_generator:
    """Factory that builds and compiles an ``LSTM_Generator``.

    Args:
        vocab_size: Forwarded to ``LSTM_Generator``.
        embedding_size: Forwarded to ``LSTM_Generator``.
        hidden_size: Forwarded to ``LSTM_Generator``.
    """

    def __init__(self, vocab_size: int, embedding_size: int = 128, hidden_size: int = 1024):
        self.vocab_size = vocab_size
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size

    def getModel(self) -> tf.keras.Model:
        """Return a model compiled with Adam, categorical cross-entropy and accuracy."""
        model = LSTM_Generator(self.vocab_size, self.embedding_size, self.hidden_size)
        # NOTE(review): a learning rate of 0.1 is unusually large for Adam —
        # confirm it is intentional.
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.1),
            loss=tf.keras.losses.CategoricalCrossentropy(),
            metrics=["accuracy"],
        )
        return model

In [None]:
from tensorflow.keras.utils import to_categorical
def preprocess_lstm(tokenized_inputs, vocab_size):
    """Split padded sequences into (context, one-hot next token) batches.

    The last column of ``tokenized_inputs`` is the target and the remaining
    columns are the model input. Targets are one-hot encoded over
    ``vocab_size + 1`` classes and the pairs are batched in groups of 8.
    """
    context, next_token = tokenized_inputs[:, :-1], tokenized_inputs[:, -1]
    one_hot_targets = to_categorical(next_token, num_classes=vocab_size + 1)
    pairs = tf.data.Dataset.from_tensor_slices((context, one_hot_targets))
    return pairs.batch(batch_size=8, num_parallel_calls=tf.data.AUTOTUNE)


train = preprocess_lstm(x_data, len(vocabulary))


In [None]:
# Build the LSTM model with one extra class beyond the vocabulary list
# (matching the one-hot width used in preprocess_lstm) and train it.
model=get_lstm_generator(len(vocabulary)+1).getModel()
model.fit(train,epochs=100)

# Transformer Architecture

In [None]:
import tensorflow as tf
import keras.layers as nn
import keras_nlp


# Masked-language-model data settings.
# NOTE(review): none of the constants in this cell are referenced by the code
# visible in this file; the classes below hard-code their own values.
SEQ_LENGTH = 128
MASK_RATE = 0.25
PREDICTIONS_PER_SEQ = 32

# Model hyper-parameters.
NUM_LAYERS = 3
MODEL_DIM = 256
INTERMEDIATE_DIM = 512
NUM_HEADS = 4
DROPOUT = 0.1
NORM_EPSILON = 1e-5
class Transformer_Learner(tf.keras.Model):
    """Transformer encoder stack with a masked-language-model head.

    Args:
        batch_size: Fixed batch dimension of the encoder's input layer.
        num_head: Attention heads per encoder layer; also the number of
            encoder layers that are stacked.
        vocab_size: Vocabulary size for the token embedding.
        seq_length: Sequence length for the position embedding and input.
        embedding_size: Width of the token/position embeddings.
        hidden_size: Intermediate (feed-forward) width of each encoder layer.
    """

    def __init__(self, batch_size: int, num_head: int, vocab_size: int, seq_length: int = 128,
                 embedding_size: int = 128, hidden_size: int = 1024):
        super().__init__()
        self.embedding_layer = keras_nlp.layers.TokenAndPositionEmbedding(
            vocab_size, seq_length, embedding_size
        )
        stack = [
            nn.Input(batch_shape=(batch_size, seq_length)),
            self.embedding_layer,
            nn.LayerNormalization(),
            nn.Dropout(0.1),
        ]
        # NOTE(review): the layer count is driven by `num_head` — confirm that
        # coupling is intended.
        stack.extend(
            keras_nlp.layers.TransformerEncoder(
                hidden_size, num_head, dropout=0.1, layer_norm_epsilon=1e-4
            )
            for _ in range(num_head)
        )
        self.encoder_model = tf.keras.Sequential(stack)

        # The head reuses the token embedding of the encoder's embedding layer.
        self.outputs = keras_nlp.layers.MaskedLMHead(
            token_embedding=self.embedding_layer.token_embedding,
            activation="softmax",
        )

    def call(self, x: tf.Tensor) -> tf.Tensor:
        """Encode the token ids and predict tokens at the masked positions.

        ``x`` is unpacked as ``(token_ids, mask_positions)``.
        """
        token_id, mask_pos = x
        return self.outputs(self.encoder_model(token_id), mask_positions=mask_pos)

class get_transformer_learner:
    """Factory that builds and compiles a ``Transformer_Learner``.

    Args:
        batch_size: Forwarded to ``Transformer_Learner``.
        num_head: Forwarded to ``Transformer_Learner``.
        vocab_size: Forwarded to ``Transformer_Learner``.
        embedding_size: Forwarded as ``embedding_size``.
        hidden_size: Forwarded as ``hidden_size``.
    """

    def __init__(self, batch_size, num_head: int, vocab_size: int, embedding_size: int = 128, hidden_size: int = 1024):
        self.batch_size = batch_size
        self.vocab_size = vocab_size
        self.num_head = num_head
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size

    def getModel(self) -> tf.keras.Model:
        """Return a model compiled with Adam, sparse cross-entropy and accuracy."""
        loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
        optim = tf.keras.optimizers.Adam(learning_rate=1e-2)
        # Pass the sizes by keyword: Transformer_Learner's fourth positional
        # parameter is `seq_length`, so positional passing would shift
        # embedding_size/hidden_size into the wrong slots. `seq_length` keeps
        # its default because this factory does not expose it.
        model = Transformer_Learner(
            self.batch_size,
            self.num_head,
            self.vocab_size,
            embedding_size=self.embedding_size,
            hidden_size=self.hidden_size,
        )
        model.compile(optimizer=optim, loss=loss_fn, metrics=["accuracy"])
        return model

In [None]:
import tensorflow as tf
import keras.layers as nn
import keras_nlp


# Same values as the constants defined in the encoder cell earlier in this file.
# NOTE(review): they are not referenced by the code visible here.
SEQ_LENGTH = 128
MASK_RATE = 0.25
PREDICTIONS_PER_SEQ = 32

# Model hyper-parameters.
NUM_LAYERS = 3
MODEL_DIM = 256
INTERMEDIATE_DIM = 512
NUM_HEADS = 4
DROPOUT = 0.1
NORM_EPSILON = 1e-5

class Transformer_Decoder(tf.keras.Model):
    """Stack of Transformer decoder layers on top of a token/position embedding.

    Args:
        batch_size: Fixed batch dimension of the decoder's input layer.
        num_head: Attention heads per decoder layer; also the number of
            decoder layers that are stacked.
        vocab_size: Vocabulary size for the token embedding.
        seq_length: Sequence length for the position embedding and input.
        embedding_size: Width of the token/position embeddings.
        hidden_size: Intermediate (feed-forward) width of each decoder layer.
    """

    def __init__(self, batch_size: int, num_head: int, vocab_size: int, seq_length: int = 128,
                 embedding_size: int = 128, hidden_size: int = 1024):
        super().__init__()
        # getModel is an instance method, so the factory must be instantiated
        # first. The encoder is stored but not used by call() below.
        # NOTE(review): the factory has no seq_length parameter, so the
        # encoder is always built with Transformer_Learner's default length.
        self.encoder = get_transformer_learner(
            batch_size,
            num_head,
            vocab_size,
            embedding_size=embedding_size,
            hidden_size=hidden_size,
        ).getModel()
        self.embedding_layer = keras_nlp.layers.TokenAndPositionEmbedding(
            vocab_size, seq_length, embedding_size
        )
        # TODO: check the input shape.
        self.decoder_model = tf.keras.Sequential([
            nn.Input(batch_shape=(batch_size, seq_length)),
            self.embedding_layer,
            nn.Dropout(0.1),
        ])
        for _ in range(num_head):
            self.decoder_model.add(
                keras_nlp.layers.TransformerDecoder(
                    hidden_size, num_head, dropout=0.1, layer_norm_epsilon=1e-4
                )
            )

    def call(self, x: tf.Tensor) -> tf.Tensor:
        """Return the decoder stack's output for the token ids in ``x``.

        ``x`` is unpacked as ``(token_ids, mask_positions)``; the mask
        positions are accepted for input compatibility but not used.
        """
        token_id, mask_pos = x
        return self.decoder_model(token_id)

class get_transformer_decoder:
    """Factory that builds and compiles a ``Transformer_Decoder``.

    Args:
        batch_size: Forwarded to ``Transformer_Decoder``.
        num_head: Forwarded to ``Transformer_Decoder``.
        vocab_size: Forwarded to ``Transformer_Decoder``.
        embedding_size: Forwarded as ``embedding_size``.
        hidden_size: Forwarded as ``hidden_size``.
    """

    def __init__(self, batch_size, num_head: int, vocab_size: int, embedding_size: int = 128, hidden_size: int = 1024):
        self.batch_size = batch_size
        self.vocab_size = vocab_size
        self.num_head = num_head
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size

    def getModel(self) -> tf.keras.Model:
        """Return a model compiled with Adam, sparse cross-entropy and accuracy."""
        loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
        optim = tf.keras.optimizers.Adam(learning_rate=1e-2)
        # Pass the sizes by keyword: Transformer_Decoder's fourth positional
        # parameter is `seq_length`, so positional passing would shift
        # embedding_size/hidden_size into the wrong slots.
        model = Transformer_Decoder(
            self.batch_size,
            self.num_head,
            self.vocab_size,
            embedding_size=self.embedding_size,
            hidden_size=self.hidden_size,
        )
        model.compile(optimizer=optim, loss=loss_fn, metrics=["accuracy"])
        return model

# Masking For Transformer

In [None]:
# Mask generator for masked-language-model training: selects up to 32
# positions per sequence at a 25% selection rate.
# NOTE(review): if `tokenizer` is the Tokenizer_Private instance created
# earlier, token_to_id("[MASK]") falls back to 0, because that vocabulary only
# keeps alphanumeric tokens — confirm 0 is the intended mask id.
masker=keras_nlp.layers.MaskedLMMaskGenerator(
vocabulary_size=len(vocabulary),
mask_selection_rate=0.25,
mask_selection_length=32,
mask_token_id=tokenizer.token_to_id("[MASK]"))

In [None]:
def preprocess_transformer(tokenized_inputs):
    """Apply the mask generator and split its output for training.

    Returns:
        ``(features, labels, weights)`` where ``features`` holds the masked
        ``token_ids`` and the int32 ``mask_positions``, ``labels`` are the
        ``mask_ids`` and ``weights`` are the ``mask_weights``.
    """
    masked = masker(tokenized_inputs)
    model_inputs = {
        "token_ids": masked["token_ids"],
        "mask_positions": tf.cast(masked["mask_positions"], tf.int32),
    }
    return model_inputs, masked["mask_ids"], masked["mask_weights"]


features, labels, weights = preprocess_transformer(x_data)


# Masking

In [None]:
# Inspect the mask labels returned by preprocess_transformer.
print(labels)

In [None]:
# Inspect the token ids after masking.
print(features["token_ids"])

In [None]:
# Token ids run from 0 (unknown word) up to len(vocabulary), so the embedding
# needs len(vocabulary) + 1 entries — the same size the LSTM model is built with.
model = get_transformer_learner(64, 3, len(vocabulary) + 1).getModel()
# NOTE(review): `weights` from preprocess_transformer is not passed to fit —
# confirm whether unselected mask slots should be excluded from the loss.
model.fit(
    [features["token_ids"], features["mask_positions"]],
    labels,
    batch_size=64,
    epochs=100,
    shuffle=True,
)

In [None]:
# Predict masked tokens for the first five sequences.
output=model.predict([features["token_ids"][:5],features["mask_positions"][:5]])

In [None]:
# Shape of the prediction array returned by model.predict.
print(output.shape)

In [None]:
# Take the arg-max over the last (vocabulary) axis; the default axis=0 would
# reduce across the batch instead of picking a token per position.
print(tf.argmax(output, axis=-1))

In [None]:
# Reference labels for the same five sequences, for comparison with the predictions.
print(labels[:5])

In [None]:
# Decode each predicted sequence separately. Converting to NumPy first gives
# detokenize plain integer ids rather than tensor rows.
[tokenizer.detokenize(row) for row in tf.argmax(output, axis=-1).numpy()]