# Class Consolidation Experiment

## Setup and Configuration

In [None]:
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from sklearn.utils.class_weight import compute_class_weight
from datasets import Dataset
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import os
import warnings
warnings.filterwarnings('ignore')

# Central experiment configuration read by the functions in this notebook.
CONFIG = {
    # CSV loaded by load_and_consolidate_data and returned as `base`.
    "data_path": "enhanced_dataset/training_ready_dataset.csv",
    # Directory containing train_split.csv, val_split.csv and test_split.csv.
    "splits_dir": "model_training_results/config",
    # Root for exported results; an "analysis_results" subfolder is created in it.
    "output_dir": "class_consolidation_results",
    # Identifier passed to AutoTokenizer / AutoModelForSequenceClassification.
    "model_name": "roberta-large",
    "experiment_name": "source_issues_focal_loss",
    # Original label -> consolidated label. Rows whose label is not a key here
    # map to NaN and are dropped during consolidation.
    "class_map": {
        "unreliable source": "source_issues",
        "slanted": "source_issues",
        "false": "false",
        "repurposed": "repurposed",
        "decorative": "decorative"
    },
    "train_args": {
        "epochs": 4,
        "lr": 2e-5,
        "batch_size": 8,  # used for both per-device train and eval batch size
        "max_len": 512,  # tokenizer truncation / padding length
        "weight_decay": 0.01,
        "warmup_steps": 100,
        "focal_gamma": 2.0,  # gamma exponent handed to FocalLoss
    },
    # Step intervals forwarded to TrainingArguments.
    "eval_steps": 20,
    "save_steps": 100,
    "logging_steps": 10,
}

## Data Loading and Class Consolidation

In [None]:
def load_and_consolidate_data():
    """
    Read the base CSV and the train/val/test split CSVs, then remap labels.

    Every split's 'label' column is translated through CONFIG['class_map']
    (which folds 'unreliable source' and 'slanted' into 'source_issues');
    rows whose label has no entry in the map are discarded. A per-split
    class distribution is printed.

    Returns:
        (base, consolidated_splits): the untouched base DataFrame and a dict
        keyed by 'train', 'val', 'test' holding the remapped DataFrames.
    """
    full_frame = pd.read_csv(CONFIG['data_path'])

    remapped = {}
    for name in ('train', 'val', 'test'):
        frame = pd.read_csv(f"{CONFIG['splits_dir']}/{name}_split.csv")
        # assign() returns a new frame, so the freshly read one is not modified.
        frame = frame.assign(label=frame['label'].map(CONFIG['class_map']))
        remapped[name] = frame.dropna(subset=['label'])

    print("Class Distribution After Consolidation:")
    for name, frame in remapped.items():
        print(f"\n{name.upper()} SET:")
        total = len(frame)
        for label, count in frame['label'].value_counts().items():
            share = (count / total) * 100
            print(f"  {label}: {count} ({share:.1f}%)")

    return full_frame, remapped

## Dataset Tokenization

In [None]:
def prepare_datasets(base, splits):
    """
    Tokenize text data and encode labels for model training.

    Args:
        base: Base DataFrame; accepted for interface compatibility and not
            used by this function.
        splits: Dict of DataFrames keyed by split name; each needs 'text'
            and 'label' columns. The label encoder is fitted on
            splits['train'] only. The input frames are left unmodified.

    Returns:
        (tokenizer, label_encoder, processed_datasets) where
        processed_datasets maps each split name to a tokenized Dataset
        formatted as torch tensors with 'input_ids', 'attention_mask'
        and 'label'.
    """

    tokenizer = AutoTokenizer.from_pretrained(CONFIG['model_name'])
    label_encoder = LabelEncoder().fit(splits['train']['label'])

    print(f"\nLabel Encoding:")
    for i, label in enumerate(label_encoder.classes_):
        print(f"  {label}: {i}")

    max_len = CONFIG['train_args']['max_len']

    def tokenize_batch(batch):
        return tokenizer(
            batch['text'],
            truncation=True,
            padding='max_length',
            max_length=max_len
        )

    def process_split(df):
        # Build a fresh two-column frame rather than writing a 'label_id'
        # column into the caller's DataFrame.
        encoded = df[['text']].assign(label=label_encoder.transform(df['label']))
        ds = Dataset.from_pandas(encoded)
        ds = ds.map(tokenize_batch, batched=True)
        ds.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
        return ds

    processed_datasets = {k: process_split(df) for k, df in splits.items()}

    return tokenizer, label_encoder, processed_datasets

## Focal Loss Implementation

In [None]:
class FocalLoss(torch.nn.Module):
    """
    Focal loss for multi-class classification with optional class weights.

    Per example the loss is ``alpha[target] * (1 - pt) ** gamma * ce`` where
    ``ce`` is the unweighted cross-entropy and ``pt = exp(-ce)`` is the
    predicted probability of the true class. The result is the plain mean
    over the batch.

    Args:
        alpha: Optional 1-D tensor of per-class weights, indexed by class id.
            ``None`` disables class weighting.
        gamma: Focusing exponent; 0 reduces the loss to (weighted)
            cross-entropy.
    """

    def __init__(self, alpha=None, gamma=2.0):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, logits, targets):
        """Return the scalar focal loss for ``logits`` (N, C) and ``targets`` (N,)."""
        # pt must come from the UNWEIGHTED cross-entropy: folding the class
        # weight into ce first would give exp(-w * ce) = p ** w, not p, and
        # skew the (1 - pt) ** gamma modulating factor.
        ce_loss = torch.nn.functional.cross_entropy(
            logits, targets, reduction='none'
        )
        pt = torch.exp(-ce_loss)
        focal_loss = (1 - pt) ** self.gamma * ce_loss

        if self.alpha is not None:
            # Move a local copy to the logits' device; self.alpha is not rebound.
            alpha = self.alpha.to(logits.device)
            focal_loss = alpha[targets] * focal_loss

        return focal_loss.mean()

class FocalTrainer(Trainer):
    """
    Trainer subclass that replaces the model's built-in loss with FocalLoss.

    Args:
        class_weights: Optional per-class weight tensor forwarded to
            FocalLoss as ``alpha``.
        gamma: Focusing exponent forwarded to FocalLoss.
        *args, **kwargs: Passed through unchanged to ``Trainer.__init__``.
    """

    def __init__(self, *args, class_weights=None, gamma=2.0, **kwargs):
        super().__init__(*args, **kwargs)
        self.loss_fn = FocalLoss(alpha=class_weights, gamma=gamma)

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """
        Compute the focal loss for one batch.

        ``num_items_in_batch`` is accepted because newer transformers
        releases pass it to ``compute_loss``; it is not used here since the
        focal loss is averaged over the batch by FocalLoss itself.

        Returns ``(loss, outputs)`` when ``return_outputs`` is true,
        otherwise just ``loss``.
        """
        # Labels are removed so the model does not compute its own loss.
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        loss = self.loss_fn(logits, labels)
        return (loss, outputs) if return_outputs else loss

## Model Training

In [None]:
def train_consolidated_model():
    """
    Train the sequence classifier with class consolidation and focal loss.

    Loads and tokenizes the consolidated splits, computes 'balanced' class
    weights from the training labels, and runs a FocalTrainer that
    evaluates on the validation split and reloads the checkpoint with the
    best macro F1 at the end of training.

    Returns:
        (trainer, label_encoder, splits): the trained FocalTrainer, the
        fitted LabelEncoder and the dict of consolidated split DataFrames.
    """

    # Load and prepare data
    base, splits = load_and_consolidate_data()
    tokenizer, label_encoder, datasets = prepare_datasets(base, splits)

    # Initialize model
    model = AutoModelForSequenceClassification.from_pretrained(
        CONFIG['model_name'],
        num_labels=len(label_encoder.classes_)
    )

    # Calculate class weights for balanced training. The encoder was fitted
    # on the training labels, so every class id appears in y_train and the
    # weight vector is ordered by class id.
    y_train = label_encoder.transform(splits['train']['label'])
    class_weights = compute_class_weight(
        'balanced',
        classes=np.unique(y_train),
        y=y_train
    )

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)

    print(f"\nClass Weights for Balanced Training:")
    for label, weight in zip(label_encoder.classes_, class_weights):
        print(f"  {label}: {weight:.3f}")

    # Training arguments
    train_cfg = CONFIG['train_args']
    training_args = TrainingArguments(
        # Checkpoints go to /tmp, not CONFIG['output_dir'].
        output_dir="/tmp",
        evaluation_strategy="steps",
        save_strategy="steps",
        eval_steps=CONFIG['eval_steps'],
        save_steps=CONFIG['save_steps'],
        logging_steps=CONFIG['logging_steps'],
        per_device_train_batch_size=train_cfg['batch_size'],
        per_device_eval_batch_size=train_cfg['batch_size'],
        learning_rate=train_cfg['lr'],
        num_train_epochs=train_cfg['epochs'],
        weight_decay=train_cfg['weight_decay'],
        # Previously the configured warmup was never handed to the trainer.
        warmup_steps=train_cfg['warmup_steps'],
        load_best_model_at_end=True,
        metric_for_best_model="eval_f1",
        greater_is_better=True,
        report_to="none"
    )

    def compute_metrics(eval_pred):
        predictions = np.argmax(eval_pred.predictions, axis=1)
        return {
            "accuracy": accuracy_score(eval_pred.label_ids, predictions),
            "f1": f1_score(eval_pred.label_ids, predictions, average='macro')
        }

    # Initialize trainer
    trainer = FocalTrainer(
        model=model,
        args=training_args,
        train_dataset=datasets['train'],
        eval_dataset=datasets['val'],
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
        class_weights=class_weights_tensor,
        gamma=train_cfg['focal_gamma']
    )

    # Train model
    print(f"\nStarting model training...")
    trainer.train()

    return trainer, label_encoder, splits

## Performance Analysis


In [None]:
def analyze_model_performance(trainer, label_encoder, splits):
    """
    Evaluate the trained model on the test split and export text reports.

    Tokenizes splits['test'], predicts with ``trainer``, prints overall
    accuracy / macro F1 and per-class metrics, and writes
    ``classification_report.txt`` and ``thesis_summary.txt`` into
    ``<CONFIG['output_dir']>/analysis_results``.

    Args:
        trainer: Trained trainer exposing ``tokenizer`` and ``predict``.
        label_encoder: Fitted encoder whose ``classes_`` define the class
            ids and names.
        splits: Dict of DataFrames; only splits['test'] ('text', 'label')
            is read.

    Returns:
        The dict produced by ``classification_report(..., output_dict=True)``.
    """

    # Create results directory
    results_dir = os.path.join(CONFIG['output_dir'], "analysis_results")
    os.makedirs(results_dir, exist_ok=True)

    # Prepare test dataset for prediction
    tokenizer = trainer.tokenizer
    test_texts = splits['test']['text'].tolist()
    test_labels = label_encoder.transform(splits['test']['label'])

    test_dataset = Dataset.from_dict({
        'text': test_texts,
        'label': test_labels
    })

    test_dataset = test_dataset.map(
        lambda ex: tokenizer(
            ex['text'],
            truncation=True,
            padding='max_length',
            max_length=CONFIG['train_args']['max_len']
        ),
        batched=True
    )
    test_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

    # Generate predictions
    predictions_output = trainer.predict(test_dataset)
    y_true = predictions_output.label_ids
    y_pred = np.argmax(predictions_output.predictions, axis=1)

    # Calculate comprehensive metrics
    overall_accuracy = accuracy_score(y_true, y_pred)
    overall_f1 = f1_score(y_true, y_pred, average='macro')

    print(f"\nOverall Test Performance:")
    print(f"  Accuracy: {overall_accuracy:.3f}")
    print(f"  Macro F1: {overall_f1:.3f}")

    # Per-class metrics. Passing `labels` pins the report to every encoder
    # class; without it, target_names must match only the classes that
    # happen to occur in y_true/y_pred and the call raises on a mismatch.
    class_names = label_encoder.classes_
    class_report = classification_report(
        y_true, y_pred,
        labels=np.arange(len(class_names)),
        target_names=class_names,
        output_dict=True
    )

    print(f"\nPer-Class Performance:")
    for class_name in class_names:
        metrics = class_report[class_name]
        print(f"  {class_name}:")
        print(f"    Precision: {metrics['precision']:.3f}")
        print(f"    Recall: {metrics['recall']:.3f}")
        print(f"    F1-Score: {metrics['f1-score']:.3f}")
        print(f"    Support: {int(metrics['support'])}")

    # None when the consolidated class is not part of the report.
    source_metrics = class_report.get('source_issues')

    # Export detailed classification report
    report_path = os.path.join(results_dir, "classification_report.txt")
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("CLASS CONSOLIDATION EXPERIMENT - DETAILED RESULTS\n")
        f.write("=" * 60 + "\n\n")
        f.write("OVERALL PERFORMANCE:\n")
        f.write(f"  Test Accuracy: {overall_accuracy:.3f}\n")
        f.write(f"  Macro F1-Score: {overall_f1:.3f}\n\n")
        f.write("PER-CLASS METRICS:\n")
        f.write("-" * 40 + "\n")

        for class_name in class_names:
            metrics = class_report[class_name]
            f.write(f"\n{class_name.upper()}:\n")
            f.write(f"  Precision: {metrics['precision']:.3f}\n")
            f.write(f"  Recall: {metrics['recall']:.3f}\n")
            f.write(f"  F1-Score: {metrics['f1-score']:.3f}\n")
            f.write(f"  Support: {int(metrics['support'])} samples\n")

        # Highlight source_issues performance for thesis
        if source_metrics is not None:
            f.write(f"\nKEY FINDING - SOURCE_ISSUES PERFORMANCE:\n")
            f.write(f"  F1-Score: {source_metrics['f1-score']:.3f}\n")
            f.write(f"  Recall: {source_metrics['recall']:.3f} ({source_metrics['recall']*100:.0f}%)\n")
            f.write(f"  Precision: {source_metrics['precision']:.3f}\n")

    # Export thesis summary
    summary_path = os.path.join(results_dir, "thesis_summary.txt")
    with open(summary_path, "w", encoding="utf-8") as f:
        f.write("THESIS SUMMARY - CLASS CONSOLIDATION RESULTS\n")
        f.write("=" * 50 + "\n\n")
        f.write("APPROACH:\n")
        f.write("- Merged 'unreliable source' and 'slanted' into 'source_issues'\n")
        f.write("- Used focal loss for class balancing\n")
        f.write("- Applied RoBERTa-large with optimized hyperparameters\n\n")

        if source_metrics is not None:
            f.write("KEY RESULTS:\n")
            f.write(f"- source_issues F1-score: {source_metrics['f1-score']:.2f}\n")
            f.write(f"- source_issues recall: {source_metrics['recall']*100:.0f}%\n")
            f.write(f"- Overall macro F1: {overall_f1:.3f}\n\n")

    return class_report

## Execute Experiment

In [None]:
def run_class_consolidation_experiment():
    """
    Execute the complete class consolidation experiment.

    Trains the model, then runs the performance analysis. Any exception
    raised by either step is reported (message plus full traceback) and
    turned into a ``(None, None)`` result instead of propagating.

    Returns:
        (trainer, results) on success, where ``results`` is the
        classification report dict; ``(None, None)`` on failure.
    """
    # Local import: only needed for failure reporting.
    import traceback

    print("CLASS CONSOLIDATION EXPERIMENT")
    print("=" * 60)
    print("Merging 'unreliable source' and 'slanted' into 'source_issues'")
    print("Using focal loss and class-balanced training\n")

    try:
        # Train model
        trainer, label_encoder, splits = train_consolidated_model()

        # Analyze performance
        results = analyze_model_performance(trainer, label_encoder, splits)
    except Exception as e:
        print(f"Error during experiment: {e}")
        # The message alone rarely locates the failure; keep the traceback.
        traceback.print_exc()
        return None, None

    # Report the directory actually configured rather than a fixed string.
    results_dir = os.path.join(CONFIG['output_dir'], "analysis_results")
    print(f"\nExperiment completed successfully.")
    print(f"Check {results_dir}/ for detailed outputs.")

    return trainer, results

# Run the full experiment when this cell executes; both names are None if
# the run raised an exception.
trainer, results = run_class_consolidation_experiment()