In [None]:
import os, numpy as np, torch
import pandas as pd
from dataclasses import dataclass
from typing import Dict, Any
from transformers import (AutoTokenizer, AutoModelForSequenceClassification,
                          DataCollatorWithPadding, Trainer, TrainingArguments, AutoConfig)
from peft import LoraConfig, TaskType, get_peft_model
from sklearn.metrics import accuracy_score, f1_score, classification_report
import torch
from collections import Counter
import re
from datasets import load_dataset, concatenate_datasets, Features, Value
import numpy as np
from sklearn.metrics import accuracy_score, f1_score
from transformers import TrainingArguments, Trainer
from math import ceil
import optuna


def set_seed(seed: int = 8):
    """Seed every random number generator this notebook can draw from.

    Seeds Python's built-in ``random`` module, NumPy's legacy global
    generator, and PyTorch (CPU and all CUDA devices).

    Args:
        seed: Integer seed applied to each generator.
    """
    import random  # local import: not part of the notebook's top-level import cell

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Per the PyTorch docs this is silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)

set_seed(8)

# Use the GPU when one is visible to PyTorch; otherwise fall back to the CPU so
# later `.to(device)` calls do not fail on CPU-only machines.
device = "cuda" if torch.cuda.is_available() else "cpu"

model = "finbert"  # "finbert" or "roberta"
strategy = "LoRA"  # "LoRA" (adapter fine-tuning) or "FT" (full fine-tuning)

In [None]:
def clean_text(text):
    """Strip tweet noise from ``text`` and normalise its whitespace.

    The substitutions run in the listed order: URLs, @mentions, #hashtags and
    digit runs are deleted, then every whitespace run collapses to a single
    space. Leading and trailing whitespace is trimmed from the result.
    """
    substitutions = (
        (r'http\S+', ''),   # URLs
        (r'@\w+', ''),      # mentions
        (r'#\w+', ''),      # hashtags
        (r'\d+', ''),       # numbers
        (r'\s+', ' '),      # whitespace runs -> one space
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()

# --- Auxiliary pool: TweetEval tweets, all relabelled with the constant 3 ---
# Every subset's own label column is dropped and replaced by 3
# (presumably the "not financial" class -- confirm against the label scheme).
feats = Features({"text": Value("string"), "label": Value("int64")})
tweet_eval_subsets = [
    'emoji', 'emotion', 'hate', 'irony', 'offensive', 'sentiment',
    'stance_abortion', 'stance_atheism', 'stance_climate',
    'stance_feminist', 'stance_hillary',
]

ax_parts = []
for subset in tweet_eval_subsets:
    ds = load_dataset("cardiffnlp/tweet_eval", subset)["train"].remove_columns("label")
    ds = ds.map(lambda row: {"text": clean_text(row["text"]), "label": 3})
    # Cast so every part shares one schema before concatenation.
    ax_parts.append(ds.cast(feats))
ds_ax_clean = concatenate_datasets(ax_parts)

# Keep at most 20000 shuffled auxiliary tweets.
n_aux = min(20000, len(ds_ax_clean))
sampled_20000 = ds_ax_clean.shuffle(seed=8).select(range(n_aux))

# --- Financial tweets: clean the text, keep the dataset's own labels ---
ds_fin = load_dataset("zeroshot/twitter-financial-news-sentiment")
ds_fin = ds_fin.map(lambda row: {"text": clean_text(row["text"])})
full_ds = concatenate_datasets([ds_fin["train"], ds_fin["validation"], sampled_20000])

# --- Splits: 10% test, then 12% of the remainder as validation ---
split_test = full_ds.train_test_split(train_size=0.90, seed=8)
train_full_ds, test_ds = split_test["train"], split_test["test"]

split_val = train_full_ds.train_test_split(train_size=0.88, seed=8)
train_ds, val_ds = split_val["train"], split_val["test"]

# Per-class example counts for train / validation / test, in that order.
for part in (train_ds, val_ds, test_ds):
    print(np.bincount(part["label"], minlength=4))


In [None]:
# Map the short model key from the config cell to its Hugging Face checkpoint.
CHECKPOINTS = {
    "finbert": "ProsusAI/finbert",
    "roberta": "roberta-base",
}
if model not in CHECKPOINTS:
    # Fail here with a clear message instead of a NameError on MODEL_NAME below.
    raise ValueError(
        f"Unknown model {model!r}; expected one of {sorted(CHECKPOINTS)}"
    )
MODEL_NAME = CHECKPOINTS[model]
OUTDIR = "out/exp1"
MAX_LEN = 128  # maximum token length passed to the tokenizer (longer inputs are truncated)

tok = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)

def preprocess(batch):
    """Tokenize the ``"text"`` entries of a batch, truncating to ``MAX_LEN`` tokens.

    Intended for ``Dataset.map(..., batched=True)``, in which case
    ``batch["text"]`` is a list of strings. No padding is applied here.
    """
    texts = batch["text"]
    return tok(texts, max_length=MAX_LEN, truncation=True)

# Tokenize every split; the raw "text" column is dropped once it has been encoded.
map_options = dict(batched=True, remove_columns=["text"])
train_tok = train_ds.map(preprocess, **map_options)
val_tok = val_ds.map(preprocess, **map_options)
test_tok = test_ds.map(preprocess, **map_options)

# Have each tokenized split yield PyTorch tensors for just the model-facing columns.
tensor_columns = ["input_ids", "attention_mask", "label"]
for split in (train_tok, val_tok, test_tok):
    split.set_format(type="torch", columns=tensor_columns)

# Padding is deferred to batch-assembly time by the collator.
collator = DataCollatorWithPadding(tokenizer=tok)

from collections import Counter

NUM_LABELS = 4  # labels are 0, 1, 2, 3
counts = Counter(train_ds["label"])  # label id -> number of training examples

# Inverse-frequency class weights, rescaled so that their mean is 1.
# A class with no training examples is treated as if it had one, to avoid
# dividing by zero.
# NOTE(review): the trainer cell passes `w`, not `weights`, as class_weights --
# confirm whether this tensor is still needed.
total = sum(counts[label_id] for label_id in range(NUM_LABELS))
inv_freq = [total / max(counts[label_id], 1) for label_id in range(NUM_LABELS)]
weights = torch.tensor(inv_freq, dtype=torch.float32)
weights = weights / weights.mean()
print("class weights (0,1,2,3):", weights.tolist())
weights = weights.to(device)


def compute_metrics(eval_pred):
    """Compute accuracy and macro-averaged F1 from an evaluation result.

    Args:
        eval_pred: A ``(logits, labels)`` pair. The predicted class is the
            arg-max of ``logits`` over the last axis.

    Returns:
        A dict with keys ``"accuracy"`` and ``"macro_f1"``.
    """
    logits, labels = eval_pred
    predicted = logits.argmax(axis=-1)
    accuracy = accuracy_score(labels, predicted)
    macro_f1 = f1_score(labels, predicted, average="macro")
    return {"accuracy": accuracy, "macro_f1": macro_f1}




In [None]:
from torch import nn

# Per-class training counts -> inverse-frequency weights normalised to mean 1.
# Empty classes are clamped to a count of 1 so the division is always defined.
counts = np.bincount(train_ds["label"], minlength=NUM_LABELS)
inverse_frequency = counts.sum() / np.maximum(counts, 1)
w = torch.tensor(inverse_frequency / inverse_frequency.mean(), dtype=torch.float32)

class CustomTrainer(Trainer):
    """``Trainer`` whose loss is cross-entropy with optional per-class weights."""

    def __init__(self, class_weights=None, *args, **kwargs):
        """Forward all other arguments to ``Trainer`` and remember the weights.

        Args:
            class_weights: Tensor of per-class loss weights, or ``None`` for
                unweighted cross-entropy.
        """
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run the model and return the (optionally class-weighted) cross-entropy.

        ``"labels"`` is popped from ``inputs`` before the forward pass, so the
        model receives no labels and the loss is computed here from its logits.
        Extra keyword arguments are accepted and ignored.

        Returns:
            ``loss``, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        targets = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits

        # Move the weights next to the logits; None means unweighted loss.
        per_class = None
        if self.class_weights is not None:
            per_class = self.class_weights.to(logits.device)
        criterion = nn.CrossEntropyLoss(weight=per_class)

        loss = criterion(logits.view(-1, logits.size(-1)), targets.view(-1))
        if return_outputs:
            return loss, outputs
        return loss



# Optimisation hyperparameters shared by both fine-tuning strategies.
BATCH_SIZE = 16
EPOCHS = 5
WEIGHT_DECAY = 0.01

# LoRA adapter hyperparameters (only used when strategy == "LoRA").
LORA_R = 8
LORA_ALPHA = 32
LORA_DROPOUT = 0.10

if model == "finbert":
    config = AutoConfig.from_pretrained(MODEL_NAME, num_labels=NUM_LABELS)

    # ignore_mismatched_sizes=True lets the checkpoint load even though its
    # classifier head does not have NUM_LABELS outputs; the head is
    # re-initialised. No forced re-download is needed for that, so the cached
    # checkpoint is reused across runs.
    base = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        config=config,
        ignore_mismatched_sizes=True,
    )
elif model == "roberta":
    base = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=NUM_LABELS,
    )
else:
    # Fail here rather than with a NameError on `base` further down.
    raise ValueError(f"Unknown model {model!r}; expected 'finbert' or 'roberta'")

if strategy == "LoRA":
    lora_cfg = LoraConfig(
        task_type=TaskType.SEQ_CLS,
        r=LORA_R, lora_alpha=LORA_ALPHA, lora_dropout=LORA_DROPOUT,
        # NOTE(review): "dense" is a broad module-name match -- confirm it
        # selects only the intended layers for both supported checkpoints.
        target_modules=["query", "key", "value", "dense"],
    )
    base = get_peft_model(base, lora_cfg)
    base.print_trainable_parameters()

base.to(device)

# Training arguments. Settings common to both strategies are declared once;
# each branch adds only what differs.
common_args = dict(
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=max(32, BATCH_SIZE),
    num_train_epochs=EPOCHS,
    weight_decay=WEIGHT_DECAY,
    logging_steps=50,
    seed=8,
    fp16=(device == "cuda"),  # mixed precision only when running on the GPU
    report_to="none",
)

if strategy == "LoRA":
    LR = 1e-4
    training_args = TrainingArguments(
        output_dir="out/twitter-finance",
        learning_rate=LR,
        **common_args,
    )
elif strategy == "FT":
    LR = 2e-5
    # Newer transformers releases name this argument `eval_strategy`; older
    # ones only know `evaluation_strategy`. Use whichever the installed
    # TrainingArguments dataclass declares.
    declared_fields = getattr(TrainingArguments, "__dataclass_fields__", {})
    eval_key = "eval_strategy" if "eval_strategy" in declared_fields else "evaluation_strategy"
    training_args = TrainingArguments(
        output_dir="out/twitter-finance_fullft",
        learning_rate=LR,
        warmup_ratio=0.1,                 # good default for full FT
        save_strategy="epoch",
        load_best_model_at_end=True,      # restore the best checkpoint by macro_f1
        metric_for_best_model="macro_f1",
        greater_is_better=True,
        **{eval_key: "epoch"},
        **common_args,
    )
else:
    # Fail here rather than with a NameError on `training_args` further down.
    raise ValueError(f"Unknown strategy {strategy!r}; expected 'LoRA' or 'FT'")

# Assemble the trainer; `w` supplies the per-class weights for the custom loss.
trainer = CustomTrainer(
    class_weights=w,
    model=base,
    args=training_args,
    train_dataset=train_tok,
    eval_dataset=val_tok,
    tokenizer=tok,
    data_collator=collator,
    compute_metrics=compute_metrics,
)

trainer.train()
print("global_step:", trainer.state.global_step)

# Validation macro-F1 as a single float (0.0 if the metric key is missing).
val_metrics = trainer.evaluate(val_tok)
score = float(val_metrics.get("eval_macro_f1", 0.0))
print(score)

# Final evaluation on the held-out test split; shown as the cell's output.
test_metrics = trainer.evaluate(test_tok)
test_metrics


In [None]:
# Display the metrics dict returned by trainer.evaluate(test_tok).
test_metrics