# 1. 准备

In [1]:
# Restrict CUDA to the devices listed in the string below ("4, 5").
# NOTE(review): an earlier version of this comment said GPUs 3 and 4, which
# does not match the value that is actually set — confirm the intended devices.
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "4, 5"
# Set HF_ENDPOINT to the hf-mirror.com URL (presumably so that Hugging Face
# Hub requests go through that mirror — confirm it is reachable from this host).
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'

In [2]:
# 导入必备的包

import torch.nn.functional as F

from PIL import Image
from matplotlib.pyplot import imshow

from llava.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN, DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN
from llava.conversation import conv_templates
from llava.model.builder import load_pretrained_model
from llava.utils import disable_torch_init
from llava.mm_utils import tokenizer_image_token, process_images, get_model_name_from_path, select_best_resolution

In [None]:
# Model and test-sample settings

model_path = '/home/kaiyu/Model/liuhaotian/llava-v1.6-vicuna-7b/'
model_base = None
# NOTE(review): '\\n' below is a literal backslash followed by 'n', not a
# newline character. If a real line break was intended, use '\n' — but the
# hard-coded token offsets used by later cells may then need re-checking.
question = 'Is there a cat in the image?\\nAnswer the question using a single word or phrase.'
# Alternative question / sample image:
# question = 'describe the image'
# image_file = "./sample/dogsled.jpg"
image_file = "./sample/tigercat.jpg"
conv_mode = 'vicuna_v1'
# The three generation settings below are not referenced by any cell visible
# in this notebook.
temperature = 0
top_p = None
num_beams = 1

# Sample image (from ImageNet), loaded and converted to RGB
image = Image.open(image_file).convert('RGB')
# image = Image.open('./sample/dogsled.jpg')
imshow(image)

In [None]:
# Load the model
# Silence the UserWarning whose message matches "copying from a non-meta parameter"
import warnings
warnings.filterwarnings("ignore", category=UserWarning, message=".*copying from a non-meta parameter.*")

# Model
disable_torch_init()
# Expand a leading "~" in the configured path before handing it to the loader.
model_path = os.path.expanduser(model_path)
model_name = get_model_name_from_path(model_path)
# The loader returns the tokenizer, the model, the image processor and a
# context length; context_len is not used by the cells visible here.
tokenizer, model, image_processor, context_len = load_pretrained_model(model_path, model_base, model_name)

In [5]:
# Input preprocessing: build the conversation prompt, its token ids and the
# processed image tensor.

cur_prompt = question  # the raw question, before the image placeholder is prepended

# The image placeholder is optionally wrapped in start/end markers, depending
# on the model configuration.
image_placeholder = (
    DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN
    if model.config.mm_use_im_start_end
    else DEFAULT_IMAGE_TOKEN
)
qs = image_placeholder + '\n' + question

# One user turn carrying the question, then an empty assistant turn for the
# model to complete.
conv = conv_templates[conv_mode].copy()
user_role, assistant_role = conv.roles[0], conv.roles[1]
conv.append_message(user_role, qs)
conv.append_message(assistant_role, None)
prompt = conv.get_prompt()

# Tokenize the prompt, add a leading batch dimension and move it to the GPU.
token_ids = tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt')
input_ids = token_ids.unsqueeze(0).cuda()

# process_images is given a one-image list; keep the single result.
image_tensor = process_images([image], image_processor, model.config)[0]


In [None]:
# Print every prompt token together with its offset relative to the end of
# the prompt.
# NOTE(review): the offset is i + 1 - len, so the last token prints 0 rather
# than -1 — confirm this is the intended convention, since later cells index
# with Python-style negative offsets.
seq_len = len(input_ids[0])
for i, ids in enumerate(input_ids[0]):
    # Skip the image placeholder wherever it sits, instead of assuming it is
    # at position 35 (presumably its id cannot be decoded by the tokenizer).
    if ids.item() == IMAGE_TOKEN_INDEX:
        continue
    print(i + 1 - seq_len, tokenizer.batch_decode([ids], skip_special_tokens=True), end='  ')

# 2. 推理

In [None]:
# Inference (single forward pass that also returns attention weights)

# NOTE(review): this switches the model to training mode. Presumably chosen so
# that gradients can be taken; if the model has active dropout layers this
# also makes the pass stochastic — confirm whether eval() would suffice.
model.train()

# Add a batch dimension, cast to half precision and move to the GPU.
# This rebinds image_tensor, so re-running this cell without re-running the
# preprocessing cell adds yet another leading dimension.
image_tensor = image_tensor.unsqueeze(0).half().cuda()
# Ask autograd to track gradients with respect to the image tensor.
image_tensor.requires_grad_()

output = model(
    input_ids=input_ids,
    use_cache=False,
    images=image_tensor,
    image_sizes=[image.size],
    output_attentions=True,  # needed so that output.attentions is populated
)

# 3. 计算 CAM

## 3.1 获取注意力分数并激活梯度跟踪

In [8]:
# Attention tensors returned by the forward pass (one entry per element of
# output.attentions; presumably one per transformer layer).
attentions = output.attentions
for attn_layer in attentions:
    # These are non-leaf tensors, so autograd only keeps their .grad after
    # backward() if retain_grad() has been requested beforehand.
    attn_layer.retain_grad()

## 3.2 预测输出并反向传播

In [9]:
# Back-propagate from the logit of the top-scoring token at the last position.
prediction = output.logits

# Highest-scoring vocabulary id at every position (argmax over the last axis).
label_index = prediction.argmax(dim=-1)

# The id chosen at the final position of batch item 0, and its logit there.
last_position_label = label_index[0][-1]
prediction_score = prediction[:, -1, last_position_label]

# Fills .grad on the tensors for which retain_grad() was requested earlier.
prediction_score.backward()

## 3.3 获取梯度并计算 cam 值

## 3.4 可视化

In [10]:
# Gradient of the score with respect to each attention tensor (None where
# autograd did not produce one).
gradient = [attn_layer.grad for attn_layer in attentions]

attn_res = []            # attention, averaged over dim 1
modified_attention = []  # gradient * attention, averaged over dim 1
grad_res = []            # gradient, averaged over dim 1

# dim 1 is assumed to be the attention-head axis, i.e. tensors are presumably
# (batch, heads, query, key) — TODO confirm.
for layer_attn, layer_grad in zip(attentions, gradient):
    # Plain attention is recorded for every layer.
    attn_res.append(layer_attn.mean(dim=1))

    # Layers without a gradient contribute nothing to the two gradient-based
    # lists, so indices into those lists match layer numbers only if no
    # earlier layer was skipped.
    if layer_grad is None:
        continue

    # Element-wise product of gradient and attention (CAM-style weighting).
    modified_attention.append((layer_grad * layer_attn).mean(dim=1))
    grad_res.append(layer_grad.mean(dim=1))

In [None]:
# Spot check: first entry of modified_attention, batch item 0, last row,
# column 63 positions from the end.
print(modified_attention[0][0][-1, -63])

In [12]:
import csv

# Export the scores of the target token(s) to results.csv: one column per
# (score type, layer), one row per exported key position.
# Requires attn_res, grad_res and modified_attention from the aggregation cell.

TAIL_WIDTH = 63  # number of trailing key positions exported from each row

# Up to 32 attention layers and up to 5 gradient-based layers are exported.
# The ranges are clamped to what is actually available so a shorter list
# cannot raise IndexError.
layer_nums_attention = range(min(32, len(attn_res)))
layer_nums_grad_cam = range(min(5, len(grad_res), len(modified_attention)))
target_tokens = [-1]  # fixed: only the last query position
ttypes = ['attention', 'grad', 'cam']

# Score list and layer range for each score type.
score_sources = {
    'attention': (attn_res, layer_nums_attention),
    'grad': (grad_res, layer_nums_grad_cam),
    'cam': (modified_attention, layer_nums_grad_cam),
}

# Header row and column-major data
headers = []
data = []

for ttype in ttypes:
    temp_info, layer_nums = score_sources[ttype]

    for layer_num in layer_nums:
        headers.append(f"{ttype}_{layer_num}")
        column_data = []
        for target_token in target_tokens:
            res = temp_info[layer_num][0][target_token, -TAIL_WIDTH:]
            # One bulk conversion to Python floats instead of one .item()
            # call per element.
            column_data.extend(res.detach().tolist())
        data.append(column_data)

# Transpose so that each CSV row holds one key position across all columns.
transposed_data = list(zip(*data))

# Write the CSV file (newline='' as required by the csv module).
with open('results.csv', mode='w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(headers)
    writer.writerows(transposed_data)

In [None]:
import time
import torch.nn.functional as F
import matplotlib.pyplot as plt

# Overlay the selected per-layer score map on the input image, one figure per
# available layer.
# Requires attn_res, grad_res, modified_attention and image from earlier cells.

target_token = -1  # query position whose row is visualized (the last one)

ttype = 'attention'
# ttype = 'grad'
# ttype = 'cam'

if ttype == 'attention':
    temp_info = attn_res
elif ttype == 'grad':
    temp_info = grad_res
elif ttype == 'cam':
    temp_info = modified_attention

# Key positions 35..610 are reshaped into a 24 x 24 grid (presumably the base
# image patch tokens that follow a 35-token text prefix — TODO confirm).
IMAGE_TOKEN_START = 35
GRID_SIZE = 24
IMAGE_TOKEN_END = IMAGE_TOKEN_START + GRID_SIZE * GRID_SIZE  # 611

# PIL reports size as (width, height); F.interpolate expects (height, width).
width, height = image.size

# Iterate over the entries that exist rather than a fixed 32: the 'grad' and
# 'cam' lists can be shorter than the attention list.
# NOTE(review): for 'grad'/'cam' the index is the position in the list, which
# equals the layer number only if no earlier layer was skipped.
for layer_num, layer_scores in enumerate(temp_info):
    visual_cam = layer_scores[0][target_token, IMAGE_TOKEN_START:IMAGE_TOKEN_END]

    # Reshape the patch scores into a GRID_SIZE x GRID_SIZE float map on the CPU.
    visual_cam = visual_cam.reshape(GRID_SIZE, GRID_SIZE).cpu().detach().float()

    # Bilinearly upsample the map to the image resolution.
    cam = F.interpolate(
        visual_cam.unsqueeze(0).unsqueeze(0),
        size=(height, width),
        mode='bilinear',
        align_corners=False,
    ).squeeze()

    # Original image with the semi-transparent heat map on top.
    plt.imshow(image)
    plt.imshow(cam, alpha=0.5, cmap='jet')
    plt.title(f'Layer {layer_num}')
    plt.show()

    # Pause for one second between figures.
    time.sleep(1)

In [None]:
# Overlay the score map of one chosen layer on the input image.

layer_num = 0
target_token = -1

ttype = 'attention'
# ttype = 'grad'
# ttype = 'cam'

# Pick the score list that matches the selected type.
temp_info = {
    'attention': attn_res,
    'grad': grad_res,
    'cam': modified_attention,
}[ttype]

# Row of the target query position for batch item 0 of the chosen layer.
token_row = temp_info[layer_num][0][target_token]

res = token_row[-63:]  # last 63 key positions, kept for manual inspection
# for r in res:
#     print(r.item())

# Key positions 35..610, viewed as a 24 x 24 float map on the CPU.
visual_cam = token_row[35:611]
visual_cam = visual_cam.detach().cpu().float().reshape(24, 24)

# Bilinearly upsample the map to the image resolution. PIL reports size as
# (width, height) while F.interpolate expects (height, width).
width, height = image.size
cam = F.interpolate(
    visual_cam[None, None],
    size=(height, width),
    mode='bilinear',
    align_corners=False,
).squeeze()

# Original image with the semi-transparent heat map on top.
imshow(image)
imshow(cam, alpha=0.5, cmap='jet')

In [None]:
# Decode and print every prompt token, one per line.
print(input_ids[0][1])

# Work on a copy: overwriting the image placeholder inside input_ids itself
# would leave the shared tensor without an image token, breaking any later
# re-run of the inference cell.
decodable_ids = input_ids[0].clone()
# Replace the placeholder with id 1 so the sequence can be decoded (id 1 is
# presumably the BOS token, which skip_special_tokens drops — TODO confirm).
decodable_ids[decodable_ids == IMAGE_TOKEN_INDEX] = 1
for ids in decodable_ids:
    print(tokenizer.batch_decode([ids], skip_special_tokens=True))