# Hybrid CNN-LSTM - ArXiv Classification

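A hybrid text classifier for arXiv papers: a 1D CNN encodes the abstracts, a bidirectional LSTM encodes the titles, and attention mechanisms pool each branch and fuse the two representations.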

In [None]:
!pip install -q torch scikit-learn pandas matplotlib seaborn

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, f1_score
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import re
import pickle

# Prefer the GPU when CUDA is usable; otherwise everything runs on the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)
print(f"Device: {device}")

## Upload Dataset

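Upload `arxiv_papers_raw.csv` to the working directory before running the cells below. The file must contain `title`, `abstract`, and `category` columns.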

In [None]:
import os

# Tell the user whether the dataset CSV is already in the working directory.
dataset_present = os.path.exists('arxiv_papers_raw.csv')
print("Dataset found" if dataset_present else "Upload arxiv_papers_raw.csv")

## Model Architecture

In [None]:
class SelfAttention(nn.Module):
    """Attention pooling over a sequence of hidden states.

    A single linear layer assigns one scalar score to every timestep; the
    softmax-normalised scores weight a sum over the timesteps.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        # Maps each hidden vector to one unnormalised attention score.
        self.attention = nn.Linear(hidden_dim, 1)

    def forward(self, lstm_output, mask=None):
        """Collapse ``lstm_output`` to one context vector per batch element.

        Args:
            lstm_output: tensor laid out as (batch, seq_len, hidden_dim).
            mask: optional (batch, seq_len) tensor; positions equal to 0 are
                pushed to a very negative score before the softmax.

        Returns:
            Tuple ``(context, attention_weights)`` with shapes
            (batch, hidden_dim) and (batch, seq_len).
        """
        raw_scores = self.attention(lstm_output).squeeze(-1)
        if mask is None:
            masked_scores = raw_scores
        else:
            masked_scores = raw_scores.masked_fill(mask == 0, -1e9)
        attention_weights = torch.softmax(masked_scores, dim=1)
        # (batch, 1, seq_len) @ (batch, seq_len, hidden) -> (batch, 1, hidden)
        pooled = torch.bmm(attention_weights.unsqueeze(1), lstm_output)
        return pooled.squeeze(1), attention_weights


class GlobalAttention(nn.Module):
    """Attention pooling over the positions of a channel-first feature map.

    A single linear layer scores every position from its channel vector; the
    softmax-normalised scores weight a sum over the positions.
    """

    def __init__(self, feature_dim):
        super().__init__()
        # Maps the channel vector at each position to one unnormalised score.
        self.attention = nn.Linear(feature_dim, 1)

    def forward(self, cnn_features, mask=None):
        """Collapse ``cnn_features`` to one context vector per batch element.

        Args:
            cnn_features: tensor laid out as (batch, feature_dim, seq_len).
            mask: optional (batch, seq_len) tensor; positions equal to 0
                (e.g. padding) are pushed to a very negative score before the
                softmax. ``None`` (the default) attends over every position.

        Returns:
            Tuple ``(context, attention_weights)`` with shapes
            (batch, feature_dim) and (batch, seq_len).
        """
        # The linear layer acts on the last axis, so move channels there.
        features_t = cnn_features.transpose(1, 2)
        scores = self.attention(features_t).squeeze(-1)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention_weights = F.softmax(scores, dim=1)
        # (batch, 1, seq_len) @ (batch, seq_len, feature_dim)
        context = torch.bmm(attention_weights.unsqueeze(1), features_t).squeeze(1)
        return context, attention_weights


class WeightedAttentionFusion(nn.Module):
    """Gate two representations with learned, softmax-normalised weights.

    Each input produces one scalar logit; the two logits are normalised
    against each other and used to scale the inputs before concatenation.
    """

    def __init__(self, title_dim, abstract_dim):
        super().__init__()
        self.title_weight = nn.Linear(title_dim, 1)
        self.abstract_weight = nn.Linear(abstract_dim, 1)

    def forward(self, title_repr, abstract_repr):
        """Return ``(fused, fusion_weights)``.

        ``fused`` is the concatenation of both scaled inputs along dim 1;
        ``fusion_weights`` has shape (batch, 2), title weight first.
        """
        gate_logits = torch.cat(
            (self.title_weight(title_repr), self.abstract_weight(abstract_repr)),
            dim=1,
        )
        fusion_weights = torch.softmax(gate_logits, dim=1)
        # One (batch, 1) gate per branch, broadcast across that branch's features.
        title_gate, abstract_gate = fusion_weights.split(1, dim=1)
        fused = torch.cat(
            (title_repr * title_gate, abstract_repr * abstract_gate),
            dim=1,
        )
        return fused, fusion_weights


class HybridCNNLSTM(nn.Module):
    """Two-branch text classifier with a shared embedding table.

    * Title branch: 2-layer bidirectional LSTM pooled by ``SelfAttention``.
    * Abstract branch: parallel 1D convolutions (one per kernel size), each
      followed by batch norm and ReLU, concatenated along the channel axis
      and pooled by ``GlobalAttention``.
    * The two pooled vectors are gated by ``WeightedAttentionFusion`` and
      classified by a 3-layer MLP.

    Args:
        vocab_size: number of rows in the embedding table (index 0 is padding).
        embed_dim: embedding dimensionality.
        num_filters: output channels of each convolution.
        kernel_sizes: iterable of convolution kernel widths.
        lstm_hidden: hidden size per LSTM direction.
        num_classes: number of output logits.
        dropout: base dropout probability; the embedding uses half of it and
            the last classifier block uses 0.8 of it.
    """

    def __init__(self, vocab_size, embed_dim=300, num_filters=256, kernel_sizes=(3, 4, 5),
                 lstm_hidden=256, num_classes=4, dropout=0.5):
        super().__init__()
        # Tuple default avoids a shared mutable default; accept any iterable.
        kernel_sizes = tuple(kernel_sizes)

        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.embed_dropout = nn.Dropout(dropout * 0.5)

        self.convs = nn.ModuleList([nn.Conv1d(embed_dim, num_filters, k, padding=k//2) for k in kernel_sizes])
        self.conv_bn = nn.ModuleList([nn.BatchNorm1d(num_filters) for _ in kernel_sizes])
        total_filters = num_filters * len(kernel_sizes)
        self.cnn_attention = GlobalAttention(total_filters)

        self.lstm = nn.LSTM(embed_dim, lstm_hidden, num_layers=2, batch_first=True, bidirectional=True, dropout=dropout if dropout > 0 else 0)
        # Bidirectional output concatenates both directions.
        self.lstm_attention = SelfAttention(lstm_hidden * 2)

        self.fusion = WeightedAttentionFusion(lstm_hidden * 2, total_filters)

        fused_dim = lstm_hidden * 2 + total_filters
        self.classifier = nn.Sequential(
            nn.BatchNorm1d(fused_dim),
            nn.Dropout(dropout),
            nn.Linear(fused_dim, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(dropout * 0.8),
            nn.Linear(256, num_classes)
        )
        self._init_weights()

    def _init_weights(self):
        """Xavier-init every Linear, Kaiming-init every Conv1d, zero all their biases."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

    def forward(self, title_ids, abstract_ids, title_mask=None):
        """Classify a batch of (title, abstract) pairs.

        Args:
            title_ids: (batch, title_len) token ids.
            abstract_ids: (batch, abstract_len) token ids.
            title_mask: optional (batch, title_len) mask; zeros mark positions
                the title attention must ignore.

        Returns:
            Tuple ``(logits, attention_maps)`` where ``attention_maps`` has the
            keys ``'title_attention'``, ``'abstract_attention'`` and
            ``'fusion_weights'``.
        """
        title_embed = self.embed_dropout(self.embedding(title_ids))
        lstm_out, _ = self.lstm(title_embed)
        title_repr, title_attn = self.lstm_attention(lstm_out, title_mask)

        # Conv1d expects channels first: (batch, embed_dim, abstract_len).
        abstract_embed = self.embed_dropout(self.embedding(abstract_ids)).transpose(1, 2)
        seq_len = abstract_embed.size(2)
        conv_outputs = []
        for conv, bn in zip(self.convs, self.conv_bn):
            # With padding=k//2 an even kernel yields seq_len + 1 positions
            # while an odd kernel yields seq_len; trim to the input length so
            # the branches can be concatenated along the channel axis.
            x = conv(abstract_embed)[:, :, :seq_len]
            x = F.relu(bn(x))
            conv_outputs.append(x)
        cnn_features = torch.cat(conv_outputs, dim=1)
        abstract_repr, abstract_attn = self.cnn_attention(cnn_features)

        fused_repr, fusion_weights = self.fusion(title_repr, abstract_repr)
        logits = self.classifier(fused_repr)

        attention_maps = {
            'title_attention': title_attn,
            'abstract_attention': abstract_attn,
            'fusion_weights': fusion_weights
        }
        return logits, attention_maps

## Preprocessing

In [None]:
class Vocabulary:
    """Word-level vocabulary mapping tokens to integer ids.

    Index 0 is reserved for ``<PAD>`` and index 1 for ``<UNK>``; real words
    start at index 2, ordered by descending frequency.

    Args:
        max_vocab_size: upper bound on the vocabulary size, including the two
            special tokens.
        min_freq: minimum number of occurrences a word needs to be kept.
    """

    # Everything except lowercase letters, digits, whitespace and hyphens
    # is replaced by a space during tokenization.
    _NON_TOKEN_CHARS = re.compile(r'[^a-z0-9\s\-]')

    def __init__(self, max_vocab_size=50000, min_freq=2):
        self.max_vocab_size = max_vocab_size
        self.min_freq = min_freq
        self.word2idx = {'<PAD>': 0, '<UNK>': 1}
        self.idx2word = {0: '<PAD>', 1: '<UNK>'}
        self.word_counts = Counter()

    def build_vocab(self, texts):
        """Count the words in ``texts`` and (re)build the index mappings.

        Counts accumulate across calls; the mappings are regenerated from
        scratch each time so a repeated call can never leave stale or
        colliding indices behind.
        """
        for text in texts:
            self.word_counts.update(self.tokenize(text))

        # Two slots are taken by <PAD>/<UNK>; never let the budget go negative
        # (a negative slice bound would silently drop words from the tail).
        budget = max(self.max_vocab_size - 2, 0)
        kept_words = [
            word for word, count in self.word_counts.most_common()
            if count >= self.min_freq
        ][:budget]

        self.word2idx = {'<PAD>': 0, '<UNK>': 1}
        self.idx2word = {0: '<PAD>', 1: '<UNK>'}
        for idx, word in enumerate(kept_words, start=2):
            self.word2idx[word] = idx
            self.idx2word[idx] = word

    @staticmethod
    def tokenize(text):
        """Lowercase ``text``, strip punctuation and split on whitespace.

        ``None`` and NaN (how pandas represents an empty CSV cell) yield an
        empty token list instead of raising; other non-string values are
        converted with ``str()`` first.
        """
        if text is None or (isinstance(text, float) and text != text):
            return []
        cleaned = Vocabulary._NON_TOKEN_CHARS.sub(' ', str(text).lower())
        # str.split() with no argument already discards empty/whitespace tokens.
        return cleaned.split()

    def encode(self, text, max_len=None):
        """Convert ``text`` to a list of ids, truncated to ``max_len`` tokens.

        Unknown words map to 1 (``<UNK>``). ``max_len=None`` disables
        truncation; ``max_len=0`` returns an empty list.
        """
        words = self.tokenize(text)
        if max_len is not None:
            words = words[:max_len]
        return [self.word2idx.get(word, 1) for word in words]

    def __len__(self):
        return len(self.word2idx)


class HybridDataset(Dataset):
    """Dataset yielding fixed-length id tensors for a title/abstract pair.

    Each text is encoded through ``vocab.encode`` (which also truncates to
    the configured maximum length) and right-padded with 0 up to that length.
    """

    def __init__(self, titles, abstracts, labels, vocab, max_title_len=30, max_abstract_len=200):
        self.titles = titles
        self.abstracts = abstracts
        self.labels = labels
        self.vocab = vocab
        self.max_title_len = max_title_len
        self.max_abstract_len = max_abstract_len

    def __len__(self):
        return len(self.labels)

    @staticmethod
    def _right_pad(ids, target_len):
        """Return ``ids`` followed by as many 0s as needed to reach ``target_len``."""
        return ids + [0] * (target_len - len(ids))

    def __getitem__(self, idx):
        """Return a dict with ``title_ids``, ``abstract_ids``, ``title_mask`` and ``label``."""
        raw_title = self.vocab.encode(self.titles[idx], self.max_title_len)
        raw_abstract = self.vocab.encode(self.abstracts[idx], self.max_abstract_len)

        # The mask is 1 for real title tokens and 0 for the padding after them.
        n_title = len(raw_title)
        n_title_pad = self.max_title_len - n_title
        mask_values = [1] * n_title + [0] * n_title_pad

        item = {}
        item['title_ids'] = torch.tensor(
            self._right_pad(raw_title, self.max_title_len), dtype=torch.long
        )
        item['abstract_ids'] = torch.tensor(
            self._right_pad(raw_abstract, self.max_abstract_len), dtype=torch.long
        )
        item['title_mask'] = torch.tensor(mask_values, dtype=torch.float)
        item['label'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

## Data Preparation

In [None]:
# Load the raw dataset; the 'title', 'abstract' and 'category' columns are used below.
df = pd.read_csv('arxiv_papers_raw.csv')
print(f"Samples: {len(df)}")
print(df['category'].value_counts())

# Empty CSV cells are parsed as NaN floats, which cannot be tokenized;
# treat a missing title/abstract as empty text instead.
df['title'] = df['title'].fillna('').astype(str)
df['abstract'] = df['abstract'].fillna('').astype(str)

# Integer-encode the category strings.
le = LabelEncoder()
labels = le.fit_transform(df['category'])

# Stratified 70/15/15 split: hold out 15% for test, then 15% of the total
# (0.15 / 0.85 of the remainder) for validation.
X_temp, X_test, y_temp, y_test = train_test_split(
    df[['title', 'abstract']].values, labels, test_size=0.15, random_state=42, stratify=labels
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.15/(1-0.15), random_state=42, stratify=y_temp
)

# Build the vocabulary from the TRAINING split only, so that no word
# statistics from the validation/test texts leak into the model's inputs.
vocab = Vocabulary(max_vocab_size=50000, min_freq=2)
train_texts = X_train[:, 0].tolist() + X_train[:, 1].tolist()
vocab.build_vocab(train_texts)
print(f"Vocab size: {len(vocab)}")

# Column 0 holds titles, column 1 holds abstracts.
train_ds = HybridDataset(X_train[:,0], X_train[:,1], y_train, vocab)
val_ds = HybridDataset(X_val[:,0], X_val[:,1], y_val, vocab)
test_ds = HybridDataset(X_test[:,0], X_test[:,1], y_test, vocab)

print(f"Train: {len(train_ds)} | Val: {len(val_ds)} | Test: {len(test_ds)}")

## Training Setup

In [None]:
# Hyperparameters.
BATCH_SIZE = 64
EPOCHS = 20
LR = 0.001
DROPOUT = 0.5
# Per-class loss weights; entry i applies to the class encoded as label i
# (i.e. le.classes_[i]).
CLASS_WEIGHTS = [2.0, 1.0, 1.0, 1.0]
PATIENCE = 5

# Take the class count from the fitted label encoder instead of hard-coding
# it, and fail early with a clear message if the weights do not line up.
NUM_CLASSES = len(le.classes_)
if len(CLASS_WEIGHTS) != NUM_CLASSES:
    raise ValueError(
        f"CLASS_WEIGHTS has {len(CLASS_WEIGHTS)} entries but the dataset has "
        f"{NUM_CLASSES} classes: {list(le.classes_)}"
    )

# Pinned host memory only helps host-to-GPU copies; skip it on CPU.
PIN_MEMORY = device.type == 'cuda'

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=2, pin_memory=PIN_MEMORY)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=2, pin_memory=PIN_MEMORY)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=2, pin_memory=PIN_MEMORY)

model = HybridCNNLSTM(
    vocab_size=len(vocab),
    embed_dim=300,
    num_filters=256,
    kernel_sizes=[3,4,5],
    lstm_hidden=256,
    num_classes=NUM_CLASSES,
    dropout=DROPOUT
).to(device)

print(f"Params: {sum(p.numel() for p in model.parameters()):,}")

class_weights = torch.tensor(CLASS_WEIGHTS, dtype=torch.float, device=device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=LR, weight_decay=1e-4)
# mode='max' because the scheduler is stepped with validation accuracy.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=3)

## Training

In [None]:
def train_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader``.

    Each batch is a dict with ``title_ids``, ``abstract_ids``, ``title_mask``
    and ``label`` tensors. Gradients are clipped to a total norm of 1.0
    before every optimizer step.

    Returns:
        Tuple ``(mean_loss, accuracy)`` where ``mean_loss`` is the average of
        the per-batch losses and ``accuracy`` is computed over all samples.
    """
    model.train()
    running_loss = 0
    predictions, targets = [], []

    for batch in tqdm(loader, desc='Train'):
        title_ids, abstract_ids, title_mask, labels = (
            batch[key].to(device)
            for key in ('title_ids', 'abstract_ids', 'title_mask', 'label')
        )

        optimizer.zero_grad()
        logits, _ = model(title_ids, abstract_ids, title_mask)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        running_loss += batch_loss.item()
        predictions.extend(logits.argmax(dim=1).cpu().numpy())
        targets.extend(labels.cpu().numpy())

    mean_loss = running_loss / len(loader)
    return mean_loss, accuracy_score(targets, predictions)


def evaluate(model, loader, criterion, device):
    """Score ``model`` on ``loader`` without computing gradients.

    Returns:
        Tuple ``(mean_loss, accuracy, weighted_f1, predictions, labels)``;
        ``mean_loss`` is the average of the per-batch losses, and the last
        two entries are flat lists covering every sample in ``loader``.
    """
    model.eval()
    loss_sum = 0
    predictions, targets = [], []

    with torch.no_grad():
        for batch in tqdm(loader, desc='Val'):
            title_ids, abstract_ids, title_mask, labels = (
                batch[key].to(device)
                for key in ('title_ids', 'abstract_ids', 'title_mask', 'label')
            )

            logits, _ = model(title_ids, abstract_ids, title_mask)
            batch_loss = criterion(logits, labels)

            loss_sum += batch_loss.item()
            predictions.extend(logits.argmax(dim=1).cpu().numpy())
            targets.extend(labels.cpu().numpy())

    accuracy = accuracy_score(targets, predictions)
    weighted_f1 = f1_score(targets, predictions, average='weighted')
    return loss_sum / len(loader), accuracy, weighted_f1, predictions, targets

In [None]:
# Per-epoch metrics, appended once per completed epoch.
history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': [], 'val_f1': []}
best_val_acc = 0
best_model_state = None
patience_counter = 0

for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")

    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc, val_f1, _, _ = evaluate(model, val_loader, criterion, device)

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)
    history['val_f1'].append(val_f1)

    print(f"Train: {train_acc:.4f} | Val: {val_acc:.4f} | F1: {val_f1:.4f} | Gap: {abs(train_acc-val_acc):.4f}")

    # The scheduler halves the learning rate when validation accuracy plateaus.
    scheduler.step(val_acc)

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # state_dict() hands back references to the live parameter tensors and
        # dict.copy() is shallow, so later optimizer steps would silently
        # overwrite the "best" weights. Clone every tensor to take a real
        # snapshot.
        best_model_state = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        patience_counter = 0
        print(f"Best: {val_acc:.4f}")
    else:
        patience_counter += 1
        # Early stopping: give up after PATIENCE epochs without improvement.
        if patience_counter >= PATIENCE:
            print("Early stop")
            break

# Restore the best snapshot; it is None only if no epoch ever beat the
# initial best_val_acc of 0, in which case the current weights are kept.
if best_model_state is not None:
    model.load_state_dict(best_model_state)
print(f"\nBest val: {best_val_acc:.4f}")

## Test Evaluation

In [None]:
# Score the restored model on the held-out test split.
test_metrics = evaluate(model, test_loader, criterion, device)
test_loss, test_acc, test_f1, test_preds, test_labels = test_metrics

print(f"Test Acc: {test_acc:.4f} ({test_acc*100:.2f}%)")
print(f"Test F1: {test_f1:.4f}")
print(f"\n{classification_report(test_labels, test_preds, target_names=le.classes_, digits=4)}")

# Same report as a nested dict, used to look up the recall of the 'cs.AI' class.
report_dict = classification_report(
    test_labels, test_preds, target_names=le.classes_, output_dict=True
)
cs_ai_recall = report_dict['cs.AI']['recall']
# Prints whether overall accuracy reaches 60% and cs.AI recall exceeds 30%.
print(f"\nAcc >=60%: {test_acc >= 0.60} | cs.AI >30%: {cs_ai_recall > 0.30}")

## Save Model

In [None]:
# Persist the trained weights plus the metadata needed to rebuild the model.
torch.save({
    'model_state_dict': model.state_dict(),
    'vocab_size': len(vocab),
    # Stored as a plain Python list: a NumPy array inside the checkpoint is
    # rejected by torch.load(..., weights_only=True), the safe loading mode.
    'class_names': le.classes_.tolist()
}, 'best_hybrid_model.pth')

# The vocabulary is pickled as a whole object; unpickling requires the
# Vocabulary class to be importable, and the file should only ever be
# loaded from a trusted source.
with open('vocab_hybrid.pkl', 'wb') as f:
    pickle.dump(vocab, f)

print("Saved: best_hybrid_model.pth, vocab_hybrid.pkl")

## Training History

In [None]:
# Side-by-side curves: loss on the left, accuracy on the right.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

panel_specs = (
    (axes[0], 'train_loss', 'val_loss', 'Loss'),
    (axes[1], 'train_acc', 'val_acc', 'Accuracy'),
)
for panel_ax, train_key, val_key, panel_title in panel_specs:
    panel_ax.plot(history[train_key], label='Train', marker='o')
    panel_ax.plot(history[val_key], label='Val', marker='o')
    panel_ax.set_title(panel_title)
    panel_ax.legend()
    panel_ax.grid(True)

plt.tight_layout()
plt.savefig('history.png', dpi=150, bbox_inches='tight')
plt.show()

## Attention Visualization

In [None]:
# Inspect the attention maps the model produces for the first test sample.
model.eval()
sample = test_ds[0]
# unsqueeze(0) adds the batch dimension the model expects.
title_ids, abstract_ids, title_mask = (
    sample[key].unsqueeze(0).to(device)
    for key in ('title_ids', 'abstract_ids', 'title_mask')
)

with torch.no_grad():
    logits, attn_maps = model(title_ids, abstract_ids, title_mask)
    pred = torch.argmax(logits, dim=1).item()

# Move each map to NumPy and drop the batch dimension.
title_attn, abstract_attn, fusion_weights = (
    attn_maps[key].cpu().numpy()[0]
    for key in ('title_attention', 'abstract_attention', 'fusion_weights')
)

print(f"Prediction: {le.classes_[pred]}")
print(f"True label: {le.classes_[sample['label']]}")
print(f"Fusion weights - Title: {fusion_weights[0]:.3f}, Abstract: {fusion_weights[1]:.3f}")

# Only the leading positions are plotted to keep the bars readable.
title_head = title_attn[:20]
abstract_head = abstract_attn[:50]

fig, axes = plt.subplots(2, 1, figsize=(12, 4))
axes[0].bar(range(len(title_head)), title_head)
axes[0].set_title('Title Attention (first 20 tokens)')
axes[1].bar(range(len(abstract_head)), abstract_head)
axes[1].set_title('Abstract Attention (first 50 tokens)')
plt.tight_layout()
plt.savefig('attention.png', dpi=150, bbox_inches='tight')
plt.show()

## Download Results

In [None]:
from google.colab import files

# Trigger a browser download for every artifact written by the cells above.
for artifact in ('best_hybrid_model.pth', 'vocab_hybrid.pkl', 'history.png', 'attention.png'):
    files.download(artifact)