## Generative Pre-trained Transformer

In [274]:
import tensorflow as tf
import json
import re
import string
import numpy as np

In [275]:
# JSON text is UTF-8; pass the encoding explicitly so accented characters in the reviews
# are decoded the same way regardless of the platform's default locale encoding.
with open("./Data/wine_review/winemag-data-130k-v2.json", encoding="utf-8") as data:
    wine_review = json.load(data)

In [276]:
# Sanity check: number of records loaded, plus one sample record to show the available fields.
print(len(wine_review))
print(wine_review[10])

129971
{'points': '87', 'title': 'Kirkland Signature 2011 Mountain Cuvée Cabernet Sauvignon (Napa Valley)', 'description': 'Soft, supple plum envelopes an oaky structure in this Cabernet, supported by 15% Merlot. Coffee and chocolate complete the picture, finishing strong at the end, resulting in a value-priced wine of attractive flavor and immediate accessibility.', 'taster_name': 'Virginie Boone', 'taster_twitter_handle': '@vboone', 'price': 19, 'designation': 'Mountain Cuvée', 'variety': 'Cabernet Sauvignon', 'region_1': 'Napa Valley', 'region_2': 'Napa', 'province': 'California', 'country': 'US', 'winery': 'Kirkland Signature'}


In [277]:
# Build one training string per review: the fixed prefix "Wine Review" followed by the
# country, province, variety and description, all joined with " : ".
# A record is left out as soon as any of those four fields is None.
REVIEW_FIELDS = ("country", "province", "variety", "description")
cleaned_review = [
    "Wine Review : " + " : ".join(record[field] for field in REVIEW_FIELDS)
    for record in wine_review
    if all(record[field] is not None for field in REVIEW_FIELDS)
]


In [278]:
# Every ASCII punctuation character plus the newline. The punctuation is run through
# re.escape() because it is spliced into a character class: left unescaped, its "\]" pair is
# parsed as an escaped "]" and the backslash itself is never matched.
_PUNCTUATION_PATTERN = re.compile("([" + re.escape(string.punctuation) + "\n])")
_MULTI_SPACE_PATTERN = re.compile(" +")


def punc_padding(sentence):
    """Surround punctuation and newlines with spaces so each can become its own token.

    Args:
        sentence: The text to pad.

    Returns:
        The text with a space on both sides of every punctuation character and newline,
        and with each run of consecutive spaces collapsed to a single space.
    """
    sentence = _PUNCTUATION_PATTERN.sub(r" \1 ", sentence)
    # Padding adjacent punctuation produces doubled spaces; squeeze them back to one.
    return _MULTI_SPACE_PATTERN.sub(" ", sentence)

In [279]:
# Apply the punctuation padding to every cleaned review, keeping the original order.
punc_padded_review = list(map(punc_padding, cleaned_review))

In [299]:
# Hyper-parameters shared by the data pipeline, the model and the training loop.
VOCAB_SIZE = 10000      # max tokens kept by the vectorizer; also embedding input size and output units
N_HEADS = 2             # number of attention heads in the transformer block
KEY_DIM = 256           # key dimension passed to the multi-head attention layer
DENSE_DIM = 256         # units of the first feed-forward dense layer inside the block
EMBEDDING_DIM = 512     # width of the token/position embeddings and of the block output
STRING_LENGTH = 80      # model input length; reviews are vectorized to STRING_LENGTH + 1 ids
BATCH_SIZE = 64         # reviews per batch
EPOCHS = 5              # number of passes requested from fit()
SHUFFLE_SIZE = 1000     # buffer size passed to Dataset.shuffle()

In [300]:
# Wrap the punctuation-padded review strings in a tf.data.Dataset.
review_tensor = tf.data.Dataset.from_tensor_slices(punc_padded_review)

In [301]:
# Group the reviews into batches of BATCH_SIZE, then shuffle with a buffer of SHUFFLE_SIZE.
# NOTE(review): shuffle() is applied after batch(), so it reorders whole batches rather than
# individual reviews. This cell also rebinds `review_tensor`, so re-running it on its own
# batches the already-batched dataset again — re-run the cell that creates the dataset first.
review_tensor = review_tensor.batch(BATCH_SIZE).shuffle(SHUFFLE_SIZE)

In [302]:
# Text -> token-id layer: lower-cases the text, keeps at most VOCAB_SIZE tokens and emits
# STRING_LENGTH + 1 integer ids per review. The extra id is what allows each row to be split
# into an input and a target shifted by one position.
vectorize_layer = tf.keras.layers.TextVectorization(
    max_tokens=VOCAB_SIZE,
    standardize="lower",
    output_sequence_length=STRING_LENGTH + 1,
    output_mode="int",
)

In [303]:
# Fit the vectorizer's vocabulary on the batched review dataset.
vectorize_layer.adapt(review_tensor)

In [304]:
# Vocabulary learned by the vectorizer; TextGenerator later treats a token's position
# in this list as its integer id.
vocab = vectorize_layer.get_vocabulary()

In [305]:
# Example: token ids produced for a single review (the trailing 0s in the output appear to be padding).
vectorize_layer(punc_padded_review[10])

<tf.Tensor: shape=(81,), dtype=int64, numpy=
array([   7,   10,    2,   20,    2,   29,    2,   45,   44,    2,   68,
          3,  431,   67,    1,   52,  309,  120,   17,   12,   45,    3,
       1007,   47,  660,   48,  100,    4,  212,    5,  107, 1158,    6,
       2187,    3,  512,  305,   88,    6,  329,    3, 1928,   17,    8,
        651,   14,  990,    7,    9,  232,  145,    5, 1410, 7537,    4,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0], dtype=int64)>

In [306]:
def train_data(data):
    """Turn a batch of review strings into (input, target) token-id batches.

    The strings get a trailing axis and are vectorized with `vectorize_layer`. The input keeps
    every column except the last and the target keeps every column except the first, so each
    target position holds the token that follows the same position in the input.
    """
    token_ids = vectorize_layer(tf.expand_dims(data, -1))
    return token_ids[:, :-1], token_ids[:, 1:]


train_dataset = review_tensor.map(train_data)


In [307]:
# Pull a single (input, target) batch out of the training dataset for inspection.
dataset_check = train_dataset.take(1).get_single_element()

In [308]:
# Input half of the (input, target) pair returned by train_data.
dataset_check[0]

<tf.Tensor: shape=(64, 80), dtype=int64, numpy=
array([[ 7, 10,  2, ...,  0,  0,  0],
       [ 7, 10,  2, ...,  0,  0,  0],
       [ 7, 10,  2, ...,  0,  0,  0],
       ...,
       [ 7, 10,  2, ...,  0,  0,  0],
       [ 7, 10,  2, ...,  0,  0,  0],
       [ 7, 10,  2, ...,  0,  0,  0]], dtype=int64)>

CREATE CAUSAL MASK SO THAT TOKENS LATER IN THE SEQUENCE DO NOT IMPACT THE RESULT

In [309]:
def create_causal_mask(batch_size, target, source, dtype=tf.int32):
    """Build a causal attention mask and repeat it once per batch entry.

    Entry (i, j) of the mask is set when ``i >= j - source + target``; for
    ``target == source`` this is a lower-triangular matrix.

    Args:
        batch_size: Number of copies of the mask along the leading axis.
        target: Number of query positions (rows).
        source: Number of key positions (columns).
        dtype: Type the boolean comparison result is cast to.

    Returns:
        A tensor of shape (batch_size, target, source).
    """
    rows = tf.range(target)[:, tf.newaxis]  # query positions as a column
    cols = tf.range(source)                 # key positions as a row
    # Broadcasting the comparison yields the full (target, source) grid.
    allowed = rows >= cols - source + target
    allowed = tf.reshape(tf.cast(allowed, dtype=dtype), [1, target, source])
    # Tile multiples are [batch_size, 1, 1]: repeat only along the new leading axis.
    repeats = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
    )
    return tf.tile(allowed, repeats)

In [310]:
# Example: the first of 10 identical 5x5 masks.
create_causal_mask(10,5,5,dtype = tf.int32)[0]

<tf.Tensor: shape=(5, 5), dtype=int32, numpy=
array([[1, 0, 0, 0, 0],
       [1, 1, 0, 0, 0],
       [1, 1, 1, 0, 0],
       [1, 1, 1, 1, 0],
       [1, 1, 1, 1, 1]])>

CREATE TRANSFORMER BLOCK

In [311]:
class TransformerBlock(tf.keras.layers.Layer):
    """Transformer block with causally masked multi-head self-attention.

    The attention output and the two-layer feed-forward output each pass through dropout,
    a residual connection and layer normalization.

    Args:
        n_head: Number of attention heads.
        k_dim: Key dimension given to the multi-head attention layer.
        embed_dim: Output size of the attention layer and of the second dense layer.
        dense_dim: Units of the first (ReLU) dense layer.
        drop_out_rate: Rate used by both dropout layers.
        **kwargs: Base-layer arguments (for example ``name``). They are forwarded to the
            parent constructor so that a config produced by ``get_config()`` — which
            includes the parent's own entries — can be passed back to ``__init__`` when
            the model is reloaded.
    """

    def __init__(self, n_head , k_dim , embed_dim , dense_dim , drop_out_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.n_head = n_head
        self.k_dim = k_dim
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.drop_out_rate = drop_out_rate
        self.m_h_attention = tf.keras.layers.MultiHeadAttention(n_head, k_dim, output_shape=embed_dim)
        self.ln_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.ln_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dense1 = tf.keras.layers.Dense(self.dense_dim,activation='relu')
        self.dense2 = tf.keras.layers.Dense(self.embed_dim)
        self.drop1  = tf.keras.layers.Dropout(drop_out_rate)
        self.drop2 = tf.keras.layers.Dropout(drop_out_rate)

    def call(self, inputs):
        """Apply the block and return ``(block_output, attention_scores)``."""
        input_shape = tf.shape(inputs)
        sequence_length = input_shape[1]
        batch_size = input_shape[0]
        # Self-attention: query and key lengths are both the sequence length.
        causal_mask = create_causal_mask(batch_size, sequence_length,sequence_length,dtype = tf.bool)
        x_attention, x_score = self.m_h_attention(inputs,inputs,attention_mask=causal_mask, return_attention_scores=True)
        x_attention = self.drop1(x_attention)
        # Residual connection; assumes the inputs' last dimension equals embed_dim.
        x = self.ln_1(inputs + x_attention)
        out = self.dense1(x)
        out = self.dense2(out)
        out = self.drop2(out)
        return (self.ln_2(x + out),x_score)

    def get_config(self):
        """Return the parent config extended with this block's constructor arguments."""
        config = super().get_config()
        config.update(
            {
            "n_head" : self.n_head,
            "k_dim"  : self.k_dim,
            "embed_dim" : self.embed_dim,
            "drop_out_rate" : self.drop_out_rate,
            "dense_dim" : self.dense_dim
            }
        )
        return config
        

        




POSITIONAL ENCODING WITH EMBEDDING LAYER

In [312]:
class Text_positional_embedding(tf.keras.layers.Layer):
    """Sum of a token embedding and a position embedding.

    Args:
        embed_dim: Output size of both embedding tables.
        vocab_size: Number of rows in the token embedding table.
        max_length: Number of rows in the position embedding table.
        **kwargs: Base-layer arguments (for example ``name``). They are forwarded to the
            parent constructor so that a config produced by ``get_config()`` — which
            includes the parent's own entries — can be passed back to ``__init__`` when
            the model is reloaded.
    """

    def __init__(self,embed_dim, vocab_size, max_length, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim =embed_dim
        self.vocab_size =vocab_size
        self.max_length = max_length
        self.embed1 = tf.keras.layers.Embedding(input_dim=self.vocab_size, output_dim=self.embed_dim)
        self.embed2 = tf.keras.layers.Embedding(input_dim=self.max_length, output_dim=self.embed_dim)

    def call(self, inputs):
        """Embed the token ids and add the embedding of each position 0..length-1."""
        x = self.embed1(inputs)
        # Positions are taken from the size of the inputs' last axis.
        y = self.embed2(tf.range(start=0,limit = tf.shape(inputs)[-1],delta=1))
        return x + y

    def get_config(self):
        """Return the parent config extended with this layer's constructor arguments."""
        config = super().get_config()
        config.update(
            {
                "embed_dim" : self.embed_dim,
                "vocab_size" : self.vocab_size,
                "max_length" : self.max_length
            }
        )
        return config

    

# TRANSFORMER MODEL WITH 1 BLOCK

In [313]:
# Single-block GPT: token ids -> token + position embeddings -> one causal transformer block
# -> softmax over the vocabulary at every position.
inputs = tf.keras.layers.Input(shape=(None,), dtype=tf.int32)
x = Text_positional_embedding(
    embed_dim=EMBEDDING_DIM, vocab_size=VOCAB_SIZE, max_length=STRING_LENGTH
)(inputs)
x, attention_scores = TransformerBlock(
    n_head=N_HEADS, k_dim=KEY_DIM, embed_dim=EMBEDDING_DIM, dense_dim=DENSE_DIM
)(x)
outputs = tf.keras.layers.Dense(units=VOCAB_SIZE, activation="softmax")(x)
# The attention scores are exposed as a second model output; their loss entry is None.
gpt = tf.keras.models.Model(inputs=inputs, outputs=[outputs, attention_scores])
gpt.compile(
    optimizer="adam",
    loss=[tf.keras.losses.SparseCategoricalCrossentropy(), None],
)

In [314]:
# Print the layer-by-layer summary of the model.
gpt.summary()

In [315]:
class TextGenerator(tf.keras.callbacks.Callback):
    """Callback that prints a sampled wine review at the end of every epoch.

    Args:
        index_to_word: Sequence of vocabulary tokens; a token's position is used as its id.
        top_k: Stored on the instance; the sampling code does not currently use it.
    """

    def __init__(self, index_to_word, top_k=10):
        # Run the base Callback initializer so the callback's base state exists before
        # Keras attaches it to a model.
        super().__init__()
        self.index_to_word = index_to_word
        self.top_k = top_k
        self.word_to_index = {
            word: index for index, word in enumerate(index_to_word)
        }

    def sample_from(self, probs, temperature):
        """Rescale ``probs`` by ``temperature`` and draw one index from the result.

        Returns:
            A ``(sampled_index, rescaled_probs)`` tuple.
        """
        probs = probs ** (1 / temperature)
        probs = probs / np.sum(probs)
        return np.random.choice(len(probs), p=probs), probs

    def generate(self, start_prompt, max_tokens, temperature):
        """Extend ``start_prompt`` one sampled token at a time and print the result.

        Sampling stops once ``max_tokens`` tokens are reached or token id 0 is drawn.

        Returns:
            A list with one dict per generated token holding the prompt so far, the
            rescaled word probabilities and the attention scores for the last position.
        """
        # Words missing from the vocabulary map to id 1 (presumably the vectorizer's
        # out-of-vocabulary token — confirm against the vocabulary).
        start_tokens = [
            self.word_to_index.get(x, 1) for x in start_prompt.split()
        ]
        sample_token = None
        info = []
        while len(start_tokens) < max_tokens and sample_token != 0:
            x = np.array([start_tokens])
            y, att = self.model.predict(x, verbose=0)
            # Only the prediction at the final position is used to pick the next token.
            sample_token, probs = self.sample_from(y[0][-1], temperature)
            info.append(
                {
                    "prompt": start_prompt,
                    "word_probs": probs,
                    "atts": att[0, :, -1, :],
                }
            )
            start_tokens.append(sample_token)
            start_prompt = start_prompt + " " + self.index_to_word[sample_token]
        print(f"\ngenerated text:\n{start_prompt}\n")
        return info

    def on_epoch_end(self, epoch, logs=None):
        """Print a freshly generated review after each training epoch."""
        self.generate("wine review", max_tokens=80, temperature=1.0)

In [316]:
# Save only the model weights, to the same file, at the end of every epoch.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath="./Data/checkpoint.weights.h5",
    save_weights_only=True,
    save_freq="epoch",
    verbose=0,
)

# Write training logs for TensorBoard under ./logs.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./logs")

# Text-generation callback built from the vectorizer's vocabulary.
text_generator = TextGenerator(vocab)

In [317]:
# Train for EPOCHS epochs; the callbacks checkpoint the weights, log for TensorBoard and
# print a generated review after every epoch.
gpt.fit(
    train_dataset,
    epochs=EPOCHS,
    callbacks=[model_checkpoint_callback, tensorboard_callback, text_generator]
    
)

Epoch 1/5
[1m2030/2030[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - loss: 2.6011
generated text:
wine review : us : california : zinfandel : fantastically ripe blackberry jam and dark chocolate flavors are enveloped in ripe tannins . in this zinfandel , feels melted with moderate tannins and more concentration that acidity and dusty tannins . yet are a little tannic and dry . 

[1m2030/2030[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5671s[0m 3s/step - loss: 2.6009
Epoch 2/5
[1m2030/2030[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - loss: 1.9348
generated text:
wine review : france : bordeaux : bordeaux - style red blend : fruity and fruity , this structured wine also has a mix of perfumed berry fruit flavors by acidity that balance things . a solid core of good energy goes right . there is a tarry structure , solid tannins and concentrated acidity keep this structured with age for 3–4 years . 

[1m2030/2030[0m [32m━━━━━━━━━━━━━━━━━━━━[0m

<keras.src.callbacks.history.History at 0x157e5d73990>

To save the model and reuse it later for prediction, follow the steps below. For deserialization to work, the `__init__()` method of each custom class (`TransformerBlock` and `Text_positional_embedding`) must accept `**kwargs` and pass them on to `super().__init__(**kwargs)`,
so that the base class can handle the additional parameters (such as `name`) that are supplied to the custom class during deserialization.

In [None]:
# Save the trained model to ./saved_model/gpt1.keras.
gpt.save("./saved_model/gpt1.keras")

In [None]:
# Map the custom layer names to their classes for use when the saved model is loaded.
custom_objects = { 'Text_positional_embedding': Text_positional_embedding, 'TransformerBlock': TransformerBlock }

In [None]:
# Reload the saved model, supplying the custom layer classes.
gpt1 = tf.keras.models.load_model("./saved_model/gpt1.keras",custom_objects=custom_objects)

In [None]:
# Smoke test: run the reloaded model on one arbitrary 4-token sequence to confirm that
# deserialization produced a usable model.
x =np.array([[2,1,3,4]])
gpt1.predict(x)