In [35]:
from transformers import AutoTokenizer, AutoModel
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import time

In [36]:
import os

# Folder that receives the per-epoch model snapshots written during training.
checkpoint_folder = './checkpoints'
# torch.save() does not create missing parent directories, so make sure the
# folder exists before any training cell tries to write into it.
os.makedirs(checkpoint_folder, exist_ok=True)

In [37]:
# Hugging Face model id used for both the tokenizer and the encoder.
language_model = "FacebookAI/roberta-base"
tokenizer = AutoTokenizer.from_pretrained(language_model)
# Run on the GPU when one is visible to PyTorch, otherwise stay on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

In [38]:
# Load the full tab-separated article table (path is relative to the kernel's working directory).
data = pd.read_csv('data/data.tsv', sep='\t')
# NOTE(review): only the last expression of a cell is displayed, so this preview is computed and discarded.
data.head()
# Prints the column names, non-null counts and dtypes of the loaded frame.
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10832 entries, 0 to 10831
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   url        10832 non-null  object
 1   topic      10832 non-null  object
 2   date       10832 non-null  object
 3   title      10832 non-null  object
 4   site       10832 non-null  object
 5   bias       10832 non-null  object
 6   page_text  10832 non-null  object
dtypes: object(7)
memory usage: 592.5+ KB


In [None]:
import re
import string
from nltk.corpus import stopwords
import nltk

# Fetch the NLTK stop-word corpus (silently; a no-op once it is cached locally)
nltk.download('stopwords', quiet=True)
# Start from the full English list ...
all_stopwords = set(stopwords.words('english'))
# ... and exempt words that may carry a political signal, so they are never filtered out
keep_words = {'government', 'business', 'country', 'people', 'president', 'trump', 'biden', 'democrat', 'republican', 'left', 'right', 'liberal', 'conservative'}
stop_words = all_stopwords.difference(keep_words)

# Load only the two columns needed for classification and drop incomplete rows
df = (
    pd.read_csv('data/data.tsv', usecols=['page_text', 'bias'], sep='\t')
    .dropna(subset=['page_text', 'bias'])
    .reset_index(drop=True)
)
print(f"Loaded {len(df)} rows")
print(f"Label distribution:\n{df['bias'].value_counts()}")

# TEXT PREPROCESSING
# Patterns are compiled once here instead of being re-parsed on every call.
_URL_PATTERN = re.compile(r'http\S+|www\S+')
_EMAIL_PATTERN = re.compile(r'\S+@\S+')
_HTML_TAG_PATTERN = re.compile(r'<[^>]+>')
_HTML_ENTITY_PATTERN = re.compile(r'&[a-z]+;')
_NON_LETTER_PATTERN = re.compile(r'[^a-z\s]')


def preprocess_text(text, stopword_set=None, min_word_len=2):
    """Normalise raw article text into a space-separated string of lowercase words.

    Steps, in order: lowercase; strip URLs, e-mail addresses, HTML tags and
    named HTML entities; replace every remaining non-letter character; split
    on whitespace; drop words shorter than ``min_word_len`` and words found in
    the stop-word set.

    Every removed span is replaced by a space rather than deleted, so the
    words on either side stay separate ("left-wing" -> "left wing", not
    "leftwing"; "end.</p><p>Next" -> "end next", not "endnext").

    Args:
        text: Value to clean. Anything that is not a string is passed through
            ``str()``; ``None`` and float NaN are treated as empty text.
        stopword_set: Words to drop. Defaults to the module-level
            ``stop_words`` set when omitted.
        min_word_len: Minimum number of characters a word needs to be kept.

    Returns:
        The cleaned text; an empty string when nothing survives.
    """
    # Missing values would otherwise be stringified into the words "none"/"nan".
    if text is None or (isinstance(text, float) and text != text):
        return ''
    if stopword_set is None:
        stopword_set = stop_words

    text = str(text).lower()
    # Order matters: URLs and e-mails are matched while their punctuation is
    # still present, and tags/entities before the catch-all non-letter pass.
    for pattern in (
        _URL_PATTERN,
        _EMAIL_PATTERN,
        _HTML_TAG_PATTERN,
        _HTML_ENTITY_PATTERN,
        _NON_LETTER_PATTERN,
    ):
        text = pattern.sub(' ', text)

    # str.split() with no argument also collapses runs of whitespace.
    words = [
        w for w in text.split()
        if len(w) >= min_word_len and w not in stopword_set
    ]
    return ' '.join(words)

# Clean every article body into a new `text` column
print("\nPreprocessing text...")
df['text'] = df['page_text'].map(preprocess_text)

# Discard articles whose text was reduced to nothing by the cleaning step
has_text = df['text'].str.len().gt(0)
df = df.loc[has_text].reset_index(drop=True)
print(f"After preprocessing: {len(df)} rows")

# Turn the bias names into integer codes; pandas orders the categories alphabetically
df['bias'] = df['bias'].astype('category')
df['label'] = df['bias'].cat.codes
label_mapping_5 = {code: name for code, name in enumerate(df['bias'].cat.categories)}
print(f"\nOriginal 5-class mapping: {label_mapping_5}")

# Consolidate to 3 classes: Left, Center, Right
def map_to_3_classes(label_code):
    """Collapse a fine-grained bias code into 0 (Left), 1 (Center) or 2 (Right).

    The code is looked up in ``label_mapping_5`` and its lowercase name is
    tested for the substring "left" first, then "right"; any name containing
    neither is treated as Center.
    """
    name = label_mapping_5[label_code].lower()
    for keyword, target in (('left', 0), ('right', 2)):
        if keyword in name:
            return target
    return 1  # center, least, etc.

# Derive the 3-class label from the categorical `bias` column instead of from
# the current `label` column. Mapping `label` onto itself is not idempotent:
# re-running the cell would feed the already-consolidated 0/1/2 codes back
# through the 5-class lookup and silently scramble the labels.
df['label'] = df['bias'].cat.codes.map(map_to_3_classes)
label_mapping = {0: 'Left', 1: 'Center', 2: 'Right'}
print(f"\nConsolidated 3-class mapping: {label_mapping}")
print(f"Number of classes: {df['label'].nunique()}")
print(f"Samples per class:\n{df['label'].value_counts().sort_index()}")

Loaded 10832 rows
Label distribution:
bias
right            2986
leaning-left     2951
left             1930
leaning-right    1487
center           1478
Name: count, dtype: int64

Preprocessing text...


In [None]:
# Create dataset
class PandasTextDataset(torch.utils.data.Dataset):
    """Map-style dataset exposing one text/label pair per DataFrame row.

    Args:
        df: Frame holding the text and label columns.
        text_column: Column to serve as the model input. Defaults to the raw
            ``page_text`` column; pass ``'text'`` to train on the cleaned text
            instead.
        label_column: Column holding the integer class id of each row.

    Raises:
        KeyError: If either column is absent from ``df``.

    Each item is a dict with the text under ``'text'`` and the label as a
    0-dimensional ``torch.long`` tensor under ``'label'``.
    """

    def __init__(self, df, text_column='page_text', label_column='label'):
        missing = [col for col in (text_column, label_column) if col not in df.columns]
        if missing:
            raise KeyError(f"DataFrame is missing required column(s): {missing}")
        # Plain Python lists make per-item access cheap and detach the dataset
        # from later changes to the frame.
        self.texts = df[text_column].tolist()
        self.labels = df[label_column].tolist()

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        return {'text': self.texts[idx], 'label': torch.tensor(self.labels[idx], dtype=torch.long)}

# Wrap the labelled frame. PandasTextDataset reads the raw `page_text` column here,
# so the cleaned `text` column is not what the model is fed.
dataset = PandasTextDataset(df)

# Define model: pretrained transformer encoder (frozen by default) + small MLP head with dropout for regularization
class TransformerClassifier(nn.Module):
    """Sequence classifier: a Hugging Face encoder followed by a two-layer MLP head.

    The head reads the encoder's last hidden state at the first token position
    of every sequence and maps it to ``n_classes`` logits.

    Args:
        model_name: Model id or path handed to ``AutoModel.from_pretrained``.
        n_classes: Number of output logits.
        freeze_transformer: When True (default) the encoder is a fixed feature
            extractor: it runs without gradient tracking, its weights are
            marked non-trainable, and it stays in eval mode so its dropout
            layers do not add noise to the features while the head trains.
            Pass False to fine-tune the encoder together with the head.
    """

    def __init__(self, model_name, n_classes, freeze_transformer=True):
        super(TransformerClassifier, self).__init__()
        self.transformer = AutoModel.from_pretrained(model_name)
        self.freeze_transformer = freeze_transformer
        if freeze_transformer:
            # forward() never back-propagates into the encoder, so say so
            # explicitly instead of leaving its weights flagged as trainable.
            self.transformer.requires_grad_(False)
            self.transformer.eval()
        layer_size = self.transformer.config.hidden_size

        # NOTE: the attribute keeps its original spelling ("classifer") because
        # the name is part of the state_dict keys of checkpoints already saved.
        self.classifer = nn.Sequential(
            nn.Dropout(0.1),
            nn.Linear(layer_size, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, n_classes)
        )

    def train(self, mode=True):
        """Switch train/eval mode, keeping a frozen encoder in eval mode."""
        super().train(mode)
        if self.freeze_transformer:
            self.transformer.eval()
        return self

    def forward(self, x, attention_mask):
        """Return logits for token ids ``x`` and the matching ``attention_mask``."""
        if self.freeze_transformer:
            with torch.no_grad():
                encoded = self.transformer(input_ids=x, attention_mask=attention_mask)
        else:
            encoded = self.transformer(input_ids=x, attention_mask=attention_mask)
        first_token_state = encoded.last_hidden_state[:, 0, :]
        return self.classifer(first_token_state)

# Three output logits, matching the consolidated Left/Center/Right labels.
model = TransformerClassifier(language_model, 3).to(device)
# Dumps the full module tree of the encoder and the classification head.
print(model)

Some weights of RobertaModel were not initialized from the model checkpoint at FacebookAI/roberta-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


TransformerClassifier(
  (transformer): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (L

In [None]:
# Create data loaders
batch_size = 32
# Fixed seed so the train/validation/test membership is identical on every run.
# Without it, a checkpoint evaluated or resumed in a later session would be
# scored on rows it had already trained on.
split_seed = 42
n = len(dataset)
train_len = int(n * 0.7)
val_len = int(n * 0.15)
test_len = n - train_len - val_len  # remainder, so the three parts always sum to n

# The training subset is bound to `train_split`, not `train_data`: the training
# function defined further down is also called `train_data`, and sharing the
# name means re-running either cell breaks the other.
train_split, validation_data, test_data = torch.utils.data.random_split(
    dataset,
    [train_len, val_len, test_len],
    generator=torch.Generator().manual_seed(split_seed),
)
train_loader = DataLoader(train_split, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(validation_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

print(f"Total dataset: {n} samples")
print(f"Train: {len(train_split)} samples ({len(train_loader)} batches)")
print(f"Validation: {len(validation_data)} samples ({len(validation_loader)} batches)")
print(f"Test: {len(test_data)} samples ({len(test_loader)} batches)")
# Peek at a single batch as a sanity check (skipped when the loader is empty).
sample_batch = next(iter(train_loader), None)
if sample_batch is not None:
    print(f"Sample text length: {len(sample_batch['text'][0])} chars")

Total dataset: 10832 samples
Train: 7582 samples (237 batches)
Validation: 1624 samples (51 batches)
Test: 1626 samples (51 batches)
Sample text length: 6498 chars


In [None]:
# Cross-entropy computed directly on the classifier's raw logits
loss_fn = nn.CrossEntropyLoss()
# Lower learning rate for better convergence on different models
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=5e-5,
    weight_decay=0.01,
    eps=1e-8,
)
# mode='max' because the scheduler is stepped with validation accuracy:
# the learning rate is halved once the metric has stopped rising for `patience` epochs
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.5,
    patience=2,
)

def tokenize(text, device):
    """Encode text with the global ``tokenizer`` and move the tensors to ``device``.

    Sequences are padded to the longest one in the call and truncated to at
    most 512 tokens. Returns the ``(input_ids, attention_mask)`` pair.
    """
    encoding_options = dict(
        return_tensors='pt',
        padding=True,
        truncation=True,
        max_length=512,
    )
    encoded = tokenizer(text, **encoding_options)
    input_ids = encoded['input_ids'].to(device)
    attention_mask = encoded['attention_mask'].to(device)
    return input_ids, attention_mask

In [None]:
def train_data(start_epoch=0, max_epochs=2, save_snapshots=True):
    """Train the global ``model`` and validate it after every epoch.

    Uses the notebook-level ``train_loader``, ``validation_loader``,
    ``optimizer``, ``scheduler``, ``loss_fn`` and ``tokenize``.

    Args:
        start_epoch: Last completed epoch. When non-zero, the weights stored in
            ``{checkpoint_folder}/epoch-{start_epoch}.pth`` are loaded first and
            training continues with epoch ``start_epoch + 1``. Only the model
            weights are restored; optimizer and scheduler state start fresh.
        max_epochs: Number of the last epoch to run (inclusive).
        save_snapshots: When True, the weights are written to
            ``{checkpoint_folder}/epoch-{t}.pth`` after each epoch that improves
            on the best validation accuracy seen in this call.

    Training stops early once validation accuracy has failed to improve for
    four consecutive epochs. Progress and the best accuracy are printed;
    nothing is returned.
    """
    import os  # local import: the notebook's import cell does not bring in `os`

    if start_epoch != 0:
        # map_location lets a checkpoint written on a GPU machine be resumed on a CPU-only one.
        model.load_state_dict(
            torch.load(f"{checkpoint_folder}/epoch-{start_epoch}.pth", map_location=device)
        )
    if save_snapshots:
        # torch.save() does not create missing parent directories.
        os.makedirs(checkpoint_folder, exist_ok=True)

    best_acc = 0
    patience = 0
    patience_limit = 4

    for t in range(start_epoch+1, max_epochs+1):
        print(f"\nepoch {t}: ", end='')

        # TRAIN
        model.train()
        train_loss = 0
        for row in tqdm(train_loader):
            tokens, attention_mask = tokenize(row['text'], device)
            label = row["label"].to(device)

            optimizer.zero_grad()
            pred = model(tokens, attention_mask)
            loss = loss_fn(pred, label)
            loss.backward()
            # Cap the global gradient norm so a single bad batch cannot blow up the update.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item()

        print(f"train_loss: {train_loss / len(train_loader):.4f}, ", end='')

        # VALIDATE
        model.eval()
        total_loss = 0
        correct = 0
        print(f"validation: ", end='')

        with torch.no_grad():
            for row in tqdm(validation_loader):
                tokens, attention_mask = tokenize(row['text'], device)
                label = row["label"].to(device)
                pred = model(tokens, attention_mask)
                correct += (pred.argmax(1) == label).type(torch.float).sum().item()
                total_loss += loss_fn(pred, label).item()

        # Loss is averaged per batch, accuracy per sample.
        avg_error = total_loss / len(validation_loader)
        accuracy = correct / len(validation_loader.dataset)
        print("error: {:.4f}, accuracy: {:.4f}".format(avg_error, accuracy))

        # Learning rate scheduling
        scheduler.step(accuracy)

        # Track improvement independently of snapshotting, so that turning
        # snapshots off does not also disable best-accuracy tracking and make
        # early stopping fire unconditionally.
        if accuracy > best_acc:
            best_acc = accuracy
            patience = 0
            if save_snapshots:
                torch.save(model.state_dict(),  f"{checkpoint_folder}/epoch-{t}.pth")
                print(f"  → Saved checkpoint! Best so far: {best_acc:.4f}")
        else:
            patience += 1
            if patience >= patience_limit:
                print(f"  → Early stopping triggered (no improvement for {patience_limit} epochs)")
                break

    print(f"\nBEST ACC: {best_acc:.4f}")

train_data()


epoch 1: 

 35%|‚ñà‚ñà‚ñà‚ñç      | 82/237 [01:02<01:58,  1.31it/s]


KeyboardInterrupt: 

In [None]:
# TEST
def test(filename):
    """Load the weights in ``filename`` into the global ``model`` and score it on ``test_loader``.

    Prints the mean per-batch loss ("error") and the per-sample accuracy.
    The global model keeps the loaded weights afterwards.

    Args:
        filename: Path to a state_dict file written by ``torch.save``.
    """
    # map_location lets a checkpoint saved on a GPU machine load on a CPU-only one.
    model.load_state_dict(torch.load(filename, map_location=device))
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for row in tqdm(test_loader):
            tokens, attention_mask = tokenize(row['text'], device)
            label = row["label"].to(device)
            pred = model(tokens, attention_mask)
            correct += (pred.argmax(1) == label).type(torch.float).sum().item()
            total_loss += loss_fn(pred, label).item()

    avg_error = total_loss / len(test_loader)
    accuracy = correct / len(test_loader.dataset)
    print("error:", avg_error)
    print("accuracy:", accuracy)
# NOTE(review): training only writes a checkpoint for epochs that improved validation
# accuracy, so epoch-2.pth may be missing or left over from an earlier run.
test(f"{checkpoint_folder}/epoch-2.pth")

100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 141/141 [01:40<00:00,  1.40it/s]

error: 0.37048758598084147
accuracy: 0.9913333333333333





In [None]:
# Ask PyTorch to release cached GPU memory that is no longer in use before further models are loaded.
# NOTE(review): tensors that are still referenced (e.g. the global `model`) stay allocated.
torch.cuda.empty_cache()

In [None]:
# MODEL COMPARISON: DeBERTa vs BERT
import time
from datetime import datetime

# Hugging Face model ids to benchmark against each other, in training order
models_to_train = ["microsoft/deberta-base", "bert-base-uncased"]

# Filled in by the training loop: model id -> metrics dict
results = dict()

def train_model(model_name, max_epochs=5, show_progress=False):
    """Train a fresh classifier on top of ``model_name`` and return its metrics.

    Uses the notebook-level ``train_loader``, ``validation_loader``,
    ``test_loader``, ``loss_fn`` and ``device``.

    Args:
        model_name: Hugging Face model id for both the tokenizer and the encoder.
        max_epochs: Upper bound on the number of training epochs.
        show_progress: Show tqdm progress bars for the batch loops (off by default).

    Returns:
        dict with the keys ``model_name``, ``train_losses``, ``val_losses``,
        ``val_accuracies`` (one entry per completed epoch), ``epochs_trained``,
        ``best_accuracy``, ``start_time``, ``training_time`` (seconds, covering
        training and validation), ``test_loss``, ``test_accuracy`` and
        ``parameters`` (total parameter count, encoder included).

    Training stops early after four consecutive epochs without a new best
    validation accuracy. The test metrics are computed with the weights from
    the best validation epoch, so they describe the same model that
    ``best_accuracy`` refers to.
    """
    print(f"\n{'='*60}")
    print(f"Training {model_name}")
    print(f"{'='*60}\n")

    # Initialize tokenizer and model
    tokenizer_new = AutoTokenizer.from_pretrained(model_name)
    model_new = TransformerClassifier(model_name, 3).to(device)

    # Training setup
    optimizer_new = torch.optim.AdamW(model_new.parameters(), lr=5e-5, weight_decay=0.01, eps=1e-8)
    scheduler_new = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer_new, mode='max', factor=0.5, patience=2)

    # Store metrics
    metrics = {
        'model_name': model_name,
        'train_losses': [],
        'val_losses': [],
        'val_accuracies': [],
        'epochs_trained': 0,
        'best_accuracy': 0,
        'start_time': time.time(),
        'training_time': 0
    }

    def tokenize_new(text):
        """Encode text with this model's own tokenizer and move the tensors to the device."""
        tokens = tokenizer_new(
            text,
            return_tensors='pt',
            padding=True,
            truncation=True,
            max_length=512
        )
        return tokens['input_ids'].to(device), tokens['attention_mask'].to(device)

    def evaluate(loader):
        """Return (mean per-batch loss, per-sample accuracy) of model_new on loader."""
        model_new.eval()
        total_loss = 0
        correct = 0
        with torch.no_grad():
            for row in tqdm(loader, disable=not show_progress):
                tokens, attention_mask = tokenize_new(row['text'])
                label = row["label"].to(device)
                pred = model_new(tokens, attention_mask)
                correct += (pred.argmax(1) == label).type(torch.float).sum().item()
                total_loss += loss_fn(pred, label).item()
        return total_loss / len(loader), correct / len(loader.dataset)

    best_acc = 0
    best_state = None  # CPU copy of the weights from the best validation epoch
    patience = 0
    patience_limit = 4

    for t in range(1, max_epochs+1):
        print(f"Epoch {t}/{max_epochs}: ", end='')

        # TRAIN
        model_new.train()
        train_loss = 0
        for row in tqdm(train_loader, disable=not show_progress):
            tokens, attention_mask = tokenize_new(row['text'])
            label = row["label"].to(device)

            optimizer_new.zero_grad()
            pred = model_new(tokens, attention_mask)
            loss = loss_fn(pred, label)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model_new.parameters(), 1.0)
            optimizer_new.step()
            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)
        metrics['train_losses'].append(avg_train_loss)
        print(f"train_loss: {avg_train_loss:.4f} | ", end='')

        # VALIDATE
        avg_val_loss, accuracy = evaluate(validation_loader)
        metrics['val_losses'].append(avg_val_loss)
        metrics['val_accuracies'].append(accuracy)
        # Recorded before the early-stopping check: the epoch has fully run,
        # so it must be counted even when the loop breaks right after it.
        metrics['epochs_trained'] = t

        print(f"val_loss: {avg_val_loss:.4f} | val_acc: {accuracy:.4f}")

        scheduler_new.step(accuracy)

        if accuracy > best_acc:
            best_acc = accuracy
            metrics['best_accuracy'] = best_acc
            # Snapshot on the CPU so the copy does not take up GPU memory.
            best_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model_new.state_dict().items()
            }
            patience = 0
        else:
            patience += 1
            if patience >= patience_limit:
                print(f"Early stopping triggered (no improvement for {patience_limit} epochs)")
                break

    metrics['training_time'] = time.time() - metrics['start_time']

    # TEST
    print(f"\nTesting {model_name}...")
    if best_state is not None:
        # Evaluate the model that was selected on validation, not whatever
        # the last epoch happened to leave behind.
        model_new.load_state_dict(best_state)
    test_loss, test_accuracy = evaluate(test_loader)

    metrics['test_loss'] = test_loss
    metrics['test_accuracy'] = test_accuracy

    # Model size
    param_count = sum(p.numel() for p in model_new.parameters())
    metrics['parameters'] = param_count

    torch.cuda.empty_cache()

    return metrics

# Train every model listed in `models_to_train`, one after the other, and store its
# metrics dict in `results` under the model id.
for model_name in models_to_train:
    results[model_name] = train_model(model_name, max_epochs=5)


In [None]:
# COMPARISON METRICS
import matplotlib.pyplot as plt
import numpy as np

banner = "=" * 70
print("\n" + banner)
print("MODEL COMPARISON RESULTS")
print(banner + "\n")

# Create comparison dataframe: one row of pre-formatted strings per trained model
comparison_data = [
    {
        'Model': name.split('/')[-1],
        'Test Accuracy': f"{m['test_accuracy']:.4f}",
        'Test Loss': f"{m['test_loss']:.4f}",
        'Best Val Accuracy': f"{m['best_accuracy']:.4f}",
        'Epochs Trained': m['epochs_trained'],
        'Training Time (s)': f"{m['training_time']:.2f}",
        'Parameters': f"{m['parameters']:,}",
    }
    for name, m in results.items()
]

comparison_df = pd.DataFrame(comparison_data)
print(comparison_df.to_string(index=False))
print("\n")

# Detailed metrics: the same figures again, one labelled block per model
detail_rule = '=' * 60
for model_name, metrics in results.items():
    train_seconds = metrics['training_time']
    print(f"\n{detail_rule}")
    print(f"Model: {model_name}")
    print(f"{detail_rule}")
    print(f"Test Accuracy:        {metrics['test_accuracy']:.4f}")
    print(f"Test Loss:            {metrics['test_loss']:.4f}")
    print(f"Best Val Accuracy:    {metrics['best_accuracy']:.4f}")
    print(f"Epochs Trained:       {metrics['epochs_trained']}")
    print(f"Training Time:        {train_seconds:.2f} seconds ({train_seconds/60:.2f} minutes)")
    print(f"Parameters:           {metrics['parameters']:,}")
    # NOTE(review): this is the parameter count divided by the epoch count - confirm the figure is meaningful.
    print(f"Avg Params/Epoch:     {metrics['parameters']/metrics['epochs_trained']:,.0f}")
    print(f"Final Train Loss:     {metrics['train_losses'][-1]:.4f}")
    print(f"Final Val Loss:       {metrics['val_losses'][-1]:.4f}")


In [None]:
# VISUALIZATION: Model Comparison Plots
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('Model Comparison: DeBERTa vs BERT', fontsize=16, fontweight='bold')

# One colour per trained model. The palette is cycled, so adding a third model
# to `models_to_train` no longer raises an IndexError.
palette = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
           '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']
colors = [palette[i % len(palette)] for i in range(len(results))]


def _plot_history(ax, metric_key, marker, ylabel, title):
    """Draw one line per model for the per-epoch metric stored under ``metric_key`` in ``results``."""
    for idx, (model_name, metrics) in enumerate(results.items()):
        history = metrics[metric_key]
        epochs = range(1, len(history) + 1)
        ax.plot(epochs, history, marker=marker, label=model_name.split('/')[-1],
                color=colors[idx], linewidth=2)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    ax.legend()
    ax.grid(True, alpha=0.3)


# 1. Validation Accuracy Over Epochs
_plot_history(axes[0, 0], 'val_accuracies', 'o', 'Validation Accuracy', 'Validation Accuracy Progression')

# 2. Validation Loss Over Epochs
_plot_history(axes[0, 1], 'val_losses', 's', 'Validation Loss', 'Validation Loss Progression')

# 3. Test Accuracy Comparison
ax = axes[1, 0]
model_labels = [m.split('/')[-1] for m in results.keys()]
test_accuracies = [results[m]['test_accuracy'] for m in results.keys()]
bars = ax.bar(model_labels, test_accuracies, color=colors, alpha=0.8, edgecolor='black', linewidth=1.5)
ax.set_ylabel('Test Accuracy')
ax.set_title('Test Accuracy Comparison')
ax.set_ylim([0, 1])
# Write each accuracy just above its bar
for i, bar in enumerate(bars):
    height = bar.get_height()
    ax.text(bar.get_x() + bar.get_width()/2., height,
            f'{test_accuracies[i]:.4f}',
            ha='center', va='bottom', fontweight='bold')
ax.grid(True, alpha=0.3, axis='y')

# 4. Training Time vs Parameters
ax = axes[1, 1]
for idx, (model_name, metrics) in enumerate(results.items()):
    training_time = metrics['training_time']
    params = metrics['parameters'] / 1e6  # Convert to millions
    ax.scatter(params, training_time, s=300, color=colors[idx],
               label=model_name.split('/')[-1], edgecolor='black', linewidth=2)
    ax.annotate(model_name.split('/')[-1], (params, training_time),
                xytext=(5, 5), textcoords='offset points', fontsize=10)
ax.set_xlabel('Parameters (Millions)')
ax.set_ylabel('Training Time (seconds)')
ax.set_title('Training Efficiency')
ax.grid(True, alpha=0.3)
ax.legend()

plt.tight_layout()
plt.show()

print("\nVisualization complete!")


In [None]:
# MODEL RECOMMENDATION & ANALYSIS
# Every statement printed below is derived from `results`, so it stays correct
# whichever models were trained and whichever of them came out ahead.
print("\n" + "="*70)
print("ANALYSIS & RECOMMENDATION")
print("="*70 + "\n")

# Rank the trained models on each axis; every entry is a (model id, metrics dict) pair
best_model = max(results.items(), key=lambda x: x[1]['test_accuracy'])
worst_model = min(results.items(), key=lambda x: x[1]['test_accuracy'])
fastest = min(results.items(), key=lambda x: x[1]['training_time'])
largest = max(results.items(), key=lambda x: x[1]['parameters'])

print(f"🏆 Best Performing Model: {best_model[0]}")
print(f"   Test Accuracy: {best_model[1]['test_accuracy']:.4f}")
print(f"   Test Loss: {best_model[1]['test_loss']:.4f}")

print(f"\n⏱️  Fastest Model: {fastest[0]}")
print(f"   Training Time: {fastest[1]['training_time']:.2f} seconds")

print(f"\n📊 Most Parameters: {largest[0]}")
print(f"   Parameters: {largest[1]['parameters']:,}")

# Efficiency score (accuracy per second per million params)
print(f"\n⚡ Efficiency Score (Test Acc per Sec per M Params):")
for model_name, metrics in results.items():
    efficiency = metrics['test_accuracy'] / (metrics['training_time'] * (metrics['parameters'] / 1e6))
    print(f"   {model_name.split('/')[-1]}: {efficiency:.6f}")

print(f"\n{'='*70}")
print("SUMMARY")
print(f"{'='*70}")
print(f"\n{'Model':<20} {'Accuracy':<12} {'Speed':<12} {'Parameters':<15}")
print("-" * 60)
for model_name, metrics in results.items():
    model_short = model_name.split('/')[-1]
    # Attach the unit before padding so the column lines up with its header.
    speed = f"{metrics['training_time']:.1f}s"
    print(f"{model_short:<20} {metrics['test_accuracy']:<12.4f} {speed:<12} {metrics['parameters']:<15,}")

print(f"\n{'='*70}")
print("RECOMMENDATION FOR POLITICAL BIAS DETECTION")
print(f"{'='*70}")

# Compare accuracy difference (a gap between two accuracies, i.e. percentage points)
accuracy_values = [m['test_accuracy'] for m in results.values()]
best_acc = best_model[1]['test_accuracy']
acc_diff = best_acc - worst_model[1]['test_accuracy']

best_short = best_model[0].split('/')[-1]
fastest_short = fastest[0].split('/')[-1]

print(f"\n✓ {best_short} achieved the highest test accuracy ({best_acc:.4f})")
if len(results) > 1:
    print(f"  • {acc_diff*100:.2f} percentage points ahead of {worst_model[0].split('/')[-1]}")
if fastest[0] == best_model[0]:
    print(f"  • It was also the fastest to train ({fastest[1]['training_time']:.1f}s)")
    print(f"\n→ RECOMMENDED: Use {best_short}")
else:
    print(f"  • {fastest_short} trained faster ({fastest[1]['training_time']:.1f}s vs {best_model[1]['training_time']:.1f}s)")
    print(f"\n→ RECOMMENDED: Use {best_short} for accuracy, or {fastest_short} for speed")

print("\n" + "="*70)
