In [3]:
# Mount Google Drive at /content/drive so files stored there can be read
# with ordinary file paths (Colab only).

from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [1]:
import tensorflow as tf
from tensorflow.keras import layers, Sequential, Model
from tensorflow.keras.layers import Dense, Flatten, Input, Embedding, LayerNormalization, Dropout
import numpy as np
from tensorflow import keras

In [4]:
# Read the whole training corpus into one string, turning every newline into a
# space so the text becomes a single continuous line.
with open('/content/drive/My Drive/training_data.txt', encoding='utf-8') as f:
    data = f.read().replace('\n', ' ')

In [5]:
# Total number of characters in the loaded corpus.
print(len(data))

1115394


In [6]:
# Vocabulary: every distinct character in the corpus, in sorted order.
# Sorting matters: the iteration order of a set of strings depends on hash
# randomisation and can change between kernel sessions, which would silently
# change every character id and make previously saved weights meaningless.
characters = sorted(set(data))
print(len(characters))

64


In [7]:
# Two lookup tables between characters and integer ids. Ids start at 1, so the
# id 0 is never assigned to any character.
character_to_integer_encoding = {
    char: index for index, char in enumerate(characters, start=1)
}
integer_to_character_encoding = {
    index: char for index, char in enumerate(characters, start=1)
}

In [8]:
def encode(string):
    """Return the list of integer ids for the characters of ``string``.

    Raises:
        KeyError: if ``string`` contains a character that is not a key of
            ``character_to_integer_encoding``.
    """
    ids = []
    for symbol in string:
        ids.append(character_to_integer_encoding[symbol])
    return ids

def decode(lst):
    """Return the string spelled by the integer ids in ``lst``.

    Raises:
        KeyError: if ``lst`` contains an id that is not a key of
            ``integer_to_character_encoding``.
    """
    symbols = [integer_to_character_encoding[token_id] for token_id in lst]
    return ''.join(symbols)

In [9]:
# Encode the full corpus, then split it chronologically: the first 90% of the
# tokens are used for training and the remaining 10% for testing.
input_data = encode(data)
split_index = int(0.9 * len(input_data))
train_data, test_data = input_data[:split_index], input_data[split_index:]

In [18]:
# --- Data / batching ---
batch_size = 16   # sequences per batch
block_size = 64   # tokens per input window

# --- Model size ---
num_heads = 4     # attention heads per block; experiment with other values if you want
num_transformer_blocks = 3
feed_forward_dim = 256

# Character ids start at 1, so reserve one extra slot for the unused id 0.
input_vocab_size = len(characters) + 1

In [19]:
def causal_attention_mask(batch_size, n_dest, n_src):
    """Build a boolean attention mask of shape ``(batch_size, n_dest, n_src)``.

    Entry ``[b, i, j]`` is True when ``i >= j - n_src + n_dest``. When
    ``n_dest == n_src`` this is the lower triangle (``j <= i``), i.e. position
    ``i`` may only attend to itself and to earlier positions.

    Args:
        batch_size: Number of copies of the mask to stack along axis 0.
        n_dest: Number of destination (query) positions.
        n_src: Number of source (key) positions.

    Returns:
        The same ``(n_dest, n_src)`` mask repeated ``batch_size`` times.
    """
    dest_positions = tf.range(n_dest)[:, None]
    src_positions = tf.range(n_src)
    allowed = tf.cast(dest_positions >= src_positions - n_src + n_dest, tf.bool)
    single_mask = tf.reshape(allowed, [1, n_dest, n_src])
    # Repeat only along the batch axis: multiples are [batch_size, 1, 1].
    tile_multiples = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
    )
    return tf.tile(single_mask, tile_multiples)


class TransformerBlock(layers.Layer):
    """One transformer block: causally masked self-attention followed by a
    two-layer feed-forward network.

    Each sub-layer's output goes through dropout, is added back to that
    sub-layer's input (residual connection) and is then layer-normalized.

    Args:
        embed_dim: Passed to ``MultiHeadAttention`` as ``key_dim`` and used as
            the output width of the feed-forward network.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden ReLU layer of the feed-forward network.
        rate: Dropout rate used after both sub-layers.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.3):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.feed_forward_network = Sequential(
            [
                Dense(ff_dim, activation="relu"),
                Dense(embed_dim),
            ]
        )
        self.normalization_layer_1 = LayerNormalization(epsilon=1e-6)
        self.normalization_layer_2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs):
        """Run the block on ``inputs`` and return a tensor of the same shape.

        Axis 0 of ``inputs`` is used as the batch size and axis 1 as the
        sequence length when building the causal mask.
        """
        dims = tf.shape(inputs)
        n_batch, seq_len = dims[0], dims[1]
        mask = causal_attention_mask(n_batch, seq_len, seq_len)

        # Self-attention sub-layer (query and value are both `inputs`).
        attended = self.dropout1(self.att(inputs, inputs, attention_mask=mask))
        attention_block_out = self.normalization_layer_1(inputs + attended)

        # Feed-forward sub-layer.
        transformed = self.dropout2(self.feed_forward_network(attention_block_out))
        return self.normalization_layer_2(attention_block_out + transformed)

In [20]:
class TokenAndPositionEmbedding(layers.Layer):
    """Embed token ids and add a learned embedding of each token's position.

    Args:
        maxlen: Number of distinct positions the position table can hold.
        vocab_size: Number of distinct token ids the token table can hold.
        embed_dim: Width of both embedding tables.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_embedding = Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_embedding = Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Return token embeddings of ``x`` plus position embeddings.

        Positions are ``0 .. L-1`` where ``L`` is the size of the last axis
        of ``x``.
        """
        seq_len = tf.shape(x)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        position_vectors = self.pos_embedding(position_ids)
        token_vectors = self.token_embedding(x)
        return token_vectors + position_vectors

In [21]:
class Transformer(Model):
    """Subclassed-``Model`` version of the transformer language model.

    The forward pass is: token + position embedding, then
    ``num_transformer_blocks`` ``TransformerBlock`` layers in sequence, then a
    ``Dense`` layer producing ``vocab_size`` values per position.

    Args:
        maxlen: Sequence length given to the embedding layer.
        vocab_size: Number of token ids; also the width of the output layer.
        embed_dim: Embedding width used throughout the model.
        num_heads: Attention heads per transformer block.
        feed_forward_dim: Hidden width of each block's feed-forward network.
        num_transformer_blocks: How many transformer blocks to stack.
    """

    def __init__(self, maxlen, vocab_size, embed_dim, num_heads, feed_forward_dim, num_transformer_blocks):
        super().__init__()
        # NOTE(review): this symbolic Input is stored but never read by call();
        # confirm whether it is still needed.
        self.inputs = Input(shape=(maxlen,), dtype=tf.int32)
        self.embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
        self.embedding_dim = embed_dim
        self.num_transformer_blocks = num_transformer_blocks
        self.transformer_blocks = [
            TransformerBlock(embed_dim, num_heads, feed_forward_dim)
            for _ in range(num_transformer_blocks)
        ]
        self.dense = Dense(vocab_size)

    def call(self, inputs):
        """Map a batch of token ids to per-position outputs of width ``vocab_size``."""
        hidden = self.embedding_layer(inputs)
        for block in self.transformer_blocks:
            hidden = block(hidden)
        return self.dense(hidden)


def get_transformer_model(
    maxlen,
    vocab_size,
    embed_dim,
    num_heads,
    feed_forward_dim,
    num_transformer_blocks=1
):
    """Build the transformer language model with the Keras functional API.

    Args:
        maxlen: Length of the integer input sequence.
        vocab_size: Number of token ids; also the width of the output layer.
        embed_dim: Embedding width used by the embedding layer and the blocks.
        num_heads: Attention heads per transformer block.
        feed_forward_dim: Hidden width of each block's feed-forward network.
        num_transformer_blocks: How many transformer blocks to stack.

    Returns:
        An uncompiled ``Model`` taking ``(batch, maxlen)`` int32 token ids and
        returning a one-element output list whose tensor has ``vocab_size``
        values per position (no activation is applied by the final layer).
    """
    token_ids = Input(shape=(maxlen,), dtype=tf.int32)
    hidden = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(token_ids)
    for _ in range(num_transformer_blocks):
        hidden = TransformerBlock(embed_dim, num_heads, feed_forward_dim)(hidden)
    logits = Dense(vocab_size)(hidden)
    return Model(inputs=token_ids, outputs=[logits])

In [22]:
# Build the model from the notebook's hyperparameters.
# NOTE(review): feed_forward_dim is used for BOTH the embedding width and the
# feed-forward hidden width -- confirm that sharing the value is intended.
model = get_transformer_model(
    maxlen=block_size,
    vocab_size=input_vocab_size,
    embed_dim=feed_forward_dim,
    num_heads=num_heads,
    feed_forward_dim=feed_forward_dim,
    num_transformer_blocks=num_transformer_blocks,
)

# The model's final Dense layer has no activation, hence from_logits=True.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer="adam", loss=[loss_fn], metrics=["accuracy"])

In [23]:
# Build next-character training pairs: for every start offset i the input is
# train_data[i : i + block_size] and the target is the same window shifted one
# token to the right.
#
# A strided NumPy view over windows of block_size + 1 tokens replaces the
# per-window Python lists. All windows have the same length, so no padding is
# needed, and every full window (including the last one) is used.
train_tokens = np.asarray(train_data, dtype=np.int64)
if len(train_tokens) > block_size:
    train_windows = np.lib.stride_tricks.sliding_window_view(
        train_tokens, block_size + 1
    )
else:
    # Too little data for even one (input, target) pair: empty dataset.
    train_windows = np.empty((0, block_size + 1), dtype=np.int64)

# ascontiguousarray materialises the overlapping view before handing it to TF.
inputs = tf.convert_to_tensor(np.ascontiguousarray(train_windows[:, :-1]), dtype=tf.int64)
targets = tf.convert_to_tensor(np.ascontiguousarray(train_windows[:, 1:]), dtype=tf.int64)

dataset = tf.data.Dataset.from_tensor_slices((inputs, targets))
dataset = dataset.shuffle(10000)
dataset = dataset.batch(batch_size, drop_remainder=True)

In [24]:
# Display the model's summary.
model.summary()

In [None]:
# Rebuild the input pipeline (shuffle -> fixed-size batches) and train.
# NOTE(review): the shuffle buffer (1000) is much smaller than the dataset and
# consecutive samples are overlapping windows, so each batch is drawn from
# nearby text -- confirm whether a larger buffer is wanted.
dataset = (
    tf.data.Dataset.from_tensor_slices((inputs, targets))
    .shuffle(1000)
    .batch(batch_size, drop_remainder=True)
)
model.fit(dataset, epochs=10)

Epoch 1/10
[1m 2359/62736[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m16:20:18[0m 974ms/step - accuracy: 0.5310 - loss: 1.6209

In [None]:
def generate_text(model, start_index, num_generate=1):
    """Greedily continue a prompt taken from ``train_data``.

    The prompt is ``train_data[start_index:start_index + block_size]``. At each
    step the model is run on the current window, the highest-scoring token at
    the last position is appended, and the window slides forward by one token
    so its length stays constant.

    Args:
        model: Model that maps a batch of token-id windows to per-position
            scores over the vocabulary.
        start_index: Offset into ``train_data`` where the prompt starts.
        num_generate: Number of tokens to generate.

    Returns:
        The decoded prompt followed by the decoded generated tokens.

    Raises:
        KeyError: if a generated id has no entry in the decoding table.
    """
    # Slicing copies, so train_data itself is never modified.
    window = list(train_data[start_index:start_index + block_size])
    prompt_text = decode(window)
    generated_tokens = []

    for _ in range(num_generate):
        input_eval = tf.convert_to_tensor([window], dtype=tf.int32)
        # Call the model directly: model.predict() is designed for large
        # batches and adds per-call overhead (plus a progress bar) when used
        # once per generated token.
        logits = model(input_eval, training=False)
        # Softmax is monotonic, so the argmax of the raw scores equals the
        # argmax of the probabilities.
        next_token = int(np.argmax(logits[0, -1].numpy()))
        generated_tokens.append(next_token)
        window = window[1:] + [next_token]

    return prompt_text + decode(generated_tokens)

In [None]:
# Generate 1000 tokens, using the last block_size tokens of train_data as the prompt.
generate_text(model, start_index=len(train_data)-block_size, num_generate=1000)

In [None]:
# Save only the model's weights to an HDF5 file in the working directory.
model.save_weights('transformer_model.weights.h5')

In [None]:
# Save the whole model to a single .keras file in the working directory.
model.save("saved_model.keras")

# **TESTING THE ACCURACY**

In [None]:
# Build (input, target) windows from the held-out test tokens the same way as
# for training: each target is its input window shifted one token to the right.
#
# A strided NumPy view over windows of block_size + 1 tokens replaces the
# per-window Python lists. All windows have the same length, so no padding is
# needed, and every full window (including the last one) is used.
test_tokens = np.asarray(test_data, dtype=np.int64)
if len(test_tokens) > block_size:
    test_windows = np.lib.stride_tricks.sliding_window_view(
        test_tokens, block_size + 1
    )
else:
    # Too little data for even one (input, target) pair: empty dataset.
    test_windows = np.empty((0, block_size + 1), dtype=np.int64)

# ascontiguousarray materialises the overlapping view before handing it to TF.
test_inputs = tf.convert_to_tensor(np.ascontiguousarray(test_windows[:, :-1]), dtype=tf.int64)
test_targets = tf.convert_to_tensor(np.ascontiguousarray(test_windows[:, 1:]), dtype=tf.int64)

# Create the test dataset (no shuffling is needed for evaluation)
test_dataset = tf.data.Dataset.from_tensor_slices((test_inputs, test_targets))
test_dataset = test_dataset.batch(batch_size, drop_remainder=True)

# Evaluate the model on the test data
test_loss, test_accuracy = model.evaluate(test_dataset)
print(f"Test Accuracy: {test_accuracy}")


**block_size = 64**  # Reduced block size for faster training and to prevent overfitting


**num_heads = 4**   # Lower number of attention heads to reduce model complexity


**num_transformer_blocks = 3**  # Reduced layers to avoid overfitting


**feed_forward_dim = 256**  # Modest feed-forward dimension to limit model complexity (this is the value set in the configuration cell)


**dropout_rate = 0.3**  # Increased dropout to prevent overfitting


**batch_size = 16**  # Smaller batch size for better generalization

