***FAKE NEWS DETECTION***

**DATASET AND SPLIT**

1. Load the fake and true CSV files.
2. Label each dataset (0 for fake, 1 for true).
3. Combine them into one dataset and shuffle the rows to ensure a random distribution.
4. Split into train (60%), validation (20%), and test (20%) sets.
5. Report the split sizes and class balance, then save the splits.

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# Read both raw sources and tag every row with its class (0 = fake, 1 = true).
fake_df = pd.read_csv('Fake.csv').assign(label=0)
true_df = pd.read_csv('True.csv').assign(label=1)

# Stack the two sources, then shuffle so the classes are interleaved.
combined_df = (
    pd.concat([fake_df, true_df], ignore_index=True)
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)

# First cut: hold out 20% as the test set. Stratifying on the label keeps the
# true/fake ratio the same on both sides of the cut.
train_val_df, test_df = train_test_split(
    combined_df,
    stratify=combined_df['label'],
    test_size=0.2,
    random_state=42,
)

# Second cut: 25% of the remaining 80% becomes the validation set, which
# yields 60/20/20 overall.
train_df, val_df = train_test_split(
    train_val_df,
    stratify=train_val_df['label'],
    test_size=0.25,
    random_state=42,
)

# Report the size of each split relative to the full dataset.
print(f"Total dataset size: {len(combined_df)}")
print(f"Training set size: {len(train_df)} ({len(train_df)/len(combined_df)*100:.1f}%)")
print(f"Validation set size: {len(val_df)} ({len(val_df)/len(combined_df)*100:.1f}%)")
print(f"Test set size: {len(test_df)} ({len(test_df)/len(combined_df)*100:.1f}%)")

# The mean of a 0/1 label column is the share of true news in that split.
print("\nClass balance (percentage of true news):")
print(f"Original data: {combined_df['label'].mean()*100:.1f}%")
print(f"Training set: {train_df['label'].mean()*100:.1f}%")
print(f"Validation set: {val_df['label'].mean()*100:.1f}%")
print(f"Test set: {test_df['label'].mean()*100:.1f}%")

# Persist each split (without the index) for the later cells.
for split_df, csv_path in (
    (train_df, 'train.csv'),
    (val_df, 'val.csv'),
    (test_df, 'test.csv'),
):
    split_df.to_csv(csv_path, index=False)

In [None]:
!pip install -q transformers datasets accelerate scikit-learn pandas numpy matplotlib seaborn
!pip install torch --index-url https://download.pytorch.org/whl/cpu

import pandas as pd
import numpy as np
import torch
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
from transformers import TrainingArguments
from sklearn.dummy import DummyClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from transformers import (
    RobertaTokenizerFast,
    RobertaForSequenceClassification,
    Trainer,
    TrainingArguments,
    DataCollatorWithPadding
)
from datasets import Dataset
from huggingface_hub import login

# Authenticate with the Hugging Face Hub.
# The access token is read from the HF_TOKEN environment variable so that no
# secret is stored in the notebook itself. When the variable is unset, None is
# passed and login() falls back to its interactive prompt.
import os

login(token=os.environ.get("HF_TOKEN"))

In [None]:
# Read the three splits back from disk.
train_df, val_df, test_df = (
    pd.read_csv(csv_path) for csv_path in ("train.csv", "val.csv", "test.csv")
)

# Show the relative frequency of each label per split.
print("Train distribution:\n", train_df['label'].value_counts(normalize=True))
print("\nValidation distribution:\n", val_df['label'].value_counts(normalize=True))
print("\nTest distribution:\n", test_df['label'].value_counts(normalize=True))

# Wrap each DataFrame in a Hugging Face Dataset.
train_dataset, val_dataset, test_dataset = (
    Dataset.from_pandas(frame) for frame in (train_df, val_df, test_df)
)

In [None]:
# Baselines, all fitted on the training split and scored on the validation
# split. The TF-IDF features (5000 most frequent terms) are only meaningful for
# the logistic-regression baseline; the dummy classifiers ignore them.
tfidf = TfidfVectorizer(max_features=5000)
X_train = tfidf.fit_transform(train_df['text'])
X_val = tfidf.transform(val_df['text'])

baselines = {
    # Always predicts the most frequent training label.
    "majority": DummyClassifier(strategy="most_frequent"),
    # Uniform random guessing; seeded so the reported numbers are reproducible.
    "random": DummyClassifier(strategy="uniform", random_state=42),
    # TF-IDF + logistic regression as a stronger baseline.
    "tfidf_lr": LogisticRegression(max_iter=1000),
}

# The loop variable is deliberately not called `model`, so it cannot clobber
# the notebook-global `model` used for the transformer.
for name, clf in baselines.items():
    clf.fit(X_train, train_df['label'])
    preds = clf.predict(X_val)
    acc = accuracy_score(val_df['label'], preds)
    # zero_division=0: a baseline that never predicts the positive class has an
    # undefined precision; report it as 0 without an UndefinedMetricWarning.
    prec, rec, f1, _ = precision_recall_fscore_support(
        val_df['label'], preds, average='binary', zero_division=0
    )
    print(f"{name.capitalize()} - Acc: {acc:.4f} | Prec: {prec:.4f} | Rec: {rec:.4f} | F1: {f1:.4f}")

***ROBERTA MODEL***

The Robustly Optimized BERT Pretraining Approach (RoBERTa) was introduced by researchers at Facebook. It is an improved version of BERT that is trained on larger datasets with more computational power. It also replaces BERT's static masking with dynamic masking, in which the masking pattern is generated anew each time a sequence is fed to the model.
Since RoBERTa is a pre-trained model, it accepts input only in a certain format: vectors of integers, where each integer represents a token. The tokenizer takes the input sequences and converts them into the format the model requires. To avoid overfitting, I decided to train the model for just 2 epochs. The data collator pads the data so that all examples have the same input length.


In [None]:
# Tokenizer that belongs to the roberta-base checkpoint.
tokenizer = RobertaTokenizerFast.from_pretrained("roberta-base")


def tokenize_function(examples):
    """Tokenize the "text" field of a batch.

    Every example is truncated and padded to exactly 512 tokens.
    """
    return tokenizer(
        examples["text"],
        max_length=512,
        padding="max_length",
        truncation=True,
    )


# Metadata columns that are dropped after tokenization. "text" stays in the
# dataset and is simply left out of the torch-formatted output below.
columns_to_remove = ["title", "subject", "date"]


def _encode_split(split):
    """Tokenize one split, drop the metadata columns and expose torch tensors."""
    encoded = split.map(tokenize_function, batched=True)
    encoded = encoded.remove_columns(columns_to_remove)
    encoded.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    return encoded


train_dataset = _encode_split(train_dataset)
val_dataset = _encode_split(val_dataset)
test_dataset = _encode_split(test_dataset)

# Pre-trained RoBERTa with a two-class classification head.
model = RobertaForSequenceClassification.from_pretrained("roberta-base", num_labels=2)

# Batch collator handed to the Trainer. The examples were already padded to a
# fixed length during tokenization, so it has no extra padding to add here.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
# Sanity check: push one mini-batch of two examples through the model.
from torch.utils.data import DataLoader

sample_loader = DataLoader(train_dataset, batch_size=2)
batch = next(iter(sample_loader))

# Put the model and the batch on the same device (GPU when one is available).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
batch = {name: tensor.to(device) for name, tensor in batch.items()}

# No gradients are needed for a smoke test. Passing the labels makes the model
# return a loss alongside the logits.
with torch.no_grad():
    output = model(
        input_ids=batch["input_ids"],
        attention_mask=batch["attention_mask"],
        labels=batch["label"],
    )
    print(output)


In [None]:
from transformers import (
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer
)

# Start fine-tuning from a fresh pre-trained checkpoint.
# The checkpoint has to match the tokenizer that encoded the datasets
# (RobertaTokenizerFast, "roberta-base"). A model with a different vocabulary
# would receive token ids outside its embedding table.
model = AutoModelForSequenceClassification.from_pretrained(
    "roberta-base",  # same checkpoint family as the tokenizer
    num_labels=2,    # binary task: fake (0) vs. true (1)
)

# Metric callback handed to the Trainer
def compute_metrics(pred):
    """Compute accuracy, precision, recall and F1 for one evaluation pass.

    Args:
        pred: object exposing ``predictions`` (per-class scores, one row per
            example) and ``label_ids`` (the reference labels).

    Returns:
        dict with the keys 'accuracy', 'precision', 'recall' and 'f1'.
        Precision, recall and F1 use binary averaging.
    """
    y_true = pred.label_ids
    # The predicted class is the index of the highest score in each row.
    y_pred = np.argmax(pred.predictions, axis=1)
    prf_scores = precision_recall_fscore_support(y_true, y_pred, average='binary')
    metrics = {'accuracy': accuracy_score(y_true, y_pred)}
    # The fourth element (support) is not reported.
    metrics.update(zip(('precision', 'recall', 'f1'), prf_scores[:3]))
    return metrics

# Training configuration, deliberately scaled down to keep the runtime short.
training_args = TrainingArguments(
    output_dir="./results",
    # --- schedule ---
    # Per the TrainingArguments documentation, a positive max_steps takes
    # precedence over num_train_epochs, so the run is capped at 1000 steps.
    max_steps=1000,
    num_train_epochs=1,               # author's note: 2-3 epochs is the best
    # --- optimisation ---
    learning_rate=5e-5,               # author's note: 2e-5 is the suggested starting point
    per_device_train_batch_size=4,    # author's note: use 8 when resources allow
    # --- logging / evaluation / checkpoints ---
    logging_steps=100,                # log sparingly to keep the runtime stable
    evaluation_strategy="epoch",      # evaluate once per epoch
    save_strategy="epoch",            # checkpoint once per epoch
    load_best_model_at_end=True,      # reload the best checkpoint when training ends
    # Optional settings left disabled by the author:
    # weight_decay=0.05,              # regularization
    # per_device_eval_batch_size=32,  # flagged as critical by the author
)

# Wire the model, data, collator and metrics together.
trainer = Trainer(
    args=training_args,
    model=model,
    tokenizer=tokenizer,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
)

# Run fine-tuning.
trainer.train()

In [None]:
# Score the held-out test split; the results feed the confusion matrix.
predictions = trainer.predict(test_dataset)
# The predicted class is the index of the highest score in each row.
test_scores = predictions.predictions
predicted_labels = np.argmax(test_scores, axis=1)

In [None]:
def plot_words(model, tokenizer, n=20):
    """Bar-plot the ``n`` largest column-averaged weights of the classifier's dense layer.

    Args:
        model: model exposing ``model.classifier.dense.weight``.
        tokenizer: tokenizer whose ``decode`` turns an integer id into text;
            used to label the bars.
        n: number of top entries to show (default 20).

    NOTE(review): the columns of ``classifier.dense.weight`` appear to index
    hidden features rather than vocabulary entries, so decoding the selected
    indices as token ids may not give meaningful "important tokens" -- confirm
    before drawing conclusions from this plot.
    """
    # Copy the weights to the CPU first: .numpy() raises on a tensor that lives
    # on a GPU, which is where the model ends up when CUDA is available.
    weights = model.classifier.dense.weight.detach().cpu().numpy()
    # Average over the output dimension: one value per input column.
    avg_weights = np.mean(weights, axis=0)
    # Indices of the n largest averages, in ascending order.
    top_indices = np.argsort(avg_weights)[-n:]
    # Decode each index as if it were a token id; int() converts the numpy
    # integer to a plain Python int for the tokenizer.
    top_tokens = [tokenizer.decode([int(i)]) for i in top_indices]
    # Plot
    plt.figure(figsize=(10, 6))
    sns.barplot(x=avg_weights[top_indices], y=top_tokens)
    plt.title(f"Top {n} Important Tokens")
    plt.tight_layout()
    plt.show()

In [None]:
# Confusion matrix of the test-set predictions: rows are the reference labels,
# columns the predicted ones.
cm = confusion_matrix(test_dataset["label"], predicted_labels)

class_names = ['Fake', 'Real']  # label 0 -> Fake, label 1 -> Real
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,   # write the count into every cell
    fmt='d',      # counts are integers
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.title('Confusion Matrix - RoBERTa')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.tight_layout()
plt.show()

In [None]:
# Publish the fine-tuned model through the Trainer, then upload the tokenizer.
# The repository path below looks like a placeholder -- replace it with the
# real Hub repository before running.
hub_repo_path = "HUGGING FACE PATH"
trainer.push_to_hub()
tokenizer.push_to_hub(hub_repo_path)