In [None]:
from utils.utils import load_conversation_template
# Name of the conversation template handed to load_conversation_template.
template_name = 'llama-2'
# NOTE(review): conv_template is not referenced by any of the visible cells below —
# confirm it is still needed (and that 'llama-2' is the intended template for the model in use).
conv_template = load_conversation_template(template_name)

In [None]:
from utils.modelUtils import *
import seaborn as sns
import torch
import numpy as np
import matplotlib.pyplot as plt
from casper import nethook


In [None]:
# Checkpoint to analyse.
model_name = "gpt2"  # or "Llama2-7B" or "EleutherAI/gpt-neox-20b"

# float16 is requested only when the checkpoint name contains "20b";
# every other checkpoint is loaded with torch_dtype=None.
load_dtype = torch.float16 if "20b" in model_name else None

mt = ModelAndTokenizer(model_name, low_cpu_mem_usage=False, torch_dtype=load_dtype, device='cuda:0')

# Last expression of the cell: display the loaded model.
mt.model

In [None]:
# Prompt reused by the analysis and plotting cells further down.
test_prompt = "What is your name?"

# Sanity check on the unpatched model; return_p=True asks predict_token to also
# return the probability alongside the prediction.
predict_token(mt, [test_prompt], return_p=True)


In [None]:
# Presumably produces the model's free-form continuation for the test prompt
# (generate_outputs comes from the star import above — confirm its return value).
generate_outputs(test_prompt,mt)

In [None]:
def trace_with_patch_layer(
    model,  # The model
    inp,  # A set of inputs
    states_to_patch,  # Pair of layer names: (layer to copy from, layer to overwrite)
    answers_t,  # Vocabulary index (or indices) whose probability is collected
):
    """Run ``model`` on ``inp`` while replacing one layer's output with another's.

    Two modules are hooked through ``nethook.TraceDict``. When the first one
    (``states_to_patch[0]``) produces its output, a snapshot of that output is
    kept. When the second one (``states_to_patch[1]``) produces its output,
    that output is discarded and the snapshot is returned in its place.

    Args:
        model: Module handed to ``nethook.TraceDict`` and called as
            ``model(**inp)``.
        inp: Keyword arguments for the model call. Batch row 0 is excluded
            from the returned probability (``logits[1:]``), so the batch needs
            at least two rows for the mean to be defined.
        states_to_patch: Sequence whose first two entries are the names of the
            source layer and of the layer to overwrite, in that order.
        answers_t: Index (or indices) into the vocabulary dimension.

    Returns:
        Softmax probability at the last sequence position, averaged over batch
        rows 1.., selected at ``answers_t``.

    Notes:
        Only ``x[0]``, ``x[1][0]`` and ``x[1][1]`` of a hooked output are
        carried over; the hooked modules are therefore assumed to return
        ``(tensor, (tensor, tensor, ...), ...)`` — TODO confirm for every model
        this is used with. What ``x[1]`` actually contains depends on the
        hooked module and is not visible from here.
    """
    source_layer, target_layer = states_to_patch[0], states_to_patch[1]
    layers = [source_layer, target_layer]

    # Snapshot of the source layer's output, filled in during the forward pass.
    captured = {}

    # Define the model-patching rule.
    # Presumably nethook calls this with the module output and the hooked
    # module's name; the parameter names are kept exactly as they were.
    def patch_rep(x, layer):
        if layer not in layers:
            return x

        if layer == source_layer:
            # Clone so that later in-place edits by the model cannot alter the
            # snapshot (the previous implementation achieved this via a CPU copy).
            captured["hidden"] = x[0].detach().clone()
            captured["extra_0"] = x[1][0].detach().clone()
            captured["extra_1"] = x[1][1].detach().clone()
            return x

        if layer == target_layer:
            # Replay on whatever device the replaced output lives on instead of
            # a hard-coded .cuda(), so CPU and non-default GPUs work too.
            device = x[0].device
            return (
                captured["hidden"].to(device),
                (captured["extra_0"].to(device), captured["extra_1"].to(device)),
            )

        # Not reachable given the membership test above; never hand None back to the hook.
        return x

    with torch.no_grad(), nethook.TraceDict(
        model,
        layers,
        edit_output=patch_rep,
    ):
        outputs_exp = model(**inp)

    # Row 0 of the batch is skipped; the remaining rows are averaged.
    probs = torch.softmax(outputs_exp.logits[1:, -1, :], dim=1).mean(dim=0)[answers_t]

    return probs

In [None]:
import matplotlib.pyplot as plt
def analyse_based_on_layer(prompt,):
    """Measure how much replacing each layer's output with its predecessor's changes the prediction.

    Uses the notebook-global ``mt``. The prompt is duplicated into a batch of
    two, the unpatched prediction is taken from ``predict_from_input``, and
    then ``trace_with_patch_layer`` is run once for every pair of consecutive
    layers ``(i, i + 1)``.

    Args:
        prompt: Text prompt to analyse.

    Returns:
        A tuple ``(baseline, effects)``: ``baseline`` is the second value
        returned by ``predict_from_input`` for the first batch row, as a Python
        number (presumably the probability of the predicted token — confirm
        against ``predict_from_input``); ``effects`` is a list with one entry
        per layer pair holding ``abs(patched - baseline)``.
    """
    network = mt.model
    batch = make_inputs(mt.tokenizer, [prompt, prompt])

    # Unpatched run: keep only the first batch row of each returned value.
    with torch.no_grad():
        top_token, top_prob = (out[0] for out in predict_from_input(network, batch))
    (decoded,) = decode_tokens(mt.tokenizer, [top_token])
    print(decoded)

    patched_probs = []
    for idx in range(mt.num_layers - 1):
        pair = [layername(network, idx), layername(network, idx + 1)]
        print(pair)
        patched_probs.append(trace_with_patch_layer(network, batch, pair, top_token))

    # Convert tensors to plain numbers and take the distance to the baseline.
    baseline = top_prob.item()
    effects = [abs(p.item() - baseline) for p in patched_probs]

    return baseline, effects

In [None]:
# Layer-level analysis of the test prompt: `logits` receives the baseline value returned
# first by analyse_based_on_layer, `layerAIE` the per-layer-pair absolute differences.
# NOTE(review): the name `logits` is rebound by the neuron-level analysis cell further down.
logits, layerAIE = analyse_based_on_layer(test_prompt)

In [None]:
from scipy.stats import kurtosis
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import kurtosis

# Per-layer effects from the layer-level analysis; `logits` is the baseline from the same run.
seq = layerAIE
# fisher=False selects Pearson's definition (not the excess kurtosis).
kurt = kurtosis(seq, fisher=False)

sns.set_theme()

fig, ax = plt.subplots(figsize=(9, 6))

# One point per layer pair, numbered from 1.
layer_numbers = range(1, len(seq) + 1)
sns.scatterplot(x=layer_numbers, y=seq, color='b', ax=ax)

ax.set_title('Prompt: ' + test_prompt)

# Summary statistics printed underneath the axes.
fig.text(0.3, 0.03, f'Logits: {logits:.4f}', ha='center', va='center')
fig.text(0.7, 0.03, f'Kurtosis: {kurt:.4f}', ha='center', va='center')
# plt.savefig("M:\Causallm-attack\paper\\figure\pdf\\adver_13b.pdf",bbox_inches="tight")


In [None]:
def trace_with_patch_neuron(
    model,  # The model
    inp,  # A set of inputs
    layers,  # Layer names to hook; only layers[0] is patched
    neuron_zone,  # (start, end) slice of hidden units to zero, end exclusive
    answers_t,  # Vocabulary index (or indices) whose probability is collected
):
    """Run ``model`` on ``inp`` with a slice of one layer's hidden units zeroed.

    The modules named in ``layers`` are hooked through ``nethook.TraceDict``.
    For the first of them, the last-dimension slice
    ``[neuron_zone[0]:neuron_zone[1]]`` of the output's first element is set
    to zero (in place) for every batch row and position. Any other hooked
    layer is passed through unchanged.

    Args:
        model: Module handed to ``nethook.TraceDict`` and called as
            ``model(**inp)``.
        inp: Keyword arguments for the model call. Batch row 0 is excluded
            from the returned probability (``logits[1:]``), so the batch needs
            at least two rows for the mean to be defined.
        layers: Sequence of layer names; ``layers[0]`` is the one patched.
        neuron_zone: Two-element sequence ``(start, end)``.
        answers_t: Index (or indices) into the vocabulary dimension.

    Returns:
        Softmax probability at the last sequence position, averaged over batch
        rows 1.., selected at ``answers_t``.

    Notes:
        For tuple outputs only ``x[1][0]`` and ``x[1][1]`` are carried over
        next to the edited tensor, exactly as before — assumes the hooked
        module returns ``(tensor, (a, b, ...), ...)``; TODO confirm.
    """
    # Kept under a distinct name: the hook callback's own `layer` parameter
    # used to shadow this value, which made the layer comparison vacuous.
    target_layer = layers[0]
    start_neuron, end_neuron = neuron_zone[0], neuron_zone[1]

    # Define the model-patching rule.
    # Presumably nethook calls this with the module output and the hooked
    # module's name; the parameter names are kept exactly as they were.
    def patch_rep(x, layer):
        if layer != target_layer:
            return x

        h = x[0] if isinstance(x, tuple) else x
        # In-place edit of the layer output: zero the selected hidden units.
        h[:, :, start_neuron:end_neuron] = 0

        if not isinstance(x, tuple):
            # Bare-tensor outputs have no extra elements to carry over.
            return h
        return (h, (x[1][0], x[1][1]))

    with torch.no_grad(), nethook.TraceDict(
        model,
        layers,
        edit_output=patch_rep,
    ):
        outputs_exp = model(**inp)

    # Row 0 of the batch is skipped; the remaining rows are averaged.
    probs = torch.softmax(outputs_exp.logits[1:, -1, :], dim=1).mean(dim=0)[answers_t]

    return probs

In [None]:
from tqdm import tqdm
def analysed_based_on_neuron(prompt, mt, analyse_layer,analysed_neurons, save_numpy = None, ):
    """Measure how much zeroing single hidden units of one layer changes the prediction.

    The prompt is duplicated into a batch of two, the unpatched prediction is
    taken from ``predict_from_input``, and ``trace_with_patch_neuron`` is then
    run once per entry of ``analysed_neurons`` with the one-unit zone
    ``[n, n + 1]``.

    Args:
        prompt: Text prompt to analyse.
        mt: Object exposing ``.model`` and ``.tokenizer``.
        analyse_layer: Layer number passed to ``layername``.
        analysed_neurons: Iterable of unit indices to zero, one at a time.
        save_numpy: Optional target for ``np.save``; nothing is written when
            it is ``None``.

    Returns:
        A tuple ``(baseline, effects)``: ``baseline`` is the second value
        returned by ``predict_from_input`` for the first batch row, as a Python
        number (presumably the probability of the predicted token — confirm
        against ``predict_from_input``); ``effects`` is a list with one entry
        per analysed unit holding ``abs(baseline - patched)``.
    """
    batch = make_inputs(mt.tokenizer, [prompt, prompt])

    # Unpatched run: keep only the first batch row of each returned value.
    with torch.no_grad():
        top_token, top_prob = (out[0] for out in predict_from_input(mt.model, batch))
    # The decoded text is not used; the unpacking still requires exactly one result.
    (_decoded,) = decode_tokens(mt.tokenizer, [top_token])

    ablated_probs = []
    for neuron in tqdm(analysed_neurons):
        target = [layername(mt.model, analyse_layer)]
        unit_zone = [neuron, neuron + 1]
        ablated_probs.append(
            trace_with_patch_neuron(mt.model, batch, target, unit_zone, top_token)
        )

    baseline = top_prob.item()
    effects = [abs(baseline - p.item()) for p in ablated_probs]

    if save_numpy is not None:
        np.save(save_numpy, effects)

    return baseline, effects

In [None]:
# Neuron-level analysis of the test prompt at layer 0, zeroing units 0..767 one at a time.
# NOTE(review): 768 presumably matches the hidden size of the "gpt2" checkpoint — adjust
# when another model is loaded. This also rebinds `logits` from the layer-level cell.
logits, neuronAIE = analysed_based_on_neuron(test_prompt,mt, 0,range(768))

In [None]:
# Per-neuron effects from the neuron-level analysis; `logits` is the baseline from the same run.
seq = neuronAIE
aieRange = np.max(seq) - np.min(seq)

# Set the seaborn style.
sns.set_theme()

# A single figure: the earlier separate plt.figure(dpi=1000) call only left an
# empty extra figure behind. Pass dpi= to savefig for a high-resolution export.
fig, ax = plt.subplots(figsize=(9, 6))

sns.scatterplot(x=range(0, len(seq)), y=seq, color='b', s=20, ax=ax)

ax.set_title('Layer')

fig.text(0.3, 0, f'Logits: {logits:.4f}', ha='center', va='center')
# Show the range computed in this cell (previously the kurtosis of the
# layer-level plot was printed under the "Range" label).
fig.text(0.7, 0, f'Range: {aieRange:.4f}', ha='center', va='center')

# Label the strongest neuron, plus any hand-picked indices of interest.
# Indices outside the analysed range are skipped instead of raising IndexError
# (2100 does not exist when only 768 neurons were analysed).
peak_index = int(np.argmax(seq))
extra_indices = [2100]
for idx in dict.fromkeys([peak_index, *extra_indices]):
    if 0 <= idx < len(seq):
        ax.annotate(
            f'({idx},{str(seq[idx])[:5]})',
            (idx, seq[idx]),
            textcoords="offset points",
            xytext=(-2, -15),
            ha='center',
        )

plt.tight_layout()
# To export: fig.savefig("<output path>.pdf", bbox_inches="tight")

plt.show()