In [None]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import os
import pickle

In [None]:
# Read the whole training corpus into memory as one string.
# NOTE(review): no encoding is passed, so the platform default is used — confirm
# that it matches how data.txt was written.
with open("data.txt", 'r') as corpus_file:
    text_data = corpus_file.read()

In [None]:
# Show only the beginning of the corpus; echoing the whole string would flood
# the cell output and bloat the saved notebook.
text_data[:500]

In [None]:
# Word-level tokenizer: at most 5000 ids are used when encoding, and words
# outside that range are mapped to the '<OOV>' token.
# NOTE(review): tf.keras.preprocessing.text.Tokenizer is a legacy API — confirm
# it is available in the TensorFlow version this notebook targets.
tokenizer = tf.keras.preprocessing.text.Tokenizer(
    num_words = 5000,
    oov_token = '<OOV>',
)
# The corpus is fitted as a single document.
tokenizer.fit_on_texts([text_data])

In [None]:
# Inspect the tokenizer's per-word document counts (only one text was fitted).
tokenizer.word_docs

In [None]:
# Inspect how many times each word occurred in the fitted text.
tokenizer.word_counts

In [None]:
# Look up the integer id assigned to the word 'of'.
# NOTE(review): plain indexing raises KeyError if 'of' never occurs in data.txt.
tokenizer.word_index['of']

In [None]:
# The vocabulary cap that was passed to the Tokenizer constructor.
tokenizer.num_words

In [None]:
# Encode the corpus as integer token ids. A single text is passed in, so [0]
# selects its (only) id sequence.
sequence = tokenizer.texts_to_sequences([text_data])[0]

In [None]:
# Number of token ids in the encoded corpus.
len(sequence)

In [None]:
# Word -> integer id mapping learned by the tokenizer.
# NOTE(review): this displays the entire mapping; consider previewing a slice.
tokenizer.word_index

In [None]:
# Persist the fitted tokenizer so the same word -> id mapping can be reused
# at generation time.
with open("tokenizer.pkl", 'wb') as tokenizer_file:
    pickle.dump(tokenizer, tokenizer_file)

In [None]:
# Preview the first token ids only; the full list is as long as the corpus.
sequence[:20]

In [None]:
max_seq_length = 100

def create_dataset(seq, window_size = 100):
    """Slice a token-id sequence into overlapping next-token training windows.

    Row ``i`` of the inputs is ``seq[i : i + window_size]`` and row ``i`` of the
    labels is the same window shifted one step to the right
    (``seq[i + 1 : i + window_size + 1]``).

    Args:
        seq: 1-D sequence (list or array) of token ids.
        window_size: Number of tokens per window.

    Returns:
        Tuple ``(inputs, labels)`` of arrays with shape
        ``(len(seq) - window_size, window_size)``. When ``seq`` is not longer
        than ``window_size`` both arrays are empty with shape
        ``(0, window_size)``.

    Raises:
        ValueError: If ``window_size`` is negative.
    """
    if window_size < 0:
        raise ValueError("window_size must be non-negative")

    tokens = np.asarray(seq)
    n_windows = len(tokens) - window_size
    if n_windows <= 0:
        # Not enough tokens for a single (input, label) pair; keep the 2-D
        # shape so downstream code still sees a well-formed empty dataset.
        empty = np.empty((0, window_size), dtype = tokens.dtype)
        return empty, empty.copy()

    # Each view row holds window_size + 1 consecutive tokens: the first
    # window_size are the input, the last window_size are the shifted labels.
    windows = np.lib.stride_tricks.sliding_window_view(tokens, window_size + 1)

    # Copy so callers receive independent, writable arrays, not read-only views.
    return windows[:, :-1].copy(), windows[:, 1:].copy()

In [None]:
# Build the training arrays: every row of X_data is a max_seq_length-token
# window and the matching row of y_data is that window shifted by one token.
X_data, y_data = create_dataset(sequence, max_seq_length)

In [None]:
# Inspect the input windows.
X_data

In [None]:
# Inspect the label windows (inputs shifted by one token).
y_data


In [None]:
# Length of a single input window.
len(X_data[0])

In [None]:
# Number of input windows.
len(X_data)

In [None]:
# Number of label windows (create_dataset appends one per input window).
len(y_data)

In [None]:
@tf.keras.utils.register_keras_serializable()
class PositionalEncoding(layers.Layer):
    """Adds fixed sinusoidal position encodings to a sequence of embeddings.

    The table is computed once in ``__init__`` with sine on the even feature
    indices and cosine on the odd ones, and is not trainable.

    Args:
        max_len: Number of positions to precompute encodings for.
        d_model: Feature dimension of the embeddings being encoded.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, max_len, d_model, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can reproduce the constructor call on reload.
        self.max_len = max_len
        self.d_model = d_model

        positions = np.arange(max_len)[:, np.newaxis]
        dims = np.arange(d_model)[np.newaxis, :]
        # Feature pair (2k, 2k+1) shares the rate 1 / 10000^(2k / d_model).
        angle_rates = 1 / np.power(10000, (2 * (dims // 2)) / np.float32(d_model))
        angle_rads = positions * angle_rates
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        # Leading axis of size 1 lets the table broadcast over the batch.
        self.pos_encoding = tf.cast(angle_rads[np.newaxis, ...], dtype = tf.float32)

    def call(self, x):
        """Return ``x`` plus the encodings for its first ``x.shape[1]`` positions."""
        encoding = self.pos_encoding[:, :tf.shape(x)[1], :]
        # Match the input dtype so the addition also works when x is not float32.
        return x + tf.cast(encoding, x.dtype)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created on load."""
        config = super().get_config()
        config.update({"max_len": self.max_len, "d_model": self.d_model})
        return config

In [None]:
def transformer_block(embed_dim, num_heads, ff_dim, dropout = 0.1, use_causal_mask = True):
    """Build one transformer block as a standalone Keras model.

    The block is self-attention -> dropout -> residual + layer norm, followed by
    a two-layer feed-forward network -> dropout -> residual + layer norm.

    Args:
        embed_dim: Feature size of the block's input and output.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Dropout rate applied after attention and after the feed-forward net.
        use_causal_mask: When True (default) each position attends only to
            itself and earlier positions. This is required for next-token
            training, where the label at position ``t`` is the input token at
            ``t + 1``; without the mask the block can read its own targets.

    Returns:
        A ``tf.keras.Model`` mapping ``(batch, seq_len, embed_dim)`` to a tensor
        of the same shape.
    """
    block_input = layers.Input(shape = (None, embed_dim))

    # key_dim is left at embed_dim, as in the original code.
    attention = layers.MultiHeadAttention(num_heads = num_heads, key_dim = embed_dim)
    attn_output = attention(block_input, block_input, use_causal_mask = use_causal_mask)
    attn_output = layers.Dropout(dropout)(attn_output)
    out1 = layers.LayerNormalization(epsilon = 1e-6)(block_input + attn_output)

    ffn = tf.keras.Sequential([
        layers.Dense(ff_dim, activation = 'relu'),
        layers.Dense(embed_dim)
    ])

    ffn_output = ffn(out1)
    ffn_output = layers.Dropout(dropout)(ffn_output)
    out2 = layers.LayerNormalization(epsilon = 1e-6)(out1 + ffn_output)

    return tf.keras.Model(inputs = block_input, outputs = out2)

In [None]:
def transformer_block_with_mask_head(embed_dim, num_heads, ff_dim, dropout = 0.1, use_mask = False):
    """Build a transformer block that can take an explicit attention mask.

    Args:
        embed_dim: Feature size of the block's input and output.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Dropout rate applied after attention and after the feed-forward net.
        use_mask: When True the model takes a second input named
            ``"attention_mask"`` (declared with shape ``(1, None, None)`` per
            sample) that is passed to the attention layer. When False the model
            has a single input and attention is unmasked.

    Returns:
        A ``tf.keras.Model`` whose inputs are ``[embeddings, attention_mask]``
        when ``use_mask`` is True and just ``embeddings`` otherwise.
    """
    block_input = layers.Input(shape = (None, embed_dim))
    mask_input = layers.Input(shape = (1, None, None), name = "attention_mask") if use_mask else None

    attention = layers.MultiHeadAttention(num_heads = num_heads, key_dim = embed_dim)
    attn_output = attention(block_input, block_input, attention_mask = mask_input)
    attn_output = layers.Dropout(dropout)(attn_output)
    out1 = layers.LayerNormalization(epsilon = 1e-6)(block_input + attn_output)

    ffn = tf.keras.Sequential([
        layers.Dense(ff_dim, activation = 'relu'),
        layers.Dense(embed_dim)
    ])

    ffn_output = ffn(out1)
    ffn_output = layers.Dropout(dropout)(ffn_output)
    out2 = layers.LayerNormalization(epsilon = 1e-6)(out1 + ffn_output)

    # Only register the mask as a model input when it exists; a None entry in
    # the inputs list is not a valid Keras model input.
    model_inputs = [block_input, mask_input] if use_mask else block_input
    return tf.keras.Model(inputs = model_inputs, outputs = out2)

In [None]:
# Model and training hyperparameters; build_gpt_model and the training cell
# read these as module-level globals.
vocb_size = 5000  # size of the embedding table and of the output softmax
max_seq_length = 100  # tokens per input window; also sizes the positional encoding
embed_dim = 256  # embedding / transformer feature size
num_heads = 8  # attention heads per transformer block
ff_dim = 1024  # hidden width of each block's feed-forward network
num_layers = 4  # number of stacked transformer blocks
batch_size = 32  # passed to model.fit
epoch = 1  # number of training epochs passed to model.fit

def build_gpt_model():
    """Assemble the token-level language model from the module-level hyperparameters.

    Pipeline: token ids -> embedding -> positional encoding -> ``num_layers``
    transformer blocks -> per-position softmax over ``vocb_size`` tokens.

    Returns:
        An uncompiled ``tf.keras.Model`` taking ``(batch, max_seq_length)`` token
        ids and producing ``(batch, max_seq_length, vocb_size)`` probabilities.
    """
    token_ids = layers.Input(shape = (max_seq_length,))

    hidden = layers.Embedding(input_dim = vocb_size, output_dim = embed_dim)(token_ids)
    hidden = PositionalEncoding(max_seq_length, embed_dim)(hidden)

    # Each iteration builds a fresh block, so no weights are shared between layers.
    for _ in range(num_layers):
        block = transformer_block(embed_dim = embed_dim, num_heads = num_heads, ff_dim = ff_dim)
        hidden = block(hidden)

    token_probs = layers.Dense(vocb_size, activation = 'softmax')(hidden)
    return tf.keras.Model(inputs = token_ids, outputs = token_probs)

In [None]:
# Build the model and configure training. The sparse categorical loss consumes
# the integer token ids in y_data directly against the per-position softmax output.
model = build_gpt_model()
model.compile(optimizer = 'adam', loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])

In [None]:
# Print the layer-by-layer structure and parameter counts.
model.summary()

In [None]:
# Train on the windowed dataset, holding out 10% of the samples for validation.
# NOTE(review): consecutive windows overlap almost entirely, so held-out windows
# share most tokens with training windows — treat validation metrics as optimistic.
model.fit(X_data, y_data, batch_size = batch_size, epochs = epoch, validation_split = .1, verbose = 1)

In [None]:
# Persist the trained model to disk.
# NOTE(review): reloading this file presumably needs the custom PositionalEncoding
# layer to be available to Keras (e.g. via custom_objects) — confirm before relying on it.
model.save('saved_model.keras')

In [None]:
# Reload the tokenizer that was pickled earlier in this notebook.
# pickle.load can execute arbitrary code: only load files you created yourself.
with open('tokenizer.pkl', 'rb') as tokenizer_file:
    tokenizer = pickle.load(tokenizer_file)

In [None]:
import numpy as np
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [None]:
def sample_from_logits(logits, temperature = 1.0, top_k = 10):
    """Sample one index from unnormalised log-scores with temperature and top-k.

    Args:
        logits: 1-D array-like of unnormalised log-probabilities, one per token.
        temperature: Scores are divided by this before the softmax. Values
            below 1 sharpen the distribution, values above 1 flatten it, and
            ``0`` selects the highest-scoring index deterministically.
        top_k: Sample only among the ``top_k`` highest-scoring indices. Values
            larger than ``len(logits)`` are clamped; ``None`` disables the filter.

    Returns:
        The sampled index as a NumPy integer.

    Raises:
        ValueError: If ``logits`` is empty or not 1-D, ``temperature`` is
            negative, or ``top_k`` is not positive.
    """
    logits = np.asarray(logits, dtype = np.float64)
    if logits.ndim != 1 or logits.size == 0:
        raise ValueError("logits must be a non-empty 1-D array")
    if temperature < 0:
        raise ValueError("temperature must be non-negative")
    if top_k is None:
        top_k = logits.size
    if top_k <= 0:
        raise ValueError("top_k must be a positive integer or None")

    if temperature == 0:
        # Limit of temperature -> 0: all probability mass on the best token.
        return np.argmax(logits)

    scaled = logits / temperature
    sorted_indices = np.argsort(scaled)[::-1]
    top_k_indices = sorted_indices[:min(top_k, scaled.size)]
    top_k_logits = scaled[top_k_indices]

    # Subtract the largest score before exponentiating; the softmax result is
    # unchanged but np.exp can no longer overflow to inf (and then NaN).
    weights = np.exp(top_k_logits - top_k_logits[0])
    top_k_probs = weights / np.sum(weights)

    return np.random.choice(top_k_indices, p = top_k_probs)

In [None]:
def generate_text(prompt, model, tokenizer, max_seq_len, num_tokens = 50, temperature = 1.0, top_k = 10, from_logits = False):
    """Autoregressively extend ``prompt`` by sampling ``num_tokens`` tokens.

    At every step the most recent ``max_seq_len`` token ids are left-padded to
    ``max_seq_len``, fed to ``model``, and the prediction at the last position
    is sampled with ``sample_from_logits``.

    Args:
        prompt: Seed text.
        model: Callable Keras model returning per-position scores of shape
            ``(batch, max_seq_len, vocab)``.
        tokenizer: Fitted tokenizer providing ``texts_to_sequences`` and
            ``sequences_to_texts``.
        max_seq_len: Context length the model was built with.
        num_tokens: Number of tokens to generate.
        temperature: Sampling temperature forwarded to ``sample_from_logits``.
        top_k: Top-k cutoff forwarded to ``sample_from_logits``.
        from_logits: Set True if ``model`` outputs raw logits. The default
            (False) treats the outputs as probabilities, which matches a model
            whose last layer uses a softmax activation, and converts them to
            log-probabilities before sampling.

    Returns:
        The prompt followed by the generated tokens, decoded to text. Only the
        model's context is limited to ``max_seq_len`` tokens; the returned text
        is not truncated.
    """
    token_ids = list(tokenizer.texts_to_sequences([prompt])[0])

    for _ in range(num_tokens):
        context = token_ids[-max_seq_len:]
        padded = pad_sequences([context], maxlen = max_seq_len)

        predictions = model(padded, training = False).numpy()
        next_token_scores = np.asarray(predictions[0][-1], dtype = np.float64)
        if not from_logits:
            # The sampler applies exp() itself, so feeding it probabilities
            # would squash them into a near-uniform distribution. Convert to
            # log-probabilities first; the floor avoids log(0) = -inf.
            next_token_scores = np.log(np.maximum(next_token_scores, 1e-12))

        next_token = int(sample_from_logits(next_token_scores, temperature, top_k))
        token_ids.append(next_token)

    return tokenizer.sequences_to_texts([token_ids])[0]

In [None]:
# Generation settings.
# NOTE: despite the *_PATH names, the first two hold the in-memory model and
# tokenizer objects, not filesystem paths.
MODEL_PATH = model       # in-memory trained model; a model loaded from disk could be used instead
TOKENIZER_PATH = tokenizer      # tokenizer object (the one reloaded from tokenizer.pkl)
MAX_SEQ_LEN = 100                     # should match the max_seq_length used for training
NUM_TOKENS_TO_GENERATE = 50  # how many tokens to sample after the prompt
TEMPERATURE = 1.0  # sampling temperature passed to generate_text
TOP_K = 10  # sample only among the 10 highest-scoring tokens

In [None]:
# Generate a continuation of the prompt with the settings defined above.
generate_text(
    "Enter your testing prompt here",
    model = MODEL_PATH,
    tokenizer = TOKENIZER_PATH,
    max_seq_len = MAX_SEQ_LEN,
    num_tokens = NUM_TOKENS_TO_GENERATE,
    temperature = TEMPERATURE,
    top_k = TOP_K,
)