<a href="https://colab.research.google.com/github/Nuri-Tas/Transformers/blob/main/Basics/Encoder_from_Scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

We will implement an encoder layer using PyTorch and Hugging Face Transformers.

In [29]:
!pip install transformers



We will test the encoder layer on a simple text given below.

In [8]:
# import tokenizer and config for distilbert-base-uncased
from transformers import DistilBertTokenizer, AutoConfig

text = "Building an encoder layer from scratch"
model_ckpt = "distilbert-base-uncased"
tokenizer = DistilBertTokenizer.from_pretrained(model_ckpt)
# We will ignore the [CLS] and [SEP] tokens by setting add_special_tokens to False
encoded_text = tokenizer(text, return_tensors="pt", add_special_tokens=False)
encoded_text

{'input_ids': tensor([[ 2311,  2019,  4372, 16044,  2099,  6741,  2013, 11969]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1]])}

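Next, we look up an embedding for each input id. The number of embeddings and the embedding dimension are taken from the vocabulary size and hidden size of the `bert-base-uncased` config, respectively. The DistilBERT tokenizer used above shares the same vocabulary, so its ids are valid indices into this embedding table.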

In [20]:
from torch import nn

config_model_ckpt = "bert-base-uncased"
config = AutoConfig.from_pretrained(config_model_ckpt)
embedding = nn.Embedding(config.vocab_size, config.hidden_size)
input_ids = encoded_text.input_ids
input_embeddings = embedding(input_ids)
print(f"The shape of the embeddings is {input_embeddings.size()} ")
input_embeddings

The shape of the embeddings is torch.Size([1, 8, 768]) 


tensor([[[ 0.4857,  2.3533,  1.7693,  ..., -0.3481, -2.2066, -0.2394],
         [ 0.5659,  1.3918,  0.0129,  ..., -0.8636, -0.4001,  0.8655],
         [ 0.3604, -0.4166, -1.6015,  ...,  1.0989,  0.4959, -0.0708],
         ...,
         [-0.7831, -0.1895,  0.8419,  ..., -0.1426,  1.5697, -0.0375],
         [ 1.0065, -2.8559, -0.2870,  ...,  0.4628,  1.0541, -0.5038],
         [-0.8176, -0.7503, -0.7139,  ...,  2.5549, -0.0542,  0.0290]]],
       grad_fn=<EmbeddingBackward0>)
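
As a quick sanity check, `nn.Embedding` behaves like a learnable lookup table: indexing its weight matrix with the input ids gives the same result as calling the layer. The sketch below uses a made-up toy vocabulary (10 tokens, 4 dimensions) rather than the BERT sizes above, and the `toy_*` names are purely illustrative.

```python
import torch
from torch import nn

torch.manual_seed(0)

# Hypothetical toy sizes, chosen only to keep the example small
toy_vocab_size, toy_hidden_size = 10, 4
toy_embedding = nn.Embedding(toy_vocab_size, toy_hidden_size)

toy_ids = torch.tensor([[1, 5, 5, 9]])   # [batch_size, seq_len]
looked_up = toy_embedding(toy_ids)       # [batch_size, seq_len, hidden_size]

# Calling the layer is equivalent to indexing rows of its weight matrix
same_as_indexing = torch.equal(looked_up, toy_embedding.weight[toy_ids])
print(looked_up.shape, same_as_indexing)
```

Repeated ids (the two 5s here) map to identical vectors, which is why position information has to be added separately later on.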

We define the scaled dot-product attention function, which first computes a probability distribution over the keys from the query-key similarity scores and then returns the values weighted by that distribution.

In [21]:
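from math import sqrt
import torch
import torch.nn.functional as F

def scaled_dot_product_attention(query, key, value):
  # scale by the square root of the key dimension to keep the softmax inputs in a stable range
  dim_key = key.size(-1)
  scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_key)
  # softmax over the key axis turns each row of scores into attention weights
  attention_weights = F.softmax(scores, dim=-1)
  # weighted sum of the values
  attention_outputs = torch.bmm(attention_weights, value)
  return attention_outputs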
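
To make the two steps concrete, here is a small sketch on random tensors (the shapes are arbitrary assumptions) that computes the attention weights explicitly and checks that each row forms a probability distribution over the keys.

```python
from math import sqrt

import torch
import torch.nn.functional as F

torch.manual_seed(0)

# Arbitrary toy shapes: batch of 1, sequence of 3 tokens, 4 dimensions per token
query = torch.randn(1, 3, 4)
key = torch.randn(1, 3, 4)
value = torch.randn(1, 3, 4)

# Step 1: similarity between every query and every key, scaled by sqrt(d_k)
scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(key.size(-1))
weights = F.softmax(scores, dim=-1)   # [1, 3, 3], each row sums to 1

# Step 2: each output row is a weighted average of the value rows
outputs = torch.bmm(weights, value)   # [1, 3, 4]

print(weights.sum(dim=-1))
print(outputs.shape)
```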

We first define a single attention head, which applies scaled dot-product attention to linear projections of the hidden states. `MultiHeadAttention` then combines several such heads (12 in our case), concatenates their outputs, and passes the result through a final linear layer.

In [22]:
import torch.nn.functional as F

class AttentionHead(nn.Module):
  def __init__(self, embed_dim, hidden_size):
    super().__init__()
    self.q = nn.Linear(embed_dim, hidden_size)
    self.k = nn.Linear(embed_dim, hidden_size)
    self.v = nn.Linear(embed_dim, hidden_size)

  def forward(self, hidden_state):
    attention_outputs = scaled_dot_product_attention(
        self.q(hidden_state), self.k(hidden_state), self.v(hidden_state))
    return attention_outputs

class MultiHeadAttention(nn.Module):
  def __init__(self, config):
    super().__init__()
    embed_dim = config.hidden_size
    head_numbers = config.num_attention_heads
    head_dim = embed_dim // head_numbers
    self.heads = nn.ModuleList([AttentionHead(embed_dim, head_dim) for _ in range(head_numbers)])
    self.output_layer = nn.Linear(embed_dim, embed_dim)

  def forward(self, hidden_state):
    x = torch.cat([head(hidden_state) for head in self.heads], dim=-1)
    x = self.output_layer(x)
    return x

We can confirm that `MultiHeadAttention` returns an output of shape `[batch_size, seq_len, embedding_dim]`.

In [23]:
multi_head_attention = MultiHeadAttention(config)
attention_outputs = multi_head_attention(input_embeddings)
attention_outputs.size()

torch.Size([1, 8, 768])
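
The shape works out because each head projects to `hidden_size // num_attention_heads` dimensions and the heads are concatenated along the last axis. A minimal sketch of that bookkeeping, using the 768 hidden dimensions and 12 heads from this notebook and random tensors as stand-ins for the real head outputs:

```python
import torch

hidden_size, num_heads = 768, 12       # sizes used in this notebook
head_dim = hidden_size // num_heads    # dimensions handled by each head

# Stand-ins for the per-head attention outputs: [batch_size, seq_len, head_dim]
head_outputs = [torch.randn(1, 8, head_dim) for _ in range(num_heads)]

# Concatenating on the last axis restores the full hidden size
concatenated = torch.cat(head_outputs, dim=-1)
print(head_dim, concatenated.shape)
```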

The outputs of the multi-head attention are fed to a feed-forward network, which we define below. It applies a `GELU` activation after the first linear layer and a `Dropout` at the end.

In [24]:
class FeedForward(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.linear1 = nn.Linear(config.hidden_size, config.intermediate_size)
    self.linear2 = nn.Linear(config.intermediate_size, config.hidden_size)
    self.activation = nn.GELU()
    self.dropout = nn.Dropout(config.hidden_dropout_prob)

  def forward(self, x):
    x = self.linear1(x)
    x = self.activation(x)
    x = self.linear2(x)
    x = self.dropout(x)
    return x

Similarly, we can confirm that the feed-forward outputs have the expected shape.

In [26]:
ff = FeedForward(config)
ff_outputs = ff(attention_outputs)
ff_outputs.size()

torch.Size([1, 8, 768])
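
Keep in mind that `nn.Dropout` is only active in training mode, so the values returned by `FeedForward` are stochastic unless the module is switched to evaluation mode. A small sketch of this behaviour, with a dropout probability of 0.5 picked purely for illustration (not the config value):

```python
import torch
from torch import nn

torch.manual_seed(0)

dropout = nn.Dropout(p=0.5)   # illustrative probability, not the config value
x = torch.ones(1, 8)

dropout.train()
train_out = dropout(x)        # some entries zeroed, the rest scaled by 1 / (1 - p)

dropout.eval()
eval_out = dropout(x)         # identity in evaluation mode

print(train_out)
print(eval_out)
```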

We now add layer normalization and skip connections around both the multi-head attention and the feed-forward layers.

In [27]:
class EncoderLayers(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.attention = MultiHeadAttention(config)
    self.ff = FeedForward(config)
    self.layer_norm1 = nn.LayerNorm(config.hidden_size)
    self.layer_norm2 = nn.LayerNorm(config.hidden_size)

  def forward(self, x):
    # normalize the input before attention (pre-layer normalization)
    hidden_state = self.layer_norm1(x)
    # attention with a skip connection
    x = x + self.attention(hidden_state)
    # normalize again, then apply the feed-forward layers with a skip connection
    x = x + self.ff(self.layer_norm2(x))
    return x
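
For intuition about what the normalization step does, `nn.LayerNorm` rescales each token vector to roughly zero mean and unit variance across the hidden dimension. A sketch on random data, with a toy hidden size of 16 assumed for illustration:

```python
import torch
from torch import nn

torch.manual_seed(0)

hidden_size = 16                                 # toy size for illustration
layer_norm = nn.LayerNorm(hidden_size)

# Inputs deliberately far from zero mean and unit variance
x = 5.0 + 3.0 * torch.randn(1, 8, hidden_size)
normalized = layer_norm(x)

# Statistics are computed per token, over the last (hidden) dimension
token_means = normalized.mean(dim=-1)
token_vars = normalized.var(dim=-1, unbiased=False)
print(token_means)
print(token_vars)
```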

So far we have ignored the positions of the tokens. We will now add positional embeddings to the token embeddings so that the order of the tokens is taken into account as well.

In [28]:
class Embeddings(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
    # note the 'vocab size' for positional embeddings is the maximum length a BERT model can be used with (512)
    self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
    # we will apply normalization and dropout to merged embeddings as well
    self.layer_norm = nn.LayerNorm(config.hidden_size)
    self.dropout = nn.Dropout(config.hidden_dropout_prob)

  def forward(self, input_ids):
    # create the position ids on the same device as the input ids
    position_arange = torch.arange(input_ids.size(1), dtype=torch.long, device=input_ids.device).unsqueeze(0)
    position_embeds = self.position_embeddings(position_arange)
    token_embeds = self.token_embeddings(input_ids)
    token_and_positions = token_embeds + position_embeds
    normalized = self.layer_norm(token_and_positions)
    dropped = self.dropout(normalized)
    return dropped
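
To see why position information is needed, note that self-attention on its own is permutation-equivariant: shuffling the input tokens simply shuffles the outputs in the same way, so word order carries no signal. A minimal sketch, using the raw inputs as queries, keys, and values (a simplification of the heads above with no learned projections; `simple_self_attention` is a hypothetical helper):

```python
from math import sqrt

import torch
import torch.nn.functional as F

torch.manual_seed(0)

def simple_self_attention(x):
    # Simplified self-attention: x serves as query, key, and value
    scores = torch.bmm(x, x.transpose(1, 2)) / sqrt(x.size(-1))
    return torch.bmm(F.softmax(scores, dim=-1), x)

x = torch.randn(1, 5, 4)               # [batch_size, seq_len, hidden_size]
perm = torch.tensor([4, 2, 0, 3, 1])   # an arbitrary reordering of the 5 tokens

out_then_shuffle = simple_self_attention(x)[:, perm]
shuffle_then_out = simple_self_attention(x[:, perm])

# If the two agree, attention alone cannot tell token positions apart
print(torch.allclose(out_then_shuffle, shuffle_then_out, atol=1e-5))
```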

The task-independent part of our Encoder layers is now ready

In [30]:
class Encoder(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.embedding_layer = Embeddings(config)
    self.encoder_layers = nn.ModuleList([EncoderLayers(config) for _ in range(config.num_hidden_layers)])

  def forward(self, x):
    x = self.embedding_layer(x)
    for layer in self.encoder_layers:
      x = layer(x)
    return x

In [31]:
# We can initialize the encoder with the config and feed it input ids to get the encoder outputs
encoder = Encoder(config)
encoder_outputs = encoder(input_ids)
encoder_outputs.size()

torch.Size([1, 8, 768])

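Adding a task-specific layer, such as a classification head, is straightforward. By convention, classification tasks in NLP use only the hidden state of the first token in the sequence. This is normally the `[CLS]` token; here it is simply the first word piece, because we tokenized without special tokens.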

In [32]:
class TransformerForSequenceClassification(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.encoder = Encoder(config)
    self.classifier = nn.Linear(config.hidden_size, config.num_labels)
    self.dropout = nn.Dropout(config.hidden_dropout_prob)

  def forward(self, x):
    # keep only the hidden state of the first token in the sequence
    x = self.encoder(x)[:, 0, :]
    x = self.classifier(x)
    x = self.dropout(x)
    return x

In [34]:
# We can adjust the number of labels in the config file to our specific problem
config.num_labels = 3
model = TransformerForSequenceClassification(config)
# We will get three logits, one for each class label
model(input_ids)

tensor([[ 1.1153, -0.6365, -0.3180]], grad_fn=<MulBackward0>)
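
The logits can be turned into class probabilities with a softmax. The sketch below reuses the logits printed above as a literal tensor; since the model is randomly initialized and untrained, the resulting prediction is meaningless and only illustrates the mechanics.

```python
import torch
import torch.nn.functional as F

# Logits copied from the output above (untrained model, so the values are arbitrary)
logits = torch.tensor([[1.1153, -0.6365, -0.3180]])

probs = F.softmax(logits, dim=-1)        # probabilities over the 3 labels
predicted_label = probs.argmax(dim=-1)   # index of the most likely label

print(probs)
print(predicted_label)
```

At inference time, calling `model.eval()` first disables dropout, so repeated calls on the same input return the same logits.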