In [None]:
import re
import warnings
from collections import Counter

import evaluate
import matplotlib.pyplot as plt
import nltk
import numpy as np
import optuna
import pandas as pd
import torch
from datasets import Dataset
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score,
    average_precision_score,
    classification_report,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import StratifiedKFold, cross_val_predict, train_test_split
from sklearn.multiclass import OneVsRestClassifier
from sklearn.preprocessing import label_binarize
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    TextClassificationPipeline,
    Trainer,
    TrainingArguments,
)
from transformers_interpret import SequenceClassificationExplainer
from xgboost import XGBClassifier


# Download NLTK resources: the stopword corpus read by `stopwords.words` below,
# and "punkt_tab" (presumably the tokenizer data `word_tokenize` needs — confirm
# against the installed NLTK version).
nltk.download("stopwords")
nltk.download("punkt_tab")

# 1. Global Configurations

In [None]:
# Maps each sentiment string in the dataset to the integer class id used as target.
LABEL_TO_ID = {"negative": 0, "neutral": 1, "positive": 2}
# English stopwords as a set, used for membership tests when filtering tokens.
STOP_WORDS = set(stopwords.words("english"))

# 2. Helper Functions

In [None]:
def clean_text(text: str) -> str:
    """Return a lowercase, letters-only, stopword-free version of ``text``.

    The text is lowercased, every character other than ``a-z`` and whitespace
    is deleted, the remainder is tokenized with ``word_tokenize`` and tokens
    found in ``STOP_WORDS`` are dropped. Surviving tokens are joined with a
    single space.
    """
    letters_only = re.sub(r"[^a-z\s]", "", text.lower())
    kept = []
    for token in word_tokenize(letters_only):
        if token in STOP_WORDS:
            continue
        kept.append(token)
    return " ".join(kept)

In [None]:
def get_top_words_by_label(df, label, top_n=20):
    """Count the most frequent non-stopword alphabetic tokens for one sentiment.

    Args:
        df: Frame with a ``sentiment`` column and a ``sentence`` column.
        label: Value of ``sentiment`` selecting the rows to analyse.
        top_n: Maximum number of words to return.

    Returns:
        DataFrame with columns ``word`` and ``count``, most frequent first.
    """
    is_label = df["sentiment"] == label
    counts = Counter()
    for sentence in df.loc[is_label, "sentence"]:
        for token in word_tokenize(sentence):
            lowered = token.lower()
            if token.isalpha() and lowered not in STOP_WORDS:
                counts[lowered] += 1
    return pd.DataFrame(counts.most_common(top_n), columns=["word", "count"])

In [None]:
def classification_metrics(y_true, y_pred, y_prob):
    """Compute a fixed set of multi-class classification metrics.

    Args:
        y_true: True class ids. Assumed to be integers in ``0..K-1`` where
            ``K`` is the number of columns of ``y_prob`` (this matches the
            ids in ``LABEL_TO_ID``), with ``K >= 3``.
        y_pred: Predicted class ids, aligned with ``y_true``.
        y_prob: Array-like of shape ``(n_samples, K)`` with one score per
            class; column ``k`` is the score of class id ``k``.

    Returns:
        Dict with keys ``accuracy``, ``precision``, ``recall``, ``f1_score``,
        ``roc_auc`` and ``avg_precision``. Precision and recall are
        macro-averaged while F1 is weighted by class support.
    """
    y_prob = np.asarray(y_prob)

    # Take the class list from the probability columns, not from y_true: if a
    # class is missing from y_true, np.unique(y_true) would produce an
    # indicator matrix with fewer columns than y_prob.
    classes = np.arange(y_prob.shape[1])
    y_bin = label_binarize(y_true, classes=classes)

    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average="macro", zero_division=0),
        "recall": recall_score(y_true, y_pred, average="macro", zero_division=0),
        # NOTE(review): F1 is "weighted" while precision/recall are "macro";
        # kept as-is so reported numbers do not change — confirm this is intended.
        "f1_score": f1_score(y_true, y_pred, average="weighted"),
        "roc_auc": roc_auc_score(y_true, y_prob, multi_class="ovr", labels=classes),
        "avg_precision": average_precision_score(y_bin, y_prob, average="macro"),
    }

# 3. Load Data

In [None]:
# Read the raw text file; only lines containing "@" are kept, stripped of
# surrounding whitespace.
file_path = "../data/FinancialPhraseBank-v1.0/Sentences_75Agree.txt"
with open(file_path, "r", encoding="ISO-8859-1") as file:
    lines = [line.strip() for line in file if "@" in line]

# Split on the LAST "@" so any "@" inside the sentence stays in the sentence;
# the left part becomes "sentence" and the right part "sentiment".
data = [line.rsplit("@", 1) for line in lines]
df_raw = pd.DataFrame(data, columns=["sentence", "sentiment"])

In [None]:
# Sanity check: number of parsed rows and the row count per sentiment value.
print(f"Total examples: {len(df_raw)}")
print("\nClass distribution:\n", df_raw["sentiment"].value_counts())

# 4. Preprocessing

In [None]:
# Apply text cleaning (adds a lowercase, letters-only, stopword-free copy of
# each sentence in a new "clean_text" column)
df_raw["clean_text"] = df_raw["sentence"].apply(clean_text)

# Assign numeric labels; a sentiment string missing from LABEL_TO_ID maps to NaN
df_raw["label"] = df_raw["sentiment"].map(LABEL_TO_ID)

# Train-eval split: 20% held out, stratified on the label, with a fixed seed
train_df, eval_df = train_test_split(
    df_raw, test_size=0.2, stratify=df_raw["label"], random_state=42
)

# 5. Models

## 5.1. Model 1: TF-IDF + Logistic Regression

##### Prepare data

In [None]:
# The TF-IDF models use the cleaned text as input and the integer label as target.
X_train = train_df["clean_text"]
X_eval = eval_df["clean_text"]
y_train = train_df["label"]
y_eval = eval_df["label"]

##### TF-IDF Vectorization

In [None]:
# Unigram + bigram TF-IDF features, capped at 10,000 features.
vectorizer = TfidfVectorizer(max_features=10_000, ngram_range=(1, 2))
# Fit on the training text only, then reuse the fitted vectorizer on the eval text.
X_train_vec = vectorizer.fit_transform(X_train)
X_eval_vec = vectorizer.transform(X_eval)

##### Hyperparameter Tuning with Optuna

In [None]:
def objective(trial):
    """Optuna objective: cross-validated one-vs-rest ROC AUC of logistic regression.

    Samples ``C``, ``penalty`` and ``solver``, wraps a ``LogisticRegression``
    in ``OneVsRestClassifier`` and scores it with 5-fold stratified
    out-of-fold probabilities on the notebook-level ``X_train_vec`` /
    ``y_train``.

    Args:
        trial: The Optuna trial used to sample hyperparameters.

    Returns:
        The one-vs-rest ROC AUC of the out-of-fold predictions (to maximize).
    """
    # "multi_class" is intentionally not set: OneVsRestClassifier already hands
    # each inner LogisticRegression a binary problem, and the parameter is
    # deprecated in recent scikit-learn releases.
    params = {
        "C": trial.suggest_float("C", 1e-3, 1e2, log=True),
        "penalty": trial.suggest_categorical("penalty", ["l1", "l2"]),
        "solver": trial.suggest_categorical("solver", ["liblinear", "saga"]),
        "max_iter": 1_000,
        # Fixed seed so repeated evaluations of the same parameters agree.
        "random_state": 42,
    }

    # Both solvers in the search space ("liblinear", "saga") support both
    # penalties ("l1", "l2"), so no combination has to be pruned.

    model = OneVsRestClassifier(LogisticRegression(**params))
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    # Silence warnings raised while fitting so they do not flood the trial log.
    with warnings.catch_warnings(action="ignore"):
        preds = cross_val_predict(
            model, X_train_vec, y_train, cv=skf, method="predict_proba"
        )

    return roc_auc_score(y_train, preds, multi_class="ovr")


# Seed the sampler so the same sequence of trials is proposed on every run.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=20)

##### Training best model

In [None]:
# Rebuild the best configuration found by the study. Only the sampled
# parameters are in `study.best_params`, so the fixed ones are added back here.
best_params = study.best_params
best_params["max_iter"] = 1_000
# "multi_class" is not passed: OneVsRestClassifier already trains one binary
# LogisticRegression per class, and the parameter is deprecated in recent
# scikit-learn releases.
# Fixed seed so refitting gives the same model on every run.
best_params["random_state"] = 42

logreg = OneVsRestClassifier(LogisticRegression(**best_params))
logreg.fit(X_train_vec, y_train)

##### Run predictions on the evaluation set

In [None]:
# Hard class predictions and per-class probabilities for the evaluation matrix.
y_pred = logreg.predict(X_eval_vec)
y_prob = logreg.predict_proba(X_eval_vec)

##### Evaluate results

In [None]:
# Per-class precision / recall / F1 report for the logistic regression model.
print(classification_report(y_eval, y_pred))

In [None]:
# Summary metrics for this model; `logreg_metrics` is reused in the final
# model comparison.
logreg_metrics = classification_metrics(y_eval, y_pred, y_prob)
# One row per metric, single "scores" column, for display.
df_logreg_metrics = pd.DataFrame.from_dict(
    logreg_metrics, orient="index", columns=["scores"]
)
print(df_logreg_metrics)

## 5.2. Model 2: TF-IDF + XGBoost

##### Prepare data

In [None]:
# Same inputs as the logistic regression model (cleaned text, integer labels);
# re-assigned here so this section can be read on its own.
X_train = train_df["clean_text"]
X_eval = eval_df["clean_text"]
y_train = train_df["label"]
y_eval = eval_df["label"]

##### TF-IDF Vectorization

In [None]:
# Same TF-IDF configuration as the logistic regression model: unigrams and
# bigrams, at most 10,000 features, fitted on the training text only.
vectorizer = TfidfVectorizer(max_features=10_000, ngram_range=(1, 2))
X_train_vec = vectorizer.fit_transform(X_train)
X_eval_vec = vectorizer.transform(X_eval)

##### Hyperparameter Tuning with Optuna

In [None]:
def objective(trial):
    """Optuna objective: cross-validated one-vs-rest ROC AUC of an XGBoost classifier.

    Samples tree depth, learning rate, number of estimators and the row /
    column subsampling ratios, then scores the model with 5-fold stratified
    out-of-fold probabilities on the notebook-level ``X_train_vec`` /
    ``y_train``.

    Args:
        trial: The Optuna trial used to sample hyperparameters.

    Returns:
        The one-vs-rest ROC AUC of the out-of-fold predictions (to maximize).
    """
    params = {
        "max_depth": trial.suggest_int("max_depth", 3, 10),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3),
        "n_estimators": trial.suggest_int("n_estimators", 50, 200),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        # Fixed (not tuned), so it is absent from `study.best_params`.
        "eval_metric": "mlogloss",
    }

    model = XGBClassifier(**params)
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    preds = cross_val_predict(
        model, X_train_vec, y_train, cv=skf, method="predict_proba"
    )
    return roc_auc_score(y_train, preds, multi_class="ovr")


# Seed the sampler so the same sequence of trials is proposed on every run.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=20)

##### Training best model

In [None]:
# Refit with the best sampled parameters. "eval_metric" is added back because
# the objective hard-codes it instead of sampling it.
best_params = study.best_params
best_params["eval_metric"] = "mlogloss"

xgb = XGBClassifier(**best_params)
xgb.fit(X_train_vec, y_train)

##### Run predictions on the evaluation set

In [None]:
# Hard class predictions and per-class probabilities for the evaluation matrix.
y_pred = xgb.predict(X_eval_vec)
y_prob = xgb.predict_proba(X_eval_vec)

##### Evaluate results

In [None]:
# Per-class precision / recall / F1 report for the XGBoost model.
print(classification_report(y_eval, y_pred))

In [None]:
# Summary metrics for this model; `xgb_metrics` is reused in the final model
# comparison.
xgb_metrics = classification_metrics(y_eval, y_pred, y_prob)
df_xgb_metrics = pd.DataFrame.from_dict(xgb_metrics, orient="index", columns=["scores"])
print(df_xgb_metrics)

## 5.3. Model 3: Zero-shot with LLM (HuggingFace)

##### Prepare data

In [None]:
# The transformer models read the raw sentence, not the stopword-stripped
# "clean_text" column used by the TF-IDF models.
X_train = train_df["sentence"]
X_eval = eval_df["sentence"]
y_train = train_df["label"]
y_eval = eval_df["label"]

##### Load a pre-trained transformer for zero-shot classification

In [None]:
# Pre-trained sentiment checkpoint, used here as-is (no further training).
checkpoint = "cardiffnlp/twitter-roberta-base-sentiment"

tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(checkpoint)

# Pipeline device convention used here: GPU index 0 when CUDA is available, -1 for CPU.
device = 0 if torch.cuda.is_available() else -1
# NOTE(review): `return_all_scores` is reported as deprecated in favour of
# `top_k` in recent transformers releases — confirm before upgrading, and check
# whether the replacement changes the order of the returned scores.
# NOTE(review): the task string says "zero-shot-classification" but the object
# is a plain TextClassificationPipeline — confirm the string is intended.
zero_shot_classifier = TextClassificationPipeline(
    model=model,
    tokenizer=tokenizer,
    return_all_scores=True,
    device=device,
    task="zero-shot-classification",
)

##### Run zero-shot classification on the evaluation set

In [None]:
sentences = X_eval.tolist()
batch_size = 32

# Column index of each label name, taken from the model config. Scores are
# placed by label rather than by position so the probability matrix does not
# depend on the order in which the pipeline returns them.
label2id = zero_shot_classifier.model.config.label2id
num_labels = len(label2id)

y_pred = []
y_prob = []

for i in range(0, len(sentences), batch_size):
    batch = sentences[i : i + batch_size]
    # One list of {"label", "score"} dicts per input sentence.
    outputs = zero_shot_classifier(batch)

    for probs in outputs:
        scores = [0.0] * num_labels
        for entry in probs:
            scores[label2id[entry["label"]]] = entry["score"]
        y_prob.append(scores)
        y_pred.append(int(np.argmax(scores)))

y_pred = np.array(y_pred)
y_prob = np.array(y_prob)

##### Evaluate results

In [None]:
# Per-class precision / recall / F1 report for the un-tuned transformer.
print(classification_report(y_eval, y_pred))

In [None]:
# Summary metrics for this model; `zero_shot_metrics` is reused in the final
# model comparison.
zero_shot_metrics = classification_metrics(y_eval, y_pred, y_prob)
df_zero_shot_metrics = pd.DataFrame.from_dict(
    zero_shot_metrics, orient="index", columns=["scores"]
)
print(df_zero_shot_metrics)

## 5.4. Model 4: Fine-tuned LLM using PEFT + LoRA

##### Prepare data

In [None]:
# Fine-tuning uses the raw sentences and integer labels, as in the previous model.
X_train = train_df["sentence"]
X_eval = eval_df["sentence"]
y_train = train_df["label"]
y_eval = eval_df["label"]

##### Load tokenizer and base model for fine-tuning

In [None]:
# Same checkpoint as the un-tuned model, so both share one tokenizer vocabulary.
checkpoint = "cardiffnlp/twitter-roberta-base-sentiment"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

##### Tokenize and split train / validation

In [None]:
df_train = pd.DataFrame({"text": X_train, "label": y_train})
# preserve_index=False keeps the shuffled pandas index from becoming an extra
# dataset column.
dataset = Dataset.from_pandas(df_train, preserve_index=False)


def tokenize(example):
    """Tokenize the ``text`` field of a batch with truncation enabled."""
    return tokenizer(example["text"], truncation=True)


tokenized = dataset.map(tokenize, batched=True)
# Hold out 10% of the TRAINING data as a validation set for the Trainer (this is
# not the evaluation split used to score the models). The seed keeps the split
# identical across runs.
tokenized = tokenized.train_test_split(test_size=0.1, seed=42)
train_ds = tokenized["train"]
eval_ds = tokenized["test"]

##### Prepare model for parameter-efficient fine-tuning

In [None]:
# Load the checkpoint with a 3-label classification head as the base for LoRA.
base_model = AutoModelForSequenceClassification.from_pretrained(
    checkpoint, num_labels=3
)
# Turn on gradient checkpointing before the model is wrapped.
base_model.gradient_checkpointing_enable()
# NOTE(review): no quantization config is passed when loading the model above —
# confirm that `prepare_model_for_kbit_training` is needed for this setup.
base_model = prepare_model_for_kbit_training(base_model)

##### Define LoRA configuration

In [None]:
# LoRA adapter settings: rank 8, alpha 32, 10% adapter dropout, no bias terms
# trained, configured for the sequence-classification task type.
peft_config = LoraConfig(
    r=8,
    lora_alpha=32,
    lora_dropout=0.1,
    bias="none",
    task_type="SEQ_CLS",
)
# Wrap the base model with the LoRA configuration.
peft_model = get_peft_model(base_model, peft_config)

##### Define training arguments

In [None]:
# Training configuration: 3 epochs, checkpoint saved every epoch, logs every
# 50 steps.
# NOTE(review): `metric_for_best_model` is set but no evaluation strategy or
# `load_best_model_at_end` is configured here — confirm it has any effect.
# NOTE(review): the output directory is named "qlora_model" although no
# quantization is configured in this notebook.
training_args = TrainingArguments(
    output_dir="./qlora_model",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    learning_rate=2e-4,
    num_train_epochs=3,
    save_strategy="epoch",
    logging_dir="./logs",
    logging_steps=50,
    # fp16=True,
    metric_for_best_model="accuracy",
)

##### Define trainer

In [None]:
# Accuracy metric object, loaded once and shared by every evaluation call.
metric = evaluate.load("accuracy")


def compute_metrics(eval_pred):
    """Turn the Trainer's ``(logits, labels)`` pair into an accuracy dict.

    The predicted class of each row is the index of its largest logit.
    """
    scores, references = eval_pred
    predicted_ids = np.argmax(scores, axis=1)
    return metric.compute(predictions=predicted_ids, references=references)


# Trainer over the LoRA-wrapped model. `eval_ds` is the validation split carved
# out of the training data, not the held-out evaluation frame.
# NOTE(review): the `tokenizer=` argument is reported as deprecated in recent
# transformers releases — confirm against the installed version.
trainer = Trainer(
    model=peft_model,
    args=training_args,
    train_dataset=train_ds,
    eval_dataset=eval_ds,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

##### Train model

In [None]:
# Run fine-tuning with the trainer configured above.
trainer.train()

##### Make predictions

In [None]:
# Manually tokenize the evaluation set
eval_encodings = tokenizer(
    list(X_eval), truncation=True, padding=True, return_tensors="pt"
)

# Move tensors to the same device as the model
eval_encodings = {k: v.to(model.device) for k, v in eval_encodings.items()}

model = trainer.model
model.to("cpu")
model.eval()

# Disable gradient calculation
with torch.no_grad():
    outputs = model(**eval_encodings)
    logits = outputs.logits
    y_prob = torch.nn.functional.softmax(logits, dim=-1).cpu().numpy()
    y_pred = np.argmax(y_prob, axis=1)

##### Evaluate results

In [None]:
# Per-class precision / recall / F1 report for the fine-tuned model.
print(classification_report(y_eval, y_pred))

In [None]:
# Summary metrics for this model; `fine_tuning_metrics` is reused in the final
# model comparison.
fine_tuning_metrics = classification_metrics(y_eval, y_pred, y_prob)
df_fine_tuning_metrics = pd.DataFrame.from_dict(
    fine_tuning_metrics, orient="index", columns=["scores"]
)
print(df_fine_tuning_metrics.head(10))

##### Misclassified Examples

In [None]:
# Reset the index so labels, texts and the positional prediction array line up
# row by row.
y_true = y_eval.reset_index(drop=True)
y_pred_labels = y_pred

df_errors = pd.DataFrame(
    {
        "Text": X_eval.reset_index(drop=True),
        "True Label": y_true,
        "Predicted Label": y_pred_labels,
    }
)

df_misclassified = df_errors[df_errors["True Label"] != df_errors["Predicted Label"]]
# Cap the sample size: asking for more rows than exist raises a ValueError.
sample_errors = df_misclassified.sample(
    n=min(3, len(df_misclassified)), random_state=42
)

explainer = SequenceClassificationExplainer(model, tokenizer)

for i, (_, row) in enumerate(sample_errors.iterrows()):
    print("=" * 20, f"Example {i + 1}", "=" * 20)
    print(f"Text:\n{row['Text']}\n")
    print(f"True Label: {row['True Label']}")
    print(f"Predicted Label: {row['Predicted Label']}")
    # Compute attributions for this sentence, then render them.
    explainer(row["Text"])
    explainer.visualize()

# 6. Evaluation

In [None]:
def compare_models(metrics_dict):
    """Build a model-by-metric comparison table.

    Args:
        metrics_dict: Mapping of model name to a dict of metric name -> score.

    Returns:
        DataFrame with one row per model (index named ``Model``) and the
        columns accuracy, precision, recall, f1_score, roc_auc and
        avg_precision, in that order.
    """
    metric_order = [
        "accuracy",
        "precision",
        "recall",
        "f1_score",
        "roc_auc",
        "avg_precision",
    ]
    per_model = pd.DataFrame(metrics_dict).transpose()
    return per_model.loc[:, metric_order].rename_axis("Model")


# Collect the metric dicts produced in each model section, keyed by the
# display name used in the comparison table and the plot legend.
metrics_dict = {
    "TF-IDF + Logistic Regression": logreg_metrics,
    "TF-IDF + XGBoost": xgb_metrics,
    "Zero-shot LLM": zero_shot_metrics,
    "Fine-tuned LLM": fine_tuning_metrics,
}

df_compare = compare_models(metrics_dict)
print("\nModel Comparison:\n")
print(df_compare)

In [None]:
# Grouped bar chart: one group per metric, one bar per model.
ax = df_compare.T.plot.bar(figsize=(8, 5))
ax.set_title("Model Comparison")
ax.set_ylabel("Score")
ax.set_xlabel("Metric")
plt.setp(ax.get_xticklabels(), rotation=45)
ax.grid(True, axis="y", linestyle="--", alpha=0.7)
ax.legend(title="Model", loc="lower right")
ax.figure.tight_layout()
plt.show()