In [None]:
import torch
import yaml
import os
import base64
import numpy as np
import cv2
import matplotlib.pyplot as plt
import matplotlib as mpl
import warnings
from PIL import Image, ImageDraw, ImageFont
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, AutoTokenizer
from qwen_vl_utils import process_vision_info
from matplotlib.colors import LinearSegmentedColormap
warnings.filterwarnings("ignore")

In [None]:
# Load the Qwen2-VL model together with its processor and tokenizer.
# `model_name` is a placeholder and must be replaced by a real local checkpoint path.
model_name = "your/path/to/Qwen2-VL-7B-Instruct"
# The model is loaded in bfloat16 onto the fixed device "cuda:6".
# NOTE(review): attn_implementation="eager" is presumably chosen so that the later
# forward pass with output_attentions=True returns attention weights — confirm.
model = Qwen2VLForConditionalGeneration.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    device_map="cuda:6",
    attn_implementation="eager"
)
# The processor builds the chat template and the vision inputs; the tokenizer is
# used separately below to inspect the token ids of the prompt.
processor = AutoProcessor.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Print the module tree and the configuration for inspection.
print(model)
print(model.config)

In [None]:
# Output locations for the figures and the serialized attention data.
save_path = "../result/attention_result"
save_name = "position1_combined_result.png"
att_pos_path = os.path.join(save_path, "att_pos1.json")
# exist_ok=True creates the directory only when it is missing, without a separate
# check-then-create step (same idiom as resize_images uses for its output directory).
os.makedirs(save_path, exist_ok=True)

# Configuration parameters
original_paths = [
    "../image/image1-chart-visual.png",
    "../image/image2-chart-visual.png",
    "../image/image3-chart-visual.png",
]
# Resized copies of the input images are written next to the other results.
save_directory = save_path
# Size handed to resize_images; 1092x756 gives a 39x27 grid of 28-pixel patches.
target_dim = (1092,756)

# JSON files read back by the difference-heat-map cell, and the key extracted from each.
file_names = ["att_pos1.json", "att_pos2.json", "att_pos3.json"]
keys = ["figure1", "figure2", "figure3"]


In [None]:
def resize_images(image_paths, save_dir, target_size=(2184,1512)):
    """Resize every image to ``target_size`` and return the paths of the resized copies.

    Args:
        image_paths: iterable of paths to the source images.
        save_dir: directory that receives the resized copies (created if missing).
        target_size: size passed to ``Image.resize``; its first two entries are also
            embedded in the output file name.

    Returns:
        List of output paths, in the same order as ``image_paths``.
    """
    os.makedirs(save_dir, exist_ok=True)

    def _resized_copy(source_path):
        # Keep the original file name and append a "<w>x<h>" size suffix before the extension.
        stem, suffix = os.path.splitext(os.path.basename(source_path))
        destination = os.path.join(
            save_dir, f"{stem}_{target_size[0]}x{target_size[1]}{suffix}"
        )
        # Resize with the LANCZOS filter and write the result to disk.
        with Image.open(source_path) as original:
            original.resize(target_size, Image.LANCZOS).save(destination)
        return destination

    return [_resized_copy(source_path) for source_path in image_paths]

# Resize the source images and collect the paths of the resized copies.
processed_paths = resize_images(original_paths, save_directory, target_dim)

# Build a single-turn user message: all images first, followed by the question.
prompt = "What was the youth unemployment rate in Lebanon in 2020?"
# Token ids of the prompt as noted by the author:
# 3838, 572, 279, 12537, 25608, 4379, 304, 39271( Lebanon), 304, 220(2020), 17, 15, 17, 15(2020), 30(?)
image_entries = [{"type": "image", "image": path} for path in processed_paths]
text_entry = {"type": "text", "text": prompt}
messages = [{"role": "user", "content": image_entries + [text_entry]}]

# Render the chat template to text and inspect its tokenization.
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
token_ids = tokenizer.encode(text)
print(text)
print(token_ids)
print(len(token_ids))

# Extract the vision inputs referenced by the message.
image_inputs, video_inputs = process_vision_info(messages)
print(image_inputs)

# Pack text and vision inputs into tensors on the model's device.
inputs = processor(text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt").to(model.device)
print(inputs.keys())
print(inputs.pixel_values.shape)
# Author's note on the query-token positions used below: 30-36,
# i.e. -7 ~ -13 counted from the end.

In [None]:
# A 1092x756 image corresponds to a 39x27 grid of patches.
# Run one forward pass without gradient tracking and keep the attention tensors.
with torch.no_grad():
    outputs = model(output_attentions=True, **inputs)
attns = outputs.attentions
print(len(attns))
print(attns[0].shape)

In [None]:
# Sanity check on the attention sequence length.
# Expected length = tokens of the rendered prompt + vision tokens (pixel_values rows // 4)
# minus one per image (the author's rule: subtract as many as there are images).
# The image count is read from image_inputs instead of being hard-coded.
num_images = len(image_inputs)
expected_seq_len = len(tokenizer.encode(text)) + inputs.pixel_values.shape[0] // 4 - num_images
actual_seq_len = attns[0].shape[-1]
if actual_seq_len != expected_seq_len:
    raise ValueError(
        f"Error: attention sequence length {actual_seq_len} does not match "
        f"the expected length {expected_seq_len}"
    )

In [None]:
# Attention of layer index 14, averaged over dim 1 and with the leading dim squeezed.
# assumes attns[i] is (batch=1, heads, seq, seq) — TODO confirm
attention_avg = attns[14].mean(dim=1).squeeze(0)
# Author's notes:
# - the query tokens "in Lebanon in 2020" sit at positions 30~36 (-7~-13 from the end);
# - inputs.pixel_values.shape[0] is the token count of all vision images.
# Single-query variant kept for reference:
# attention_map = attention_avg[inputs.pixel_values.shape[0] // 4 + 30 - 3, 15:inputs.pixel_values.shape[0] // 12 + 15]
# Rows: the query-token span; columns: the tokens of the first image
# (offset 15, length pixel_values.shape[0] // 12).
attention_map = attention_avg[inputs.pixel_values.shape[0] // 4 +  27 : inputs.pixel_values.shape[0] // 4 + 33, 15:inputs.pixel_values.shape[0] // 12 + 15]
# Sum over the query tokens, then min-max normalise to [0, 1].
attn_sum = attention_map.sum(axis=0)
attn_sum = (attn_sum - attn_sum.min()) / (attn_sum.max() - attn_sum.min())

image = np.array(image_inputs[0])
heatmap = np.zeros_like(image)
patch_size = 28
num_patch_cols = image_inputs[0].size[0] // patch_size
num_patch_rows = image_inputs[0].size[1] // patch_size

for col in range(num_patch_cols):
    for row in range(num_patch_rows):
        x1, y1 = col * patch_size, row * patch_size
        x2, y2 = x1 + patch_size, y1 + patch_size
        # Patches are indexed row-major: row * columns + col.
        level = int(attn_sum[row * num_patch_cols + col] * 255)
        color_bgr = cv2.applyColorMap(np.array([[level]], dtype=np.uint8), cv2.COLORMAP_TURBO)[0, 0]
        # cv2.applyColorMap returns BGR, while `image` comes from PIL and is shown with
        # plt.imshow (both RGB); reverse the channels so red and blue are not swapped.
        heatmap[y1:y2, x1:x2] = color_bgr[::-1]

# Blend the image and the heat map with equal weights.
overlay = cv2.addWeighted(image, 0.50, heatmap, 0.50, 0)

plt.figure(figsize=(8, 8))
plt.imshow(overlay)
plt.axis("off")
plt.show()

In [None]:
def generate_combined_plot(images_list, labels=["position 1", "position 2", "position 3"]):
    """Paste the images side by side on a white canvas with a caption under each.

    Args:
        images_list: images to combine, pasted left to right.
        labels: caption drawn below each image, paired with the images in order.

    Returns:
        The combined RGB image.
    """
    widths, heights = zip(*(picture.size for picture in images_list))
    canvas_height = max(heights)

    gap = 50             # horizontal space between neighbouring images
    caption_band = 150   # height of the strip reserved for the captions
    canvas_width = sum(widths) + gap * (len(images_list) - 1)
    combined_img = Image.new(
        'RGB',
        (canvas_width, canvas_height + caption_band),
        color=(255, 255, 255),
    )

    # Use the bundled Arial font when it can be loaded, otherwise PIL's default font.
    try:
        font = ImageFont.truetype("../utils/Arial.ttf", 100)
    except IOError:
        font = ImageFont.load_default()
    pen = ImageDraw.Draw(combined_img)

    left = 0
    for picture, caption in zip(images_list, labels):
        combined_img.paste(picture, (left, 0))
        # Centre the caption horizontally under its image.
        caption_width = pen.textlength(caption, font=font)
        caption_x = left + (picture.width - caption_width)//2
        pen.text((caption_x, canvas_height + 5), caption, fill=(0, 0, 0), font=font)
        left += picture.width + gap

    return combined_img

In [None]:
# Generate the attention heat map for each image position.
white = (255 / 255, 255 / 255, 255 / 255)
dark_red = (255 / 255, 0 / 255, 0 / 255)
custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", [white, dark_red])

# ========== Column ranges of the three images in the attention matrix ==========
# Each range has length pixel_values.shape[0] // 12; consecutive ranges are
# separated by two extra positions (15 -> 17 -> 19).
attention_maps = {
    "figure1": {
        "start": 15,
        "end": inputs.pixel_values.shape[0] // 12 + 15
    },
    "figure2": {
        "start": 17 + inputs.pixel_values.shape[0] // 12,
        "end": inputs.pixel_values.shape[0] // 6 + 17
    },
    "figure3": {
        "start": 19 + inputs.pixel_values.shape[0] // 6,
        "end": inputs.pixel_values.shape[0] // 4 + 19
    }
}
position_attentions = {}
temp_images = []
position_scores = {}  # mean attention inside the key region, per position
position_fullimage_scores = {}  # mean attention over the whole image, per position
for image_idx, (name, pos) in enumerate(attention_maps.items()):
    # ========== Attention extraction ==========
    # Rows: the query-token span; columns: the tokens of this image.
    attention_map = attention_avg[
        inputs.pixel_values.shape[0] // 4 + 27 : inputs.pixel_values.shape[0] // 4 + 33,
        pos["start"]:pos["end"]
    ]
    attention_map = attention_map.sum(axis=0)
    # Min-max normalised copy used for colouring and for the JSON export.
    position_attentions[name] = (attention_map - attention_map.min()) / (attention_map.max() - attention_map.min())

    # ========== Key-region attention score ==========
    # NOTE(review): attention_map is already sliced to this image's tokens, so the
    # "+ 15" offset and patches_per_row = 23 (the patch grid below is
    # size[0] // patch_size columns wide) may not select rows 10-18 — confirm the
    # intended region. Values are left unchanged.
    start_row = 10
    end_row = 18
    patches_per_row = 23
    start_idx = (start_row - 1) * patches_per_row + 15
    end_idx = end_row * patches_per_row - 1 + 15
    region_attn = attention_map[start_idx:end_idx]
    position_scores[name] = region_attn.mean().item()   # mean attention as the score

    # ========== Whole-image attention score ==========
    position_fullimage_scores[name] = attention_map.mean().item()

    # ========== Heat-map generation ==========
    image = np.array(image_inputs[image_idx])
    heatmap = np.zeros_like(image)
    num_cols = image_inputs[image_idx].size[0] // patch_size
    num_rows = image_inputs[image_idx].size[1] // patch_size
    normalized = position_attentions[name]
    for col in range(num_cols):
        for row in range(num_rows):
            x1, y1 = col * patch_size, row * patch_size
            x2, y2 = x1 + patch_size, y1 + patch_size
            # Separate name for the patch index so the image index of the outer
            # loop is never overwritten.
            patch_idx = row * num_cols + col
            if patch_idx < len(normalized):  # guard against out-of-range indices
                color_float = custom_cmap(normalized[patch_idx].item())
                color_uint8 = (np.array(color_float[:3]) * 255).astype(np.uint8)
                heatmap[y1:y2, x1:x2] = color_uint8

    # ========== Compose and save ==========
    overlay = cv2.addWeighted(image, 0.50, heatmap, 0.50, 0)

    fig, ax = plt.subplots(figsize=(8, 8))
    ax.imshow(overlay)
    ax.axis("off")

    # Save through the figure object, then close exactly this figure.
    save_path_tmp = os.path.join(save_path, f"{name}.png")
    fig.savefig(save_path_tmp, bbox_inches="tight", dpi=300, pad_inches=0.05)
    plt.close(fig)

    # Re-open the saved picture as RGB; the context manager releases the file handle.
    with Image.open(save_path_tmp) as saved:
        temp_images.append(saved.convert('RGB'))

# Stitch the per-position pictures together and save the final image.
combined = generate_combined_plot(temp_images)
combined.save(os.path.join(save_path, save_name))

In [None]:
# Generate the attention heat map for each image position (masked variant).
# Dark-blue mask parameters
MASK_COLOR = (44, 56, 109)  # RGB, 0-255
MASK_OPACITY = 0.55

# High-saturation heat-map gradient as (position, colour) nodes; more nodes may be added.
# Positions must start at 0.0 and end at 1.0.
HEATMAP_COLORS = [
    (0.0, (0.1, 0.2, 0.7)),   # dark blue
    (0.3, (0.2, 0.75, 0.6)),  # teal
    (0.6, (1.0, 0.9, 0.1)),   # yellow
    (1.0, (1.0, 0.2, 0.0)),   # red
]
# Pass the (position, colour) pairs themselves so the node positions above are
# honoured; passing only the colours would space them evenly and ignore 0.3 / 0.6.
custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", HEATMAP_COLORS, N=256)

def apply_blue_mask(image, mask_color=MASK_COLOR, opacity=MASK_OPACITY):
    """Blend ``image`` with a solid colour layer.

    Args:
        image: image array to tint.
        mask_color: colour of the solid layer, broadcast to the shape of ``image``.
        opacity: weight of the colour layer; the image receives ``1 - opacity``.

    Returns:
        The weighted blend produced by ``cv2.addWeighted``.
    """
    # Solid uint8 layer shaped like the image, filled with the mask colour.
    tint = np.empty_like(image, dtype=np.uint8)
    np.copyto(tint, mask_color, casting='unsafe')
    blended = cv2.addWeighted(image, 1-opacity, tint, opacity, 0)
    return blended


# ========== Column ranges of the three images in the attention matrix ==========
attention_maps = {
    "figure1": {
        "start": 15,
        "end": inputs.pixel_values.shape[0] // 12 + 15
    },
    "figure2": {
        "start": 17 + inputs.pixel_values.shape[0] // 12,
        "end": inputs.pixel_values.shape[0] // 6 + 17
    },
    "figure3": {
        "start": 19 + inputs.pixel_values.shape[0] // 6,
        "end": inputs.pixel_values.shape[0] // 4 + 19
    }
}
position_attentions = {}
temp_images = []
# position_scores / position_fullimage_scores are deliberately NOT reset here:
# this loop does not recompute them, and the JSON export and the score printout
# further down need the values computed by the previous cell.
for image_idx, (name, pos) in enumerate(attention_maps.items()):
    # ========== Attention extraction ==========
    attention_map = attention_avg[
        inputs.pixel_values.shape[0] // 4 + 27 : inputs.pixel_values.shape[0] // 4 + 33,
        pos["start"]:pos["end"]
    ]
    attention_map = attention_map.sum(axis=0)
    position_attentions[name] = (attention_map - attention_map.min()) / (attention_map.max() - attention_map.min())

    # ========== Heat-map generation ==========
    image = np.array(image_inputs[image_idx])
    heatmap = np.zeros_like(image)
    num_cols = image_inputs[image_idx].size[0] // patch_size
    num_rows = image_inputs[image_idx].size[1] // patch_size
    # Colour each patch from THIS position's normalised attention (not from the
    # `attn_sum` of the single-image cell, which only describes the first image).
    normalized = position_attentions[name]
    for col in range(num_cols):
        for row in range(num_rows):
            x1, y1 = col * patch_size, row * patch_size
            x2, y2 = x1 + patch_size, y1 + patch_size
            # Separate name for the patch index so the image index of the outer
            # loop is never overwritten.
            patch_idx = row * num_cols + col
            if patch_idx < len(normalized):
                # .item() hands matplotlib a plain Python float.
                color_float = custom_cmap(normalized[patch_idx].item())
                color_uint8 = (np.array(color_float[:3]) * 255).astype(np.uint8)
                heatmap[y1:y2, x1:x2] = color_uint8

    # ========== Compose ==========
    # Optional: Gaussian blur for a smoother heat map.
    heatmap = cv2.GaussianBlur(heatmap, (35, 35), 20)

    # Tint the image with the dark-blue mask.
    masked_image = apply_blue_mask(image)

    # Blend the heat map on top of the masked image.
    overlay = cv2.addWeighted(masked_image, 0.4, heatmap, 0.6, 0)

    # Show the result; previously the overlay was computed and then discarded.
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.imshow(overlay)
    ax.set_title(name)
    ax.axis("off")
    plt.show()
    plt.close(fig)

In [None]:
import json

# Tensors are not JSON-serialisable: cast to float32, move to the CPU and convert
# to nested Python lists before dumping.
position_attentions_data = {
    name: tensor.to(torch.float32).cpu().numpy().tolist()
    for name, tensor in position_attentions.items()
}
# Store the key-region and whole-image scores alongside the attention maps.
position_attentions_data.update({
    "position_scores": position_scores,
    "position_fullimage_scores": position_fullimage_scores,
})

# Write everything to the JSON file.
with open(att_pos_path, 'w') as f:
    json.dump(position_attentions_data, f, indent=2)

In [None]:
# Difference heat maps between image positions.
def plot_diff_heatmap(diff_matrix, save_path):
    """Draw one difference heat map per entry of ``diff_matrix`` and save the figure.

    Each difference vector is reshaped to the patch grid and drawn semi-transparently
    over a background image, using a blue-white-red colormap normalised to [-1, 1]
    and one shared colorbar.

    Reads the notebook globals ``image_inputs`` and ``patch_size``, and appends the
    saved picture (as RGB) to the global list ``temp_images``.

    Args:
        diff_matrix: mapping from a label such as ``"pos1-pos2"`` to a 1-D tensor
            holding one value per patch, in row-major patch order.
        save_path: directory that receives ``combined_diff.png``.
    """
    # One column per difference map (three in this notebook).
    n_panels = max(len(diff_matrix), 1)
    fig = plt.figure(figsize=(24, 8))
    gs = fig.add_gridspec(1, n_panels, wspace=0.15)

    # Colour mapping: negative -> blue, zero -> white, positive -> red.
    diff_cmap = LinearSegmentedColormap.from_list("diff_cmap", ["blue", "white", "red"])
    norm = plt.Normalize(vmin=-1, vmax=1)

    # Axes for the shared colorbar.
    cax = fig.add_axes([0.92, 0.15, 0.02, 0.7])

    # The patch grid is derived from the first image.
    num_cols = image_inputs[0].size[0] // patch_size
    num_rows = image_inputs[0].size[1] // patch_size
    # NOTE(review): the third image is used as background for every panel —
    # confirm this is the intended image.
    image = np.array(image_inputs[2])

    heatmap = None
    for idx, (diff_name, diff) in enumerate(diff_matrix.items()):
        ax = fig.add_subplot(gs[0, idx])

        # Tensor -> float32 NumPy array.
        diff_np = diff.to(torch.float32).cpu().detach().numpy()
        # Patches are stored row-major (index = row * num_cols + col), so reshaping
        # to (num_rows, num_cols) already matches imshow's (row, column) layout;
        # transposing here would mirror the map across the image diagonal.
        diff_grid = diff_np.reshape(num_rows, num_cols)

        # Background image.
        ax.imshow(image)

        # Semi-transparent heat layer stretched to the image size.
        heatmap = ax.imshow(
            diff_grid,
            cmap=diff_cmap,
            norm=norm,
            alpha=0.9,
            extent=[0, image.shape[1], image.shape[0], 0],  # match the image size
            interpolation='bilinear'  # 'nearest' would keep the patch boundaries
        )

        ax.set_title(f"{diff_name.replace('-', ' vs ')}", fontsize=14)
        ax.axis('off')

    # Attach the shared colorbar to the last panel that was drawn.
    if heatmap is not None:
        fig.colorbar(heatmap, cax=cax, orientation='vertical')

    fig.subplots_adjust(left=0.05, right=0.9, top=0.85)
    tmp_path = os.path.join(save_path, "combined_diff.png")
    # Save and close through the figure object rather than the implicit current figure.
    fig.savefig(tmp_path, bbox_inches="tight", dpi=300)
    plt.close(fig)
    # Re-open the saved picture as RGB; the context manager releases the file handle.
    with Image.open(tmp_path) as saved:
        temp_images.append(saved.convert('RGB'))
    
# Load the saved attention maps: each file contributes the entry stored under its key.
position_attentions = {}
for file_name, key in zip(file_names, keys):
    with open(os.path.join(save_path, file_name), 'r') as f:
        # Read the list stored under `key` and turn it back into a float32 tensor.
        position_attentions[key] = torch.tensor(json.load(f)[key], dtype=torch.float32)
for name, tensor in position_attentions.items():
    print(name, tensor.shape)

# Pairwise differences between the positions, keyed by a "posA-posB" label.
diff_pairs = (
    ("pos1-pos2", "figure1", "figure2"),
    ("pos1-pos3", "figure1", "figure3"),
    ("pos2-pos3", "figure2", "figure3"),
)
diff_matrix = {
    label: position_attentions[minuend] - position_attentions[subtrahend]
    for label, minuend, subtrahend in diff_pairs
}
plot_diff_heatmap(diff_matrix, save_path)

In [None]:
# Print the shape of each loaded attention map.
for name in position_attentions:
    print(name, position_attentions[name].shape)

In [None]:
# Print the key-region scores first, then the whole-image scores, one line per position.
score_tables = (
    ("关键区域注意力得分:", position_scores),
    ("全局注意力得分:", position_fullimage_scores),
)
for heading, table in score_tables:
    print(heading)
    for name, score in table.items():
        print(f"{name}: {score}")