In [2]:
from evaluate import load
import numpy as np
import csv
import itertools
from tqdm import tqdm, trange

In [3]:
# Load the BERTScore metric once; every later cell reuses this `bertscore` object.
bertscore = load("bertscore")

# Smoke test: score two sentences against identical references.
predictions = [
    "hello there",
    "general kenobi",
]
references = [
    "hello there",
    "general kenobi",
]
results = bertscore.compute(
    references=references,
    predictions=predictions,
    lang="en",
)

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [13]:
eval_dir = 'datasets/sst-2-0.05/stylebkd'
styles = ['bible', 'shakespeare', 'tweets', 'lyrics', 'poetry']


def _read_first_column(csv_path):
    """Return the first field of every row in ``csv_path`` except the first row.

    The first row is dropped (it is treated as a header). Rows with no fields
    raise ``IndexError``, which keeps a malformed file from silently shifting
    the alignment between styles.
    """
    # newline='' hands newline handling to the csv module, so a quoted field
    # that contains a line break is still parsed as a single field.
    with open(csv_path, 'r', newline='') as f:
        rows = csv.reader(f)
        next(rows, None)  # drop the header row; tolerate a completely empty file
        return [row[0] for row in rows]


# style name -> {'test-clean': [text, ...], 'test-poison': [text, ...]}
style_eval_dict = {
    style: {
        split: _read_first_column(f'{eval_dir}/{style}-eval/{split}.csv')
        for split in ('test-clean', 'test-poison')
    }
    for style in styles
}


def outlier_get(similarity_matrix, n=2):
    """Return the indices of the ``n`` rows with the smallest similarity totals.

    Args:
        similarity_matrix: 2-D array; each row is summed along axis 1.
        n: how many indices to return (defaults to 2, the previously
            hard-coded value).

    Returns:
        1-D integer array of row indices, ordered from the smallest row sum
        upwards.
    """
    row_totals = similarity_matrix.sum(axis=1)
    # argsort is ascending, so the first n entries are the lowest totals.
    outliers_indices = np.argsort(row_totals)[:n]

    return outliers_indices

In [14]:
def compute_similarity_matrix(data_list):
    """Build a symmetric matrix of pairwise BERTScore F1 values.

    Every unordered pair of entries in ``data_list`` is scored in a single
    ``bertscore.compute`` call (``lang="en"``); the resulting F1 value is
    written to both ``[i, j]`` and ``[j, i]``. The diagonal is left at zero.

    Args:
        data_list: sequence of texts to compare with each other.

    Returns:
        ``(len(data_list), len(data_list))`` float array.
    """
    count = len(data_list)
    pairs = list(itertools.combinations(range(count), 2))

    # Lower-index member of each pair is the prediction, the other the reference.
    left_texts, right_texts = [], []
    for first, second in pairs:
        left_texts.append(data_list[first])
        right_texts.append(data_list[second])

    matrix = np.zeros((count, count))
    scores = bertscore.compute(predictions=left_texts, references=right_texts, lang="en")['f1']

    for position in range(len(pairs)):
        first, second = pairs[position]
        matrix[first, second] = matrix[second, first] = scores[position]

    return matrix

def compute_dispersion_scores(similarity_matrix):
    """Turn a similarity matrix into one dispersion score per row.

    A row's score is ``1`` minus the mean of that row, so rows that are on
    average less similar to the others get higher scores.
    """
    return 1 - similarity_matrix.mean(axis=1)

def find_non_outliers(dispersion_scores, n=3):
    """Return the indices of the ``n`` items with the lowest dispersion.

    A low dispersion score means an item is, on average, close to the others,
    so these are the non-outliers.

    Args:
        dispersion_scores: 1-D array of per-item dispersion scores
            (higher means more of an outlier).
        n: number of indices to keep.

    Returns:
        1-D integer array of at most ``n`` indices, ordered from the lowest
        dispersion upwards.
    """
    sorted_indices = np.argsort(dispersion_scores)
    # argsort is ascending: the head of the array holds the least dispersed
    # items. Taking the tail would select the outliers instead.
    non_outliers = sorted_indices[:n]
    return non_outliers

def main(data):
    """Select the non-outlier entries for every sample in both test sets.

    For each of ``'test-clean'`` and ``'test-poison'``, the texts at the same
    position across all categories of ``data`` are compared with each other,
    and the indices chosen by ``find_non_outliers`` are recorded.

    Args:
        data: mapping ``category -> {test_type: [text, ...]}``. The number of
            samples is taken from the first category, so every other category
            must provide at least that many texts.

    Returns:
        ``(non_outlier_indices, similarity)`` where ``non_outlier_indices`` is
        ``{test_type: {sample_idx: sorted list of category positions}}`` and
        ``similarity`` is ``{test_type: matrix}``. The matrix is overwritten
        on each sample, so only the last sample's matrix is kept per test type.
    """
    test_types = ('test-clean', 'test-poison')
    non_outlier_indices = {test_type: {} for test_type in test_types}
    similarity = {}

    for test_type in test_types:
        per_category_texts = [category_data[test_type] for category_data in data.values()]
        sample_count = len(per_category_texts[0])

        for idx in trange(sample_count, desc=f'Computing non outliers on {test_type}'):
            # The same sample as rendered by each category, in key order.
            aligned_texts = [texts[idx] for texts in per_category_texts]

            matrix = compute_similarity_matrix(aligned_texts)
            similarity[test_type] = matrix

            kept = find_non_outliers(compute_dispersion_scores(matrix))
            non_outlier_indices[test_type][idx] = sorted(kept.tolist())

    return non_outlier_indices, similarity

# Run the selection over every style's clean and poison test sets.
# `result` maps test type -> sample index -> sorted positions (in `styles` order)
# of the entries that were kept; `similarity` holds only the last sample's
# matrix for each test type.
result, similarity = main(style_eval_dict)

Computing non outliers on test-clean:   0%|          | 0/1821 [00:00<?, ?it/s]

Computing non outliers on test-clean: 100%|██████████| 1821/1821 [00:29<00:00, 61.46it/s]
Computing non outliers on test-poison: 100%|██████████| 909/909 [00:15<00:00, 57.08it/s]


In [16]:
# Write one CSV per test type: one row per sample (in sample order), holding
# the indices selected for that sample.
for test_key in result:
    # newline='' is required for files handed to csv.writer; without it,
    # platforms that translate '\n' on write end every row with '\r\r\n'.
    with open(f'datasets/sst-2-0.05/stylebkd/bertscore/{test_key}.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerows(result[test_key].values())