In [30]:
from transformers import AutoTokenizer, AutoConfig

# Checkpoint name handed to both `from_pretrained` calls below.
model_cpk = 'bert-base-uncased'

# Load the tokenizer first, then the configuration, from the same checkpoint name.
tokenizer, config = (
    loader.from_pretrained(model_cpk) for loader in (AutoTokenizer, AutoConfig)
)

print(f'tokenizer: {tokenizer}')
print(f"configuration: {config}")

tokenizer: BertTokenizerFast(name_or_path='bert-base-uncased', vocab_size=30522, model_max_length=512, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=True),  added_tokens_decoder={
	0: AddedToken("[PAD]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	100: AddedToken("[UNK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	101: AddedToken("[CLS]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	102: AddedToken("[SEP]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	103: AddedToken("[MASK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
}
configuration: BertConfig {
  "_name_or_path": "bert-base-uncased",
  "architectures": [
    "BertForMaskedLM"
  ],


In [31]:
from torch import nn

# Sample sentence reused by the smoke-test cells further down.
text = 'time flies like an arrow'

# Tokenize without special tokens and ask for PyTorch tensors.
inputs = tokenizer(text, add_special_tokens=False, return_tensors='pt')
print(f"inputs IDs: {inputs.input_ids}")

# Freshly initialised embedding table sized from the loaded configuration.
embedded_layer = nn.Embedding(
    num_embeddings=config.vocab_size,
    embedding_dim=config.hidden_size,
)
inputs_emb = embedded_layer(inputs.input_ids)
print(f'inputs embedding size: {inputs_emb.size()}')

inputs IDs: tensor([[ 2051, 10029,  2066,  2019,  8612]])
inputs embedding size: torch.Size([1, 5, 768])


### This function calculates attention values using the scaled dot-product method

In [32]:
import torch.nn.functional as F
import torch
from torch import nn
from math import sqrt

def scaled_dot_preduct_attention(query, key, values) -> torch.Tensor:
    """Compute scaled dot-product attention.

    The scores ``query @ key^T`` are divided by ``sqrt(d_k)``, where ``d_k`` is
    the size of the last dimension of ``query``, turned into weights with a
    softmax over the key axis, and used to average ``values``.

    The last two dimensions are treated as ``(sequence, features)``; any number
    of leading batch dimensions is accepted (the previous ``torch.bmm`` based
    version only worked for exactly three dimensions).

    Args:
        query: Tensor of shape ``(..., q_len, d_k)``.
        key: Tensor of shape ``(..., k_len, d_k)``.
        values: Tensor of shape ``(..., k_len, d_v)``.

    Returns:
        Tensor of shape ``(..., q_len, d_v)``.
    """
    size_k = query.size(-1)
    # Transpose only the two trailing axes so extra batch/head axes are preserved.
    score = torch.matmul(query, key.transpose(-2, -1)) / sqrt(size_k)
    weights = F.softmax(score, dim=-1)

    return torch.matmul(weights, values)

### Attention head - one of the basic components of the attention mechanism

In [33]:
class AttentionHead(nn.Module):
    """Single attention head: three linear projections followed by attention.

    Args:
        embedded_dim: Size of the last dimension of the incoming hidden state.
        head_num: Output size of each of the query/key/value projections.
            Despite the name, this value is used as the width of the head,
            not as a count of heads.
    """

    def __init__(self, embedded_dim, head_num) -> None:
        super().__init__()
        # Query, key and value each get their own projection of the same input.
        self.q = nn.Linear(in_features=embedded_dim, out_features=head_num)
        self.k = nn.Linear(in_features=embedded_dim, out_features=head_num)
        self.v = nn.Linear(in_features=embedded_dim, out_features=head_num)

    def forward(self, hidden_state: torch.Tensor) -> torch.Tensor:
        """Project ``hidden_state`` to query/key/value and attend over it."""
        query = self.q(hidden_state)
        key = self.k(hidden_state)
        value = self.v(hidden_state)
        return scaled_dot_preduct_attention(query, key, value)

### Multi-head attention - several AttentionHead modules combined into one. A typical BERT model has 12 heads


In [34]:
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention built from independent ``AttentionHead`` modules.

    ``config.num_attention_heads`` heads are created, each projecting to
    ``config.hidden_size // config.num_attention_heads`` features. Their outputs
    are concatenated on the last dimension and passed through a final linear
    layer of size ``hidden_size -> hidden_size``.

    Args:
        config: Object exposing ``hidden_size`` and ``num_attention_heads``.
    """

    def __init__(self, config) -> None:
        super().__init__()
        embedding_dim = config.hidden_size
        num_head = config.num_attention_heads
        head_dim = embedding_dim // num_head

        # One head per attention head, each of width head_dim. The previous code
        # had the two values swapped (head_dim heads of width num_head), which
        # still concatenated to embedding_dim and therefore went unnoticed.
        self.heads = nn.ModuleList(
            [AttentionHead(embedding_dim, head_dim) for _ in range(num_head)]
        )
        self.output_liner = nn.Linear(embedding_dim, embedding_dim)

    def forward(self, x):
        """Run every head on ``x``, concatenate the results and mix them linearly."""
        x = torch.cat([h(x) for h in self.heads], dim=-1)
        x = self.output_liner(x)

        return x

In [35]:
# Smoke test: run multi-head attention on the sample embeddings and show the output shape.
MultiHead = MultiHeadAttention(config)
MultiHead(inputs_emb).shape

torch.Size([1, 5, 768])

### Feed-forward neural network

In [36]:
class FeedForward(nn.Module):
    """Two-layer feed-forward block: Linear -> GELU -> Linear -> Dropout.

    Args:
        config: Object exposing ``hidden_size``, ``intermediate_size`` and a
            dropout probability. ``hidden_dropout_prob`` is used when present;
            ``attention_probs_dropout_prob`` is only a fallback for configs
            that do not define it.
    """

    def __init__(self, config) -> None:
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size,
                                  config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size,
                                  config.hidden_size)
        self.gelu = nn.GELU()
        # This dropout acts on a fully connected output, not on attention
        # probabilities, so the hidden-layer dropout setting is the right one.
        dropout_prob = getattr(config, "hidden_dropout_prob", None)
        if dropout_prob is None:
            dropout_prob = config.attention_probs_dropout_prob
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Expand to ``intermediate_size``, apply GELU, project back, then dropout."""
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)

        return x

In [37]:
# Smoke test: run the feed-forward block on the sample embeddings and show the output shape.
feed_forward = FeedForward(config)
feed_forward(inputs_emb).shape

torch.Size([1, 5, 768])

### Transformer encoder layer with layer normalization and residual (skip) connections

In [38]:
class TransformerEncoderLayer(nn.Module):
    """Encoder layer using pre-layer normalization.

    Each sub-layer (self-attention, then feed-forward) receives a normalized
    copy of the hidden state, and its output is added back to the
    un-normalized hidden state, so the skip connection bypasses the
    normalization. This mirrors the arrangement used by ``DecoderLayer``.

    Args:
        config: Configuration forwarded to ``MultiHeadAttention`` and
            ``FeedForward``; ``hidden_size`` sizes both layer norms.
    """

    def __init__(self, config):
        super().__init__()
        self.layer_normalization_1 = nn.LayerNorm(config.hidden_size)
        self.layer_normalization_2 = nn.LayerNorm(config.hidden_size)

        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        """Apply attention and feed-forward sub-layers, each with a residual connection."""
        # Normalize only the branch input; keep `x` itself as the residual path.
        # Previously `x` was overwritten by its normalized version and
        # `layer_normalization_2` was applied twice with shared weights.
        hidden_state = self.layer_normalization_1(x)
        x = x + self.attention(hidden_state)
        x = x + self.feed_forward(self.layer_normalization_2(x))

        return x

In [39]:
# Smoke test: run a single encoder layer on the sample embeddings and show the output shape.
Transformer_encoder = TransformerEncoderLayer(config)
Transformer_encoder(inputs_emb).shape

torch.Size([1, 5, 768])

### Positional embedding - embeds the encoded position of each token in the sequence

In [40]:
class Embedding(nn.Module):
    """Token plus learned position embeddings, followed by LayerNorm and dropout.

    Args:
        config: Object exposing ``vocab_size``, ``hidden_size``,
            ``max_position_embeddings`` and a dropout probability.
            ``hidden_dropout_prob`` is used when present;
            ``attention_probs_dropout_prob`` is only a fallback for configs
            that do not define it.
    """

    def __init__(self, config) -> None:
        super().__init__()
        self.embeding_token = nn.Embedding(config.vocab_size, config.hidden_size)
        self.embeding_position = nn.Embedding(config.max_position_embeddings, config.hidden_size)

        self.layer_norm = nn.LayerNorm(config.hidden_size)
        # Dropout here acts on embeddings, not on attention probabilities.
        dropout_prob = getattr(config, "hidden_dropout_prob", None)
        if dropout_prob is None:
            dropout_prob = config.attention_probs_dropout_prob
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Embed token IDs ``x``; the size of dimension 1 is taken as the sequence length.

        Returns the normalized, dropout-regularized sum of the token embeddings
        and the embeddings of positions ``0 .. seq_len - 1``.
        """
        # Create position IDs for the input sequence on the same device as the
        # token IDs; a CPU-only arange would fail the lookup for inputs on GPU.
        seq_len = x.size(1)
        position_ids = torch.arange(seq_len, dtype=torch.long, device=x.device).unsqueeze(dim=0)

        # Create token and position embeddings
        token_emb = self.embeding_token(x)
        position_emb = self.embeding_position(position_ids)

        # The (1, seq_len, hidden) position embeddings broadcast over the batch.
        embedings = token_emb + position_emb
        embedings = self.layer_norm(embedings)

        embedings = self.dropout(embedings)

        return embedings

In [41]:
# Smoke test: embed the sample token IDs and show the output shape.
emb = Embedding(config)
emb(inputs.input_ids).shape

torch.Size([1, 5, 768])

### Full transformer encoder using config.num_hidden_layers (12), typical for the BERT model

In [42]:
class TransformerEncoder(nn.Module):
    """Embedding module followed by a stack of ``TransformerEncoderLayer`` modules.

    Args:
        config: Configuration passed to ``Embedding`` and to every layer;
            ``num_hidden_layers`` sets the depth of the stack.
    """

    def __init__(self, config):
        super().__init__()
        self.embeded = Embedding(config)
        encoder_stack = [
            TransformerEncoderLayer(config)
            for _ in range(config.num_hidden_layers)
        ]
        self.layers = nn.ModuleList(encoder_stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Embed ``x`` and feed the result through each layer in order."""
        hidden_state = self.embeded(x)
        for encoder_layer in self.layers:
            hidden_state = encoder_layer(hidden_state)

        return hidden_state

In [43]:
# Smoke test: run the full encoder stack on the sample token IDs and show the output shape.
Transformer_enc = TransformerEncoder(config)
Transformer_enc(inputs.input_ids).shape

torch.Size([1, 5, 768])

### Classification head

In [44]:
class TransformerForSequenceClassification(nn.Module):
    """Encoder-only sequence classifier.

    The encoder output at sequence position 0 goes through dropout and a
    linear layer producing ``config.num_labels`` scores per sequence.

    Args:
        config: Configuration passed to ``TransformerEncoder``; must also expose
            ``hidden_size``, ``num_labels`` and a dropout probability.
            ``hidden_dropout_prob`` is used when present;
            ``attention_probs_dropout_prob`` is only a fallback for configs
            that do not define it.
    """

    def __init__(self, config) -> None:
        super().__init__()
        self.encoder = TransformerEncoder(config)
        # Dropout here acts on a hidden vector, not on attention probabilities.
        dropout_prob = getattr(config, "hidden_dropout_prob", None)
        if dropout_prob is None:
            dropout_prob = config.attention_probs_dropout_prob
        self.dropout = nn.Dropout(dropout_prob)
        self.clf = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, x):
        """Return classification scores of shape ``(batch, num_labels)``."""
        # Run the encoder's forward pass
        x = self.encoder(x)
        # Take the vector of the first token (the [CLS] slot when the
        # tokenizer adds special tokens)
        x = x[:, 0, :]
        x = self.dropout(x)
        x = self.clf(x)
        return x

In [45]:
# The classification head reads its output width from config.num_labels.
config.num_labels = 3

# Smoke test: classify the sample token IDs and show the output shape.
Transformer_clf = TransformerForSequenceClassification(config)
Transformer_clf(inputs.input_ids).shape

torch.Size([1, 3])

### Transformer with Decoder layer

The only modification relative to the encoder layer is that we use a mask (a triangular matrix applied to the attention weights) to prevent the model from cheating by copying the output instead of predicting it.

In [46]:
def scaled_dot_product_attention(query, key, values, mask=None):
    """Compute scaled dot-product attention with optional masking.

    Args:
        query: Tensor of shape ``(batch, q_len, d_k)``.
        key: Tensor of shape ``(batch, k_len, d_k)``.
        values: Tensor of shape ``(batch, k_len, d_v)``.
        mask: Controls which key positions each query may attend to.
            * ``None`` - no masking.
            * a tensor broadcastable to ``(batch, q_len, k_len)`` - positions
              where it equals 0 are blocked.
            * any other non-``None`` value (existing callers pass ``1``) - a
              lower-triangular causal mask is built, so query position ``i``
              only sees key positions ``<= i``.

    Returns:
        Tensor of shape ``(batch, q_len, d_v)``.
    """
    dim_k = query.size(-1)
    score = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)

    if mask is not None:
        if not isinstance(mask, torch.Tensor):
            # Build the causal mask from the score shape (not assumed square)
            # and on the score's device so it works for any query/key lengths.
            q_len, k_len = score.size(-2), score.size(-1)
            mask = torch.tril(torch.ones(q_len, k_len, device=score.device)).unsqueeze(0)

        # Blocked positions get -inf so they receive zero weight after softmax.
        score = score.masked_fill(mask == 0, float('-inf'))

    # `F.softmax` (lower-case) is the functional API; normalize over the key axis.
    weight = F.softmax(score, dim=-1)

    # Weights come first: (batch, q_len, k_len) @ (batch, k_len, d_v).
    return torch.bmm(weight, values)

### Attention head, similar to the encoder attention head

In [47]:
class AttentionHeadDecoder(nn.Module):
    """Single attention head with separate query, key and value inputs.

    Args:
        embedded_dim: Size of the last dimension of the incoming tensors.
        head_dim: Output size of each of the query/key/value projections.
    """

    def __init__(self, embedded_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embedded_dim, head_dim)
        self.k = nn.Linear(embedded_dim, head_dim)
        self.v = nn.Linear(embedded_dim, head_dim)

    def forward(self, query, key, value, mask=1) -> torch.Tensor:
        """Project the three inputs and run masked scaled dot-product attention.

        Args:
            query: Input to the query projection.
            key: Input to the key projection.
            value: Input to the value projection.
            mask: Passed unchanged to ``scaled_dot_product_attention``. The
                default ``1`` is the value that used to be hard-coded here and
                requests masking; pass ``None`` to attend without a mask.
                NOTE(review): with the default, masking is also requested when
                this head is used for cross-attention over encoder output -
                confirm whether that is intended.
        """
        return scaled_dot_product_attention(self.q(query), self.k(key), self.v(value), mask=mask)

In [48]:
class MultiHeadAttentionDecoder(nn.Module):
    """Multi-head attention for the decoder, with separate query/key/value inputs.

    Builds ``config.num_attention_heads`` ``AttentionHeadDecoder`` modules, each
    of width ``config.hidden_size // config.num_attention_heads``, and a final
    ``hidden_size -> hidden_size`` linear layer.
    """

    def __init__(self, config):
        super().__init__()
        hidden_size = config.hidden_size
        head_count = config.num_attention_heads
        per_head_dim = hidden_size // head_count

        decoder_heads = [
            AttentionHeadDecoder(hidden_size, per_head_dim)
            for _ in range(head_count)
        ]
        self.heads = nn.ModuleList(decoder_heads)
        self.linear_output = nn.Linear(hidden_size, hidden_size)

    def forward(self, query, key, value) -> torch.Tensor:
        """Run every head, concatenate on the last dimension and mix linearly."""
        head_outputs = []
        for head in self.heads:
            head_outputs.append(head(query, key, value))
        concatenated = torch.cat(head_outputs, dim=-1)
        return self.linear_output(concatenated)

The difference between the decoder and encoder layers is that the decoder has cross-attention. That means we use the encoder output to calculate attention (this is the second attention block, because the first one is calculated the same way as in the encoder layer).

In [49]:
class DecoderLayer(nn.Module):
    """Decoder layer: self-attention, cross-attention and feed-forward sub-layers.

    Each sub-layer is fed a layer-normalized copy of the hidden state and its
    output is added back to the un-normalized hidden state (pre-layer-norm
    residual connections).

    Args:
        config: Configuration passed to ``MultiHeadAttentionDecoder`` and
            ``FeedForward``; ``hidden_size`` sizes the three layer norms.
    """

    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_3 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttentionDecoder(config)
        self.cross_attention = MultiHeadAttentionDecoder(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x, encoder_out):
        """Apply the three sub-layers to ``x``, attending to ``encoder_out`` in the second.

        Args:
            x: Decoder hidden state.
            encoder_out: Encoder output used as key and value in cross-attention.
        """
        # Self-attention: query, key and value must all come from the same
        # normalized hidden state. Previously only the query was normalized
        # while key and value used the raw `x`.
        hidden_state = self.layer_norm_1(x)
        attn_out = self.attention(hidden_state, hidden_state, hidden_state)
        x = x + attn_out
        # Cross-attention: queries from the decoder, keys/values from the encoder.
        cross_attn_out = self.cross_attention(self.layer_norm_2(x), encoder_out, encoder_out)
        x = x + cross_attn_out
        ff_out = self.feed_forward(self.layer_norm_3(x))
        x = x + ff_out
        return x

In [50]:
class TransformerDecoder(nn.Module):
    """Stack of ``DecoderLayer`` modules followed by a final layer normalization.

    Args:
        config: Configuration passed to every layer; ``num_hidden_layers`` sets
            the depth and ``hidden_size`` sizes the final layer norm.
    """

    def __init__(self, config):
        super().__init__()
        decoder_stack = [
            DecoderLayer(config)
            for _ in range(config.num_hidden_layers)
        ]
        self.layers = nn.ModuleList(decoder_stack)
        self.layer_norm = nn.LayerNorm(config.hidden_size)

    def forward(self, x, encoder_output):
        """Pass ``x`` through each layer together with ``encoder_output``, then normalize.

        NOTE(review): no embedding step happens here, so ``x`` is presumably an
        already-embedded hidden state rather than token IDs - confirm against callers.
        """
        hidden_state = x
        for decoder_layer in self.layers:
            hidden_state = decoder_layer(hidden_state, encoder_output)
        normalized = self.layer_norm(hidden_state)
        return normalized

In [51]:
class Transformer(nn.Module):
    """Encoder-decoder model wiring ``TransformerEncoder`` into ``TransformerDecoder``.

    Args:
        config: Configuration passed to both the encoder and the decoder.
    """

    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.decoder = TransformerDecoder(config)

    def forward(self, src, tgt):
        """Encode ``src`` and decode ``tgt`` against the encoder output."""
        memory = self.encoder(src)
        return self.decoder(tgt, memory)


In [52]:
class TransformerForSequenceClassification(nn.Module):
    """Encoder-decoder sequence classifier.

    The ``Transformer`` output at sequence position 0 goes through dropout and a
    linear layer producing ``config.num_labels`` scores per sequence.

    NOTE: this class reuses the name of the encoder-only classifier defined
    earlier in the notebook, so running this cell replaces that definition.

    Args:
        config: Configuration passed to ``Transformer``; must also expose
            ``hidden_size``, ``num_labels`` and a dropout probability.
            ``hidden_dropout_prob`` is used when present;
            ``attention_probs_dropout_prob`` is only a fallback for configs
            that do not define it.
    """

    def __init__(self, config):
        super().__init__()
        self.transformer = Transformer(config)
        # Dropout here acts on a hidden vector, not on attention probabilities.
        dropout_prob = getattr(config, "hidden_dropout_prob", None)
        if dropout_prob is None:
            dropout_prob = config.attention_probs_dropout_prob
        self.dropout = nn.Dropout(dropout_prob)
        self.clf = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, src, tgt):
        """Return classification scores of shape ``(batch, num_labels)``."""
        transformer_output = self.transformer(src, tgt)
        # Keep only the vector at the first sequence position.
        x = transformer_output[:, 0, :]
        x = self.dropout(x)
        x = self.clf(x)
        return x