In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import re
from collections import Counter
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt
from torch.nn import TransformerEncoder, TransformerEncoderLayer
import matplotlib.pyplot as plt


In [2]:
seed = 43
torch.manual_seed(seed)
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")

In [None]:
batch_size = 64

In [4]:
def simple_tokenizer(text):
    return re.findall(r'\b\w+\b', str(text).lower())

In [5]:
train_df = pd.read_csv("Datasets/TrainData.csv")
test_df = pd.read_csv("Datasets/TestLabels.csv")

train_df.dropna(subset=['Text', 'Category'], inplace=True)
test_df.dropna(subset=['Text', 'Label - (business, tech, politics, sport, entertainment)'], inplace=True)

print(f"Train samples: {len(train_df)}, Test samples: {len(test_df)}")

train_texts = train_df['Text'].tolist()
train_labels = train_df['Category'].tolist()
test_texts = test_df['Text'].tolist()
test_labels = test_df['Label - (business, tech, politics, sport, entertainment)'].tolist()

Train samples: 1490, Test samples: 735


In [6]:
train_tokenized = [simple_tokenizer(t) for t in train_texts]
test_tokenized = [simple_tokenizer(t) for t in test_texts]

In [7]:
lengths = [len(inner_array) for inner_array in train_tokenized]
max_len = int(np.percentile(lengths, 90))

In [8]:
class NewsDataset(Dataset):
    def __init__(self, tokenized_text, labels, vocab=None, label2idx=None, max_len=300):
        self.texts = tokenized_text
        self.max_len = max_len
        if vocab is None:
            words = [word for text in self.texts for word in text]
            word_freq = Counter(words)
            self.vocab = {'<PAD>': 0, '<UNK>': 1}
            for word in word_freq:
                self.vocab[word] = len(self.vocab)
        else:
            self.vocab = vocab

        self.texts = [self.encode(text) for text in self.texts]

        if label2idx is None:
            unique_labels = sorted(set(label for label in labels if pd.notna(label)))
            self.label2idx = {label: i for i, label in enumerate(unique_labels)}
        else:
            self.label2idx = label2idx

        self.labels = [self.label2idx[label] for label in labels if pd.notna(label)]

    def encode(self, tokens):
        encoded = [self.vocab.get(tok, self.vocab['<UNK>']) for tok in tokens]
        return encoded[:self.max_len] + [0]*(self.max_len - len(encoded))

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        return torch.tensor(self.texts[idx]), torch.tensor(self.labels[idx])

In [9]:
train_data = NewsDataset(train_tokenized, train_labels)
test_data = NewsDataset(test_tokenized, test_labels, vocab=train_data.vocab, label2idx=train_data.label2idx)

In [10]:
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size)

In [11]:
class TextTransformer(nn.Module):
    def __init__(self, vocab_size, embed_dim=256, num_classes=5,
                 num_layers=4, num_heads=8, max_len=300, pos_embed=True):
        super().__init__()
        self.embed_dim = embed_dim
        self.pos_embed_enabled = pos_embed
        
        # Embedding layers
        self.word_embedding = nn.Embedding(vocab_size, embed_dim)
        if pos_embed:
            self.position_embedding = nn.Embedding(max_len, embed_dim)
        
        # Transformer encoder layer
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dim_feedforward=512,
            dropout=0.1,
            activation="gelu"
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # Classification layer
        self.classifier = nn.Linear(embed_dim, num_classes)
    
    def forward(self, x):
        positions = torch.arange(0, x.size(1), dtype=torch.long).unsqueeze(0).to(device)
        
        x_embed = self.word_embedding(x)
        if self.pos_embed_enabled:
            x_embed += self.position_embedding(positions)
        
        x_transformed = self.transformer_encoder(x_embed)
        x_pooled = x_transformed.mean(dim=1)  # Mean pooling across sequence length
        
        return self.classifier(x_pooled)



In [12]:
def evaluate_model(model, loader, criterion):
    model.eval()
    correct = 0
    total = 0
    total_loss = 0
    all_preds ,all_labels = [] ,[]
    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            all_preds.extend(predicted.cpu().tolist())
            all_labels.extend(y_batch.cpu().tolist())

            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()

    avg_loss = total_loss / len(test_loader)
    accuracy = 100 * correct / total
    f1 = f1_score(all_labels,all_preds,average='micro')
    return avg_loss, accuracy,f1

In [13]:
def train_model(model, loader, criterion, optimizer):
    model.train()
    total_loss = 0
    correct_train = 0
    total_train = 0
    for texts, labels in loader:
        texts, labels = texts.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        _, predicted = torch.max(outputs, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()
    avg_train_loss = total_loss / len(train_loader)
    train_accuracy = 100 * correct_train / total_train
    return avg_train_loss , train_accuracy

In [14]:
EPOCHS = 50
lr = 0.0003
weight_decay = 1e-3

In [None]:
model_params = {
    "vocab_size": len(train_data.vocab),
    "embed_dim": 512,
    "num_classes": len(train_data.label2idx),
    "num_layers": 2,
    "num_heads": 8,
    "max_len": max_len,
    "pos_embed": True
}

model = TextTransformer(**model_params).to(device)




In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.0002)



In [None]:
patience = 15 
best_val_loss = float('inf')
counter = 0  
for epoch in range(EPOCHS):
    avg_train_loss , train_accuracy = train_model(model, train_loader, criterion, optimizer)
    avg_val_loss, val_accuracy ,f1 = evaluate_model(model, test_loader, criterion)
    print(f"Epoch {epoch+1}/{EPOCHS} | "
              f"Train Loss: {avg_train_loss:.4f} | Train Acc: {train_accuracy:.2f}% | "
              f"Val Loss: {avg_val_loss:.4f} | Val Acc: {val_accuracy:.2f}% | f1: {f1:.2f}")
    
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        counter = 0 
        best_model = model.state_dict()  
    else:
        counter += 1

    if counter >= patience:
        print(f"Early stopping at epoch {epoch}. Best val loss: {best_val_loss:.4f}")
        break

Epoch 1/50 | Train Loss: 1.5849 | Train Acc: 23.29% | Val Loss: 1.5235 | Val Acc: 28.30% | f1: 0.28
Epoch 2/50 | Train Loss: 1.5151 | Train Acc: 29.40% | Val Loss: 1.4880 | Val Acc: 36.87% | f1: 0.37
Epoch 3/50 | Train Loss: 1.3785 | Train Acc: 40.67% | Val Loss: 1.3682 | Val Acc: 36.87% | f1: 0.37
Epoch 4/50 | Train Loss: 1.2530 | Train Acc: 46.38% | Val Loss: 1.2676 | Val Acc: 42.31% | f1: 0.42
Epoch 5/50 | Train Loss: 1.1642 | Train Acc: 52.08% | Val Loss: 1.2061 | Val Acc: 51.43% | f1: 0.51
Epoch 6/50 | Train Loss: 1.0562 | Train Acc: 57.58% | Val Loss: 1.0619 | Val Acc: 56.73% | f1: 0.57
Epoch 7/50 | Train Loss: 0.9416 | Train Acc: 64.70% | Val Loss: 0.9561 | Val Acc: 67.07% | f1: 0.67
Epoch 8/50 | Train Loss: 0.8567 | Train Acc: 69.33% | Val Loss: 0.8898 | Val Acc: 70.34% | f1: 0.70
Epoch 9/50 | Train Loss: 0.7646 | Train Acc: 75.97% | Val Loss: 0.8264 | Val Acc: 77.28% | f1: 0.77
Epoch 10/50 | Train Loss: 0.6465 | Train Acc: 83.62% | Val Loss: 0.6977 | Val Acc: 82.45% | f1: 0.82

In [None]:
model.load_state_dict(best_model)

<All keys matched successfully>

In [None]:
# Final evaluation and confusion matrix
avg_val_loss, val_accuracy, f1 = evaluate_model(model, test_loader, criterion)
print(f"\nFinal Micro F1 Score = {f1:.4f}")
label_names = [label for label, _ in sorted(train_data.label2idx.items(), key=lambda x: x[1])]



Final Micro F1 Score = 0.9429
