# NLP Assignment No. 4
### Title: Binary Text Classification using Transformer
Input: A sentence or paragraph

Output: A label (e.g. Positive/Negative review)

In [None]:
!pip install torch torchvision torchaudio
!pip install transformers datasets

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from transformers import BertTokenizer
from datasets import load_dataset

In [None]:
# Fetch the IMDb reviews from the Hugging Face hub and keep the two labelled splits
dataset = load_dataset("imdb")
train_data, test_data = dataset["train"], dataset["test"]

# Tokenizer that matches the bert-base-uncased vocabulary
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")


def tokenize_function(examples):
    """Tokenize a batch of examples, padding/truncating each text to 256 tokens."""
    texts = examples["text"]
    return tokenizer(texts, max_length=256, truncation=True, padding="max_length")


# Run the tokenizer over both splits in batches
train_data = train_data.map(tokenize_function, batched=True)
test_data = test_data.map(tokenize_function, batched=True)

# Expose only the model inputs and the label, as PyTorch tensors
train_data.set_format(columns=["input_ids", "attention_mask", "label"], type="torch")
test_data.set_format(columns=["input_ids", "attention_mask", "label"], type="torch")

# Batch the splits; only the training split is shuffled
train_dataloader = torch.utils.data.DataLoader(train_data, shuffle=True, batch_size=16)
test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=16)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/7.81k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

unsupervised-00000-of-00001.parquet:   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    Even feature indices hold ``sin(pos * w_i)`` and odd indices hold
    ``cos(pos * w_i)`` with ``w_i = base ** (-2i / embed_dim)``.

    Args:
        embed_dim: Size of the last dimension of the inputs.
        max_len: Number of positions precomputed and cached in the ``pe`` buffer.
        base: Wavelength base of the sinusoids (10000 in the original
            Transformer formulation).
    """

    def __init__(self, embed_dim, max_len=5000, base=10000.0):
        super().__init__()
        self.base = base
        # A buffer (not a parameter): follows .to()/.cuda() but is never trained.
        self.register_buffer('pe', self._build_table(max_len, embed_dim, base))

    @staticmethod
    def _build_table(length, embed_dim, base):
        """Return a (1, length, embed_dim) table of sinusoidal encodings."""
        position = torch.arange(0, length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, embed_dim, 2, dtype=torch.float) * (-math.log(base) / embed_dim)
        )
        angles = position * div_term
        pe = torch.zeros(length, embed_dim)
        pe[:, 0::2] = torch.sin(angles)
        # For an odd embed_dim there is one fewer odd column than even columns.
        pe[:, 1::2] = torch.cos(angles[:, : embed_dim // 2])
        return pe.unsqueeze(0)

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        ``x`` is indexed as (batch, sequence, embed_dim). Sequences longer than
        the cached table get their encodings computed on the fly, so every
        position still carries a distinct signal instead of a zero vector.
        """
        seq_len = x.size(1)
        if seq_len <= self.pe.size(1):
            pe = self.pe[:, :seq_len, :]
        else:
            pe = self._build_table(seq_len, self.pe.size(2), self.base).to(self.pe.device)
        return x + pe

class ScaledDotProductAttention(nn.Module):
    """Attention weights from scaled query/key dot products, applied to the values."""

    def forward(self, query, key, value, mask=None):
        """Return ``softmax(query @ key^T / sqrt(d_k)) @ value``.

        Positions where ``mask == 0`` are filled with -1e9 before the softmax,
        so they receive (numerically) zero weight. ``mask`` must broadcast
        against the score tensor.
        """
        scale = math.sqrt(query.size(-1))
        logits = query @ key.transpose(-2, -1) / scale
        if mask is not None:
            logits = logits.masked_fill(mask == 0, -1e9)
        weights = logits.softmax(dim=-1)
        return weights @ value

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention with a fused query/key/value projection.

    Args:
        embed_dim: Size of the input and output feature dimension.
        num_heads: Number of attention heads; must divide ``embed_dim``.
    """

    def __init__(self, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0, "embed_dim must be divisible by num_heads"
        self.d_k = embed_dim // num_heads
        self.num_heads = num_heads

        # One linear layer produces q, k and v together; they are split in forward().
        self.qkv_proj = nn.Linear(embed_dim, embed_dim * 3)
        self.out_proj = nn.Linear(embed_dim, embed_dim)
        self.attention = ScaledDotProductAttention()

    def forward(self, x, mask=None):
        """Apply self-attention to ``x`` of shape (batch, seq, embed_dim).

        Args:
            x: Input tensor, unpacked as ``B, T, E = x.shape``.
            mask: Optional mask where 0 marks positions to ignore. A 2-D mask
                is treated as a (batch, seq) padding mask over key positions;
                a 3-D mask as (batch, seq, seq). Masks of any other rank are
                passed through unchanged and must broadcast against the
                (batch, heads, seq, seq) score tensor.

        Returns:
            Tensor with the same shape as ``x``.
        """
        B, T, E = x.shape
        qkv = self.qkv_proj(x).view(B, T, 3, self.num_heads, self.d_k)
        # (B, T, H, d_k) -> (B, H, T, d_k) for each of q, k, v
        q, k, v = (t.transpose(1, 2) for t in qkv.unbind(2))

        if mask is not None:
            if mask.dim() == 2:
                # (B, T) -> (B, 1, 1, T): shared by all heads and query positions
                mask = mask[:, None, None, :]
            elif mask.dim() == 3:
                # (B, T, T) -> (B, 1, T, T): shared by all heads
                mask = mask[:, None, :, :]

        attn_output = self.attention(q, k, v, mask)
        # Merge the heads back into a single feature dimension.
        merged = attn_output.transpose(1, 2).contiguous().view(B, T, E)
        return self.out_proj(merged)

class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    Args:
        embed_dim: Size of the input and output feature dimension.
        ff_dim: Size of the hidden layer.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, embed_dim, ff_dim, dropout=0.1):
        super().__init__()
        # The previous implementation also registered an nn.Sequential copy of
        # these layers that forward() never called; it only added dead parameters.
        self.fc1 = nn.Linear(embed_dim, ff_dim)
        self.fc2 = nn.Linear(ff_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return the transformed tensor; the last dimension stays ``embed_dim``."""
        hidden = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(hidden)

class TransformerEncoderLayer(nn.Module):
    """Pre-norm encoder block: self-attention then feed-forward, each with a residual.

    Args:
        embed_dim: Feature dimension of the inputs.
        num_heads: Number of attention heads.
        ff_dim: Hidden size of the feed-forward sub-layer.
    """

    def __init__(self, embed_dim, num_heads, ff_dim):
        super().__init__()
        self.attn = MultiHeadAttention(embed_dim, num_heads)
        self.ff = FeedForward(embed_dim, ff_dim)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x, mask=None):
        """Return the block output for ``x``.

        ``mask`` is accepted so callers can pass one, but it is not forwarded
        to ``self.attn`` here. The previous body continued with a second,
        post-norm pass that referenced undefined names (``self.self_attn``,
        ``src``, ``self.dropout``) and therefore always raised; that dead code
        has been removed.
        """
        x = x + self.attn(self.norm1(x))
        x = x + self.ff(self.norm2(x))
        return x

class TransformerClassifier(nn.Module):
    """Token embedding -> positional encoding -> encoder layers -> pooled linear head.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding / model feature dimension.
        num_heads: Attention heads per encoder layer.
        ff_dim: Hidden size of each layer's feed-forward network.
        num_layers: Number of stacked encoder layers.
        num_classes: Number of output logits.
        max_len: Passed through to ``PositionalEncoding`` as its ``max_len``.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, ff_dim, num_layers, num_classes, max_len=100):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoder = PositionalEncoding(embed_dim, max_len)
        self.layers = nn.ModuleList([
            TransformerEncoderLayer(embed_dim, num_heads, ff_dim)
            for _ in range(num_layers)
        ])
        self.classifier = nn.Linear(embed_dim, num_classes)

    def forward(self, src, attention_mask=None):
        """Return class logits of shape (batch, num_classes).

        Args:
            src: Integer token ids, indexed as (batch, seq).
            attention_mask: Optional (batch, seq) tensor with 1 for real tokens
                and 0 for padding. It is handed to every encoder layer and is
                also used to exclude padding positions from the mean pooling.
        """
        x = self.embed(src)
        x = self.pos_encoder(x)
        for layer in self.layers:
            x = layer(x, attention_mask)

        if attention_mask is None:
            pooled = x.mean(dim=1)
        else:
            # Average over real tokens only; a plain mean would be diluted by padding.
            weights = attention_mask.unsqueeze(-1).to(x.dtype)
            # clamp keeps an all-padding row from dividing by zero
            pooled = (x * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1.0)
        return self.classifier(pooled)

In [None]:
class TransformerEncoderLayer(nn.Module):
    """Encoder block that normalises before each sub-layer and adds a residual after it."""

    def __init__(self, embed_dim, num_heads, ff_dim):
        super().__init__()
        self.attn = MultiHeadAttention(embed_dim, num_heads)
        self.ff = FeedForward(embed_dim, ff_dim)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x, mask=None):
        """Run attention then feed-forward on ``x``; ``mask`` is accepted but not used."""
        attended = self.attn(self.norm1(x))
        after_attention = x + attended
        transformed = self.ff(self.norm2(after_attention))
        return after_attention + transformed

In [None]:
def train_model(model, train_dataloader, test_dataloader, num_epochs=3, lr=1e-5):
    """Train ``model`` with Adam and cross-entropy, then evaluate it once.

    Args:
        model: Module called as ``model(input_ids, attention_mask=...)`` and
            returning per-class logits.
        train_dataloader: Iterable of batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'label'``.
        test_dataloader: Iterable handed to ``evaluate`` after the last epoch.
        num_epochs: Number of passes over ``train_dataloader``.
        lr: Learning rate for the Adam optimizer.

    Prints the mean loss and accuracy after every epoch; returns nothing.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # Batches are moved to wherever the model lives, so a model on GPU also works.
    device = next(model.parameters()).device

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        correct_predictions = 0
        total_samples = 0
        num_batches = 0  # counted here so loaders without __len__ work too

        for batch in train_dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_samples += labels.size(0)

        if num_batches == 0:
            # Nothing to average; avoid dividing by zero on an empty loader.
            print(f"Epoch {epoch+1}/{num_epochs} | no training batches")
            continue

        avg_loss = total_loss / num_batches
        accuracy = correct_predictions / max(total_samples, 1)
        print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f} | Accuracy: {accuracy*100:.2f}%")

    # Evaluate on test set
    evaluate(model, test_dataloader)

def evaluate(model, test_dataloader):
    """Print the classification accuracy of ``model`` over ``test_dataloader``.

    Args:
        model: Module called as ``model(input_ids, attention_mask=...)`` and
            returning per-class logits.
        test_dataloader: Iterable of batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'label'``.

    Runs in eval mode without gradient tracking; returns nothing.
    """
    model.eval()
    # Device of the first parameter, or None for a parameter-free model.
    device = next((p.device for p in model.parameters()), None)

    def _on_device(tensor):
        """Move ``tensor`` to the model's device when one is known."""
        return tensor if device is None else tensor.to(device)

    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        for batch in test_dataloader:
            input_ids = _on_device(batch['input_ids'])
            attention_mask = _on_device(batch['attention_mask'])
            labels = _on_device(batch['label'])

            outputs = model(input_ids, attention_mask=attention_mask)
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_samples += labels.size(0)

    if total_samples == 0:
        # An empty loader has no accuracy; report that instead of dividing by zero.
        print("Test Accuracy: n/a (no test samples)")
        return

    accuracy = correct_predictions / total_samples
    print(f"Test Accuracy: {accuracy*100:.2f}%")

In [None]:
# Model hyperparameters
vocab_size = len(tokenizer)  # Size of vocabulary (from the tokenizer)
embed_dim = 2
num_heads = 2
ff_dim = 2
num_layers = 1
num_classes = 2  # Binary classification (positive/negative)
# The tokenizer pads/truncates every review to 256 tokens, so the positional
# table must cover 256 positions; the classifier's default of 100 is too short.
max_len = 256
print(" Size of vocabulary: ",vocab_size,"\n")

# Fix the seed so weight initialisation and shuffling repeat across runs
torch.manual_seed(42)

# Initialize model
model = TransformerClassifier(
    vocab_size, embed_dim, num_heads, ff_dim, num_layers, num_classes, max_len=max_len
)

# Train the model
train_model(model, train_dataloader, test_dataloader, num_epochs=3)

 Size of vocabulary:  30522 

Epoch 1/3 | Loss: 0.9121 | Accuracy: 50.00%
Epoch 2/3 | Loss: 0.8589 | Accuracy: 50.00%
Epoch 3/3 | Loss: 0.8164 | Accuracy: 50.00%
Test Accuracy: 50.00%


**Completed**