<a href="https://colab.research.google.com/github/ariahosseini/DeepML/blob/main/010_TensorFlow_Proj_Ten_Transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Libraries:
import numpy as np
import tensorflow as tf

In [None]:
# Positional embedding:
class TokenAndPositionEmbedding(tf.keras.layers.Layer):
    """Add a learned position embedding to a learned token embedding.

    Args:
        maxlen: Number of rows in the position-embedding table. Positions
            are generated as ``0 .. seq_len - 1`` at call time, so inputs
            are expected to be at most ``maxlen`` tokens long.
        vocab_size: Number of rows in the token-embedding table.
        embed_dim: Size of every embedding vector.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        # Run the base-class initialisation first so Keras can track the
        # sub-layers assigned below.
        super().__init__(**kwargs)

        # Keep the constructor arguments so get_config() can report them.
        self.maxlen = maxlen
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

        # One lookup table indexed by token id, one indexed by position.
        self.token_emb = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = tf.keras.layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed token ids ``x`` and add the per-position embedding.

        The last axis of ``x`` is treated as the sequence axis. With the
        hyperparameters used in this notebook the result would be
        (batch, 200, 128) -- assumes ``x`` is (batch, seq_len) token ids.
        """
        seq_len = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        position_vectors = self.pos_emb(positions)  # (seq_len, embed_dim)
        token_vectors = self.token_emb(x)           # x.shape + (embed_dim,)
        # The position vectors broadcast over the leading (batch) axes.
        return token_vectors + position_vectors

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "maxlen": self.maxlen,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config

In [None]:
# Define the transformer layer:
class MultiHeadSelfAttention(tf.keras.layers.Layer):
    """Scaled dot-product self-attention split across several heads.

    Shape examples in the comments use the values from this notebook:
    batch 32, sequence length 200, ``embed_dim`` 128, ``num_heads`` 2 and
    therefore ``projection_dim`` 64.

    Args:
        embed_dim: Width of the query/key/value projections and of the
            output. Must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            forwarded unchanged to ``tf.keras.layers.Layer``.

    Raises:
        ValueError: If ``embed_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, num_heads=8, **kwargs):
        # Run the base-class initialisation first so Keras can track the
        # sub-layers assigned below.
        super().__init__(**kwargs)
        self.embed_dim = embed_dim  # e.g. 128
        self.num_heads = num_heads  # e.g. 2
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )

        # Width of the slice of the embedding that each head works on.
        self.projection_dim = embed_dim // num_heads  # e.g. 64

        # Dense layers producing the query, key and value projections.
        self.query_dense = tf.keras.layers.Dense(embed_dim)
        self.key_dense = tf.keras.layers.Dense(embed_dim)
        self.value_dense = tf.keras.layers.Dense(embed_dim)

        # Dense layer that mixes the concatenated head outputs.
        self.combine_heads = tf.keras.layers.Dense(embed_dim)

    def attention(self, query, key, value):
        """Return ``(output, weights)`` of scaled dot-product attention.

        As called from ``call`` the three inputs are per-head tensors of
        shape (batch, num_heads, seq_len, projection_dim).
        """
        # (32, 2, 200, 64) x (32, 2, 64, 200) -> (32, 2, 200, 200)
        score = tf.matmul(query, key, transpose_b=True)
        # Last axis of the key is the per-head width (64 here, not embed_dim).
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)

        scaled_score = score / tf.math.sqrt(dim_key)    # (32, 2, 200, 200)
        weights = tf.nn.softmax(scaled_score, axis=-1)  # (32, 2, 200, 200)
        # (32, 2, 200, 200) x (32, 2, 200, 64) -> (32, 2, 200, 64)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        """Split the last axis of ``x`` into heads and move heads forward.

        (batch, seq_len, embed_dim) -> (batch, num_heads, seq_len, projection_dim)
        """
        # -1 lets TensorFlow infer the sequence length from the element count.
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))  # (32, 200, 2, 64)
        return tf.transpose(x, perm=[0, 2, 1, 3])  # (32, 2, 200, 64)

    def call(self, inputs):
        """Apply multi-head self-attention to ``inputs``.

        Assumes ``inputs`` is (batch, seq_len, embed_dim); the output has
        the same shape.
        """
        batch_size = tf.shape(inputs)[0]                # 32
        query = self.query_dense(inputs)                # (32, 200, 128)
        key = self.key_dense(inputs)                    # (32, 200, 128)
        value = self.value_dense(inputs)                # (32, 200, 128)
        query = self.separate_heads(query, batch_size)  # (32, 2, 200, 64)
        key = self.separate_heads(key, batch_size)      # (32, 2, 200, 64)
        value = self.separate_heads(value, batch_size)  # (32, 2, 200, 64)

        # The attention weights are not needed here, only the weighted values.
        attention, _ = self.attention(query, key, value)        # (32, 2, 200, 64)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])  # (32, 200, 2, 64)

        # Concatenate the heads back into one vector of width embed_dim.
        concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))  # (32, 200, 128)
        output = self.combine_heads(concat_attention)
        return output

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"embed_dim": self.embed_dim, "num_heads": self.num_heads})
        return config

In [None]:
# Define the Transformer block:
class TransformerBlock(tf.keras.layers.Layer):
    """One encoder block: self-attention and a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual
    connection and layer normalization.

    Args:
        embed_dim: Width of the block's input and output.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the position-wise feed-forward network.
        rate: Dropout rate applied after each sub-layer.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Keep the constructor arguments so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = MultiHeadSelfAttention(embed_dim, num_heads)

        # Position-wise feed-forward network: ff_dim ReLU units, then a
        # projection back to embed_dim so the residual addition lines up.
        self.ffn = tf.keras.Sequential(
            [tf.keras.layers.Dense(ff_dim, activation="relu"), tf.keras.layers.Dense(embed_dim),]
        )

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Tensor whose last axis has width ``embed_dim``.
            training: Forwarded to the dropout layers. Defaults to ``None``
                so the block can also be called without the argument.

        Returns:
            A tensor with the same shape as ``inputs``.
        """
        # Attention sub-layer with dropout, residual connection and norm.
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        # Feed-forward sub-layer with dropout, residual connection and norm.
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "num_heads": self.num_heads,
                "ff_dim": self.ff_dim,
                "rate": self.rate,
            }
        )
        return config

In [None]:
# Define the model
def create_transformer_model(maxlen, vocab_size, embed_dim, num_heads, ff_dim, num_blocks, num_classes, dropout_rate=0.1, hidden_units=30):
    """Build a Transformer-encoder sequence classifier.

    Args:
        maxlen: Length of every input sequence.
        vocab_size: Size of the token vocabulary.
        embed_dim: Embedding width used throughout the encoder.
        num_heads: Number of attention heads per block.
        ff_dim: Hidden width of each block's feed-forward network.
        num_blocks: Number of stacked ``TransformerBlock`` layers.
        num_classes: Number of units in the softmax output layer.
        dropout_rate: Dropout rate used in the blocks and the classifier head.
        hidden_units: Width of the dense layer before the output layer.
            Defaults to 30, the value that was previously hard-coded.

    Returns:
        An uncompiled ``tf.keras.Model`` mapping (batch, maxlen) inputs to
        (batch, num_classes) class probabilities.
    """
    inputs = tf.keras.layers.Input(shape=(maxlen,))
    x = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(inputs)

    # Stack the requested number of independent encoder blocks.
    for _ in range(num_blocks):
        x = TransformerBlock(embed_dim, num_heads, ff_dim, dropout_rate)(x)

    # Average over the sequence axis to get one vector per example.
    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    x = tf.keras.layers.Dropout(dropout_rate)(x)

    # Classifier head: one hidden ReLU layer, then a softmax over the classes.
    x = tf.keras.layers.Dense(hidden_units, activation="relu")(x)
    x = tf.keras.layers.Dropout(dropout_rate)(x)
    outputs = tf.keras.layers.Dense(num_classes, activation="softmax")(x)

    return tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
# Download and prep the data
# vocab_size and maxlen are defined here so this cell runs when the notebook
# is executed top to bottom. Keep both values identical to the
# hyperparameters used to build the model.
vocab_size = 20000  # Keep only the 20k most frequent words.
maxlen = 200  # Every review is padded/truncated to this many tokens.

(x_train, y_train), (x_val, y_val) = tf.keras.datasets.imdb.load_data(num_words=vocab_size)
print(len(x_train), "Training sequences")
print(len(x_val), "Validation sequences")

# pad_sequences turns the variable-length reviews into arrays of width maxlen.
x_train = tf.keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)
x_val = tf.keras.preprocessing.sequence.pad_sequences(x_val, maxlen=maxlen)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb.npz
25000 Training sequences
25000 Validation sequences


In [None]:
# Train the model
# --- Hyperparameters ---
vocab_size = 20000   # Size of the vocabulary (top 20k words).
maxlen = 200         # Length of every padded/truncated input sequence.
embed_dim = 128      # Dimension of the embeddings.
num_heads = 2        # Attention heads per Transformer block.
ff_dim = 30          # Dimension of the feed-forward network's hidden layer.
num_blocks = 2       # Number of stacked Transformer blocks.
num_classes = 2      # Number of output classes.
dropout_rate = 0.1   # Dropout rate.

# --- Build ---
model = create_transformer_model(
    maxlen=maxlen,
    vocab_size=vocab_size,
    embed_dim=embed_dim,
    num_heads=num_heads,
    ff_dim=ff_dim,
    num_blocks=num_blocks,
    num_classes=num_classes,
    dropout_rate=dropout_rate,
)

# --- Compile ---
# Integer labels plus a softmax output, hence the sparse categorical loss.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=["accuracy"],
)

# --- Fit ---
history = model.fit(
    x_train,
    y_train,
    epochs=1,
    validation_data=(x_val, y_val),
)

