## Libraries

In [1]:
# %pip uninstall torch torchvision torchaudio -y
# %pip install torch==2.2.2+cu121 torchvision==0.17.2+cu121 torchaudio==2.2.2+cu121 -f https://download.pytorch.org/whl/torch_stable.html


In [2]:
from datasets import load_dataset
from datasets import Dataset
from transformers import AutoTokenizer
from transformers import DataCollatorForSeq2Seq
from transformers import pipeline
from transformers import AutoModelForSeq2SeqLM, Seq2SeqTrainingArguments, Seq2SeqTrainer
from transformers import AutoTokenizer
from transformers import AutoModelForSeq2SeqLM
import numpy as np
import pandas as pd
import evaluate

  from .autonotebook import tqdm as notebook_tqdm


# Reading Dataset

`The method loadDataset receieves the path where the datasets json files of the HaluEval repository are. You just need to pass your path and the name of the dataset you are going to use.`

## Dataset Names:
- summarization
- dialogue
- qa
- general


In [3]:
## As a recomendation keep these two with the same naming if you do not want to change many things
datasetName = "qa"
task = datasetName


In [4]:
def loadDataset(path="HaluEval/data", datasetName="qa"):
    data = pd.read_json(
        (path + "/" + datasetName + "_data.json"), lines=True
    )
    return data

## For this particular example we are loading the qa_data.json since is the one that takes the less time to process in case you want to test quickly how it works.

In [5]:
data = loadDataset(datasetName=datasetName)

In [6]:
data.head()

Unnamed: 0,knowledge,question,right_answer,hallucinated_answer
0,Arthur's Magazine (1844–1846) was an American ...,Which magazine was started first Arthur's Maga...,Arthur's Magazine,First for Women was started first.
1,The Oberoi family is an Indian family that is ...,The Oberoi family is part of a hotel company t...,Delhi,The Oberoi family's hotel company is based in ...
2,"Allison Beth ""Allie"" Goertz (born March 2, 199...",Musician and satirist Allie Goertz wrote a son...,President Richard Nixon,"Allie Goertz wrote a song about Milhouse, a po..."
3,"Margaret ""Peggy"" Seeger (born June 17, 1935) i...",What nationality was James Henry Miller's wife?,American,James Henry Miller's wife was British.
4,It is a hygroscopic solid that is highly solu...,Cadmium Chloride is slightly soluble in this c...,alcohol,water with a hint of alcohol


In [7]:
len(data)

10000

# Setting Device to use the GPU

We use the T4 GPU in Colab since the heaviest computation for us is the inference of the LLM-Evaluator. Therefore, T4 seem as the better fit.

In [8]:
import torch
print(torch.__version__)  # E.g., 2.2.1
print(torch.cuda.is_available())  # Should be True with CUDA support
print(torch.version.cuda)  # Should say 12.1 if matched

2.2.2+cu121
True
12.1


In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

## Generic LLMModel class to reuse the functionality of extracting the features.


In [10]:
import torch
from tqdm import tqdm
import spacy
from transformers import BartForConditionalGeneration, BartTokenizer

# Cargar el modelo de spaCy (español en este caso; cambia a 'en_core_web_sm' para inglés si lo prefieres)
nlp = spacy.load("en_core_web_sm")

class LLMModel:
    def __init__(self):
        self.model_name = None
        self.model = None
        self.tokenizer = None

    def getName(self) -> str:
        return self.model_name

    def getSanitizedName(self) -> str:
        return self.model_name.replace("/", "__")

    def getMaxLength(self):
        return self.model.config.max_position_embeddings

    def truncate_string_by_len(self, s, truncate_len):
        words = s.split()
        truncated_words = words[:-truncate_len] if truncate_len > 0 else words
        return " ".join(truncated_words)

    def getVocabProbsAtPos(self, pos, token_probs):
        sorted_probs, sorted_indices = torch.sort(token_probs[pos, :], descending=True)
        return sorted_probs

    def getDiffVocab(self, vocabProbs, tprob):
        return (vocabProbs[0] - tprob).item()

    def getDiffMaximumWithMinimum(self, vocabProbs):
        return (vocabProbs[0] - vocabProbs[-1]).item()

    def extractFeatures(
        self,
        knowledge="",
        conditionted_text="",
        generated_text="",
        features_to_extract={"mtp": True},
    ):
        total_len = len(knowledge) + len(conditionted_text) + len(generated_text)
        truncate_len = max(total_len - self.tokenizer.model_max_length, 0)

        # Truncar si es necesario
        knowledge = self.truncate_string_by_len(knowledge, truncate_len // 2)
        conditionted_text = self.truncate_string_by_len(
            conditionted_text, truncate_len - (truncate_len // 2)
        )

        # Tokenizar la entrada completa
        inputs = self.tokenizer(
            [knowledge + conditionted_text + generated_text],
            return_tensors="pt",
            max_length=self.getMaxLength(),
            truncation=True,
        )
        inputs = {key: val.to(device) for key, val in inputs.items()}

        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits

        probs = torch.nn.functional.softmax(logits, dim=-1)
        tokens_generated_length = len(self.tokenizer.tokenize(generated_text))
        start_index = logits.shape[1] - tokens_generated_length
        conditional_probs = probs[0, start_index:]

        token_ids_generated = inputs["input_ids"][0, start_index:].tolist()
        token_probs_generated = [
            conditional_probs[i, tid].item() for i, tid in enumerate(token_ids_generated)
        ]
        tokens_generated = self.tokenizer.convert_ids_to_tokens(token_ids_generated)

        # Filtrar tokens de fin de secuencia
        EOS_TOKENS = {'</s>', '<EOS>', '<eos>'}
        non_eos = [
            (token, prob)
            for token, prob in zip(tokens_generated, token_probs_generated)
            if token not in EOS_TOKENS
        ]
        if non_eos:
            min_prob_token, min_prob = min(non_eos, key=lambda x: x[1])
        else:
            min_prob_token, min_prob = None, None

        # Calcular características usando el token filtrado
        allFeatures = {"mtp": min_prob, "MDVTP": -1, "MMDVP": 100000000000}
        selectedFeatures = {}
        if features_to_extract.get("mtp", False):
            selectedFeatures["mtp"] = min_prob

        if features_to_extract.get("MDVTP", False) or features_to_extract.get("MMDVP", False):
            maximum_diff_with_vocab = -1
            minimum_vocab_extreme_diff = 100000000000
            size = len(token_probs_generated)
            for pos in range(size):
                vocabProbs = self.getVocabProbsAtPos(pos, conditional_probs)
                maximum_diff_with_vocab = max(
                    maximum_diff_with_vocab,
                    self.getDiffVocab(vocabProbs, token_probs_generated[pos]),
                )
                minimum_vocab_extreme_diff = min(
                    minimum_vocab_extreme_diff,
                    self.getDiffMaximumWithMinimum(vocabProbs),
                )
            if features_to_extract.get("MDVTP", False):
                selectedFeatures["MDVTP"] = maximum_diff_with_vocab
            if features_to_extract.get("MMDVP", False):
                selectedFeatures["MMDVP"] = minimum_vocab_extreme_diff

        # Ahora se retornan, además, el token y probabilidad filtrados
        return selectedFeatures, tokens_generated, token_probs_generated, min_prob_token, min_prob


    def get_mtp_by_pos(self, knowledge="", conditionted_text="", generated_text=""):
        features, tokens_generated, token_probs_generated, _, _ = self.extractFeatures(
            knowledge, conditionted_text, generated_text
        )


        generated_text_clean = self.tokenizer.convert_tokens_to_string(tokens_generated).replace("Ġ", " ").strip()
        doc = nlp(generated_text_clean)


        pos_probs = {}
        token_idx = 0
        for spacy_token in doc:
            word = spacy_token.text
            pos = spacy_token.pos_
            while token_idx < len(tokens_generated) and tokens_generated[token_idx].replace("Ġ", "").strip() in word:
                if pos not in pos_probs:
                    pos_probs[pos] = []
                pos_probs[pos].append(token_probs_generated[token_idx])
                token_idx += 1
                break
            if token_idx >= len(tokens_generated):
                break

        mtp_by_pos = {pos: min(probs) for pos, probs in pos_probs.items() if probs}
        return features, mtp_by_pos



In [11]:
class BartCNN(LLMModel):
    def __init__(self):
        super().__init__()  # Llamar primero a la inicialización de la clase base
        self.model_name = "facebook/bart-large-cnn"
        self.tokenizer = BartTokenizer.from_pretrained(self.model_name)
        self.model = BartForConditionalGeneration.from_pretrained(self.model_name)
        self.model.to(device)  # Mover el modelo al dispositivo después de cargarlo

    def generate(self, inpt):
        inputs = self.tokenizer(
            [inpt],
            max_length=self.getMaxLength(),
            return_tensors="pt",
            truncation=True
        )
        inputs = {key: value.to(self.model.device) for key, value in inputs.items()}
        summary_ids = self.model.generate(inputs["input_ids"])
        summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
        return summary

# The Dictionary `features_to_extract` defines which features will be use in this experiment.

## Features Meaning:

- `mtp` : Take the minimum of the probabilities that the LLM_E gives to the tokens on the generated-text.
- `avgtp` : Take the average of the probabilities that the LLM_E
gives to the tokens on the generated-text.
- `MDVTP` : Take the maximum from all the differences
between the token with the highest probability
according to LLM_E at position i and the
assigned probability from LLM_E to the token at position i in the generated_text.
- `MMDVP` : Take the maximum from all the differences between the token with the highest probability according to $LLM_E$ at position $i$ ($v^*$) and the token with the lowest probability according to $LLM_E$ at position $i$ ($v^-$).
- Probabilidad mínima del token: El valor más bajo entre las probabilidades de los tokens generados, que puede indicar incertidumbre.
- Probabilidad promedio del token: El promedio de las probabilidades, mostrando la confianza general del modelo.
- Diferencia máxima con la probabilidad más alta del vocabulario: Compara el token elegido con el más probable en el vocabulario, y si la diferencia es grande, podría ser una alucinación.
- Diferencia mínima entre máxima y mínima del vocabulario: Mide la dispersión de probabilidades, que puede indicar si el modelo está muy seguro o no.


In [12]:
feature_to_extract = 'mtp'

available_features_to_extract = ["mtp", "avgtp", "MDVTP", "MMDVP"]
if feature_to_extract == 'all':
    features_to_extract = {
        feature: True for feature in available_features_to_extract
    }
else:
    features_to_extract = {
        feature: True if feature == feature_to_extract else False
        for feature in available_features_to_extract
    }

features_to_extract

{'mtp': True, 'avgtp': False, 'MDVTP': False, 'MMDVP': False}

## Cleaning Cache on GPU to save memory

In [13]:
import torch
import gc

gc.collect()
torch.cuda.empty_cache()

## This cell is to instantiate the model you intend to use for the experiment

## This cell creates the dataset separation of `10%` for training and `90%` for testing depending on what task you are addressing. The following explanation is what happens if summarization is the task used. But the same explanation applies to all tasks and also you cand pass as parameter how many data points you want to include in training.

## Example: The data is separated on 2000 (1000 of document with right summary and 1000 with the same document but with the hallucinated summary). The rest which is 18000 is used to for testing.

### As expected from previous cells the task string expected are:
- `summarization`
- `qa`
- `dialogue`
- `general`

In [14]:
import random

# Extrae y organiza los campos relevantes de cada fila del dataset según la tarea y la opción de incluir conocimiento.
def loadRowData(taskName, row, includeKnowledge=False):
    print(taskName)
    if taskName == "summarization":
        return "", row["document"], row["right_summary"], row["hallucinated_summary"]
    elif taskName == "qa":
        if includeKnowledge:
            return (
                row["knowledge"],
                row["question"],
                row["right_answer"],
                row["hallucinated_answer"],
            )
        else:
            return "", row["question"], row["right_answer"], row["hallucinated_answer"]

    elif taskName == "dialogue":
        if includeKnowledge:
            return (
                row["knowledge"],
                row["dialogue_history"],
                row["right_response"],
                row["hallucinated_response"],
            )
        else:
            return (
                "",
                row["dialogue_history"],
                row["right_response"],
                row["hallucinated_response"],
            )

    elif taskName == "general":
        return (
            "",
            row["user_query"],
            row["chatgpt_response"],
            row["hallucination_label"],
        )

    else:
        raise Exception("Task not supported")

# Transforma la estructura del dataset para que cada ejemplo contenga una entrada (por ejemplo, el texto condicionado) y una etiqueta que indique si la respuesta es correcta o es una alucinación.
def adaptDataset(data, taskName):
    datasetAdapted = None
    if taskName == "general":
        # There is data point that is filling the <mask> token but that gives error with some LLMs
        datasetAdapted = [
            (
                (knowledge, document, response, 1)
                if hallu == "yes"
                else (knowledge, document, response, 0)
            )
            for knowledge, document, response, hallu in data
            if "<mask>" not in document and "<mask>" not in response
        ]

    elif taskName == "summarization" or taskName == "qa" or taskName == "dialogue":
        datasetAdapted = [
            (knowledge, document, right, 1)
            for knowledge, document, right, hallu in data
        ] + [
            (knowledge, document, hallu, 0)
            for knowledge, document, right, hallu in data
        ]

    else:
        raise Exception("Task not supported")

    random.shuffle(datasetAdapted)
    return datasetAdapted

# Divide el DataFrame original en conjuntos de entrenamiento, validación y prueba, adaptándolos para que estén listos para ser usados en un modelo o algoritmo de aprendizaje supervisado.
def splitDataset(
    data: pd.DataFrame,
    taskName: str,
    trainingSize: int,
    valSize: int,
    includeKnowledge=False,
):

    dataset = []
    for _, row in data.iterrows():
        knowledge, text, right, hallu = loadRowData(taskName, row, includeKnowledge)
        dataset.append((knowledge, text, right, hallu))

    random.shuffle(dataset)

    dataset_train = dataset[:trainingSize]  # Take only trainingSize
    dataset_val = (
        []
    )  # dataset[trainingSize:trainingSize + valSize] # Take only trainingSize
    dataset_test = dataset[trainingSize:]  # Take the rest as testing

    datasetAdaptedTrain = adaptDataset(dataset_train, taskName)
    datasetAdaptedValidation = adaptDataset(dataset_val, taskName)
    datasetAdaptedTest = adaptDataset(dataset_test, taskName)

    X_train = [(x, q, y) for x, q, y, _ in datasetAdaptedTrain]
    Y_train = [z for _, _, _, z in datasetAdaptedTrain]

    X_val = [(x, q, y) for x, q, y, _ in datasetAdaptedValidation]
    Y_val = [z for _, _, _, z in datasetAdaptedValidation]

    X_test = [(x, q, y) for x, q, y, _ in datasetAdaptedTest]
    Y_test = [z for _, _, _, z in datasetAdaptedTest]

    return X_train, Y_train, X_val, Y_val, X_test, Y_test

In [15]:
includeKnowledge = True
includeConditioned = True

In [16]:
X_train, Y_train, X_val, Y_val, X_test, Y_test = splitDataset(
    data, task, 1000, 0, includeKnowledge=includeKnowledge
)

qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
qa
q

In [17]:
print(len(X_train), len(Y_train))
print(len(X_val), len(Y_val))
print(len(X_test), len(Y_test))  # verify the sizes look right

2000 2000
0 0
18000 18000


## To Save the separation if needed

In [18]:
train_df = pd.DataFrame(
    {
        "Knowledge": [x[0] for x in X_train],
        "Conditioned Text": [x[1] for x in X_train],
        "Generated Text": [x[2] for x in X_train],
        "Label": Y_train,
    }
)

val_df = pd.DataFrame(
    {
        "Knowledge": [x[0] for x in X_val],
        "Conditioned Text": [x[1] for x in X_val],
        "Generated Text": [x[2] for x in X_val],
        "Label": Y_val,
    }
)

test_df = pd.DataFrame(
    {
        "Knowledge": [x[0] for x in X_test],
        "Conditioned Text": [x[1] for x in X_test],
        "Generated Text": [x[2] for x in X_test],
        "Label": Y_test,
    }
)

#Export to CSV
if includeKnowledge:
    train_df.to_csv( (task + '_knowledge_train_data.csv'), index=False)
    test_df.to_csv( (task + '_knowledge_test_data.csv'), index=False)
else:
    train_df.to_csv( (task + '_train_data.csv'), index=False)
    val_df.to_csv((task + '_val_data.csv'), index=False)
    test_df.to_csv( (task + '_test_data.csv'), index=False)

In [19]:
# Esta función permite transformar un DataFrame en dos listas de datos, donde cada elemento de X es una tupla formada por los valores de "Knowledge", "Conditioned Text" y "Generated Text" (según los flags de inclusión), y cada elemento de Y es la etiqueta correspondiente, facilitando así su uso en tareas de entrenamiento o evaluación de modelos.

def getXY(df: pd.DataFrame, includeKnowledge=True, includeConditioned=True):
    X = []
    Y = []

    # Iterate over rows using itertuples
    for _, row in df.iterrows():
        x, c, g = (
            row["Knowledge"] if includeKnowledge else "",
            row["Conditioned Text"] if includeConditioned else "",
            row["Generated Text"],
        )
        y = row["Label"]

        # Append values to respective lists
        X.append((x, c, g))
        Y.append(y)
    return X, Y

In [20]:
X_train, Y_train = getXY(
    train_df, includeKnowledge=includeKnowledge, includeConditioned=includeConditioned
)
X_val, Y_val = getXY(
    val_df, includeKnowledge=includeKnowledge, includeConditioned=includeConditioned
)
X_test, Y_test = getXY(
    test_df, includeKnowledge=includeKnowledge, includeConditioned=includeConditioned
)

In [21]:
print(len(X_train), len(Y_train))
print(len(X_val), len(Y_val))
print(len(X_test), len(Y_test))  # verify the sizes look right

2000 2000
0 0
18000 18000


In [22]:
X_test[0]

('The Rhymers\' Club was a group of London-based male poets, founded in 1890 by W. B. Yeats and Ernest Rhys. They met at the London pub ‘Ye Olde Cheshire Cheese’ in Fleet Street and in the \'Domino Room\' of the "Café Royal".Ye Olde Cheshire Cheese is a Grade II listed public house at 145 Fleet Street, on Wine Office Court, City of London.',
 'What was the address of the public house where a group of London-based male poets, founded in 1890 met?',
 '145 Fleet Street')

In [23]:
Y_test[0]

1

## Extracting the features for the Training Data

In [24]:
import torch
from tqdm import tqdm

# El código recorre todos los ejemplos del conjunto de entrenamiento, extrae características relevantes mediante un método del modelo y almacena los resultados en una lista.

def extract_features(
    knowledge: str,
    conditioned_text: str,
    generated_text: str,
    features_to_extract: dict[str, bool],
):
    # Obtener características y mapeo por posición
    features, mtp_by_pos = model.get_mtp_by_pos(knowledge, conditioned_text, generated_text)

    # Extraer tokens y probabilidades; desempaquetamos solo lo necesario
    _, tokens_generated, token_probs_generated, _, _ = model.extractFeatures(
        knowledge, conditioned_text, generated_text, features_to_extract
    )

    # Limpiar tokens: quitar el prefijo "Ġ" y eliminar signos de puntuación
    import string
    clean_tokens = [t.replace("Ġ", "") for t in tokens_generated]
    clean_tokens = [t.translate(str.maketrans("", "", string.punctuation)) for t in clean_tokens]

    # Definir tokens de fin de secuencia (ajusta según tu tokenizer)
    EOS_TOKENS = {'</s>', '<EOS>', '<eos>'}
    candidatos = [
        (token, prob)
        for token, prob in zip(clean_tokens, token_probs_generated)
        if token not in EOS_TOKENS and token.strip() != "" and not (len(token) == 1 and token.lower() not in {"a", "i"})
    ]



    if candidatos:
        # Seleccionar el token con la probabilidad mínima entre los candidatos
        min_prob_token, min_prob = min(candidatos, key=lambda x: x[1])
    else:
        min_prob_token, min_prob = None, None

    return {
        "generated_text": generated_text,
        "features": features,
        "mtp_by_pos": mtp_by_pos,
        "min_prob_token": min_prob_token,
        "min_prob": min_prob
    }




model = BartCNN()
X_train_features_maps = []

for i in tqdm(range(100), desc="Processing"):
    try:
        knowledge, conditioned_text, generated_text = X_train[i][:3]
        result = extract_features(knowledge, conditioned_text, generated_text, {"mtp": True})
        # Comprobar si el diccionario mtp_by_pos está vacío y descartar la muestra
        if result["mtp_by_pos"]:
            X_train_features_maps.append(result)
        else:
            print(f"Ejemplo {i + 1} descartado: mtp_by_pos está vacío.")
    except Exception as e:
        print(f"Error en el ejemplo {i + 1}: {e}")
    finally:
        torch.cuda.empty_cache()


  attn_output = torch.nn.functional.scaled_dot_product_attention(
Processing:   8%|▊         | 8/100 [00:01<00:11,  7.80it/s]

Ejemplo 7 descartado: mtp_by_pos está vacío.


Processing:  17%|█▋        | 17/100 [00:02<00:08, 10.11it/s]

Ejemplo 16 descartado: mtp_by_pos está vacío.


Processing:  23%|██▎       | 23/100 [00:03<00:07,  9.74it/s]

Ejemplo 23 descartado: mtp_by_pos está vacío.


Processing:  30%|███       | 30/100 [00:04<00:07,  9.58it/s]

Ejemplo 29 descartado: mtp_by_pos está vacío.
Ejemplo 30 descartado: mtp_by_pos está vacío.


Processing:  36%|███▌      | 36/100 [00:04<00:06,  9.48it/s]

Ejemplo 35 descartado: mtp_by_pos está vacío.


Processing:  48%|████▊     | 48/100 [00:06<00:05,  8.69it/s]

Ejemplo 47 descartado: mtp_by_pos está vacío.


Processing:  50%|█████     | 50/100 [00:06<00:05,  9.52it/s]

Ejemplo 49 descartado: mtp_by_pos está vacío.


Processing:  55%|█████▌    | 55/100 [00:06<00:04, 10.26it/s]

Ejemplo 54 descartado: mtp_by_pos está vacío.
Ejemplo 55 descartado: mtp_by_pos está vacío.


Processing:  59%|█████▉    | 59/100 [00:07<00:03, 10.32it/s]

Ejemplo 59 descartado: mtp_by_pos está vacío.


Processing:  63%|██████▎   | 63/100 [00:07<00:03, 10.44it/s]

Ejemplo 62 descartado: mtp_by_pos está vacío.


Processing:  79%|███████▉  | 79/100 [00:09<00:02, 10.31it/s]

Ejemplo 77 descartado: mtp_by_pos está vacío.
Ejemplo 79 descartado: mtp_by_pos está vacío.


Processing:  98%|█████████▊| 98/100 [00:11<00:00,  9.90it/s]

Ejemplo 97 descartado: mtp_by_pos está vacío.


Processing: 100%|██████████| 100/100 [00:11<00:00,  8.85it/s]


In [25]:
for i, result in enumerate(X_train_features_maps):
    print(f"\nEjemplo {i + 1}:")
    print(f"Generated Text: '{result['generated_text']}'")
    print("Características:", result["features"])
    print("MTP por categoría gramatical:", result["mtp_by_pos"])
    print(f"Token con probabilidad más baja: '{result['min_prob_token']}' (Probabilidad: {result['min_prob']})")
    print("--------------------------------------------------------------")
    print("\n\n")


Ejemplo 1:
Generated Text: 'General Motors'
Características: {'mtp': 0.9077625274658203}
MTP por categoría gramatical: {'PROPN': 0.9077625274658203}
Token con probabilidad más baja: 'Motors' (Probabilidad: 0.9077625274658203)
--------------------------------------------------------------




Ejemplo 2:
Generated Text: 'The Kwahu East District with the capital of Abetifi is in the Republic of Africa Ghana.'
Características: {'mtp': 0.8618128299713135}
MTP por categoría gramatical: {'PROPN': 0.9314356446266174}
Token con probabilidad más baja: 'Africa' (Probabilidad: 0.8618128299713135)
--------------------------------------------------------------




Ejemplo 3:
Generated Text: 'City of London'
Características: {'mtp': 0.9041664004325867}
MTP por categoría gramatical: {'ADP': 0.9041664004325867, 'PROPN': 0.9295479655265808}
Token con probabilidad más baja: 'of' (Probabilidad: 0.9041664004325867)
--------------------------------------------------------------




Ejemplo 4:
Generated Tex

In [26]:
print(X_train_features_maps)

[{'generated_text': 'General Motors', 'features': {'mtp': 0.9077625274658203}, 'mtp_by_pos': {'PROPN': 0.9077625274658203}, 'min_prob_token': 'Motors', 'min_prob': 0.9077625274658203}, {'generated_text': 'The Kwahu East District with the capital of Abetifi is in the Republic of Africa Ghana.', 'features': {'mtp': 0.8618128299713135}, 'mtp_by_pos': {'PROPN': 0.9314356446266174}, 'min_prob_token': 'Africa', 'min_prob': 0.8618128299713135}, {'generated_text': 'City of London', 'features': {'mtp': 0.9041664004325867}, 'mtp_by_pos': {'ADP': 0.9041664004325867, 'PROPN': 0.9295479655265808}, 'min_prob_token': 'of', 'min_prob': 0.9041664004325867}, {'generated_text': 'Andrew Harwood Mills is also Irish.', 'features': {'mtp': 0.9059568643569946}, 'mtp_by_pos': {'PROPN': 0.966061532497406}, 'min_prob_token': 'is', 'min_prob': 0.9059568643569946}, {'generated_text': 'anti-sharia movement', 'features': {'mtp': 0.895056962966919}, 'mtp_by_pos': {'PROPN': 0.895056962966919}, 'min_prob_token': 'aria'

In [27]:
X_train_features_maps[0]

{'generated_text': 'General Motors',
 'features': {'mtp': 0.9077625274658203},
 'mtp_by_pos': {'PROPN': 0.9077625274658203},
 'min_prob_token': 'Motors',
 'min_prob': 0.9077625274658203}

In [34]:
processed_data = []
all_pos_keys = set()

for item in X_train_features_maps:
    if 'mtp_by_pos' in item:
        all_pos_keys.update(item['mtp_by_pos'].keys())

sorted_pos_keys = sorted(list(all_pos_keys))
print(f"Claves POS encontradas en mtp_by_pos: {sorted_pos_keys}")

Claves POS encontradas en mtp_by_pos: ['ADJ', 'ADP', 'ADV', 'AUX', 'CCONJ', 'DET', 'NOUN', 'NUM', 'PART', 'PRON', 'PROPN', 'PUNCT', 'SCONJ', 'SYM', 'VERB', 'X']


In [36]:
for item in X_train_features_maps:
    flat_features = {}

    # 1. Añadir características generales de 'features'
    if 'features' in item:
        flat_features.update(item['features']) # Copia todas las claves/valores de features

    # 2. Añadir características de 'mtp_by_pos' como columnas separadas
    # Inicializa todas las claves POS a 0 para esta muestra
    for pos_key in sorted_pos_keys:
        flat_features[f'mtp_{pos_key}'] = 0.0

    # Actualiza con los valores reales que sí existen para esta muestra
    if 'mtp_by_pos' in item:
        for pos_key, value in item['mtp_by_pos'].items():
            flat_features[f'mtp_{pos_key}'] = value

    processed_data.append(flat_features)

processed_data

[{'mtp': 0.9077625274658203,
  'mtp_ADJ': 0.0,
  'mtp_ADP': 0.0,
  'mtp_ADV': 0.0,
  'mtp_AUX': 0.0,
  'mtp_CCONJ': 0.0,
  'mtp_DET': 0.0,
  'mtp_NOUN': 0.0,
  'mtp_NUM': 0.0,
  'mtp_PART': 0.0,
  'mtp_PRON': 0.0,
  'mtp_PROPN': 0.9077625274658203,
  'mtp_PUNCT': 0.0,
  'mtp_SCONJ': 0.0,
  'mtp_SYM': 0.0,
  'mtp_VERB': 0.0,
  'mtp_X': 0.0},
 {'mtp': 0.8618128299713135,
  'mtp_ADJ': 0.0,
  'mtp_ADP': 0.0,
  'mtp_ADV': 0.0,
  'mtp_AUX': 0.0,
  'mtp_CCONJ': 0.0,
  'mtp_DET': 0.0,
  'mtp_NOUN': 0.0,
  'mtp_NUM': 0.0,
  'mtp_PART': 0.0,
  'mtp_PRON': 0.0,
  'mtp_PROPN': 0.9314356446266174,
  'mtp_PUNCT': 0.0,
  'mtp_SCONJ': 0.0,
  'mtp_SYM': 0.0,
  'mtp_VERB': 0.0,
  'mtp_X': 0.0},
 {'mtp': 0.9041664004325867,
  'mtp_ADJ': 0.0,
  'mtp_ADP': 0.9041664004325867,
  'mtp_ADV': 0.0,
  'mtp_AUX': 0.0,
  'mtp_CCONJ': 0.0,
  'mtp_DET': 0.0,
  'mtp_NOUN': 0.0,
  'mtp_NUM': 0.0,
  'mtp_PART': 0.0,
  'mtp_PRON': 0.0,
  'mtp_PROPN': 0.9295479655265808,
  'mtp_PUNCT': 0.0,
  'mtp_SCONJ': 0.0,
  'mtp_SYM

In [37]:
X_train_df = pd.DataFrame(processed_data)

# Rellenar posibles valores NaN que puedan quedar (si alguna 'features' faltara en algún item) con 0
X_train_df = X_train_df.fillna(0)

print("\nDataFrame de características procesado (X_train_df):")
print(X_train_df)


DataFrame de características procesado (X_train_df):
          mtp   mtp_ADJ   mtp_ADP  mtp_ADV  mtp_AUX  mtp_CCONJ  mtp_DET  \
0    0.907763  0.000000  0.000000      0.0      0.0        0.0      0.0   
1    0.861813  0.000000  0.000000      0.0      0.0        0.0      0.0   
2    0.904166  0.000000  0.904166      0.0      0.0        0.0      0.0   
3    0.905957  0.000000  0.000000      0.0      0.0        0.0      0.0   
4    0.895057  0.000000  0.000000      0.0      0.0        0.0      0.0   
..        ...       ...       ...      ...      ...        ...      ...   
165  0.873748  0.000000  0.000000      0.0      0.0        0.0      0.0   
166  0.884459  0.000000  0.000000      0.0      0.0        0.0      0.0   
167  0.876126  0.890097  0.000000      0.0      0.0        0.0      0.0   
168  0.926292  0.000000  0.000000      0.0      0.0        0.0      0.0   
169  0.853764  0.000000  0.000000      0.0      0.0        0.0      0.0   

     mtp_NOUN  mtp_NUM  mtp_PART  mtp_PRON  m

In [42]:
Y_train_subset = Y_train[:100]


In [43]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

In [28]:
X_train_features = [list(dic.values()) for dic in X_train_features_maps]

In [29]:
len(X_train_features)

85

In [30]:
X_train_features[0]

['General Motors',
 {'mtp': 0.9077625274658203},
 {'PROPN': 0.9077625274658203},
 'Motors',
 0.9077625274658203]

## Training Logistic Regression

In [31]:
from sklearn.linear_model import LogisticRegression

clf = LogisticRegression(verbose=1)
clf.fit(X_train_features, Y_train)

ValueError: could not convert string to float: 'General Motors'

## Evaluate accuracy of Logistic Regression on the training set

In [57]:
from sklearn.metrics import accuracy_score

Y_Pred = clf.predict(X_train_features)

accuracy = accuracy_score(Y_train, Y_Pred)
print(f"Accuracy: {accuracy * 100:.2f}%")

Accuracy: 89.40%


In [58]:
log_odds = clf.coef_[0]
odds = np.exp(clf.coef_[0])
lr_features_log = {k: v for k, v in zip(X_train_features_maps[0].keys(), log_odds)}
lr_features_no_log = {k: v for k, v in zip(X_train_features_maps[0].keys(), odds)}

print("log", lr_features_log)
print("no_log", lr_features_no_log)

log {'mtp': -7.111009509658728}
no_log {'mtp': 0.0008160707448331231}


## Extracting the Features of the Validation Set

## Extracting the Features of the Test Set

In [59]:
from tqdm import tqdm

X_test_features_map = []

for knowledge, conditioned_text, generated_text in tqdm(X_test, desc="Processing"):
    X_test_features_map.append(
        extract_features(
            knowledge, conditioned_text, generated_text, features_to_extract
        )
    )
    torch.cuda.empty_cache()

Processing: 100%|██████████| 18000/18000 [16:26<00:00, 18.25it/s]


In [60]:
X_test_features = [list(dic.values()) for dic in X_test_features_map]

## Evaluate accuracy of the LogisticRegression on the testing set

In [61]:
from sklearn.metrics import accuracy_score

Y_Pred = clf.predict(X_test_features)

lr_accuracy = accuracy_score(Y_test, Y_Pred)
print(f"Accuracy: {lr_accuracy * 100:.2f}%")

Accuracy: 88.33%


In [62]:
log_odds = clf.coef_[0]
pd.DataFrame(log_odds, X_train_features_maps[0].keys(), columns=["coef"]).sort_values(
    by="coef", ascending=False
)

Unnamed: 0,coef
mtp,-7.11101


In [63]:
odds = np.exp(clf.coef_[0])
pd.DataFrame(odds, X_train_features_maps[0].keys(), columns=["coef"]).sort_values(
    by="coef", ascending=False
)

Unnamed: 0,coef
mtp,0.000816


In [64]:
import torch.nn as nn


class SimpleDenseNet(nn.Module):
    def __init__(self, input_dim: int, hidden_dim: int, output_dim=1, dropout_prob=0.3):
        super(SimpleDenseNet, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.sigmoid(x)
        return x

In [65]:
denseModel = SimpleDenseNet(
    input_dim=np.array([v for v in features_to_extract.values()]).sum(), hidden_dim=512
).to(device)

# Code declaring and computing all the metrics to measure

In [66]:
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    roc_auc_score,
    precision_recall_curve,
    auc
)


def compute_metrics(model, input_tensor, true_labels):
    with torch.no_grad():
        outputs = model(input_tensor)
        predicted_probs = torch.sigmoid(outputs).cpu().numpy()
        predicted = (outputs > 0.5).float().cpu().numpy()

        true_labels = true_labels.cpu().numpy()

        acc = accuracy_score(true_labels, predicted)
        precision = precision_score(true_labels, predicted)
        recall = recall_score(true_labels, predicted)
        f1 = f1_score(true_labels, predicted)

        precision_negative = precision_score(true_labels, predicted, pos_label=0)
        recall_negative = recall_score(true_labels, predicted, pos_label=0)
        f1_negative = f1_score(true_labels, predicted, pos_label=0)

        tn, fp, fn, tp = confusion_matrix(true_labels, predicted).ravel()
        roc_auc = roc_auc_score(true_labels, predicted_probs)

        P, R, thre = precision_recall_curve(true_labels, predicted, pos_label=1)
        pr_auc = auc(R, P)

        roc_auc_negative = roc_auc_score(
            true_labels, 1 - predicted_probs
        )  # If predicted_probs is the probability of the positive class
        P_neg, R_neg, _ = precision_recall_curve(true_labels, predicted, pos_label=0)
        pr_auc_negative = auc(R_neg, P_neg)

        return {
            "Accuracy": acc,
            "Precision": precision,
            "Recall": recall,
            "F1": f1,
            "TP": tp,
            "TN": tn,
            "FP": fp,
            "FN": fn,
            "ROC AUC": roc_auc,
            "PR AUC": pr_auc,
            "Precision-Negative": precision_negative,
            "Recall-Negative": recall_negative,
            "F1-Negative": f1_negative,
            "ROC AUC-Negative": roc_auc_negative,
            "PR AUC-Negative": pr_auc_negative,
        }

## Code for training the Dense Model and getting the result of all metrics corresponding to the Testing Set.

In [67]:
def compute_accuracy(model, input_tensor, true_labels):
    with torch.no_grad():
        outputs = model(input_tensor)
        predicted = (outputs > 0.5).float()
        correct = (predicted == true_labels).float().sum()
        accuracy = correct / len(true_labels)
        return accuracy.item()


X_train_tensor = torch.tensor(X_train_features, dtype=torch.float32).to(device)
Y_train_tensor = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)

print(X_train_tensor.shape, Y_train_tensor.shape)

# Define loss and optimizer
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(denseModel.parameters(), lr=0.001)

bestValAcc = 0
# Training loop
num_epochs = 20000
for epoch in range(num_epochs):
    denseModel.train()
    optimizer.zero_grad()
    outputs = denseModel(X_train_tensor)
    loss = criterion(outputs, Y_train_tensor)
    loss.backward()
    optimizer.step()

    # Compute training accuracy
    train_accuracy = compute_accuracy(denseModel, X_train_tensor, Y_train_tensor)

    # Uncomment this if you want to see how the accuracy of testing improves during the training process.
    ##Compute testing accuracy
    # X_val_tensor = torch.tensor(X_val_features, dtype=torch.float32).to(device)
    # Y_val_tensor = torch.tensor(Y_val, dtype=torch.float32).view(-1, 1).to(device)

    # val_accuracy = compute_accuracy(denseModel, X_val_tensor, Y_val_tensor)

    # if bestValAcc < val_accuracy:
    #     bestValAcc = val_accuracy
    #     print(f'Saving model with best validation accuracy ...')
    #     torch.save(denseModel.state_dict(), 'llama-' + task + '-best-model')

    if (epoch + 1) % 10 == 0:
        print(
            f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Training Accuracy: {train_accuracy:.4f}"
        )  # , "Validation Accuracy": {val_accuracy:.4f}')

torch.Size([2000, 1]) torch.Size([2000, 1])
Epoch [10/20000], Loss: 0.3119, Training Accuracy: 0.8910
Epoch [20/20000], Loss: 0.2958, Training Accuracy: 0.9050
Epoch [30/20000], Loss: 0.2859, Training Accuracy: 0.9055
Epoch [40/20000], Loss: 0.2829, Training Accuracy: 0.9025
Epoch [50/20000], Loss: 0.2816, Training Accuracy: 0.9035
Epoch [60/20000], Loss: 0.2812, Training Accuracy: 0.9040
Epoch [70/20000], Loss: 0.2810, Training Accuracy: 0.9040
Epoch [80/20000], Loss: 0.2810, Training Accuracy: 0.9040
Epoch [90/20000], Loss: 0.2809, Training Accuracy: 0.9035
Epoch [100/20000], Loss: 0.2809, Training Accuracy: 0.9035
Epoch [110/20000], Loss: 0.2808, Training Accuracy: 0.9035
Epoch [120/20000], Loss: 0.2808, Training Accuracy: 0.9035
Epoch [130/20000], Loss: 0.2808, Training Accuracy: 0.9035
Epoch [140/20000], Loss: 0.2808, Training Accuracy: 0.9035
Epoch [150/20000], Loss: 0.2807, Training Accuracy: 0.9035
Epoch [160/20000], Loss: 0.2807, Training Accuracy: 0.9035
Epoch [170/20000], Lo

## Uncomment next cell if you want to load a particular model you already trained.

In [46]:
# loaded_model = SimpleDenseNet(input_dim=len(list(features_to_extract.keys())), hidden_dim=512).to(device)
# loaded_model.load_state_dict(torch.load('llama-' + task + '-best-model'))

# # Set the model to evaluation mode"
# loaded_model.eval()

#Compute the metrics using the model on the Test Set.

In [68]:
X_test_tensor = torch.tensor(X_test_features, dtype=torch.float32).to(device)
Y_test_tensor = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)

test_metrics = compute_metrics(denseModel, X_test_tensor, Y_test_tensor)

print(
    f"Testing - Accuracy: {test_metrics['Accuracy']:.4f}, Precision: {test_metrics['Precision']:.4f}, Recall: {test_metrics['Recall']:.4f}, F1: {test_metrics['F1']:.4f}, ROC AUC: {test_metrics['ROC AUC']:.4f}, PR AUC: {test_metrics['PR AUC']:.4f}"
)
print(
    f"Testing - Negative: {test_metrics['Accuracy']:.4f}, Precision-Negative: {test_metrics['Precision-Negative']:.4f}, Recall-Negative: {test_metrics['Recall-Negative']:.4f}, F1-Negative: {test_metrics['F1-Negative']:.4f}, ROC AUC-Negative: {test_metrics['ROC AUC-Negative']:.4f}, PR AUC-Negative: {test_metrics['PR AUC-Negative']:.4f}"
)

Testing - Accuracy: 0.8897, Precision: 0.8908, Recall: 0.8883, F1: 0.8896, ROC AUC: 0.9377, PR AUC: 0.9175
Testing - Negative: 0.8897, Precision-Negative: 0.8886, Recall-Negative: 0.8911, F1-Negative: 0.8899, ROC AUC-Negative: 0.0623, PR AUC-Negative: 0.3318


## Save the results on a CSV if you want.

In [None]:
model_dataframe = pd.DataFrame(
    columns=[
        "features",
        "model_name",
        "feature_to_extract",
        "method",
        "accuracy",
        "precision",
        "recall",
        "roc auc",
        "pr auc",
        "negative",
        "precision-negative",
        "recall-negative",
        "negative f1",
        "lr_accuracy",
        "lr_features_log",
        "lr_features_no_log",
    ]
)

In [None]:
d = {
    "features": features_to_extract,
    "model_name": str(model.getName()),
    "feature_to_extract": feature_to_extract,
    "method": "TEST",
    "accuracy": test_metrics["Accuracy"],
    "precision": test_metrics["Precision"],
    "recall": test_metrics["Recall"],
    "f1": test_metrics["F1"],
    "pr auc": test_metrics["PR AUC"],
    "precision-negative": test_metrics["Precision-Negative"],
    "recall-negative": test_metrics["Recall-Negative"],
    "negative-f1": test_metrics["F1-Negative"],
    "lr_accuracy": lr_accuracy,
    "lr_features_log": lr_features_log,
    "lr_features_no_log": lr_features_no_log,
}

model_dataframe.loc[len(model_dataframe.index)] = d

In [None]:
model_dataframe.head()

Unnamed: 0,features,model_name,feature_to_extract,method,accuracy,precision,recall,roc auc,pr auc,negative,precision-negative,recall-negative,negative f1,lr_accuracy,lr_features_log,lr_features_no_log
0,"{'mtp': True, 'avgtp': False, 'MDVTP': False, ...",facebook/bart-large-cnn,mtp,TEST,0.890556,0.880494,0.903778,,0.916191,,0.901164,0.877333,,0.884222,{'mtp': -6.95953145273689},{'mtp': 0.000949541375340262}


In [None]:
csv_name = f"{model.getSanitizedName()}_{task}_{includeKnowledge=}_{includeConditioned=}_{'_'.join([f'{k}={v}' for k, v in features_to_extract.items()])}.csv"
print(csv_name)
model_dataframe.to_csv(csv_name, index=False)

facebook__bart-large-cnn_qa_includeKnowledge=True_includeConditioned=True_mtp=True_avgtp=False_MDVTP=False_MMDVP=False.csv
