In [1]:
import json
import os
os.environ["HF_HOME"] = "/DATA2/HuggingFace"
os.environ["CUDA_VISIBLE_DEVICES"] = "2"
import os.path as osp
import random
import numpy as np
import torch
from PIL import Image

from transformers import LlavaNextProcessor, LlavaNextForConditionalGeneration
from deephallu.data.mme import MMEDataset
model_name = "llava-hf/llava-v1.6-mistral-7b-hf"

seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
processor = LlavaNextProcessor.from_pretrained(model_name)
model = LlavaNextForConditionalGeneration.from_pretrained(
    model_name, 
    attn_implementation="eager"
).to("cuda")
# 确保模型配置启用 attention 输出
model.config.output_attentions = True
model.language_model.config.output_attentions = True

dataset = MMEDataset()

image, id, image_name, category, question, answer = dataset[0]
print(image_name, category, question, answer)
conversation = [
    {
        "role": "user",
        "content": [
            {"type": "text", "text": question},
            {"type": "image"},
        ],
    },
]

prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
inputs = processor(images=image, text=prompt, return_tensors="pt")

# 将inputs移动到模型设备
inputs = {k: v.to(model.device) for k, v in inputs.items()}

with torch.no_grad():
    outputs = model.generate(
        **inputs,
        max_new_tokens=1000,
        output_attentions=True,
        output_scores=True,  # 添加这个参数
        return_dict_in_generate=True
    )
    # tensor (batch_size, sequence_length)
    output_ids = outputs.sequences
    # tuple of tensors (step, (batch_size, vocab_size))
    scores = outputs.scores  
    # tuple of tuple of tensors (step, layer, (batch_size, head, sequence_length_source, sequence_length_target)), from step 1 the sequence_length_source is 1 and the sequence_length_target is previous length (causal attention)
    attentions = outputs.attentions

Fetching 2 files: 100%|██████████| 2/2 [00:00<00:00, 16288.56it/s]
Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


Loading checkpoint shards: 100%|██████████| 4/4 [00:01<00:00,  2.36it/s]
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


21580 artwork Is this artwork created by linard, jacques? Please answer yes or no. Yes


In [3]:
# for token_id in inputs["input_ids"][0]:
#     print(processor.decode([token_id], skip_special_tokens=False))
# # Create a list of dictionaries containing token IDs and their corresponding tokens
# token_mapping = []
# for i, token_id in enumerate(inputs["input_ids"][0]):
#     token = processor.decode([token_id], skip_special_tokens=False)
#     token_mapping.append({
#         "token_id": token_id.item(),
#         "token": token,
#         "position": i
#     })

token_json = []
for i in range(len(inputs["input_ids"][0])):
    token_json.append({
        "token_id": inputs["input_ids"][0][i].item(),
        "token": processor.decode([inputs["input_ids"][0][i].item()], skip_special_tokens=False),
        "position": i,
        "type": "text" if inputs["input_ids"][0][i].item() != processor.tokenizer.image_token_id else "image",
        "io": "input"
    })
for i in range(len(output_ids[0][len(inputs["input_ids"][0]):])):
    token_json.append({
        "token_id": output_ids[0][len(inputs["input_ids"][0]):][i].item(),
        "token": processor.decode([output_ids[0][len(inputs["input_ids"][0]):][i].item()], skip_special_tokens=False),
        "position": i + len(inputs["input_ids"][0]),
        "type": "text",
        "io": "output"
    })

# 将token_json转换为json文件
print(json.dumps(token_json, indent=2))
with open("./sequences.json", "w") as f:
    json.dump(token_json, f, indent=2)


[
  {
    "token_id": 1,
    "token": "<s>",
    "position": 0,
    "type": "text",
    "io": "input"
  },
  {
    "token_id": 733,
    "token": "[",
    "position": 1,
    "type": "text",
    "io": "input"
  },
  {
    "token_id": 16289,
    "token": "INST",
    "position": 2,
    "type": "text",
    "io": "input"
  },
  {
    "token_id": 28793,
    "token": "]",
    "position": 3,
    "type": "text",
    "io": "input"
  },
  {
    "token_id": 28705,
    "token": "",
    "position": 4,
    "type": "text",
    "io": "input"
  },
  {
    "token_id": 32000,
    "token": "<image>",
    "position": 5,
    "type": "image",
    "io": "input"
  },
  {
    "token_id": 32000,
    "token": "<image>",
    "position": 6,
    "type": "image",
    "io": "input"
  },
  {
    "token_id": 32000,
    "token": "<image>",
    "position": 7,
    "type": "image",
    "io": "input"
  },
  {
    "token_id": 32000,
    "token": "<image>",
    "position": 8,
    "type": "image",
    "io": "input"
  },
  {
    "

In [4]:

import torch
import torch.nn.functional as F
from typing import List, Dict, Tuple

def calculate_entropy(probs:torch.Tensor, log_base='e'):
    """计算概率分布的熵
    log_base: 对数底数，'e'表示自然对数(nats)，'2'表示以2为底(bits)
    """
    if log_base == 'e':
        return - (probs * torch.log(probs + 1e-10)).sum(dim=-1)
    elif log_base == '2':
        return - (probs * torch.log2(probs + 1e-10)).sum(dim=-1)
    else:
        raise ValueError(f"Invalid log base: {log_base}")
predictions_json = []
for s in range(len(scores)):
    probs = F.softmax(scores[s][0], dim=-1)
    entropy_nats = calculate_entropy(probs, log_base='e')
    entropy_bits = calculate_entropy(probs, log_base='2')
    top_k_probs, top_k_indices = torch.topk(probs, k=5)
    top_k_tokens = [
        processor.decode([token_id], skip_special_tokens=False) 
        for token_id in top_k_indices
    ]
    # print(f"step {s} entropy: {entropy_nats} nats, {entropy_bits} bits")
    # print(f"step {s} top-k tokens: {top_k_tokens}")
    # print(f"step {s} top-k probs: {top_k_probs.tolist()}")
    predictions_json.append({
        "step": s,
        "entropy_nats": entropy_nats.tolist(),
        "entropy_bits": entropy_bits.tolist(),
        "top_k_tokens": top_k_tokens,
        "top_k_probs": top_k_probs.tolist(),
        "top_k_token_ids": top_k_indices.tolist()
    })
print(json.dumps(predictions_json, indent=2))
with open("./scores.json", "w") as f:
    json.dump(predictions_json, f, indent=2)


[
  {
    "step": 0,
    "entropy_nats": 0.8799880743026733,
    "entropy_bits": 1.2695542573928833,
    "top_k_tokens": [
      "No",
      "Yes",
      "The",
      "Based",
      "I"
    ],
    "top_k_probs": [
      0.5704723596572876,
      0.39297419786453247,
      0.018950100988149643,
      0.005484007764607668,
      0.004165687598288059
    ],
    "top_k_token_ids": [
      1770,
      5592,
      415,
      17158,
      315
    ]
  },
  {
    "step": 1,
    "entropy_nats": 0.07788734138011932,
    "entropy_bits": 0.11236768215894699,
    "top_k_tokens": [
      ",",
      ".",
      "",
      "pe",
      "to"
    ],
    "top_k_probs": [
      0.9855651259422302,
      0.014067853800952435,
      0.0002545239112805575,
      2.8576298063853756e-05,
      1.6235611838055775e-05
    ],
    "top_k_token_ids": [
      28725,
      28723,
      28705,
      386,
      298
    ]
  },
  {
    "step": 2,
    "entropy_nats": 1.0004420280456543,
    "entropy_bits": 1.4433329105377197,

In [4]:
print(f"shape of generated_ids: {output_ids.shape}")
print(processor.decode(output_ids[0][len(inputs['input_ids'][0]):], skip_special_tokens=True))
print(f"len of input_ids: {len(inputs['input_ids'][0])}, len of generated_ids: {len(output_ids[0][len(inputs['input_ids'][0]):])}")

shape of generated_ids: torch.Size([1, 2413])
No, the artwork is not created by Jacques-Louis David. The painting you've shown is "Still Life with Fruit" by Willem Kalf, a Dutch still life painter. 
len of input_ids: 2369, len of generated_ids: 44


In [5]:
print(f"len of steps of the scores: {len(scores)}")
print(f"len of steps of the attentions: {len(attentions)}")
print(f"number of layers: {len(attentions[0])}")
print(f"number of heads: {len(attentions[0][0])}")
print(f"shape of the first step's scores: {scores[0].shape}")
print(f"shape of the first step's attentions (layer 0): {attentions[0][0].shape}")
print(f"shape of the second step's scores: {scores[1].shape}")
print(f"shape of the second step's attentions (layer 0): {attentions[1][0].shape}")
print(f"shape of the last step's scores: {scores[-1].shape}")
print(f"shape of the last step's attentions (layer 0): {attentions[-1][0].shape}")

len of steps of the scores: 44
len of steps of the attentions: 44
number of layers: 32
number of heads: 1
shape of the first step's scores: torch.Size([1, 32064])
shape of the first step's attentions (layer 0): torch.Size([1, 32, 2369, 2369])
shape of the second step's scores: torch.Size([1, 32064])
shape of the second step's attentions (layer 0): torch.Size([1, 32, 1, 2370])
shape of the last step's scores: torch.Size([1, 32064])
shape of the last step's attentions (layer 0): torch.Size([1, 32, 1, 2412])


# 计算生成的scores（top 10的token）的entropy，并计算每个token的entropy的平均值

In [6]:

import torch
import torch.nn.functional as F
from typing import List, Dict, Tuple

def calculate_entropy(probs:torch.Tensor, log_base='e'):
    """计算概率分布的熵
    log_base: 对数底数，'e'表示自然对数(nats)，'2'表示以2为底(bits)
    """
    if log_base == 'e':
        return - (probs * torch.log(probs + 1e-10)).sum(dim=-1)
    elif log_base == '2':
        return - (probs * torch.log2(probs + 1e-10)).sum(dim=-1)
    else:
        raise ValueError(f"Invalid log base: {log_base}")

def analyze_scores_steps(
    scores: Tuple[torch.Tensor, ...],
    processor,
    top_k: int = 5
) -> List[List[Dict]]:
    """
    分析每个生成步骤的概率分布熵和top-k tokens
    
    Args:
        scores: 来自model.generate()的scores输出，tuple of tensors (step, (batch_size, vocab_size))
        processor: LlavaNextProcessor实例，用于解码token ids
        top_k: 保留top-k个最高概率的tokens
        
    Returns:
        List[List[Dict]]: 每个步骤的分析结果
        第一个维度是batch_size，第二个维度是每个batch的分析结果，包含:
            - step: 步骤索引
            - entropy: 该步骤的熵值
            - top_k_tokens: top-k tokens的列表
            - top_k_probs: top-k tokens对应的概率
            - top_k_token_ids: top-k tokens对应的token ids
    """
    batch_size = scores[0].shape[0]
    results = [[] for _ in range(batch_size)]
    
    for step_idx, logits in enumerate(scores):
        logits = logits.detach().cpu()
        # logits shape: (batch_size, vocab_size)
        for i in range(batch_size):
            logits_single = logits[i]  # shape: (vocab_size,)
            # 计算概率分布
            probs = F.softmax(logits_single, dim=-1)
            # 计算熵 H(p) = -Σ p(x) * log(p(x))
            # 使用loge计算，单位为nats，使用log2计算，单位为bits
            entropy = calculate_entropy(probs)
            # 获取top-k tokens
            top_k_probs, top_k_indices = torch.topk(probs, k=top_k)
            # 解码token ids到文本
            top_k_token_ids = top_k_indices.tolist()
            top_k_tokens = [
                processor.decode([token_id], skip_special_tokens=False) 
                for token_id in top_k_token_ids
            ]
            # 保存结果
            step_result = {
                'step': step_idx,
                'entropy': entropy.item(),
                'top_k_tokens': top_k_tokens,
                'top_k_probs': top_k_probs.tolist(),
                'top_k_token_ids': top_k_token_ids
            }
            results[i].append(step_result)
    return results


def print_analysis_summary(results: List[Dict], num_steps: int = 5):
    """
    打印分析结果的摘要
    
    Args:
        results: analyze_generation_steps的返回结果
        num_steps: 显示前num_steps个步骤的详细信息
    """
    print(f"总生成步骤数: {len(results)}")
    print(f"平均熵: {sum(r['entropy'] for r in results) / len(results):.4f} bits")
    print(f"最小熵: {min(r['entropy'] for r in results):.4f} bits (步骤 {min(results, key=lambda x: x['entropy'])['step']})")
    print(f"最大熵: {max(r['entropy'] for r in results):.4f} bits (步骤 {max(results, key=lambda x: x['entropy'])['step']})")
    print("\n" + "="*80)
    
    print(f"\n前 {num_steps} 个步骤的详细信息:")
    for i, result in enumerate(results[:num_steps]):
        print(f"\n步骤 {result['step']}:")
        print(f"  熵: {result['entropy']:.4f} bits")
        print(f"  Top-{len(result['top_k_tokens'])} Tokens:")
        for j, (token, prob, token_id) in enumerate(zip(
            result['top_k_tokens'], 
            result['top_k_probs'],
            result['top_k_token_ids']
        )):
            print(f"    {j+1}. '{token}' (ID: {token_id}) - 概率: {prob:.4f}")

# 分析生成步骤
results = analyze_scores_steps(scores, processor, top_k=5)

# 打印摘要
print_analysis_summary(results[0], num_steps=5)

总生成步骤数: 44
平均熵: 0.8588 bits
最小熵: 0.0005 bits (步骤 7)
最大熵: 3.3161 bits (步骤 30)


前 5 个步骤的详细信息:

步骤 0:
  熵: 0.8800 bits
  Top-5 Tokens:
    1. 'No' (ID: 1770) - 概率: 0.5705
    2. 'Yes' (ID: 5592) - 概率: 0.3930
    3. 'The' (ID: 415) - 概率: 0.0190
    4. 'Based' (ID: 17158) - 概率: 0.0055
    5. 'I' (ID: 315) - 概率: 0.0042

步骤 1:
  熵: 0.0779 bits
  Top-5 Tokens:
    1. ',' (ID: 28725) - 概率: 0.9856
    2. '.' (ID: 28723) - 概率: 0.0141
    3. '' (ID: 28705) - 概率: 0.0003
    4. 'pe' (ID: 386) - 概率: 0.0000
    5. 'to' (ID: 298) - 概率: 0.0000

步骤 2:
  熵: 1.0004 bits
  Top-5 Tokens:
    1. 'the' (ID: 272) - 概率: 0.4965
    2. 'this' (ID: 456) - 概率: 0.4462
    3. 'that' (ID: 369) - 概率: 0.0178
    4. 'it' (ID: 378) - 概率: 0.0151
    5. 'I' (ID: 315) - 概率: 0.0067

步骤 3:
  熵: 0.5610 bits
  Top-5 Tokens:
    1. 'artwork' (ID: 27261) - 概率: 0.8798
    2. 'artist' (ID: 7325) - 概率: 0.0552
    3. 'painting' (ID: 11514) - 概率: 0.0404
    4. 'image' (ID: 3469) - 概率: 0.0097
    5. 'name' (ID: 1141) - 概率: 0.0037

步骤 4:

In [7]:
# # 清理GPU内存
# del outputs, inputs
# torch.cuda.empty_cache()