### 🧩 Code: Imports and Config (0.1)
Set up the required Python libraries and configure global settings (e.g., seed, device).

In [None]:
# ✅ Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score, precision_score, recall_score
from sklearn.preprocessing import LabelEncoder
import gensim.downloader as gensim_downloader
from gensim.models import KeyedVectors
from tqdm import tqdm
import re
from collections import Counter
from torch.nn.utils.rnn import pad_sequence


# ✅ Config
# One fixed seed shared by the train/dev split (passed as random_state) and the
# PyTorch / NumPy global random number generators.
SEED = 42
torch.manual_seed(SEED)  # seed PyTorch's global RNG
np.random.seed(SEED)  # seed NumPy's legacy global RNG

In [None]:
# Report whether CUDA is usable, then pick the device the models and batches are moved to.
print(torch.cuda.is_available())  # True if your system is GPU-ready
# print(torch.cuda.get_device_name(0))  # Name of your GPU (if available)
# Use the GPU when present, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)

### 🧩 Code: Load and Inspect Dataset (0.2)
Load the compressed dataset and inspect its structure, size, and label distribution.

In [None]:
# Read the dataset from a path relative to the notebook's working directory.
# The rest of the notebook relies on the "abstract" (text) and "label" (class) columns.
df = pd.read_csv("data/arxiv100.csv")

# Basic inspection
print(df.head())  # first rows
print(df["label"].value_counts())  # number of rows per class
print(f"Dataset size: {df.shape}")  # (rows, columns)

### 🧩 Code: Train-Dev Split (0.3)
Split the dataset into training and development sets using stratified sampling to preserve class balance.


In [None]:
# Stratified split on label
# 80% train / 20% dev; stratify keeps the label proportions equal in both parts,
# and random_state=SEED makes the split repeatable.
train_texts, dev_texts, train_labels, dev_labels = train_test_split(
    df["abstract"], df["label"], 
    test_size=0.2, 
    stratify=df["label"], 
    random_state=SEED
)

### 🧩 Code: Text Preprocessing (0.4)
Clean the abstract text (lowercasing, removing punctuation, etc.) and prepare it for feature extraction.


In [None]:
def clean_text(text):
    """Lowercase `text` and strip everything except ASCII letters and whitespace.

    Punctuation and digits are deleted (not replaced by spaces), so hyphenated
    terms collapse into a single token, e.g. "state-of-the-art" -> "stateoftheart".
    """
    lowered = text.lower()
    # Pass 1: drop every character that is not an ASCII letter, digit or whitespace.
    alphanumeric = re.sub(r"[^a-zA-Z0-9\s]", "", lowered)
    # Pass 2: drop the digit runs that survived pass 1.
    return re.sub(r"\d+", "", alphanumeric)

# Clean both splits with the same function; each Series is replaced by its cleaned version.
train_texts = train_texts.apply(clean_text)
dev_texts = dev_texts.apply(clean_text)

### 🧩 Code: Load Pre-trained Word Embeddings (B.1)
Load pre-trained embeddings (e.g., GloVe, Word2Vec) and map vocabulary to embedding vectors.

- GloVe download: https://nlp.stanford.edu/projects/glove/
- FastText download (`wiki-news-300d-1M.vec`): https://fasttext.cc/docs/en/english-vectors.html

In [None]:
def load_glove_embeddings(path="data/glove.6B.300d.txt", embedding_dim=300):
    """
    Load GloVe embeddings from a text file.

    Each line is expected to be ``<token> <v1> ... <v_embedding_dim>`` separated
    by single spaces. Lines that do not carry exactly ``embedding_dim`` numeric
    values (blank lines, vectors of another width) are skipped and counted
    instead of crashing the load or silently mixing vector sizes.

    Args:
        path: Location of the GloVe text file.
        embedding_dim: Expected number of components per vector.

    Returns:
        dict mapping token -> float32 NumPy vector of length ``embedding_dim``.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the file has lines but none of them holds an
            ``embedding_dim``-dimensional vector (wrong file or wrong dimension).
    """
    print(f"Loading GloVe embeddings from {path}...")
    embeddings = {}
    skipped = 0

    with open(path, 'r', encoding='utf-8') as f:
        for line in tqdm(f, desc="Loading embeddings"):
            # Split from the right so a token that itself contains spaces stays intact.
            parts = line.rstrip().rsplit(' ', embedding_dim)
            if len(parts) != embedding_dim + 1:
                skipped += 1
                continue
            try:
                vector = np.asarray(parts[1:], dtype='float32')
            except ValueError:
                # Non-numeric component: the line is not a well-formed vector.
                skipped += 1
                continue
            embeddings[parts[0]] = vector

    if skipped and not embeddings:
        raise ValueError(
            f"No {embedding_dim}-dimensional vectors found in {path}; "
            "check that embedding_dim matches the file"
        )
    if skipped:
        print(f"Skipped {skipped} malformed lines")

    print(f"Loaded {len(embeddings)} word embeddings of dimension {embedding_dim}")
    return embeddings

# Load GloVe embeddings
# The whole file is read into an in-memory dict (token -> vector); this is the
# embedding source passed to create_embedding_matrix later in the notebook.
glove_embeddings = load_glove_embeddings("data/glove.6B.300d.txt", embedding_dim=300)

def load_word2vec_embeddings(embedding_dim=300):
    """
    Fetch the pre-trained Google News Word2Vec vectors through gensim.

    Only the 300-dimensional model is supported; any other `embedding_dim`
    raises ValueError. If loading fails for any reason the error is printed
    and an empty dict is returned. On success the result is a plain
    dict (word -> vector), the same shape the other loaders return.
    """
    print(f"Loading Word2Vec embeddings (dimension: {embedding_dim})...")

    # Guard clause: there is exactly one supported dimension.
    if embedding_dim != 300:
        raise ValueError(f"Word2Vec is only available in 300d. Got {embedding_dim}d")
    model_name = 'word2vec-google-news-300'

    try:
        wv_model = gensim_downloader.load(model_name)
        print(f"Loaded {len(wv_model.key_to_index)} word vectors")

        # Flatten the gensim object into a dict for consistency with the other loaders.
        embeddings = {}
        for word in tqdm(wv_model.key_to_index, desc="Processing vectors"):
            embeddings[word] = wv_model.get_vector(word)
        return embeddings
    except Exception as e:
        print(f"Error loading Word2Vec: {e}")
        print("Falling back to random embeddings")
        return {}

# Load Word2vec embeddings
# NOTE(review): no later cell visible in this notebook reads word2vec_embeddings — confirm it is still needed.
word2vec_embeddings = load_word2vec_embeddings()


def load_fasttext_embeddings(path="data/wiki-news-300d-1M.vec", embedding_dim=300, max_words=1000000):
    """
    Load FastText embeddings from a ``.vec`` text file.

    The first line of the file is a header ``<vocab_size> <dim>``; every
    following line is ``<token> <v1> ... <v_dim>``. Lines that do not carry
    exactly ``dim`` numeric values are skipped and counted rather than
    aborting the whole load.

    Args:
        path: Location of the ``.vec`` file.
        embedding_dim: Dimension the caller expects; a warning is printed when
            the file header disagrees.
        max_words: Maximum number of vectors to read (memory cap).

    Returns:
        dict mapping token -> float32 NumPy vector. An empty dict is returned
        when the file is missing or cannot be read, after printing the reason.
    """
    print(f"Loading FastText embeddings from {path}...")
    embeddings = {}

    try:
        # Check if file exists
        if not os.path.exists(path):
            print(f"FastText embeddings file not found at {path}")
            # wiki-news-300d-1M.vec is published on the English-vectors page.
            print("Please download from https://fasttext.cc/docs/en/english-vectors.html")
            print("Falling back to random embeddings")
            return embeddings

        with open(path, 'r', encoding='utf-8') as f:
            # Header line: "<vocab_size> <dim>"
            header = f.readline().strip().split()
            vocab_size, dim = int(header[0]), int(header[1])

            if dim != embedding_dim:
                print(f"Warning: FastText file has {dim}d vectors, but {embedding_dim}d was requested")

            count = 0
            skipped = 0

            for line in tqdm(f, desc="Loading embeddings", total=min(vocab_size, max_words)):
                if count >= max_words:
                    break

                # Split from the right so the token is whatever precedes the last `dim` fields.
                parts = line.rstrip().rsplit(' ', dim)
                if len(parts) != dim + 1:
                    skipped += 1
                    continue
                try:
                    vector = np.asarray(parts[1:], dtype='float32')
                except ValueError:
                    # Non-numeric component: drop this line, keep what was loaded so far.
                    skipped += 1
                    continue
                embeddings[parts[0]] = vector
                count += 1

        if skipped:
            print(f"Skipped {skipped} malformed lines")
        print(f"Loaded {len(embeddings)} word embeddings of dimension {dim}")
        return embeddings
    except Exception as e:
        # Deliberate best-effort: report the problem and let the caller fall back.
        print(f"Error loading FastText: {e}")
        print("Falling back to random embeddings")
        return {}
    
# Load fasttext embedding
# NOTE(review): no later cell visible in this notebook reads fasttext_embedding — confirm it is still needed.
fasttext_embedding = load_fasttext_embeddings()

### 🧩 Code: Text-to-Sequence Pipeline (B.2)
Convert preprocessed abstracts into padded sequences of word indices aligned with the embedding matrix.


In [None]:
class SimpleTokenizer:
    """Whitespace tokenizer that maps words to integer ids.

    Index 0 is reserved for "<PAD>" and index 1 for "<UNK>"; vocabulary words
    receive consecutive ids starting at 2, most frequent first.

    Attributes:
        word2idx: dict word -> id.
        idx2word: dict id -> word.
    """

    def __init__(self, texts=None, max_vocab=20000, min_freq=2):
        """Create the tokenizer and, when `texts` is given and non-empty, fit it.

        Args:
            texts: Optional iterable of strings (list, pandas Series, ...).
            max_vocab: Number of most frequent words considered for the vocabulary.
            min_freq: Minimum number of occurrences a word needs to be kept.
        """
        self.word2idx = {"<PAD>": 0, "<UNK>": 1}
        self.idx2word = {0: "<PAD>", 1: "<UNK>"}

        if texts is None:
            return
        # Avoid truth-testing `texts`: that raises for pandas Series / NumPy arrays.
        try:
            is_empty = len(texts) == 0
        except TypeError:
            # Iterables without a length (e.g. generators) are fitted as-is.
            is_empty = False
        if not is_empty:
            self.fit(texts, max_vocab, min_freq)

    def fit(self, texts, max_vocab=20000, min_freq=2):
        """Extend the vocabulary with the frequent words found in `texts`.

        Words already in the vocabulary keep their id, so calling `fit` again
        never produces duplicate or colliding indices.
        """
        # Count word frequencies
        word_counts = Counter()
        for text in texts:
            word_counts.update(text.split())

        for word, count in word_counts.most_common(max_vocab):
            if count < min_freq:
                # most_common() is sorted by descending count: nothing further qualifies.
                break
            if word in self.word2idx:
                continue
            idx = len(self.word2idx)
            self.word2idx[word] = idx
            self.idx2word[idx] = word

        print(f"Vocabulary size: {len(self.word2idx)}")

    def texts_to_sequences(self, texts):
        """Convert texts to sequences of word indices"""
        unk_idx = self.word2idx["<UNK>"]
        return [
            [self.word2idx.get(word, unk_idx) for word in text.split()]
            for text in texts
        ]

    def get_vocab_size(self):
        """Return the number of entries in the vocabulary, special tokens included."""
        return len(self.word2idx)

def create_sequence_datasets(train_texts, dev_texts, train_labels, dev_labels, max_vocab=20000, max_len=128):
    """
    Turn raw texts and labels into padded index tensors.

    The tokenizer is fitted on the training texts only. Every sequence is cut
    to at most `max_len` tokens and each split is right-padded with 0 up to
    the longest sequence of that split.

    Returns a dict with the fitted tokenizer, the padded inputs and label
    tensors of both splits, and the `max_len` that was applied.
    """
    tokenizer = SimpleTokenizer(train_texts, max_vocab=max_vocab)

    encoded_train = tokenizer.texts_to_sequences(train_texts)
    encoded_dev = tokenizer.texts_to_sequences(dev_texts)

    # Length statistics are taken on the training split before truncation.
    lengths = list(map(len, encoded_train))
    mean_length = sum(lengths) / len(lengths)
    longest = max(lengths)
    print(f"Average sequence length: {mean_length:.1f}")
    print(f"Maximum sequence length: {longest}")
    print(f"Using max_len = {max_len}")

    def to_padded(sequences):
        # Truncate, tensorise, then right-pad with the <PAD> id (0).
        clipped = [torch.tensor(seq[:max_len], dtype=torch.long) for seq in sequences]
        return pad_sequence(clipped, batch_first=True, padding_value=0)

    X_train_pad = to_padded(encoded_train)
    X_dev_pad = to_padded(encoded_dev)

    y_train_tensor = torch.tensor(train_labels, dtype=torch.long)
    y_dev_tensor = torch.tensor(dev_labels, dtype=torch.long)

    print(f"Training tensor shape: {X_train_pad.shape}")
    print(f"Dev tensor shape: {X_dev_pad.shape}")

    return dict(
        tokenizer=tokenizer,
        X_train=X_train_pad,
        y_train=y_train_tensor,
        X_dev=X_dev_pad,
        y_dev=y_dev_tensor,
        max_len=max_len,
    )



### 🧩 Code: RNN Model Definition (B.3)
Define the RNN model architecture using PyTorch (Simple RNN, LSTM, or GRU).


In [None]:
class RNNClassifier(nn.Module):
    """Embedding -> (LSTM | GRU | simple RNN) -> dropout -> linear classifier.

    Args:
        vocab_size: Number of rows of the embedding table (ignored when
            `embeddings_matrix` is given; the matrix defines the table).
        embedding_dim: Width of the embedding vectors. When `embeddings_matrix`
            is given its width takes precedence, so the RNN input size can
            never disagree with the pre-trained vectors.
        hidden_dim: Hidden size of the recurrent layer (per direction).
        output_dim: Number of classes.
        embeddings_matrix: Optional 2-D array/tensor of pre-trained vectors.
        rnn_type: "lstm", "gru"; any other value selects a simple `nn.RNN`.
        bidirectional: Whether the recurrent layer runs in both directions.
        n_layers: Number of stacked recurrent layers.
        dropout: Dropout probability (applied between RNN layers only when
            `n_layers > 1`, plus on embeddings and on the final hidden state).
        freeze_embeddings: Keep pre-trained embeddings fixed during training.

    Input to `forward` is a LongTensor of shape [batch_size, seq_len] where
    index 0 marks right-padding; the output has shape [batch_size, output_dim].
    """

    # Index reserved for padding, matching padding_idx of the embedding layer.
    _PAD_IDX = 0

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, 
                 embeddings_matrix=None, rnn_type="lstm", bidirectional=True, 
                 n_layers=1, dropout=0.5, freeze_embeddings=False):
        super().__init__()

        self.freeze_embeddings = freeze_embeddings
        self.bidirectional = bidirectional
        self.rnn_type = rnn_type

        if embeddings_matrix is not None:
            # clone() guarantees the layer owns its weights (never aliases the caller's array).
            weights = torch.as_tensor(embeddings_matrix, dtype=torch.float32).clone()
            if weights.dim() != 2:
                raise ValueError(
                    f"embeddings_matrix must be 2-D (vocab, dim), got shape {tuple(weights.shape)}"
                )
            if weights.size(1) != embedding_dim:
                print(f"Note: embedding_dim={embedding_dim} differs from the pre-trained "
                      f"matrix width {weights.size(1)}; using {weights.size(1)}")
                embedding_dim = weights.size(1)
            self.embedding = nn.Embedding.from_pretrained(
                weights,
                padding_idx=self._PAD_IDX,
                freeze=freeze_embeddings
            )
        else:
            # Initialize random embeddings
            self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=self._PAD_IDX)

        # Unknown rnn_type values fall back to a simple RNN (historical behaviour).
        rnn_cls = {"lstm": nn.LSTM, "gru": nn.GRU}.get(rnn_type, nn.RNN)
        self.rnn = rnn_cls(embedding_dim, hidden_dim,
                           num_layers=n_layers,
                           bidirectional=bidirectional,
                           dropout=dropout if n_layers > 1 else 0,
                           batch_first=True)

        # A bidirectional RNN yields one final state per direction.
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x shape: [batch_size, seq_len]
        embedded = self.embedding(x)
        # Only apply dropout if embeddings are trainable
        if not self.freeze_embeddings:
            embedded = self.dropout(embedded)
        # embedded shape: [batch_size, seq_len, embedding_dim]

        # Pack the batch so padded timesteps are skipped; otherwise the final
        # forward state would depend on how much padding the batch happens to have.
        # Assumes padding (index 0) only appears at the end of each sequence.
        # Empty sequences are treated as length 1 because packing needs lengths >= 1.
        lengths = (x != self._PAD_IDX).sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )

        _, state = self.rnn(packed)
        # LSTM returns (hidden, cell); GRU / RNN return hidden only.
        hidden = state[0] if self.rnn_type == "lstm" else state

        if self.bidirectional:
            # Last layer: forward direction is hidden[-2], backward is hidden[-1].
            final_state = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            final_state = hidden[-1, :, :]

        # Pass through linear layer for classification
        return self.fc(self.dropout(final_state))

### 🧩 Code: Create Embedding Matrix (B.4)
This function will map your tokenizer vocabulary to the pre-trained embeddings:


In [None]:
# Modified create_embedding_matrix to handle sparse embeddings
def create_embedding_matrix(tokenizer, word_embeddings, embedding_dim):
    """
    Build the (vocab_size, embedding_dim) matrix used to initialise nn.Embedding.

    Rows of words present in `word_embeddings` are copied from it; all other
    rows keep a small random initialisation. The "<PAD>" row is all zeros,
    because `nn.Embedding.from_pretrained(..., padding_idx=...)` keeps the
    padding row exactly as supplied rather than zeroing it.

    Args:
        tokenizer: Object exposing `word2idx` (word -> row) and `get_vocab_size()`.
        word_embeddings: Mapping word -> vector supporting `in` and `[]`.
        embedding_dim: Expected width of every pre-trained vector.

    Returns:
        NumPy array of shape (vocab_size, embedding_dim).

    Raises:
        ValueError: If a pre-trained vector does not have `embedding_dim` components.
    """
    vocab_size = tokenizer.get_vocab_size()

    # Small uniform range avoids extreme initial values for out-of-embedding words.
    embedding_matrix = np.random.uniform(-0.1, 0.1, (vocab_size, embedding_dim))

    pad_idx = tokenizer.word2idx.get("<PAD>")
    if pad_idx is not None:
        embedding_matrix[pad_idx] = 0.0

    found = 0
    for word, idx in tokenizer.word2idx.items():
        if idx == pad_idx or word not in word_embeddings:
            continue
        vector = np.asarray(word_embeddings[word])
        if vector.shape != (embedding_dim,):
            raise ValueError(
                f"Embedding for '{word}' has shape {vector.shape}, "
                f"expected ({embedding_dim},); check embedding_dim against the loaded vectors"
            )
        embedding_matrix[idx] = vector
        found += 1

    # The two special tokens are excluded from the coverage figure.
    real_words = max(vocab_size - 2, 0)
    coverage = found / real_words * 100 if real_words else 0.0
    print(f"Found embeddings for {found}/{real_words} words ({coverage:.2f}%)")
    return embedding_matrix

# Convert Pandas Series to lists first
train_texts_list = train_texts.tolist()
dev_texts_list = dev_texts.tolist()

# Encode labels: fit on the training labels only, then reuse the mapping for dev
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(train_labels)
y_dev_encoded = label_encoder.transform(dev_labels)

# Create sequence datasets - now passing lists instead of Series
seq_data = create_sequence_datasets(
    train_texts_list,
    dev_texts_list,
    y_train_encoded,
    y_dev_encoded,
    max_vocab=20000,
    max_len=128
)

# Create embedding matrix
# embedding_dim must equal the width of the loaded GloVe vectors (glove.6B.300d -> 300);
# a smaller value makes the row assignment fail on the first word found.
embedding_matrix = create_embedding_matrix(
    seq_data["tokenizer"],
    glove_embeddings,
    embedding_dim=300
)

In [None]:
# PyTorch DataLoaders for efficient batch processing:
batch_size = 64  # number of abstracts per mini-batch

# Pair each padded index sequence with its encoded label.
train_dataset = TensorDataset(seq_data["X_train"], seq_data["y_train"])
dev_dataset = TensorDataset(seq_data["X_dev"], seq_data["y_dev"])

# Only the training data is shuffled; the dev loader keeps the dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = DataLoader(dev_dataset, batch_size=batch_size)

### 🧩 Code: RNN Training Loop (B.5)
Train the RNN with the Adam optimizer and cross-entropy loss, tracking dev-set accuracy each epoch and stopping early when it no longer improves.


In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over `loader`; trains when `optimizer` is given, else evaluates.

    Returns (mean loss per sample, accuracy) over the pass.
    """
    training = optimizer is not None
    model.train(training)
    loss_sum, correct, seen = 0.0, 0, 0

    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            # criterion returns the batch mean; weight by batch size for a per-sample average.
            loss_sum += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            seen += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    denominator = max(seen, 1)  # an empty loader yields 0.0 instead of dividing by zero
    return loss_sum / denominator, correct / denominator


def train_rnn(model, train_loader, dev_loader, epochs=10, lr=0.001, patience=3, device=None):
    """Train RNN model with early stopping based on dev set performance

    Args:
        model: The nn.Module to train.
        train_loader: DataLoader yielding (inputs, labels) training batches.
        dev_loader: DataLoader yielding (inputs, labels) validation batches.
        epochs: Maximum number of epochs.
        lr: Learning rate for Adam.
        patience: Number of consecutive epochs without a dev-accuracy
            improvement after which training stops.
        device: torch.device to train on; defaults to the notebook-level DEVICE.

    Returns:
        (model, best_valid_acc): the model with the weights of its best
        dev-accuracy epoch restored, and that accuracy.
    """
    device = DEVICE if device is None else device
    model = model.to(device)

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # For early stopping
    best_valid_acc = 0
    patience_counter = 0
    best_model_state = None

    print(f"Training for {epochs} epochs...")
    for epoch in range(epochs):
        train_loss, train_acc = _run_epoch(model, train_loader, criterion, device, optimizer)
        valid_loss, valid_acc = _run_epoch(model, dev_loader, criterion, device)

        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | "
              f"Valid Loss: {valid_loss:.4f} | Valid Acc: {valid_acc:.4f}")

        # Early stopping logic
        if valid_acc > best_valid_acc:
            best_valid_acc = valid_acc
            # Clone every tensor: state_dict().copy() is shallow, so its tensors would
            # keep changing with further training and the "best" weights would be lost.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    # Load best model
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, best_valid_acc

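### 🧩 Code: RNN Evaluation (B.6)
Evaluate the RNN using dev data and compute relevant classification metrics (accuracy, macro F1/precision/recall, per-class report, confusion matrix).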


In [None]:
def evaluate_rnn(model, data_loader, label_encoder, title="Confusion Matrix"):
    """Evaluate RNN model performance

    Runs the model over `data_loader`, prints accuracy and macro-averaged
    F1 / precision / recall plus the per-class report, and plots the
    confusion matrix.

    Args:
        model: Trained classifier returning logits of shape [batch, n_classes].
        data_loader: DataLoader yielding (inputs, labels) batches.
        label_encoder: Fitted encoder whose `classes_` name the class ids.
        title: Title of the confusion-matrix figure.

    Returns:
        dict with keys "accuracy", "f1", "precision", "recall".
    """
    model.eval()

    # Run on the device the model already lives on; fall back to the notebook
    # default only for a model without parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else DEVICE

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            outputs = model(inputs.to(device))
            all_preds.extend(outputs.argmax(dim=1).cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Convert to numpy arrays
    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    # Calculate metrics
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average="macro")
    precision = precision_score(all_labels, all_preds, average="macro")
    recall = recall_score(all_labels, all_preds, average="macro")

    print(f"Accuracy: {acc:.4f}")
    print(f"F1-Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")

    # Pin the label set to the encoder's classes so the report rows and the matrix
    # axes always line up with the names, even if a class is absent from this data.
    class_names = [str(name) for name in label_encoder.classes_]
    class_ids = np.arange(len(class_names))

    # Full classification report
    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds,
                                labels=class_ids, target_names=class_names))

    # Confusion Matrix
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title(title)
    plt.tight_layout()
    plt.show()

    return {
        "accuracy": acc,
        "f1": f1,
        "precision": precision,
        "recall": recall
    }

In [None]:
# Define LSTM model
lstm_model = RNNClassifier(
    vocab_size=seq_data["tokenizer"].get_vocab_size(),
    # Take the width from the matrix itself so the RNN input size always matches
    # the pre-trained vectors.
    embedding_dim=embedding_matrix.shape[1],
    hidden_dim=256,
    output_dim=len(label_encoder.classes_),
    embeddings_matrix=embedding_matrix,
    rnn_type="lstm",
    bidirectional=True,
    n_layers=2,
    dropout=0.3
)

# Train LSTM
lstm_model, lstm_best_acc = train_rnn(
    lstm_model,
    train_loader,
    dev_loader,
    epochs=15,
    lr=0.001
)

# Evaluate LSTM
lstm_metrics = evaluate_rnn(
    lstm_model,
    dev_loader,
    label_encoder,
    title="LSTM Performance"
)

In [None]:
# Define GRU model
gru_model = RNNClassifier(
    vocab_size=seq_data["tokenizer"].get_vocab_size(),
    # Take the width from the matrix itself so the RNN input size always matches
    # the pre-trained vectors.
    embedding_dim=embedding_matrix.shape[1],
    hidden_dim=256,
    output_dim=len(label_encoder.classes_),
    embeddings_matrix=embedding_matrix,
    rnn_type="gru",
    bidirectional=True,
    n_layers=1,
    dropout=0.3
)

# Train GRU
gru_model, gru_best_acc = train_rnn(
    gru_model,
    train_loader,
    dev_loader,
    epochs=15,
    lr=0.001
)

# Evaluate GRU
gru_metrics = evaluate_rnn(
    gru_model,
    dev_loader,
    label_encoder,
    title="GRU Performance"
)

In [None]:
# Define Simple RNN model
simple_rnn_model = RNNClassifier(
    vocab_size=seq_data["tokenizer"].get_vocab_size(),
    # Take the width from the matrix itself so the RNN input size always matches
    # the pre-trained vectors.
    embedding_dim=embedding_matrix.shape[1],
    hidden_dim=256,
    output_dim=len(label_encoder.classes_),
    embeddings_matrix=embedding_matrix,
    rnn_type="rnn",
    bidirectional=False,
    n_layers=1,
    dropout=0.3
)

# Train Simple RNN
simple_rnn_model, simple_rnn_best_acc = train_rnn(
    simple_rnn_model,
    train_loader,
    dev_loader,
    epochs=15,
    lr=0.001
)

# Evaluate Simple RNN
simple_rnn_metrics = evaluate_rnn(
    simple_rnn_model,
    dev_loader,
    label_encoder,
    title="Simple RNN Performance"
)

In [None]:
# Create comparison dataframe
rnn_results = pd.DataFrame({
    "Model": ["LSTM", "GRU", "Simple RNN"],
    "Accuracy": [lstm_metrics["accuracy"], gru_metrics["accuracy"], simple_rnn_metrics["accuracy"]],
    "F1-Score": [lstm_metrics["f1"], gru_metrics["f1"], simple_rnn_metrics["f1"]],
    "Precision": [lstm_metrics["precision"], gru_metrics["precision"], simple_rnn_metrics["precision"]],
    "Recall": [lstm_metrics["recall"], gru_metrics["recall"], simple_rnn_metrics["recall"]]
})

# Display results
print(rnn_results.sort_values(by="F1-Score", ascending=False))

# Save the best model: choose it from the measured macro F1 instead of assuming
# the LSTM wins. The saved state_dict only loads into a model built with the
# same architecture settings as the winner.
trained_models = {
    "LSTM": lstm_model,
    "GRU": gru_model,
    "Simple RNN": simple_rnn_model,
}
best_model_name = rnn_results.loc[rnn_results["F1-Score"].idxmax(), "Model"]
print(f"Best model by F1-Score: {best_model_name}")

os.makedirs("models", exist_ok=True)  # torch.save does not create missing directories
torch.save(trained_models[best_model_name].state_dict(), "models/best_rnn_model.pt")
print("Best model saved.")