In [1]:
import os
import random
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import yaml

from sklearn.model_selection import train_test_split

import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn import CrossEntropyLoss

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

from transformers import AutoModelForSequenceClassification, AutoTokenizer

from nltk.tokenize import sent_tokenize
import nltk

nltk.download('punkt_tab')


[nltk_data] Downloading package punkt_tab to /home/carmine-
[nltk_data]     landolfi/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [2]:
df = pd.read_csv("../Datasets/cleaned_dataset.csv",encoding='utf-8')

p = df.groupby("primary_category")


In [3]:
@dataclass
class RobustnessConfig:
    """Configuration loaded from YAML for model and tokenizer."""
     
    data_path: str
    model_path: str
    tokenizer_name: str
    name_model: str
    k_per_class: int
    min_sent_len: int
    n_sentences_per_insert: int
    max_length: int
    results_csv: str
    batch_size: int
    data_target_path: str


In [4]:
class SentencePoolBuilder:
    """
    Build a per-class sentence pool from a labeled dataframe using TF-IDF ranking.

    The pool is used to contaminate other datasets by sampling representative sentences
    from different classes.

    Parameters
    ----------
    df_source : pd.DataFrame
        Source dataframe used to extract sentences (this is your in-memory `df`).
    text_col : str
        Column name containing texts (e.g., "abstract_clean").
    label_col : str
        Column name containing labels (e.g., "primary_category").
    k_per_class : int
        Number of top-ranked sentences (by TF-IDF) to keep per class.
    min_sent_len : int
        Minimum sentence length (in characters) to consider.
    """

    def __init__(
        self,
        df_source: pd.DataFrame,
        text_col: str,
        label_col: str,
        k_per_class: int = 200,
        min_sent_len: int = 20,
    ) -> None:
        self.df_source = df_source
        self.text_col = text_col
        self.label_col = label_col
        self.k_per_class = k_per_class
        self.min_sent_len = min_sent_len

    def _split_sentences(self, text: str) -> List[str]:
        """Split a text into sentences and filter very short ones."""
        if not isinstance(text, str):  # puoi rimuovere
            return []
        sents = [s.strip() for s in sent_tokenize(text) if isinstance(s, str)]
        return [s for s in sents if len(s) >= self.min_sent_len]

    def build_pool(self) -> Dict[str, List[str]]:
        """
        Build the sentence pool using TF-IDF ranking computed across all sentences.

        Returns
        -------
        Dict[str, List[str]]
            Mapping: class_label -> list of top-k sentences.
        """
        # Collect sentences with their class
        all_sentences: List[str] = []
        sent_labels: List[str] = []

        grouped = self.df_source.groupby(self.label_col)
        per_class_sentences: Dict[str, List[str]] = {}

        for label, group in grouped:
            sents_label: List[str] = []
            for text in group[self.text_col].tolist():
                sents = self._split_sentences(text)
                sents_label.extend(sents)
                
            # Deduplicate sentences within class
            sents_label = list(dict.fromkeys(sents_label))
            per_class_sentences[label] = sents_label
            all_sentences.extend(sents_label)
            sent_labels.extend([label] * len(sents_label))


        # if no sentences, return empty pools
        if not all_sentences:
            return {c: [] for c in per_class_sentences.keys()}

        # TF-IDF over all sentences
        tfidf = TfidfVectorizer(
            ngram_range=(1, 2),
            max_df=0.9,
            min_df=2,
            max_features=50000,
        )
        X = tfidf.fit_transform(all_sentences)

        # Sentence score: mean TF-IDF weight (simple, effective)
        scores = np.asarray(X.mean(axis=1)).ravel()

        # Rank sentences per class by their score
        pool: Dict[str, List[str]] = {}
        # Build index per class
        class_indices: Dict[str, List[int]] = {}
        for idx, lab in enumerate(sent_labels):
            class_indices.setdefault(lab, []).append(idx)

        

        for lab, idxs in class_indices.items():
            # Sort class sentences by score desc
            sorted_local = sorted(idxs, key=lambda i: scores[i], reverse=True)
            top_idxs = sorted_local[: self.k_per_class]
            top_sents = [all_sentences[i] for i in top_idxs]
            pool[lab] = top_sents

        return pool


In [5]:
class AbstractContaminator:
    """
    Contaminate a target dataframe by inserting sentences from other classes.

    The contamination is applied to a fraction alpha of rows (uniformly sampled).
    For each contaminated row, we choose one *different* class at random and insert
    `n_sentences` sampled sentences from that class at the end of a randomly chosen
    sentence within the abstract (not necessarily at the very end).

    Parameters
    ----------
    df_target : pd.DataFrame
        The dataframe to contaminate (this is the CSV at ../Datasets/dataset_remaining.csv).
    text_col : str
        Text column name in df_target (e.g., "abstract_clean").
    label_col : str
        Label column name in df_target (e.g., "primary_category").
    pool : Dict[str, List[str]]
        The sentence pool built from the source df.
    """

    def __init__(
        self,
        df_target: pd.DataFrame,
        text_col: str,
        label_col: str,
        pool: Dict[str, List[str]],
    ) -> None:
        self.df_target = df_target.copy()
        self.text_col = text_col
        self.label_col = label_col
        self.pool = pool
        self.rng = random.Random(42)

    def _insert_sentences_into_text(self, text: str, contam_sents: List[str]) -> str:
        """Insert contamination at the end of one random sentence within the text."""
        sents = [s.strip() for s in sent_tokenize(text)]
        if not sents:
            # If tokenization fails, just append at the end
            return (text or "") + " " + " ".join(contam_sents)

        insert_idx = self.rng.randrange(0, len(sents))
        sents[insert_idx] = (sents[insert_idx].rstrip() + " " + " ".join(contam_sents)).strip()
        return " ".join(sents)

    def contaminate(self, alpha: float, n_sentences: int = 2) -> pd.DataFrame:
        """
        Apply contamination to a fraction alpha of rows (0 < alpha <= 1).

        Parameters
        ----------
        alpha : float
            Fraction of rows to contaminate.
        n_sentences : int
            Number of sentences to insert from a different class.

        Returns
        -------
        pd.DataFrame
            A copy of the dataframe with contaminated texts in the same `text_col`.
            Adds a boolean column 'contaminated'.
        """
        df_out = self.df_target.copy()
        df_out["contaminated"] = False

        n = len(df_out)
        k = max(1, int(round(alpha * n)))
        candidate_indices = list(range(n))
        self.rng.shuffle(candidate_indices)
        to_contam = set(candidate_indices[:k])

        labels_available = [lab for lab, sents in self.pool.items() if len(sents) > 0]
        label_set = set(labels_available)

        for idx, row in df_out.iterrows():
            if idx not in to_contam:
                continue

            cur_label = row[self.label_col]
            # Choose a different label that has sentences
            choices = list(label_set - {cur_label})
            if not choices:
                # No valid contamination class available
                continue
            chosen_label = self.rng.choice(choices)
            source_sents = self.pool.get(chosen_label, [])

            if len(source_sents) == 0:
                continue

            k_sents = min(n_sentences, len(source_sents))
            contam_sents = self.rng.sample(source_sents, k_sents)

            text = row[self.text_col]
            if not isinstance(text, str) or not text.strip():
                # Skip if text is empty
                continue

            new_text = self._insert_sentences_into_text(text, contam_sents)
            df_out.at[idx, self.text_col] = new_text
            df_out.at[idx, "contaminated"] = True

        return df_out

In [6]:
class SimpleTextDataset(Dataset):
    """Torch dataset for batched encoding/prediction."""

    def __init__(
        self,
        texts: List[str],
        labels: Optional[List[int]] = None,
        tokenizer=None,
        max_length: int = 300,
    ) -> None:
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self) -> int:
        return len(self.texts)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        enc = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        item = {k: v.squeeze(0) for k, v in enc.items()}
        if self.labels is not None:
            item["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item


class ModelEvaluator:
    """
    Load a fine-tuned HF model + tokenizer, run predictions, and compute metrics.

    Parameters
    ----------
    model_path : str
        Path to the fine-tuned model (HF format, directory with config + weights).
    tokenizer_name : str
        Name or path of the tokenizer to use.
    device : Optional[str]
        'cuda' or 'cpu'. If None, auto-detect.
    """

    def __init__(self, model_path: str, tokenizer_name: str, device: Optional[str] = None) -> None:
        

        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        self.device = torch.device(device)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_path).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True)

        # Try to get label2id from model config (preferable to match training)
        self.label2id: Dict[str, int] = getattr(self.model.config, "label2id", {}) or {}
        self.id2label: Dict[int, str] = getattr(self.model.config, "id2label", {}) or {}
        # Normalize possible stringified integers
        self.label2id = {str(k): int(v) for k, v in self.label2id.items()} if self.label2id else {}

    def _encode_labels(
        self, labels_str: List[str], strict: bool = True
    ) -> Tuple[List[int], Dict[str, int]]:
        """
        Map string labels to ids, using model config if available; else build a mapping.

        If `strict=True` and some labels are missing in the model mapping, raise ValueError.
        """
        if self.label2id:
            missing = [l for l in set(labels_str) if str(l) not in self.label2id]
            if missing and strict:
                raise ValueError(
                    f"Found labels not present in model config label2id: {missing}. "
                    f"Please ensure your evaluation set uses the same label space as training."
                )
            # Map known labels
            label2id_eff = self.label2id.copy()
            # Fallback for any stray label
            next_id = max(label2id_eff.values()) + 1 if label2id_eff else 0
            for l in set(labels_str):
                if str(l) not in label2id_eff:
                    label2id_eff[str(l)] = next_id
                    next_id += 1
        else:
            # Build a mapping from the dataset (sorted for determinism)
            uniq = sorted(set(labels_str))
            label2id_eff = {str(l): i for i, l in enumerate(uniq)}

        y = [label2id_eff[str(l)] for l in labels_str]
        return y, label2id_eff

    def evaluate(
        self,
        df: pd.DataFrame,
        text_col: str,
        label_col: str,
        batch_size: int = 16,
        max_length: int = 512,
        strict_labels: bool = True,
    ) -> Dict[str, float]:
        """
        Tokenize, predict and compute loss/accuracy/f1/precision/recall.

        Returns
        -------
        Dict[str, float]
        """
        texts = df[text_col].astype(str).tolist()
        labels_str = df[label_col].astype(str).tolist()
        y_true, _ = self._encode_labels(labels_str, strict=strict_labels)

        dataset = SimpleTextDataset(texts, y_true, tokenizer=self.tokenizer, max_length=max_length)
        loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

        self.model.eval()
        loss_fn = CrossEntropyLoss()
        total_loss = 0.0
        n_examples = 0
        preds_all: List[int] = []
        trues_all: List[int] = []

        with torch.no_grad():
            for batch in loader:
                input_ids = batch["input_ids"].to(self.device)
                attention_mask = batch["attention_mask"].to(self.device)
                labels = batch["labels"].to(self.device)

                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                logits = outputs.logits
                loss = loss_fn(logits, labels)

                total_loss += loss.item() * labels.size(0)
                n_examples += labels.size(0)

                preds = torch.argmax(logits, dim=-1)
                preds_all.extend(preds.cpu().tolist())
                trues_all.extend(labels.cpu().tolist())

        avg_loss = total_loss / max(1, n_examples)
        acc = accuracy_score(trues_all, preds_all)
        f1 = f1_score(trues_all, preds_all, average="weighted")
        prec = precision_score(trues_all, preds_all, average="weighted", zero_division=0)
        rec = recall_score(trues_all, preds_all, average="weighted")

        return {
            "loss": float(avg_loss),
            "accuracy": float(acc),
            "f1": float(f1),
            "precision": float(prec),
            "recall": float(rec),
        }




In [7]:
### Initial parameters
Config_yaml_path = "../File_Yaml/Robustness.yaml"  
Alphas = [0.1, 0.2, 0.3]

# 1) Load config
with open(Config_yaml_path, "r") as f:
    cfg_raw = yaml.safe_load(f)
    
cfg = RobustnessConfig(
    data_path = cfg_raw["data_path"],
    model_path=cfg_raw["model_path"],
    tokenizer_name=cfg_raw["tokenizer_name"],
    name_model=cfg_raw["name_model"],
    k_per_class = cfg_raw["top_k"],
    min_sent_len = cfg_raw["min_sent_len"],
    n_sentences_per_insert = cfg_raw["n_sentences_at_time"],
    max_length = cfg_raw["max_length"],
    results_csv=cfg_raw["results_csv"],
    batch_size = cfg_raw["batch_size"],
    data_target_path=cfg_raw["data_target_path"],
)
# load dataet to create a pool of sentences
df = pd.read_csv(cfg.data_path, encoding='utf-8')
text_col = "abstract_clean"
label_col = "primary_category"

# 2) Build TF-IDF sentence pool from the *source* df (already in memory)
pool_builder = SentencePoolBuilder(
    df_source=df,
    text_col= text_col,
    label_col= label_col,
    k_per_class= cfg.k_per_class,
)
sentence_pool = pool_builder.build_pool()

# 3) Load the target dataset 
df_target = pd.read_csv(cfg.data_target_path)

# Split train/val
df_target_train, df_target_val  = train_test_split(
      df_target, test_size=0.2, random_state=42
)

# 4) Prepare contaminator and evaluator
contaminator = AbstractContaminator(
    df_target= df_target_val,
    text_col= text_col,
    label_col= label_col,
    pool= sentence_pool,
)

evaluator = ModelEvaluator(model_path=cfg.model_path, tokenizer_name=cfg.tokenizer_name)

# 5) Loop over alphas, contaminate, evaluate, collect results
results: List[Dict[str, float]] = []

# Evaluate also the clean baseline (alpha = 0.0)
metrics_clean = evaluator.evaluate(df_target_val, text_col= text_col, label_col= label_col,
                                   batch_size= cfg.batch_size, max_length= cfg.max_length)
metrics_clean["model"] = cfg.name_model
metrics_clean["alpha"] = 0.0
results.append(metrics_clean)

for alpha in Alphas:
    df_cont = contaminator.contaminate(alpha=alpha, n_sentences=cfg.n_sentences_per_insert)
    metrics = evaluator.evaluate(df_cont, text_col=text_col, label_col= label_col,
                                 batch_size= cfg.batch_size, max_length= cfg.max_length)
    metrics["model"] = cfg.name_model
    metrics["alpha"] = float(alpha)
    results.append(metrics)

# 6) Save to CSV (append if exists)
os.makedirs(os.path.dirname(cfg.results_csv), exist_ok=True)

results_df = pd.DataFrame(
    results,
    columns=["model", "alpha", "loss", "accuracy", "f1", "precision", "recall"]
)

# Check if file exists
if not os.path.isfile(cfg.results_csv):
    # Create new file with header
    results_df.to_csv(cfg.results_csv, index=False)
else:
    # Append without writing the header again
    results_df.to_csv(cfg.results_csv, mode='a', header=False, index=False)

print(f"Results saved to: {cfg.results_csv}")
print(results_df)

Results saved to: ../Datasets/robustness_contaminated_results.csv
       model  alpha      loss  accuracy        f1  precision    recall
0  Bert_base    0.0  1.188087  0.655016  0.647920   0.650607  0.655016
1  Bert_base    0.1  1.220143  0.648830  0.641815   0.644697  0.648830
2  Bert_base    0.2  1.251844  0.643159  0.636491   0.640023  0.643159
3  Bert_base    0.3  1.277091  0.637901  0.630635   0.633700  0.637901
