In [1]:
import numpy as np
import torch
from pkg_resources import packaging
import sys
sys.path.append("..")
from pathlib import Path
import clip.clip as clip
from PIL import Image
import os 
device = torch.device("cuda:7" if torch.cuda.is_available() else "cpu")

# model, preprocess = clip.load("ViT-B/32")
model, preprocess = clip.load(name="/data/sswang/NFT_search/models/ViT-L-14-336px.pt", device=device)
# 将模型加载到GPU中并切换到评估模式
# model.cuda(device).eval()
model.eval()
input_resolution = model.visual.input_resolution
context_length = model.context_length
vocab_size = model.vocab_size

print("Model parameters:", f"{np.sum([int(np.prod(p.shape)) for p in model.parameters()]):,}")
print("Input resolution:", input_resolution)
print("Context length:", context_length)
print("Vocab size:", vocab_size)

preprocess

  from .autonotebook import tqdm as notebook_tqdm


Model parameters: 427,944,193
Input resolution: 336
Context length: 77
Vocab size: 49408


Compose(
    Resize(size=336, interpolation=bicubic, max_size=None, antialias=None)
    CenterCrop(size=(336, 336))
    <function _convert_image_to_rgb at 0x7efb11775550>
    ToTensor()
    Normalize(mean=(0.48145466, 0.4578275, 0.40821073), std=(0.26862954, 0.26130258, 0.27577711))
)

np.stack(images) 函数将一个由多个图像组成的列表 images 沿着一个新轴（默认为 0）进行连接，生成一个新的数组。这个新的数组的维度比原来的数组多了一个维度，用于存储连接后的图像。如果 images 中的每个图像的尺寸都相同，那么连接后的数组的第一个维度将是 len(images)，第二个维度将是图像的高度，第三个维度将是图像的宽度，第四个维度将是图像的通道数。

这行代码是从一个图像列表中创建了一个 PyTorch 张量 image_input。np.stack(images) 函数沿着一个新轴（默认为 0）将图像列表进行连接。然后，torch.tensor() 将连接后的图像列表转换为 PyTorch 张量。.cuda() 方法将张量移动到 GPU 上进行加速运算。


`img_features` 是一个存放了100个图像的特征向量的列表。归一化操作 `img_features /= img_features.norm(dim=-1, keepdim=True)` 用于将特征向量进行归一化处理。

下面是对归一化操作的细节解释：

1. `img_features.norm(dim=-1, keepdim=True)`：这部分代码计算了 `img_features` 列表中每个特征向量的范数（即向量的长度）。`dim=-1` 表示在最后一个维度上进行计算，即对每个特征向量的元素进行平方求和，然后取平方根。

2. `keepdim=True`：这部分代码保持结果的维度与输入的 `img_features` 保持一致。这样，计算的结果将是一个具有相同形状的张量，每个特征向量的范数都被保留在一个单独的维度中。

3. `img_features /= img_features.norm(dim=-1, keepdim=True)`：这部分代码进行了归一化操作。通过除以每个特征向量的范数，将每个特征向量的长度缩放到1。`/=` 是就地除法运算符，它将结果保存回 `img_features` 列表中。

归一化操作的目的是将特征向量的尺度标准化，使其具有相似的大小。这有助于确保特征向量在计算相似度或进行其他操作时具有一致的权重和规模。通过将特征向量缩放到单位长度，可以使它们在相似度计算中具有更好的可比性和一致性。

In [80]:
import json

def check_dir(dir_path):
    """
    检查文件夹路径是否存在，不存在则创建

    Args:
        dir_path (str): 待检查的文件夹路径
    """
    if not os.path.exists(dir_path):
        try:
            os.makedirs(dir_path)
        except Exception as e:
            raise e

def load_json(json_path):
    """
    以只读的方式打开json文件

    Args:
        config_path: json文件路径

    Returns:
        A dictionary

    """
    with open(json_path, 'r', encoding='UTF-8') as f:
        return json.load(f)
    
def save_json(save_path, data):
    """
    Saves the data to a file with the given filename in the given path

    Args:
        :param save_path: The path to the folder where you want to save the file
        :param filename: The name of the file to save
        :param data: The data to be saved

    """
    with open(save_path, 'w', encoding='UTF-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)

def is_str_Length_valid(str_list) -> bool:
    """
    判断字符串长度是否超过77

    Args:
        str_list (list): 字符串列表

    Returns:
        bool: 是否超过77
    """
    try:
        for str in str_list:
            clip.tokenize(str)
        return True
    except:
        return False


def tensorlize_imgs(model, img_path_list) -> torch.Tensor:
    """
    使用模型提取图片特征，返回图片特征向量列表

    Args:
        img_path_list (list): 图片路径列表

    Returns:
        torch.Tensor: 图片特征向量列表
    """

    images = []
    for img_path in img_path_list:

        image = Image.open(img_path).convert("RGB")
            # 首先将图片预处理成模型需要的格式
        images.append(preprocess(image))
        # 把图片加载进cuda中
    image_input = torch.tensor(np.stack(images)).cuda(device=device)
    with torch.no_grad():
        image_features = model.encode_image(image_input).float()
        # 将image_features从GPU移动到CPU，并返回
        return image_features.cpu()
            

def tensorlize_texts(model, text_tokens_list) -> torch.Tensor:
    """
    使用模型提取单句文本特征，返回文本特征向量列表

    Args:
        text_tokens_list (list): 文本列表

    Returns:
        torch.Tensor: 文本特征向量列表
    """
    text_tokens = clip.tokenize(text_tokens_list).cuda(device=device)
    with torch.no_grad():
        text_features = model.encode_text(text_tokens).float()
        # 将text_features从GPU移动到CPU，并返回
        # print(text_features.shape)
        return text_features.cpu().numpy().tolist()
    
def load_img_tensor(device, imgTensor_path):
    """
    加载图片的tensor 向量到指定cuda中

    Args:
        device (str): cuda
        img_path (str): 图片tensor路径

    Returns:
        torch.Tensor: 图片特征向量
    """
    # 加载json文件
    NFT_tensor_data = load_json(imgTensor_path)
    image_features = NFT_tensor_data['image_features']
    image_tensors = torch.tensor(image_features).to(device)
    return image_tensors

def load_des_tensor(device, desTensor_path):
    """
    加载描述的tensor 向量到指定cuda中

    Args:
        device (str): cuda
        img_path (str): 描述的tensor路径

    Returns:
        torch.Tensor: 图片特征向量
    """
    # 加载json文件
    NFT_tensor_data = load_json(desTensor_path)
    des_features = NFT_tensor_data['des_tensors']
    
    # # 确保每个元素都被转换为张量
    # des_tensors = [torch.tensor(feature).to(device) for feature in des_features]
    # for des_tensor in des_tensors:
    #     print(des_tensor.shape)
    
    des_tensors = torch.tensor(des_features).to(device)
    for des_tensor in des_tensors:
        print(des_tensor.shape)

    # 将所有张量堆叠在一起
    des_tensors = torch.stack(tuple(des_tensors), dim=1)

    print(des_tensors.shape)
    return des_tensors

def slide_window_tokenizer(text, window_size, step_size) -> list:
    """
    为了处理长度超过77的句子，这里设计滑动窗口分词

    Args:
        text (str): 将要被拆分的句子

    Returns:
        list: 拆分后的句子列表
    """
    words = text.split()
    sentences = []
    slide_window_list = [i for i in range(0, len(words) - 1, step_size) if i + step_size < len(words) - 1]
    for i in slide_window_list:
        sentence = ' '.join(words[i:i+window_size])
        sentences.append(sentence)
    return sentences

def calculate_cosine_similarity_topk(img_features, des_features, k = 10) -> tuple:
    """
    计算图片特征和描述特征的余弦相似度，并返回topk的结果

    Args:
        img_features (torch.tensor): 图像特征向量
        des_features (torch.tensor): 描述特征向量
        k (int, optional): 前k位结果. Defaults to 10.

    Returns:
        tuple: (topk的相似度，topk的索引)
    """
    # 对每一个特征向量进行归一化，使其范数为1
    img_features /= img_features.norm(dim=-1, keepdim=True)

    # 归一化描述特征
    des_features /= des_features.norm(dim=-1, keepdim=True)
    # similarity = des_features.cpu().numpy() @ img_features.cpu().numpy().T
    # 每个图像向量都与100个文本向量计算余弦相似度，然后计算100次
    text_probs = (100.0 * img_features @ des_features.T).softmax(dim=-1)
    top_probs, top_labels = text_probs.cpu().topk(k, dim=-1)
    return top_probs, top_labels


def slide_window_tokenizer(text, window_size, step_size) -> list:
    """
    为了处理长度超过77的句子，这里设计滑动窗口分词

    Args:
        text (str): 将要被拆分的句子

    Returns:
        list: 拆分后的句子列表
    """
    words = text.split()
    sentences = []
    slide_window_list = [i for i in range(0, len(words) - 1, step_size) if i + step_size < len(words) - 1]
    for i in slide_window_list:
        sentence = ' '.join(words[i:i+window_size])
        sentences.append(sentence)
    return sentences

def tensorlize_texts_slideWindow(model, text, window_size, step_size):
    """
    将长文本分块，然后对每块文本进行向量化，最后将这些向量平均。

    Args:
        model (CLIP): 使用的 CLIP 模型。
        text (str): 输入的文本。

    Returns:
        torch.Tensor: 文本特征向量。
    """

    chunks = slide_window_tokenizer(text, window_size, step_size)
    tensor_list = []
    for chunk in chunks:
        tokens = clip.tokenize([chunk]).cuda(device=device)
        with torch.no_grad():
            tensor_list.append(model.encode_text(tokens).float().cpu())
    # 使用所有块的平均值作为文本的表示
    avg_tensor = torch.mean(torch.stack(tensor_list), dim=0)
    return avg_tensor.numpy().tolist()


def tensorlize_valid_subsentence(model, text) -> torch.Tensor:
    """
    截取有效的子句，然后求特征向量值

    Args:
        model (CLIP): 使用的 CLIP 模型。
        text (str): 输入的文本。

    Returns:
        torch.Tensor: 文本特征向量。
    """
    text_tensor = None
    # 标记为False时，表示该句子无法被模型处理，需要进行拆分
    flag = False
    words = text.split()
    text_length = len(words)
    while not flag:
        try:
            text_tensor = tensorlize_texts(model, text)
            flag = True
        except:
            text_length -= 1
            text = ' '.join(words[:text_length])
    return text_tensor

def legalize_text(text) -> str:
    """
    截取有效长度的句子

    Args:
        text (str): 输入的文本。

    Returns:
        str: 截断之后的文本。
    """
    # 标记为False时，表示该句子无法被模型处理，需要进行拆分
    flag = False
    words = text.split()
    text_length = len(words)
    while not flag:
        try:
            clip.tokenize(text)
            flag = True
        except:
            text_length -= 1
            text = ' '.join(words[:text_length])
    return text

def handle_long_texts(model, text_list) -> list:
    """
    处理长文本，将长文本分块，然后对每块文本进行向量化，最后将这些向量平均。

    Args:
        model (CLIP): 使用的 CLIP 模型。
        text_list (list): 输入的文本列表。
        window_size (int): 窗口宽度
        step_size (int): 窗口移动的步长

    Returns:
        list: 文本特征向量列表。
    """
    # 先将长句子截断为有效短句子
    legal_text_list = list(map(legalize_text, text_list))
    text_features = tensorlize_texts(model, legal_text_list)
    return text_features


In [82]:
dataset_base_path = Path("/data/sswang/data/mini100")
target_dataset_path = Path("/data/sswang/data/mini100_tensor_V2")

In [83]:
import re
import copy

NFT_list_dict = load_json(dataset_base_path.joinpath("NFT_list.json"))

collection_list_copy = copy.deepcopy(NFT_list_dict["collection_list_copy"])

for key, value in collection_list_copy.items():
    NFT_collection_path = dataset_base_path.joinpath(value)
    print("开始处理：", NFT_collection_path.name, "...")
    check_dir(target_dataset_path.joinpath(NFT_collection_path.name))

    NFT_tensor_data = {}
    img_path_list = list(NFT_collection_path.joinpath("img").iterdir())
    # 提取图片名称
    img_name_list = [img_path.stem for img_path in img_path_list]

    # 提取图片特征向量
    image_features_CPU = tensorlize_imgs(model, img_path_list)

    # 提取文本特征向量
    des_tensor = []
    des_query_dict = load_json(NFT_collection_path.joinpath("description.json"))

    for img_name in img_name_list:
        # 去掉 description 中的序号（也就是1. 2. 3.）
        des_list = [re.sub(r'^\d+\. ', '', des) for des in des_query_dict[img_name]]
        des_features = None
        # 判断描述的长度有没有超过77
        if is_str_Length_valid(des_list) == False:
            # 如果超过77，就使用特殊处理方式
            des_features = handle_long_texts(model, des_list)
        else:
            des_features = tensorlize_texts(model, des_list)
        des_tensor.append(des_features)

    NFT_tensor_data["img_name_list"] = img_name_list
    NFT_tensor_data["image_features"] = image_features_CPU.numpy().tolist()
    NFT_tensor_data["des_tensors"] = des_tensor
    save_json(target_dataset_path.joinpath(NFT_collection_path.name, "NFT_tensor_data.json"), NFT_tensor_data)
    print("处理完成：", NFT_collection_path.name)
    
    # 将已经处理完成的项目从列表中删除
    del NFT_list_dict["collection_list_copy"][key]
    # 将处理完成的项目列表保存到json文件中
    save_json(dataset_base_path.joinpath("NFT_list.json"), NFT_list_dict)
    

开始处理： CryptoPunks ...
处理完成： CryptoPunks
开始处理： BoredApeYachtClub ...
处理完成： BoredApeYachtClub
开始处理： MutantApeYachtClub ...
处理完成： MutantApeYachtClub
开始处理： Azuki ...
处理完成： Azuki
开始处理： CLONEX ...
处理完成： CLONEX
开始处理： Moonbirds ...
处理完成： Moonbirds
开始处理： Doodles ...
处理完成： Doodles
开始处理： BoredApeKennelClub ...
处理完成： BoredApeKennelClub
开始处理： Meebits ...
处理完成： Meebits
开始处理： PudgyPenguins ...
处理完成： PudgyPenguins
开始处理： Cool Cats ...
处理完成： Cool Cats
开始处理： Beanz ...
处理完成： Beanz
开始处理： MechMinds ...
处理完成： MechMinds
开始处理： World of Women ...
处理完成： World of Women
开始处理： CrypToadz ...
处理完成： CrypToadz
开始处理： 0N1 Force ...
处理完成： 0N1 Force
开始处理： mfers ...
处理完成： mfers
开始处理： Karafuru ...
处理完成： Karafuru
开始处理： HAPE PRIME ...
处理完成： HAPE PRIME
开始处理： MekaVerse ...
处理完成： MekaVerse
开始处理： projectPXN ...
处理完成： projectPXN
开始处理： FLUF ...
处理完成： FLUF
开始处理： Hashmasks ...
处理完成： Hashmasks
开始处理： Moonbirds Oddities ...
处理完成： Moonbirds Oddities
开始处理： Creature World ...
处理完成： Creature World
开始处理： 3Landers ...
处理完成： 3Landers
开始处理： Phan



处理完成： VeeFriends Series 2
开始处理： Lazy Lions ...
处理完成： Lazy Lions
开始处理： World of Women Galaxy ...
处理完成： World of Women Galaxy
开始处理： ALIENFRENS ...
处理完成： ALIENFRENS
开始处理： Prime Ape Planet ...
处理完成： Prime Ape Planet
开始处理： The Doge Pound ...
处理完成： The Doge Pound
开始处理： Sappy Seals ...
处理完成： Sappy Seals
开始处理： CyberKongz ...
处理完成： CyberKongz
开始处理： DigiDaigaku ...
处理完成： DigiDaigaku
开始处理： CoolmansUniverse ...
处理完成： CoolmansUniverse
开始处理： VOX Series 1 ...
处理完成： VOX Series 1
开始处理： Capsule ...
处理完成： Capsule
开始处理： Murakami.Flowers ...
处理完成： Murakami.Flowers
开始处理： SupDucks ...
处理完成： SupDucks
开始处理： Valhalla ...
处理完成： Valhalla
开始处理： DEGEN TOONZ ...
处理完成： DEGEN TOONZ
开始处理： Lives of Asuna ...
处理完成： Lives of Asuna
开始处理： Nakamigos ...
处理完成： Nakamigos
开始处理： Sneaky Vampire Syndicate ...
处理完成： Sneaky Vampire Syndicate
开始处理： Killer GF ...
处理完成： Killer GF
开始处理： Adam Bomb Squad ...
处理完成： Adam Bomb Squad
开始处理： Impostors Genesis ...
处理完成： Impostors Genesis
开始处理： CryptoSkulls ...
处理完成： CryptoSkulls
开始处理： MURI ...
处

In [None]:
# 计算图像特征和描述特征的余弦相似度
image_features = load_img_tensor(device, target_dataset_path.joinpath("Prime Ape Planet", "NFT_tensor_data.json"))
des_features = load_des_tensor(device, target_dataset_path.joinpath("Prime Ape Planet", "NFT_tensor_data.json"))
des_features1, des_features2, des_feature3 = des_features
top_probs, top_labels = calculate_cosine_similarity_topk(image_features, des_features1, 10)
print(top_probs.shape)
print(top_probs)
print(top_labels.shape)
print(top_labels)


# collection 内部测试

In [79]:
# 加载特征向量
collection_path_list = target_dataset_path.iterdir()
for collection in collection_path_list:
    # 找到路径下的tensor文件
    tensor_path = collection.joinpath("NFT_tensor_data.json")
    # image_features = load_img_tensor(device, tensor_path)
    des_features = load_des_tensor(device, tensor_path)
    # print(image_features.shape)
    # print(des_features.shape)
    # for des_feature in des_features:
    #     top_probs, top_labels = calculate_cosine_similarity_topk(image_features, des_feature, 10)
    #     print(top_probs.shape)
    #     print(top_probs)
    #     print(top_labels.shape)
    #     print(top_labels)

    break




torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3, 768])
torch.Size([3