<a href="https://colab.research.google.com/github/Ashuku001/ML_NLP/blob/main/Ch3Transformer_anatomy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
!pip install transformers
!pip install bertviz

Collecting bertviz
  Downloading bertviz-1.4.0-py3-none-any.whl (157 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m157.6/157.6 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
Collecting boto3 (from bertviz)
  Downloading boto3-1.28.25-py3-none-any.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.8/135.8 kB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
Collecting sentencepiece (from bertviz)
  Downloading sentencepiece-0.1.99-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
Collecting botocore<1.32.0,>=1.31.25 (from boto3->bertviz)
  Downloading botocore-1.31.25-py3-none-any.whl (11.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.1/11.1 MB[0m [31m29.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jmespath<2.0.0,>=0.7.1 (from boto3->bertviz)
  Downloading jmespath-1.0.1-py3-none-

In [1]:
from transformers import AutoTokenizer
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show

# Hub checkpoint name; reused by later cells for the config and the AutoModel.
model_ckpt = "bert-base-uncased"
# Tokenizer matching the checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
# BertModel here is the variant shipped with BertViz's neuron-view module (see import above).
model = BertModel.from_pretrained(model_ckpt)
# Example sentence; it is tokenized again in the next cell.
text = "time flies like an arrow"
# Render the neuron view for the sentence, opened on layer 0 / head 0 with the light theme.
show(model,"bert", tokenizer, text, display_mode="light", layer=0, head=0)

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

100%|██████████| 433/433 [00:00<00:00, 302588.08B/s]
100%|██████████| 440473133/440473133 [00:09<00:00, 45920605.75B/s]


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [2]:
# Implementing transformer self-attention with PyTorch.
# Step 1: tokenize the text. add_special_tokens=False leaves out the [CLS] and
# [SEP] tokens, so only the tokens of the sentence itself are encoded.
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
inputs.input_ids # display the ID assigned to each token

tensor([[ 2051, 10029,  2066,  2019,  8612]])

In [6]:
# Step 2: build a dense embedding lookup table for the tokens
from torch import nn
from transformers import AutoConfig

# Load the configuration (config.json) of the bert-base-uncased checkpoint
config = AutoConfig.from_pretrained(model_ckpt)
# vocab_size and hidden_size from the config give the table's dimensions.
# The table is a newly constructed nn.Embedding; no checkpoint weights are loaded into it.
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
token_emb

Embedding(30522, 768)

In [7]:
# Now that we have a lookup table, generate the embeddings
# by feeding in the token input IDs
input_embeds = token_emb(inputs.input_ids)
input_embeds.size() # [batch_size, seq_len, hidden_dim]

torch.Size([1, 5, 768])

In [14]:
# Create query, key and value vectors and calculate the attention scores using a dot product
import torch
from math import sqrt
# For simplicity the query, key and value are all the same tensor
query = key = value = input_embeds
# Size of the last (embedding) dimension, used to scale the scores
dim_k = key.size(-1)
# Dot product between the query and key vectors (the similarity function), scaled by sqrt(dim_k)
scores = torch.bmm(query, key.transpose(1, 2))  / sqrt(dim_k)

scores.size() # one 5 x 5 matrix of scaled attention scores per sample in the batch

torch.Size([1, 5, 5])

In [19]:
# Apply softmax to the scaled attention scores
import torch.nn.functional as F
# Softmax over the last dimension turns each row of scores into attention weights
weights = F.softmax(scores, dim=-1)
weights.sum(dim=-1) # sanity check: the weights in every row sum to 1

tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)

In [17]:
# Finally, multiply the attention weights by the values
attn_outputs = torch.bmm(weights, value)
attn_outputs.shape
# End of the step-by-step self-attention walkthrough

torch.Size([1, 5, 768])

In [20]:
# Wrap the previous steps in a function that can be reused later
def scaled_dot_product_attention(query, key, value):
  """Compute scaled dot-product attention for batched 3-D tensors.

  The query/key dot products are divided by ``sqrt(query.size(-1))``, turned
  into weights with a softmax over the last dimension, and used to take a
  weighted sum of ``value``.

  Args:
    query, key, value: 3-D tensors accepted by ``torch.bmm``.

  Returns:
    The batched matrix product of the attention weights and ``value``.
  """
  scale = sqrt(query.size(-1))
  similarity = query.bmm(key.transpose(1, 2)) / scale
  attention = F.softmax(similarity, dim=-1)
  return attention.bmm(value)

# multi head attention


In [27]:
# Single attention head
class AttentionHead(nn.Module):
  """One self-attention head with its own query, key and value projections."""

  def __init__(self, embed_dim, head_dim):
    """Create three independent linear maps from ``embed_dim`` to ``head_dim``."""
    super().__init__()
    self.q = nn.Linear(embed_dim, head_dim)
    self.k = nn.Linear(embed_dim, head_dim)
    self.v = nn.Linear(embed_dim, head_dim)

  def forward(self, hidden_state):
    """Project ``hidden_state`` into queries, keys and values and attend over them."""
    queries = self.q(hidden_state)
    keys = self.k(hidden_state)
    values = self.v(hidden_state)
    return scaled_dot_product_attention(queries, keys, values)

In [31]:
# Concatenate the outputs of the single attention heads to create a full multi-head attention layer
class MultiHeadAttention(nn.Module):
  """Multi-head self-attention built from independent ``AttentionHead`` modules.

  Args:
    config: object providing ``hidden_size`` (768 for bert-base-uncased) and
      ``num_attention_heads`` (12 for bert-base-uncased).

  Raises:
    ValueError: if ``hidden_size`` is not a multiple of ``num_attention_heads``.
  """

  def __init__(self, config):
    super().__init__()
    embed_dim = config.hidden_size
    num_heads = config.num_attention_heads
    # The concatenated head outputs must add up to embed_dim again; otherwise
    # output_linear would receive the wrong number of features in forward().
    if embed_dim % num_heads != 0:
      raise ValueError(
          f"hidden_size ({embed_dim}) must be a multiple of "
          f"num_attention_heads ({num_heads})"
      )
    head_dim = embed_dim // num_heads # 768 // 12 = 64 for bert-base-uncased
    # num_heads independent heads, each projecting down to head_dim features
    self.heads = nn.ModuleList(
        [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
    )
    self.output_linear = nn.Linear(embed_dim, embed_dim)

  def forward(self, hidden_state):
    """Run every head on ``hidden_state``, concatenate on the last dim, then apply ``output_linear``."""
    x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
    x = self.output_linear(x)
    return x

In [32]:
# Inspect the number of attention heads defined in the loaded config
config.num_attention_heads

12

In [33]:
# Check that multi-head attention produces the expected output shape for our inputs,
# passing it the config loaded from the BERT checkpoint
multihead_attn = MultiHeadAttention(config)
att_output = multihead_attn(input_embeds)
att_output.size() # same shape as input_embeds, [1, 5, 768], so it works

torch.Size([1, 5, 768])

In [44]:
# Use BertViz's head view to visualize attention across the tokens of a sentence pair
from bertviz import head_view
from transformers import AutoModel

# NOTE: this rebinds `model`, which earlier held BertViz's neuron-view BertModel.
# output_attentions=True is what makes `.attentions` available on the model output below.
model = AutoModel.from_pretrained(model_ckpt, output_attentions=True)

sentence_a = "i'd like to make an order"
sentence_b = "Hi, do you have conta glue"

# Encode the two sentences together as one pair
viz_inputs = tokenizer(sentence_a, sentence_b, return_tensors="pt")
attention = model(**viz_inputs).attentions
# Number of positions whose token_type_id is 0; passed to head_view as the start of sentence B
sentence_b_start = (viz_inputs.token_type_ids == 0).sum(dim=1)
tokens = tokenizer.convert_ids_to_tokens(viz_inputs.input_ids[0])

# Restrict the view to head 8
head_view(attention, tokens, sentence_b_start, heads=[8])

<IPython.core.display.Javascript object>

In [48]:
# The position-wise feed-forward layer
class FeedForward(nn.Module):
  """Two linear layers with a GELU in between, followed by dropout.

  Args:
    config: object providing ``hidden_size``, ``intermediate_size`` and
      ``hidden_dropout_prob``.
  """

  def __init__(self, config):
    super().__init__()
    hidden, inner = config.hidden_size, config.intermediate_size
    # Expand to the intermediate width, then project back to the hidden width
    self.linear_1 = nn.Linear(hidden, inner)
    self.linear_2 = nn.Linear(inner, hidden)
    self.gelu = nn.GELU()
    self.dropout = nn.Dropout(config.hidden_dropout_prob)

  def forward(self, x):
    """Apply linear_1 -> GELU -> linear_2 -> dropout to ``x``."""
    expanded = self.gelu(self.linear_1(x))
    return self.dropout(self.linear_2(expanded))


In [49]:
# Run the feed-forward layer on the multi-head attention output `att_output`
# (not on `attn_outputs`, the single-head result from the earlier walkthrough).
feed_forward = FeedForward(config)
ff_outputs = feed_forward(att_output)
ff_outputs.size()

torch.Size([1, 5, 768])

We now have all the ingredients needed to build a fully fledged transformer encoder layer.

In [52]:
class TransformerEncoderLayer(nn.Module):
  """Encoder block: layer norm is applied before each sub-layer, with skip connections.

  Args:
    config: object providing ``hidden_size`` plus whatever
      ``MultiHeadAttention`` and ``FeedForward`` read from it.
  """

  def __init__(self, config):
    super().__init__()
    self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
    self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
    self.attention = MultiHeadAttention(config)
    self.feed_forward = FeedForward(config)

  def forward(self, x):
    """Return ``x`` after the attention and feed-forward sub-layers."""
    # Attention on the normalized input, added back onto the input (skip connection)
    attended = x + self.attention(self.layer_norm_1(x))
    # Feed-forward on the normalized result, again with a skip connection
    return attended + self.feed_forward(self.layer_norm_2(attended))

In [54]:
# Test the encoder layer with our input embeddings: display the input and output shapes side by side
encoder_layer = TransformerEncoderLayer(config)
(input_embeds.shape, encoder_layer(input_embeds).size())

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

With that, we have implemented a transformer encoder layer from scratch.

In [62]:
# Token embeddings combined with positional embeddings
class Embeddings(nn.Module):
  """Token embeddings plus learned position embeddings, layer norm and dropout.

  Args:
    config: object providing ``vocab_size``, ``hidden_size`` and
      ``max_position_embeddings``.
  """

  def __init__(self, config):
    super().__init__()
    # Token lookup table: vocab_size rows of hidden_size features
    self.token_embeddings = nn.Embedding(config.vocab_size,
                                         config.hidden_size)
    # Position lookup table: one row per position up to max_position_embeddings
    self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                            config.hidden_size)
    # Normalization layer
    self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
    # NOTE(review): nn.Dropout() uses its default probability (0.5), whereas the
    # other layers in this notebook use config.hidden_dropout_prob; confirm intent.
    self.dropout = nn.Dropout()

  def forward(self, input_ids):
    """Embed ``input_ids`` (indexed as [batch, seq_len]) into dense vectors.

    Returns:
      Normalized sum of token and position embeddings, with dropout applied
      (dropout is only active in training mode).
    """
    # Position IDs 0..seq_len-1, created on the same device as the input so the
    # lookup also works when the module and its inputs are not on the CPU
    seq_length = input_ids.size(1)
    position_ids = torch.arange(
        seq_length, dtype=torch.long, device=input_ids.device
    ).unsqueeze(0)
    # Create token and position embeddings
    token_embeddings = self.token_embeddings(input_ids)
    position_embeddings = self.position_embeddings(position_ids)
    # Combine token and position embeddings
    embeddings = token_embeddings + position_embeddings
    embeddings = self.layer_norm(embeddings)
    # Assign the result: the original used `-` here, which discarded the dropout output
    embeddings = self.dropout(embeddings)
    return embeddings

# With that, the embedding layer creates a single dense embedding for each token


In [63]:
# Instantiate the embedding layer and check the output shape for our token IDs
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size()

torch.Size([1, 5, 768])

In [65]:
# Combine the embeddings with the stack of encoder layers
class TransformerEncoder(nn.Module):
  """Embedding layer followed by ``config.num_hidden_layers`` encoder layers.

  Args:
    config: object providing ``num_hidden_layers`` plus whatever
      ``Embeddings`` and ``TransformerEncoderLayer`` read from it.
  """

  def __init__(self, config):
    super().__init__()
    self.embeddings = Embeddings(config)
    self.layers = nn.ModuleList(
        [TransformerEncoderLayer(config) for _ in range(config.num_hidden_layers)]
    )

  def forward(self, x):
    """Embed ``x`` and pass the result through every encoder layer in order."""
    x = self.embeddings(x)
    for layer in self.layers:
      x = layer(x)
    # Return only after the whole stack has run; returning inside the loop
    # would stop after the first layer (and yield None for an empty stack).
    return x

In [66]:
# Check the output shape of the full encoder on our token IDs
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

torch.Size([1, 5, 768])

In [67]:
# Adding a classification head:
# we extend the existing encoder for sequence classification
class TransformerSequenceClassification(nn.Module):
  """Encoder followed by dropout and a linear classifier.

  Args:
    config: object providing ``hidden_dropout_prob``, ``hidden_size`` and
      ``num_labels`` plus whatever ``TransformerEncoder`` reads from it.
  """

  def __init__(self, config):
    super().__init__()
    self.encoder = TransformerEncoder(config)
    self.dropout = nn.Dropout(config.hidden_dropout_prob)
    self.classifier = nn.Linear(config.hidden_size, config.num_labels)

  def forward(self, x):
    """Return one row of ``num_labels`` raw scores per sequence in ``x``."""
    # Hidden state at position 0, which is the [CLS] token when the input
    # was tokenized with special tokens
    first_token_state = self.encoder(x)[:, 0, :]
    return self.classifier(self.dropout(first_token_state))

In [68]:
# Before initializing the model, define how many classes we would like to predict
config.num_labels = 3
encoder_classifier = TransformerSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()

torch.Size([1, 3])

In [70]:
# Display the classifier output for the example input: one value per label
encoder_classifier(inputs.input_ids)

tensor([[0.9585, 0.5555, 0.5415]], grad_fn=<AddmmBackward0>)

# Decoder


In [71]:
# Masked self-attention
seq_len = inputs.input_ids.size(-1) # 5 for the example sentence
# Lower-triangular seq_len x seq_len matrix of ones; unsqueeze(0) adds a leading dimension
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)
mask[0]

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])

In [72]:
# Prevent each attention head from peeking at future tokens:
# wherever the mask is 0, set the score to negative infinity.
# The result is only displayed here; it is not assigned back to `scores`.
scores.masked_fill(mask==0, -float("inf"))

tensor([[[ 2.9338e+01,        -inf,        -inf,        -inf,        -inf],
         [-8.5113e-03,  2.5657e+01,        -inf,        -inf,        -inf],
         [ 1.5861e+00,  9.0171e-01,  2.5969e+01,        -inf,        -inf],
         [ 6.1246e-01, -7.3900e-01, -8.8938e-02,  2.9068e+01,        -inf],
         [-5.9163e-01, -1.2576e+00,  4.0235e-01,  1.6226e+00,  2.5661e+01]]],
       grad_fn=<MaskedFillBackward0>)

In [73]:
def scaled_dot_product_attention(query, key, value, mask=None):
  """Compute scaled dot-product attention with an optional mask.

  Args:
    query, key, value: 3-D tensors accepted by ``torch.bmm``.
    mask: optional tensor; score positions where ``mask == 0`` are set to
      ``-inf`` before the softmax.

  Returns:
    The batched matrix product of the attention weights and ``value``.
  """
  scale = sqrt(query.size(-1))
  similarity = query.bmm(key.transpose(1, 2)) / scale
  if mask is not None:
    similarity = similarity.masked_fill(mask == 0, float("-inf"))
  return F.softmax(similarity, dim=-1).bmm(value)