In [11]:
import sys
import numpy as np
import matplotlib.pyplot as plt
import pickle as pc
from sklearn.metrics import average_precision_score
from sklearn.metrics import precision_recall_fscore_support
from sklearn.model_selection import train_test_split

sys.path.insert(0,'../NeuroX/')
from neurox.data.extraction import transformers_extractor
import neurox.data.loader as data_loader
import neurox.interpretation.ablation as ablation
import neurox.interpretation.linear_probe as linear_probe
import neurox.interpretation.utils as utils
import os

In [16]:
def load_reshape(dataset, method, avg):
    """Load the stored activations of one dataset for the train/test splits.

    For each of the splits 'train' and 'test' the file
    ``data/llama-2/<dataset>/<method>/activations-<split>-<avg>.json`` is read
    with ``data_loader.load_activations`` and converted to a numpy array.
    Splits whose file does not exist are silently left out of the result.

    Args:
        dataset: dataset directory name used in the path.
        method: method directory name used in the path.
        avg: pooling suffix used in the file name.

    Returns:
        dict mapping split name -> numpy array, only for splits found on disk.
    """
    loaded = {}
    for split_name in ('train', 'test'):
        path = f'data/llama-2/{dataset}/{method}/activations-{split_name}-{avg}.json'
        if os.path.exists(path):
            # 4096 is forwarded as the loader's second argument
            # (presumably the per-layer hidden size -- TODO confirm against NeuroX).
            split_activations, _ = data_loader.load_activations(path, 4096)
            loaded[split_name] = np.array(split_activations)
    return loaded
    
def load_labels(dataset, method='echo',avg='mean'):
    """Load the pickled labels of one dataset for the train/test splits.

    For each of the splits 'train' and 'test' the file
    ``data/llama-2/<dataset>/<method>/labels-<split>.pth`` is unpickled.
    Splits whose file does not exist are silently left out of the result.

    Args:
        dataset: dataset directory name used in the path.
        method: method directory name used in the path (default 'echo').
        avg: accepted for signature symmetry with the activation loaders;
            it does not take part in the label path.

    Returns:
        dict mapping split name -> unpickled label object, only for splits
        found on disk.
    """
    labels = {}
    for split in ['train','test']:
        file = f'data/llama-2/{dataset}/{method}/labels-{split}.pth'
        if not os.path.exists(file):
            continue
        # NOTE(security): unpickling can execute arbitrary code; only load
        # label files that were produced by a trusted pipeline.
        # The context manager closes the handle, which a bare open() inside
        # the call would leave to the garbage collector.
        with open(file,'rb') as handle:
            labels[split] = pc.load(handle)
    return labels

def get_activations(method, avg):
    """Load the activations of the four datasets for one method/pooling pair.

    Returns:
        4-tuple of the ``load_reshape`` results for the datasets
        'toxic', 'xstest', 'adv' and 'mt', in that order.
    """
    dataset_names = ('toxic', 'xstest', 'adv', 'mt')
    return tuple(load_reshape(name, method, avg) for name in dataset_names)
                                      
def get_labels():
    """Load the labels of the four datasets with ``load_labels`` defaults.

    Returns:
        4-tuple of the ``load_labels`` results for the datasets
        'toxic', 'xstest', 'adv' and 'mt', in that order.
    """
    dataset_names = ('toxic', 'xstest', 'adv', 'mt')
    return tuple(load_labels(name) for name in dataset_names)
               
def get_activation_dict(method, avg):
    """Arrange the loaded activations into one dict keyed by experiment split.

    Training data comes from the 'toxic' dataset; every dataset contributes
    its 'test' split under the key ``test_<dataset>``. A KeyError is raised
    when one of the required splits was not loaded.
    """
    toxic, xstest, adv, mt = get_activations(method, avg)
    by_split = {}
    by_split['train'] = toxic['train']
    by_split['test_toxic'] = toxic['test']
    by_split['test_xstest'] = xstest['test']
    by_split['test_adv'] = adv['test']
    by_split['test_mt'] = mt['test']
    return by_split

def get_labels_dict(method, avg):
    """Arrange the loaded labels into one dict keyed by experiment split.

    ``method`` and ``avg`` are accepted but not used: the labels are always
    loaded through ``get_labels()``. Training labels come from the 'toxic'
    dataset; every dataset contributes its 'test' labels under the key
    ``test_<dataset>``. A KeyError is raised when a required split is missing.
    """
    toxic, xstest, adv, mt = get_labels()
    by_split = {}
    by_split['train'] = toxic['train']
    by_split['test_toxic'] = toxic['test']
    by_split['test_xstest'] = xstest['test']
    by_split['test_adv'] = adv['test']
    by_split['test_mt'] = mt['test']
    return by_split
                                      
def get_activations_labels(method, avg):
    """Return the pair ``(activations, labels)`` of per-split dictionaries."""
    return get_activation_dict(method, avg), get_labels_dict(method, avg)

In [17]:
def get_split_data(activation, label):
    """Turn one split's activations and labels into probe-ready tensors.

    Each label is wrapped as a single-element sequence so that
    ``utils.create_tensors`` sees one token per sample: the "source" side is
    a one-element numpy array holding the label, the "target" side a
    one-element list holding the label.

    Returns:
        dict with keys 'X', 'y', 'label2idx' and 'idx2label' taken from the
        ``create_tensors`` result.
    """
    # Built in this order (target first) to match how the labels are consumed.
    target_tokens = [[item] for item in label]
    source_tokens = [np.array([item]) for item in label]
    features, encoded_labels, mapping = utils.create_tensors(
        {"source": source_tokens, "target": target_tokens},
        activation,
        1,  # third positional argument of create_tensors -- meaning not visible here, TODO confirm
    )
    # The mapping carries four lookups; only the label ones are returned.
    label2idx, idx2label, _src2idx, _idx2src = mapping
    return dict(X=features, y=encoded_labels, label2idx=label2idx, idx2label=idx2label)
def get_train_test_data(activations, labels):
    """Build the ``get_split_data`` dictionary for every split named in ``labels``."""
    return {
        split: get_split_data(activations[split], labels[split])
        for split in labels
    }

In [30]:

from sklearn.metrics import precision_score, recall_score, f1_score, precision_recall_curve, auc
def calculate_f1(true_labels, predicted_labels):
    """Compute, print and return precision, recall and F1 for the predictions.

    Returns:
        list ``[precision, recall, f1]`` from the scikit-learn scorers called
        with their default arguments.
    """
    f1 = f1_score(true_labels, predicted_labels)
    precision = precision_score(true_labels, predicted_labels)
    recall = recall_score(true_labels, predicted_labels)

    report = (("Precision:", precision), ("Recall:", recall), ("F1 Score:", f1))
    for caption, value in report:
        print(caption, value)
    return [precision, recall, f1]
def get_metrics(true_labels, outputs_all):
    """Score the probe outputs at the best-F1 threshold and at plain argmax.

    Column 1 of ``outputs_all`` is used as the positive-class score.

    Returns:
        dict with two entries, each ``[precision, recall, f1, auprc]``:
          * 'best'   -- labels obtained by thresholding the column-1 score at
                        the precision-recall-curve threshold with the highest F1
                        (the threshold is picked on the same data it is scored on).
          * 'normal' -- labels obtained with ``np.argmax`` over the columns.
        The AUPRC value is the same in both entries.
    """
    positive_scores = outputs_all[:, 1]

    def labels_at(threshold):
        # A sample is positive when its score reaches the threshold.
        return [1 if score >= threshold else 0 for score in positive_scores]

    # Area under the precision-recall curve.
    precision, recall, thresholds = precision_recall_curve(true_labels, positive_scores)
    auprc = auc(recall, precision)

    # Sweep every candidate threshold and keep the first one with the top F1.
    best_f1, best_thr = 0, 0
    for candidate in thresholds:
        candidate_f1 = f1_score(true_labels, labels_at(candidate))
        if candidate_f1 > best_f1:
            best_f1, best_thr = candidate_f1, candidate

    res = {}
    res['best'] = calculate_f1(true_labels, labels_at(best_thr)) + [auprc]
    res['normal'] = calculate_f1(true_labels, np.argmax(outputs_all, axis=1)) + [auprc]
    return res
    

def get_layer_data(layer, train_test_data):
    """Select the activations of a single layer for every split.

    Args:
        layer: layer index to keep, or -1 to keep the activations untouched.
        train_test_data: mapping split -> dict holding the features under 'X'.

    Returns:
        dict mapping split -> features restricted to ``layer`` (or the
        original 'X' object when ``layer`` is -1).
    """
    keep_everything = layer == -1
    per_split = {}
    for split_name, split_data in train_test_data.items():
        features = split_data['X']
        if not keep_everything:
            # 33 is forwarded as the third argument of the NeuroX filter
            # (presumably the total number of layers -- TODO confirm).
            features = ablation.filter_activations_by_layers(features, [layer], 33)
        per_split[split_name] = features
    return per_split

def run_test(y_test, outputs_all):
    """Score probe outputs against the gold labels.

    Thin wrapper around ``get_metrics``. The macro-averaged
    precision/recall/F1 computation that used to follow the ``return`` was
    unreachable and has been removed.

    Args:
        y_test: gold labels of the split.
        outputs_all: probe outputs; forwarded unchanged to ``get_metrics``.

    Returns:
        The ``get_metrics`` dictionary with the 'best' and 'normal' entries.
    """
    return get_metrics(y_test,outputs_all)

def get_test_scores(probe, train_test_data, layer_data, split):
    """Evaluate ``probe`` on one split and score its outputs.

    Returns:
        tuple ``(metrics, outputs_all)`` where ``metrics`` comes from
        ``run_test`` and ``outputs_all`` is the third value returned by
        ``linear_probe.evaluate_probe``.
    """
    split_labels = train_test_data[split]['y']
    _class_scores, _predictions, outputs_all = linear_probe.evaluate_probe(
        probe,
        layer_data[split],
        split_labels,
        idx_to_class=train_test_data['train']['idx2label'],
        return_predictions=True,
    )
    metrics = run_test(split_labels, outputs_all)
    return metrics, outputs_all
    
# def get_score_filtered(probe, train_test_data,layer_data, split, n=100):
#     X_test = ablation.filter_activations_keep_neurons(layer_data[split], ordering[:n])
#     probe_layer_0 = linear_probe.train_logistic_regression_probe(X_train, y_train, lambda_l1=0.001, lambda_l2=0.001)
#     class_score, prediction, outputs_all=linear_probe.evaluate_probe(probe_layer_0, X_test, y_test, idx_to_class=idx2label,return_predictions=True)
#     return get_scores(y_test, outputs_all)


def get_test_scores_filtered(probe, ordering, train_test_data, layer_data, split, n):
    """Evaluate ``probe`` on one split restricted to the first ``n`` ordered neurons.

    Returns:
        tuple ``(metrics, outputs_all)`` where ``metrics`` comes from
        ``run_test`` and ``outputs_all`` is the third value returned by
        ``linear_probe.evaluate_probe``.
    """
    kept_neurons = ordering[:n]
    X_test = ablation.filter_activations_keep_neurons(layer_data[split], kept_neurons)
    split_labels = train_test_data[split]['y']
    _class_scores, _predictions, outputs_all = linear_probe.evaluate_probe(
        probe,
        X_test,
        split_labels,
        idx_to_class=train_test_data['train']['idx2label'],
        return_predictions=True,
    )
    metrics = run_test(split_labels, outputs_all)
    return metrics, outputs_all
    
def run_filtered(probe_layer,train_test_data,layer_data,n):
    """Retrain a probe on the top-``n`` neurons of ``probe_layer`` and test it.

    The neuron ordering is taken from ``probe_layer``; the training
    activations are cut down to the first ``n`` neurons of that ordering, a
    fresh logistic-regression probe is trained on them, and that probe is
    evaluated on every test split with the same neuron selection.

    Args:
        probe_layer: trained probe used only to rank the neurons.
        train_test_data: mapping split -> dict with 'y' and (for 'train')
            'idx2label'.
        layer_data: mapping split -> activations for the probed layer.
        n: number of top-ranked neurons to keep.

    Returns:
        The ``(metrics, outputs_all)`` pair of the LAST evaluated split
        ('test_mt') only.
    """
    ordering, _ = linear_probe.get_neuron_ordering(probe_layer, train_test_data['train']['idx2label'])
    X_train = ablation.filter_activations_keep_neurons(layer_data['train'], ordering[:n])
    # Kept under its own name so the ranking probe passed in is not shadowed.
    filtered_probe = linear_probe.train_logistic_regression_probe(X_train, train_test_data['train']['y'], lambda_l1=0.001, lambda_l2=0.001)
    results = {}
    for split in ['test_toxic','test_xstest','test_adv','test_mt']:
        # The retrained probe is the one matching the n-neuron inputs; the
        # previous code passed an undefined name `probe` here (NameError).
        results[split] = get_test_scores_filtered(filtered_probe,ordering,train_test_data,layer_data,split,n)
    # NOTE(review): only the last split's result is returned, which looks
    # unintended (run_layer returns the whole dict). The return shape is kept
    # as it was; switch to `return results` once callers are confirmed.
    return results[split]
        
    
def run_layer(train_test_data, layer, n=100):
    """Train a probe on one layer and evaluate it on the four test splits.

    Args:
        train_test_data: mapping split -> dict as built by ``get_split_data``.
        layer: layer index handed to ``get_layer_data`` (-1 keeps all layers).
        n: currently unused; it belongs to the disabled neuron-filtered run.

    Returns:
        dict mapping each of 'test_toxic', 'test_xstest', 'test_adv' and
        'test_mt' to the ``(metrics, outputs_all)`` pair of ``get_test_scores``.
    """
    layer_data = get_layer_data(layer, train_test_data)
    probe_layer = linear_probe.train_logistic_regression_probe(
        layer_data['train'],
        train_test_data['train']['y'],
        lambda_l1=0.001,
        lambda_l2=0.001,
    )
    test_splits = ('test_toxic', 'test_xstest', 'test_adv', 'test_mt')
    # Disabled: results['filtered'] = run_filtered(probe,train_test_data,layer_data,n)
    return {
        split: get_test_scores(probe_layer, train_test_data, layer_data, split)
        for split in test_splits
    }
    

In [19]:
def draw_auprc(results,title):
    """Plot a per-layer score curve for every test split and save it as a PDF.

    Args:
        results: mapping layer -> split -> ``(metrics, outputs_all)``, i.e. one
            ``run_layer`` result per layer.
        title: figure title; also used as the stem of ``outs/<title>.pdf``.

    Plotted value per split:
        * 'test_adv': fraction of samples whose argmax prediction is class 1.
        * 'test_mt': fraction of samples whose argmax prediction is class 0.
        * other splits: last entry of ``metrics['best']`` (the AUPRC that
          ``get_metrics`` appends).

    The value of the FIRST layer key in ``results`` is drawn as a black
    horizontal dash-dot line (the driver inserts layer -1, "all layers",
    first); the remaining layers form the curve. Baselines are read from the
    module-level ``base`` dict and drawn as red dash-dot lines.

    Side effects: creates ``outs/`` if needed, writes the PDF and shows the
    figure.
    """
    res = {'test_toxic':{},'test_xstest':{},'test_adv':{},'test_mt':{}}
    for layer in results:
        for split in results[layer]:
            if split=='test_adv':
                res[split][layer]=np.mean(np.argmax(results[layer][split][1],axis=1)==1)
            elif split=='test_mt':
                res[split][layer]=np.mean(np.argmax(results[layer][split][1],axis=1)==0)
            else:
                res[split][layer]=results[layer][split][0]['best'][-1]
    for split in res:
        layers = list(res[split].keys())
        values = list(res[split].values())
        if not values:
            # Nothing recorded for this split; indexing values[0] would fail.
            continue
        plt.plot(layers[1:],values[1:],label=split)
        plt.plot([0,33],[values[0],values[0]],color='black',linestyle='-.')
        if split in base:
            plt.plot([0,33],[base[split],base[split]],color='red',linestyle='-.')
    plt.xlabel('layer')
    plt.ylabel('score')
    plt.legend()
    plt.title(title)
    # Adjust layout for better spacing
    plt.tight_layout()

    # savefig does not create missing directories, so make sure outs/ exists.
    os.makedirs('outs', exist_ok=True)
    plt.savefig(f'outs/{title}.pdf',dpi=300)
    # Show the plot
    plt.show()

In [None]:
# Number of training samples; dataset dependent (384 * 2), i.e. len(questions_dic['train']).
total_train_sampels = 768
# Pooling variants evaluated for each prompting method.
method2last_mean = {
    'echo': ['mean'],  # echo: only the averaged embedding is suggested
    'sure': ['last'],  # sure: only the 'last' embedding is needed
    'standard': ['last', 'mean', 'weighted'],
}
# Baseline scores, drawn as red reference lines by draw_auprc.
base = {'test_toxic': 0.816, 'test_xstest': 0.936}
results_all = {}
for method in ('echo', 'standard', 'sure'):
    for avg in method2last_mean[method]:
        activations, labels = get_activations_labels(method, avg)
        train_test_data = get_train_test_data(activations, labels)
        results = {}
        for layer in range(-1, 33):  # -1 stands for all layers together
            results[layer] = run_layer(train_test_data, layer, n=100)
        draw_auprc(results, f'{method}_{avg}')
        results_all[f'{method}{avg}'] = results
