<a href="https://colab.research.google.com/github/Buggia11/Transformer-Architecture/blob/main/Transformer_Architecture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import re
from IPython.display import clear_output

# Layers for the Transformer
from tensorflow.keras.layers import (
    Dense, Input, Embedding, Dropout,
    LayerNormalization, MultiHeadAttention,
    GlobalAveragePooling1D
)

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.callbacks import LambdaCallback, ModelCheckpoint
from tensorflow.keras.preprocessing.sequence import pad_sequences

import tensorflow as tf

In [None]:
# Location of the raw training corpus.
TRAIN_PATH = '/content/train.dat'

try:
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(TRAIN_PATH, 'r', encoding='utf-8') as file:
        # Lower-case once at load time so every later step sees one casing.
        text = file.read().lower()
except FileNotFoundError:
    # Deliberate best-effort: report the problem and continue with an empty
    # corpus so the remaining cells can still be executed.
    text = ""
    print(f"Error: The file '{TRAIN_PATH}' was not found.")

print(f'Total characters in text: {len(text)}')


Total characters in text: 8388608


In [None]:
import string
import re

def pad_punctuation(s):
  """Remove ASCII punctuation from ``s`` and collapse runs of spaces.

  Despite the name, punctuation is deleted rather than padded.

  Args:
    s: Input string.

  Returns:
    ``s`` with every character of ``string.punctuation`` removed and each run
    of consecutive spaces replaced by a single space.
  """
  # string.punctuation contains characters that are special inside a regex
  # character class (``\``, ``]``, ``^``, ``-``). Without re.escape the ``\]``
  # pair is read as an escaped bracket, so literal backslashes were never
  # matched and survived the clean-up.
  s = re.sub(f"[{re.escape(string.punctuation)}]", '', s)
  s = re.sub(' +', ' ', s)
  return s

In [None]:
import tensorflow as tf

# Sequence geometry and batching. MAX_LEN and VOCAB_SIZE must agree with the
# values of the same name used when the model is built further down.
MAX_LEN = 200
VOCAB_SIZE = 10000
BATCH_SIZE = 32

# Apply pad_punctuation to the entire text string once
processed_text = pad_punctuation(text)

# Cut the corpus into consecutive windows of MAX_LEN + 1 words (MAX_LEN inputs
# plus one shifted target). Feeding the whole corpus as a single dataset
# element made TextVectorization truncate it to its first MAX_LEN + 1 tokens,
# so the model only ever saw one training sequence (one step per epoch).
words = processed_text.split()
window = MAX_LEN + 1
text_windows = [
    " ".join(words[start:start + window])
    for start in range(0, len(words), window)
] or [""]  # keep a string-typed dataset even if the corpus failed to load

# Batched dataset of raw strings; each element is a vector of windows.
text_ds = (
    tf.data.Dataset.from_tensor_slices(text_windows)
    .shuffle(len(text_windows))
    .batch(BATCH_SIZE)
)

vectorize_layer = tf.keras.layers.TextVectorization(
    standardize = 'lower',
    max_tokens = VOCAB_SIZE,
    output_mode = "int",
    output_sequence_length = MAX_LEN + 1,
)

# Build the vocabulary from every window, then expose it as an index -> word list.
vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary()

In [None]:
def prepare_inputs(text):
    """Convert raw text into an (input, target) pair for next-token prediction.

    A trailing axis is added to ``text`` before it is passed through the
    module-level ``vectorize_layer``. The resulting token ids are returned as
    two views shifted by one position along the last axis.

    Args:
        text: String tensor taken from the text dataset.

    Returns:
        Tuple ``(x, y)``: ``x`` is every token except the last, ``y`` is every
        token except the first.
    """
    token_ids = vectorize_layer(tf.expand_dims(text, -1))
    # Inputs drop the final position, targets drop the first one, so y[:, t]
    # is the token that follows x[:, t].
    return token_ids[:, :-1], token_ids[:, 1:]

# Tokenise and split each dataset element as it is read.
train_ds = text_ds.map(prepare_inputs)

# **Multihead Attention**
Creating a MultiHeadAttention layer in Keras

In [None]:
# Stand-alone example of configuring a Keras multi-head attention layer.
# The instance is not kept; the cell only displays its repr.
MultiHeadAttention(
    # Four attention heads.
    num_heads=4,
    # Per-head size of the query and key vectors.
    key_dim=128,
    # Per-head size of the value vectors (and so of each head's output).
    value_dim=32,
    # Size of the layer's output vectors.
    output_shape=256,
)

<MultiHeadAttention name=multi_head_attention, built=False>

# **Causal Masking**
Causal mask function

In [None]:
def casual_attention_mask (batch_size, n_dest, n_src, dtype):
    """Build a causal attention mask replicated along a batch axis.

    Args:
        batch_size: Scalar number of copies of the mask to produce.
        n_dest: Number of destination (query) positions.
        n_src: Number of source (key) positions.
        dtype: Dtype the boolean mask is cast to.

    Returns:
        Tensor of shape ``[batch_size, n_dest, n_src]`` whose entry ``(q, k)``
        is truthy when ``q >= k - n_src + n_dest``; for ``n_dest == n_src``
        this is a lower-triangular matrix.
    """
    dest_idx = tf.range(n_dest)[:, None]  # column vector of destination indices
    src_idx = tf.range(n_src)  # row vector of source indices
    # Broadcasting the comparison yields the full (n_dest, n_src) grid.
    allowed = dest_idx >= src_idx - n_src + n_dest
    # Cast to the requested dtype and add a leading axis of size one.
    mask = tf.reshape(tf.cast(allowed, dtype), [1, n_dest, n_src])
    # Tile multiples: batch_size along the first axis, 1 along the other two.
    repeats = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
    )
    return tf.tile(mask, repeats)

# Show a 10x10 mask. It is transposed for display, so each column lists the
# source positions visible to one destination position.
np.transpose(casual_attention_mask(1, 10, 10, dtype=tf.int32)[0])

array([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [0, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [0, 0, 1, 1, 1, 1, 1, 1, 1, 1],
       [0, 0, 0, 1, 1, 1, 1, 1, 1, 1],
       [0, 0, 0, 0, 1, 1, 1, 1, 1, 1],
       [0, 0, 0, 0, 0, 1, 1, 1, 1, 1],
       [0, 0, 0, 0, 0, 0, 1, 1, 1, 1],
       [0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
       [0, 0, 0, 0, 0, 0, 0, 0, 1, 1],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 1]], dtype=int32)

# **Transformer Block**
TransformerBlock layer in Keras

In [None]:
class TransformerBlock(tf.keras.layers.Layer):
  """Transformer block: causally masked self-attention followed by a feed-forward net.

  Each of the two sub-layers is followed by dropout, a residual addition and
  layer normalisation.

  Args:
    num_heads: Number of attention heads.
    key_dim: Per-head size of the query/key vectors.
    embed_dim: Size of the block's input and output vectors. The attention
      output is projected to this size so it can be added to the input.
    ff_dim: Width of the hidden feed-forward layer.
    dropout_rate: Rate used by both dropout layers.
    **kwargs: Standard Keras ``Layer`` arguments (``name``, ``dtype``, ...).
  """
  def __init__(self, num_heads, key_dim, embed_dim, ff_dim, dropout_rate=0.1, **kwargs):
    # Forward the base-layer kwargs so the block can be named and can be
    # rebuilt from the dictionary returned by get_config().
    super().__init__(**kwargs)
    self.num_heads = num_heads
    self.key_dim = key_dim
    self.embed_dim = embed_dim
    self.ff_dim = ff_dim
    self.dropout_rate = dropout_rate
    self.attn = tf.keras.layers.MultiHeadAttention(
        num_heads, key_dim, output_shape = embed_dim
    )
    self.dropout_1 = tf.keras.layers.Dropout(self.dropout_rate)
    self.ln_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
    self.ffn_1 = tf.keras.layers.Dense(self.ff_dim, activation="relu")
    self.ffn_2 = tf.keras.layers.Dense(self.embed_dim)
    self.dropout_2 = tf.keras.layers.Dropout(self.dropout_rate)
    self.ln_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

  def call(self, inputs):
    """Apply the block to ``inputs``.

    Args:
      inputs: Tensor whose first two axes are read as batch size and sequence
        length; its last axis must match ``embed_dim`` for the residual adds.

    Returns:
      Tuple ``(output, attention_scores)``: the block output and the scores
      returned by the attention layer.
    """
    input_shape = tf.shape(inputs)
    batch_size = input_shape[0]
    seq_len = input_shape[1]
    # Square mask (seq_len x seq_len) that hides later positions from each query.
    causal_mask = casual_attention_mask(
        batch_size, seq_len, seq_len, tf.bool
    )
    # Self-attention: the same tensor is passed as query and value.
    attention_output, attention_scores = self.attn(
        inputs,
        inputs,
        attention_mask=causal_mask,
        return_attention_scores=True
    )
    attention_output = self.dropout_1(attention_output)
    out1 = self.ln_1(inputs + attention_output)  # first add & normalise
    ffn_1 = self.ffn_1(out1)  # feed-forward: expand with ReLU ...
    ffn_2 = self.ffn_2(ffn_1)  # ... then project back to embed_dim
    ffn_output = self.dropout_2(ffn_2)
    return (self.ln_2(out1 + ffn_output), attention_scores)  # second add & normalise

  def get_config(self):
    """Return the constructor arguments so the layer can be serialised and rebuilt."""
    config = super().get_config()
    config.update(
        {
            "num_heads": self.num_heads,
            "key_dim": self.key_dim,
            "embed_dim": self.embed_dim,
            "ff_dim": self.ff_dim,
            "dropout_rate": self.dropout_rate,
        }
    )
    return config

# **The Token and Position Embedding layer**

In [None]:
class TokenAndPositionEmbedding(tf.keras.layers.Layer):
  """Sum of a learned token embedding and a learned position embedding.

  Args:
    maxlen: Number of rows in the position-embedding table.
    vocab_size: Number of rows in the token-embedding table.
    embed_dim: Size of each embedding vector.
    **kwargs: Standard Keras ``Layer`` arguments (``name``, ``dtype``, ...).
  """
  def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
    # Forward the base-layer kwargs so the layer can be named and can be
    # rebuilt from the dictionary returned by get_config().
    super().__init__(**kwargs)
    self.maxlen = maxlen
    self.vocab_size = vocab_size
    self.embed_dim = embed_dim
    self.token_emb = tf.keras.layers.Embedding(
        input_dim=vocab_size, output_dim=embed_dim
    )
    self.pos_emb = tf.keras.layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

  def call(self, x):
    """Embed token ids ``x`` and add an embedding of each position.

    Args:
      x: Integer tensor of token ids; its last axis is the sequence axis.

    Returns:
      Token embeddings plus position embeddings (the position term is
      broadcast over the leading axes).
    """
    # Actual length of this input, distinct from the table size self.maxlen.
    seq_len = tf.shape(x)[-1]
    positions = tf.range(start=0, limit=seq_len, delta=1)
    positions = self.pos_emb(positions)
    x = self.token_emb(x)
    return x + positions

  def get_config(self):
    """Return the constructor arguments so the layer can be serialised and rebuilt."""
    config = super().get_config()
    config.update(
        {
            "maxlen": self.maxlen,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        }
    )
    return config

In [None]:
# Model hyper-parameters.
MAX_LEN = 200           # rows in the position-embedding table
VOCAB_SIZE = 10000      # rows in the token-embedding table / size of the softmax
EMBEDDING_DIM = 256     # width of the token and position embeddings
N_HEADS = 4             # attention heads in the Transformer block
KEY_DIM = 256           # per-head query/key size
FEED_FORWARD_DIM = 256  # hidden width of the feed-forward sub-layer

# Sequences of integer token ids; the sequence length is left unspecified.
inputs = Input(shape=(None,), dtype=tf.int32)

# Embed -> one Transformer block -> per-position distribution over the vocabulary.
x = TokenAndPositionEmbedding(
    maxlen=MAX_LEN, vocab_size=VOCAB_SIZE, embed_dim=EMBEDDING_DIM
)(inputs)
x, attention_scores = TransformerBlock(
    num_heads=N_HEADS,
    key_dim=KEY_DIM,
    embed_dim=EMBEDDING_DIM,
    ff_dim=FEED_FORWARD_DIM,
)(x)
outputs = Dense(units=VOCAB_SIZE, activation="softmax")(x)

# The model exposes the attention scores as a second output.
gpt = Model(inputs=inputs, outputs=[outputs, attention_scores])
# One loss entry per output: cross-entropy for the token distribution, None
# for the attention scores.
gpt.compile(
    optimizer="adam",
    loss=[tf.keras.losses.SparseCategoricalCrossentropy(), None],
)
gpt.fit(train_ds, epochs=50)

Epoch 1/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 7s/step - loss: 9.2094
Epoch 2/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 431ms/step - loss: 8.7002
Epoch 3/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 438ms/step - loss: 8.2003
Epoch 4/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 417ms/step - loss: 7.7470
Epoch 5/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 418ms/step - loss: 7.3134
Epoch 6/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 429ms/step - loss: 6.8916
Epoch 7/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 418ms/step - loss: 6.4613
Epoch 8/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 426ms/step - loss: 6.0285
Epoch 9/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 409ms/step - loss: 5.6069
Epoch 10/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 426ms/step - loss: 5.1823
Epoch 11/50


<keras.src.callbacks.history.History at 0x7f97d0cbf110>

In [None]:
from tensorflow.keras.callbacks import Callback # Import the Callback class
from IPython.display import clear_output, Markdown # Import Markdown for display
import numpy as np # Import numpy for numerical operations

class TextGenerator(Callback):
  """Keras callback that samples a text continuation from a language model.

  Args:
    model: Model whose ``predict`` returns ``(token_probabilities, _)``.
    index_to_word: Sequence mapping token index to word.
    top_k: Stored on the instance only.
      NOTE(review): sampling is not restricted to the top-k tokens; the
      parameter is kept for interface compatibility.
  """
  def __init__(self, model, index_to_word, top_k=10):
    super().__init__()
    # Kept under its own attribute name rather than ``model`` (presumably to
    # avoid clashing with the attribute Keras manages on callbacks).
    self._generator_model = model
    self.index_to_word = index_to_word
    self.word_to_index = {
        word: index for index, word in enumerate(index_to_word)
    }
    self.top_k = top_k

  def _token_id(self, word):
    """Map a prompt word to its token index, falling back to index 1."""
    # Try the exact word first, then its lower-cased form: the notebook's
    # vocabulary is built from lower-cased text, so a capitalised prompt word
    # would otherwise always resolve to the fallback index.
    index = self.word_to_index.get(word)
    if index is None:
      index = self.word_to_index.get(word.lower(), 1)
    return index

  def sample_from(self, probs, temperature):
    """Draw a token index from ``probs`` after temperature scaling.

    Args:
      probs: 1-D array of token probabilities.
      temperature: Exponent control; the probabilities are raised to
        ``1 / temperature`` and renormalised.

    Returns:
      Tuple ``(index, scaled_probs)``.
    """
    probs = probs ** (1 / temperature)
    probs = probs / np.sum(probs)
    return np.random.choice(len(probs), p=probs), probs

  def generate(self, start_prompt, max_tokens, temperature, epoch_num):
    """Extend ``start_prompt`` one sampled word at a time and display the result.

    Sampling stops once the sequence holds ``max_tokens`` tokens or token 0 is
    drawn.

    Args:
      start_prompt: Whitespace-separated seed text.
      max_tokens: Upper bound on the total number of tokens (prompt included).
      temperature: Passed to ``sample_from``.
      epoch_num: Zero-based epoch index; shown as ``epoch_num + 1``.

    Returns:
      List with one dict per sampling step, holding the prompt before the
      step (``'prompt'``) and the scaled distribution (``'word_probs'``).
    """
    start_tokens = [self._token_id(word) for word in start_prompt.split()]
    sample_token = None
    info = []
    while len(start_tokens) < max_tokens and sample_token != 0:
      x = np.array([start_tokens])
      # The model has two outputs; only the token probabilities are needed.
      y, _ = self._generator_model.predict(x, verbose=0)
      # y[0][-1] is the distribution predicted at the last position.
      sample_token, probs = self.sample_from(y[0][-1], temperature)
      info.append({'prompt': start_prompt , 'word_probs': probs})
      start_tokens.append(sample_token)
      start_prompt = start_prompt + ' ' + self.index_to_word[sample_token]
    display(Markdown(f"\n**Generated text after epoch {epoch_num + 1}:**\n{start_prompt}\n"))
    return info

  def on_epoch_end(self, epoch, logs=None):
    """Replace the cell output with a fresh sample at the end of every epoch."""
    clear_output(wait=True)
    self.generate("Detection of tissue injury", max_tokens=50, temperature=0.8, epoch_num=epoch)

In [None]:
text_generator_callback = TextGenerator(gpt, vocab)
# epoch_num is zero-based (generate() displays epoch_num + 1), so 49 labels
# the sample with the 50 epochs requested in gpt.fit.
# The per-step details are bound to a name instead of being left as the
# cell's last expression, which would dump every probability array.
generation_info = text_generator_callback.generate(
    "Detection of tissue injury", max_tokens=50, temperature=0.8, epoch_num=49
)


**Generated text after epoch 51:**
Detection of tissue injury and hospital outcome with direct angioplasty for acute myocardial infarction to assess the safety of direct infarct angioplasty without antecedent thrombolytic therapy catheterization laboratory and hospital events were assessed in consecutively treated patients with infarctions involving the left anterior descending n 100 patients right n 100


[{'prompt': 'Detection of tissue injury',
  'word_probs': array([1.13372884e-07, 1.27417209e-07, 1.67942594e-03, ...,
         1.42970023e-07, 1.21950393e-07, 1.11294888e-07], dtype=float32)},
 {'prompt': 'Detection of tissue injury and',
  'word_probs': array([1.0676764e-07, 7.3208810e-08, 1.7283497e-05, ..., 8.9226184e-08,
         6.7745269e-08, 9.6228071e-08], dtype=float32)},
 {'prompt': 'Detection of tissue injury and hospital',
  'word_probs': array([1.3369565e-07, 1.2369412e-07, 2.6937660e-05, ..., 1.7171563e-07,
         1.2391578e-07, 9.4497892e-08], dtype=float32)},
 {'prompt': 'Detection of tissue injury and hospital outcome',
  'word_probs': array([1.1109459e-08, 5.3804188e-09, 4.3137294e-05, ..., 1.0712761e-08,
         1.7557619e-08, 1.2026059e-08], dtype=float32)},
 {'prompt': 'Detection of tissue injury and hospital outcome with',
  'word_probs': array([4.5403304e-08, 3.1174487e-08, 1.7679216e-04, ..., 6.3189411e-08,
         4.4223921e-08, 5.4511677e-08], dtype=float3