# Load

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'
import numpy as np

In [None]:
import json
with open ('datas_positive.json','r',encoding='utf-8') as f:
  datas_positive = json.load(f)
with open ('datas_negative.json','r',encoding='utf-8') as f:
  datas_negative = json.load(f)

In [None]:
system_prompt = {
'basic' :"""请仔细阅读以下多条文本,然后回答问题：基于这些文本，这个用户是否存在抑郁情绪？用“是”或者“否”回答
***重要***
你必须用以下格式回答问题
答案：“是”或“否”
""",
'OR': """请仔细阅读以下多条文本，关注用户的情绪表达、对未来的态度、社交参与度、兴趣水平以及任何关于身体症状或睡眠/食欲变化、自我价值、个人感受的表述。回答问题：基于这些文本，这个用户是否存在抑郁情绪？用“是”或者“否”回答，再给出你一步一步的分析过程。
***重要***
你必须用以下格式回答问题
答案：“是”或“否”
分析：你一步一步的分析过程，解释你为什么给出这个答案。""",
'OR2': """请仔细阅读以下多条文本然后回答问题：基于这些文本，这个用户是否存在抑郁情绪？用“是”或者“否”回答，再给出你一步一步的分析过程。
***重要***
你必须用以下格式回答问题
答案：“是”或“否”
分析：你一步一步的分析过程，解释你为什么给出这个答案。""",
'OR3': """请仔细阅读以下多条文本然后回答问题：基于这些文本，这个用户是否存在抑郁情绪？用“是”或者“否”回答，再给出你一步一步的分析过程。
""",

'RET':"""请逐条阅读以下用户的文本，关注用户的情绪表达、对未来的态度、社交参与度、兴趣水平以及任何关于身体症状或睡眠/食欲变化、自我价值、个人感受的表述。回答问题：基于这些文本，这个用户是否存在抑郁情绪？用“是”或者“否”回答，再给出你一步一步的分析过程，指出哪条或哪几条文本让你得出这个回答。
***重要***
如果用户在文本中明确提到自己得到了抑郁的诊断，请直接判断为“是”，并在分析中指出相关文本。
你必须用以下格式回答问题
答案：“是”或“否”
分析：你一步一步的分析过程，指出哪条或哪几条文本让你得出这个回答。""",}

In [None]:

def modify_tweet (tweet,mode = 1):
    input_text_list = tweet.split('\n')
    if mode == 0 :
        tweet = tweet
    elif mode == 1:
        input_text_list_mod= [f'文本{i+1}:{text}' for i,text in enumerate(input_text_list)]
        tweet = '\n'.join(input_text_list_mod)
    elif mode == 2:
        input_text_list_mod = [f'<文本{i+1}>{text}</文本{i+1}> ' for i,text in enumerate(input_text_list)]
        tweet = '\n'.join(input_text_list_mod)
    return tweet
def rmsnorm(x, eps=1e-6):
    """
    手动实现RMSNorm归一化操作
    :param x: 输入张量
    :param eps: 防止除零的小常数
    :return: 归一化后的张量
    """
    rms = torch.sqrt(torch.mean(x ** 2, dim=-1, keepdim=True) + eps)
    return x / rms

def find_start_end_index(start_token = '<|im_start|>',end_token ='<|im_end|>',drift = 3):
    input_start = 0
    input_end = 0
    output_start = 0
    output_end = 0
    input_count = 0
    output_count = 0
    i = 0
    for token in generated_ids[0][0]:
        if tokenizer.decode(token) == start_token:
            if input_count == 0:
                input_start = i+drift
                input_count += 1
            else:
                output_start = i+drift
        elif tokenizer.decode(token) == end_token:
            if output_count == 0:
                input_end = i
                output_count += 1
            else:
                output_end = i
        i += 1
    print('input_start:',input_start,'input_end:',input_end)
    print('output_start:',output_start,'output_end:',output_end)
    assert input_start < input_end
    #assert output_start < output_end
    return input_start,input_end,output_start,output_end

def compute_attention_rollout(converted_attentions,response,norm_type = 'RMS',add_residual=True, eps=1e-6):
    all_attentions =  [(t,a) for t,a in zip(response.tolist(), converted_attentions)]
    joint_attentions =[]
    for pair in all_attentions:
        token = pair[0]
        res_att_mat = pair[1].mean(axis=1) #平均注意力头的权重，得到一个 [layer=32, seq_len=66或者1, seq_len=66或者66+生成步数] 的张量
        if add_residual:
            res_att_mat = res_att_mat + np.eye(res_att_mat.shape[1])[None,...] #残差连接
        if norm_type == 'RMS':
            res_att_mat = rmsnorm(torch.tensor(res_att_mat), eps=eps).numpy() #RMSNorm归一化
        elif norm_type == 'layernorm':
            res_att_mat = res_att_mat / res_att_mat.sum(axis=-1)[...,None] #LayerNorm归一化
        res_att_mat = rmsnorm(torch.tensor(res_att_mat), eps=eps).numpy()
        joint_att = np.zeros(res_att_mat.shape)
        layers = joint_att.shape[0]
        joint_att[0] = res_att_mat[0]
        for i in np.arange(1, layers):
            joint_att[i] = res_att_mat[i].dot(joint_att[i-1].T)
        joint_attentions.append((token,joint_att))
    return joint_attentions

def generate_text_with_scores_html(tensor, text, output_path, normalize=True, method='min-max', window_size=10):
    """
    根据输入的向量和文本生成一个HTML文件，其中每个字符上方显示对应的分数，
    并根据分数调整颜色。最终的HTML文件保存在指定路径中。

    参数:
    tensor (numpy.ndarray): 形状为 (1, N) 的向量，其中 N 是文本的长度。
    text (str): 文本字符串，与向量长度匹配。
    output_path (str): 输出HTML文件的路径。
    normalize (bool): 是否归一化分数，默认为True。
    method (str): 归一化方法，支持 'min-max'（默认）、'mean' 和 'moving-average'。
    window_size (int): 滑动平均窗口大小，默认为10，仅在选择滑动平均归一化时使用。

    返回:
    None
    """
    
    # 检查张量和文本长度是否匹配
    assert tensor.shape[1] == len(text), "张量和文本的长度不匹配！"

    scores = tensor[0]

    # 归一化分数
    if normalize:
        if method == 'min-max':
            scores = (scores - scores.min()) / (scores.max() - scores.min())
        elif method == 'mean':
            mean_value = scores.mean()
            scores = scores - mean_value
        elif method == 'moving-average':
            mean_value = np.convolve(scores, np.ones(window_size) / window_size, mode='same')
            scores = scores - mean_value
        else:
            raise ValueError("未知的归一化方法。请使用 'min-max', 'mean', 或 'moving-average'。")

    # 创建颜色映射函数
    def score_to_color(score):
        r = int(255 * score)
        b = 255 - r
        return f'rgb({r}, 0, {b})'

    # 定义每行显示的字符数
    chars_per_line = 50
    num_lines = len(text) // chars_per_line + (1 if len(text) % chars_per_line else 0)

    # 生成HTML内容
    html_content = "<html><body style='font-family:monospace;'>\n"

    # 设置间隔
    spacing = "20px"  # 可以根据需要调整间隔

    for line in range(num_lines):
        start_idx = line * chars_per_line
        end_idx = start_idx + chars_per_line
        line_text = text[start_idx:end_idx]
        line_scores = scores[start_idx:end_idx]

        for i, char in enumerate(line_text):
            color = score_to_color(line_scores[i])
            score_text = f"{line_scores[i]:.2f}"
            border_style = "border: 1px solid black;" if line_scores[i] > 0 else ""
            html_content += f"<div style='display:inline-block; text-align:center; color:{color}; margin-right:{spacing}; {border_style}'>" \
                            f"<div style='font-size:0.5em;'>{score_text}</div>" \
                            f"<div>{char}</div>" \
                            f"</div>"

        html_content += "<br>\n"

    html_content += "</body></html>"

    # 保存为HTML文件
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(html_content)

    print(f"HTML文件已生成并保存在 {output_path}")

# Llama

In [None]:
model_id = "llama3-8B-Chinese-Chat"

In [None]:
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype= torch.bfloat16, device_map="auto",attn_implementation="eager", 
                                             )

In [None]:
#prompt = "判断以下文本是否表达了抑郁情绪：'我觉得生活毫无意义，每一天都很痛苦。'"
#prompt = system_prompt['RET']+'\n'+modify_tweet(tweet_tp,mode=1)
prompt = system_prompt['OR2'] + '\n' + tweet_x
input_ids = tokenizer.apply_chat_template(
    [{"role": "system", "content": prompt}],
    add_generation_prompt=True,
    return_tensors="pt"
    ).to(model.device)

terminators = [
    tokenizer.eos_token_id,
    tokenizer.convert_tokens_to_ids("<|eot_id|>")
]

model_inputs = tokenizer.apply_chat_template(
    [{"role": "system", "content": prompt}],
    add_generation_prompt=True,
    return_tensors="pt"
    ).to(model.device)

terminators = [
    tokenizer.eos_token_id,
    tokenizer.convert_tokens_to_ids("<|eot_id|>")
]

In [None]:
generation_params = {'max_new_tokens': 100,
                     'pad_token_id': tokenizer.pad_token_id,
                     'terminator_ids': terminators,
                     'do_sample': True,
                     'output_attentions': True,
                     'return_dict_in_generate': True,
                     #'temperature': 0.5,
                    #'num_return_sequences': 1,
                     #'top_k': 50,
                     #'top_p': 0.95,
                    # 'repetition_penalty': 1.2,
                     #'do_sample': True,
                     #'no_repeat_ngram_size': 3,
                     #'max_length': 100,
                     #'use_cache': True
        }
                        

In [None]:
generated_ids = model.generate(
    input_ids,
    **generation_params
    )
response_ids = generated_ids[0][0][input_ids.shape[-1]:]
response = tokenizer.decode(response_ids, skip_special_tokens=False)
print(response)
print(len(response))

## 计算输入序列每个token总的输出输入注意力

In [None]:
start_index,end_index,_,_ = find_start_end_index(start_token='<|end_header_id|>',end_token='<|eot_id|>',drift=2)

In [None]:
s = start_index
t = end_index
attention_input = [
    torch.max(hd[:, :, s:t, s:t].cpu().squeeze(0).to(torch.float32), dim=0)[0]
    for hd in generated_ids['attentions'][0]
]
print(len(attention_input))
print(attention_input[0].shape)

In [None]:
reviced_attention = []
attender_attention = []
recived_pairs = []
index_tokens = [ tokenizer.decode(t,skip_special_tokens=False) for t in generated_ids[0][0][s:t]]
seq_len = len(index_tokens)
for layer in attention_input:
    total_attention_as_object = torch.sum(layer, axis=0).tolist()
    total_attention_as_object = [att/seq_len for att in total_attention_as_object]
    total_attention_as_attender = torch.sum(layer, axis=1).tolist()
    total_attention_as_attender = [att/seq_len for att in total_attention_as_attender]
    reviced_attention.append(total_attention_as_object)
    attender_attention.append(total_attention_as_attender)


In [None]:
i = 1
for layer in reviced_attention:
    recived_pairs.append([(token,att) for token,att in zip(index_tokens,layer)])
    print(f'layer{i}:{sorted(recived_pairs[-1],key=lambda x:x[1],reverse=True)[:10]}')
    i += 1

In [None]:
reviced_attention_tensor = [np.array(layer) for layer in reviced_attention]
bias_vector = np.ones(reviced_attention_tensor[0].shape[0])
bias_vector = bias_vector[None, :]
reviced_attention_tensor = np.array(reviced_attention_tensor) + bias_vector
reviced_attention_tensor = rmsnorm(torch.tensor(reviced_attention_tensor)).numpy()   
layers = reviced_attention_tensor.shape[0]
joint_attention = np.zeros(reviced_attention_tensor.shape)
joint_attention[0] = reviced_attention_tensor[0]
for i in np.arange(1, layers):
    joint_attention[i] = reviced_attention_tensor[i] * joint_attention[i-1]  #逐元素相乘,因为原文中的公式是用于计算注意力方阵的[seq_len,seq_len]，但是这里用于计算注意力向量[1,seq_len]，所以需要逐元素相乘
last_layer_self_rollout = joint_attention[-1]

## 预处理数据

In [None]:
print('生成步骤:',type(generated_ids['attentions']),len(generated_ids['attentions']))
print('每一步包含的层数：',type(generated_ids['attentions'][0]),len(generated_ids['attentions'][0]))
print('每一层中的注意力矩阵:',type(generated_ids['attentions'][0][0]),generated_ids['attentions'][0][0].shape)
#适用于未经过形态转换的注意力权重,其格式如以上所示
all_attentions =  [(t,a) for t,a in zip(response_ids.tolist(), generated_ids['attentions'])] #attention dict for all tokens
all_attentions_avg = [(t,sum(a)/len(a)) for t,a in zip(response_ids.tolist(), generated_ids['attentions'])] #average attention dict for all tokens
print(all_attentions_avg[0][1].shape)

In [None]:
for i in range(len(generated_ids['attentions'])):
    for j in range(len(generated_ids['attentions'][i])):
        print(f'生成步数{i+1},层数{j+1}',generated_ids['attentions'][i][j].shape)

In [None]:
# 假设 attentions_step1 是第 1 步的注意力张量，长度为 28 的 tuple
attentions_step1 = generated_ids['attentions'][0]

# 创建一个列表用于存储每一层的最后一个 token 的注意力
last_token_attention_list = []
assert generated_ids[0][0][end_index:end_index+1] == tokenizer.eos_token_id
# 遍历 tuple 中的每一层注意力矩阵
for layer_attention in attentions_step1:
    # 选择输出最后一个 token （ <|im_end|>）的注意力，并添加到列表中
    last_token_attention = layer_attention[:, :, end_index:end_index+1:, :]
    last_token_attention_list.append(last_token_attention)

# 你可以将这个列表转为 tensor 或者继续保留为列表
# last_token_attention_tensor = torch.stack(last_token_attention_list)

# 查看第一个层的最后一个 token 注意力的形状
print(last_token_attention_list[0].shape)
print(len(last_token_attention_list))
attentions_list = list(generated_ids['attentions'])

# 将生成步骤 0 对应的 tuple 替换为新的 tuple
attentions_list[0] = tuple(last_token_attention_list)

# 将列表转换回 tuple 并赋值回 generated_ids['attentions']
generated_ids['attentions'] = tuple(attentions_list)

# 验证替换后的内容
print(type(generated_ids['attentions']))  # 应该是 tuple
print(generated_ids['attentions'][0][0].shape)  # 应该是 [1, 28, 1, 1776]


# 创建一个新的列表来存储转换后的张量
# 假如输入长度为66，这个数字包含了特殊符号和系统信息，prompt从第54个位置开始
converted_attentions = []
'''
目前find_start_end_nb函数还有问题，所以这里手动指定start_index和end_index
start_index, end_index = find_start_end_nb(text, trigger_word='user')
end_index = end_index + 1
'''
#这里的start_index和end_index代表的是输入序列的起始和结束位置

for i in range(len(generated_ids['attentions'])):
    # 将每个生成步骤中的 28 层中 [batch_size = 1, attention_head=28, seq_len=输入长度或者1, seq_len=输入长度或者输入长度+生成步数] 张量拼接成一个 [layer=28, attention_head=28, seq_len=输入长度或者1, seq_len=(输入长度或者输入长度+生成步数] 的张量
    step_attentions = torch.cat([layer for layer in generated_ids['attentions'][i]], dim=0)
    
    # 保留从start_index到end_index之间的内容
    if step_attentions.shape[-1] > start_index:
        step_attentions = step_attentions[:, :, :, start_index:end_index]
    
    converted_attentions.append(step_attentions.detach().clone().cpu().to(torch.float32).numpy())

# 打印转换后的张量形状
for i, tensor in enumerate(converted_attentions):
    print(f"Shape of tensor at step {i+1}: {tensor.shape}")  # 应该输出 [layer=28, attention_head=28, seq_len=输入长度或者1, seq_len=输入长度或者输入长度+生成步数]，对于最后一个seq_len，只保留从start_index到end_index之间的内容

## 计算RAW ATTENTION

In [None]:
all_attentions =  [(t,a) for t,a in zip(response_ids[0].tolist(), converted_attentions)]
all_attentions_avg =[]
for pair in all_attentions:
    token = pair[0]
    #res_att_mat = pair[1].mean(axis=1) #平均注意力头的权重，得到一个 [layer=28, seq_len=输入长度或者1, seq_len=输入长度或者输入长度+生成步数] 
    res_att_mat = pair[1].max(axis=1) #取最大注意力头的权重，得到一个 [layer=28, seq_len=输入长度或者1, seq_len=输入长度或者输入长度+生成步数]
    bias_vector = np.ones(pair[1].shape[-1])
    bias_vector = bias_vector[None, None, :]
    res_att_mat = res_att_mat + bias_vector #对[layers,1,seq_len]做残差连接
    #res_att_mat = res_att_mat / res_att_mat.sum(axis=-1)[...,None] #归一化
    res_att_mat = rmsnorm(torch.tensor(res_att_mat)).numpy() #RMSNorm归一化
    all_attentions_avg.append((token, res_att_mat)) #token是当前生成的token，res_att_mat是当前生成的token对输入token的注意力权重


## 计算Attention Rollout

In [None]:
eps = 1e-6
all_attentions =  [(t,a) for t,a in zip(response_ids.tolist(), converted_attentions)]
joint_attentions =[]
for pair in all_attentions:
    token = pair[0]
    #res_att_mat = pair[1].mean(axis=1) #平均注意力头的权重，得到一个 [layer=28, seq_len=输入长度或者1, seq_len=输入长度或者输入长度+生成步数] 的张量
    res_att_mat = pair[1].max(axis=1) #取最大注意力头的权重，得到一个 [layer=28, seq_len=输入长度或者1, seq_len=输入长度或者输入长度+生成步数]
    #res_att_mat = pair[1].min(axis=1) #取最小注意力头的权重，得到一个 [layer=28, seq_len=输入长度或者1, seq_len=输入长度或者输入长度+生成步数]
    #res_att_mat = res_att_mat + np.eye(res_att_mat.shape[1])[None,...] #对[layers,seq_len,seq_len]做残差连接
    #res_att_mat = np.eye(res_att_mat.shape[2])[None, None, ...]
    bias_vector = np.ones(pair[1].shape[-1])
    bias_vector = bias_vector[None, None, :]
    res_att_mat = res_att_mat + bias_vector #对[layers,1,seq_len]做残差连接
    res_att_mat = rmsnorm(torch.tensor(res_att_mat), eps=eps).numpy() #RMSNorm归一化
    #res_att_mat = res_att_mat / res_att_mat.sum(axis=-1)[...,None] #归一化
    joint_att = np.zeros(res_att_mat.shape)
    layers = joint_att.shape[0]
    joint_att[0] = res_att_mat[0]
    for i in np.arange(1, layers):
        joint_att[i] = res_att_mat[i] * joint_att[i-1]  #逐元素相乘,因为原文中的公式是用于计算注意力方阵的[seq_len,seq_len]，但是这里用于计算注意力向量[1,seq_len]，所以需要逐元素相乘
    joint_attentions.append((token,joint_att))

#### 计算所有层的注意力最大值/平均值/最后一层，并且输出HTML

##### 最后层

In [None]:
seq_ids = []
seq_ids = (model_inputs[0].tolist())[start_index:end_index]
seq_tokens = [tokenizer.decode([token]) for token in seq_ids] #输入序列的token
all_generated_joint_attentions_avg = np.array([att[1] for att in joint_attentions]).mean(axis=0) #计算所有生成步数的平均注意力rollout
last_layer = all_generated_joint_attentions_avg[-1] #取最后一层的注意力
last_layer = last_layer+last_layer_self_rollout.reshape(1,-1) 
print(last_layer.mean())
print(len(seq_tokens))
generate_text_with_scores_html(last_layer,seq_tokens,normalize= True,method='mean',output_path=f'llama3_Roll_maxhead_lastlayer_self_att_correct__TEST1.html')##### 最后层

##### 最大层

In [None]:
#最大值
seq_ids = []
seq_ids = (model_inputs.input_ids[0].tolist())[start_index:end_index]
seq_tokens = [tokenizer.decode([token]) for token in seq_ids] #输入序列的token
all_generated_joint_attentions_avg = np.array([att[1] for att in joint_attentions]).mean(axis=0) #计算所有生成步数的平均注意力rollout
all_layer_max = all_generated_joint_attentions_avg.max(axis=0) #取所有层的最大值
print(all_layer_max.mean())
generate_text_with_scores_html(all_layer_max,seq_tokens,normalize= True,method='mean',output_path=f'Roll_layermax_tp_correct_{prefix}2.html')

##### 平均层

In [None]:
seq_ids = []
seq_ids = (model_inputs.input_ids[0].tolist())[start_index:end_index]
seq_tokens = [tokenizer.decode([token]) for token in seq_ids] #输入序列的token
all_generated_joint_attentions_avg = np.array([att[1] for att in joint_attentions]).mean(axis=0) #计算所有生成步数的平均注意力rollout
all_layer_mean = all_generated_joint_attentions_avg.mean(axis=0) #取所有层的平均值
print(all_layer_mean.mean())
generate_text_with_scores_html(all_layer_mean,seq_tokens,normalize= True,method='mean',output_path=f'Roll_layermean_tp_correct_{prefix}.html')

##### 累加注意力动画 (WIP)

In [None]:
cumulative_avg_attentions = []
response_per_token = [tokenizer.decode([token]) for token in response_ids[0]]
# 逐步累加生成步骤的平均注意力
for step in range(len(joint_attentions)):
    # 取前 step+1 个生成步的注意力
    avg_attention = np.array([att[1] for att in joint_attentions[:step+1]]).mean(axis=0)[-1]
    # 将当前步骤的累积平均注意力加入列表
    cumulative_avg_attentions.append(avg_attention)
last_layer_self_rollout = last_layer_self_rollout.reshape(1,-1)
last_layer_self_rollout = (last_layer_self_rollout - last_layer_self_rollout.mean(axis=0))/last_layer_self_rollout.std(axis=0)  
cumulative_avg_attentions = [((att - att.mean())/att.std()).tolist()[0] for att in cumulative_avg_attentions]
cumulative_avg_attentions = [att + last_layer_self_rollout for att in cumulative_avg_attentions]

with open('attention_data.js', 'w') as f:
    f.write(f"const seq_tokens = {json.dumps(seq_tokens)};\n")
    f.write(f"const cumulative_avg_attentions = {json.dumps(cumulative_avg_attentions)};\n")
    f.write(f"const response_per_token = {json.dumps(response_per_token)};\n")