In [4]:
import re

def clean_text(text):
    """Normalise one raw text before tokenisation.

    URLs (anything starting with ``http``), ``@mentions`` and ``#hashtags``
    are deleted, every run of whitespace is collapsed to a single space,
    leading/trailing whitespace is trimmed and the result is lower-cased.

    Args:
        text: the raw string to clean.

    Returns:
        The cleaned, lower-cased string.
    """
    # The removals are applied in this fixed order: URLs, mentions, hashtags.
    for pattern in (r'http\S+', r'@\w+', r'#\w+'):
        text = re.sub(pattern, '', text)
    # Removing tokens leaves gaps behind; squeeze them before trimming.
    return re.sub(r'\s+', ' ', text).strip().lower()

# Replace every raw text with its cleaned form (same order, same length).
texts = list(map(clean_text, texts))

In [5]:
# Hold out 20% of the samples for testing and train on the other 80%.
# Stratifying on the labels keeps the class distribution the same in the
# train and test sets; the fixed random_state makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    texts,
    labels,
    stratify=labels,
    test_size=0.2,
    random_state=42,
)

In [6]:
# Load the pretrained DistilBERT tokenizer and a DistilBERT sequence classifier
# configured for 2 labels.
from transformers import (
    DistilBertForSequenceClassification,
    DistilBertTokenizerFast,
)

tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')
model = DistilBertForSequenceClassification.from_pretrained(
    'distilbert-base-uncased',
    num_labels=2,
)

# Alternative that was tried: full-size BERT, i.e. BertTokenizerFast and
# BertForSequenceClassification loaded from 'bert-base-uncased' with num_labels=2.

In [7]:
# Prefer a CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print("Using device:", device)

In [8]:
# Tokenize both splits with identical settings: sequences longer than 128
# tokens are truncated, and padding is enabled for each split.
train_encodings, test_encodings = (
    tokenizer(split, truncation=True, padding=True, max_length=128)
    for split in (X_train, X_test)
)

In [9]:
class SentimentDataset(torch.utils.data.Dataset):
    """Dataset that pairs tokenizer encodings with their class labels.

    Args:
        encodings: mapping from field name to a per-sample indexable
            sequence (anything exposing ``.items()``).
        labels: per-sample labels, indexable in the same order as the
            encodings.
    """

    def __init__(self, encodings, labels):
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        # The number of samples is taken from the labels.
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of tensors, including ``'labels'``."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        # The label is always stored as an int64 tensor.
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# Wrap each split's encodings and labels in a SentimentDataset.
train_dataset = SentimentDataset(encodings=train_encodings, labels=y_train)
test_dataset = SentimentDataset(encodings=test_encodings, labels=y_test)

In [10]:
def compute_metrics(pred):
    """Compute accuracy, F1, precision and recall for one evaluation pass.

    Args:
        pred: object exposing ``label_ids`` (true labels) and
            ``predictions`` (per-class scores; the arg-max over the last
            axis is taken as the predicted class).

    Returns:
        dict with keys ``'accuracy'``, ``'f1'``, ``'precision'`` and
        ``'recall'``. Precision/recall/F1 use ``average='binary'``.
    """
    y_true = pred.label_ids
    y_hat = pred.predictions.argmax(-1)
    # The fourth returned value (support) is not reported.
    precision, recall, f1 = precision_recall_fscore_support(
        y_true, y_hat, average='binary'
    )[:3]
    return dict(
        accuracy=accuracy_score(y_true, y_hat),
        f1=f1,
        precision=precision,
        recall=recall,
    )

In [11]:
from collections import Counter

# Inverse-frequency class weights: weight(c) = N / count(c), so the less
# frequent class gets the larger weight.
class_counts = Counter(y_train)
total_samples = len(y_train)
class_weights = {
    label: total_samples / count for label, count in class_counts.items()
}
# Lay the weights out by class index (0 first, then 1) as a float tensor.
class_weights_tensor = torch.tensor(
    [class_weights[label] for label in (0, 1)], dtype=torch.float
)

In [12]:
class WeightedLossTrainer(Trainer):
    """Trainer that replaces the model's loss with class-weighted cross-entropy.

    Args:
        *args, **kwargs: forwarded unchanged to ``Trainer.__init__``.
        class_weights: optional 1-D float tensor with one weight per class,
            handed to ``torch.nn.CrossEntropyLoss(weight=...)``. ``None``
            yields the unweighted loss.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run a forward pass and return the class-weighted cross-entropy loss.

        Args:
            model: the model to call with ``**inputs``.
            inputs: batch dict; must contain ``"labels"``.
            return_outputs: when true, return ``(loss, outputs)`` instead of
                just ``loss``.
            **kwargs: accepted and ignored, so extra keyword arguments passed
                by the caller do not break the override.

        Returns:
            The loss as a 1-element tensor, or ``(loss, outputs)``.
        """
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        weight = self.class_weights
        if weight is not None:
            # The weight tensor is typically created on the CPU, while the
            # logits live wherever the model runs. CrossEntropyLoss needs both
            # on the same device, otherwise training on a GPU raises a
            # device-mismatch RuntimeError. `.to` is a no-op when they match.
            weight = weight.to(logits.device)

        loss_fct = torch.nn.CrossEntropyLoss(weight=weight)
        loss = loss_fct(logits, labels)
        # Kept from the original implementation: return a tensor of shape (1,)
        # rather than a 0-d scalar.
        loss = loss.unsqueeze(0)
        return (loss, outputs) if return_outputs else loss

In [13]:
# NOTE: dead code. This standalone draft is superseded by
# WeightedLossTrainer.compute_loss and is kept for reference only.
# def compute_loss(model, inputs, return_outputs=False):
#     labels = inputs.get("labels")
#     outputs = model(**inputs)
#     logits = outputs.get("logits")
#     loss_fct = torch.nn.CrossEntropyLoss(weight=class_weights_tensor)
#     loss = loss_fct(logits.view(-1, model.config.num_labels), labels.view(-1))
#     return (loss, outputs) if return_outputs else loss

In [14]:
# Hyper-parameters and bookkeeping for the fine-tuning run.
training_args = TrainingArguments(
    output_dir='./results',          # Output directory
    num_train_epochs=3,              # Total number of training epochs
    per_device_train_batch_size=8,   # Batch size per device during training
    per_device_eval_batch_size=8,    # Batch size for evaluation
    warmup_steps=500,                # Number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # Strength of weight decay
    learning_rate=1e-5,              # Initial learning rate
    lr_scheduler_type='linear',      # Learning rate scheduler
    logging_dir='./logs',            # Directory for storing logs
    logging_steps=50,                # Log every 50 steps
    eval_strategy='epoch',           # Evaluate at each epoch
    save_strategy='epoch',           # Save model at each epoch
    save_total_limit=1,              # Keep at most one checkpoint on disk
    disable_tqdm=True,               # Disable progress bars to reduce log size
    gradient_accumulation_steps=1,   # 1 = no accumulation (optimizer step after every batch)
    # fp16 mixed precision is rejected on machines without a GPU, so enable it
    # only when CUDA is present; the notebook can then also run on CPU.
    fp16=torch.cuda.is_available(),
    # max_steps=10                   # Uncomment for a quick smoke-test run
)

In [15]:
# Wire the model, data, metrics and per-class loss weights into the custom trainer.
# NOTE(review): the held-out test split doubles as the per-epoch evaluation
# set; there is no separate validation split.
trainer = WeightedLossTrainer(
    class_weights=class_weights_tensor,  # per-class weights for the loss
    compute_metrics=compute_metrics,     # metrics reported at evaluation
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

In [16]:
# Run fine-tuning with the trainer configured above; the call's return value is
# the cell output.
trainer.train()

In [17]:
# Evaluate with the trainer (no dataset is passed here, so it relies on the
# eval_dataset given at construction) and print the resulting metrics.
eval_result = trainer.evaluate()
print(eval_result)

In [18]:
# Example prediction
def predict(text):
    """Classify a single text with the fine-tuned model.

    Args:
        text: raw input string. It is passed through ``clean_text`` (the same
            preprocessing applied to the training texts) and then tokenized
            with the module-level ``tokenizer``, truncated to 128 tokens.

    Returns:
        int: index of the highest-scoring class.
    """
    # Preprocess exactly like the training data so the model sees the same
    # kind of input at inference time.
    encoding = tokenizer(
        clean_text(text), return_tensors='pt', truncation=True, padding=True, max_length=128
    )
    # Send the inputs to wherever the model currently lives. Using the model's
    # own device (rather than a separately computed global) keeps this working
    # whether or not a Trainer has already moved the model to the GPU.
    target_device = model.device
    encoding = {key: val.to(target_device) for key, val in encoding.items()}
    model.eval()  # inference mode: disables dropout
    with torch.no_grad():
        outputs = model(**encoding)
    return outputs.logits.argmax(-1).item()

# Smoke-test predict() on one hand-written sentence and print the predicted
# class index.
sample_text = "I will blow these things up, not because they're made by these people. Just because they're weird."
print("Predicted class:", predict(sample_text))

In [2]:
import pandas as pd
import numpy as np
from transformers import BertTokenizerFast, BertForSequenceClassification, Trainer, TrainingArguments
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

In [3]:
# Load the labelled corpus and drop rows that lack either the text or the label.
data = (
    pd.read_csv('../data/GoldStanderDataSet.csv', encoding='ISO-8859-1')
    .dropna(subset=['Text', 'Biased'])
)
# Plain Python lists: 'Text' holds the inputs, 'Biased' the labels.
texts, labels = (data[column].tolist() for column in ('Text', 'Biased'))

data