In [None]:
import torch
device = "cuda"

In [3]:
# example iamge and text
sample_img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg' 
sample_text = 'a woman sitting on the beach with a dog'


# BLIP-1

In [None]:
import os
import json
from tqdm import tqdm
import numpy as np
import torch
from PIL import Image
from transformers import BlipForImageTextRetrieval, AutoProcessor
import requests
from io import BytesIO

blip_itm_large_coco_name = "Salesforce/blip-itm-large-coco"
model = BlipForImageTextRetrieval.from_pretrained("Salesforce/blip-itm-large-coco", local_files_only=True)
processor = AutoProcessor.from_pretrained("Salesforce/blip-itm-large-coco", local_files_only=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()
print("BLIP1-large-for-zj 模型加载完成")

@torch.no_grad()
def encode_text(text: str):
    """
    编码文本，返回投影后的特征向量（与图像特征维度一致）
    """
    if not text or not isinstance(text, str):
        raise ValueError(f"text is required")
    inputs = processor(text=[text], return_tensors="pt", truncation=True).to(device)
    outputs = model.text_encoder(**inputs)
    # 取 [CLS] token embedding
    cls_emb = outputs.last_hidden_state[:, 0, :]  # shape [1, text_hidden_dim] 通常是 768
    
    # 关键修复：使用投影层将文本特征投影到与图像特征相同的维度
    # HuggingFace 的 BlipForImageTextRetrieval 模型应该有 text_proj 层
    # 参考官方实现 blip_image_text_matching.py 第 96 行：text_feat = F.normalize(self.text_proj(...), dim=-1)
    if hasattr(model, 'text_proj'):
        text_feat = model.text_proj(cls_emb)
    else:
        # 如果找不到投影层，打印模型结构帮助调试
        print("模型属性:", [attr for attr in dir(model) if not attr.startswith('_')])
        raise AttributeError("模型没有找到 text_proj 层。请检查模型结构，可能需要使用不同的方法访问投影层。")
    
    # L2 归一化
    text_feat = torch.nn.functional.normalize(text_feat, dim=-1)
    return text_feat[0].cpu().numpy().tolist()


@torch.no_grad()
def encode_image(image_path: str):
    """
    编码图像，返回投影后的特征向量（与文本特征维度一致）
    """
    # 支持本地路径或URL
    if image_path.startswith("http://") or image_path.startswith("https://"):
        response = requests.get(image_path)
        response.raise_for_status()
        raw_image = Image.open(BytesIO(response.content)).convert("RGB")
    elif os.path.exists(image_path):
        raw_image = Image.open(image_path).convert("RGB")
    else:
        raise ValueError(f"Invalid image path or URL: {image_path}")

    inputs = processor(images=raw_image, return_tensors="pt").to(device)
    outputs = model.vision_model(**inputs)
    # 取 [CLS] token embedding（第一个token），对应官方实现中的 image_embeds[:, 0, :]
    img_emb = outputs.last_hidden_state[:, 0, :]  # shape [1, vision_hidden_dim] 通常是 1024
    
    # 关键修复：使用投影层将图像特征投影到与文本特征相同的维度
    # 参考官方实现 blip_image_text_matching.py 第 94 行：image_feat = F.normalize(self.vision_proj(...), dim=-1)
    if hasattr(model, 'vision_proj'):
        img_feat = model.vision_proj(img_emb)
    else:
        # 如果找不到投影层，打印模型结构帮助调试
        print("模型属性:", [attr for attr in dir(model) if not attr.startswith('_')])
        raise AttributeError("模型没有找到 vision_proj 层。请检查模型结构，可能需要使用不同的方法访问投影层。")
    
    # L2 归一化
    img_feat = torch.nn.functional.normalize(img_feat, dim=-1)
    return img_feat[0].cpu().numpy().tolist()


def eval_retrieval(txt, img_path):
    """
    计算文本和图像的余弦相似度
    """
    text_emb = encode_text(txt)
    img_emb = encode_image(img_path)

    txt_vectors = np.array(text_emb, dtype=np.float32)
    img_vectors = np.array(img_emb, dtype=np.float32)

    # 特征已经归一化，直接计算点积即可得到余弦相似度
    sim_score = np.dot(txt_vectors, img_vectors)
    return sim_score


sample_img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg' 
sample_text = 'a woman sitting on the beach with a dog'
if __name__ == '__main__':
    sim = eval_retrieval(sample_text, sample_img_url)
    print(sim)

  torch.utils._pytree._register_pytree_node(
  torch.utils._pytree._register_pytree_node(


BLIP1-large-for-zj 模型加载完成
0.5116037


# BLIP-2

In [None]:
# simple check
# reference: https://github.com/salesforce/LAVIS/blob/main/examples/blip2_feature_extraction.ipynb
from lavis.common.registry import registry
import os
# lavis_root = "/home/hsh/LAVIS/lavis"
# registry.register_path("library_root", lavis_root)
import torch
from PIL import Image
import json
from tqdm import tqdm
from lavis.models import load_model_and_preprocess

# ========= 配置 =========
device = "cuda" if torch.cuda.is_available() else "cpu"
model_name = "blip2_feature_extractor"
model_type = "pretrain" 

input_path = "/mnt/disk60T/dataset/Culture/Museum/Final_version/Final_Zhejiang.json"
output_path = "/home/hsh/data/zj_blip2.json"
os.makedirs(os.path.dirname(output_path), exist_ok=True)

model, vis_processors, txt_processors = load_model_and_preprocess(
    name=model_name,
    model_type=model_type,
    is_eval=True,
    device=device,
)
print("blip2_feature_extractor 加载完成")

# === text encoding ===
@torch.no_grad()
def encode_text(text: str):
    if not text or not isinstance(text, str):
        raise ValueError(f"text is required")
    text_input = txt_processors["eval"](text)
    tokens = model.tokenizer(text_input, max_length=512, truncation=True, return_tensors="pt").to(device)
    text_output = model.Qformer.bert(tokens.input_ids, attention_mask=tokens.attention_mask, return_dict=True,)
    text_embeds = text_output.last_hidden_state # ([1, len, 768])
    text_embed_cls = text_embeds[:, 0, :]  # [1, 768] - 使用 CLS token
    # text_embed_mean = text_embeds.mean(dim=1)  # 所有 token 的平均值
    return text_embed_cls.cpu().numpy().tolist()

# === image encoding ===
@torch.no_grad()
def encode_image(image_path: str):
    if not os.path.exists(image_path):
        raise ValueError(f"image is required")
    raw_image = Image.open(image_path).convert("RGB")
    image_input = vis_processors["eval"](raw_image).unsqueeze(0).to(device)
    img_features = model.extract_features({"image": image_input}, mode="image")
    img_embeds = img_features.image_embeds  # [1, 32, 768]
    img_embeds_mean = img_embeds.mean(dim=1)    # [1, 768] - 平均所有 query tokens
    return img_embeds_mean.cpu().numpy().tolist()

def eval_retrieval(txt, img_path):
    text_emb = encode_text(txt)
    img_emb = encode_image(img_path)

    txt_vectors = np.array(text_emb, dtype=np.float32)
    img_vectors = np.array(img_emb, dtype=np.float32)

    # squeeze 掉多余的维度 [1,768] -> [768]
    txt_vectors = txt_vectors.squeeze()
    img_vectors = img_vectors.squeeze()

    # L2 normalization
    txt_vectors /= np.linalg.norm(txt_vectors)
    img_vectors /= np.linalg.norm(img_vectors)

    # 计算余弦相似度
    sim_score = np.dot(txt_vectors, img_vectors)
    return sim_score

# sample text和image匹配
# sample_img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg' 
sample_img_url = '/home/hsh/BLIP_local/demo.jpg' 
sample_text = 'a woman sitting on the beach with a dog'
if __name__ == '__main__':
    sim = eval_retrieval(sample_text, sample_img_url)
    print("Similarity:", sim)

# blip2_feature_extractor 加载完成
# Similarity: 0.26567638   <---正常情况应该很高

blip2_feature_extractor 加载完成
Similarity: 0.26567638
