In [1]:
import os
os.chdir("..")
from engine.ner_detector import tokenize_evaluate_and_detect_NERs
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    AutoConfig,
    TextClassificationPipeline,
)
import pickle
import seaborn as sns
import numpy as np
from typing import Any
from engine.data import prepare_data_for_fine_tuning, read_data
import torch
import pandas as pd
import random
import re
import copy
import spacy

def get_device():
    if torch.cuda.is_available():
        print("CUDA is available. Using GPU.")
        return "cuda"
    else:
        print("CUDA not available. Using CPU.")
        return "cpu"
    
device = get_device()

2025-01-21 21:08:11.310667: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-01-21 21:08:11.319297: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1737490091.330669  108375 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1737490091.334125  108375 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-01-21 21:08:11.346049: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

CUDA is available. Using GPU.


In [2]:
os.getcwd()

'/home/dawid/studies/master-2/master24nlp/nlp-2024-fake'

In [19]:
from pathlib import Path

models = Path("output")
all_files = [path for path in models.rglob("model_final/model.safetensors")]
accuracies = []
# for file in all_files:
    # with open(file.parent.with_name("test_acc.json"), "r") as f:
        # accuracies.append(float(f.readline().strip()))

models = [(path.parts[2], path.parts[1], path.parts[3], path.parts[4]) for path in all_files]
model_df = pd.DataFrame(models, columns=["model", "dataset", "training_type", "run"])
# model_df["accuracy"] = accuracies
# model_df = model_df.iloc[model_df.groupby(["model", "dataset", "training_type"])["accuracy"].idxmax(), ]
model_df = model_df[model_df["model"] != "roberta"]
model_df

# model_df = pd.DataFrame(['ernie', 'coaid', 'masked', '1']).T
# model_df.rename({0: 'model', 1: 'dataset', 2: 'training_type', 3: 'run'}, inplace=True, axis=1)
# model_df

Unnamed: 0,model,dataset,training_type,run
1,ernie,liar,unmasked,3
3,ernie,isot,unmasked,1
5,ernie,coaid,unmasked,1


In [20]:
def get_person_relative_importance(pipeline, 
                                  test_dataset):
    res2 = tokenize_evaluate_and_detect_NERs(pipeline, 
                                  test_dataset['text'], 
                                  spacy_model="en_core_web_lg",
                                  return_mappings_for_each_text=True)
    ratios = []
    for sentence in res2:
        avg_per_imp = np.array(list(map(lambda x: abs(x[1]), filter(lambda z: z[2] == 'PERSON', sentence)))).mean()
        avg_imp = np.array(list(map(lambda x: abs(x[1]), sentence))).mean()
        ratios.append(avg_per_imp / avg_imp)
        
    return ratios

In [21]:
def get_map_person_importance(res):
    persons = list(map(lambda x: x[0], filter(lambda x: x[2] == 'PERSON', res)))
    importances = list(map(lambda x: x[1], filter(lambda x: x[2] == 'PERSON', res)))
    
    nlp = spacy.load("en_core_web_lg")
    
    persons_unique = {}
    per = ''

    idx = 0
    imp = 0
    cnt = 0
    while idx != len(persons):
        if per == '':
            per = persons[idx]
            imp = importances[idx]
            cnt = 1
            idx += 1
        elif persons[idx][:2] == '##':
            per = per + persons[idx][2:]
            imp += importances[idx]
            cnt += 1
            idx += 1
        else:
            if per not in persons_unique.keys():
                persons_unique[per] = []
            persons_unique[per].append(imp / cnt)
            cnt = 0
            per = ''
            
    new_persons = {}
    for key in persons_unique:
        
        doc = nlp(key)
        is_ok = False
        for ent in doc.ents:
            if ent.label_ == "PERSON":
                is_ok = True
                break
        
        if is_ok:
            new_persons[key.lower()] = np.mean(persons_unique[key])
        
    return new_persons

In [22]:
def get_top_persons(persons_unique, negative = False, n = 5):
    importance = list(persons_unique.values())
    persons = list(persons_unique.keys())
    importance = np.array(importance)
    persons = np.array(persons)
    if not negative:
        importance = -importance
    top_persons = persons[np.argsort(importance)[:n]]
    return top_persons.tolist()

In [23]:
def pipeline_out_to_vec(pipeline_out):
    preds = []
    for out in pipeline_out:
        if out[0]['label'] == 'LABEL_1':
            preds.append(out[0]['score'])
        else:
            preds.append(out[1]['score'])
            
    return preds

In [24]:
def find_random_person_words(sentence, persons):
    found = set(sentence.lower().split()).intersection(persons)
    if len(found) == 0:
        return 'NOT EXIST'
    else:
        return list(found)[np.random.choice(len(found))]

In [25]:
def convert_prediction(pred):
    if pred[0]["label"] == "LABEL_1":
        return pred[0]["score"]
    else:
        return pred[1]["score"]

results = {}
results_misc = {}

for _, row in model_df.iterrows():
    
    results_misc['-'.join([row["dataset"], row["model"], row["training_type"]])] = {}
    
    print(row["dataset"])
    model_path = Path("output", row["dataset"], row["model"], row["training_type"], row["run"], "model_final", "model.safetensors")
    model_id = "roberta-base" if row["model"] == "roberta" else "nghuyong/ernie-2.0-base-en"
    config = AutoConfig.from_pretrained(model_id)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_path, config=config
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    data = pd.read_csv(Path("data", row["dataset"], "test.csv"), header=0)
    test_dataset = prepare_data_for_fine_tuning(data, tokenizer)
    test_dataset = test_dataset.select(np.arange(min(1000, test_dataset.shape[0])))
    model.eval()
    pipeline = TextClassificationPipeline(
        model=model, tokenizer=tokenizer, top_k=2, device=device
    )

    if(device == "cuda"):
        model.cuda()
    else:
        model.cpu()
        
    ratios = get_person_relative_importance(pipeline, test_dataset)
    results_misc['-'.join([row["dataset"], row["model"], row["training_type"]])]['ratios'] = ratios
    
    
    res = tokenize_evaluate_and_detect_NERs(pipeline, 
                                  test_dataset['text'], 
                                  spacy_model="en_core_web_lg")
    
    person_importance_mapping = get_map_person_importance(res)
    top_positive_persons = get_top_persons(person_importance_mapping, negative=False, n=10)
    top_negative_persons = get_top_persons(person_importance_mapping, negative=True, n=10)
    
    orig_pred = pipeline_out_to_vec(pipeline(test_dataset["text"]))
    preds = orig_pred
    
    replacements = []
    test_counterfactuals = test_dataset.to_pandas().copy()

    for ix in test_counterfactuals.index:
        top_per_idx = np.random.choice(10)
        if preds[ix] > 0.5:
            person_to_add = top_negative_persons[top_per_idx]
        else:
            person_to_add = top_positive_persons[top_per_idx]
            
        text = test_counterfactuals.loc[ix, ["text"]].get(0)
        person_to_remove = find_random_person_words(text.lower(), person_importance_mapping.keys())
        test_counterfactuals.loc[ix, ["text"]] = text.lower().replace(person_to_remove, person_to_add)
        
        replacements.append((person_to_remove, person_to_add))
            
    
    adv_pred = pipeline_out_to_vec(pipeline(test_counterfactuals["text"].to_list()))
    
    
    results['-'.join([row["dataset"], row["model"], row["training_type"]])] = {"orig_pred": orig_pred, "adv_pred": adv_pred, "replacements": replacements}


liar


Map:   0%|          | 0/795 [00:00<?, ? examples/s]

0it [00:00, ?it/s]We strongly recommend passing in an `attention_mask` since your input_ids may be padded. See https://huggingface.co/docs/transformers/troubleshooting#incorrect-output-when-padding-tokens-arent-masked.
795it [01:21,  9.76it/s]
100%|██████████| 795/795 [00:00<00:00, 65250.02it/s]
  avg_per_imp = np.array(list(map(lambda x: abs(x[1]), filter(lambda z: z[2] == 'PERSON', sentence)))).mean()
  ret = ret.dtype.type(ret / rcount)
795it [01:22,  9.60it/s]
100%|██████████| 795/795 [00:00<00:00, 55482.98it/s]
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, 

isot


Map:   0%|          | 0/8980 [00:00<?, ? examples/s]

1000it [01:28, 11.31it/s]
100%|██████████| 1000/1000 [00:00<00:00, 89859.97it/s]
  avg_per_imp = np.array(list(map(lambda x: abs(x[1]), filter(lambda z: z[2] == 'PERSON', sentence)))).mean()
  ret = ret.dtype.type(ret / rcount)
1000it [01:31, 10.97it/s]
100%|██████████| 1000/1000 [00:00<00:00, 83002.93it/s]
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.l

coaid


Map:   0%|          | 0/1092 [00:00<?, ? examples/s]

1000it [01:17, 12.89it/s]
100%|██████████| 1000/1000 [00:00<00:00, 105004.61it/s]
  avg_per_imp = np.array(list(map(lambda x: abs(x[1]), filter(lambda z: z[2] == 'PERSON', sentence)))).mean()
  ret = ret.dtype.type(ret / rcount)
1000it [01:18, 12.79it/s]
100%|██████████| 1000/1000 [00:00<00:00, 3634.60it/s]
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.loc[ix, ["text"]].get(0)
  text = test_counterfactuals.l

In [26]:
import json
import pickle as pkl

with open("results.json", "w") as file:
    json.dump(results, file)
    
with open('results_misc.pkl', 'wb') as file:
    pkl.dump(results_misc, file)