In [None]:
!pip install transformers torch datasets evaluate wandb scikit-learn
!nvidia-smi

In [None]:
# Mount Google Drive at /content/drive: the dataset archive is read from it and
# trained artifacts are copied back to it by later cells.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import json
import csv
import numpy as np
import torch
from torch.utils.data import Dataset
from transformers import (
    RobertaTokenizer,
    RobertaForSequenceClassification,
    TrainingArguments,
    Trainer
)
import wandb
import evaluate
from google.colab import drive

In [None]:
# Unpack the dataset archive from the mounted Drive into the working directory.
# Later cells read ./data/jsons and ./data/splits, so the archive is expected to
# contain a top-level data/ folder. The path is relative, so this assumes the
# working directory is /content (the parent of the Drive mount point).
!tar -xvf drive/MyDrive/data.tar.gz

In [None]:

# Directory holding one JSON file per article.
JSON_DIR = "./data/jsons"
# Split schemes to train on; each one names a sub-directory of ./data/splits.
SPLIT_TYPES = ['random']
# Hugging Face checkpoint used for both the tokenizer and the classifier.
MODEL_NAME = "roberta-base"
# Token length passed to the tokenizer (truncation + 'max_length' padding).
MAX_LENGTH = 512

In [None]:
# Build an in-memory index of the article JSON files, keyed by each record's 'ID'.
id_to_data = {}      # article ID -> parsed JSON record
missing_count = 0    # files that raised while being read, parsed or checked

for filename in os.listdir(JSON_DIR):
    if not filename.endswith(".json"):
        continue
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's default encoding (the split TSVs are read as UTF-8 too).
        with open(os.path.join(JSON_DIR, filename), 'r', encoding='utf-8') as f:
            data = json.load(f)
            # Only keep records carrying every field the pipeline relies on.
            if 'ID' in data and 'content_original' in data and 'bias' in data:
                id_to_data[data['ID']] = data
            else:
                print(f"Skipping invalid JSON: {filename}")
    except Exception as e:
        # Best effort: report the bad file and keep loading the rest.
        print(f"Error loading {filename}: {e}")
        missing_count += 1

print(f"Successfully loaded {len(id_to_data)} articles")
if missing_count > 0:
    print(f"Warning: Failed to load {missing_count} files")

In [None]:

def load_split(split_type, split_name):
    """Load TSV files from ./data/splits/[split_type]/[split_name].tsv

    The file is tab-separated: one header line, then rows of two columns
    (article ID, integer label).

    Args:
        split_type: Name of the split scheme (sub-directory of ./data/splits).
        split_name: Name of the split file without extension, e.g. 'train'.

    Returns:
        A tuple ``(ids, labels)``: the IDs as strings and the labels as ints,
        in file order. Both lists are empty when the file has no data rows.

    Raises:
        FileNotFoundError: If the TSV file does not exist.
        ValueError: If a two-column row has a label that is not an integer.
    """
    tsv_path = f"./data/splits/{split_type}/{split_name}.tsv"
    print(f"🔄 Looking for split file at: {tsv_path}")  # Debug path

    if not os.path.exists(tsv_path):
        raise FileNotFoundError(
            f" Missing split file! Verify these exist:\n"
            f"1. Directory structure: ./data/splits/{split_type}/\n"
            f"2. File name: {split_name}.tsv\n"
            f"3. File extension: .tsv (not .txt)"
        )

    ids, labels = [], []
    malformed = 0
    # newline='' lets the csv module do its own line-ending handling.
    with open(tsv_path, 'r', encoding='utf-8', newline='') as f:
        reader = csv.reader(f, delimiter='\t')
        # Skip header; the default keeps an empty file from raising StopIteration.
        next(reader, None)
        for row in reader:
            if len(row) == 2:
                ids.append(row[0])
                labels.append(int(row[1]))
            elif row:
                # Blank lines are ignored quietly; anything else is counted so
                # that dropped samples do not go unnoticed.
                malformed += 1

    if malformed:
        print(f"Warning: skipped {malformed} malformed rows in {tsv_path}")
    print(f"✅ Loaded {len(ids)} samples from {tsv_path}")
    return ids, labels

In [None]:
class BiasDataset(Dataset):
    """Torch dataset that tokenizes article text on demand for bias classification.

    Args:
        ids: Article IDs belonging to this split.
        labels: Integer class labels, aligned one-to-one with ``ids``.
        id_to_data: Mapping from article ID to a record that holds the text
            under the ``'content_original'`` key.
        tokenizer: Callable invoked as ``tokenizer(text, truncation=True,
            max_length=..., padding='max_length', return_tensors='pt')`` that
            returns ``'input_ids'`` and ``'attention_mask'`` tensors.
        max_length: Token length handed to the tokenizer.

    Raises:
        ValueError: If ``ids`` and ``labels`` differ in length.

    IDs that are absent from ``id_to_data`` are dropped (with a warning) when
    the dataset is built, so ``len(dataset)`` counts only usable samples and
    ``__getitem__`` never has to return a placeholder.
    """

    def __init__(self, ids, labels, id_to_data, tokenizer, max_length):
        if len(ids) != len(labels):
            raise ValueError(
                f"ids and labels must have the same length "
                f"(got {len(ids)} and {len(labels)})"
            )

        self.id_to_data = id_to_data
        self.tokenizer = tokenizer
        # Must match the attribute name read in __getitem__.
        self.max_length = max_length

        # Keep only samples whose article was actually loaded. Returning None
        # for them later would break batching instead of skipping the sample.
        usable = [(id_, label) for id_, label in zip(ids, labels) if id_ in id_to_data]
        missing = len(ids) - len(usable)
        if missing:
            print(f"Warning: {missing} IDs not found in data; dropping them from this split")

        self.ids = [id_ for id_, _ in usable]
        self.labels = [label for _, label in usable]

    def __len__(self):
        """Return the number of usable samples."""
        return len(self.ids)

    def __getitem__(self, idx):
        """Tokenize sample ``idx`` and return flattened tensors plus its label.

        Returns:
            dict with ``'input_ids'`` and ``'attention_mask'`` (the tokenizer
            outputs, flattened to 1-D) and ``'labels'`` (a ``torch.long`` scalar).
        """
        article = self.id_to_data[self.ids[idx]]
        encoding = self.tokenizer(
            article['content_original'],
            truncation=True,
            max_length=self.max_length,
            padding='max_length',
            return_tensors='pt'
        )
        return {
            # return_tensors='pt' adds a leading batch axis; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long)
        }

In [None]:
# Metric objects are loaded once and reused by every evaluation pass.
accuracy_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")

def compute_metrics(eval_pred):
    """Compute accuracy, weighted F1 and macro F1 for one evaluation pass.

    Args:
        eval_pred: A ``(logits, labels)`` pair; the predicted class is the
            argmax of ``logits`` over the last axis.

    Returns:
        dict with the keys ``"accuracy"``, ``"f1_weighted"`` and ``"f1_macro"``.

    Raises:
        Exception: Any failure is printed and then re-raised. Returning an
            empty dict instead would hide the real error and drop ``f1_macro``,
            the key the training arguments use to pick the best checkpoint.
    """
    try:
        logits, labels = eval_pred
        preds = np.argmax(logits, axis=-1)
        return {
            "accuracy": accuracy_metric.compute(predictions=preds, references=labels)["accuracy"],
            "f1_weighted": f1_metric.compute(predictions=preds, references=labels, average="weighted")["f1"],
            "f1_macro": f1_metric.compute(predictions=preds, references=labels, average="macro")["f1"]
        }
    except Exception as e:
        print(f"Error computing metrics: {e}")
        raise

In [None]:
# Initialize WandB
# Log in before any training cell runs; a login failure is printed and then
# re-raised so the notebook stops here rather than partway through training.
try:
    wandb.login()
except Exception as e:
    print(f"WandB login failed: {e}")
    raise

In [None]:
# Fine-tune and evaluate one RoBERTa classifier per split scheme in SPLIT_TYPES.
# `model`, `tokenizer` and `split_type` stay bound after the loop; the export
# cell further down reads them.
for split_type in SPLIT_TYPES:
    print(f"\n{'='*40}\nTraining on {split_type} split\n{'='*40}")

    try:
        # Load splits
        train_ids, train_labels = load_split(split_type, 'train')
        val_ids, val_labels = load_split(split_type, 'valid')
        test_ids, test_labels = load_split(split_type, 'test')

        # Use the GPU when one is attached; otherwise fall back to CPU and
        # turn fp16 off, since both would fail without CUDA.
        use_cuda = torch.cuda.is_available()

        # Initialize model components
        id2label = {0: "left", 1: "center", 2: "right"}
        tokenizer = RobertaTokenizer.from_pretrained(MODEL_NAME)
        model = RobertaForSequenceClassification.from_pretrained(
            MODEL_NAME,
            num_labels=len(id2label),
            id2label=id2label,
            # Pass the inverse mapping too so both label maps in the saved
            # config agree with each other.
            label2id={name: idx for idx, name in id2label.items()},
        ).to('cuda' if use_cuda else 'cpu')

        # Create datasets
        train_dataset = BiasDataset(train_ids, train_labels, id_to_data, tokenizer, MAX_LENGTH)
        val_dataset = BiasDataset(val_ids, val_labels, id_to_data, tokenizer, MAX_LENGTH)
        test_dataset = BiasDataset(test_ids, test_labels, id_to_data, tokenizer, MAX_LENGTH)

        # Training setup
        training_args = TrainingArguments(
            # One output directory per split type, e.g. results/random.
            output_dir=os.path.join("results", split_type),
            # NOTE(review): newer transformers releases rename this argument to
            # `eval_strategy`; confirm against the installed version.
            evaluation_strategy='epoch',
            save_strategy='epoch',
            learning_rate=2e-5,
            per_device_train_batch_size=8,
            per_device_eval_batch_size=8,
            num_train_epochs=4,
            weight_decay=0.01,
            fp16=use_cuda,
            load_best_model_at_end=True,
            metric_for_best_model='f1_macro',
            report_to="wandb",
            logging_steps=50,
            push_to_hub=False,
            warmup_steps=100,                          # Gradually ramp up LR(helps a bit with accuracy later)
            lr_scheduler_type="linear",
        )

        # Init trainer
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
        )

        # WandB: one run per split type, named after the model and the split.
        wandb.init(
            project="political-bias-detection",
            name=f"{MODEL_NAME}-{split_type}",
            config=training_args.to_dict()
        )

        # Training
        trainer.train()
        trainer.save_model()

        # Final evaluation on the held-out test split
        test_results = trainer.evaluate(test_dataset)
        print(f"\nTest results ({split_type}):")
        print(f"Accuracy: {test_results['eval_accuracy']:.4f}")
        print(f"Weighted F1: {test_results['eval_f1_weighted']:.4f}")
        print(f"Macro F1: {test_results['eval_f1_macro']:.4f}")

        # Close this split's run so the next split starts a fresh one.
        wandb.finish()

    except Exception as e:
        print(f"Error during {split_type} training: {e}")
        raise




In [None]:
!tar -cvf large.tar.gz results/random/checkpoint-13992

In [None]:
# Copy the checkpoint archive into the mounted Google Drive.
!cp large.tar.gz /content/drive/MyDrive/

In [None]:

# Export the fine-tuned model and tokenizer to Google Drive.
# `split_type`, `model` and `tokenizer` are the values left bound by the last
# iteration of the training loop, so that cell must have run first.
save_dir = f"/content/drive/MyDrive/saved_models/{split_type}_model"


import os
os.makedirs(save_dir, exist_ok=True)


# Model weights/config and tokenizer files go into the same directory.
model.save_pretrained(save_dir)
tokenizer.save_pretrained(save_dir)
print(f"Model saved to: {save_dir}")

# Optional: download the model file of the best run in a W&B sweep.
import wandb

# Path of the sweep to inspect, e.g. "<entity>/<project>/<sweep_id>". It is
# empty by default, and an empty path cannot identify a sweep, so the lookup
# is skipped until a real path is filled in.
SWEEP_PATH = ""

if not SWEEP_PATH:
    print("SWEEP_PATH is not set; skipping best-run download")
else:
    api = wandb.Api()
    sweep = api.sweep(SWEEP_PATH)
    # NOTE(review): compute_metrics in this notebook reports "accuracy",
    # "f1_weighted" and "f1_macro"; confirm that the sweep's runs really log
    # a "val_acc" summary value, otherwise every run sorts as 0.
    runs = sorted(sweep.runs,
      key=lambda run: run.summary.get("val_acc", 0), reverse=True)
    if not runs:
        print(f"Sweep {SWEEP_PATH} has no runs; nothing to download")
    else:
        best_run = runs[0]
        val_acc = best_run.summary.get("val_acc", 0)
        print(f"Best run {best_run.name} with {val_acc}% validation accuracy")

        # The file is downloaded under its own name, so report that name.
        best_run.file("model.h5").download(replace=True)
        print("Best model saved to model.h5")