In [None]:
!pip install torchmetrics

Importing Libraries

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import seaborn as sns

import re

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchmetrics.classification import MulticlassAccuracy

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_recall_fscore_support, roc_curve, auc
from sklearn.preprocessing import label_binarize

import time

Loading Dataset

In [None]:
# The CSV files are read without a header row, so column names are supplied here.
COLUMN_NAMES=['label', 'title', 'description']
train_df, test_df=(
  pd.read_csv(csv_path, names=COLUMN_NAMES)
  for csv_path in ('train.csv', 'test.csv')
)

In [None]:
# Merge headline and body into a single space-separated `text` column.
for frame in (train_df, test_df):
  frame['text']=frame['title']+" "+frame['description']

In [None]:
# Shift class ids so the smallest training label becomes 0.
# The offset is derived from the data instead of a hard-coded in-place `-= 1`,
# so re-running this cell is a no-op rather than shifting the labels again.
label_offset=train_df['label'].min()
train_df['label']=train_df['label']-label_offset
test_df['label']=test_df['label']-label_offset

Dataset Statistics

In [None]:
# Number of rows available for training (before the train/validation split).
n_train_samples=len(train_df)
print(f'The total no: of training samples are: {n_train_samples}')

In [None]:
# Number of rows in the held-out test file.
n_test_samples=len(test_df)
print(f'The total no: of testing samples are: {n_test_samples}')

In [None]:
# Samples per class, ordered by class id.
class_counts=train_df['label'].value_counts().sort_index()
print('Class Distribution:')
print(class_counts)

In [None]:
def _count_words(text):
  """Return the number of whitespace-separated words in `text`."""
  return len(text.split())

train_df['word_count']=train_df['text'].map(_count_words)
word_count_summary=train_df['word_count'].describe()
print('Statistics Pertaining to Word Count:')
print(word_count_summary)

**Dataset Visualization**

In [None]:
# Bar chart of how many training samples each class has.
fig, ax=plt.subplots(figsize=(12, 6))
sns.countplot(data=train_df, x='label', ax=ax)
ax.set_title('Training Data Class Distribution')
ax.set_xlabel('Class')
ax.set_ylabel('Count')
plt.show()

In [None]:
# Histogram (with KDE overlay) of the per-sample word counts.
fig, ax=plt.subplots(figsize=(12, 6))
sns.histplot(train_df['word_count'], bins=50, kde=True, ax=ax)
ax.set_title('Word Count Distribution')
ax.set_xlabel('No: of Words')
ax.set_ylabel('Frequency')
plt.show()

In [None]:
# Fetch the NLTK resources used by word_tokenize and the stop-word list.
for resource in ('punkt', 'punkt_tab', 'stopwords'):
  nltk.download(resource)

In [None]:
# Build the stop-word lookup once. The original evaluated
# stopwords.words('english') inside the comprehension, i.e. once per token,
# and tested membership against a list; a set gives the same result far faster.
stop_words=set(stopwords.words('english'))

# Lower-case the whole training corpus and strip everything except
# letters, digits and whitespace before tokenizing.
all_words=' '.join(train_df['text'].tolist()).lower()
all_words=re.sub(r'[^a-zA-Z0-9\s]', '', all_words)
tokens=[word for word in word_tokenize(all_words) if word not in stop_words]

# Keep the 20 most common remaining tokens as parallel (words, frequency) tuples.
frequent_words=Counter(tokens).most_common(20)
words, frequency=zip(*frequent_words)

In [None]:
# Bar chart of the 20 most common non-stop-word tokens.
fig, ax=plt.subplots(figsize=(18, 14))
sns.barplot(x=list(words), y=list(frequency), ax=ax)
ax.set_title('Top 20 Most Frequent Words')
ax.set_ylabel('Frequency')
ax.set_xlabel('Word')
plt.show()

**Data Pre-Processing**

In [None]:
# Built once when this cell runs; the original rebuilt the stop-word list for
# every single token, which dominates the runtime when applied to each row.
ENGLISH_STOP_WORDS=frozenset(stopwords.words('english'))

def preprocessing(text):
  """Clean one document and split it into tokens.

  Steps: drop every character that is not a letter, digit or whitespace,
  lower-case the text, tokenize with NLTK's word_tokenize, and remove
  English stop words.

  Args:
    text: raw document string.

  Returns:
    List of token strings in their original order.
  """
  text=re.sub(r'[^A-Za-z0-9\s]', '', text)
  text=text.lower()
  tokens=word_tokenize(text)
  return [word for word in tokens if word not in ENGLISH_STOP_WORDS]

In [None]:
# Tokenize every document in both splits.
for frame in (train_df, test_df):
  frame['tokens']=frame['text'].apply(preprocessing)

In [None]:
# Count token occurrences over the training documents only.
counter=Counter(token for doc_tokens in train_df['tokens'] for token in doc_tokens)

# Ids start at 1 in descending frequency order; id 0 is left free for padding,
# hence the +1 on the vocabulary size.
vocab={word: rank for rank, (word, _) in enumerate(counter.most_common(), start=1)}
vocab_size=len(vocab)+1

In [None]:
def convert_to_numerical(tokens, vocab):
  """Map tokens to their vocabulary ids, silently dropping unknown tokens.

  Args:
    tokens: iterable of token strings.
    vocab: mapping from token to id.

  Returns:
    List of ids, in token order, for the tokens present in `vocab`.
  """
  ids=[]
  for word in tokens:
    if word in vocab:
      ids.append(vocab[word])
  return ids

In [None]:
# Replace each token list by the list of its vocabulary ids.
for frame in (train_df, test_df):
  frame['token_ids']=frame['tokens'].apply(convert_to_numerical, vocab=vocab)

Creating a PyTorch Dataset

In [None]:
class AGNewsDataset(Dataset):
  """Dataset of fixed-length token-id sequences with their class labels.

  Every sequence is truncated to `max_length` ids and right-padded with 0
  up to `max_length`.
  """

  def __init__(self, texts, labels, max_length=256):
    padded=[]
    for sequence in texts:
      pad=[0]*(max_length-len(sequence))  # empty when the sequence is too long
      padded.append(sequence[:max_length]+pad)
    self.texts=padded
    self.labels=labels

  def __len__(self):
    return len(self.texts)

  def __getitem__(self, idx):
    token_ids=torch.tensor(self.texts[idx])
    label=torch.tensor(self.labels[idx])
    return token_ids, label

In [None]:
# Hold out part of the training data for validation.
# random_state makes the split (and every validation number derived from it)
# reproducible across kernel restarts; stratify keeps the class proportions
# identical in both parts.
all_token_ids=train_df['token_ids'].tolist()
all_class_ids=train_df['label'].tolist()
train_texts, val_texts, train_labels, val_labels=train_test_split(
  all_token_ids, all_class_ids, random_state=42, stratify=all_class_ids)

train_dataset=AGNewsDataset(train_texts, train_labels)
val_dataset=AGNewsDataset(val_texts, val_labels)
test_dataset=AGNewsDataset(test_df['token_ids'].tolist(), test_df['label'].tolist())

# Only the training loader shuffles; validation/test order does not matter.
train_loader=DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader=DataLoader(val_dataset, batch_size=32)
test_loader=DataLoader(test_dataset, batch_size=32)

**Defining Transformer Model**

In [None]:
class Encoding(nn.Module):
  """Fixed sine/cosine positional encoding added to a batch of embeddings.

  Even feature columns hold sin(position * frequency) and odd columns hold
  cos(position * frequency), with frequencies 10000^(-2i/d_model).

  Args:
    d_model: embedding width (even or odd).
    max_len: longest sequence length that can be encoded.
  """

  def __init__(self, d_model, max_len=256):
    super().__init__()
    pe=torch.zeros(max_len, d_model)
    position=torch.arange(0, max_len).unsqueeze(1)
    div_term=torch.exp(torch.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
    pe[:, 0::2]=torch.sin(position * div_term)
    # For odd d_model there is one cosine column fewer than sine columns;
    # slicing div_term keeps the shapes aligned (no-op for even d_model).
    pe[:, 1::2]=torch.cos(position * div_term[:d_model//2])
    # Registered as a non-persistent buffer: it follows model.to(device)
    # without adding a key to state_dict, so saved checkpoints are unaffected.
    self.register_buffer('pe', pe.unsqueeze(0), persistent=False)

  def forward(self, x):
    """Add the encoding for the first x.size(1) positions to x (batch, seq, d_model)."""
    return x+self.pe[:, :x.size(1)]

class Transformer(nn.Module):
  """Encoder-only transformer classifier over padded token-id sequences.

  Pipeline: embedding (id 0 = padding) -> positional encoding -> stack of
  TransformerEncoderLayer -> mean over the real (non-padding) positions ->
  linear classifier.

  Args:
    vocab_size: number of embedding rows (including the padding id 0).
    embed_dim: embedding / model width.
    num_heads: attention heads per encoder layer.
    num_layers: number of encoder layers.
    num_classes: size of the output logits.
    max_len: longest supported sequence length.
    dropout: dropout probability inside the encoder layers. Defaults to 0.0
      here because nn.TransformerEncoderLayer otherwise applies 0.1 on its
      own, which would make this baseline identical to a dropout=0.1 variant.
  """

  def __init__(self, vocab_size, embed_dim=128, num_heads=4, num_layers=2, num_classes=4, max_len=256, dropout=0.0):
    super().__init__()
    self.embedding=nn.Embedding(vocab_size, embed_dim, padding_idx=0)
    self.pos_encoder=Encoding(embed_dim, max_len)
    encoder_layers=nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=512, dropout=dropout)
    self.transformer_encoder=nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
    self.classifier=nn.Linear(embed_dim, num_classes)

  def forward(self, src):
    """Return class logits of shape (batch, num_classes) for src of shape (batch, seq)."""
    # True where the token is padding. Without this mask every padded slot
    # takes part in attention and in the average below, so short documents
    # are represented almost entirely by padding.
    pad_mask=src.eq(0)
    # A row made only of padding would leave attention with no valid key
    # (NaN output); keep its first position visible instead.
    empty_rows=pad_mask.all(dim=1)
    pad_mask[:, 0]=pad_mask[:, 0] & ~empty_rows

    x=self.embedding(src)
    x=self.pos_encoder(x)
    x=x.permute(1, 0, 2)  # encoder layers here expect [seq_len, batch, embed_dim]
    x=self.transformer_encoder(x, src_key_padding_mask=pad_mask)

    # Average only over non-padding positions; each row has at least one.
    keep=(~pad_mask).transpose(0, 1).unsqueeze(-1).to(x.dtype)
    pooled=(x*keep).sum(dim=0)/keep.sum(dim=0)
    return self.classifier(pooled)

The transformer architecture I've defined is an encoder-only setup. Since the task at hand is classification, I don't make use of a decoder.

Firstly, I start with embeddings, to which I add positional encoding that helps the model keep track of the order of words. The positional encoding I've implemented is in line with what has been outlined in the original paper (sine/cosine based).

Next, at its core, I implemented a stack of 2 transformer encoder layers (TransformerEncoderLayer instances), each with multi-head self-attention and a feedforward block. Additionally, layer normalization helps stabilize training. The attention mechanism enables the capturing of long-range dependencies.

Finally, I take the mean of the outputs (average pooling) to get a fixed-size vector, which is then passed through a fully connected layer.

In [None]:
# Prefer the GPU when one is available, then build the baseline model on it.
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model=Transformer(vocab_size=vocab_size).to(device)
model

Defining the Training Loop

In [None]:
# Cross-entropy over the class logits, optimized with Adam.
loss_fn=nn.CrossEntropyLoss()
optimizer=optim.Adam(params=model.parameters(), lr=1e-4)

In [None]:
def train(epochs=5):
  """Train the notebook-level `model` and validate it after every epoch.

  Relies on the globals `model`, `optimizer`, `loss_fn`, `train_loader`,
  `val_loader` and `device`.

  Args:
    epochs: number of passes over `train_loader`.

  Returns:
    (train_accuracies, val_accuracies, train_losses, val_losses): four lists
    with one entry per epoch. Accuracies are percentages; losses are the mean
    of the per-batch losses.
  """
  train_accuracies, val_accuracies=[], []
  train_losses, val_losses=[], []

  def run_pass(loader, learn):
    """One sweep over `loader`; updates the weights only when `learn` is True."""
    loss_sum, hits, seen=0.0, 0, 0
    for batch_texts, batch_labels in loader:
      batch_texts, batch_labels=batch_texts.to(device), batch_labels.to(device)
      if learn:
        optimizer.zero_grad()

      logits=model(batch_texts)
      batch_loss=loss_fn(logits, batch_labels)

      if learn:
        batch_loss.backward()
        optimizer.step()

      loss_sum+=batch_loss.item()
      seen+=batch_labels.size(0)
      hits+=(logits.argmax(1)==batch_labels).sum().item()
    return loss_sum/len(loader), 100*(hits/seen)

  for epoch in range(epochs):
    # Training pass
    model.train()
    epoch_loss, accuracy=run_pass(train_loader, learn=True)
    train_losses.append(epoch_loss)
    train_accuracies.append(accuracy)
    print(f'Epoch {epoch+1}/{epochs} - loss: {epoch_loss:.2f}')

    # Validation pass (no gradients, no weight updates)
    model.eval()
    with torch.no_grad():
      val_loss, accuracy=run_pass(val_loader, learn=False)
    val_accuracies.append(accuracy)
    val_losses.append(val_loss)
    print(f'The accuracy of the model is: {accuracy:.2f}%')

  return train_accuracies, val_accuracies, train_losses, val_losses

In [None]:
# average='micro' = fraction of correctly classified samples, i.e. the same
# definition train() uses for its training/validation accuracy. The torchmetrics
# default ('macro') averages per-class accuracy instead, which only coincides
# with it when the classes are perfectly balanced.
accuracy_metric=MulticlassAccuracy(num_classes=4, average='micro').to(device)

def evaluate():
  """Evaluate the notebook-level `model` on `test_loader` and print a summary.

  Relies on the globals `model`, `loss_fn`, `test_loader`, `device` and
  `accuracy_metric`. Prints the sample-weighted mean loss, the accuracy and
  the weighted precision / recall / F-score.

  Returns:
    (accuracy, avg_loss, all_labels, all_probs) where `accuracy` is a 0-d
    numpy array holding a fraction in [0, 1], `avg_loss` is a float,
    `all_labels` is a list with one true class id per test sample and
    `all_probs` is a list with one softmax probability vector per sample.
  """
  model.eval()
  all_preds=[]
  all_labels=[]
  all_probs=[]
  total_loss=0.0
  total_samples=0
  with torch.no_grad():
    for inputs, labels in test_loader:
      inputs, labels=inputs.to(device), labels.to(device)
      outputs=model(inputs)

      # loss_fn returns the batch mean; weight it by batch size so the final
      # average is per sample even when the last batch is smaller.
      loss=loss_fn(outputs, labels)
      total_loss+=loss.item()*inputs.size(0)
      total_samples+=inputs.size(0)

      probs=F.softmax(outputs, dim=1)
      preds=torch.argmax(probs, dim=1)

      all_preds.extend(preds.cpu().numpy())
      all_probs.extend(probs.cpu().numpy())
      all_labels.extend(labels.cpu().numpy())

  avg_loss=total_loss/total_samples
  print(f'The loss obtained on the model is: {avg_loss:.2f}')

  all_preds_tensor=torch.tensor(all_preds, dtype=torch.int64).to(device)
  all_labels_tensor=torch.tensor(all_labels, dtype=torch.int64).to(device)

  computed_accuracy=accuracy_metric(all_preds_tensor, all_labels_tensor)
  print(f'The accuracy obtained on the model is: {computed_accuracy.item()*100:.2f}%')

  precision, recall, fscore, _=precision_recall_fscore_support(all_labels, all_preds, average='weighted')
  print(f'For the model - precision:{precision:.2f}, recall: {recall:.2f}, F-score: {fscore:.2f}')

  return computed_accuracy.cpu().numpy(), avg_loss, all_labels, all_probs

In [None]:
# Train the baseline model and record the wall-clock duration in seconds.
start_time=time.time()
base_history=train()
train_accuracies, val_accuracies, train_losses, val_losses=base_history
base_model_time=time.time()-start_time

In [None]:
# Test-set metrics of the baseline model.
base_test_results=evaluate()
test_accuracy, test_loss, all_labels, all_probs=base_test_results

**Model Optimization**

Dropout

In [None]:
class Transformer(nn.Module):
  """Encoder-only transformer classifier with dropout in the encoder layers.

  Pipeline: embedding (id 0 = padding) -> positional encoding -> stack of
  TransformerEncoderLayer -> mean over the real (non-padding) positions ->
  linear classifier.

  Args:
    vocab_size: number of embedding rows (including the padding id 0).
    embed_dim: embedding / model width.
    num_heads: attention heads per encoder layer.
    num_layers: number of encoder layers.
    num_classes: size of the output logits.
    max_len: longest supported sequence length.
    dropout: dropout probability inside the encoder layers (0.1, as before).
  """

  def __init__(self, vocab_size, embed_dim=128, num_heads=4, num_layers=2, num_classes=4, max_len=256, dropout=0.1):
    super().__init__()
    self.embedding=nn.Embedding(vocab_size, embed_dim, padding_idx=0)
    self.pos_encoder=Encoding(embed_dim, max_len)
    encoder_layers=nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=512, dropout=dropout)
    self.transformer_encoder=nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
    self.classifier=nn.Linear(embed_dim, num_classes)

  def forward(self, src):
    """Return class logits of shape (batch, num_classes) for src of shape (batch, seq)."""
    # True where the token is padding. Without this mask every padded slot
    # takes part in attention and in the average below, so short documents
    # are represented almost entirely by padding.
    pad_mask=src.eq(0)
    # A row made only of padding would leave attention with no valid key
    # (NaN output); keep its first position visible instead.
    empty_rows=pad_mask.all(dim=1)
    pad_mask[:, 0]=pad_mask[:, 0] & ~empty_rows

    x=self.embedding(src)
    x=self.pos_encoder(x)
    x=x.permute(1, 0, 2)  # Required shape for Transformer [seq_len, batch_size, embed_dim]
    x=self.transformer_encoder(x, src_key_padding_mask=pad_mask)

    # Average pooling over the non-padding positions; each row has at least one.
    keep=(~pad_mask).transpose(0, 1).unsqueeze(-1).to(x.dtype)
    pooled=(x*keep).sum(dim=0)/keep.sum(dim=0)
    return self.classifier(pooled)

In [None]:
# Fresh, untrained model for the dropout experiment, placed on `device`.
model=Transformer(vocab_size=vocab_size).to(device)
model

In [None]:
# The optimizer must be rebuilt so that it tracks the new model's parameters.
loss_fn=nn.CrossEntropyLoss()
optimizer=optim.Adam(params=model.parameters(), lr=1e-4)

In [None]:
# Train the dropout model and record the wall-clock duration in seconds.
start_time=time.time()
dropout_history=train()
train_accuracies_dropout, val_accuracies_dropout, train_losses_dropout, val_losses_dropout=dropout_history
dropout_model_time=time.time()-start_time

In [None]:
# Test-set metrics of the dropout model.
dropout_test_results=evaluate()
test_accuracy_dropout, test_loss_dropout, all_labels, all_probs=dropout_test_results

L2 Regularization

In [None]:
# Fresh, untrained model for the L2-regularization experiment.
model=Transformer(vocab_size=vocab_size).to(device)
model

In [None]:
# weight_decay adds the L2 penalty on the parameters to Adam's update.
loss_fn=nn.CrossEntropyLoss()
optimizer=optim.Adam(params=model.parameters(), lr=1e-4, weight_decay=1e-5)

In [None]:
def train(epochs=5):
  """Train the notebook-level `model`, checkpointing the best validation epoch.

  Relies on the globals `model`, `optimizer`, `loss_fn`, `train_loader`,
  `val_loader` and `device`. Whenever the validation accuracy exceeds the
  best value seen so far in this call, the model's state_dict is written to
  'a2_part_3_dshah22_ramasair.pth'.

  Args:
    epochs: number of passes over `train_loader`.

  Returns:
    (train_accuracies, val_accuracies, train_losses, val_losses): four lists
    with one entry per epoch. Accuracies are percentages; losses are the mean
    of the per-batch losses.
  """
  current_best_accuracy=0.0
  train_accuracies, val_accuracies=[], []
  train_losses, val_losses=[], []

  def sweep(loader, update_weights):
    """One pass over `loader`; returns (mean batch loss, accuracy in percent)."""
    loss_total, n_correct, n_seen=0.0, 0, 0
    for batch_texts, batch_labels in loader:
      batch_texts, batch_labels=batch_texts.to(device), batch_labels.to(device)
      if update_weights:
        optimizer.zero_grad()

      logits=model(batch_texts)
      batch_loss=loss_fn(logits, batch_labels)

      if update_weights:
        batch_loss.backward()
        optimizer.step()

      loss_total+=batch_loss.item()
      n_seen+=batch_labels.size(0)
      n_correct+=(logits.argmax(1)==batch_labels).sum().item()
    return loss_total/len(loader), 100*(n_correct/n_seen)

  for epoch in range(epochs):
    # Training pass
    model.train()
    epoch_loss, accuracy=sweep(train_loader, update_weights=True)
    train_losses.append(epoch_loss)
    train_accuracies.append(accuracy)
    print(f'Epoch {epoch+1}/{epochs} - loss: {epoch_loss:.2f}')

    # Validation pass (no gradients, no weight updates)
    model.eval()
    with torch.no_grad():
      val_loss, accuracy=sweep(val_loader, update_weights=False)
    val_accuracies.append(accuracy)
    val_losses.append(val_loss)
    print(f'The accuracy of the model is: {accuracy:.2f}%')

    # Keep the weights of the best validation epoch on disk.
    if accuracy>current_best_accuracy:
      current_best_accuracy=accuracy
      torch.save(model.state_dict(), 'a2_part_3_dshah22_ramasair.pth')

  return train_accuracies, val_accuracies, train_losses, val_losses

In [None]:
# Train the L2-regularized model and record the wall-clock duration in seconds.
start_time=time.time()
l2_history=train()
train_accuracies_l2, val_accuracies_l2, train_losses_l2, val_losses_l2=l2_history
l2_model_time=time.time()-start_time

In [None]:
# Test-set metrics of the L2-regularized model.
l2_test_results=evaluate()
test_accuracy_l2, test_loss_l2, all_labels, all_probs=l2_test_results

Learning Rate Scheduler

In [None]:
# Fresh, untrained model for the learning-rate-scheduler experiment.
model=Transformer(vocab_size=vocab_size).to(device)
model

In [None]:
# Same loss and L2-regularized Adam as before, bound to the new model.
loss_fn=nn.CrossEntropyLoss()
optimizer=optim.Adam(params=model.parameters(), lr=1e-4, weight_decay=1e-5)

In [None]:
# Halve the learning rate every 2 scheduler steps.
scheduler=optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.5)

In [None]:
def train(epochs=5):
  """Train the notebook-level `model` with a per-epoch learning-rate schedule.

  Relies on the globals `model`, `optimizer`, `scheduler`, `loss_fn`,
  `train_loader`, `val_loader` and `device`. `scheduler.step()` is called
  once per epoch, after the training pass and before validation.

  Args:
    epochs: number of passes over `train_loader`.

  Returns:
    (train_accuracies, val_accuracies, train_losses, val_losses): four lists
    with one entry per epoch. Accuracies are percentages; losses are the mean
    of the per-batch losses.
  """
  train_accuracies, val_accuracies=[], []
  train_losses, val_losses=[], []

  def one_pass(loader, fit):
    """One pass over `loader`; returns (mean batch loss, accuracy in percent)."""
    summed_loss, right, count=0.0, 0, 0
    for batch_texts, batch_labels in loader:
      batch_texts, batch_labels=batch_texts.to(device), batch_labels.to(device)
      if fit:
        optimizer.zero_grad()

      logits=model(batch_texts)
      batch_loss=loss_fn(logits, batch_labels)

      if fit:
        batch_loss.backward()
        optimizer.step()

      summed_loss+=batch_loss.item()
      count+=batch_labels.size(0)
      right+=(logits.argmax(1)==batch_labels).sum().item()
    return summed_loss/len(loader), 100*(right/count)

  for epoch in range(epochs):
    # Training pass
    model.train()
    epoch_loss, accuracy=one_pass(train_loader, fit=True)
    train_losses.append(epoch_loss)
    train_accuracies.append(accuracy)
    print(f'Epoch {epoch+1}/{epochs} - loss: {epoch_loss:.2f}')
    scheduler.step()  # advance the learning-rate schedule once per epoch

    # Validation pass (no gradients, no weight updates)
    model.eval()
    with torch.no_grad():
      val_loss, accuracy=one_pass(val_loader, fit=False)
    val_accuracies.append(accuracy)
    val_losses.append(val_loss)
    print(f'The accuracy of the model is: {accuracy:.2f}%')

  return train_accuracies, val_accuracies, train_losses, val_losses

In [None]:
# Train the scheduler model and record the wall-clock duration in seconds.
start_time=time.time()
lr_history=train()
train_accuracies_lr, val_accuracies_lr, train_losses_lr, val_losses_lr=lr_history
lr_model_time=time.time()-start_time

In [None]:
# Test-set metrics of the scheduler model.
lr_test_results=evaluate()
test_accuracy_lr, test_loss_lr, all_labels, all_probs=lr_test_results

In [None]:
# The scheduler run is taken as the "best" model for the result plots.
train_accuracies_best=train_accuracies_lr
val_accuracies_best=val_accuracies_lr
train_losses_best=train_losses_lr
val_losses_best=val_losses_lr

In [None]:
# Test metrics of the run selected as "best" (the scheduler run).
test_accuracy_best, test_loss_best=test_accuracy_lr, test_loss_lr

To improve the performance of the transformer model, I explored dropout, L2 regularization, and a learning rate scheduler. Implementing dropout within the core transformer architecture, we notice no significant improvement in performance. Next, I employed a combination of dropout and L2 regularization, which resulted in a slight improvement. Lastly, a combination of dropout, L2 regularization, and a learning rate scheduler results in performance similar to that of the combination of dropout and L2 regularization.

**Results Visualization**

In [None]:
# Test accuracy of the selected best model, as returned by evaluate().
print(str(test_accuracy_best))

In [None]:
fig, axs=plt.subplots(2, 2, figsize=(18, 14))

# Derive the epoch count from the recorded history instead of hard-coding it.
epochs=len(train_accuracies_best)

def _epoch_axis(series):
  """1-based epoch numbers matching the length of a per-epoch series."""
  return range(1, len(series)+1)

# One-vs-rest ROC data per class. Each class keeps its own curve; previously
# the loop variables were overwritten and only the last class was drawn
# (four times, under four different labels).
all_labels_encoded=label_binarize(all_labels, classes=[0, 1, 2, 3])
all_probs=np.array(all_probs)

roc_curves=[]
for i in range(4):
  fpr, tpr, _=roc_curve(all_labels_encoded[:, i], all_probs[:, i])
  roc_curves.append((fpr, tpr, auc(fpr, tpr)))

# --- Panel (0, 0): best vs. base model -------------------------------------
# Accuracies are percentages while losses are far smaller, so losses get
# their own right-hand axis instead of being flattened against the bottom.
acc_ax=axs[0, 0]
loss_ax=acc_ax.twinx()

accuracy_curves=[
  (train_accuracies_best, 'Best Model Training Accuracy'),
  (train_accuracies, 'Base Model Training Accuracy'),
  (val_accuracies_best, 'Best Model Validation Accuracy'),
  (val_accuracies, 'Base Model Validation Accuracy'),
]
loss_curves=[
  (train_losses_best, 'Best Model Training Loss'),
  (train_losses, 'Base Model Training Loss'),
  (val_losses_best, 'Best Model Validation Loss'),
  (val_losses, 'Base Model Validation Loss'),
]
for idx, (series, label) in enumerate(accuracy_curves):
  acc_ax.plot(_epoch_axis(series), series, color=f'C{idx}', label=label)
for idx, (series, label) in enumerate(loss_curves):
  loss_ax.plot(_epoch_axis(series), series, color=f'C{idx}', linestyle='--', label=label)

# Test metrics are single numbers: plot() draws nothing visible for a lone
# point, so show them as horizontal reference lines. evaluate() returns the
# accuracy as a fraction whereas train() records percentages, hence the *100.
acc_ax.axhline(float(test_accuracy_best)*100, color='C4', linestyle=':', label='Best Model Testing Accuracy')
acc_ax.axhline(float(test_accuracy)*100, color='C5', linestyle=':', label='Base Model Testing Accuracy')
loss_ax.axhline(test_loss_best, color='C4', linestyle='-.', label='Best Model Testing Loss')
loss_ax.axhline(test_loss, color='C5', linestyle='-.', label='Base Model Testing Loss')

acc_ax.set_title('Comparison of Training, Validation, and Testing Accuracies and Losses (Best vs. Base Model)')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
loss_ax.set_ylabel('Loss')
acc_handles, acc_labels=acc_ax.get_legend_handles_labels()
loss_handles, loss_labels=loss_ax.get_legend_handles_labels()
acc_ax.legend(acc_handles+loss_handles, acc_labels+loss_labels)

# --- Panel (0, 1): tuned model accuracy per epoch ---------------------------
axs[0, 1].plot(_epoch_axis(train_accuracies_best), train_accuracies_best, label='Tuned Model Training Accuracy')
axs[0, 1].plot(_epoch_axis(val_accuracies_best), val_accuracies_best, label='Tuned Model Validation Accuracy')
axs[0, 1].set_title('Tuned Model Training and Validation Accuracy Over Time (Epochs)')
axs[0, 1].set_xlabel('Epochs')
axs[0, 1].set_ylabel('Accuracy')
axs[0, 1].legend()

# --- Panel (1, 0): tuned model loss per epoch -------------------------------
axs[1, 0].plot(_epoch_axis(train_losses_best), train_losses_best, label='Tuned Model Training Loss')
axs[1, 0].plot(_epoch_axis(val_losses_best), val_losses_best, label='Tuned Model Validation Loss')
axs[1, 0].set_title('Tuned Improved Training and Validation Loss Over Time (Epochs)')
axs[1, 0].set_xlabel('Epochs')
axs[1, 0].set_ylabel('Loss')  # this panel shows losses, not accuracies
axs[1, 0].legend()

# --- Panel (1, 1): per-class ROC curves -------------------------------------
for i, (fpr, tpr, roc_auc) in enumerate(roc_curves):
  axs[1, 1].plot(fpr, tpr, label=f'Class {i} ROC Curve (AUC={roc_auc:.2f})')

axs[1, 1].plot([0, 1], [0, 1], 'k--')  # chance-level diagonal
axs[1, 1].set_title('ROC Curve')
axs[1, 1].set_xlabel('False Positive Rate')
axs[1, 1].set_ylabel('True Positive Rate')
axs[1, 1].legend()

plt.tight_layout()
plt.show()

Visualizing the results, we notice that the base model and the best model (tuned with a combination of dropout, L2 regularization, and a learning rate scheduler) differ only slightly in the obtained accuracy and loss, with the best model performing better than the base. The best model converges on the training and validation data over the epochs, resulting in good validation accuracy. A high validation accuracy and a low validation loss suggest that the model is able to generalize well to unseen data, effectively capturing meaningful patterns in the text and classifying it correctly.

Additionally, the combination of dropout and L2 regularization helped prevent overfitting, smoothing out the convergence of the model on the data. The metrics obtained indicate that the defined transformer architecture is robust and well suited for the classification task at hand. Beyond the visualization of results, the calculated metrics (precision, recall, F1-score) help us understand the model's performance better.

Finally, the model's (transformer's) architecture being lightweight with 2 encoder layers and an embedding size of 128 keeps the model computationally feasible.

**References**



- Official 'Attention Is All You Need' paper: [Link to the paper](https://arxiv.org/abs/1706.03762)
- Official NLTK documentation: [Link to official NLTK documentation](https://www.nltk.org/)