# ðŸ“˜ NLP Assignment: Custom BERT Model for Natural Language Inference (NLI)

âœ… **Task 1:** Training a custom BERT model from scratch using the BookCorpus dataset.

In [1]:
import math
import re
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from datasets import load_dataset
from transformers import BertTokenizer
import os

In [2]:
# Set GPU device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cuda


In [3]:
# Load dataset (BookCorpus subset)
dataset = load_dataset('bookcorpus', split='train[:1%]')
sentences = dataset['text'][:50000]  # Using only 100K sentences

In [4]:
# Load tokenizer
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

In [5]:
# Tokenization & Masking
def tokenize_and_mask(sentences, tokenizer, mask_prob=0.20):
    inputs = tokenizer(sentences, padding=True, truncation=True, max_length=128, return_tensors="pt")
    labels = inputs.input_ids.clone()
    rand = torch.rand(labels.shape)
    mask_arr = (rand < mask_prob) & (labels != tokenizer.pad_token_id)
    
    inputs.input_ids[mask_arr] = tokenizer.mask_token_id
    labels[~mask_arr] = -100  # Ignore loss for unmasked tokens
    
    return inputs, labels

In [6]:
# Define Custom BERT Model
class CustomBERT(nn.Module):
    def __init__(self, vocab_size, hidden_dim=512, num_heads=8, num_layers=6):
        super(CustomBERT, self).__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.encoder_layers = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads), num_layers
        )
        self.output_layer = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_ids):
        x = self.embedding(input_ids)
        x = self.encoder_layers(x)
        logits = self.output_layer(x)
        return logits


In [7]:
# Prepare tokenizer and model
tokenized_data, labels = tokenize_and_mask(sentences, tokenizer)
tokenized_data = {key: val.to(device) for key, val in tokenized_data.items()}
labels = labels.to(device)

vocab_size = tokenizer.vocab_size
model = CustomBERT(vocab_size).to(device)

In [8]:
# Training setup
optimizer = optim.AdamW(model.parameters(), lr=1e-5)
criterion = nn.CrossEntropyLoss()

In [9]:
accumulation_steps = 4  # Adjust based on memory

def train_model(model, data, labels, optimizer, criterion, epochs=10):
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        for i in range(0, len(data["input_ids"]), accumulation_steps):
            batch_input = data["input_ids"][i : i + accumulation_steps]
            batch_labels = labels[i : i + accumulation_steps]

            outputs = model(batch_input)
            loss = criterion(outputs.view(-1, vocab_size), batch_labels.view(-1))
            loss = loss / accumulation_steps  # Normalize loss

            loss.backward()

            if (i + 1) % accumulation_steps == 0:
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                optimizer.zero_grad()

        print(f"Epoch {epoch+1}, Loss: {loss.item()}")


In [10]:
# Train the model
train_model(model, tokenized_data, labels, optimizer, criterion)

Epoch 1, Loss: 2.6512725353240967
Epoch 2, Loss: 2.635631561279297
Epoch 3, Loss: 2.6408047676086426
Epoch 4, Loss: 2.6295759677886963
Epoch 5, Loss: 2.658684253692627
Epoch 6, Loss: 2.648127794265747
Epoch 7, Loss: 2.6480934619903564
Epoch 8, Loss: 2.645524024963379
Epoch 9, Loss: 2.6441519260406494
Epoch 10, Loss: 2.643312692642212


In [11]:
# Save trained model weights
torch.save(model.state_dict(), "trained_bert_model.pth")
print("Model training complete and saved for Task 2.")

Model training complete and saved for Task 2.


âœ… **Task 2:** Fine-tuning the model for Sentence-BERT (SBERT) on the SNLI dataset.

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertTokenizer
from datasets import load_dataset
from torch.utils.data import DataLoader, TensorDataset

# Load trained model configuration
checkpoint = torch.load("trained_bert_model.pth", map_location="cpu")

# Get hidden size dynamically (512 or 768)
hidden_dim = checkpoint["embedding.weight"].shape[1]

# Ensure num_heads is a valid divisor of hidden_dim
num_heads = max(1, hidden_dim // 64)  # Ensure divisibility

In [2]:
class CustomBERT(nn.Module):
    def __init__(self, vocab_size, hidden_dim=hidden_dim, num_heads=num_heads, num_layers=12):
        super(CustomBERT, self).__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.encoder_layers = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads), num_layers
        )
        # Keep the original output layer dimensions from checkpoint
        self.output_layer = nn.Linear(hidden_dim, vocab_size)
        # Add a new projection layer for the classifier
        self.projection = nn.Linear(vocab_size, hidden_dim)

    def forward(self, input_ids):
        x = self.embedding(input_ids)
        x = self.encoder_layers(x)
        x = x.mean(dim=1)  # Pooling to get a single vector per input
        x = self.output_layer(x)  # First get the vocab_size dimensional output
        x = self.projection(x)    # Project back to hidden_dim
        return x

In [3]:
# Load trained Custom BERT model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vocab_size = 30522  # Standard BERT vocab size
bert_model = CustomBERT(vocab_size, hidden_dim=hidden_dim, num_heads=num_heads).to(device)

# Load model weights (ignore extra/missing keys)
bert_model.load_state_dict(checkpoint, strict=False)

# Load dataset
dataset = load_dataset("snli")  # Use SNLI dataset
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

In [4]:
# Tokenize function
def tokenize_pairs(example):
    tokens1 = tokenizer(example["premise"], padding="max_length", truncation=True, max_length=128, return_tensors="pt")
    tokens2 = tokenizer(example["hypothesis"], padding="max_length", truncation=True, max_length=128, return_tensors="pt")

    # Convert label to integer (Handle SNLI 'test' set, which has no labels)
    label = example["label"]
    if isinstance(label, str) and not label.isdigit():
        label = -1  # Assign -1 for test set (since it has no labels)
    else:
        label = int(label)  # Convert to integer
    
    return {
        "input_ids1": tokens1["input_ids"].squeeze(0),
        "attention_mask1": tokens1["attention_mask"].squeeze(0),
        "input_ids2": tokens2["input_ids"].squeeze(0),
        "attention_mask2": tokens2["attention_mask"].squeeze(0),
        "label": label
    }

In [5]:
# Apply mapping function and ensure dataset is correctly formatted
dataset = dataset.map(tokenize_pairs, remove_columns=[col for col in ["premise", "hypothesis"] if col in dataset["train"].column_names])  # Remove only existing columns

# Convert dataset to list correctly
dataset_list = [dict(d) for d in dataset["train"]]

# Debugging output: Check first sample
print("First data sample:", dataset_list[0])
print(f"Total samples in dataset: {len(dataset_list)}")

First data sample: {'label': 1, 'input_ids1': [101, 1037, 2711, 2006, 1037, 3586, 14523, 2058, 1037, 3714, 2091, 13297, 1012, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask1': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'input_ids2': [101, 1037, 2711, 2003, 2731, 2010, 3586, 2005, 1037, 2971, 1012, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

In [6]:
# Ensure we have valid tensors before stacking
input_ids1_list = [torch.tensor(d["input_ids1"], dtype=torch.long) for d in dataset_list if "input_ids1" in d]
attention_mask1_list = [torch.tensor(d["attention_mask1"], dtype=torch.long) for d in dataset_list if "attention_mask1" in d]
input_ids2_list = [torch.tensor(d["input_ids2"], dtype=torch.long) for d in dataset_list if "input_ids2" in d]
attention_mask2_list = [torch.tensor(d["attention_mask2"], dtype=torch.long) for d in dataset_list if "attention_mask2" in d]
labels_list = [torch.tensor(d["label"], dtype=torch.long) for d in dataset_list if isinstance(d["label"], int)]

# Ensure lists are not empty
if not input_ids1_list or not input_ids2_list:
    raise RuntimeError("Tokenized data is empty. Verify tokenize_pairs() function.")

# Stack tensors
input_ids1 = torch.stack(input_ids1_list)
attention_mask1 = torch.stack(attention_mask1_list)
input_ids2 = torch.stack(input_ids2_list)
attention_mask2 = torch.stack(attention_mask2_list)
labels = torch.stack(labels_list)

# Ensure labels are within the valid range
valid_labels = (labels >= 0) & (labels < 3)
input_ids1 = input_ids1[valid_labels]
attention_mask1 = attention_mask1[valid_labels]
input_ids2 = input_ids2[valid_labels]
attention_mask2 = attention_mask2[valid_labels]
labels = labels[valid_labels]

In [7]:
dataset = TensorDataset(input_ids1, attention_mask1, input_ids2, attention_mask2, labels)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

# Define Classifier Head
classifier_head = nn.Linear(hidden_dim * 3, 3).to(device)

# Optimizers
optimizer = optim.Adam(bert_model.parameters(), lr=1e-5)
optimizer_classifier = optim.Adam(classifier_head.parameters(), lr=1e-5)

criterion = nn.CrossEntropyLoss()

# Training loop
for epoch in range(10):
    bert_model.train()
    classifier_head.train()
    
    for batch in dataloader:
        input_ids1, attention_mask1, input_ids2, attention_mask2, labels = [b.to(device) for b in batch]
        
        optimizer.zero_grad()
        optimizer_classifier.zero_grad()
        
        # Get embeddings from BERT
        output1 = bert_model(input_ids1)  # Shape: (batch_size, hidden_dim)
        output2 = bert_model(input_ids2)  # Shape: (batch_size, hidden_dim)
        
        # Combine embeddings
        combined = torch.cat([
            output1,
            output2,
            torch.abs(output1 - output2)
        ], dim=1)  # Shape: (batch_size, hidden_dim * 3)
        
        outputs = classifier_head(combined)
        loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()
        optimizer_classifier.step()
    
    print(f"Epoch {epoch+1} Loss: {loss.item()}")


Epoch 1 Loss: 0.8682435154914856
Epoch 2 Loss: 0.7968205213546753
Epoch 3 Loss: 0.6686006188392639
Epoch 4 Loss: 1.0666700601577759
Epoch 5 Loss: 0.92558354139328
Epoch 6 Loss: 0.4347539246082306
Epoch 7 Loss: 0.9607365727424622
Epoch 8 Loss: 1.2120252847671509
Epoch 9 Loss: 0.6323928833007812
Epoch 10 Loss: 0.17150042951107025


In [8]:
# Save model and classifier for Task 3
torch.save(bert_model.state_dict(), "sbert_model.pth")
torch.save(classifier_head.state_dict(), "classifier_head.pth")


âœ… **Task 3:** Evaluating the trained model and computing performance metrics.

In [3]:
import torch
import torch.nn as nn
from transformers import BertTokenizer
from datasets import load_dataset
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# Load trained SBERT model and classifier head
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

class CustomBERT(nn.Module):
    def __init__(self, vocab_size, hidden_dim, num_heads, num_layers=12):
        super(CustomBERT, self).__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.encoder_layers = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads), num_layers
        )
        self.output_layer = nn.Linear(hidden_dim, vocab_size)
        self.projection = nn.Linear(vocab_size, hidden_dim)

    def forward(self, input_ids):
        x = self.embedding(input_ids)
        x = self.encoder_layers(x)
        x = x.mean(dim=1)
        x = self.output_layer(x)
        x = self.projection(x)
        return x

# Load trained model and classifier
checkpoint = torch.load("sbert_model.pth", map_location=device)
hidden_dim = checkpoint["embedding.weight"].shape[1]
num_heads = max(1, hidden_dim // 64)

bert_model = CustomBERT(30522, hidden_dim, num_heads).to(device)
bert_model.load_state_dict(checkpoint)
bert_model.eval()

classifier_head = nn.Linear(hidden_dim * 3, 3).to(device)
classifier_head.load_state_dict(torch.load("classifier_head.pth", map_location=device))
classifier_head.eval()

# Load dataset
dataset = load_dataset("snli", split="test")

def tokenize_pairs(example):
    tokens1 = tokenizer(example["premise"], padding="max_length", truncation=True, max_length=128, return_tensors="pt")
    tokens2 = tokenizer(example["hypothesis"], padding="max_length", truncation=True, max_length=128, return_tensors="pt")
    return {
        "input_ids1": tokens1["input_ids"].squeeze(0),
        "attention_mask1": tokens1["attention_mask"].squeeze(0),
        "input_ids2": tokens2["input_ids"].squeeze(0),
        "attention_mask2": tokens2["attention_mask"].squeeze(0),
        "label": example["label"] if isinstance(example["label"], int) else -1
    }

# Apply mapping function and filter valid labels
dataset = dataset.map(tokenize_pairs, remove_columns=["premise", "hypothesis"])
dataset_list = [dict(d) for d in dataset if d["label"] >= 0]

input_ids1 = torch.stack([torch.tensor(d["input_ids1"]) for d in dataset_list])
attention_mask1 = torch.stack([torch.tensor(d["attention_mask1"]) for d in dataset_list])
input_ids2 = torch.stack([torch.tensor(d["input_ids2"]) for d in dataset_list])
attention_mask2 = torch.stack([torch.tensor(d["attention_mask2"]) for d in dataset_list])
labels = torch.tensor([d["label"] for d in dataset_list])

dataset = TensorDataset(input_ids1, attention_mask1, input_ids2, attention_mask2, labels)
dataloader = DataLoader(dataset, batch_size=16, shuffle=False)

# Evaluation
all_preds, all_labels = [], []
with torch.no_grad():
    for batch in dataloader:
        input_ids1, attention_mask1, input_ids2, attention_mask2, labels = [b.to(device) for b in batch]
        output1 = bert_model(input_ids1)
        output2 = bert_model(input_ids2)
        combined = torch.cat([output1, output2, torch.abs(output1 - output2)], dim=1)
        outputs = classifier_head(combined)
        preds = torch.argmax(outputs, dim=1).cpu().numpy()
        all_preds.extend(preds)
        all_labels.extend(labels.cpu().numpy())

# Compute metrics
accuracy = accuracy_score(all_labels, all_preds)
precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average="weighted")

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")


Map:   0%|          | 0/10000 [00:00<?, ? examples/s]

Accuracy: 0.7544
Precision: 0.7561, Recall: 0.7544, F1-score: 0.7531
