# Init

In [1]:
pip install datasets transformers scikit-learn pandas matplotlib spacy torch sentencepiece accelerate

Note: you may need to restart the kernel to use updated packages.


In [2]:
!python3 -m pip install spacy --break-system-packages

Defaulting to user installation because normal site-packages is not writeable


In [3]:
!{sys.executable} -m spacy download en_core_web_sm

/bin/bash: line 1: {sys.executable}: command not found


# Tests

In [4]:
import torch
torch.cuda.is_available()

True

In [5]:
import pandas as pd
from datasets import load_dataset

full_data_path = "/content/drive/MyDrive/datasets/final dataset/final_dataset.jsonl"
test_data_path = "/content/drive/MyDrive/datasets/results/test/test_dataset.jsonl"

del dataset
dataset = load_dataset("csv", data_files="/content/sample_data/california_housing_test.csv")
dataset

NameError: name 'dataset' is not defined

In [None]:
class MyClass:
    def __init__(self, fixed_param, **kwargs):
        # `fixed_param` is a required parameter for the class
        self.fixed_param = fixed_param

        # Call the function from the library, passing kwargs
        print("Calling some_function with the following kwargs:")
        some_function(**kwargs)  # Pass additional kwargs to the function

# Define the external function (for illustration)
def some_function(arg1, arg2, **kwargs):
    print(arg1)
    print(arg2)

# Now, create an instance of the class and pass arguments
obj = MyClass(fixed_param="Important", arg1=42, arg2="hello", arg3=True)


In [None]:
import inspect
from some_library import some_function

class MyClass:
    def __init__(self, fixed_param, **kwargs):
        self.fixed_param = fixed_param

        # Get the signature of some_function
        func_signature = inspect.signature(some_function)

        # Extract the parameter names from the function's signature
        valid_params = func_signature.parameters.keys()

        # Filter the kwargs to only include valid ones
        filtered_kwargs = {key: kwargs[key] for key in kwargs if key in valid_params}

        # Call the function from the library, passing only filtered kwargs
        print(f"Calling some_function with filtered kwargs: {filtered_kwargs}")
        some_function(**filtered_kwargs)

# Define the external function (for illustration purposes)
def some_function(arg1=None, arg2=None):
    print(f"arg1 = {arg1}")
    print(f"arg2 = {arg2}")

# Example usage
obj = MyClass(fixed_param="Important", arg1=42, arg2="hello", arg3="extra")


In [None]:
def save_dataset(dataset, save_path, save_format="csv"):
    """
    Save a Hugging Face dataset to the specified file format.

    Args:
    - dataset (datasets.Dataset): The dataset to save.
    - save_path (str): The path (including filename) where the dataset will be saved.
    - save_format (str): The format to save the dataset. Options are "csv", "json", "parquet", "hf" (default: "csv").
                         "hf" saves the dataset in the Hugging Face native format using `save_to_disk()`.

    Returns:
    - None
    """
    if save_format == "csv":
        dataset.to_csv(f"{save_path}.csv")
        print(f"Dataset saved as CSV at {save_path}.csv")
    elif save_format == "json":
        dataset.to_json(f"{save_path}.json")
        print(f"Dataset saved as JSON at {save_path}.json")
    elif save_format == "parquet":
        dataset.to_parquet(f"{save_path}.parquet")
        print(f"Dataset saved as Parquet at {save_path}.parquet")
    elif save_format == "hf":
        dataset.save_to_disk(save_path)
        print(f"Dataset saved in Hugging Face format at {save_path}")
    else:
        raise ValueError(f"Unsupported save format: {save_format}. Choose from 'csv', 'json', 'parquet', or 'hf'.")


In [None]:
from datasets import Dataset

# Example dataset
data = {
    'text': ["I love this!", "This is bad.", "Not sure about this one."],
    'label': [1, 0, 1]
}
dataset = Dataset.from_dict(data)

# Save dataset as CSV
save_dataset(dataset, save_path="output/my_dataset", save_format="csv")

# Save dataset as JSON
save_dataset(dataset, save_path="output/my_dataset", save_format="json")

# Save dataset as Parquet
save_dataset(dataset, save_path="output/my_dataset", save_format="parquet")

# Save dataset in Hugging Face format
save_dataset(dataset, save_path="output/hf_dataset", save_format="hf")


# Notes
TODO: Tout ce qui est noté avec "NOTE: " doit être enlevé à la release

# Utils

In [None]:
import subprocess
import sys

def check_and_install(package: str) -> None:
    # Check if the package is installed
    package_spec = importlib.util.find_spec(package)
    if package_spec is None:
        # Package not found, install it
        print(f"{package} not found. Installing...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
    else:
        print(f"{package} is already installed.")

# Datasets

In [None]:
from datasets import Dataset

class myDataset():
    def __init__(self, dataset_name=None, custom_data=False, dataset_type=None, local_path=None, **kwargs) -> None:

        if custom_data:
            self.dataset = load_dataset(dataset_type, data_files=test_data_path)
        elif dataset_name:
            self.dataset = load_dataset(dataset_name)
        else:
            raise ValueError("Provide either a dataset name or custom data")

    def save_dataset(self, path="") -> None:
        self.dataset.save_to_disk(path)

    def get_dataset(self, formatted = False) -> Dataset:
        return self.dataset

In [None]:
class bambooDataset(myDataset):
    def __init__(self, local_path=None, **kwargs) -> None:
        super().__init__(dataset_name = "Bamboo", custom_data = False, local_path = local_path)

    #NOTE: Ne pas utiliser, à priori on va uploader tout déjà prêt sur huggingface.
    def format_dataset(self) -> None :
        dataset = getattr(self, 'dataset')
        dataset["input"] = "Title: " + dataset["title"] + ". Content: " + dataset["content"]
        dataset["output"] = dataset["hypothesis"]
        dataset["reference"] = None
        dataset["label"] = dataset["answer"]
        dataset['dataset'] = "bamboo"

        self.formatted_dataset = dataset [["input", "output", "reference", "label", 'dataset']]


# Metrics

## ScoreManager

In [None]:
from sklearn.metrics import (
    f1_score, precision_score, recall_score, accuracy_score,
    balanced_accuracy_score, matthews_corrcoef, cohen_kappa_score,
    log_loss, roc_curve, auc, precision_recall_curve, confusion_matrix
)
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from datasets import DatasetDict, Dataset

class ScoreManager:
    def __init__(self, dataset: Dataset, metric_list: list[str], on_split: str ="test"):
        """
        Initialize the ScoreManager with dataset and metrics to compute.

        Args:
            dataset (dict): The dataset containing 'label' and 'predictions' keys.
            metric_list (list): List of metric names to compute. Supported metrics:
                                ['f1', 'precision', 'recall', 'accuracy',
                                 'balanced_accuracy', 'mcc', 'kappa',
                                 'log_loss', 'roc_values', 'auc',
                                 'confusion_matrix', 'precision_recall_values'].
        """

        # Check if the dataset has splits (is a DatasetDict)
        if isinstance(dataset, DatasetDict):
            if on_split is None:
                raise ValueError("The dataset has splits, but no split was provided. Please specify a split (e.g., 'train', 'test').")
            # Select the provided split
            if on_split in dataset:
                dataset = dataset[on_split]
            else:
                raise ValueError(f"Split '{on_split}' does not exist in the dataset. Available splits: {list(dataset.keys())}")
        # If no splits, use the entire dataset dont need to change anything
        display(dataset)

        self.labels = dataset["label"]
        self.class_scores = dataset["predictions"]
        self.predicted_labels = [self.get_predicted_label(example) for example in dataset]
        self.metric_list = metric_list

        # Initialize metrics attributes
        self.results = {}
        self.calculate_metrics()

        # Calculate and store ROC and precision-recall results
        #self.results['roc_auc'], self.results['roc_values'] = self.calculate_roc()
        #self.results['pr_values'] = self.calculate_precision_recall()

        # Calculate confusion matrix
        #self.results['confusion_matrix'] = self.calculate_confusion_matrix()

    def get_predicted_label(self, example):
        """Get the label with the highest score from predictions."""
        prediction_dict = example["predictions"]
        return max(prediction_dict, key=prediction_dict.get)

    def calculate_metrics(self):
        """Calculate and store the specified metrics."""
        for metric in self.metric_list:
            method_name = f"calculate_{metric}"
            if hasattr(self, method_name):
                self.results[metric] = getattr(self, method_name)()
            else:
                raise ValueError(f"Unsupported metric: {metric}")

    def calculate_f1(self):
        """Calculate F1 score."""
        return f1_score(self.labels, self.predicted_labels, average='weighted')

    def calculate_precision(self):
        """Calculate precision score."""
        return precision_score(self.labels, self.predicted_labels, average='weighted')

    def calculate_recall(self):
        """Calculate recall score."""
        return recall_score(self.labels, self.predicted_labels, average='weighted')

    def calculate_accuracy(self):
        """Calculate accuracy score."""
        return accuracy_score(self.labels, self.predicted_labels)

    def calculate_balanced_accuracy(self):
        """Calculate balanced accuracy score."""
        return balanced_accuracy_score(self.labels, self.predicted_labels)

    def calculate_mcc(self):
        """Calculate Matthews Correlation Coefficient."""
        return matthews_corrcoef(self.labels, self.predicted_labels)

    def calculate_kappa(self):
        """Calculate Cohen's Kappa score."""
        return cohen_kappa_score(self.labels, self.predicted_labels)

    def calculate_log_loss(self):
        """Calculate Log Loss (Cross-Entropy Loss)."""
        y_pred_proba = [list(example.values()) for example in self.class_scores]
        return log_loss(self.labels, y_pred_proba)

    def calculate_roc_values(self):
        """Calculate ROC curve and AUC."""
        y_true = np.array([1 if label == 'HALL' else 0 for label in self.labels])
        y_scores = [example['HALL'] for example in self.class_scores]

        fpr, tpr, thresholds = roc_curve(y_true, y_scores)

        return {'fpr': fpr, 'tpr': tpr, 'thresholds': thresholds}

    def calculate_auc(self):
        """Calculate ROC curve and AUC."""
        y_true = np.array([1 if label == 'HALL' else 0 for label in self.labels])
        y_scores = [example['HALL'] for example in self.class_scores]

        fpr, tpr, thresholds = roc_curve(y_true, y_scores)
        roc_auc = auc(fpr, tpr)

        return roc_auc

    def calculate_precision_recall_values(self):
        """Calculate Precision-Recall curve values."""
        y_true = np.array([1 if label == 'HALL' else 0 for label in self.labels])
        y_scores = [example['HALL'] for example in self.class_scores]

        precision, recall, thresholds = precision_recall_curve(y_true, y_scores)

        return {'precision': precision, 'recall': recall, 'thresholds': thresholds}

    def calculate_confusion_matrix(self):
        """Calculate confusion matrix."""
        cm = confusion_matrix(self.labels, self.predicted_labels, labels=['HALL', 'NOHALL'])
        return cm

    def plot_confusion_matrix(self):
        """Plot the confusion matrix."""
        if "confusion_matrix" not in self.results:
            self.results["confusion_matrix"] = self.calculate_confusion_matrix()

        cm = self.results['confusion_matrix']
        plt.figure()
        plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        plt.title('Confusion Matrix')
        plt.colorbar()
        tick_marks = np.arange(len(['HALL', 'NOHALL']))
        plt.xticks(tick_marks, ['HALL', 'NOHALL'])
        plt.yticks(tick_marks, ['HALL', 'NOHALL'])

        threshold = cm.max() / 2
        for i, j in np.ndindex(cm.shape):
            plt.text(j, i, format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > threshold else "black")

        plt.ylabel('True label')
        plt.xlabel('Predicted label')
        plt.tight_layout()
        return plt

    def plot_metrics_bar_chart(self, metrics_to_plot):
        """Plot a bar chart for specified metrics."""
        values = [self.results[metric] for metric in metrics_to_plot if metric in self.results]

        plt.figure(figsize=(10, 6))
        bars = plt.bar(metrics_to_plot, values, color=plt.cm.viridis(np.linspace(0, 1, len(values))))

        plt.xlabel('Metrics')
        plt.ylabel('Values')
        plt.title('Metrics Bar Chart')
        plt.xticks(rotation=45, ha='right')
        plt.ylim(0, 1)  # Adjust y-axis as necessary

        # Adding value labels on top of the bars
        for bar in bars:
            yval = bar.get_height()
            plt.text(bar.get_x() + bar.get_width()/2, yval, round(yval, 2), ha='center', va='bottom')

        plt.tight_layout()
        return plt

    def plot_roc_curve(self):
        """Plot ROC curve."""
        if "roc_values" not in self.results:
            self.results["roc_values"] = self.calculate_roc_values()
        if "roc_auc" not in self.results:
            self.results["roc_auc"] = self.calculate_auc()

        roc_values = self.results['roc_values']
        plt.figure()
        plt.plot(roc_values['fpr'], roc_values['tpr'], color='blue', label='ROC curve (area = {:.2f})'.format(self.results['roc_auc']))
        plt.plot([0, 1], [0, 1], color='red', linestyle='--')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic')
        plt.legend(loc='lower right')
        return plt

    def plot_precision_recall_curve(self):
        """Plot Precision-Recall curve."""
        if "pr_values" not in self.results:
            self.results["roc_values"] = self.calculate_precision_recall_values()

        pr_values = self.results['precision_recall_values']
        plt.figure()
        plt.plot(pr_values['recall'], pr_values['precision'], color='blue')
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.title('Precision-Recall Curve')
        return plt

    def plot(self, plot_list, metrics_bar=None, save_plots=False, output_path=None):
        """Plot specified metrics and visualizations from the provided list.

        Args:
            plot_list (list): List of plot types to generate.
            metrics_bar (list, optional): Metrics to plot in the bar chart.
            save_plots (bool, optional): Whether to save the plots to files.
            output_path (str, optional): Path to save plots if save_plots is True.
        """
        for plot_type in plot_list:
            if plot_type == 'metrics_bar' and metrics_bar:
                bar_plot = self.plot_metrics_bar_chart(metrics_bar)
                if save_plots and output_path:
                    bar_plot.savefig(f'{output_path}/metrics_bar_chart.png')
                else:
                    plt.show()
            elif plot_type == 'roc_curve':
                roc_plot = self.plot_roc_curve()
                if save_plots and output_path:
                    roc_plot.savefig(f'{output_path}/roc_curve.png')
                else:
                    plt.show()
            elif plot_type == 'precision_recall_curve':
                pr_plot = self.plot_precision_recall_curve()
                if save_plots and output_path:
                    pr_plot.savefig(f'{output_path}/precision_recall_curve.png')
                else:
                    plt.show()
            elif plot_type == 'confusion_matrix':
                cm_plot = self.plot_confusion_matrix()
                if save_plots and output_path:
                    cm_plot.savefig(f'{output_path}/confusion_matrix.png')
                else:
                    plt.show()
            else:
                raise ValueError(f"Unsupported plot type: {plot_type}")

## Mymetric

In [None]:
from sklearn.metrics import f1_score, roc_auc_score
from transformers import EvalPrediction
import importlib.util
from datasets import Dataset
import datasets


class myMetric:
    def __init__(self, metric_name: str = None, custom_metric: str = None) -> None:
        return None

    def evaluate_dataset(self) -> None :
        return None

    def save_results(self, folder_path: str, filename: str = "evaluated_dataset", format: str = "hf", **kwargs) -> None:
        # Create folder if it doesn't exist
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)

        # Automatically append the appropriate file extension based on format
        if format == "hf":
            file_path = folder_path  # No filename needed for Hugging Face format
        elif format == "csv":
            file_path = os.path.join(folder_path, f"{filename}.csv")
        elif format == "json":
            file_path = os.path.join(folder_path, f"{filename}.json")
        else:
            raise ValueError(f"Unsupported format: {format}")

        # Save the dataset in the specified format
        if format == "hf":
            self.evaluated_dataset.save_to_disk(folder_path)
            print(f"Dataset saved in Hugging Face format at {folder_path}")
        elif format == "csv":
            self.evaluated_dataset.to_csv(file_path, **kwargs)
            print(f"Dataset saved as CSV at {file_path}")
        elif format == "json":
            self.evaluated_dataset.to_json(file_path, **kwargs)
            print(f"Dataset saved as JSON at {file_path}")


    def benchmark_scores(self, metric_list: list[str] = []):
        if not hasattr(self, "evaluated_dataset"):
            print("no dataset to score")
        scores = scoreManager(dataset=self.evaluated_dataset, metric_list=[])
        return scores

## transformer_model

In [None]:
from transformers import T5ForConditionalGeneration, PreTrainedModel
from transformers import T5Tokenizer, PreTrainedTokenizer
from transformers import pipeline
from datasets import Dataset
import datasets
import torch
import os


#------------------------------------
# General class for all metrics that "just" use a fine tuned transformer
#  - Include defining model, loading it, providing it to user, running it on dataset
#------------------------------------
class transformer_model(myMetric):
    def __init__(self, model_path: str, tokenizer_path: str, metric_name: str = "custom", custom_metric: bool = True) -> None:
        super().__init__(metric_name=metric_name, custom_metric=custom_metric)

        self.model_path = model_path
        self.tokenizer_path = tokenizer_path

    #------------------------------------
    # Getter, Loader, Savers
    #------------------------------------
    def load_model(self, save_folder: str) -> None:
        self.model = PreTrainedModel.from_pretrained(self.model_path)


    def load_tokenizer(self, save_folder: str) -> None:
        self.tokenizer.save_pretrained(save_folder)


    def get_model(self) -> PreTrainedModel:
        if not hasattr(self, "model"):
            self.pipeline = self.load_model()
        return self.model


    def get_tokenizer(self) -> PreTrainedTokenizer:
        if not hasattr(self, "model"):
            self.pipeline = self.load_tokenizer()
        return self.tokenizer


    def save_tokenizer(self, save_folder: str) -> None:
        if not hasattr(self, "model"):
            self.pipeline = self.load_tokenizer()

        self.tokenizer.save_pretrained(save_folder)


    def save_model(self, save_folder: str) -> None:
        if not hasattr(self, "model"):
            self.model = self.load_model()

        self.model.save_pretrained(save_folder)


    #------------------------------------
    # Pipeline functions
    #------------------------------------

    #creating pipeline, not done at init to save on space if user only want to save locally or use his own pipeline
    def create_pipeline(self, **kwargs) -> pipeline :
       self.pipeline = pipeline(model=self.model_path, tokenizer=self.tokenizer_path, **kwargs)
       return self.pipeline

    #Execution of the pipeline at row (or batch) level
    #NOTE: Possibly overloaded in metric-specific class
    def run_pipeline(self, batch, source_col, gen_col, top_k, function_to_apply, truncation, padding):
        inputs = [[[source, gen]] for source, gen in zip(batch[source_col], batch[gen_col])]

        results = self.pipeline(inputs, top_k=top_k, truncation=truncation, padding=padding, function_to_apply=function_to_apply)
        return {"predictions": results}

    #Execution of the pipeline on whole dataset
    #NOTE: NOT overloaded in metric-specific class (ideally)
    def evaluate_dataset(self,
        dataset,
        source_col="text",
        gen_col="gen",
        top_k=None,
        truncation=False,
        padding=False,
        function_to_apply=None,
        save_result_dataset_folder_path=None,
        save_result_dataset_format="hf",
        map_kwargs=None
    ):

        if map_kwargs is None:
            map_kwargs = {}

        #if user has not set these parameters inside kwargs then use our default params:
        map_kwargs.setdefault("batched", False)
        map_kwargs.setdefault("batch_size", 10)

        #if not already created, init pipeline
        if not hasattr(self, "pipeline"):
            self.pipeline = self.create_pipeline()

        self.evaluated_dataset = dataset.map(lambda batch: self.run_pipeline(batch=batch,
                                                                             source_col=source_col,
                                                                             gen_col=gen_col,
                                                                             top_k=top_k,
                                                                             truncation=truncation,
                                                                             padding=padding,
                                                                             function_to_apply=function_to_apply), **map_kwargs)

        if save_result_dataset_folder_path:
            self.save_results(folder_path=save_result_dataset_folder_path, format=save_result_dataset_format)

        return self.evaluated_dataset

## TrueTeacher

In [None]:
from transformers import T5ForConditionalGeneration, PreTrainedModel
from transformers import T5Tokenizer, PreTrainedTokenizer
from transformers import pipeline
from datasets import Dataset
import datasets

class trueTeacher(transformer_model):
    def __init__(self, model_path="google/t5_11b_trueteacher_and_anli", tokenizer_path="google/t5_11b_trueteacher_and_anli") -> None:
        super().__init__(metric_name="trueTeacher",
                         model_path="google/t5_11b_trueteacher_and_anli",
                         tokenizer_path="google/t5_11b_trueteacher_and_anli",
                         custom_metric = False)

    def load_model(self, model_path) -> None:
        self.model = T5ForConditionalGeneration.from_pretrained(model_path)

    def load_tokenizer(self, tokenizer_path) -> None:
        self.tokenizer = T5Tokenizer.from_pretrained(tokenizer_path)


## FactCC

In [None]:
from transformers import T5ForConditionalGeneration, PreTrainedModel
from transformers import T5Tokenizer, PreTrainedTokenizer
from transformers import pipeline
from datasets import Dataset
import datasets

class factcc(transformer_model):
    def __init__(self, model_path="manueldeprada/FactCC", tokenizer_path="manueldeprada/FactCC") -> None:
        super().__init__(metric_name="factCC", model_path="manueldeprada/FactCC", tokenizer_path="manueldeprada/FactCC", custom_metric = False)


    def convert_label(self, label):
        #We get either 'INCORRECT' or 'CORRECT'
        #'INCORRECT' means there is a hal and 'CORRECT' means there is none
        #   INCORRECT -> HALL
        #   CORRECT   -> NOHALL
        if label == 'INCORRECT':
            return "HALL"
        else:
            return "NOHALL"

    def format_result(self, result):
        #Convert from:
        #   [{'label': 'CORRECT', 'score': 0.9...},
        #    {'label': 'INCORRECT', 'score': 0.1...}]
        #To:
        #   {'CORRECT': 0.9719441533088684, 'INCORRECT': 0.02805584855377674}


        # Check if the result is batched (a list of lists) or not
        if isinstance(result["predictions"][0], list):
            # If batched, iterate over each batch
            return {
                "predictions": [
                    {self.convert_label(label["label"]): label["score"] for label in batch}
                    for batch in result["predictions"]
                ]
            }
        else:
            # If not batched, process directly
            return {
                "predictions": [
                    {self.convert_label(label["label"]): label["score"] for label in result["predictions"]}
                ]
            }

    #Execution of the pipeline at row (or batch) level
    # Adding a formatting
    def run_pipeline(self, batch, source_col, gen_col, top_k, function_to_apply, truncation, padding):
        result = super().run_pipeline(batch, source_col, gen_col, top_k, function_to_apply, truncation, padding)

        return self.format_result(result)

## QA based

In [None]:
from transformers import T5ForConditionalGeneration, PreTrainedModel
from transformers import T5Tokenizer, AutoTokenizer
from transformers import pipeline
from datasets import Dataset
import datasets
import torch
import os


#------------------------------------
# General class for all metrics that use QG & QA methods
#------------------------------------
class qgqa_based_metric(myMetric):
    def __init__(self,
                 qg_model_path: str,
                 qa_model_path: str,
                 qg_tokenizer_path: str,
                 qa_tokenizer_path: str = None,
                 metric_name="custom",
                 custom_metric=True,
                 qg_prefix="generate questions: ",
                 qg_separator="<sep>"):
        super().__init__(metric_name=metric_name, custom_metric=custom_metric)

        self.qg_model_path = qg_model_path  # Path to the Question Generation model
        self.qa_model_path = qa_model_path  # Path to the Question Answering model
        self.qg_tokenizer_path = qg_tokenizer_path  # Path to the QG Tokenizer
        self.qa_tokenizer_path = qa_tokenizer_path or qg_tokenizer_path  # Use QG Tokenizer if QA Tokenizer not provided
        self.qg_model = None
        self.qa_model = None
        self.qg_tokenizer = None
        self.qa_tokenizer = None
        self.qg_prefix = qg_prefix
        self.qg_separator = qg_separator

#------------------------------------
# Getter, Loader, Savers for QG, QA, and their respective Tokenizers
#------------------------------------

    def load_qg_model(self) -> None:
        self.qg_model = PreTrainedModel.from_pretrained(self.qg_model_path)

    def load_qa_model(self) -> None:
        self.qa_model = PreTrainedModel.from_pretrained(self.qa_model_path)

    def load_qg_tokenizer(self) -> None:
        self.qg_tokenizer = AutoTokenizer.from_pretrained(self.qg_tokenizer_path)

    def load_qa_tokenizer(self) -> None:
        self.qa_tokenizer = AutoTokenizer.from_pretrained(self.qa_tokenizer_path)

    def get_qg_model(self) -> PreTrainedModel:
        if not self.qg_model:
            self.load_qg_model()
        return self.qg_model

    def get_qa_model(self) -> PreTrainedModel:
        if not self.qa_model:
            self.load_qa_model()
        return self.qa_model

    def get_qg_tokenizer(self) -> AutoTokenizer:
        if not self.qg_tokenizer:
            self.load_qg_tokenizer()
        return self.qg_tokenizer

    def get_qa_tokenizer(self) -> AutoTokenizer:
        if not self.qa_tokenizer:
            self.load_qa_tokenizer()
        return self.qa_tokenizer

    def save_qg_model(self, save_folder: str) -> None:
        if not self.qg_model:
            self.load_qg_model()
        self.qg_model.save_pretrained(save_folder)

    def save_qa_model(self, save_folder: str) -> None:
        if not self.qa_model:
            self.load_qa_model()
        self.qa_model.save_pretrained(save_folder)

    def save_qg_tokenizer(self, save_folder: str) -> None:
        if not self.qg_tokenizer:
            self.load_qg_tokenizer()
        self.qg_tokenizer.save_pretrained(save_folder)

    def save_qa_tokenizer(self, save_folder: str) -> None:
        if not self.qa_tokenizer:
            self.load_qa_tokenizer()
        self.qa_tokenizer.save_pretrained(save_folder)


#------------------------------------
# Pipeline functions
#------------------------------------

    #creating pipeline, not done at init to save on space if user only want to save locally or use his own pipeline
    def create_qg_pipeline(self, **kwargs) -> pipeline :
        self.qg_pipeline = pipeline(model=self.qg_model_path, tokenizer=self.qg_tokenizer_path, truncation=True, max_length=512, **kwargs)
        return self.qg_pipeline
    
    #creating pipeline, not done at init to save on space if user only want to save locally or use his own pipeline
    def create_qa_pipeline(self, **kwargs) -> pipeline :
       self.qa_pipeline = pipeline(model=self.qa_model_path, tokenizer=self.qa_tokenizer_path, truncation=True, max_length=512, **kwargs)

       return self.qa_pipeline

    #creating pipeline, not done at init to save on space if user only want to save locally or use his own pipeline
    def create_pipeline(self, **kwargs) -> pipeline :
       self.create_qa_pipeline(**kwargs)
       self.create_qg_pipeline(**kwargs)
       return {"qa_pipeline":self.qa_pipeline,"qg_pipeline":self.qg_pipeline}

    # Define the processing function for the question generation input
    # Just adding the prefix
    def format_qg_input(self, batch):
        batch = [self.qg_prefix + text for text in batch]
        return batch

    # Define the processing function for the question generation output
    # (mostly just to deal with the separator and empty questions)
    # Just adding the prefix
    def format_qg_output(self, questions):
        questions = questions.split(self.qg_separator)
        questions = [q for q in questions if q != ""]

        # Remove duplicates while maintaining order
        seen = set()
        questions = [seen.add(item) or item for item in questions if item not in seen]

        return questions

    #Default function to use to get a score when comparing the answers from the source and the generation
    def compute_token_f1(self, answers, gen_col, src_col):
        f1_results = []

        for answer in answers:
            src_answer = answer[src_col]
            gen_answer = answer[gen_col]

            # Tokenize the answers
            # answers generated with the qa tokenizers so we use this tokenizer here
            tokens_src = set(self.qa_tokenizer.tokenize(src_answer))
            tokens_gen = set(self.qa_tokenizer.tokenize(gen_answer))

            # Calculate precision, recall, and F1 score
            common_tokens = tokens_src & tokens_gen
            precision = len(common_tokens) / len(tokens_src) if tokens_src else 0
            recall = len(common_tokens) / len(tokens_gen) if tokens_gen else 0
            f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

            f1_results.append(f1_score)

        return f1_results

    #Getting the score of a single row given the answers generated
    def score_answers(self, answers, gen_col, src_col):
        scores = self.compute_token_f1(answers, gen_col, src_col)

        # Calculate the average score
        if scores:  # Check if the list is not empty
            average_score = sum(scores) / len(scores)
        else:
            average_score = 0
        return average_score

    #Creating and running the pipeline for generating the answer (either from generation or source, passed as context) given the question generated
    def generate_answers(self, question, context):
        answer = self.qa_pipeline(f"question: {question}  context: {context}")[0]["generated_text"]
        return answer

    def generate_questions(self, batch, qg_pipeline_call_args={}):

        #Format to add a prefix ex: "generate question" and only keep the source col
        input = self.format_qg_input(batch)

        #questions = self.qg_pipeline(input, max_new_tokens=max_new_tokens, truncation=truncation)
        questions = self.qg_pipeline(input, **qg_pipeline_call_args)

        #Format the recieved questions ex:
        #[["what does the fox say?<sep>who let the dogs out?<sep>Who you gonna call?"],["Why did the chicken cross the road?<sep>knock knock whos there?"]]
        #[["what does the fox say?","who let the dogs out?","Who you gonna call?"],["Why did the chicken cross the road?","knock knock whos there?"]]
        #the comprehension is for dealing with each line separately, format_qg_output does the formatting and is the one which should be overridden if needed
        questions = [self.format_qg_output(output["generated_text"]) for output in questions]

        return questions

    #Execution of the pipeline at row (or batch) level
    #NOTE: Possibly overloaded in metric-specific class
    def run_pipeline(self, batch, batched, gen_col, src_col, qg_pipeline_call_args, keep_questions, keep_answers):

        # If input is not batched it is not a dict of list
        # Changing it to a list to homogenize rest of code
        # Check if the input is batched (a list of items) or not
        if not batched:
            batch = {key: [value] for key, value in batch.items()}

        #Run the question generation model on input
        src_questions = self.generate_questions(batch[src_col], qg_pipeline_call_args)

        #Run the question answering model on each question
        #Do it in the context of the source, and of the generation
        all_answers_batch = [
                                [
                                    {src_col: self.generate_answers(question, src_col),
                                    gen_col: self.generate_answers(question, gen_col)}
                                for question in row]
                            for row in src_questions]

        #Give a score to each answer when comparing it between contexts (source/generation) in the same line (default: token-level F1)
        #For each lines, group the scores of all it's answers (default: Average)
        score = [self.score_answers(answers, gen_col, src_col) for answers in all_answers_batch]


        #If it is not batched then I need to send only a str not a list of str
        if not batched:
            score = score[0]

        output={"scores": score}

        if keep_questions:
            output["questions"]=src_questions
        if keep_answers:
            output["answers"]=all_answers_batch

        torch.cuda.empty_cache()
        return output

    #Execution of the pipeline on whole dataset
    #NOTE: NOT overloaded in metric-specific class (ideally)
    def evaluate_dataset(self,
        dataset,
        source_col="src",
        gen_col="text",
        keep_questions=False,
        keep_answers=False,
        top_k=None,
        qg_pipeline_call_args={},
        padding=False,
        function_to_apply=None,
        save_result_dataset_folder_path=None,
        save_result_dataset_format="hf",
        map_kwargs=None
    ):

        if map_kwargs is None:
            map_kwargs = {}

        #if user has not set these parameters inside kwargs then use our default params:
        map_kwargs.setdefault("batched", False)
        map_kwargs.setdefault("batch_size", 10)

        #if not already created, init qg & qa pipelines
        if not hasattr(self, "qg_pipeline"):
            self.pipeline = self.create_qg_pipeline()

        if not hasattr(self, "qa_pipeline"):
            self.pipeline = self.create_qa_pipeline()

        #we will need the qa tokenizer to compute differences between answer for the source and the generation
        #we check that it is not loaded and if not we load
        if not self.qa_tokenizer:
            self.load_qa_tokenizer()

        
        with torch.no_grad():
            self.evaluated_dataset = dataset.map(lambda batch: self.run_pipeline(batch=batch,
                                                                             src_col=source_col,
                                                                             gen_col=gen_col,
                                                                             qg_pipeline_call_args=qg_pipeline_call_args,
                                                                             batched=map_kwargs["batched"],
                                                                             keep_questions=keep_questions,
                                                                             keep_answers=keep_answers,
                                                                             ), **map_kwargs)

        torch.cuda.empty_cache()
        
        if save_result_dataset_folder_path:
            self.save_results(folder_path=save_result_dataset_folder_path, format=save_result_dataset_format)

        return self.evaluated_dataset


In [None]:
#Reproduces the method highlited in QAGS but with other models

class qags(qgqa_based_metric):
    def __init__(self) -> None:
        super().__init__(qg_model_path="valhalla/t5-small-e2e-qg",
                        qa_model_path= "valhalla/t5-small-qa-qg-hl",
                        qg_tokenizer_path= "valhalla/t5-small-e2e-qg",
                        qa_tokenizer_path= "valhalla/t5-small-qa-qg-hl",
                        metric_name="qags",
                        custom_metric=False,
                        qg_prefix="generate questions: ",
                        qg_separator="<sep>")

In [None]:
import spacy

#Reproduces the method highlited in FEQA but with other models

class feqa(qgqa_based_metric):
    def __init__(self) -> None:
        super().__init__(qg_model_path="valhalla/t5-base-qg-hl",
                        qa_model_path= "valhalla/t5-small-qa-qg-hl",
                        qg_tokenizer_path= "valhalla/t5-base-qg-hl",
                        qa_tokenizer_path= "valhalla/t5-small-qa-qg-hl",
                        metric_name="feqa",
                        custom_metric=False,
                        qg_prefix="generate questions: ",
                        qg_separator="<sep>")

        self.highlight_token="<hl>"

    #creating pipeline, not done at init to save on space if user only want to save locally or use his own pipeline
    def create_pipeline(self, **kwargs) -> pipeline :
       self.create_qa_pipeline(**kwargs)
       self.create_qg_pipeline(**kwargs)
       self.spacy_model = spacy.load("en_core_web_sm")

       return {"qa_pipeline":self.qa_pipeline,"qg_pipeline":self.qg_pipeline, "spacy_model":self.spacy_model}

    #Used to extract all named entities and noun chunks.
    # For each one of them we get a copy of the input where they are enclosed in the defined highlight_token (by default <hl>)
    def spacy_entity_extraction(self, text):
        # Process the text using spaCy
        doc = self.spacy_model(text)

        highlighted_sentences = []

        # Collect all named entities and noun chunks
        mask_targets = list(doc.ents) + list(doc.noun_chunks)

        # Generate a new sentence with each target enclosed in <hl> tags
        for target in mask_targets:
            # Enclose the current target with <hl> tags
            highlighted_text = text.replace(target.text, f"{self.highlight_token}{target.text}{self.highlight_token}")
            highlighted_sentences.append(highlighted_text)

        return highlighted_sentences

    # Define the processing function for the question generation output
    # (mostly just to deal with the separator and empty questions)
    # Just adding the prefix
    def format_qg_output(self, list_of_separated_questions):

        #We get the answser inside the dict
        #and we remove each question beyond the first (we assume it would only be a repetition)
        questions = [ output["generated_text"].split(self.qg_separator)[0] for output in list_of_separated_questions]

        #we remove empty questions
        questions = [q for q in questions if q != ""]

        # Remove duplicates while maintaining order
        seen = set()
        questions = [seen.add(item) or item for item in questions if item not in seen]

        return questions

    def generate_questions(self, batch, qg_pipeline_call_args={}):
        #Creating the masked input for each line
        batch_of_list_of_text_with_entity_hl = [self.spacy_entity_extraction(line) for line in batch]

        #Format to add a prefix ex: "generate question" and only keep the source col
        formatted_batch_of_list_of_text_with_entity_hl = [self.format_qg_input(list_of_text_with_entity_hl) for list_of_text_with_entity_hl in batch_of_list_of_text_with_entity_hl]

        #data in the form:
        #[["generate questions: <hl>entity1.1<hl>...", "generate questions: ...<hl>entity1.2<hl>..."...]",       <- line 1 of input batch
        # ["generate questions: <hl>entity2.1<hl>...", "generate questions: ...<hl>entity2.2<hl>..."...]",       <- line 2 of input batch
        #...]

        #for each line of formatted_batch_of_list_of_text_with_entity_hl we want 1 question per item (entity hl)
        #because we assume there is only a single relevant question per entity hl
        #we generate each question
        all_questions = [self.qg_pipeline(list_of_question, **qg_pipeline_call_args)
                         for list_of_question in formatted_batch_of_list_of_text_with_entity_hl]

        #data in the form:
        #
        #[[{'generated_text': 'question1.1.1<sep>question1.1.2...'},{'generated_text': 'question1.2.1<sep>question1.2.2...'},
        #{'generated_text': 'question2.1.1<sep>question2.1.2...'},{'generated_text': 'question2.2.1<sep>question2.2.2...'},...], <-all for line 1
        #...]
        #
        #   line 1 of input batch, entity 1.X-> y different questions: 1.X.y
        #   line 2 of input batch, entity 2.X-> z different questions: 2.X.z
        #   ...

        #Format the recieved questions ex:
        #[["what does the fox say?<sep>who let the dogs out?<sep>Who you gonna call?"],["Why did the chicken cross the road?<sep>knock knock whos there?"]]
        #[["what does the fox say?","who let the dogs out?","Who you gonna call?"],["Why did the chicken cross the road?","knock knock whos there?"]]
        #the comprehension is for dealing with each line separately, format_qg_output does the formatting and is the one which should be overridden if needed
        questions = [self.format_qg_output(questions_per_line) for questions_per_line in all_questions]

        return questions

## Entity based

In [None]:
from transformers import T5ForConditionalGeneration, PreTrainedModel
from transformers import T5Tokenizer, PreTrainedTokenizer
from transformers import pipeline
from datasets import Dataset
import datasets
import torch
import os


#------------------------------------
# General class for all metrics that "just" use a fine tuned transformer
#  - Include defining model, loading it, providing it to user, running it on dataset
#------------------------------------
class entity_based_metric(myMetric):
    def __init__(self, er_model_path, er_tokenizer_path, metric_name="custom", custom_metric=True) -> None:
        super().__init__(metric_name=metric_name, custom_metric=custom_metric)

        self.er_model_path = er_model_path
        self.er_tokenizer_path = er_tokenizer_path

    #------------------------------------
    # Getter, Loader, Savers
    #------------------------------------
    def load_er_model(self, save_folder) -> None:
        self.model = PreTrainedModel.from_pretrained(self.er_model_path)


    def load_er_tokenizer(self, save_folder) -> None:
        self.tokenizer.save_pretrained(save_folder)


    def get_er_model(self) -> PreTrainedModel:
        if not hasattr(self, "model"):
            self.pipeline = self.load_model()
        return self.model


    def get_er_tokenizer(self) -> PreTrainedTokenizer:
        if not hasattr(self, "model"):
            self.pipeline = self.load_tokenizer()
        return self.tokenizer


    def save_er_tokenizer(self, save_folder) -> None:
        if not hasattr(self, "model"):
            self.pipeline = self.load_tokenizer()

        self.tokenizer.save_pretrained(save_folder)


    def save_er_model(self, save_folder) -> None:
        if not hasattr(self, "model"):
            self.model = self.load_model()

        self.model.save_pretrained(save_folder)


    #------------------------------------
    # Pipeline functions
    #------------------------------------

    #creating pipeline, not done at init to save on space if user only want to save locally or use his own pipeline
    def create_pipeline(self, **kwargs) -> pipeline :
       self.create_er_pipeline(**kwargs)
       return {"er_pipeline":self.er_pipeline}

    #creating pipeline, not done at init to save on space if user only want to save locally or use his own pipeline
    def create_er_pipeline(self, **kwargs) -> pipeline :
       self.er_pipeline = pipeline(model=self.er_model_path, tokenizer=self.er_tokenizer_path, truncation=True, max_length=1024, **kwargs)
       return self.er_pipeline

    #Formatting extracted entities
    def format_entities(self, text):
        return text

    def filter_matching_entities(self, entities1, entities2):
        """Filters entities from `entities1` that have matching subject and relation in `entities2`."""
        filtered_entities = [
            entity1 for entity1 in entities1
            if any(entity1["subject"] == entity2["subject"] and entity1["relation"] == entity2["relation"]
                for entity2 in entities2)
        ]
        return filtered_entities

    #Default method to compare entities of both gen and sources to get a score
    def compare_entities(self, src_entities, gen_entities):

        #Filter matching entities based on subject and relation
        gen_entities_prime = self.filter_matching_entities(gen_entities, src_entities)

        #Calculate proporiton of entities and relation in gen that are also present in source
        fact_accuracy = len(gen_entities_prime) / len(gen_entities) if gen_entities else 0
        return fact_accuracy

    #Default method to compare entities of both gen and sources to get a score
    def compare_entities(self, src_entities, gen_entities):

        #Filter matching entities based on subject and relation
        gen_entities_prime = self.filter_matching_entities(gen_entities, src_entities)

        #Calculate proporiton of entities and relation in gen that are also present in source
        fact_accuracy = len(gen_entities_prime) / len(gen_entities) if gen_entities else 0
        return fact_accuracy

    #Execution of the pipeline at row (or batch) level
    #NOTE: Possibly overloaded in metric-specific class
    def run_pipeline(self, batch, batched, source_col, gen_col, keep_entities, top_k, truncation, padding):

        # If input is not batched it is not a dict of list
        # Changing it to a list to homogenize rest of code
        # Check if the input is batched (a list of items) or not
        if not batched:
            batch = {key: [value] for key, value in batch.items()}

        #Source
        entities_src_tokens = self.er_pipeline(batch[source_col], return_tensors=True, return_text=False)
        entities_src_tokens = [ line["generated_token_ids"] for line in entities_src_tokens]

        entities_src_text = self.er_pipeline.tokenizer.batch_decode(entities_src_tokens)
        entities_src_format = [ self.format_entities(line) for line in entities_src_text]

        #Gen
        entities_gen_tokens = self.er_pipeline(batch[gen_col], return_tensors=True, return_text=False)
        entities_gen_tokens = [ line["generated_token_ids"] for line in entities_gen_tokens]

        entities_gen_text = self.er_pipeline.tokenizer.batch_decode(entities_gen_tokens)
        entities_gen_format = [ self.format_entities(line) for line in entities_gen_text]

        score = [self.compare_entities(src, gen) for src, gen in zip(entities_gen_format, entities_src_format)]

        #If it is not batched then I need to send only a str not a list of str
        if not batched:
            score = score[0]

        output={"scores": score}

        if keep_entities:
            if not batched:
                entities_src_format = entities_src_format[0]
                entities_gen_format = entities_gen_format[0]

            output["entities_src"]=entities_src_format
            output["entities_gen"]=entities_gen_format

        return output

    #Execution of the pipeline on whole dataset
    #NOTE: NOT overloaded in metric-specific class (ideally)
    def evaluate_dataset(self,
        dataset,
        source_col="text",
        gen_col="gen",
        top_k=None,
        truncation=False,
        padding=False,
        max_tokens_er = 1000,
        keep_entities=False,
        save_result_dataset_folder_path=None,
        save_result_dataset_format="hf",
        map_kwargs=None
    ):

        if map_kwargs is None:
            map_kwargs = {}
        
        #if user has not set these parameters inside kwargs then use our default params:
        map_kwargs.setdefault("batched", False)
        map_kwargs.setdefault("batch_size", 1)

        #if not already created, init pipeline
        if not hasattr(self, "er_pipeline"):
            self.pipeline = self.create_er_pipeline()

        with torch.no_grad():  # Ensure no gradients
            self.evaluated_dataset = dataset.map(lambda batch: self.run_pipeline(batch=batch,
                                                                                 batched=map_kwargs["batched"],
                                                                                 source_col=source_col,
                                                                                 gen_col=gen_col,
                                                                                 keep_entities=keep_entities,
                                                                                 top_k=top_k,
                                                                                 truncation=truncation,
                                                                                 padding=padding), **map_kwargs)

        if save_result_dataset_folder_path:
            self.save_results(folder_path=save_result_dataset_folder_path, format=save_result_dataset_format)

        return self.evaluated_dataset

In [None]:
class factacc(entity_based_metric):
    def __init__(self) -> None:
        super().__init__(er_model_path= "Babelscape/rebel-large",
                         er_tokenizer_path= "Babelscape/rebel-large",
                         metric_name="factacc",
                         custom_metric=False)

    #Formatting extracted entities
    def format_entities(self, text):
        triplets = []
        relation, subject, relation, object_ = '', '', '', ''
        text = text.strip()
        current = 'x'
        for token in text.replace("<s>", "").replace("<pad>", "").replace("</s>", "").split():
            if token == "<triplet>":
                current = 't'
                if relation != '':
                    triplets.append({'subject': subject.strip(), 'relation': relation.strip(),'object': object_.strip()})
                    relation = ''
                subject = ''
            elif token == "<subj>":
                current = 's'
                if relation != '':
                    triplets.append({'subject': subject.strip(), 'relation': relation.strip(),'object': object_.strip()})
                object_ = ''
            elif token == "<obj>":
                current = 'o'
                relation = ''
            else:
                if current == 't':
                    subject += ' ' + token
                elif current == 's':
                    object_ += ' ' + token
                elif current == 'o':
                    relation += ' ' + token
        if subject != '' and relation != '' and object_ != '':
            triplets.append({'subject': subject.strip(), 'relation': relation.strip(),'object': object_.strip()})
        return triplets


    #Used to compare entities of both gen and sources to get a score
    def compare_entities(self, src_entities, gen_entities):

        #Filter matching entities based on subject and relation
        src_entities_prime = self.filter_matching_entities(src_entities, gen_entities)
        gen_entities_prime = self.filter_matching_entities(gen_entities, src_entities)

        #Calculate the intersection based on matching subject, relation, and object
        intersection = [entity for entity in src_entities_prime
                        if any(entity["subject"] == gen_entity["subject"] and
                            entity["relation"] == gen_entity["relation"] and
                            entity["object"] == gen_entity["object"]
                            for gen_entity in gen_entities_prime)]

        #Calculate factual accuracy as precision
        fact_accuracy = len(intersection) / len(gen_entities_prime) if gen_entities_prime else 0
        return fact_accuracy

# Test real data

## Importing data

In [None]:
import torch

In [None]:
import pandas as pd
from datasets import Dataset

# Step 1: Load the CSV file
df = pd.read_csv("../../datasets/XSUMFaith_git_repo/xsum_hallucination_annotations/hallucination_annotations_xsum_summaries.csv")

# Step 2: Keep only the required columns
df = df[["system", "bbcid", "summary", "worker_id", "hallucination_type"]]

# Step 3: Group by unique summaries and aggregate annotations
grouped_df = (
    df.groupby(["system", "bbcid", "summary"])
    .apply(lambda x: x[["worker_id", "hallucination_type"]].to_dict(orient="records"))
    .reset_index(name="annotations")
)

# Step 4: Convert to Hugging Face Dataset
xsumfaith_annotation = Dataset.from_pandas(grouped_df)

# Check the resulting dataset
display(xsumfaith_annotation[0])  # Example of a grouped record
display(xsumfaith_annotation.to_pandas())  # Example of a grouped record

In [None]:
from datasets import load_dataset

xsum_dataset = load_dataset("EdinburghNLP/xsum")

In [None]:
xsum_dataset["test"]

In [None]:
# Convert Hugging Face datasets to pandas DataFrames
df1 = xsumfaith_annotation.to_pandas()
df2 = xsum_dataset["test"].to_pandas()

# Convert the "bbcid" columns to the same type (e.g., both to string)
df1['bbcid'] = df1['bbcid'].astype(str)
df2['bbcid'] = df2['id'].astype(str)

df2 = df2.drop(columns=["summary"])

# Perform an inner join on "bbcid"xsumfaith_dataset
merged_df = pd.merge(df1, df2, on="bbcid", how="inner")
merged_df = merged_df.drop(columns=["id"])

# Convert the merged DataFrame back to a Hugging Face Dataset
xsumfaith_dataset = Dataset.from_pandas(merged_df)

In [None]:
xsumfaith_dataset.to_pandas()

In [None]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'


## Running benchmarks

In [None]:
#!export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True

In [None]:
dataset_name = "xsumfaith_dataset"

### TrueTeacher

In [None]:
import gc
gc.collect()
torch.cuda.empty_cache()
# Free memory on GPU 0
torch.cuda.set_device(0)
torch.cuda.empty_cache()

# Free memory on GPU 1
torch.cuda.set_device(1)
torch.cuda.empty_cache()

In [None]:
testmetric = trueTeacher()

#testmetric.create_pipeline(device="cuda:0")
#testmetric.create_pipeline(device_map="auto") #Necessitate 44GB to run at full FP32
testmetric.create_pipeline(torch_dtype=torch.float16,
                           device_map="auto",
                           offload_folder="offload",
                           max_memory={
                               0: "22GB",   # GPU 0 VRAM
                               1: "22GB",   # GPU 1 VRAM
                               "cpu": "20GB"  # Limit CPU RAM usage to 20GB
                           }
                          ) #Necessitate 22GB to run at FP16


map_kwargs = {
    "batched": False,
    "batch_size": 1
}

results = testmetric.evaluate_dataset(xsumfaith_dataset,
                                      source_col="document",
                                      gen_col="summary",
                                      truncation=True,
                                      #qg_pipeline_call_args={"truncation":True, "max_length":1024},
                                      save_result_dataset_folder_path="/home/benjamin/work/datasets/MIRAGE/results/" + dataset_name +"/trueTeacher/",
                                      map_kwargs=map_kwargs)

results

### FactCC

In [None]:
import gc
gc.collect()
torch.cuda.empty_cache()
# Free memory on GPU 0
torch.cuda.set_device(0)
torch.cuda.empty_cache()

# Free memory on GPU 1
torch.cuda.set_device(1)
torch.cuda.empty_cache()

In [None]:
testmetric = factcc()

testmetric.create_pipeline(device="cuda:0")

map_kwargs = {
    "batched": True,
    "batch_size": 10
}

results = testmetric.evaluate_dataset(xsumfaith_dataset,
                                      source_col="document",
                                      gen_col="summary",
                                      truncation=True,
                                      #qg_pipeline_call_args={"truncation":True, "max_length":1024},
                                      save_result_dataset_folder_path="/home/benjamin/work/datasets/MIRAGE/results/" + dataset_name +"/factcc/",
                                      map_kwargs=map_kwargs)

results

### QAGS

In [None]:
import gc
gc.collect()
torch.cuda.empty_cache()
# Free memory on GPU 0
torch.cuda.set_device(0)
torch.cuda.empty_cache()

# Free memory on GPU 1
torch.cuda.set_device(1)
torch.cuda.empty_cache()

In [None]:
testmetric = qags()

testmetric.create_pipeline(device="cuda:0")

map_kwargs = {
    "batched": True,
    "batch_size": 10
}

results = testmetric.evaluate_dataset(xsumfaith_dataset,
                                      source_col="document",
                                      gen_col="summary",
                                      qg_pipeline_call_args={"truncation":True, "max_length":512},
                                      save_result_dataset_folder_path="/home/benjamin/work/datasets/MIRAGE/results/" + dataset_name +"/qags/",
                                      map_kwargs=map_kwargs)

results

### FEQA

In [None]:
import gc
gc.collect()
torch.cuda.empty_cache()
# Free memory on GPU 0
torch.cuda.set_device(0)
torch.cuda.empty_cache()

# Free memory on GPU 1
torch.cuda.set_device(1)
torch.cuda.empty_cache()

In [None]:
testmetric = feqa()

testmetric.create_pipeline(device="cuda:0")

map_kwargs = {
    "batched": True,
    "batch_size": 100
}

results = testmetric.evaluate_dataset(xsumfaith_dataset,
                                      source_col="document",
                                      gen_col="summary",
                                      qg_pipeline_call_args={"truncation":True, "max_length":512},
                                      save_result_dataset_folder_path="/home/benjamin/work/datasets/MIRAGE/results/" + dataset_name +"/feqa/",
                                      keep_questions=True,
                                      keep_answers=True,
                                      map_kwargs=map_kwargs)

results

### Factacc

In [None]:
import gc
gc.collect()
torch.cuda.empty_cache()
# Free memory on GPU 0
torch.cuda.set_device(0)
torch.cuda.empty_cache()

# Free memory on GPU 1
torch.cuda.set_device(1)
torch.cuda.empty_cache()

In [None]:
testmetric = factacc()

testmetric.create_pipeline(device="cuda:0")

map_kwargs = {
    "batched": True,
    "batch_size": 10
}

results = testmetric.evaluate_dataset(xsumfaith_dataset,
                                      source_col="document",
                                      gen_col="summary",
                                      truncation=True,
                                      #qg_pipeline_call_args={"truncation":True, "max_length":1024},
                                      save_result_dataset_folder_path="/home/benjamin/work/datasets/MIRAGE/results/" + dataset_name +"/factacc/",
                                      map_kwargs=map_kwargs)

results

# Test

## Test FacAcc

In [None]:
from datasets import Dataset

source1= "Leeds showed they are in good shape to cope with Kevin Sinfield’s retirement as they claimed a 26 - 12 derby victory over Castleford in front of a sell-out crowd at the Mend-a-Hose Jungle. [...] Ryan Hall was sent to the sin-bin for the first time in his career […] Joel Moon scored his first try of the season […] Leeds extended their unbeaten run against the Tigers to six matches"
summary1= "Kevin Sinfield scored his first try of the season against Castleford. Leeds Rhino scored unbeaten run against Tigers to six matches. Ryan Hall was sent to Leeds Rhino for first time in his career ."

source2= "Amazon has announced plans to open a new headquarters in Arlington, Virginia, creating over 25,000 new jobs in the area. The company has invested heavily in the region, including a $5 billion project to develop the surrounding infrastructure. Despite concerns from local residents about increased traffic and housing costs, Amazon expects the new campus to bring significant economic benefits to the region. CEO Jeff Bezos emphasized the company's commitment to sustainability, noting that the new headquarters would run entirely on renewable energy."
summary2= "Jeff Bezos announced plans to create 5,000 new jobs in Arlington, Virginia, by opening a new Amazon headquarters. The development, which will cost $25 billion, is expected to increase traffic and housing costs but will boost the economy. Amazon’s campus will use non-renewable energy."

source3= "Apple unveiled its latest iPhone model during a special event in Cupertino. The new iPhone 15 comes with a faster processor, improved camera capabilities, and a more durable design. The device also introduces USB-C charging, marking a shift from Apple's long-standing use of the Lightning port. Prices for the iPhone 15 start at $799, with pre-orders available from next week. CEO Tim Cook highlighted the company's focus on privacy and environmental responsibility, noting that the new phone uses recycled materials in its construction."
summary3= "Apple released the iPhone 14 with new features, including a Lightning port and improved battery life. Prices start at $999, and pre-orders begin immediately. Tim Cook emphasized the importance of faster processing speed and wireless charging."

data = {
    'text': [source1, source2, source3],
    'summary': [summary1, summary2, summary3]
}

test_dataset = Dataset.from_dict(data)

# View the dataset
print(test_dataset)
print(test_dataset['text'])
print(len(test_dataset['text']))

In [None]:
import torch

print("PyTorch version:", torch.__version__)

In [None]:
pip show transformers


In [None]:
#test_dataset = machintruc
#testmetric = entity_based_metric(er_model_path= "Babelscape/rebel-large",
#                                 er_tokenizer_path= "Babelscape/rebel-large",
#                                 metric_name="custom",
#                                 custom_metric=True)
testmetric = factacc()
#testmetric.create_pipeline()
#results = testmetric.evaluate_dataset(test_dataset)

testmetric.create_pipeline(device="cuda:0")
results = testmetric.evaluate_dataset(test_dataset,
                                      source_col="text",
                                      gen_col="summary",
                                      save_result_dataset_folder_path="work/datasets/MIRAGE/test_dataset",
                                      max_tokens_er=1000,
                                      batched=True,
                                      batch_size=100,
                                      keep_entities=True)

results

In [None]:
display(results['entities_src'][0])

In [None]:
display(results.to_pandas())

In [None]:
def filter_matching_entities(entities1, entities2):
    """Filters entities from `entities1` that have matching subject and relation in `entities2`."""
    filtered_entities = [
        entity1 for entity1 in entities1
        if any(entity1["subject"] == entity2["subject"] and entity1["relation"] == entity2["relation"]
               for entity2 in entities2)
    ]
    return filtered_entities

def calculate_factual_accuracy(src_entities, gen_entities):
    # Step 1: Filter matching entities based on subject and relation
    src_entities_prime = filter_matching_entities(src_entities, gen_entities)
    gen_entities_prime = filter_matching_entities(gen_entities, src_entities)

    # Step 2: Calculate the intersection based on matching subject, relation, and object
    intersection = [entity for entity in src_entities_prime
                    if any(entity["subject"] == gen_entity["subject"] and
                           entity["relation"] == gen_entity["relation"] and
                           entity["object"] == gen_entity["object"]
                           for gen_entity in gen_entities_prime)]

    # Step 3: Calculate factual accuracy as precision
    fact_accuracy = len(intersection) / len(gen_entities_prime) if gen_entities_prime else 0
    return fact_accuracy

# Example usage
src_entities = [
    {"subject": "subject1", "relation": "relation1", "object": "object1"},
    {"subject": "subject2", "relation": "relation2", "object": "object2"}
]
gen_entities = [
    {"subject": "subject1", "relation": "relation1", "object": "object1"},
    {"subject": "subject2", "relation": "relation2", "object": "object3"}
]

fact_accuracy = calculate_factual_accuracy(src_entities, gen_entities)
print("Factual Accuracy:", fact_accuracy)


## FactAcc

In [None]:
from datasets import Dataset

source1= "Leeds showed they are in good shape to cope with Kevin Sinfield’s retirement as they claimed a 26 - 12 derby victory over Castleford in front of a sell-out crowd at the Mend-a-Hose Jungle. [...] Ryan Hall was sent to the sin-bin for the first time in his career […] Joel Moon scored his first try of the season […] Leeds extended their unbeaten run against the Tigers to six matches"
summary1= "Kevin Sinfield scored his first try of the season against Castleford. Leeds Rhino scored unbeaten run against Tigers to six matches. Ryan Hall was sent to Leeds Rhino for first time in his career ."

source2= "Amazon has announced plans to open a new headquarters in Arlington, Virginia, creating over 25,000 new jobs in the area. The company has invested heavily in the region, including a $5 billion project to develop the surrounding infrastructure. Despite concerns from local residents about increased traffic and housing costs, Amazon expects the new campus to bring significant economic benefits to the region. CEO Jeff Bezos emphasized the company's commitment to sustainability, noting that the new headquarters would run entirely on renewable energy."
summary2= "Jeff Bezos announced plans to create 5,000 new jobs in Arlington, Virginia, by opening a new Amazon headquarters. The development, which will cost $25 billion, is expected to increase traffic and housing costs but will boost the economy. Amazon’s campus will use non-renewable energy."

source3= "Apple unveiled its latest iPhone model during a special event in Cupertino. The new iPhone 15 comes with a faster processor, improved camera capabilities, and a more durable design. The device also introduces USB-C charging, marking a shift from Apple's long-standing use of the Lightning port. Prices for the iPhone 15 start at $799, with pre-orders available from next week. CEO Tim Cook highlighted the company's focus on privacy and environmental responsibility, noting that the new phone uses recycled materials in its construction."
summary3= "Apple released the iPhone 14 with new features, including a Lightning port and improved battery life. Prices start at $999, and pre-orders begin immediately. Tim Cook emphasized the importance of faster processing speed and wireless charging."

data = {
    'text': [source1, source2, source3],
    'summary': [summary1, summary2, summary3]
}

test_dataset = Dataset.from_dict(data)

# View the dataset
print(test_dataset)
print(test_dataset['text'])
print(len(test_dataset['text']))

In [None]:
import gc
if "testmetric" in locals():
    del testmetric
collected = gc.collect()

## test relation classification

In [None]:
from transformers import pipeline

triplet_extractor = pipeline('text2text-generation', model='Babelscape/rebel-large', tokenizer='Babelscape/rebel-large')

In [None]:
# We need to use the tokenizer manually since we need special tokens.
test_text = "Punta Cana is a resort town in the municipality of Higuey, in La Altagracia Province, the eastern most province of the Dominican Republic"
test_text = source1
result = triplet_extractor(test_dataset["text"], return_tensors=True, return_text=False)

In [None]:
result = [ line["generated_token_ids"] for line in result]

In [None]:
result

In [None]:
extracted_text = triplet_extractor.tokenizer.batch_decode(result)
display(extracted_text)

In [None]:
# Function to parse the generated text and extract the triplets
def extract_triplets(text):
    triplets = []
    relation, subject, relation, object_ = '', '', '', ''
    text = text.strip()
    current = 'x'
    for token in text.replace("<s>", "").replace("<pad>", "").replace("</s>", "").split():
        if token == "<triplet>":
            current = 't'
            if relation != '':
                triplets.append({'subject': subject.strip(), 'relation': relation.strip(),'object': object_.strip()})
                relation = ''
            subject = ''
        elif token == "<subj>":
            current = 's'
            if relation != '':
                triplets.append({'subject': subject.strip(), 'relation': relation.strip(),'object': object_.strip()})
            object_ = ''
        elif token == "<obj>":
            current = 'o'
            relation = ''
        else:
            if current == 't':
                subject += ' ' + token
            elif current == 's':
                object_ += ' ' + token
            elif current == 'o':
                relation += ' ' + token
    if subject != '' and relation != '' and object_ != '':
        triplets.append({'subject': subject.strip(), 'relation': relation.strip(),'object': object_.strip()})
    return triplets
extracted_triplets = [ extract_triplets(line) for line in extracted_text]
display(extracted_triplets)


## [OLD]test relation classification

In [None]:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline(model="studio-ousia/luke-large-finetuned-conll-2003")

In [None]:
# Load model directly
from transformers import LukeTokenizer, LukeForEntitySpanClassification

# Load the model checkpoint
model = LukeForEntitySpanClassification.from_pretrained("studio-ousia/luke-large-finetuned-conll-2003")
model.eval()
model.to("cpu")

# Load the tokenizer
tokenizer = LukeTokenizer.from_pretrained("studio-ousia/luke-large-finetuned-conll-2003")

In [None]:
tokenizer(source1)

In [None]:
text = "Star Wars is a film written and directed by George Lucas"
nlp = spacy.load("en_core_web_sm")
doc = nlp(text)

entity_spans = []
original_word_spans = []
for token_start in doc:
    for token_end in doc[token_start.i:]:
        entity_spans.append((token_start.idx, token_end.idx + len(token_end)))
        original_word_spans.append((token_start.i, token_end.i + 1))

inputs = tokenizer(text, entity_spans=entity_spans, return_tensors="pt", padding=True)
inputs = inputs.to("cpu")
with torch.no_grad():
    outputs = model(**inputs)

logits = outputs.logits
max_logits, max_indices = logits[0].max(dim=1)

predictions = []
for logit, index, span in zip(max_logits, max_indices, original_word_spans):
    if index != 0:  # the span is not NIL
        predictions.append((logit, span, model.config.id2label[int(index)]))

# construct an IOB2 label sequence
predicted_sequence = ["O"] * len(doc)
for _, span, label in sorted(predictions, key=lambda o: o[0], reverse=True):
    if all([o == "O" for o in predicted_sequence[span[0] : span[1]]]):
        predicted_sequence[span[0]] = "B-" + label
        if span[1] - span[0] > 1:
            predicted_sequence[span[0] + 1 : span[1]] = ["I-" + label] * (span[1] - span[0] - 1)

for token, label in zip(doc, predicted_sequence):
    print(token, label)

In [None]:
source21= "Amazon has announced plans to open a new headquarters in Arlington, Virginia, creating over 25,000 new jobs in the area. The company has invested heavily in the region, including a $5 billion project to develop the surrounding infrastructure. Despite concerns from local residents about increased traffic and housing costs, Amazon expects the new campus to bring significant economic benefits to the region. CEO Jeff Bezos emphasized the company's commitment to sustainability, noting that the new headquarters would run entirely on renewable energy.Beyoncé lives in Los Angeles."

In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import pipeline

tokenizer = AutoTokenizer.from_pretrained("Babelscape/wikineural-multilingual-ner")
model = AutoModelForTokenClassification.from_pretrained("Babelscape/wikineural-multilingual-ner")

In [None]:
nlp = pipeline("ner", model=model, tokenizer=tokenizer, grouped_entities=True,)
example = "My name is Wolfgang and I live in Berlin"

ner_results = nlp(source21)
print(ner_results)

In [None]:
[ entity['word'] for entity in ner_results]

In [None]:
from itertools import combinations

# Create a new dictionary with only the specified fields
filtered_entity = [{key: entity[key] for key in entity.keys() if key in ['word','start','end']} for entity in ner_results]
filtered_entity
entity_pairs = [[entity1, entity2] for entity1, entity2 in combinations(filtered_entity, 2) if entity1['word'] != entity2['word']]

In [None]:
def transform_entity(entity):
    return {'word': entity['word'], 'loc': (entity['start'], entity['end'])}

entity_pairs = [[transform_entity(entity) for entity in pair] for pair in entity_pairs]

In [None]:
display(entity_pairs)
len(entity_pairs)

In [None]:
from transformers import LukeTokenizer, LukeForEntityPairClassification
model_pair_class = LukeForEntityPairClassification.from_pretrained("studio-ousia/luke-large-finetuned-tacred")
tokenizer_pair_class = LukeTokenizer.from_pretrained("studio-ousia/luke-large-finetuned-tacred")

In [None]:
text = "Beyoncé lives in Los Angeles."
entity_spans = [(0, 7), (17, 28)]  # character-based entity spans corresponding to "Beyoncé" and "Los Angeles"
inputs = tokenizer_pair_class(text, entity_spans=entity_spans, return_tensors="pt")
outputs = model_pair_class(**inputs)
logits = outputs.logits
predicted_class_idx = int(logits[0].argmax())
print("Predicted class:", model_pair_class.config.id2label[predicted_class_idx])
# Predicted class: per:cities_of_residence

In [None]:
def process_class(identity_pair, text):

    inputs = tokenizer_pair_class(text, entity_spans=entity_spans, return_tensors="pt", truncation=True)
    outputs = model_pair_class(**inputs)
    logits = outputs.logits
    predicted_class_idx = logits.argmax(-1).item()

    print("Original text:", text)
    decoded_text = tokenizer_pair_class.decode(inputs['input_ids'][0], skip_special_tokens=True)
    print("Decoded text:", decoded_text)


    return {'object1':identity_pair[0]['word'],
            'object2':identity_pair[1]['word'],
            'relation':model_pair_class.config.id2label[predicted_class_idx]}

In [None]:
test_identity_pair = [{'word': 'Amazon', 'loc': (0, 6)}, {'word': 'Arlington', 'loc': (57, 66)}]
test_text = source2
source2len = len(test_text)

test_text = test_text + "Beyoncé lives in Los Angeles. "
print(test_text)

test_identity_pair = [{'word': 'Beyoncé', 'loc': (source2len, source2len+7)},
                      {'word': 'Los Angeles', 'loc': (source2len+17, source2len+28)}]

print(test_text[test_identity_pair[0]['loc'][0]:test_identity_pair[0]['loc'][1]])
print(test_text[test_identity_pair[1]['loc'][0]:test_identity_pair[1]['loc'][1]])

process_class(test_identity_pair, text=test_text)

In [None]:
test_text[567:578]

In [None]:
display([ process_class(identity_pair, source2) for identity_pair in entity_pairs])

In [None]:
source21

#Test FEQA

In [None]:
!pip install spacy
!python -m spacy download en_core_web_sm

In [None]:
text = "Barack Obama was born in Hawaii. He was the 44th president of the United States."
text2 = "The home was built for inspection."

In [None]:
import spacy

# Load the pre-trained model for English
nlp = spacy.load("en_core_web_sm")

def generate_highlighted_sentences(text):
    # Process the text using spaCy
    doc = nlp(text)

    highlighted_sentences = []

    # Collect all named entities and noun chunks
    mask_targets = list(doc.ents) + list(doc.noun_chunks)

    # Generate a new sentence with each target enclosed in <hl> tags
    for target in mask_targets:
        # Enclose the current target with <hl> tags
        highlighted_text = text.replace(target.text, f"<hl>{target.text}</hl>")
        highlighted_sentences.append(highlighted_text)

    return highlighted_sentences

# Get separate sentences for each entity/noun phrase highlighted
highlighted_sentences = generate_highlighted_sentences(text)

# Print the results
for sentence in highlighted_sentences:
    print(sentence)


#test lib QAQG

In [None]:
from datasets import Dataset

source1= "Leeds showed they are in good shape to cope with Kevin Sinfield’s retirement as they claimed a 26 - 12 derby victory over Castleford in front of a sell-out crowd at the Mend-a-Hose Jungle. [...] Ryan Hall was sent to the sin-bin for the first time in his career […] Joel Moon scored his first try of the season […] Leeds extended their unbeaten run against the Tigers to six matches"
summary1= "Kevin Sinfield scored his first try of the season against Castleford. Leeds Rhino scored unbeaten run against Tigers to six matches. Ryan Hall was sent to Leeds Rhino for first time in his career ."

source2= "Amazon has announced plans to open a new headquarters in Arlington, Virginia, creating over 25,000 new jobs in the area. The company has invested heavily in the region, including a $5 billion project to develop the surrounding infrastructure. Despite concerns from local residents about increased traffic and housing costs, Amazon expects the new campus to bring significant economic benefits to the region. CEO Jeff Bezos emphasized the company's commitment to sustainability, noting that the new headquarters would run entirely on renewable energy."
summary2= "Jeff Bezos announced plans to create 5,000 new jobs in Arlington, Virginia, by opening a new Amazon headquarters. The development, which will cost $25 billion, is expected to increase traffic and housing costs but will boost the economy. Amazon’s campus will use non-renewable energy."

source3= "Apple unveiled its latest iPhone model during a special event in Cupertino. The new iPhone 15 comes with a faster processor, improved camera capabilities, and a more durable design. The device also introduces USB-C charging, marking a shift from Apple's long-standing use of the Lightning port. Prices for the iPhone 15 start at $799, with pre-orders available from next week. CEO Tim Cook highlighted the company's focus on privacy and environmental responsibility, noting that the new phone uses recycled materials in its construction."
summary3= "Apple released the iPhone 14 with new features, including a Lightning port and improved battery life. Prices start at $999, and pre-orders begin immediately. Tim Cook emphasized the importance of faster processing speed and wireless charging."

data = {
    'text': [source1, source2, source3],
    'summary': [summary1, summary2, summary3]
}

test_dataset = Dataset.from_dict(data)

# View the dataset
print(test_dataset)
print(test_dataset['text'])
print(len(test_dataset['text']))

In [None]:
import gc
if "testmetric" in locals():
    del testmetric
collected = gc.collect()

In [None]:
#test_dataset = machintruc
testmetric = qgqa_based_metric(qg_model_path= "valhalla/t5-small-e2e-qg",
                             qa_model_path= "valhalla/t5-small-qa-qg-hl",
                             qg_tokenizer_path= "valhalla/t5-small-e2e-qg",
                             qa_tokenizer_path= "valhalla/t5-small-qa-qg-hl",
                             metric_name="custom",
                             custom_metric=True)
testmetric = feqa()
#testmetric.create_pipeline()
#results = testmetric.evaluate_dataset(test_dataset)

testmetric.create_pipeline(device="cpu")
results = testmetric.evaluate_dataset(test_dataset,
                                      source_col="text",
                                      gen_col="summary",
                                      save_result_dataset_folder_path="/content/drive/MyDrive/datasets/results/MIRAGE_Test/test_dataset",
                                      max_tokens_qg=1000,
                                      batched=True,
                                      batch_size=100,
                                      keep_answers=True,
                                      keep_questions=True)

results

In [None]:
from datasets import Dataset

source1= "Leeds showed they are in good shape to cope with Kevin Sinfield’s retirement as they claimed a 26 - 12 derby victory over Castleford in front of a sell-out crowd at the Mend-a-Hose Jungle. [...] Ryan Hall was sent to the sin-bin for the first time in his career […] Joel Moon scored his first try of the season […] Leeds extended their unbeaten run against the Tigers to six matches"
summary1= "Kevin Sinfield scored his first try of the season against Castleford. Leeds Rhino scored unbeaten run against Tigers to six matches. Ryan Hall was sent to Leeds Rhino for first time in his career ."

source2= "Amazon has announced plans to open a new headquarters in Arlington, Virginia, creating over 25,000 new jobs in the area. The company has invested heavily in the region, including a $5 billion project to develop the surrounding infrastructure. Despite concerns from local residents about increased traffic and housing costs, Amazon expects the new campus to bring significant economic benefits to the region. CEO Jeff Bezos emphasized the company's commitment to sustainability, noting that the new headquarters would run entirely on renewable energy."
summary2= "Jeff Bezos announced plans to create 5,000 new jobs in Arlington, Virginia, by opening a new Amazon headquarters. The development, which will cost $25 billion, is expected to increase traffic and housing costs but will boost the economy. Amazon’s campus will use non-renewable energy."

source3= "Apple unveiled its latest iPhone model during a special event in Cupertino. The new iPhone 15 comes with a faster processor, improved camera capabilities, and a more durable design. The device also introduces USB-C charging, marking a shift from Apple's long-standing use of the Lightning port. Prices for the iPhone 15 start at $799, with pre-orders available from next week. CEO Tim Cook highlighted the company's focus on privacy and environmental responsibility, noting that the new phone uses recycled materials in its construction."
summary3= "Apple released the iPhone 14 with new features, including a Lightning port and improved battery life. Prices start at $999, and pre-orders begin immediately. Tim Cook emphasized the importance of faster processing speed and wireless charging."

data = {
    'text': [source1, source2, source3],
    'summary': [summary1, summary2, summary3]
}

test_dataset = Dataset.from_dict(data)

# View the dataset
print(test_dataset)
print(test_dataset['text'])
print(len(test_dataset['text']))

In [None]:
import gc
if "testmetric" in locals():
    del testmetric
collected = gc.collect()

In [None]:
#test_dataset = machintruc
testmetric = qgqa_based_metric(qg_model_path= "valhalla/t5-small-e2e-qg",
                             qa_model_path= "valhalla/t5-small-qa-qg-hl",
                             qg_tokenizer_path= "valhalla/t5-small-e2e-qg",
                             qa_tokenizer_path= "valhalla/t5-small-qa-qg-hl",
                             metric_name="custom",
                             custom_metric=True)
testmetric = feqa()
#testmetric.create_pipeline()
#results = testmetric.evaluate_dataset(test_dataset)

testmetric.create_pipeline(device="cpu")
results = testmetric.evaluate_dataset(test_dataset,
                                      source_col="text",
                                      gen_col="summary",
                                      save_result_dataset_folder_path="/content/drive/MyDrive/datasets/results/MIRAGE_Test/test_dataset",
                                      max_tokens_qg=1000,
                                      batched=True,
                                      batch_size=100,
                                      keep_answers=True,
                                      keep_questions=True)

results

In [None]:
results.to_pandas()

In [None]:
results.to_list()[0]

|   |                     text 	                      |                   summary 	                    |       scores          |
|---|-------------------------------------------------|-------------------------------------------------|-----------------------|
| 0 |Leeds showed they are in good shape to cope wi...|Kevin Sinfield scored his first try of the sea...| 0.5                	|
| 1 |Amazon has announced plans to open a new headq...|Jeff Bezos announced plans to create 5,000 new...| 1 	                |
| 2 |Apple unveiled its latest iPhone model during ...|Apple released the iPhone 14 with new features...| 1               	    |

In [None]:
qg_model = testmetric.create_qg_pipeline()

In [None]:
qg_model.__call__

In [None]:
test = [{'generated_text': 'Which team extended their unbeaten run against the Tigers to six matches?'},
{'generated_text': 'Whose retirement did Leeds show they are in good shape to cope with?'},
{'generated_text': 'How many wins did Leeds have against Castleford?'},
{'generated_text': 'Who did Leeds beat in the derby?'},
{'generated_text': 'What venue hosted the derby?'},
{'generated_text': 'Who was sent to the sin-bin for the first time in his career?'},
{'generated_text': 'For what time of his career did Joel Moon score a try?'},
{'generated_text': 'Who scored his first try of the season?'},
{'generated_text': 'For what time of his career did Joel Moon score a try?'},
{'generated_text': 'Who did Leeds extend their unbeaten run against?'},
{'generated_text': 'How many matches did Leeds extend their unbeaten run against the Tigers to?'},
{'generated_text': 'Which team extended their unbeaten run against the Tigers to six matches?'},
{'generated_text': 'What did Leeds show in the derby victory over Castleford?'},
{'generated_text': 'What did Leeds show in the defeat of Castleford?'},
{'generated_text': 'What did Leeds show they are in good shape to cope with?'},
{'generated_text': 'What did Leeds show in the derby victory over Castleford?'},
{'generated_text': 'What did Leeds win against Castleford?'},
{'generated_text': 'Who did Leeds beat in the derby?'},
{'generated_text': 'What was the pitch of the derby?'},
{'generated_text': 'What was the crowd at the Mend-a-Hose Jungle?'},
{'generated_text': 'Where was the derby played?'},
{'generated_text': 'Who was sent to the sin-bin for the first time in his career?'},
{'generated_text': 'Where was Ryan Hall sent for the first time in his career?'},
{'generated_text': 'For what reason was Ryan Hall sent to the sin-bin?'},
{'generated_text': "What was Ryan Hall's first time in the sin-bin?"},
{'generated_text': 'Who scored his first try of the season?'},
{'generated_text': 'What did Joel Moon score?'},
{'generated_text': 'What season did Joel Moon score his first try for Leeds?'},
{'generated_text': 'Which team extended their unbeaten run against the Tigers to six matches?'},
{'generated_text': 'What did Leeds extend to six matches against the Tigers?'},
{'generated_text': 'Who did Leeds extend their unbeaten run against?'},
{'generated_text': 'How many matches did Leeds extend their unbeaten run against the Tigers to?'}]
a = [ output["generated_text"].split("<sep>")[0] for output in test]
a

## test QAQG model

In [None]:
from datasets import Dataset

source1= "Leeds showed they are in good shape to cope with Kevin Sinfield’s retirement as they claimed a 26 - 12 derby victory over Castleford in front of a sell-out crowd at the Mend-a-Hose Jungle. [...] Ryan Hall was sent to the sin-bin for the first time in his career […] Joel Moon scored his first try of the season […] Leeds extended their unbeaten run against the Tigers to six matches"
summary1= "Kevin Sinfield scored his first try of the season against Castleford. Leeds Rhino scored unbeaten run against Tigers to six matches. Ryan Hall was sent to Leeds Rhino for first time in his career ."

source2= "Amazon has announced plans to open a new headquarters in Arlington, Virginia, creating over 25,000 new jobs in the area. The company has invested heavily in the region, including a $5 billion project to develop the surrounding infrastructure. Despite concerns from local residents about increased traffic and housing costs, Amazon expects the new campus to bring significant economic benefits to the region. CEO Jeff Bezos emphasized the company's commitment to sustainability, noting that the new headquarters would run entirely on renewable energy."
summary2= "Jeff Bezos announced plans to create 5,000 new jobs in Arlington, Virginia, by opening a new Amazon headquarters. The development, which will cost $25 billion, is expected to increase traffic and housing costs but will boost the economy. Amazon’s campus will use non-renewable energy."

source3= "Apple unveiled its latest iPhone model during a special event in Cupertino. The new iPhone 15 comes with a faster processor, improved camera capabilities, and a more durable design. The device also introduces USB-C charging, marking a shift from Apple's long-standing use of the Lightning port. Prices for the iPhone 15 start at $799, with pre-orders available from next week. CEO Tim Cook highlighted the company's focus on privacy and environmental responsibility, noting that the new phone uses recycled materials in its construction."
summary3= "Apple released the iPhone 14 with new features, including a Lightning port and improved battery life. Prices start at $999, and pre-orders begin immediately. Tim Cook emphasized the importance of faster processing speed and wireless charging."

data = {
    'text': [source1, source2, source3],
    'summary': [summary1, summary2, summary3]
}

In [None]:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text2text-generation", model="valhalla/t5-small-e2e-qg", max_new_tokens=1000)

In [None]:
result = pipe('generate question: [MASK] showed they are in good shape to cope with Kevin Sinfield’s retirement as they claimed a 26 - 12 derby victory over Castleford in front of a sell-out crowd at the Mend-a-Hose Jungle. [...] Ryan Hall was sent to the sin-bin for the first time in his career […] Joel Moon scored his first try of the season […] [MASK] extended their unbeaten run against the Tigers to six matches')

In [None]:
[text["generated_text"].split("<sep>") for text in result]

In [None]:
src_qg = pipe("generate questions: " + source)[0]["generated_text"].split("<sep>")
if src_qg[-1] == "":
    src_qg.pop()
gen_qg = pipe("generate questions: " + summary)[0]["generated_text"].split("<sep>")
if gen_qg[-1] == "":
    gen_qg.pop()

print(src_qg)
print(gen_qg)

In [None]:
src_qg

In [None]:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe_qa = pipeline("text2text-generation", model="valhalla/t5-small-qa-qg-hl", max_new_tokens=1000)

In [None]:
pipe_qa("question: What was the name of the man who retired from Leeds?  context: " + summary)[0]["generated_text"]

In [None]:
answers = [{"source": pipe_qa(f"question: {question}  context: {source}")[0]["generated_text"],
            "gen": pipe_qa(f"question: {question}  context: {summary}")[0]["generated_text"]}
           for question in src_qg]

In [None]:
answers

In [None]:
def compute_token_f1(answers, model_name="valhalla/t5-small-qa-qg-hl"):
    # Load tokenizer for the model
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    f1_results = []

    for answer in answers:
        src_answer = answer['source']
        gen_answer = answer['gen']

        # Tokenize the answers
        tokens_src = set(tokenizer.tokenize(src_answer))
        tokens_gen = set(tokenizer.tokenize(gen_answer))

        # Calculate precision, recall, and F1 score
        common_tokens = tokens_src & tokens_gen
        precision = len(common_tokens) / len(tokens_src) if tokens_src else 0
        recall = len(common_tokens) / len(tokens_gen) if tokens_gen else 0
        f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        f1_results.append(f1_score)

    return f1_results

In [None]:
answers_scored = compute_token_f1(answers)

In [None]:
answers_scored

## utilisation exemple transformermodel

In [None]:
from datasets import Dataset

text1='''The US has "passed the peak" on new coronavirus cases, the White House reported. They predict that some states would reopen this month.
The US has over 637,000 confirmed Covid-19 cases and over 30,826 deaths, the highest for any country in the world.'''
summary1 = '''The pandemic has almost not affected the US'''
summary2 = '''The pandemic has not affected the US'''
summary3 = '''The pandemic has affected the US a lot'''
summary4 = '''The pandemic has not affected the US at all, no death at all'''


data = {
    'source': [text1, text1, text1, text1],
    'text': [summary1, summary2, summary3, summary4],
    'label': ["HALL", "NOHALL", "NOHALL", "HALL"]  # Labels corresponding to the text
}

data2 = {
    'text': ["This is a sample text", "Here's another one", "And yet another sentence"],
    'label': [0, 1, 1]  # Labels corresponding to the text
}

test_dataset = Dataset.from_dict(data)
test_dataset2 = Dataset.from_dict(data2)

# View the dataset
print(test_dataset)
print(test_dataset['text'])

In [None]:
import gc
if "testmetric" in locals():
    del testmetric
collected = gc.collect()

#test_dataset = machintruc
testmetric = factcc()
#testmetric.create_pipeline()
#results = testmetric.evaluate_dataset(test_dataset)

testmetric.create_pipeline(device="cpu")
results = testmetric.evaluate_dataset(test_dataset, source_col="source", gen_col="text", save_result_dataset_folder_path="/content/drive/MyDrive/datasets/results/MIRAGE_Test/test_dataset", truncation="longest_first", padding='max_length', top_k=None, batched=False, batch_size=100  )

results['label']

In [None]:
display(results.to_pandas())

In [None]:
results[3]

In [None]:
pipe=pipeline(model="manueldeprada/FactCC")
input = [[[text1,summary1]],[[text1,summary3]]]
print(input)
pipe(input,truncation='only_first',padding='max_length')

In [None]:
results["text"]

In [None]:
#testmetric.save_results(folder_path="/content/drive/MyDrive/datasets/results/MIRAGE_Test/test_dataset", format="hf")
testmetric.save_results(folder_path="/content/drive/MyDrive/datasets/results/MIRAGE_Test/test_dataset", format="json")
testmetric.save_results(folder_path="/content/drive/MyDrive/datasets/results/MIRAGE_Test/test_dataset", format="csv")

In [None]:
display(
    a.labels,
    a.class_scores,
    a.predicted_labels)

In [None]:
results["label"]

In [None]:

metrics = ['f1', 'precision', 'recall', 'accuracy', 'balanced_accuracy', 'mcc', 'kappa', 'log_loss']
manager = ScoreManager(results, metrics)

print(manager.results)  # Outputs the calculated metrics

In [None]:
manager.plot(['metrics_bar', 'roc_curve', 'precision_recall_curve', 'confusion_matrix'], metrics_bar=['f1', 'precision', 'recall', 'accuracy'])

In [None]:
manager.plot(['metrics_bar', 'roc_curve', 'precision_recall_curve', 'confusion_matrix'], metrics_bar=['f1', 'precision', 'recall', 'accuracy'], save_plots=True, output_path='/content/drive/MyDrive/datasets/results/MIRAGE_Test')

In [None]:
from sklearn.metrics import (
    f1_score, precision_score, recall_score, roc_auc_score, precision_recall_curve, roc_curve
)
import numpy as np

# True labels
true_labels = a.labels

# Predicted labels
predicted_labels = a.predicted_labels

# Class scores: Extract the probabilities for the positive class ('NOHALL')
positive_class_scores = [score[0]['NOHALL'] for score in a.class_scores]

# Convert labels to binary (assuming 'NOHALL' is the positive class, and 'HALL' is the negative class)
true_labels_binary = [1 if label == 'NOHALL' else 0 for label in true_labels]
predicted_labels_binary = [1 if label == 'NOHALL' else 0 for label in predicted_labels]

# F1 score
f1 = f1_score(true_labels_binary, predicted_labels_binary)

# Precision and Recall
precision = precision_score(true_labels_binary, predicted_labels_binary)
recall = recall_score(true_labels_binary, predicted_labels_binary)

# ROC AUC score
roc_auc = roc_auc_score(true_labels_binary, positive_class_scores)

# Precision-Recall curve
precision_curve, recall_curve, pr_thresholds = precision_recall_curve(true_labels_binary, positive_class_scores)

# ROC curve
fpr, tpr, roc_thresholds = roc_curve(true_labels_binary, positive_class_scores)

# Display the results
print(f"F1 Score: {f1}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"ROC AUC: {roc_auc}")

# You can also plot Precision-Recall and ROC curves if needed
import matplotlib.pyplot as plt

# Plot Precision-Recall curve
plt.figure(figsize=(8, 6))
plt.plot(recall_curve, precision_curve, label="Precision-Recall curve")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve")
plt.legend()
plt.show()

# Plot ROC curve
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label="ROC curve")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()
plt.show()


In [None]:
display(results.to_dict())

In [None]:
results["predictions"]

## Utilisation with benchmark data

In [None]:
from datasets import load_dataset

dataset = load_dataset('json', data_files="/content/drive/MyDrive/datasets/results/test/test_dataset.jsonl")

In [None]:
dataset

In [None]:
testmetric = factcc()
testmetric.create_pipeline(device="cuda:0")
results = testmetric.evaluate_dataset(dataset, input_col="input", save_result_dataset_folder_path="/content/drive/MyDrive/datasets/results/MIRAGE_Test/test_dataset", truncation="longest_first", padding='max_length', top_k=None, batched=True, batch_size=100  )
results

In [None]:
results['train']

In [None]:
a=results['train']

def get_predicted_label(example):
    """Get the label with the highest score from predictions."""
    prediction_dict = example["predictions"]
    return max(prediction_dict, key=prediction_dict.get)

b = [get_predicted_label(example) for example in a]

In [None]:
def transform_labels(dataset):
    # Define the mapping
    label_mapping = {1: "NOHALL", 0: "HALL"}

    # Update the "label" column
    dataset = dataset.map(lambda x: {'label': label_mapping.get(x['label'], x['label'])})

    return dataset

clean_results = transform_labels(results['train'])

In [None]:
metrics = ['f1', 'precision', 'recall', 'accuracy', 'balanced_accuracy', 'mcc', 'kappa', 'log_loss', 'roc_values', 'auc', 'confusion_matrix', 'precision_recall_values']
manager = ScoreManager(clean_results, metrics, on_split="train")


print(manager.results)  # Outputs the calculated metrics

In [None]:
manager.plot(['metrics_bar', 'roc_curve', 'precision_recall_curve', 'confusion_matrix'], metrics_bar=['f1', 'precision', 'recall', 'accuracy'])

# Test

In [None]:
!python --version

In [None]:
!wget -O mini.sh https://repo.anaconda.com/miniconda/Miniconda3-py38_4.8.2-Linux-x86_64.sh
!chmod +x mini.sh
!bash ./mini.sh -b -f -p /usr/local
!conda install -q -y jupyter
!conda install -q -y google-colab -c conda-forge
!python -m ipykernel install --name "py38" --user

In [None]:
!python --version

In [None]:
!pip install torch===1.4.0 torchvision===0.5.0

In [None]:
def evaluate_dataset(dataset, map_kwargs=None):
    # Initialize map_kwargs with defaults if not provided
    if map_kwargs is None:
        map_kwargs = {}

    # Ensure some default values
    map_kwargs.setdefault("batched", False)
    map_kwargs.setdefault("num_proc", None)

    # A simple processing function
    def process(batch):
        return {"length": len(batch["text"])}

    # Call dataset.map with unpacked map_kwargs
    return dataset.map(process, **map_kwargs)


In [None]:
# Test dataset
data = {"text": ["This is a test.", "Another sentence."]}
dataset = Dataset.from_dict(data)

# Map arguments
map_kwargs = {"batched": True}

# Call the function
result = evaluate_dataset(dataset, map_kwargs)
print(result)


In [None]:
def inner_function(x, y, **kwargs):
    # Use the kwargs passed
    print(f"Inner x: {x}, y: {y}")
    print(f"Inner kwargs: {kwargs}")

def outer_function(a, b, inner_kwargs=None):
    # Define an inner function

    # Ensure inner_kwargs is a dictionary
    inner_kwargs = inner_kwargs or {}
    
    # Call the inner function, passing the unpacked inner_kwargs
    inner_function(a + 1, b + 1, **inner_kwargs)

# Call the outer function with optional inner_kwargs
outer_function(2, 3, inner_kwargs={"option1": "test", "option2": 42})


In [None]:
from datasets import load_dataset
cnndm_articles = load_dataset('cnn_dailymail', '3.0.0')