# Implementing a full transformer encoder

__Objective:__ implement the encoder part of a transformer model from scratch.

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Embedding, Layer, Dense, Dropout, LayerNormalization
from tensorflow.keras.activations import gelu
from transformers import AutoTokenizer, AutoConfig

## Tokenization

An attention layer works with __token embeddings__ as the input, so we need to start by tokenizing the input text and creating the vector embeddings.

In [None]:
# Name of the pretrained checkpoint whose tokenizer (and, further below,
# configuration) is loaded.
model_ckpt = 'distilbert-base-uncased'

# Instantiate the tokenizer associated with the checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

Test.

In [None]:
# Test the tokenizer on a sample sentence.
test_text = """
I know all about the honor of God, Mary Jane.
"""

test_output = tokenizer(
    test_text,
    # Ask for TensorFlow tensors as output.
    return_tensors='tf',
    padding=True,
    # In this case we exclude the start- and end-of-sentence tokens.
    add_special_tokens=False
)

# Display the tokenizer output.
test_output

In [None]:
# Display the token IDs as a NumPy array.
test_output['input_ids'].numpy()

In [None]:
# Map the IDs of the first sequence in the batch back to their tokens.
tokenizer.convert_ids_to_tokens(test_output['input_ids'][0, :])

## Creation of embeddings

Create the word embeddings (vectors) from the tokenized text.

Keras' `Embedding` layer maps non-negative integers (the token IDs of the tokenized text) to dense vectors of fixed size.

__Notes:__
- At this point the embeddings of the tokens know nothing about the context - each token's embedding is always the same, __irrespective of the context__ (i.e. the embedding operation is deterministic). The attention layer is there precisely to modify the embeddings so that they include context-dependent information.
- We skip positional encoding here for simplicity, but positional information should be added to the token embeddings at this point (this is done in the next section)!

In [None]:
# Load the configuration parameters of the pretrained model
# (same checkpoint as the tokenizer).
config = AutoConfig.from_pretrained(model_ckpt)

# Display the configuration.
config

In [None]:
# Initialize the embedding layer: one vector of size `hidden_size`
# for each of the `vocab_size` token IDs.
token_emb = Embedding(
    input_dim=config.vocab_size,  # We could have used tokenizer.vocab_size, it's the same.
    output_dim=config.hidden_size
)

# Display the layer object.
token_emb

In [None]:
# Test the creation of embedding for some tokenized text.
# Output shape: (batch_size, seq_len, hidden_dim).
test_embeddings = token_emb(test_output['input_ids'])

# Display the resulting embeddings.
test_embeddings

### Add positional encoding

We now add positional encoding to the embeddings, so that each embedding also contains information about the position of the corresponding token in the sequence.

In [None]:
class Embeddings(Layer):
    """
    Class implementing the embedding layer, with output
    embeddings incorporating both information from token
    and from position embeddings.
    """
    def __init__(self, config):
        """
        Initializes the inner layers.

        Parameters
        ----------
        config : object
            Configuration object exposing the attributes
            `vocab_size`, `hidden_size`, `max_position_embeddings`
            and `dropout`.
        """
        super().__init__()

        # Initialize all the inner layers.
        # Simple dense embedding of the numerical tokens.
        self.token_embeddings = Embedding(
            input_dim=config.vocab_size,
            output_dim=config.hidden_size
        )

        # For the positional embedding, we use the Keras
        # `Embedding` layer again, this time with an input
        # dimension equal to the maximum positional embedding
        # (the maximum index of a token within a sequence, closely
        # related to the maximum length of a sequence) rather
        # than to the size of the vocabulary.
        self.position_embeddings = Embedding(
            input_dim=config.max_position_embeddings,
            output_dim=config.hidden_size
        )

        self.layer_norm = LayerNormalization(epsilon=1e-12)

        self.dropout = Dropout(rate=config.dropout)

    def call(self, input_ids):
        """
        Forward pass of the embedding layer.
        Input: token IDs, with the sequence along axis 1.
        Output: embeddings.

        Token and position embeddings are generated for the
        input IDs and then added up. The resulting embeddings
        are then normalized with layer normalization and regularized
        with a dropout layer.
        """
        # Get the sequence length. The static shape is preferred when
        # it is known; when it is not (e.g. variable-length sequences
        # inside a traced graph, where the static dimension is None),
        # fall back to the dynamic shape so that `tf.range` still
        # receives a valid limit.
        seq_length = input_ids.shape[1]
        if seq_length is None:
            seq_length = tf.shape(input_ids)[1]

        # Get all the position IDs as the range from 0 to seq_length - 1.
        position_ids = tf.range(
            seq_length,
            dtype=tf.int64
        )

        # Create token embeddings.
        token_embeddings = self.token_embeddings(input_ids)

        # Create position embeddings. A leading axis of size 1 is
        # added so that they can be broadcast over the batch axis.
        position_embeddings = self.position_embeddings(position_ids)[tf.newaxis, ...]

        # Combine the information in token and position embeddings
        # by adding them up. The shape of `position_embeddings` is
        # broadcast to that of `token_embeddings`.
        embeddings = token_embeddings + position_embeddings

        # Normalize the combined embeddings.
        embeddings = self.layer_norm(embeddings)

        # Dropout regularization on the embeddings.
        embeddings = self.dropout(embeddings)

        return embeddings

Test.

In [None]:
# A small batch of two sentences of different lengths.
text = [
    "Six o' clock on the Christmas morning...",
    "...and for what?"
]

# Instantiate the combined (token + position) embedding layer.
embedding_layer = Embeddings(config=config)

# Tokenize the batch and compute the embeddings from the token IDs.
test_embeddings = embedding_layer(
    tokenizer(
        text,
        return_tensors='tf',
        padding=True,
        add_special_tokens=True
    )['input_ids']
)

# Display the resulting embeddings.
test_embeddings

## A basic self-attention mechanism

We reproduce the basic operations for a single-head attention layer, acting on the test embeddings obtained above.

### Creation of query, key and value vectors

For simplicity, we can take the query, key and value vectors associated with each token embedding to be equal to the token embedding itself (and thus also equal to one another). This need not be the case: in general, independent (__trainable__) weight matrices are applied to get the query, key and value vectors from the token embeddings.

In [None]:
# Use the embeddings themselves as query, key and value vectors
# (no projections are applied in this basic example).
query = test_embeddings
key = test_embeddings
value = test_embeddings

# Size of the key vectors (last axis), used below to rescale the scores.
dim_k = key.shape[-1]

dim_k

### Attention scores

Given an input, the attention scores (not the weights yet!) are computed as the dot product of each query vector with each key vector. This measures the similarity (relevance) of each key w.r.t. each query.

In [None]:
# Display the shapes of the query and key tensors.
query.shape, key.shape

In [None]:
# Dot product of every query vector with every key vector.
# The key tensor has its last two axes swapped, while the batch
# axis stays first and is not contracted by the multiplication.
# Resulting shape: (batch_size, seq_len, seq_len).
scores = tf.linalg.matmul(query, tf.transpose(key, perm=[0, 2, 1]))

scores

### Attention weights

Attention weights are obtained from attention scores by:
1. Rescaling the scores by dividing them by $\sqrt{d_k}$, where $d_k$ is the dimension of the key vectors (here equal to the hidden dimension). This keeps the scores from growing too large, which would saturate the softmax and lead to very small gradients during training.
2. Applying the `softmax` function to the last axis.

In [None]:
# Rescale the scores by the square root of the key dimension, then
# normalize along the last axis with a softmax.
weights = tf.nn.softmax(
    scores / tf.math.sqrt(tf.cast(dim_k, dtype=tf.float32)),
    axis=-1
)

weights

Check: the attention weights in each row (i.e. summed along the last axis) should add up to a value close to 1.

In [None]:
# Sum the attention weights along the last axis.
tf.reduce_sum(weights, axis=-1)

### Output of the self-attention layer

The output of the layer is a linear combination of the value vectors with weights given by the attention weights.

In [None]:
# Combine the value vectors using the attention weights.
# Output shape: (batch_size, seq_len, value_size).
test_attention_output = tf.matmul(weights, value)

# Display the output of the attention operation.
test_attention_output

## Multi-headed attention

Implement a multi-head attention layer.

In [None]:
def scaled_dot_product_attention(query, key, value):
    """
    Implements the scaled dot product attention operation.

    Parameters
    ----------
    query, key, value : tensors
        Tensors whose last two axes are (seq_len, dim); any leading
        axes are treated as batch axes by the matrix multiplications.
        `query` and `key` must share the size of their last axis.

    Returns
    -------
    Tensor with the same leading axes as `query` and the last axis
    of `value`: for each query position, the combination of the value
    vectors weighted by the attention weights.
    """
    # Compute the attention scores. `transpose_b=True` swaps the last
    # two axes of `key` whatever the number of leading (batch) axes,
    # so the function is not tied to rank-3 inputs.
    scores = tf.matmul(query, key, transpose_b=True)

    # Dimension of the key vectors. Use the static value when it is
    # known, otherwise fall back to the dynamic shape.
    dim_k = key.shape[-1]
    if dim_k is None:
        dim_k = tf.shape(key)[-1]

    # Cast the scaling factor to the dtype of the scores, so that the
    # division also works when the inputs are not float32.
    scale = tf.sqrt(tf.cast(dim_k, scores.dtype))

    # Compute the attention weights.
    weights = tf.math.softmax(scores / scale, axis=-1)

    # Return a linear combination of the value vectors
    # with weights equal to the attention weights.
    return tf.matmul(weights, value)

Define a single attention head layer.

In [None]:
class AttentionHead(Layer):
    """
    One self-attention head: projects its input onto query, key
    and value vectors and applies scaled dot product attention.
    """
    def __init__(self, embed_dim, head_dim):
        """
        Creates the three dense projections used by the head.

        Parameters
        ----------
        embed_dim : int
            Size of the input embeddings. It is accepted for
            interface purposes but not used by the constructor.
        head_dim : int
            Number of units of each projection, i.e. the size of
            the vectors returned by the head. In multi-head
            attention this is smaller than embed_dim, and the full
            size is recovered by concatenating the head outputs.
        """
        super().__init__()

        # Query, key and value projections, all of size head_dim.
        self.q = Dense(head_dim)
        self.k = Dense(head_dim)
        self.v = Dense(head_dim)

    def call(self, hidden_state):
        """
        Forward pass of the head.

        Input shape: (batch_shape, seq_len, embed_dim)
        Output shape: (batch_shape, seq_len, head_dim)
        """
        # Project the input onto the three roles, in q, k, v order.
        queries = self.q(hidden_state)
        keys = self.k(hidden_state)
        values = self.v(hidden_state)

        return scaled_dot_product_attention(queries, keys, values)

Test.

In [None]:
# Number of heads assumed for this quick test of a single head.
n_heads = 2

att_head = AttentionHead(
    embed_dim=test_embeddings.shape[-1],
    # Integer (floor) division: the number of units of a dense layer
    # must be an integer, whereas `/` would produce a float.
    head_dim=test_embeddings.shape[-1] // n_heads
)

# Output shape: (batch_size, seq_len, head_dim).
att_head(test_embeddings)

Define a multi-head attention layer.

In [None]:
class MultiHeadAttention(Layer):
    """
    Multi-head self-attention: several independent attention heads
    followed by a dense layer acting on their concatenated outputs.
    """
    def __init__(self, config):
        """
        Builds the attention heads and the output dense layer.

        The configuration must expose `hidden_size` and
        `num_attention_heads`; each head gets a size equal to the
        floor division of the former by the latter.
        """
        super().__init__()

        hidden_size = config.hidden_size
        n_heads = config.num_attention_heads
        per_head_size = hidden_size // n_heads

        # Build the heads one by one, then store the complete list.
        heads = []
        for _ in range(n_heads):
            heads.append(
                AttentionHead(embed_dim=hidden_size, head_dim=per_head_size)
            )
        self.heads = heads

        # Dense layer applied to the concatenated head outputs.
        self.output_linear = Dense(hidden_size)

    def call(self, hidden_state):
        """
        Forward pass: every head processes the same input, the head
        outputs are joined along the last axis and the result goes
        through the output dense layer.

        Input shape: (batch_shape, seq_len, hidden_dim)
        Output shape: (batch_shape, seq_len, hidden_dim)
        """
        # Collect the output of each head, in order.
        head_outputs = []
        for head in self.heads:
            head_outputs.append(head(hidden_state))

        # Join the head outputs along the feature axis.
        merged = tf.concat(head_outputs, axis=-1)

        return self.output_linear(merged)

Test.

In [None]:
# Instantiate the multi-head attention layer from the model configuration.
mah_layer = MultiHeadAttention(config=config)

# Apply it to the test embeddings.
mah_layer(test_embeddings)

## Final feed-forward (FFN) layer

The FFN layer is a fully-connected feed-forward layer put after the MHA layer, with the architecture of a __position-wise feed-forward layer__, i.e. processing each token embedding outputted by the MHA layer __independently from the others__.

In [None]:
class FeedForward(Layer):
    """
    Position-wise feed-forward block placed after the MHA layers
    (both in the encoder and in the decoder part of a transformer).
    """
    def __init__(self, config):
        """
        Builds the two dense layers and the dropout layer.

        The first dense layer has `config.hidden_dim` units and a
        GELU activation, the second has `config.hidden_size` units
        and no activation. Rule of thumb for the size of the first
        layer: 4 * [hidden_size].
        """
        super().__init__()

        self.linear_1 = Dense(config.hidden_dim, activation='gelu')
        self.linear_2 = Dense(config.hidden_size)
        self.dropout = Dropout(config.dropout)

    def call(self, x):
        """
        Forward pass: dense layer with GELU activation, dense layer
        without activation, then dropout.

        Input shape: (batch_size, seq_len, hidden_size)
        Output shape: (batch_size, seq_len, hidden_size)

        Note: Dense layers act on the last (right-most) axis of
              their input and leave the other axes untouched, so
              each embedding is processed independently from the
              others.
        """
        expanded = self.linear_1(x)
        projected = self.linear_2(expanded)

        return self.dropout(projected)

Test.

In [None]:
# Instantiate the feed-forward layer from the model configuration.
feed_forward = FeedForward(config=config)

# Apply it to the output of the multi-head attention layer.
feed_forward(
    mah_layer(test_embeddings)
)

## Layer normalization and skip connection: building the full encoder

The full encoder will have both an MHA and an FFN layer, but on top of these will also include __layer normalization__ and __skip connections__.

Layer normalization can be applied __pre-layer__ or __post-layer__, depending on where the layer normalization operation is placed w.r.t. the skip connections. We'll implement __pre-layer normalization__, which tends to be more numerically stable during training.

__Note:__ the input and output shapes of the encoder are __the same__ - the operations performed are not about altering the shape, but rather about adding contextual information without changing the shape itself.

In [None]:
class TransformerEncoderLayer(Layer):
    """
    A transformer encoder layer: multi-head attention followed by a
    feed-forward block, each preceded by layer normalization and
    wrapped in a skip connection.
    """
    def __init__(self, config):
        """
        Builds the two layer normalizations, the MHA layer and the
        FFN layer from the given configuration.
        """
        super().__init__()

        self.layer_norm_1 = LayerNormalization()
        self.layer_norm_2 = LayerNormalization()
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def call(self, x):
        """
        Forward pass with pre-layer normalization: each sub-layer
        acts on the normalized input and its output is added back
        to the un-normalized input (skip connection).
        """
        # Attention sub-layer: normalize, attend, add the residual.
        attended = self.attention(self.layer_norm_1(x))
        x = x + attended

        # Feed-forward sub-layer: same scheme, applied to the
        # result of the attention sub-layer.
        transformed = self.feed_forward(self.layer_norm_2(x))

        return x + transformed

Test.

In [None]:
# Instantiate the encoder layer from the model configuration.
encoder_layer = TransformerEncoderLayer(config=config)

# Apply it to the test embeddings.
encoder_layer(test_embeddings)