In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from transformers import AutoTokenizer , AutoModel
from torch import nn
import torch
import torch.nn.functional as F
from transformers import AutoConfig
import math , warnings , os
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union

"""
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show
from bertviz import head_view
"""

'\nfrom bertviz.transformers_neuron_view import BertModel\nfrom bertviz.neuron_view import show\nfrom bertviz import head_view\n'

##### Construct BERT Architecture

Define a scaled dot product function for (q,k,v) for self-attention
so we get a sentance of len and embedded it so we get a [batch_size  , seq_len , hidden_state] 

In [2]:
def scaled_dot_porduct_attention(query,key,value):
  
  dim_k = key.size(-1)
  #batch matrix-matrix product to ignore batch dims
  scores = torch.bmm(query , key.transpose(1,2)) / sqrt(dim_k)
  #apply softmax 
  weights = F.softmax(scores , dim=-1)
  return torch.bmm(weights , value)

In [3]:
def scaled_dot_porduct_attention(query,key,value):
  
  dim_k = key.size(-1)
  #batch matrix-matrix product to ignore batch dims
  scores = torch.bmm(query , key.transpose(1,2)) / sqrt(dim_k)
  #apply softmax 
  weights = F.softmax(scores , dim=-1)
  return torch.bmm(weights , value)
# one head self-attention
class AttentionHead(nn.Module):
    def __init__(self, embed_dim:int, head_dim:int):
        super().__init__()
        self.q = nn.Linear(embed_dim , head_dim)
        self.k = nn.Linear(embed_dim , head_dim)
        self.v = nn.Linear(embed_dim , head_dim)
    def forward(self , hidden_state):
        #compute self-attention
        atten_outputs = scaled_dot_porduct_attention(self.q(hidden_state) ,
                                                     self.k(hidden_state),
                                                     self.v(hidden_state))
        return atten_outputs


In [4]:
#Multi-headed attention 
"""
with one head self-attention :
    the softmax of one head tends to focus on mostly one aspect of similarity.
    Having several heads allows the model to focus on several aspects at once .

"""
class MultiHeadAttention(nn.Module):
    def __init__(self ,num_attention_heads:int,hidden_size:int):
        super().__init__()
        self.embedd_dim =  hidden_size #768
        self.num_heads = num_attention_heads #12 
        self.head_dim = self.embedd_dim // self.num_heads #768//12 = 64
        self.heads = nn.ModuleList(
            AttentionHead(self.embedd_dim , self.head_dim) for _ in range(self.num_heads) 
        )
        self.outputs = nn.Linear(self.embedd_dim , self.embedd_dim)
    
    def forward(self , hidden_state ):
        #concate All 12 head of 64 dim to 768 dim
        concate_heads = torch.cat([head(hidden_state) for head in self.heads] , dim=-1)
        #pass the concate_heads into a Linear layer 
        multi_heads_outputs = self.outputs(concate_heads)
        return multi_heads_outputs 

In [5]:
#position-wise feed-forward layer
"""
instead of processing the whole sequence of embeddings as a single vector,
it processes each embedding independently Like (one-dimensional convolution with a kernel size) .
"""
class FeedForward(nn.Module):
    def __init__(self , hidden_size:int,
                 intermidiate_size:int,
                 hidden_dropout_prob:int):
        super().__init__()
        #four times the size of the embeddings
        self.linear_layer_1 = nn.Linear(hidden_size ,intermidiate_size)
        self.linear_layer_2 = nn.Linear(intermidiate_size , hidden_size)
        self.glue = nn.GELU()
        self.dropout = nn.Dropout(hidden_dropout_prob)
    def forward(self , x):
        x = self.linear_layer_1(x)
        x = self.linear_layer_2(x)
        x = self.glue(x)
        x = self.dropout(x)
        return x

In [7]:
#Normaliztion
"""
The former normalizes each input in the batch to have zero mean and unity variance.
two approaches :
    - Post layer normalization 
    - Pre layer normalization
"""
class TransformerEncoderLayer(nn.Module):
    def __init__(self ,hidden_size,num_attention_heads, intermidiate_size):
        super().__init__()
        self.layer_norm1 = nn.LayerNorm(hidden_size)
        self.layer_norm2 = nn.LayerNorm(hidden_size)
        self.attention = MultiHeadAttention(num_attention_heads , hidden_size)
        self.feedforward = FeedForward(hidden_size , intermidiate_size)
    def forward(self , x):
        # Apply layer normalization and then copy input into query, key, value
        hidden_state = self.layer_norm1(x)
        # Apply attention with a skip connection
        x = x + self.attention(hidden_state)
        # Apply feed-forward layer with a skip connection
        x = x + self.feedforward(self.layer_norm2(x))
        return x

In [5]:
from transformers import PretrainedConfig
class BertConfig(PretrainedConfig):
    r"""
    This is the configuration class to store the configuration of a [`BertModel`] or a [`TFBertModel`]. It is used to
    instantiate a BERT model according to the specified arguments, defining the model architecture. Instantiating a
    configuration with the defaults will yield a similar configuration to that of the BERT
    [bert-base-uncased](https://huggingface.co/bert-base-uncased) architecture.

    Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
    documentation from [`PretrainedConfig`] for more information.


    Args:
        vocab_size (`int`, *optional*, defaults to 30522):
            Vocabulary size of the BERT model. Defines the number of different tokens that can be represented by the
            `inputs_ids` passed when calling [`BertModel`] or [`TFBertModel`].
        hidden_size (`int`, *optional*, defaults to 768):
            Dimensionality of the encoder layers and the pooler layer.
        num_hidden_layers (`int`, *optional*, defaults to 12):
            Number of hidden layers in the Transformer encoder.
        num_attention_heads (`int`, *optional*, defaults to 12):
            Number of attention heads for each attention layer in the Transformer encoder.
        intermediate_size (`int`, *optional*, defaults to 3072):
            Dimensionality of the "intermediate" (often named feed-forward) layer in the Transformer encoder.
        hidden_act (`str` or `Callable`, *optional*, defaults to `"gelu"`):
            The non-linear activation function (function or string) in the encoder and pooler. If string, `"gelu"`,
            `"relu"`, `"silu"` and `"gelu_new"` are supported.
        hidden_dropout_prob (`float`, *optional*, defaults to 0.1):
            The dropout probability for all fully connected layers in the embeddings, encoder, and pooler.
        attention_probs_dropout_prob (`float`, *optional*, defaults to 0.1):
            The dropout ratio for the attention probabilities.
        max_position_embeddings (`int`, *optional*, defaults to 512):
            The maximum sequence length that this model might ever be used with. Typically set this to something large
            just in case (e.g., 512 or 1024 or 2048).
        type_vocab_size (`int`, *optional*, defaults to 2):
            The vocabulary size of the `token_type_ids` passed when calling [`BertModel`] or [`TFBertModel`].
        initializer_range (`float`, *optional*, defaults to 0.02):
            The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
        layer_norm_eps (`float`, *optional*, defaults to 1e-12):
            The epsilon used by the layer normalization layers.
        position_embedding_type (`str`, *optional*, defaults to `"absolute"`):
            Type of position embedding. Choose one of `"absolute"`, `"relative_key"`, `"relative_key_query"`. For
            positional embeddings use `"absolute"`. For more information on `"relative_key"`, please refer to
            [Self-Attention with Relative Position Representations (Shaw et al.)](https://arxiv.org/abs/1803.02155).
            For more information on `"relative_key_query"`, please refer to *Method 4* in [Improve Transformer Models
            with Better Relative Position Embeddings (Huang et al.)](https://arxiv.org/abs/2009.13658).
        is_decoder (`bool`, *optional*, defaults to `False`):
            Whether the model is used as a decoder or not. If `False`, the model is used as an encoder.
        use_cache (`bool`, *optional*, defaults to `True`):
            Whether or not the model should return the last key/values attentions (not used by all models). Only
            relevant if `config.is_decoder=True`.
        classifier_dropout (`float`, *optional*):
            The dropout ratio for the classification head.

    Examples:

    ```python
    >>> from transformers import BertConfig, BertModel

    >>> # Initializing a BERT bert-base-uncased style configuration
    >>> configuration = BertConfig()

    >>> # Initializing a model (with random weights) from the bert-base-uncased style configuration
    >>> model = BertModel(configuration)

    >>> # Accessing the model configuration
    >>> configuration = model.config
    ```"""

    model_type = "bert"

    def __init__(
        self,
        vocab_size=30522,
        hidden_size=768,
        num_hidden_layers=12,
        num_attention_heads=12,
        intermediate_size=3072,
        hidden_act="gelu",
        hidden_dropout_prob=0.1,
        attention_probs_dropout_prob=0.1,
        max_position_embeddings=512,
        type_vocab_size=2,
        initializer_range=0.02,
        layer_norm_eps=1e-12,
        pad_token_id=0,
        position_embedding_type="absolute",
        use_cache=True,
        classifier_dropout=None,
        **kwargs,
    ):
        super().__init__(pad_token_id=pad_token_id, **kwargs)

        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.hidden_act = hidden_act
        self.intermediate_size = intermediate_size
        self.hidden_dropout_prob = hidden_dropout_prob
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.max_position_embeddings = max_position_embeddings
        self.type_vocab_size = type_vocab_size
        self.initializer_range = initializer_range
        self.layer_norm_eps = layer_norm_eps
        self.position_embedding_type = position_embedding_type
        self.use_cache = use_cache
        self.classifier_dropout = classifier_dropout

## Bert Model For Text classification
> BERT using MLM and NSP 
### BERT EmbeddingLayer :
    consist of three parts
*   **word-embedding** :
        BERT uses an initial token embedding layer to convert input tokens into dense vectors of fixed dimensionality , so its feed into the model and it learned during training allow the model to capture 
        meaning of individual words (USING Word-piece).
*   **segment-embedding** :
        Segment embeddings are basically the sentence number that is encoded into a vector.
*   **positional-embedding** :
        Position embeddings are the position of the word within that sentence that is encoded into a vector.



In [41]:
class BertEmbeddings(nn.Module):
    """Construct the embeddings from word, position and token_type(segment-Embedding) embeddings."""

    def __init__(self, config):
        super().__init__()
        self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        #segment-embeddings
        self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size)

        # self.LayerNorm is not snake-cased to stick with TensorFlow model variable name and be able to load
        # any TensorFlow checkpoint file
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        # position_ids (1, len position emb) is contiguous in memory and exported when serialized
        self.position_embedding_type = getattr(config, "position_embedding_type", "absolute")
        #registered as part of a module's state
        
        #They are typically used to store non-learnable state information that is associated with
        #  a module, such as running statistics for batch normalization or position embeddings in
        #    a transformer
        
        self.register_buffer(
            "position_ids", torch.arange(config.max_position_embeddings).expand((1, -1)),
            persistent=False
        )
        self.register_buffer(
            "token_type_ids", torch.zeros(self.position_ids.size(), dtype=torch.long),
            persistent=False
        )

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        past_key_values_length: int = 0,
    ) -> torch.Tensor:
        
        if input_ids is not None:
            input_shape = input_ids.size()
        else:
            input_shape = inputs_embeds.size()[:-1]

        seq_length = input_shape[1]

        if position_ids is None:
            position_ids = self.position_ids[:, past_key_values_length : seq_length + past_key_values_length]

        # Setting the token_type_ids to the registered buffer in constructor where it is all zeros, which usually occurs
        # when its auto-generated, registered buffer helps users when tracing the model without passing token_type_ids, solves
        # issue #5664
        if token_type_ids is None:
            if hasattr(self, "token_type_ids"):
                buffered_token_type_ids = self.token_type_ids[:, :seq_length]
                buffered_token_type_ids_expanded = buffered_token_type_ids.expand(input_shape[0], seq_length)
                token_type_ids = buffered_token_type_ids_expanded
            else:
                token_type_ids = torch.zeros(input_shape, dtype=torch.long, device=self.position_ids.device)

        if inputs_embeds is None:
            inputs_embeds = self.word_embeddings(input_ids)
        token_type_embeddings = self.token_type_embeddings(token_type_ids)

        embeddings = inputs_embeds + token_type_embeddings
        if self.position_embedding_type == "absolute":
            position_embeddings = self.position_embeddings(position_ids)
            embeddings += position_embeddings
        embeddings = self.LayerNorm(embeddings)
        embeddings = self.dropout(embeddings)

        return embeddings
    

In [53]:
from math import sqrt

#Create Class for one_head_attention
class AttentionHead(nn.Module):
  def __init__(self , embd_dim , head_dim):
    super().__init__()
    self.Q = nn.Linear(embd_dim , head_dim)
    self.K = nn.Linear(embd_dim , head_dim)
    self.V = nn.Linear(embd_dim , head_dim)
  def scaled_dot_porduct_attention(query , key , value , masked=None):
    dim_k = key.size(-1)
    scores = torch.bmm(query , key.transpose(1,2)) / sqrt(dim_k)
    if masked is not None :
      scores = scores.masked_fill(mask==0 , float("inf"))
    weights = F.softmax(scores , dim=-1)
    return torch.bmm(weights , value)
  def forward(self , hidden_state):
    attn_outs = scaled_dot_porduct_attention(self.Q(hidden_state) ,
                                             self.K(hidden_state) , self.V(hidden_state))
    return attn_outs
  

#create a MultiHeaded Attention
class MultiHeadAttention(nn.Module):
  def __init__(self , config):
    super().__init__()
    embd_dim = config.hidden_size #768
    num_heads = config.num_attention_heads #12
    head_dim = embd_dim // num_heads #64
    self.heads = nn.ModuleList(
        AttentionHead(embd_dim , head_dim) for _ in range(num_heads)
    )
    self.output_layer = nn.Linear(embd_dim , embd_dim) #input 768 dim --> get a 768 dim represent a Attention Multi head

  def forward(self , hidden_state):
    #concate All 12 head of 64 dim to 768 dim
    x = torch.cat([h(hidden_state) for h in self.heads] , dim=-1)
    #push them into a Linear layer
    x = self.output_layer(x)
    return x

In [56]:
class BertFeedForward(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.Linear_Layer_1 = nn.Linear(config.hidden_size, config.intermediate_size)
    self.Linear_Layer_2 = nn.Linear(config.intermediate_size, config.hidden_size)
    self.gelu = nn.GELU()
    self.dropout = nn.Dropout(config.hidden_dropout_prob)
  def forward(self, hidden_state):
    hidden_state = self.Linear_Layer_1(hidden_state)
    hidden_state = self.gelu(hidden_state)
    hidden_state = self.Linear_Layer_2(hidden_state)
    hidden_state = self.dropout(hidden_state)
    return hidden_state

In [59]:
class BertEncodingLayer(nn.Module):
  def __init__(self , config):
    super().__init__()
    self.LayerNorm_1 = nn.LayerNorm(config.hidden_size)
    self.LayerNorm_2 = nn.LayerNorm(config.hidden_size)
    self.attention = MultiHeadAttention(config)
    self.feed_forward = FeedForward(config)

  def forward(self , x):
    #apply layer normaliztion and then copy input into Q ,K and V
    hidden_state = self.LayerNorm_1(x)
    #apply Attention with skip connection
    x = x + self.attention(hidden_state)
    #apply FF layer with skip connection
    x = x + self.feed_forward(self.LayerNorm_2(x))
    return x

In [67]:
class BertEncoder(nn.Module):
    def __init__(self,config):
        super().__init__()
        self.Embeddings = BertEmbeddings(config)
        
        self.BertLayers = nn.ModuleList([BertEncodingLayer(config) for _ in range(config.num_hidden_layers)])
    def forward(self , input_tokens):
        
        input_tokens = self.Embeddings(input_tokens)
        for bertlayer in self.BertLayers :
            input_tokens = bertlayer(input_tokens)
        return input_tokens

In [69]:
model_ckp = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckp)
text = "hello i am omar and i love football"
encode = tokenizer(text , return_tensors='pt')
#tokens = tokenizer.convert_ids_to_tokens(encode.input_ids)
print(encode['input_ids'])
####------------------------------ Test Embedding Class-------------


config = BertConfig()
"""
embeddings = BertEmbeddings(config)
embed = embeddings(encode.input_ids)
print(embed.size())

atten = MultiHeadAttention(config)
outs = atten(embed)

feedouts = BertFeedForward(config)
outsoffeed = feedouts(outs)
print(outsoffeed.size())
"""

bert = BertEncoder(config)

outputs = bert(encode)

print(outputs)

tensor([[  101,  7592,  1045,  2572, 13192,  1998,  1045,  2293,  2374,   102]])


AttributeError: 

In [9]:
class BertSelfAttention(nn.Module):
    def __init__(self, config, position_embedding_type=None):
        super().__init__()

        if config.hidden_size % config.num_attention_heads != 0 and not hasattr(config, "embedding_size"):
            raise ValueError(
                f"The hidden size ({config.hidden_size}) is not a multiple of the number of attention "
                f"heads ({config.num_attention_heads})"
            )

        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = int(config.hidden_size / config.num_attention_heads) 
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)
        self.position_embedding_type = position_embedding_type or getattr(
            config, "position_embedding_type", "absolute"
        )
        
        #relative_key is relative distance between two tokens in a sequence of text ,
        #like in machine translation
        if self.position_embedding_type == "relative_key" or self.position_embedding_type == "relative_key_query":
            self.max_position_embeddings = config.max_position_embeddings
            self.distance_embedding = nn.Embedding(2 * config.max_position_embeddings - 1, self.attention_head_size)
        
    def transpose_for_scores(self, x: torch.Tensor) -> torch.Tensor:
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        mixed_query_layer = self.query(hidden_states)

        
        key_layer = self.transpose_for_scores(self.key(hidden_states))
        value_layer = self.transpose_for_scores(self.value(hidden_states))
        query_layer = self.transpose_for_scores(mixed_query_layer)

        use_cache = past_key_value is not None
        # Take the dot product between "query" and "key" to get the raw attention scores.
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))

        
        if self.position_embedding_type == "relative_key" or self.position_embedding_type == "relative_key_query":
            query_length, key_length = query_layer.shape[2], key_layer.shape[2]
            if use_cache:
                position_ids_l = torch.tensor(key_length - 1, dtype=torch.long, device=hidden_states.device).view(
                    -1, 1
                )
            else:
                position_ids_l = torch.arange(query_length, dtype=torch.long, device=hidden_states.device).view(-1, 1)
            position_ids_r = torch.arange(key_length, dtype=torch.long, device=hidden_states.device).view(1, -1)
            distance = position_ids_l - position_ids_r

            positional_embedding = self.distance_embedding(distance + self.max_position_embeddings - 1)
            positional_embedding = positional_embedding.to(dtype=query_layer.dtype)  # fp16 compatibility

            if self.position_embedding_type == "relative_key":
                relative_position_scores = torch.einsum("bhld,lrd->bhlr", query_layer, positional_embedding)
                attention_scores = attention_scores + relative_position_scores
            elif self.position_embedding_type == "relative_key_query":
                relative_position_scores_query = torch.einsum("bhld,lrd->bhlr", query_layer, positional_embedding)
                relative_position_scores_key = torch.einsum("bhrd,lrd->bhlr", key_layer, positional_embedding)
                attention_scores = attention_scores + relative_position_scores_query + relative_position_scores_key

        
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        
        if attention_mask is not None:
            # Apply the attention mask is (precomputed for all layers in BertModel forward() function)
            attention_scores = attention_scores + attention_mask

        # Normalize the attention scores to probabilities.
        attention_probs = nn.functional.softmax(attention_scores, dim=-1)

        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        attention_probs = self.dropout(attention_probs)

        # Mask heads if we want to
        if head_mask is not None:
            attention_probs = attention_probs * head_mask

        context_layer = torch.matmul(attention_probs, value_layer)

        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        
        context_layer = context_layer.view(new_context_layer_shape)

        outputs = (context_layer, attention_probs) if output_attentions else (context_layer,)

        return outputs

In [None]:
class BertMultiHeadAttention(nn.Module):
    def __init__(self , config):
        super().__init__()
        
        

In [28]:
class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x
    
class TransfomersEncodingLayer(nn.Module):
  def __init__(self , config):
    super().__init__()
    self.layer_norm1 = nn.LayerNorm(config.hidden_size)
    self.layer_norm2 = nn.LayerNorm(config.hidden_size)
    self.attention = BertSelfAttention(config)
    self.feed_forward = FeedForward(config)

  def forward(self , x):
    #apply layer normaliztion and then copy input into Q ,K and V
    hidden_state = self.layer_norm1(x)
    #apply Attention with skip connection
    outs = self.attention(hidden_state)
    x = x + outs[0]
    #apply FF layer with skip connection
    x = x + self.feed_forward(self.layer_norm2(x))
    return x

class TransfomersEncoder(nn.Module):
  def __init__(self , config):
    super().__init__()
    self.embeddings = BertEmbeddings(config)
    self.layers = nn.ModuleList([TransfomersEncodingLayer(config) for _ in range(config.num_hidden_layers)])

  def forward(self , x):
    x = self.embeddings(x)
    for layer in self.layers :
      x = layer(x)
    return x

In [29]:
model_ckp = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckp)
text = "hello i am omar and i love football"
encode = tokenizer(text , return_tensors='pt')
#tokens = tokenizer.convert_ids_to_tokens(encode.input_ids)
print(encode.input_ids)
####------------------------------ Test Embedding Class-------------
config = BertConfig()
embeddings = BertEmbeddings(config)
embed = embeddings(encode.input_ids)
print(embed.size())

tensor([[  101,  7592,  1045,  2572, 13192,  1998,  1045,  2293,  2374,   102]])
torch.Size([1, 10, 768])


In [12]:
attetion = BertSelfAttention(config)
outs = attetion(embed)

In [17]:
print(outs[0].size())

torch.Size([1, 10, 768])


In [30]:
BERT = TransfomersEncoder(config)

In [31]:
output = BERT(encode.input_ids)

In [33]:
output.size()

torch.Size([1, 10, 768])

In [34]:
class TransformerForSequenceClassification(nn.Module):
  def __init__(self ,config):
    super().__init__()
    self.encoder = TransfomersEncoder(config)
    self.dropout = nn.Dropout(config.hidden_dropout_prob)
    self.classifier = nn.Linear(config.hidden_size , config.num_labels)
  def forward(self , X):
    hidden = self.encoder(X)[: , 0 , :]
    x = self.dropout(hidden)
    x = self.classifier(x)
    return x

In [35]:
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier

TransformerForSequenceClassification(
  (encoder): TransfomersEncoder(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (layers): ModuleList(
      (0): TransfomersEncodingLayer(
        (layer_norm1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (layer_norm2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attention): BertSelfAttention(
          (query): Linear(in_features=768, out_features=768, bias=True)
          (key): Linear(in_features=768, out_features=768, bias=True)
          (value): Linear(in_features=768, out_features=768, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (feed_forward): FeedForward(
          (linear_1): Linear(in_features

In [38]:

from torchsummary import  summary

In [82]:


#Create Class for one_head_attention
class AttentionHead(nn.Module):
  def __init__(self , embd_dim , head_dim):
    super().__init__()
    
    self.Query = nn.Linear(embd_dim , head_dim)
    self.Key   = nn.Linear(embd_dim , head_dim)
    self.Value = nn.Linear(embd_dim , head_dim)

  def forward(self , hidden_state):

    Query , key , value = self.Query(hidden_state) , self.Key(hidden_state) , self.Value(hidden_state)
    dim_k = key.size(-1)
    scores = torch.bmm(Query , key.transpose(1,2)) / sqrt(dim_k)
    weights = F.softmax(scores , dim=-1)
    attn_outs = torch.bmm(weights , value)

    return attn_outs
  

#create a MultiHeaded Attention
class MultiHeadAttention(nn.Module):
  def __init__(self , config):
    super().__init__()
    embd_dim = config.hidden_size #768
    num_heads = config.num_attention_heads #12
    head_dim = embd_dim // num_heads #64
    self.heads = nn.ModuleList(
        AttentionHead(embd_dim , head_dim) for _ in range(num_heads)
    )
    self.output_layer = nn.Linear(embd_dim , embd_dim) #input 768 dim --> get a 768 dim represent a Attention Multi head

  def forward(self , hidden_state):
    #concate All 12 head of 64 dim to 768 dim
    x = torch.cat([h(hidden_state) for h in self.heads] , dim=-1)
    #push them into a Linear layer
    x = self.output_layer(x)
    return x
class BertFeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, hidden_state):
        hidden_state = self.linear_1(hidden_state)
        hidden_state = self.gelu(hidden_state)
        hidden_state = self.linear_2(hidden_state)
        hidden_state = self.dropout(hidden_state)
        return hidden_state

class BertEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = BertFeedForward(config)

    def forward(self, x):
        # Apply layer normalization and then copy input into query, key, value
        hidden_state = self.layer_norm_1(x)
        # Apply attention with a skip connection
        x = x + self.attention(hidden_state)
        # Apply feed-forward layer with a skip connection
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x

class BertEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = BertEmbeddings(config)
        self.layers = nn.ModuleList([BertEncoderLayer(config) 
                                     for _ in range(config.num_hidden_layers)])

    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x

In [83]:
model_ckp = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckp)
text = "hello i am omar and i love football"
encode = tokenizer(text , return_tensors='pt')
#tokens = tokenizer.convert_ids_to_tokens(encode.input_ids)
print(encode['input_ids'])
####------------------------------ Test Embedding Class-------------


config = BertConfig()
"""
embeddings = BertEmbeddings(config)
embed = embeddings(encode.input_ids)
print(embed.size())

atten = MultiHeadAttention(config)
outs = atten(embed)

feedouts = BertFeedForward(config)
outsoffeed = feedouts(outs)
print(outsoffeed.size())
"""

bert = BertEncoder(config)

outputs = bert(encode.input_ids)

tensor([[  101,  7592,  1045,  2572, 13192,  1998,  1045,  2293,  2374,   102]])
