In [None]:
import itertools
import json
import nltk
import random
import torch

import numpy as np
import pandas as pd

from evaluate import load
from scipy.spatial import distance
from transformers import (
    RobertaTokenizerFast,
    RobertaForSequenceClassification,
    TrainingArguments,
    Trainer,
    AutoConfig,
)
from zss import simple_distance, Node

nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
from nltk import pos_tag, word_tokenize, RegexpParser

In [None]:
# Load each generator's raw results and the ground-truth annotations from disk.
with open("results/pplm_results.json", encoding='utf-8') as fh:
    pplm = json.load(fh)
with open("results/relitc_results.json", encoding='utf-8') as fh:
    relitc = json.load(fh)
with open("results/polyjuice_results.json", encoding='utf-8') as fh:
    polyjuice = json.load(fh)
with open("results/gpt-4o-results.json", encoding='utf-8') as fh:
    poc = json.load(fh)
with open("results/ground_truth.json", encoding='utf-8') as fh:
    gt = json.load(fh)

# Success rate

In [None]:
def success_rate(results):
    """Return the fraction of rows with at least one counterfactual on target.

    A row counts as successful when any entry of ``row['counterfactuals']``
    has a ``'label'`` equal to ``row['target']``.

    Side effect: every row gets a boolean ``row['success']`` key; later cells
    (e.g. ``get_counterfactuals``) read that flag.

    Args:
        results: list of result dicts with ``'target'`` and ``'counterfactuals'``.

    Returns:
        Success fraction in [0, 1]; ``0.0`` for an empty list (the original
        raised ``ZeroDivisionError`` in that case).
    """
    if len(results) == 0:
        return 0.0

    hits = 0
    for row in results:
        target = row['target']
        # any() short-circuits on the first match, like the original break.
        row['success'] = any(ce['label'] == target for ce in row['counterfactuals'])
        hits += row['success']

    return hits / len(results)

In [None]:
success_rate(pplm)

In [None]:
success_rate(polyjuice)

In [None]:
success_rate(relitc)

In [None]:
success_rate(poc)

# Prepare counterfactuals

In [None]:
def get_counterfactuals(results):
    """Pick one cleaned counterfactual text per result row.

    For a successful row the on-target counterfactual with the highest
    ``'score'`` is chosen (first one wins on ties); otherwise the first
    counterfactual is used. Label prefixes such as ``"hawkish : "`` are
    removed, surrounding whitespace is stripped and the text is passed
    through ``str.capitalize``.

    Fixes relative to the previous version:
      * scores <= 0 (e.g. log-probabilities) no longer cause a successful
        row to come back as ``None`` -- the old code started from ``best = 0``
        and required a strictly larger score;
      * ``"neutral : "`` is removed including its trailing space, and the
        result is stripped, so ``capitalize`` acts on the first real letter;
      * if ``success_rate`` has not been run (no ``'success'`` key), success
        is derived from the labels instead of raising ``KeyError``.

    Args:
        results: list of dicts with ``'target'`` and ``'counterfactuals'``
            (each counterfactual has ``'label'``, ``'score'``, ``'text'``).

    Returns:
        List with one entry per row: the cleaned text, or ``None`` when no
        text is available.
    """
    # Order matters: try the spaced "neutral : " before the bare "neutral :".
    prefixes = ("hawkish : ", "dovish : ", "neutral : ", "neutral :")

    ces = []
    for result in results:
        candidates = result['counterfactuals']
        on_target = [ce for ce in candidates if ce['label'] == result['target']]
        succeeded = result['success'] if 'success' in result else bool(on_target)

        if succeeded:
            # max() keeps the first maximal element, matching the old strict '>'.
            string = max(on_target, key=lambda ce: ce['score'])['text'] if on_target else ''
        elif len(candidates) == 0:
            string = ''
        else:
            string = candidates[0]['text']

        for prefix in prefixes:
            string = string.replace(prefix, "")
        # NOTE(review): capitalize() also lower-cases everything after the first
        # character, while the factual texts are left untouched -- confirm that
        # this asymmetry is intended for the perplexity / edit-distance metrics.
        string = string.strip().capitalize()

        ces.append(string if len(string) > 0 else None)

    return ces

In [None]:
# Keep only the rows that the POC (gpt-4o) run covers, so all generators and
# the ground truth line up position by position with `poc`.
# presumably each poc row's 'index' is a position into the full result lists — verify.
poc_indexes = [ce['index'] for ce in poc]

# NOTE(review): these assignments rebind the original names, so the cell is not
# idempotent — running it a second time indexes the already-filtered lists with
# the same positions. Re-run the loading cell first if this cell must be repeated.
# NOTE(review): the success_rate(...) cells above run before this filtering, so
# pplm/polyjuice/relitc are scored on their full lists there.
relitc = [relitc[i] for i in poc_indexes]
pplm = [pplm[i] for i in poc_indexes]
polyjuice = [polyjuice[i] for i in poc_indexes]
gt = [gt[i] for i in poc_indexes]

In [None]:
# One selected counterfactual text per row, for every generator.
relitc_ces, pplm_ces, polyjuice_ces, poc_ces = (
    get_counterfactuals(rows) for rows in (relitc, pplm, polyjuice, poc)
)

# Per-row success flags (written onto the rows by success_rate).
relitc_success, pplm_success, polyjuice_success, poc_success = (
    [row['success'] for row in rows] for rows in (relitc, pplm, polyjuice, poc)
)

# Row metadata is read from the RELITC results.
factuals, ids, labels, targets = (
    [row[key] for row in relitc] for key in ('text', 'id', 'label', 'target')
)

assert len(relitc_ces) == len(pplm_ces) == len(polyjuice_ces) == len(poc_ces) == len(factuals) == len(ids)

In [None]:
# Column order below is the column order of the resulting frame.
data = dict(
    id=ids,
    factual=factuals,
    polyjuice=polyjuice_ces,
    pplm=pplm_ces,
    poc=poc_ces,
    relitc=relitc_ces,
    label=labels,
    target=targets,
    polyjuice_success=polyjuice_success,
    pplm_success=pplm_success,
    relitc_success=relitc_success,
    poc_success=poc_success,
)

# Rows where any generator produced no text (None) are dropped, so every
# remaining row can be compared across all generators.
counterfactuals = pd.DataFrame.from_dict(data).dropna()

In [None]:
#counterfactuals = pd.read_csv('metrics_calculated.csv')

# Faithfulness (naive)

In [None]:
def faithfulness(results, ground_truth=None, rng=None):
    """Naive faithfulness score of a generator against the ground truth.

    Per row (rows without counterfactuals are skipped):
      * start at 1 if the ground-truth label equals the classifier's label
        (``gt_row['classification']['label']``), else 0;
      * if the row was not successful, draw one counterfactual at random and
        subtract 1 when its label equals the ground-truth label.

    Args:
        results: result rows carrying ``'counterfactuals'`` and ``'success'``.
        ground_truth: rows aligned with ``results``; defaults to the
            notebook-level ``gt`` list (the previous hard-wired behaviour).
        rng: optional ``numpy.random.Generator`` / ``RandomState`` used for the
            random draw so runs can be reproduced. Defaults to the global
            ``np.random`` state, exactly as before.

    Returns:
        ``(mean, std)`` of the per-row scores; ``(nan, nan)`` when no row has
        any counterfactual (previously NaN plus a NumPy RuntimeWarning).
    """
    if ground_truth is None:
        ground_truth = gt
    sampler = np.random if rng is None else rng

    res = []
    # NOTE: zip() stops at the shorter input, so misaligned lists are
    # truncated silently rather than raising.
    for row, gt_row in zip(results, ground_truth):
        if len(row['counterfactuals']) == 0:
            continue

        r = 1 * (gt_row['label'] == gt_row['classification']['label'])

        if not row['success']:
            nce = sampler.choice(row['counterfactuals'], size=1)
            if nce[0]['label'] == gt_row['label']:
                r -= 1

        res.append(r)

    if len(res) == 0:
        return float('nan'), float('nan')

    return np.mean(res), np.std(res)

In [None]:
faithfulness(polyjuice)

In [None]:
faithfulness(pplm)

In [None]:
faithfulness(relitc)

In [None]:
faithfulness(poc)

# Perplexity

In [None]:
# Perplexity metric object from the `evaluate` library.
perplexity = load("perplexity", module_type="metric")
# Language model used to score perplexity.
# NOTE(review): `model_id` is rebound to the RoBERTa classifier in the
# "Embedding distance" section, so re-running the perplexity cells after that
# section would score with the wrong model id.
model_id = 'gpt2'
# Suffix appended to the perplexity column names written into `counterfactuals`.
row_ending = ''

In [None]:
# Score the factual texts and each generator's counterfactuals with the same
# language model; evaluation order is factual, pplm, relitc, polyjuice, poc.
(
    perplexity_factual,
    perplexity_pplm,
    perplexity_relitc,
    perplexity_polyjuice,
    perplexity_poc,
) = (
    perplexity.compute(predictions=counterfactuals[column], model_id=model_id)
    for column in ('factual', 'pplm', 'relitc', 'polyjuice', 'poc')
)

In [None]:
print(f"Mean perplexity: PPLM {perplexity_pplm['mean_perplexity']}, RELITC {perplexity_relitc['mean_perplexity']}, Polyjuice {perplexity_polyjuice['mean_perplexity']}, POC {perplexity_poc['mean_perplexity']}")

In [None]:
# Attach the per-text perplexities as new columns (same insertion order as before).
for column_prefix, scored in (
    ('polyjuice', perplexity_polyjuice),
    ('poc', perplexity_poc),
    ('pplm', perplexity_pplm),
    ('relitc', perplexity_relitc),
    ('factual', perplexity_factual),
):
    counterfactuals[f'{column_prefix}_perplexity{row_ending}'] = scored['perplexities']

In [None]:
# Mean of the per-row ratio counterfactual perplexity / factual perplexity.
_factual_ppl = np.array(counterfactuals[f'factual_perplexity{row_ending}'])

perp_ratio_polyjuice = np.mean(np.array(counterfactuals[f'polyjuice_perplexity{row_ending}']) / _factual_ppl)
perp_ratio_poc = np.mean(np.array(counterfactuals[f'poc_perplexity{row_ending}']) / _factual_ppl)
perp_ratio_pplm = np.mean(np.array(counterfactuals[f'pplm_perplexity{row_ending}']) / _factual_ppl)
perp_ratio_relitc = np.mean(np.array(counterfactuals[f'relitc_perplexity{row_ending}']) / _factual_ppl)

In [None]:
print(f"Mean perplexity ratio: PPLM {perp_ratio_pplm}, RELITC {perp_ratio_relitc}, Polyjuice {perp_ratio_polyjuice}, POC {perp_ratio_poc}")

# Edit Distance

In [None]:
# https://github.com/RedTeamingforLLMs/RedTeamingforLLMs/blob/main/utils/metrics.py
def levenshtein(a, b):
    """Levenshtein (edit) distance between two sequences.

    Works on any indexable sequences whose items support ``==`` (strings,
    lists, ...). Only two rows of the DP table are kept; the shorter input is
    used as the row so the working list stays as small as possible.

    Returns:
        Minimum number of single-item insertions, deletions and substitutions
        turning ``a`` into ``b``.
    """
    # Columns follow the shorter sequence, the outer loop walks the longer one.
    shorter, longer = (b, a) if len(a) > len(b) else (a, b)

    previous = list(range(len(shorter) + 1))
    for row, longer_item in enumerate(longer, start=1):
        current = [row]
        for col, shorter_item in enumerate(shorter):
            if shorter_item == longer_item:
                cost = previous[col]
            else:
                # substitution, deletion, insertion
                cost = 1 + min(previous[col], previous[col + 1], current[col])
            current.append(cost)
        previous = current

    return previous[-1]

In [None]:
# Character-level edit distance of each counterfactual to its factual text,
# normalised by the length of the factual text.
pplm_dist, relitc_dist, polyjuice_dist, poc_dist = (
    [
        levenshtein(cont, fact) / len(fact)
        for cont, fact in zip(counterfactuals[generator_column], counterfactuals['factual'])
    ]
    for generator_column in ('pplm', 'relitc', 'polyjuice', 'poc')
)

In [None]:
print(f"Mean edit distance: PPLM {np.mean(pplm_dist)}, RELITC {np.mean(relitc_dist)}, Polyjuice {np.mean(polyjuice_dist)}, POC {np.mean(poc_dist)}")

In [None]:
# Store the normalised edit distances next to the texts they were computed from.
counterfactuals['polyjuice_edit_distance'] = polyjuice_dist
counterfactuals['poc_edit_distance'] = poc_dist
counterfactuals['pplm_edit_distance'] = pplm_dist
counterfactuals['relitc_edit_distance'] = relitc_dist

# Tree edit distance

In [None]:
# Source: https://www.geeksforgeeks.org/syntax-tree-natural-language-processing/

#Extract all parts of speech from any text
# Cascaded chunk grammar: rules run top to bottom and later rules may consume
# the chunks created by earlier ones. The PP rule must therefore reference the
# chunk label `P` defined two lines above it; the previous lowercase `<p>`
# matched no POS tag or chunk label, so PP chunks were never produced.
chunker = RegexpParser("""
                       NP: {<DT>?<JJ>*<NN>}    #To extract Noun Phrases
                       P: {<IN>}               #To extract Prepositions
                       V: {<V.*>}              #To extract Verbs
                       PP: {<P> <NP>}          #To extract Prepositional Phrases
                       VP: {<V> <NP|PP>*}      #To extract Verb Phrases
                       """)

def get_tree(text):
    """Tokenise and POS-tag ``text``, then chunk it with ``chunker``.

    Returns:
        The tree produced by ``chunker.parse`` for the tagged tokens.
    """
    # (token, POS tag) pairs for every token in the text.
    tagged = pos_tag(word_tokenize(text))

    # Group the tagged tokens into NP / P / V / PP / VP chunks.
    return chunker.parse(tagged)

In [None]:
def traverse_tree(tree):
    """Convert an NLTK tree into a ``zss`` ``Node`` tree, recursively.

    Sub-trees become nodes labelled with the sub-tree's label; every other
    child is treated as a leaf and labelled with its first element
    (``child[0]``).
    """
    root = Node(tree.label())

    for child in tree:
        is_subtree = type(child) is nltk.tree.Tree
        kid = traverse_tree(child) if is_subtree else Node(child[0])
        # addkid's return value is kept, as in the original implementation.
        root = root.addkid(kid)

    return root

In [None]:
def get_zss(text):
    """Chunk ``text`` with ``get_tree`` and return it as a ``zss`` node tree."""
    return traverse_tree(get_tree(text))

In [None]:
def tree_edit_dist(text_a, text_b):
    """Tree edit distance (``zss.simple_distance``) between the chunk trees of two texts."""
    tree_a = get_zss(text_a)
    tree_b = get_zss(text_b)
    return simple_distance(tree_a, tree_b)

In [None]:
# Tree edit distance of every counterfactual to the factual text of the same row.
_factual_texts = counterfactuals['factual']

pplm_tree_dist = list(map(tree_edit_dist, counterfactuals['pplm'], _factual_texts))
relitc_tree_dist = list(map(tree_edit_dist, counterfactuals['relitc'], _factual_texts))
polyjuice_tree_dist = list(map(tree_edit_dist, counterfactuals['polyjuice'], _factual_texts))
poc_tree_dist = list(map(tree_edit_dist, counterfactuals['poc'], _factual_texts))

In [None]:
print(f"Mean tree edit distance: PPLM {np.mean(pplm_tree_dist)}, RELITC {np.mean(relitc_tree_dist)}, Polyjuice {np.mean(polyjuice_tree_dist)}, POC {np.mean(poc_tree_dist)}")

In [None]:
# Store the (un-normalised) tree edit distances as columns.
counterfactuals['polyjuice_tree_edit_distance'] = polyjuice_tree_dist
counterfactuals['poc_tree_edit_distance'] = poc_tree_dist
counterfactuals['pplm_tree_edit_distance'] = pplm_tree_dist
counterfactuals['relitc_tree_edit_distance'] = relitc_tree_dist

# Embedding distance

In [None]:
# Classifier whose hidden states serve as text embeddings below.
# NOTE(review): this rebinds `model_id`, which the perplexity cells set to
# 'gpt2'. Re-running those cells after this one would pass the RoBERTa id to
# the perplexity metric — consider a separate name for this id.
model_id = "gtfintechlab/FOMC-RoBERTa"
tokenizer = RobertaTokenizerFast.from_pretrained(model_id)
config = AutoConfig.from_pretrained(model_id)
# Requires a CUDA device: the model is moved to the GPU unconditionally.
model = RobertaForSequenceClassification.from_pretrained(model_id, config=config).cuda()


In [None]:
def get_embeddings(texts, batch_size=32):
    """Embed texts with the classifier's final hidden layer.

    Each text is represented by the last-layer hidden state of its last
    *non-padding* token.

    The previous version sliced ``[:, -1, :]`` after ``padding=True``. For
    every sequence shorter than the longest one in its batch that position
    holds a padding token, so a text's embedding depended on which other texts
    shared its batch. The last real token is now located via the attention
    mask; for the longest sequence of a batch the result is unchanged.

    Other changes: inference runs under ``torch.no_grad()``, inputs follow
    ``model.device`` instead of a hard-coded ``'cuda'``, over-long inputs are
    truncated instead of failing, and empty input returns an empty tensor.

    Args:
        texts: list of strings.
        batch_size: number of texts per forward pass (default 32, as before).

    Returns:
        CPU tensor with one row per text, shape ``(len(texts), hidden_size)``.
    """
    if len(texts) == 0:
        return torch.empty((0, model.config.hidden_size))

    device = model.device
    pooled = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]
            tokens = tokenizer(batch, return_tensors="pt", padding=True, truncation=True).to(device)
            hidden = model(**tokens, output_hidden_states=True).hidden_states[-1]

            # Index of the last position with attention_mask == 1. Flipping and
            # taking the first maximum works for right- and left-padded batches.
            mask = tokens['attention_mask']
            last_real = mask.shape[1] - 1 - mask.flip(dims=[1]).argmax(dim=1)
            rows = torch.arange(hidden.shape[0], device=hidden.device)

            pooled.append(hidden[rows, last_real].cpu())

    return torch.cat(pooled)

In [None]:
# Generators to evaluate and the class ids used in the 'label' / 'target' columns.
names = ['polyjuice', 'poc', 'pplm', 'relitc']
classes = [0, 1, 2]

# embeddings['factual'] is a dict {class id -> embeddings of the factual texts
# whose 'label' is that class}; embeddings[<generator>] is a single tensor with
# one row per row of `counterfactuals`.
embeddings = {}
cl_emb = {c: get_embeddings(counterfactuals['factual'][counterfactuals['label'] == c].to_list()) for c in classes}
embeddings['factual'] = cl_emb

for name in names:    
    cl_emb = get_embeddings(counterfactuals[name].to_list())
    embeddings[name] = cl_emb

In [None]:
def get_embedding_distance(cfs):
    """Distance from each counterfactual to its nearest target-class factual.

    For row ``i`` of ``counterfactuals``, ``cfs[i]`` is compared (``cdist``
    with its default metric) against all factual embeddings whose class equals
    that row's ``'target'``; the smallest distance is kept.

    Args:
        cfs: embeddings aligned row by row with ``counterfactuals``.

    Returns:
        List with one minimum distance per row.
    """
    target_column = counterfactuals['target']
    return [
        np.min(distance.cdist([cfs[row]], embeddings['factual'][target_column.iloc[row]]))
        for row in range(len(counterfactuals))
    ]

In [None]:
# Nearest-target-class embedding distance per generator, stored as columns.
for generator_key in ('polyjuice', 'poc', 'pplm', 'relitc'):
    counterfactuals[f'{generator_key}_embedding_distance'] = get_embedding_distance(embeddings[generator_key])

In [None]:
# For every generator and every class: for each counterfactual embedding, the
# distance (cdist default metric) to the closest factual embedding of that class.
min_distances = {name: {cl: distance.cdist(embeddings[name], embeddings['factual'][cl]).min(axis=1) for cl in classes} for name in names}

In [None]:
# Mean nearest-neighbour distance per generator, pooled over ALL classes
# (not only each row's target class, unlike get_embedding_distance).
min_distances_mean = {name: np.mean(np.concatenate([min_distances[name][cl] for cl in classes])) for name in names}

In [None]:
min_distances_mean

In [None]:
n_samples = 50
# Dedicated fixed-seed generator: the subsample, and with it every
# implausibility score, is now identical across kernel restarts. The previous
# unseeded torch.randperm gave different reference sets on every run.
_subsample_generator = torch.Generator().manual_seed(0)
indices = {
    c: torch.randperm(len(embeddings['factual'][c]), generator=_subsample_generator)[:n_samples]
    for c in classes
}  # Random indices for subsampling target class embeddings

def get_implausibility(cfs):
    """Mean distance from each counterfactual to a target-class subsample.

    For row ``i`` of ``counterfactuals``, ``cfs[i]`` is compared (``cdist``
    with its default metric) against the factual embeddings of the row's
    ``'target'`` class selected by ``indices`` (at most ``n_samples`` of
    them); the mean of those distances is the row's implausibility.

    Args:
        cfs: embeddings aligned row by row with ``counterfactuals``.

    Returns:
        List with one mean distance per row.
    """
    out = []
    for i in range(len(counterfactuals)):
        target = counterfactuals.iloc[i]['target']
        out.append(np.mean(distance.cdist([cfs[i]], embeddings['factual'][target][indices[target]])))
    return out

In [None]:
# Per-row implausibility for every generator listed in `names`.
implausibilities = {name: get_implausibility(embeddings[name]) for name in names}

In [None]:
# Average implausibility per generator over all rows of `counterfactuals`.
implausibilities_mean = {name: np.mean(implausibilities[name]) for name in implausibilities}

In [None]:
implausibilities_mean

In [None]:
# Store the per-row implausibility scores as columns.
# NOTE(review): this repeats the generator list instead of reusing `names`;
# the two must be kept in sync by hand.
for name in ['poc', 'polyjuice', 'pplm', 'relitc']:
    counterfactuals[f'{name}_implausibility'] = implausibilities[name]

In [None]:
# index=False: the row index carries no information beyond the 'id' column, and
# writing it makes the later pd.read_csv produce a spurious "Unnamed: 0" column.
counterfactuals.to_csv('metrics_calculated.csv', index=False)

# Tables

In [None]:
counterfactuals = pd.read_csv('metrics_calculated.csv')

In [None]:
counterfactuals[['polyjuice_success', 'pplm_success', 'relitc_success']]

In [None]:
def get_table(df, success_only=False):
    """Render the per-generator metric summary as a Markdown table.

    Every cell shows ``mean (std)`` of one metric for one generator, formatted
    as ``{mean:.2f} ({std:.1f})``. The perplexity ratio is computed row-wise
    as generator perplexity divided by ``factual_perplexity``.

    Args:
        df: frame holding ``<generator>_<metric>`` columns for poc, polyjuice,
            pplm and relitc, plus ``factual_perplexity``.
        success_only: when true, each generator's statistics use only the rows
            where its own ``<generator>_success`` column is True.

    Returns:
        The table as a single string; every line ends with a newline.
    """
    generators = ['POC', 'Polyjuice', 'PPLM', 'RELITC']

    # (column title, column suffix); None marks the derived perplexity ratio.
    column_specs = [
        ('Perplexity', 'perplexity'),
        ('Perplexity ratio', None),
        ('Edit distance', 'edit_distance'),
        ('Tree edit distance', 'tree_edit_distance'),
        ('Embedding distance', 'embedding_distance'),
        ('Implausibility', 'implausibility'),
        ('Success rate', 'success'),
    ]

    def metric_values(frame, prefix, suffix):
        if suffix is None:
            return np.array(frame[f'{prefix}_perplexity']) / np.array(frame[f'factual_perplexity'])
        return frame[f'{prefix}_{suffix}']

    header = '|   |' + ''.join(f' {title} |' for title, _ in column_specs) + '\n'
    divider = '|---|' + '---|' * len(column_specs) + '\n'

    body = []
    for generator in generators:
        prefix = generator.lower()
        frame = df[df[f'{prefix}_success'] == True] if success_only else df

        cells = []
        for _, suffix in column_specs:
            values = metric_values(frame, prefix, suffix)
            cells.append(f' {np.mean(values):.2f} ({np.std(values):.1f}) |')

        body.append(f'| {generator} |' + ''.join(cells) + '\n')

    return header + divider + ''.join(body)

In [None]:
print(get_table(counterfactuals))

|   | Perplexity | Perplexity ratio | Edit distance | Tree edit distance | Embedding distance | Success rate |
|---|---|---|---|---|---|---|
| POC | 74.000 | 1.373 | 0.293 | 19.400 | 24.863 | 0.880 |
| Polyjuice | 86.485 | 1.577 | 0.264 | 17.360 | 24.779 | 0.360 |
| PPLM | 37.113 | 0.761 | 0.565 | 37.480 | 24.974 | 0.520 |
| RELITC | 86.723 | 1.536 | 0.127 | 11.000 | 25.832 | 0.800 |

In [None]:
print(get_table(counterfactuals, True))

|   | Perplexity | Perplexity ratio | Edit distance | Tree edit distance | Embedding distance | Success rate |
|---|---|---|---|---|---|---|
| POC | 76.485 | 1.340 | 0.301 | 20.455 | 24.448 | 1.000 |
| Polyjuice | 101.116 | 1.456 | 0.268 | 17.000 | 23.498 | 1.000 |
| PPLM | 33.623 | 0.671 | 0.631 | 34.308 | 23.296 | 1.000 |
| RELITC | 87.206 | 1.484 | 0.098 | 10.350 | 25.703 | 1.000 |