# Bert Base:
BERTBASE 
(L=12, H=768, A=12, Total Parameters=110M)

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Attention

In [15]:
class MultiHeadedAttentionBERT(nn.Module):
    def __init__(self, d_in, d_out, context_length, num_heads, dropout=None, qkv_bias=False):
        super().__init__()
        self.d_out = d_out
        
        assert d_out % num_heads == 0, "embedding dimension must be divisible by number of heads"

        self.num_heads = num_heads
        self.head_dim = d_out // num_heads

        self.context_length = context_length

        self.W_q = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_k = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_v = nn.Linear(d_in, d_out, bias=qkv_bias)

        self.W_o = nn.Linear(d_out, d_out)

        self.drop_out = nn.Dropout(dropout) if dropout is not None else None

    def forward(self, x, mask=None):
        '''
        x: (batch_size, seq_len, d_in)
        mask: (batch_size, seq_len)
        '''
        # x: (batch_size, seq_len, d_in)
        batch_size, num_tokens, d_in = x.shape 

        queries = self.W_q(x) # (batch_size, seq_len, d_out)
        keys = self.W_k(x) # (batch_size, seq_len, d_out)
        values = self.W_v(x) # (batch_size, seq_len, d_out)

        queries = queries.view(batch_size, num_tokens, self.num_heads, self.head_dim).transpose(1, 2) # batch_size, num_heads, num_tokens, head_dim
        keys = keys.view(batch_size, num_tokens, self.num_heads, self.head_dim).transpose(1, 2)
        values = values.view(batch_size, num_tokens, self.num_heads, self.head_dim).transpose(1, 2)


        attn_scores = queries @ keys.transpose(2, 3)
        attn_scores = attn_scores / (self.head_dim ** 0.5)
        print(attn_scores.shape)
        # masking tokens that are of padding sentence
        if mask is not None:
            mask = mask.unsqueeze(1) # adding num_heads dimension for broadcasting
            attn_scores = attn_scores.masked_fill(mask == 0, -torch.inf)

        

        attn_scores = F.softmax(attn_scores, dim=-1)

        if self.drop_out is not None:
            attn_scores = self.drop_out(attn_scores)
        
        context = attn_scores @ values

        context = context.transpose(1, 2).contiguous().view(batch_size, num_tokens, self.d_out)

        context = self.W_o(context)

        return context


In [23]:
test = MultiHeadedAttentionBERT(2, 4, 8, 2, 0.1)
x = torch.randn(2, 8, 2)
mask = torch.tensor([[1, 1, 1, 1, 0, 0, 0, 0], [1, 1, 1, 1, 1, 0, 0, 0]]) # masking the last 4 tokens in the first sentence, and the last 3 tokens in the second sentence
print(test(x, mask))

torch.Size([2, 2, 8, 8])
tensor([[[-0.1287,  0.4158,  0.4212, -0.4352],
         [-0.1745,  0.3595,  0.4331, -0.4234],
         [-0.1653,  0.4299,  0.3920, -0.4328],
         [-0.2608,  0.5488,  0.2637, -0.4357],
         [-0.2646,  0.6225,  0.2103, -0.4449],
         [-0.1569,  0.3242,  0.4662, -0.4210],
         [-0.1582,  0.4255,  0.3965, -0.4337],
         [-0.3025,  0.6829,  0.1446, -0.4497]],

        [[-0.1433,  0.3972,  0.4615, -0.4219],
         [-0.1015,  0.3080,  0.4609, -0.4369],
         [-0.0761,  0.1232,  0.6144, -0.4131],
         [-0.1528,  0.3314,  0.3892, -0.4396],
         [-0.1022,  0.2103,  0.5551, -0.4168],
         [-0.1574,  0.3818,  0.3766, -0.4410],
         [-0.0901,  0.1804,  0.5464, -0.4220],
         [-0.1271,  0.3990,  0.3895, -0.4443]]], grad_fn=<ViewBackward0>)


# Feed Forward

In [33]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        x = self.fc1(x)
        x = F.gelu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Encoder Layer

In [34]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, context_length, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.multi_headed_attention = MultiHeadedAttentionBERT(d_model, d_model, context_length, num_heads, dropout=dropout)
        self.feed_forward = FeedForward(d_model, d_ff, dropout=dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, mask):
        x = self.norm1(x + self.dropout(self.multi_headed_attention(x, mask)))
        x = self.norm2(x + self.dropout(self.feed_forward(x)))
        return x


# BERT

In [35]:
class BERT(nn.Module):
    def __init__(self, vocab_size, d_model=768, num_heads=12, num_layers=12, d_ff=3072, max_seq_len=512, dropout=0.1):
        super().__init__()
        self.word_embeddings = nn.Embedding(vocab_size, d_model)
        self.position_embeddings = nn.Embedding(max_seq_len, d_model)
        self.token_type_embeddings = nn.Embedding(2, d_model) # # For [SEP] and [CLS] tokens

        self.layers = nn.ModuleList([EncoderLayer(d_model, max_seq_len, num_heads, d_ff, dropout) for _ in range(num_layers)])

        # Final normalization layer
        self.layernorm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)


    def forward(self, input_ids, token_type_ids=None, attention_mask=None):
        '''
        input_ids: (batch_size, seq_len)
        token_type_ids: (batch_size, seq_len)
        attention_mask: (batch_size, seq_len)
        '''
        batch_size, seq_len = input_ids.shape
        position_ids = torch.arange(0, seq_len, dtype=torch.long, device=input_ids.device).unsqueeze(0).expand_as(input_ids)

        # Word embeddings
        word_embeddings = self.word_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        token_type_embeddings = self.token_type_embeddings(token_type_ids)

        embeddings = word_embeddings + position_embeddings + token_type_embeddings
        embeddings = self.layernorm(embeddings)
        embeddings = self.dropout(embeddings)


        # Pass through the layers
        for layer in self.layers:
            embeddings = layer(embeddings, attention_mask)
        
        return embeddings
    
# Model parameters
vocab_size = 30522  # Vocabulary size from BERT
model = BERT(vocab_size)

# # Dummy input (batch_size=2, seq_len=512)
# input_ids = torch.randint(0, vocab_size, (2, 512))
# token_type_ids = torch.zeros(2, 512, dtype=torch.long)  # All 0s for simplicity
# attention_mask = torch.ones(2, 512)  # No padding, all attention is valid

# # Forward pass
# output = model(input_ids, token_type_ids, attention_mask)
# print(output.shape)  # Should be [batch_size, seq_len, d_model]

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Ví dụ với mô hình BERT ở trên
model = BERT(vocab_size=30522)  # Instantiate the BERT model
num_params = count_parameters(model)
print(f"Total trainable parameters: {num_params}")


Total trainable parameters: 108864000


# Combined Codes

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadedSelfAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.1):
        super(MultiHeadedSelfAttention, self).__init__()
        assert d_model % num_heads == 0, "Hidden size must be divisible by number of heads."
        
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        
        # Linear layers for query, key, value
        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        
        self.dropout = nn.Dropout(dropout)
        self.out_proj = nn.Linear(d_model, d_model)
    
    def forward(self, x, mask=None):
        batch_size, seq_len, d_model = x.shape
        
        # Linear projections
        Q = self.query(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        K = self.key(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        V = self.value(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        
        # Attention score computation
        attn_scores = torch.matmul(Q, K.transpose(-1, -2)) / (self.head_dim ** 0.5)
        
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -torch.inf)
        
        attn_probs = F.softmax(attn_scores, dim=-1)
        attn_probs = self.dropout(attn_probs)
        
        # Attention output
        attn_output = torch.matmul(attn_probs, V)
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)
        
        # Final linear projection
        output = self.out_proj(attn_output)
        return output

class FeedForwardNetwork(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(FeedForwardNetwork, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        x = self.fc1(x)
        x = F.gelu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadedSelfAttention(d_model, num_heads, dropout)
        self.ffn = FeedForwardNetwork(d_model, d_ff, dropout)
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, mask=None):
        # Self-attention layer
        attn_output = self.self_attn(x, mask)
        x = self.layernorm1(x + attn_output)  # Add & Norm
        
        # Feed-Forward Network
        ffn_output = self.ffn(x)
        x = self.layernorm2(x + ffn_output)  # Add & Norm
        return x

class BERT(nn.Module):
    def __init__(self, vocab_size, d_model=768, num_heads=12, num_layers=12, d_ff=3072, max_seq_len=512, dropout=0.1):
        super(BERT, self).__init__()
        # Embedding layers
        self.word_embeddings = nn.Embedding(vocab_size, d_model)
        self.position_embeddings = nn.Embedding(max_seq_len, d_model)
        self.token_type_embeddings = nn.Embedding(2, d_model)  # For [SEP] and [CLS] tokens
        
        # Encoder layers
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        
        # Final normalization layer
        self.layernorm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, input_ids, token_type_ids=None, attention_mask=None):
        batch_size, seq_len = input_ids.shape
        position_ids = torch.arange(0, seq_len, dtype=torch.long, device=input_ids.device).unsqueeze(0).expand_as(input_ids)
        
        # Word, position, and token type embeddings
        word_embeddings = self.word_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        token_type_embeddings = self.token_type_embeddings(token_type_ids) if token_type_ids is not None else 0
        
        embeddings = word_embeddings + position_embeddings + token_type_embeddings
        embeddings = self.layernorm(embeddings)
        embeddings = self.dropout(embeddings)
        
        # Pass through the layers
        for layer in self.layers:
            embeddings = layer(embeddings, attention_mask)
        
        return embeddings

# Model parameters
vocab_size = 30522  # Vocabulary size from BERT
model = BERT(vocab_size)

# # Dummy input (batch_size=2, seq_len=512)
# input_ids = torch.randint(0, vocab_size, (2, 512))
# token_type_ids = torch.zeros(2, 512, dtype=torch.long)  # All 0s for simplicity
# attention_mask = torch.ones(2, 512)  # No padding, all attention is valid

# # Forward pass
# output = model(input_ids, token_type_ids, attention_mask)
# print(output.shape)  # Should be [batch_size, seq_len, d_model]

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


model = BERT(vocab_size=30522)  # Instantiate the BERT model
num_params = count_parameters(model)
print(f"Total trainable parameters: {num_params}")

Total trainable parameters: 108891648


In [13]:
test = MultiHeadedSelfAttention(4, 2, 0.1)
x = torch.randn(2, 8, 4)
mask = torch.tensor([[1, 1, 1, 1, 0, 0, 0, 0], [1, 1, 1, 1, 1, 0, 0, 0]]) # masking the last 4 tokens in the first sentence, and the last 3 tokens in the second sentence
print(mask.shape)
print(test(x, mask))

torch.Size([2, 8])


RuntimeError: The size of tensor a (2) must match the size of tensor b (8) at non-singleton dimension 2