In [8]:
from transformers import AutoTokenizer


# Tokenize a sample sentence with the DistilBERT tokenizer. Special tokens are
# switched off, so only the ids of the words themselves are returned.
text = "time flies like an arrow"
model_ckpt = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
inputs = tokenizer(text, add_special_tokens=False, return_tensors="pt")
inputs.input_ids

tensor([[ 2051, 10029,  2066,  2019,  8612]])

In [9]:
from torch import nn
from transformers import AutoConfig

# Size a token-embedding table from the checkpoint's configuration. The table
# is a freshly constructed nn.Embedding; no pretrained weights are loaded here.
config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(
    num_embeddings=config.vocab_size,
    embedding_dim=config.hidden_size,
)
token_emb

Embedding(30522, 768)

In [10]:
# Look up one embedding vector per token id -> [batch_size, seq_len, hidden_size].
inputs_embeds = token_emb(inputs.input_ids)
inputs_embeds.shape

torch.Size([1, 5, 768])

In [13]:
import torch
import torch.nn.functional as F
from math import sqrt

# Self-attention by hand on the raw embeddings: query, key and value are all
# the same tensor, so no learned projection is involved yet.
query, key, value = inputs_embeds, inputs_embeds, inputs_embeds
dim_k = key.size(-1)

# Pairwise dot products between all positions, scaled by sqrt(dim_k).
scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
print(scores.size())

# Softmax over the last axis turns each row of scores into weights summing to 1.
weights = F.softmax(scores, dim=-1)
print(weights.sum(dim=-1))

# Each output vector is the weighted average of the value vectors.
attn_outputs = torch.bmm(weights, value)
print(attn_outputs.shape)



torch.Size([1, 5, 5])
tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)
torch.Size([1, 5, 768])


### Encoder

In [39]:
def scaled_dot_product_attention_without_mask(query, key, value):
    """Compute unmasked scaled dot-product attention for batched 3-D tensors.

    Args:
        query: Tensor of shape [batch_size, seq_len_q, dim_k].
        key: Tensor of shape [batch_size, seq_len_k, dim_k].
        value: Tensor of shape [batch_size, seq_len_k, dim_v].

    Returns:
        Tensor of shape [batch_size, seq_len_q, dim_v]: for every query
        position, the softmax-weighted average of the value vectors.
    """
    scale = sqrt(query.size(-1))
    attn_logits = torch.bmm(query, key.transpose(1, 2)) / scale
    attn_probs = F.softmax(attn_logits, dim=-1)
    return torch.bmm(attn_probs, value)

In [15]:
# A self-attention head applies three independent linear transformations to each
# embedding to generate the query, key, and value vectors. Each projection
# carries its own set of learnable parameters, which allows the head to focus on
# a different semantic aspect of the sequence.
class AttentionHead(nn.Module):
    """Single self-attention head.

    Args:
        embed_dim: Size of the incoming token embeddings.
        head_dim: Size of the projected query/key/value vectors.
    """

    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state):
        """Project ``hidden_state`` into query/key/value and attend.

        Args:
            hidden_state: Tensor of shape [batch_size, seq_len, embed_dim].

        Returns:
            Tensor of shape [batch_size, seq_len, head_dim].
        """
        # Call the unmasked helper defined in the preceding cell. The masked
        # `scaled_dot_product_attention` is only defined further down, in the
        # Decoder section, so referencing it here raises NameError when the
        # notebook is run top to bottom on a fresh kernel. With no mask the two
        # functions compute the same result.
        return scaled_dot_product_attention_without_mask(
            self.q(hidden_state), self.k(hidden_state), self.v(hidden_state)
        )

In [18]:
# It is beneficial to have several sets of linear projections, each one
# representing a so-called attention head.
class MultiHeadAttention(nn.Module):
    """Run several ``AttentionHead`` modules side by side and mix their outputs.

    The per-head width is ``config.hidden_size // config.num_attention_heads``.
    The head outputs are concatenated along the last axis and passed through a
    final linear layer of size ``hidden_size -> hidden_size``.
    """

    def __init__(self, config):
        super().__init__()
        hidden_size = config.hidden_size
        n_heads = config.num_attention_heads
        per_head_dim = hidden_size // n_heads
        head_modules = []
        for _ in range(n_heads):
            head_modules.append(AttentionHead(hidden_size, per_head_dim))
        self.heads = nn.ModuleList(head_modules)
        self.output_linear = nn.Linear(hidden_size, hidden_size)

    def forward(self, hidden_state):
        """Return the mixed multi-head output, same shape as ``hidden_state``."""
        per_head_outputs = [head(hidden_state) for head in self.heads]
        concatenated = torch.cat(per_head_outputs, dim=-1)
        return self.output_linear(concatenated)

In [19]:
# Multi-head attention keeps the input shape: [batch_size, seq_len, hidden_size].
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(inputs_embeds)
attn_output.shape

torch.Size([1, 5, 768])

In [26]:
# Instead of processing the whole sequence of embeddings as a single vector, this
# layer processes each embedding independently. For this reason it is often
# referred to as a position-wise feed-forward layer; you may also see it described
# as a one-dimensional convolution with a kernel size of one. A rule of thumb from
# the literature is for the hidden size of the first layer to be four times the
# size of the embeddings, and a GELU activation function is most commonly used.
# This is where most of the capacity and memorization is hypothesized to happen,
# and it is the part that is most often scaled when scaling up the models.

class FeedForward(nn.Module):
    """Position-wise two-layer feed-forward network with GELU and dropout.

    Args:
        config: Object exposing ``hidden_size`` and, optionally, the
            intermediate width as ``intermediate_size`` or ``hidden_dim``.
    """

    def __init__(self, config):
        super().__init__()
        # 'DistilBertConfig' object has no attribute 'intermediate_size'; it
        # names the same quantity `hidden_dim`. Try both spellings and fall
        # back to the 4x rule of thumb, so the width follows the config instead
        # of being hard-coded (for DistilBERT this is still 3072).
        intermediate_size = (
            getattr(config, "intermediate_size", None)
            or getattr(config, "hidden_dim", None)
            or 4 * config.hidden_size
        )
        self.linear_1 = nn.Linear(config.hidden_size, intermediate_size)
        self.linear_2 = nn.Linear(intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        """Apply linear -> GELU -> linear -> dropout over the last dimension.

        The output has the same shape as ``x``.
        """
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

In [27]:
# nn.Linear is usually applied to a tensor of shape (batch_size, input_dim), where
# it acts on each element of the batch dimension independently. This is actually
# true for any dimension except the last one, so when we pass a tensor of shape
# (batch_size, seq_len, hidden_dim) the layer is applied to all token embeddings
# of the batch and sequence independently.
feed_forward = FeedForward(config)
# Feed the multi-head attention output (`attn_output`), not `attn_outputs`, which
# is the single-head result left over from the earlier manual attention cell.
ff_outputs = feed_forward(attn_output)
ff_outputs.size()


torch.Size([1, 5, 768])

In [29]:
class TransformerEncoderLayer(nn.Module):
    """Encoder block with layer normalization applied before each sub-layer.

    The block runs multi-head self-attention and then the feed-forward network.
    Each sub-layer receives a layer-normalized copy of its input and its output
    is added back onto the un-normalized input (skip connection).
    """

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.layer_norm_1 = nn.LayerNorm(width)
        self.layer_norm_2 = nn.LayerNorm(width)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        """Return the transformed hidden states, same shape as ``x``."""
        # Attention sub-layer: normalize, attend, add the residual.
        attended = self.attention(self.layer_norm_1(x))
        x = x + attended
        # Feed-forward sub-layer: normalize, transform, add the residual.
        transformed = self.feed_forward(self.layer_norm_2(x))
        return x + transformed

In [30]:
# The encoder layer preserves the shape of its input.
encoder_layer = TransformerEncoderLayer(config)
(inputs_embeds.size(), encoder_layer(inputs_embeds).shape)

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

In [31]:
class Embeddings(nn.Module):
    """Token embeddings plus learned position embeddings, then LayerNorm and dropout.

    Args:
        config: Object exposing ``vocab_size``, ``hidden_size`` and
            ``max_position_embeddings``.
    """

    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        # NOTE(review): nn.Dropout() uses PyTorch's default p=0.5, whereas the
        # other layers in this notebook use 0.1 - confirm this is intended.
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        """Embed ``input_ids`` of shape [batch_size, seq_len].

        Returns:
            Tensor of shape [batch_size, seq_len, hidden_size].
        """
        # Create position IDs 0..seq_len-1 on the same device as the input, so
        # the position lookup also works after the model and inputs are moved
        # off the CPU.
        seq_length = input_ids.size(1)
        position_ids = torch.arange(
            seq_length, dtype=torch.long, device=input_ids.device
        ).unsqueeze(0)
        # Create token and position embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine token and position embeddings; the [1, seq_len, hidden_size]
        # position tensor broadcasts over the batch dimension.
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

# Token ids in, dense embeddings out: [batch_size, seq_len, hidden_size].
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).shape

torch.Size([1, 5, 768])

In [32]:
class TransformerEncoder(nn.Module):
    """Embedding layer followed by ``config.num_hidden_layers`` encoder layers."""

    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        encoder_blocks = []
        for _ in range(config.num_hidden_layers):
            encoder_blocks.append(TransformerEncoderLayer(config))
        self.layers = nn.ModuleList(encoder_blocks)

    def forward(self, x):
        """Embed the token ids in ``x`` and pass them through every layer in order."""
        hidden_states = self.embeddings(x)
        for block in self.layers:
            hidden_states = block(hidden_states)
        return hidden_states

# The full encoder returns one hidden state per input token.
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).shape


torch.Size([1, 5, 768])

In [34]:
class TransformerForSequenceClassification(nn.Module):
    """Encoder body with a dropout + linear classification head.

    The head reads only the hidden state at sequence position 0. That position
    holds the [CLS] token when the tokenizer adds special tokens.
    """

    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        # DistilBert doesn't have config.hidden_dropout_prob, so the rate is fixed.
        self.dropout = nn.Dropout(p=0.1)
        self.classifier = nn.Linear(
            in_features=config.hidden_size,
            out_features=config.num_labels,
        )

    def forward(self, x):
        """Return logits of shape [batch_size, num_labels] for token ids ``x``."""
        hidden_states = self.encoder(x)
        first_token_state = hidden_states[:, 0, :]  # hidden state of the [CLS] slot
        return self.classifier(self.dropout(first_token_state))

# Set the number of classes on the shared config (this mutates `config`), then
# check that the classifier yields one logit per class for each sequence.
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).shape

torch.Size([1, 3])

### Decoder

In [35]:
# Lower-triangular mask: row i has ones in columns 0..i, so each position may
# attend only to itself and to earlier positions.
seq_len = inputs.input_ids.shape[-1]
mask = torch.ones(seq_len, seq_len).tril().unsqueeze(0)
mask[0]

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])

In [36]:
# Setting the masked (upper-triangular) scores to negative infinity guarantees
# that their attention weights are all zero once we take the softmax over the
# scores, because exp(-inf) = 0.
scores.masked_fill(mask == 0, float("-inf"))

tensor([[[27.2641,    -inf,    -inf,    -inf,    -inf],
         [ 0.5911, 27.7673,    -inf,    -inf,    -inf],
         [ 0.7643, -0.1611, 27.4965,    -inf,    -inf],
         [ 0.1593, -0.0332,  0.3982, 27.4464,    -inf],
         [ 0.0527, -0.1541, -0.2480, -0.3516, 30.6703]]],
       grad_fn=<MaskedFillBackward0>)

In [38]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """Scaled dot-product attention with an optional mask.

    Works on 3-D inputs ([batch_size, seq_len, dim]) as well as inputs with
    extra leading dimensions (e.g. [batch_size, num_heads, seq_len, dim]).

    Args:
        query: Tensor of shape [..., seq_len_q, dim_k].
        key: Tensor of shape [..., seq_len_k, dim_k].
        value: Tensor of shape [..., seq_len_k, dim_v].
        mask: Optional tensor broadcastable to [..., seq_len_q, seq_len_k].
            Positions where ``mask == 0`` are excluded from attention.

    Returns:
        Tensor of shape [..., seq_len_q, dim_v].

    Note:
        A query row whose positions are all masked out yields NaN, because the
        softmax is taken over a row consisting only of -inf.
    """
    dim_k = query.size(-1)
    # matmul with transpose(-2, -1) always acts on the last two axes, so it is
    # not restricted to exactly three dimensions the way bmm is.
    scores = torch.matmul(query, key.transpose(-2, -1)) / sqrt(dim_k)
    if mask is not None:
        # -inf scores become exactly zero weight after the softmax.
        scores = scores.masked_fill(mask == 0, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return torch.matmul(weights, value)