<a href="https://colab.research.google.com/github/Takshg/Sentiment-Analysis-Research-Project/blob/main/DeBERTa_Base_Test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# FABSA MVP Colab Test Notebook


## Configuration


In [None]:
# Setting up the device for GPU usage

from torch import cuda
# Pick the CUDA backend when torch reports it as available; otherwise use the CPU.
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

cuda


In [None]:
from pathlib import Path

# ---------------------------------------------------------------------------
# Filesystem layout: inputs live under data/, every artifact under outputs/.
# ---------------------------------------------------------------------------
DATA_DIR = Path("data")
OUT_DIR = Path("outputs")

FABSA_MASTER = DATA_DIR / "fabsa_dataset.csv"
REVIEWS_DIR = DATA_DIR / "Raw_Reviews"

MODEL_DIR = OUT_DIR / "Models" / "deberta_pair"
PREDS_DIR = OUT_DIR / "Preds"
REPORTS_DIR = OUT_DIR / "Reports"

# Create the working directories up front (OUT_DIR appears implicitly through
# parents=True) so later cells can write without checking.
for x in (DATA_DIR, REVIEWS_DIR, MODEL_DIR, PREDS_DIR, REPORTS_DIR):
    x.mkdir(parents=True, exist_ok=True)

# Review-level split files and their expanded (review, aspect) pair files.
FABSA_TRAIN = DATA_DIR / "train.csv"
FABSA_DEV = DATA_DIR / "dev.csv"
FABSA_TEST = DATA_DIR / "test.csv"

FABSA_TRAIN_PAIRS = DATA_DIR / "train_pairs.csv"
FABSA_DEV_PAIRS = DATA_DIR / "dev_pairs.csv"
FABSA_TEST_PAIRS = DATA_DIR / "test_pairs.csv"

# ---------------------------------------------------------------------------
# FABSA dataset column names.
# ---------------------------------------------------------------------------
ID_COL = "id"
TEXT_COL = "text"
LABELS_COL = "labels"
EXTRA_COLS = ["org_index", "industry"]  # copied through to pair rows when present

# ---------------------------------------------------------------------------
# Task space: one classification example is built per (review, aspect).
# ---------------------------------------------------------------------------
ASPECTS = [
    "app-website",
    "general-satisfaction",
    "ease-of-use",
    "attitude-of-staff",
    "price-value-for-money",
    "speed",
    "competitor",
    "account-access",
    "discounts-promotions",
    "phone",
    "reviews",
    "email",
]

# Class ids follow the position of each name in LABELS.
LABELS = ["negative", "neutral", "positive", "absent"]
LABEL2ID = dict(zip(LABELS, range(len(LABELS))))
ID2LABEL = dict(enumerate(LABELS))

# ---------------------------------------------------------------------------
# Model & training defaults.
# NOTE(review): MODEL_NAME is a RoBERTa-family checkpoint while MODEL_DIR is
# named "deberta_pair" -- confirm which backbone is intended.
# ---------------------------------------------------------------------------
MODEL_NAME = "distilroberta-base"
MAX_LEN = 64
LR = 3e-5
EPOCHS = 2
TRAIN_BS = 1
EVAL_BS = 2
SEED = 42


## Utilities


In [None]:
import pandas as pd
from datasets import Dataset


def df_to_hf_dataset(df: pd.DataFrame) -> Dataset:
    """Build a HuggingFace ``Dataset`` from *df* without carrying over its index."""
    hf_dataset = Dataset.from_pandas(df, preserve_index=False)
    return hf_dataset


def save_csv(df: pd.DataFrame, path: "Path | str") -> None:
    """Write *df* to a CSV file, creating missing parent directories first.

    Args:
        df: Frame to serialise; its index is not written.
        path: Destination file. A ``Path`` or a plain string / path-like is
            accepted; it is normalised with ``Path(path)`` so that string
            callers do not fail on the ``.parent`` lookup.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(target, index=False)

## Metrics


In [None]:
import numpy as np
import evaluate as hf_eval

# Load the four metric objects once, in a fixed order, so every call to the
# metrics function below reuses them.
_f1, _acc, _prec, _rec = (
    hf_eval.load(metric_id)
    for metric_id in ("f1", "accuracy", "precision", "recall")
)


def hf_classification_metrics(eval_pred):
    """Compute accuracy, micro/macro F1, and macro precision/recall.

    Args:
        eval_pred: A pair that unpacks into ``(logits, labels)``. The predicted
            class is the argmax of the logits over the last axis.

    Returns:
        Dict with keys ``accuracy``, ``f1_micro``, ``f1_macro``,
        ``precision_macro`` and ``recall_macro``.
    """
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)
    scored = {"predictions": preds, "references": labels}

    accuracy = _acc.compute(**scored)["accuracy"]
    f1_micro = _f1.compute(average="micro", **scored)["f1"]
    f1_macro = _f1.compute(average="macro", **scored)["f1"]
    precision_macro = _prec.compute(average="macro", **scored)["precision"]
    recall_macro = _rec.compute(average="macro", **scored)["recall"]

    return {
        "accuracy": accuracy,
        "f1_micro": f1_micro,
        "f1_macro": f1_macro,
        "precision_macro": precision_macro,
        "recall_macro": recall_macro,
    }


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading builder script: 0.00B [00:00, ?B/s]

Downloading builder script: 0.00B [00:00, ?B/s]

Downloading builder script: 0.00B [00:00, ?B/s]

Downloading builder script: 0.00B [00:00, ?B/s]

## Data Preparation


In [None]:
import ast
import json
import os
from typing import List, Tuple

import pandas as pd
from sklearn.model_selection import train_test_split

# Maps the trailing sentiment code of a raw label to its class name.
SENT_MAP = {"-1": "negative", "0": "neutral", "1": "positive"}


def _parse_labels(cell) -> List[Tuple[str, str]]:
    """Convert a raw labels cell into a list of ``(aspect, sentiment)`` tuples.

    A cell is either an already-materialised list/tuple of label strings or
    the string form of a Python literal such as ``"['speed.1', 'email.-1']"``.
    Each label is split on ``.``; the last segment is the sentiment code
    (looked up in ``SENT_MAP``) and the segment before it is the aspect.

    Parsing is best-effort and never raises: missing values, text that is not
    a valid literal, literals that are not a collection of strings, non-string
    items, labels without a ``.`` and unknown sentiment codes are all skipped.
    A literal that evaluates to a single string is treated as one label rather
    than being iterated character by character.

    Args:
        cell: Raw value from the labels column.

    Returns:
        Parsed ``(aspect, sentiment)`` tuples in their original order; an
        empty list when nothing usable is found.
    """
    if cell is None or (isinstance(cell, float) and pd.isna(cell)):
        return []

    if isinstance(cell, (list, tuple)):
        items = cell
    else:
        text = str(cell).strip()
        if not text:
            return []
        try:
            items = ast.literal_eval(text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # Exactly the errors ast.literal_eval is documented to raise.
            return []

    if isinstance(items, str):
        items = [items]
    elif not isinstance(items, (list, tuple, set, frozenset)):
        # e.g. the cell "5" evaluates to an int, which cannot be iterated.
        return []

    parsed: List[Tuple[str, str]] = []
    for item in items:
        if not isinstance(item, str):
            continue
        parts = item.split('.')
        if len(parts) < 2:
            continue
        sentiment = SENT_MAP.get(parts[-1].strip())
        if sentiment is None:
            continue
        parsed.append((parts[-2].strip(), sentiment))
    return parsed


def _primary_label(row) -> str:
    """Return ``"aspect|sentiment"`` for the row's first parsed label, or ``"NONE"``."""
    parsed = _parse_labels(row.get(LABELS_COL, ""))
    if parsed:
        first_aspect, first_sentiment = parsed[0]
        return f"{first_aspect}|{first_sentiment}"
    return "NONE"


def _validate_master_schema(df: pd.DataFrame) -> None:
    """Check that the master frame has the id, text and labels columns.

    Raises:
        ValueError: If any of the three required columns is absent; the
            message lists the missing, expected and actual columns.
    """
    required = {ID_COL, TEXT_COL, LABELS_COL}
    missing = required - set(df.columns)
    if not missing:
        return

    expected = sorted(required)
    present = list(df.columns)
    raise ValueError(
        f"Master FABSA file missing columns: {missing}. "
        f"Expected at least: {expected}. Got: {present}"
    )


def _ensure_splits_exist(
    train_path: os.PathLike,
    dev_path: os.PathLike,
    test_path: os.PathLike,
    master_path: os.PathLike,
    seed: int = 42,
):
    """Return ``(train, dev, test)`` DataFrames, creating the split files if needed.

    If all three split files already exist they are read back and returned
    unchanged. Otherwise the master CSV is loaded, validated and divided
    70 / 10 / 20 (30% is held out, then two thirds of the hold-out becomes
    test). The split is stratified on each row's primary label, combined with
    the ``industry`` column when that column exists. When scikit-learn rejects
    the stratified split with ``ValueError``, a ``RuntimeWarning`` is issued
    and a plain shuffled split with the same seed is used instead.

    Args:
        train_path: Location of the train split CSV.
        dev_path: Location of the dev split CSV.
        test_path: Location of the test split CSV.
        master_path: Master CSV used when any split file is missing.
        seed: Random state passed to both splitting steps.

    Returns:
        Tuple ``(train_df, dev_df, test_df)``.

    Raises:
        FileNotFoundError: A split is missing and the master file is absent.
        ValueError: The master file lacks a required column.
    """
    import warnings

    split_paths = (train_path, dev_path, test_path)
    if all(os.path.exists(p) for p in split_paths):
        return tuple(pd.read_csv(p) for p in split_paths)

    if not os.path.exists(master_path):
        raise FileNotFoundError(
            f"FABSA master file not found at {master_path}. "
            f"Expected a CSV with at least: {ID_COL},{TEXT_COL},{LABELS_COL}"
        )

    master_df = pd.read_csv(master_path)
    _validate_master_schema(master_df)

    # The stratification key is kept in its own Series (aligned on the index)
    # so no helper column has to be added to, and later dropped from, the data.
    strat_key = master_df.apply(_primary_label, axis=1)
    if "industry" in master_df.columns:
        strat_key = strat_key.astype(str) + "##" + master_df["industry"].astype(str)

    def _split(stratified: bool):
        """Run the two-stage split, optionally stratified on ``strat_key``."""
        train_part, holdout = train_test_split(
            master_df,
            test_size=0.30,
            random_state=seed,
            shuffle=True,
            stratify=strat_key if stratified else None,
        )
        dev_part, test_part = train_test_split(
            holdout,
            test_size=(2 / 3),
            random_state=seed,
            shuffle=True,
            stratify=strat_key.loc[holdout.index] if stratified else None,
        )
        return train_part, dev_part, test_part

    try:
        train_df, dev_df, test_df = _split(stratified=True)
    except ValueError as err:
        # Make the downgrade visible instead of silently changing the split.
        warnings.warn(
            f"Stratified FABSA split failed ({err}); "
            "falling back to an unstratified shuffled split.",
            RuntimeWarning,
            stacklevel=2,
        )
        train_df, dev_df, test_df = _split(stratified=False)

    for frame, path in zip((train_df, dev_df, test_df), split_paths):
        parent = os.path.dirname(os.fspath(path))
        if parent:
            os.makedirs(parent, exist_ok=True)
        frame.to_csv(path, index=False)

    return train_df, dev_df, test_df


def load_fabsa_split(
    train_path: os.PathLike = FABSA_TRAIN,
    dev_path: os.PathLike = FABSA_DEV,
    test_path: os.PathLike = FABSA_TEST,
):
    """Return the train/dev/test frames, building them from the master file when absent.

    Delegates to ``_ensure_splits_exist`` with the configured master CSV and seed.
    """
    return _ensure_splits_exist(
        train_path=train_path,
        dev_path=dev_path,
        test_path=test_path,
        master_path=FABSA_MASTER,
        seed=SEED,
    )


def make_sentence_pairs(df_reviews: pd.DataFrame) -> pd.DataFrame:
    """Expand each review into one labelled row per entry in ``ASPECTS``.

    For every review and aspect the target is the gold sentiment parsed from
    the labels column, or ``"absent"`` when the review has no label for that
    aspect. If a review lists the same aspect more than once, the first
    occurrence in the label list wins, so the result is reproducible across
    runs (iterating a set of string tuples is not).

    Args:
        df_reviews: Review-level frame with the id, text and labels columns;
            any column named in ``EXTRA_COLS`` is copied through when present.

    Returns:
        Frame with the id column, ``text``, ``aspect``, ``target_label_str``,
        ``target_label_id`` and the available extra columns.
    """
    rows = []
    for _, review in df_reviews.iterrows():
        gold_by_aspect = {}
        for gold_aspect, gold_sentiment in _parse_labels(review.get(LABELS_COL, "")):
            # setdefault keeps the first sentiment seen for a repeated aspect.
            gold_by_aspect.setdefault(gold_aspect, gold_sentiment)

        extras = {col: review[col] for col in EXTRA_COLS if col in review}

        for aspect in ASPECTS:
            sentiment = gold_by_aspect.get(aspect, "absent")
            rows.append({
                ID_COL: review[ID_COL],
                "text": review[TEXT_COL],
                "aspect": aspect,
                "target_label_str": sentiment,
                "target_label_id": LABEL2ID[sentiment],
                **extras,
            })
    return pd.DataFrame(rows)


## Model Loader

In [21]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer

from transformers.utils import logging
logging.set_verbosity_error()


def load_model_and_tokenizer(num_labels=len(LABELS)):
    """Instantiate the ``MODEL_NAME`` tokenizer and sequence-classification model.

    The model is given ``num_labels`` outputs and id/label mappings derived
    from the order of ``LABELS``. Gradient checkpointing is switched on and
    the config's ``use_cache`` flag is switched off before returning.

    Returns:
        Tuple ``(model, tokenizer)``.
    """
    id_to_label = dict(enumerate(LABELS))
    label_to_id = {name: index for index, name in id_to_label.items()}

    # The slow (non-"fast") tokenizer implementation is requested explicitly.
    tok = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=False)

    classifier = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=num_labels,
        id2label=id_to_label,
        label2id=label_to_id,
    )
    classifier.gradient_checkpointing_enable()
    classifier.config.use_cache = False

    return classifier, tok


## Training Pipeline

In [24]:
import pandas as pd
from datasets import Dataset
from transformers import DataCollatorWithPadding, Trainer, TrainingArguments

def build_hf_datasets(train_pairs: pd.DataFrame, dev_pairs: pd.DataFrame, tokenizer, max_len: int):
    """Tokenize the train and dev pair frames and return them as HF datasets.

    Each frame is converted to a ``Dataset``, its ``target_label_id`` column
    is renamed to ``labels``, and every row is encoded as a
    ``(text, aspect)`` sequence pair truncated to ``max_len`` tokens. All
    original columns other than ``labels`` are dropped after tokenization.
    The torch-formatted view exposes ``input_ids``, ``attention_mask`` and
    ``labels``, plus ``token_type_ids`` when the tokenizer produces them, so
    segment ids are not hidden from models that use them.

    Args:
        train_pairs: Pair-level training frame (``text``, ``aspect``,
            ``target_label_id`` columns are read).
        dev_pairs: Pair-level validation frame with the same columns.
        tokenizer: Tokenizer called on batches of texts and aspects.
        max_len: Maximum encoded length passed as ``max_length``.

    Returns:
        Tuple ``(hf_train, hf_dev)``.
    """
    def tokenize_batch(batch):
        return tokenizer(
            batch['text'],
            batch['aspect'],
            truncation=True,
            max_length=max_len,
            return_overflowing_tokens=False,
        )

    def prepare(pairs: pd.DataFrame) -> Dataset:
        """Apply the shared rename / tokenize / format steps to one frame."""
        dataset = Dataset.from_pandas(pairs).rename_column('target_label_id', 'labels')
        drop_cols = [c for c in dataset.column_names if c != 'labels']
        dataset = dataset.map(tokenize_batch, batched=True, remove_columns=drop_cols)

        candidate_cols = ('input_ids', 'token_type_ids', 'attention_mask', 'labels')
        model_cols = [c for c in candidate_cols if c in dataset.column_names]
        dataset.set_format(type='torch', columns=model_cols)
        return dataset

    hf_train = prepare(train_pairs)
    hf_dev = prepare(dev_pairs)
    return hf_train, hf_dev


def run_training():
    """Run the end-to-end fine-tuning pipeline and persist the resulting model.

    Steps: load (or create) the FABSA splits, expand train/dev reviews into
    review-aspect pairs, save the pair CSVs, load the model and tokenizer,
    tokenize, train with ``Trainer`` (evaluating once per epoch), then save
    the model and tokenizer into ``MODEL_DIR``.

    Returns:
        Dict with keys ``trainer``, ``train_pairs``, ``dev_pairs``,
        ``hf_train`` and ``hf_dev``.

    Raises:
        RuntimeError: If expanding the training split yields no pairs.
    """
    import inspect

    print('[START] Notebook training pipeline')
    print('[STEP] Loading/creating FABSA splits...')
    train_df, dev_df, test_df = load_fabsa_split(FABSA_TRAIN, FABSA_DEV, FABSA_TEST)
    print(f"[INFO] Split sizes -> train: {len(train_df)}, dev: {len(dev_df)}, test: {len(test_df)}")

    print('[STEP] Expanding reviews into sentence–aspect pairs...')
    train_pairs = make_sentence_pairs(train_df)
    dev_pairs = make_sentence_pairs(dev_df)
    print(f"[INFO] Pair sizes -> train: {len(train_pairs)}, dev: {len(dev_pairs)}")

    if train_pairs.empty:
        raise RuntimeError('train_pairs is empty. Check source label formatting.')

    print('[STEP] Saving pair CSVs...')
    save_csv(train_pairs, FABSA_TRAIN_PAIRS)
    save_csv(dev_pairs, FABSA_DEV_PAIRS)

    print('[STEP] Loading tokenizer & model...')
    model, tokenizer = load_model_and_tokenizer()

    print('[STEP] Building tokenized datasets...')
    hf_train, hf_dev = build_hf_datasets(train_pairs, dev_pairs, tokenizer, MAX_LEN)

    print('[STEP] Configuring Trainer...')
    # `use_mps_device` is no longer passed: it is a deprecated flag whose
    # default is already False, so omitting it keeps the same configuration.
    training_args = TrainingArguments(
        output_dir=str(MODEL_DIR),
        learning_rate=LR,
        num_train_epochs=EPOCHS,
        per_device_train_batch_size=TRAIN_BS,
        per_device_eval_batch_size=EVAL_BS,
        gradient_accumulation_steps=8,
        eval_strategy='epoch',
        save_strategy='no',
        load_best_model_at_end=False,
        metric_for_best_model='f1_macro',
        seed=SEED,
        dataloader_num_workers=0,
        dataloader_pin_memory=False,
        logging_steps=10,
        disable_tqdm=False,
        report_to='none',
    )

    # Newer transformers releases take the tokenizer as `processing_class` and
    # warn on the old `tokenizer` keyword; use whichever name this install has.
    trainer_params = inspect.signature(Trainer.__init__).parameters
    tokenizer_kwarg = 'processing_class' if 'processing_class' in trainer_params else 'tokenizer'

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=hf_train,
        eval_dataset=hf_dev,
        data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
        compute_metrics=hf_classification_metrics,
        **{tokenizer_kwarg: tokenizer},
    )

    print('[STEP] Starting training...')
    trainer.train()
    print('[STEP] Training complete.')

    print('[STEP] Saving model & tokenizer...')
    trainer.save_model(str(MODEL_DIR))
    tokenizer.save_pretrained(str(MODEL_DIR))
    print('[DONE] Artifacts stored in', MODEL_DIR)

    return {
        'trainer': trainer,
        'train_pairs': train_pairs,
        'dev_pairs': dev_pairs,
        'hf_train': hf_train,
        'hf_dev': hf_dev,
    }

In [None]:

# Run the whole training pipeline. The returned dict is not bound to a name here.
run_training()

[START] Notebook training pipeline
[STEP] Loading/creating FABSA splits...
[INFO] Split sizes -> train: 7401, dev: 1057, test: 2116
[STEP] Expanding reviews into sentence–aspect pairs...
[INFO] Pair sizes -> train: 88812, dev: 12684
[STEP] Saving pair CSVs...
[STEP] Loading tokenizer & model...
[STEP] Building tokenized datasets...


Map:   0%|          | 0/88812 [00:00<?, ? examples/s]

Map:   0%|          | 0/12684 [00:00<?, ? examples/s]

[STEP] Configuring Trainer...


  trainer = Trainer(


[STEP] Starting training...


Epoch,Training Loss,Validation Loss


## Prediction

In [None]:
import torch
import numpy as np
import pandas as pd
from datasets import Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer


def build_pairs(df: pd.DataFrame) -> pd.DataFrame:
    """Cross every review in *df* with every aspect in ``ASPECTS`` for scoring.

    Returns a frame with the id column, ``text`` and ``aspect``; rows are
    ordered review by review, with aspects in ``ASPECTS`` order.
    """
    records = [
        {
            ID_COL: review[ID_COL],
            'text': review[TEXT_COL],
            'aspect': aspect_name,
        }
        for _, review in df.iterrows()
        for aspect_name in ASPECTS
    ]
    return pd.DataFrame(records)


def run_prediction(
    input_csv: str,
    output_csv: str = str(PREDS_DIR / 'review_preds.csv'),
    batch_size: int = 32,
) -> pd.DataFrame:
    """Score every (review, aspect) pair from a raw-review CSV and save the result.

    The fine-tuned model and tokenizer are loaded from ``MODEL_DIR``. Pairs
    are encoded and scored in padded mini-batches on the GPU when one is
    available, rather than in a single unpadded forward pass over the whole
    dataset, which cannot be stacked into one tensor and does not fit in
    memory for realistic inputs.

    Args:
        input_csv: CSV of reviews containing the id and text columns.
        output_csv: Destination CSV for the predictions.
        batch_size: Number of pairs scored per forward pass.

    Returns:
        The pair frame extended with ``pred_label_id``, ``pred_label_str``
        and ``prob`` (softmax probability of the predicted class).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f'batch_size must be >= 1, got {batch_size}')

    df = pd.read_csv(input_csv)
    pairs = build_pairs(df)

    if pairs.empty:
        # No reviews: still write a CSV with the expected header and skip the
        # model load entirely.
        output_df = pd.DataFrame(
            columns=[ID_COL, 'text', 'aspect', 'pred_label_id', 'pred_label_str', 'prob']
        )
        save_csv(output_df, Path(output_csv))
        print(f'Saved predictions to: {output_csv}')
        return output_df

    tokenizer = AutoTokenizer.from_pretrained(str(MODEL_DIR))
    model = AutoModelForSequenceClassification.from_pretrained(str(MODEL_DIR))

    run_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(run_device)
    model.eval()

    # The tokenizer needs strings; missing review text is scored as "".
    texts = pairs['text'].fillna('').astype(str).tolist()
    aspects = pairs['aspect'].astype(str).tolist()

    id_chunks = []
    prob_chunks = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            stop = start + batch_size
            encoded = tokenizer(
                texts[start:stop],
                aspects[start:stop],
                truncation=True,
                max_length=MAX_LEN,
                padding=True,  # pad to the longest pair in this batch
                return_tensors='pt',
            )
            encoded = {name: tensor.to(run_device) for name, tensor in encoded.items()}
            logits = model(**encoded).logits
            # The most probable class is also the argmax of the raw logits.
            top_probs, top_ids = torch.softmax(logits.float(), dim=-1).max(dim=-1)
            id_chunks.append(top_ids.cpu().numpy())
            prob_chunks.append(top_probs.cpu().numpy())

    pred_ids = np.concatenate(id_chunks)
    pred_probs = np.concatenate(prob_chunks)

    output_df = pairs.copy()
    output_df['pred_label_id'] = pred_ids
    id2label = model.config.id2label
    output_df['pred_label_str'] = [id2label[int(idx)] for idx in pred_ids]
    output_df['prob'] = pred_probs

    save_csv(output_df, Path(output_csv))
    print(f'Saved predictions to: {output_csv}')
    return output_df