# Prerequisite

In [None]:
import os
import random

import torch
import torch.nn.functional as F
from transformers import CLIPProcessor, CLIPModel

from torchvision.datasets import ImageNet
import torchvision.transforms as transforms

import numpy as np
import matplotlib.pyplot as plt

from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

from PIL import Image, ImageDraw, ImageFont
import umap

from tqdm import tqdm

# Switch the working directory; the relative paths used by the cells below
# (datasets/, fonts/, images/, ...) are resolved against it.
llava_root = '/hpc2hdd/home/erjiaxiao/erjia/LLaVA'
os.chdir(llava_root)
print("change curr dir to", os.getcwd())

# CLIP Embedding Analysis

In [None]:
# Load the CLIP model and its processor; the model goes to the GPU when one
# is available, otherwise it stays on the CPU.
model_name = "openai/clip-vit-large-patch14-336"
device = "cuda" if torch.cuda.is_available() else "cpu"
model = CLIPModel.from_pretrained(model_name).to(device)
processor = CLIPProcessor.from_pretrained(model_name)

# Load the ImageNet validation split. The only transform is a resize to
# 336x336, so the dataset yields resized images rather than tensors.
resize_only = transforms.Compose([transforms.Resize((336, 336))])
dataset = ImageNet(root='datasets/ImageNet/', split='val', transform=resize_only)

In [None]:
# Group every dataset image by the first name of its class, so that the
# reference images of a class can be looked up by label in later cells.
# NOTE(review): this keeps every image of the split in memory at once.
label_image_dict = {}
for image, class_idx in tqdm(dataset):
    label = dataset.classes[class_idx][0]
    label_image_dict.setdefault(label, []).append(image)

In [None]:
def add_text_background(label, *, num_prints=1, resolution=(336, 336),
                        bg_color='white', font_color='black', font_size=15,
                        font_path='fonts/arial_bold.ttf', line_height=30):
    """Render ``label`` as text on a blank, single-colour canvas.

    The text is centred horizontally using its measured width. Vertically,
    the block of ``num_prints`` lines is centred under the assumption that
    each line occupies ``line_height`` pixels; the glyphs themselves are
    drawn from the top of their line slot.

    Args:
        label: Text to draw.
        num_prints: Number of times the text is repeated, one per line.
        resolution: ``(width, height)`` of the created image in pixels.
        bg_color: Fill colour of the canvas.
        font_color: Colour of the text.
        font_size: Size passed to ``ImageFont.truetype``.
        font_path: Path of the TrueType font file to load.
        line_height: Vertical distance in pixels between consecutive lines.

    Returns:
        The newly created RGB ``PIL.Image.Image``.

    Raises:
        OSError: If the font file at ``font_path`` cannot be loaded.
    """
    image = Image.new('RGB', resolution, bg_color)
    draw = ImageDraw.Draw(image)
    font = ImageFont.truetype(font_path, font_size)

    text_width = int(draw.textlength(label, font=font))

    start_x = (image.width - text_width) // 2
    # Centre the whole stack of lines, not just the first one.
    start_y = (image.height - num_prints * line_height) // 2

    for line_index in range(num_prints):
        position = (start_x, start_y + line_index * line_height)
        draw.text(position, label, fill=font_color, font=font)

    return image

def calculate_and_store_similarity(embedding1, embedding2, total_sim_list):
    """Append the mean cosine similarity between one embedding and a set of others.

    Args:
        embedding1: Query embedding; a leading axis is added so that it
            broadcasts against ``embedding2``.
        embedding2: Reference embeddings with the feature axis last.
        total_sim_list: List that receives the mean similarity as a float.
    """
    # The feature axis is the last one. Each embedding comes from a
    # single-image model call (presumably shape (1, D) -- confirm), so the
    # tensors compared here are (N, 1, D). The default ``dim=1`` would reduce
    # over that size-1 axis and produce per-feature sign agreement (+1/-1)
    # instead of a cosine similarity.
    similarities = F.cosine_similarity(embedding1.unsqueeze(0), embedding2, dim=-1)
    total_sim_list.append(torch.mean(similarities).item())


def _image_embedding(image, text):
    """Return ``model(...).image_embeds`` for a single image.

    ``text`` is passed to the processor exactly as the call sites always
    did; only the image embedding of the model output is used.
    """
    inputs = processor(text=text, images=image, return_tensors="pt", padding=True).to(device)
    return model(**inputs).image_embeds


# total_sim[i] collects one mean cosine similarity per processed file:
#   0: original image (species-r0) vs. dataset images of the label
#   1: original image (species-r0) vs. dataset images of the mislabel
#   2: typo image (species-r1)     vs. dataset images of the label
#   3: typo image (species-r1)     vs. dataset images of the mislabel
#   4: original image (species-r0) vs. rendered text of the label
#   5: original image (species-r0) vs. rendered text of the mislabel
#   6: typo image (species-r1)     vs. rendered text of the label
#   7: typo image (species-r1)     vs. rendered text of the mislabel
total_sim = [[] for _ in range(8)]

image_root = "images/species-r1/"
original_root = "images/species-r0/"

# savefig() does not create missing directories.
os.makedirs("tsne", exist_ok=True)

# Matplotlib 3.6 renamed the bundled seaborn styles to 'seaborn-v0_8-*' and
# later removed the old names; use whichever this installation provides.
for style_name in ('seaborn-v0_8-colorblind', 'seaborn-colorblind'):
    if style_name in plt.style.available:
        plt.style.use(style_name)
        break

for name in os.listdir(image_root):

    # File names are expected to end in "-<label>-<mislabel>.<ext>". Parse
    # the bare file name: the directory part itself contains '-'.
    name_parts = os.path.splitext(name)[0].split('-')
    if len(name_parts) < 2:
        print(f"{name}: cannot parse '<label>-<mislabel>' from the file name, skipped")
        continue
    label, mislabel = name_parts[-2], name_parts[-1]

    with torch.no_grad():
        try:
            # Look the reference images up first so that an unknown class
            # name fails before any model work is done.
            label_refs = label_image_dict[label]
            mislabel_refs = label_image_dict[mislabel]

            with Image.open(original_root + name) as original_image:
                embedding_img = _image_embedding(original_image, label)

            with Image.open(image_root + name) as typo_image:
                embedding_img_typo = _image_embedding(typo_image, label)

            embedding_bg_label = _image_embedding(add_text_background(label), label)
            embedding_bg_mislabel = _image_embedding(add_text_background(mislabel), label)

            # Embeddings of the dataset images of both classes.
            embeddings_label = [_image_embedding(ref, label) for ref in label_refs]
            embeddings_mislabel = [_image_embedding(ref, mislabel) for ref in mislabel_refs]

            stacked_label = torch.stack(embeddings_label)
            stacked_mislabel = torch.stack(embeddings_mislabel)

            # (query, references, destination list) for each similarity.
            embedding_combinations = [
                (embedding_img, stacked_label, total_sim[0]),
                (embedding_img, stacked_mislabel, total_sim[1]),
                (embedding_img_typo, stacked_label, total_sim[2]),
                (embedding_img_typo, stacked_mislabel, total_sim[3]),
                (embedding_img, embedding_bg_label.unsqueeze(0), total_sim[4]),
                (embedding_img, embedding_bg_mislabel.unsqueeze(0), total_sim[5]),
                (embedding_img_typo, embedding_bg_label.unsqueeze(0), total_sim[6]),
                (embedding_img_typo, embedding_bg_mislabel.unsqueeze(0), total_sim[7]),
            ]
            for embedding1, embedding2, total_sim_list in embedding_combinations:
                calculate_and_store_similarity(embedding1, embedding2, total_sim_list)

            # One row per embedding, in the same order as `colors` below.
            ordered_embeddings = embeddings_label + embeddings_mislabel + [
                embedding_img_typo,
                embedding_img,
                embedding_bg_label,
                embedding_bg_mislabel,
            ]
            numpy_embeddings = np.vstack(
                [e.cpu().detach().numpy().reshape(1, -1) for e in ordered_embeddings]
            )

            # Colour of each row by origin: label references, mislabel
            # references, typo image, original image, label text, mislabel text.
            colors = (
                ['blue'] * len(embeddings_label)
                + ['red'] * len(embeddings_mislabel)
                + ['green', 'purple', 'orange', 'gray']
            )

            # NOTE: an earlier variant of this cell standardised the data and
            # plotted a 2-component PCA to pca/{name}.png; it was disabled in
            # favour of the t-SNE projection below.

            # Reduce to two dimensions with t-SNE.
            tsne = TSNE(n_components=2)
            tsne_embeddings = tsne.fit_transform(numpy_embeddings)

            plt.scatter(tsne_embeddings[:, 0], tsne_embeddings[:, 1], s=8, c=colors)
            plt.xlabel('t-SNE Dimension 1')
            plt.ylabel('t-SNE Dimension 2')
            plt.title('t-SNE Visualization of Embeddings')
            plt.grid(True)
            plt.savefig(f"tsne/{name}.png")

        except Exception as e:
            # Best effort: report the failing file and carry on with the next.
            print(f"{name}: {type(e).__name__}: {e}")
        finally:
            # Always reset the figure so that a failed iteration cannot leak
            # its points into the next plot.
            plt.clf()

# for sim_list in total_sim:
#     print(sum(sim_list) / len(sim_list) if len(sim_list) > 0 else 0)

In [None]:
def add_text_img(image, text, num_prints=1, min_distance=20, max_attempts=1000):
    """Draw ``text`` in white at random positions on ``image``.

    The image is modified in place and also returned. Positions are drawn
    uniformly such that the text fits inside the image when the image is
    large enough. A candidate is rejected when it lies closer than
    ``min_distance`` pixels to an earlier position on both axes.

    Args:
        image: ``PIL.Image.Image`` to draw on.
        text: Text to draw; it is measured as a single line.
        num_prints: How many times the text is drawn.
        min_distance: Minimum separation in pixels between two positions,
            required on at least one axis.
        max_attempts: Number of random candidates tried per print before
            giving up.

    Returns:
        The same ``image`` object, with the text drawn on it.

    Raises:
        ValueError: If no admissible position is found within
            ``max_attempts`` candidates (the original code looped forever).
    """
    draw = ImageDraw.Draw(image)
    font = ImageFont.load_default()
    text_color = "white"

    # ImageDraw.textsize() was removed in Pillow 10. Measure through the
    # font's bounding box when available; the right/bottom edges are the
    # extent of text drawn at the origin.
    if hasattr(font, "getbbox"):
        _, _, text_width, text_height = font.getbbox(text)
    else:
        text_width, text_height = draw.textsize(text, font=font)

    # Largest top-left coordinate that keeps the text inside the image,
    # clamped to 0 when the text is larger than the image.
    max_x = max(int(image.width - text_width), 0)
    max_y = max(int(image.height - text_height), 0)

    positions = []
    for _ in range(num_prints):
        for _attempt in range(max_attempts):
            text_x = random.randint(0, max_x)
            text_y = random.randint(0, max_y)
            far_enough = all(
                abs(text_x - px) >= min_distance or abs(text_y - py) >= min_distance
                for px, py in positions
            )
            if far_enough:
                break
        else:
            raise ValueError(
                f"could not place text {text!r} after {max_attempts} attempts; "
                "reduce num_prints or min_distance"
            )

        positions.append((text_x, text_y))
        draw.text((text_x, text_y), text, fill=text_color, font=font)

    return image

def add_text_bg(text, size=(224, 224)):
    """Create a randomly coloured canvas with ``text`` drawn at a random spot.

    The background colour is picked at random from a fixed palette of ten
    colours. The text is black, uses PIL's default font, and is positioned
    so that it fits inside the canvas when the canvas is large enough.

    Args:
        text: Text to draw; it is measured as a single line.
        size: ``(width, height)`` of the created image in pixels.

    Returns:
        The newly created RGB ``PIL.Image.Image``.
    """
    colors = {
        "white": (255, 255, 255),
        "yellow": (255, 255, 0),
        "cyan": (0, 255, 255),
        "lightgreen": (144, 238, 144),
        "lightblue": (173, 216, 230),
        "orange": (255, 165, 0),
        "pink": (255, 192, 203),
        "gold": (255, 215, 0),
        "peach": (255, 218, 185),
        "lime": (0, 255, 0),
    }

    selected_color_name = random.choice(list(colors))
    selected_color = colors[selected_color_name]

    image = Image.new("RGB", size, selected_color)

    draw = ImageDraw.Draw(image)
    font = ImageFont.load_default()
    text_color = "black"

    # ImageDraw.textsize() was removed in Pillow 10. Measure through the
    # font's bounding box when available; the right/bottom edges are the
    # extent of text drawn at the origin.
    if hasattr(font, "getbbox"):
        _, _, text_width, text_height = font.getbbox(text)
    else:
        text_width, text_height = draw.textsize(text, font=font)

    # Largest top-left coordinate that keeps the text inside the canvas,
    # clamped to 0 when the text is larger than the canvas.
    max_x = max(int(image.width - text_width), 0)
    max_y = max(int(image.height - text_height), 0)

    text_x = random.randint(0, max_x)
    text_y = random.randint(0, max_y)

    draw.text((text_x, text_y), text, fill=text_color, font=font)
    return image

# Embeddings of all processed images, kept for the dimensionality-reduction
# cells below.
embeddings = []

# At most `threshold` images are used per class label; `duplicate` tracks how
# many images of each label have been used so far. Processing stops after
# `num` images in total.
threshold = 5
duplicate = {}
num = 50
count = 0

# Candidate class names for the random "mislabel" text. The list does not
# depend on the current image, so it is built once.
classes = [t[0] for t in dataset.classes]

for k, (image, label) in enumerate(dataset):
    if count >= num:
        break

    # Map the class index to the first name of the class.
    label = dataset.classes[label][0]

    # Skip the image when its label has already been used `threshold` times.
    if label in duplicate and duplicate[label] >= threshold:
        continue
    duplicate[label] = duplicate.get(label, 0) + 1

    # NOTE(review): the random choices are not prevented from being equal to
    # `label` or to each other.
    mislabel1 = random.choice(classes)
    mislabel2 = random.choice(classes)

    # Build the image variants: the image with a wrong class name drawn on
    # it, and plain backgrounds carrying a wrong or the true class name.
    img_mislabel1 = add_text_img(image.copy(), mislabel1)
    bg_mislabel1 = add_text_bg(mislabel1)
    bg_mislabel2 = add_text_bg(mislabel2)
    bg_label = add_text_bg(label)

    # Prepare the CLIP inputs; every variant is paired with the true label.
    inputs1 = processor(text=[label], images=image,
                        return_tensors="pt", padding=True).to(device)
    inputs2 = processor(text=[label], images=img_mislabel1,
                        return_tensors="pt", padding=True).to(device)
    inputs3 = processor(text=[label], images=bg_mislabel1,
                        return_tensors="pt", padding=True).to(device)
    inputs4 = processor(text=[label], images=bg_mislabel2,
                        return_tensors="pt", padding=True).to(device)
    inputs5 = processor(text=[label], images=bg_label,
                        return_tensors="pt", padding=True).to(device)

    with torch.no_grad():
        # One forward pass on inputs1 provides both the image and the text
        # embedding; there is no need to run the model on it twice.
        outputs1 = model(**inputs1)
        img1_embedding = outputs1.image_embeds
        text_embedding = outputs1.text_embeds[0].unsqueeze(0)

        img2_embedding = model(**inputs2).image_embeds
        img3_embedding = model(**inputs3).image_embeds
        img4_embedding = model(**inputs4).image_embeds
        img5_embedding = model(**inputs5).image_embeds

        # Differences between the text-on-image embedding and the others.
        diff21_embedding = img2_embedding - img1_embedding
        diff23_embedding = img2_embedding - img3_embedding
        diff24_embedding = img2_embedding - img4_embedding
        diff25_embedding = img2_embedding - img5_embedding

    # The order here defines the per-group index used for colouring and
    # numbering in the plotting cells.
    temp_embeddings = [img1_embedding, img2_embedding, img3_embedding, img4_embedding, img5_embedding,
                       diff21_embedding, diff23_embedding, diff24_embedding, diff25_embedding, text_embedding]

    embeddings.extend(temp_embeddings)

    # NOTE: an earlier variant also printed the pairwise cosine-similarity
    # matrix of `temp_embeddings` (values rounded to 3 decimals) here.

    # Save the generated images.
    img_mislabel1.save(str(k) + label + '_img_mislabel1.jpg')
    bg_mislabel1.save(str(k) + label + '_bg_mislabel1.jpg')
    bg_mislabel2.save(str(k) + label + '_bg_mislabel2.jpg')
    bg_label.save(str(k) + label + '_bg_label.jpg')

    count += 1
    if count % 10 == 0:
        print(str(k), ":", str(count))

In [None]:
# PCA
# Stack all collected embeddings into one (n_samples, dim) array in a single
# call instead of growing the array row by row. The cast keeps the float64
# dtype that stacking onto an empty float64 array used to produce.
numpy_embeddings = np.vstack(
    [emb.cpu().detach().numpy() for emb in embeddings]).astype(np.float64)

# Number of embeddings stored per image (size of one group).
group = len(temp_embeddings)

# Optionally drop a given per-group index with boolean indexing.
# arr = np.arange(len(numpy_embeddings))
# temp = [arr % group == 9]
# indices_to_keep = np.logical_not(np.logical_or.reduce(temp))
# numpy_embeddings = numpy_embeddings[indices_to_keep]
# group -= len(temp)

# Standardise the features.
scaler = StandardScaler()
data_scaled = scaler.fit_transform(numpy_embeddings)

# Project onto the first two principal components.
pca = PCA(n_components=2)
principal_components = pca.fit_transform(data_scaled)

# Optionally show only the first groups.
# principal_components = principal_components[:group*2]

# Colour each point by its position within its group.
colors = ['magenta', 'blue', 'green', 'purple', 'orange', 'pink', 'brown',
          'gray', 'cyan', 'red', 'teal', 'lime', 'indigo', 'maroon', 'olive']
colors_for_points = [colors[i % group]
                     for i in range(len(principal_components))]

# Plot the PCA result.
plt.scatter(principal_components[:, 0],
            principal_components[:, 1], c=colors_for_points, s=8)

# Number the points of the first group (1-based) as a legend for the colours.
for i, (x, y) in enumerate(principal_components[:group]):
    plt.annotate(str(i+1), (x, y), textcoords="offset points",
                 xytext=(0, 0), ha='center')

plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.title('PCA Visualization with Labels (Starting from 1)')
# plt.savefig("pca_visualization.png")
plt.show()

In [None]:
# UMAP
# Stack all collected embeddings into one (n_samples, dim) array in a single
# call instead of growing the array row by row. The cast keeps the float64
# dtype that stacking onto an empty float64 array used to produce.
numpy_embeddings = np.vstack(
    [emb.cpu().detach().numpy() for emb in embeddings]).astype(np.float64)

# Number of embeddings stored per image (size of one group).
group = len(temp_embeddings)

# Optionally drop a given per-group index with boolean indexing.
# arr = np.arange(len(numpy_embeddings))
# temp = [arr % group == 7]
# indices_to_keep = np.logical_not(np.logical_or.reduce(temp))
# numpy_embeddings = numpy_embeddings[indices_to_keep]
# group -= len(temp)

# Reduce to two dimensions with UMAP.
umap_model = umap.UMAP(n_components=2)
umap_embeddings = umap_model.fit_transform(numpy_embeddings)

# Colour each point by its position within its group.
colors = ['magenta', 'blue', 'green', 'purple', 'orange', 'pink', 'brown',
          'gray', 'cyan', 'red', 'teal', 'lime', 'indigo', 'maroon', 'olive']
colors_for_points = [colors[i % group] for i in range(len(umap_embeddings))]

# Plot the UMAP result.
plt.scatter(umap_embeddings[:, 0],
            umap_embeddings[:, 1], c=colors_for_points, s=8)

# Number the points of the first group (1-based) as a legend for the colours.
for i, (x, y) in enumerate(umap_embeddings[:group]):
    plt.annotate(str(i+1), (x, y), textcoords="offset points",
                 xytext=(0, 10), ha='center')

plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
plt.title('UMAP Visualization with Labels (Starting from 1)')
# plt.savefig("umap_visualization.png")
plt.show()

In [None]:
# t-SNE
# Stack all collected embeddings into one (n_samples, dim) array in a single
# call instead of growing the array row by row. The cast keeps the float64
# dtype that stacking onto an empty float64 array used to produce.
numpy_embeddings = np.vstack(
    [emb.cpu().detach().numpy() for emb in embeddings]).astype(np.float64)

# Number of embeddings stored per image (size of one group).
group = len(temp_embeddings)

# Optionally drop a given per-group index with boolean indexing.
# arr = np.arange(len(numpy_embeddings))
# temp = [arr % group == 7]
# indices_to_keep = np.logical_not(np.logical_or.reduce(temp))
# numpy_embeddings = numpy_embeddings[indices_to_keep]
# group -= len(temp)

# Perplexity is an important t-SNE parameter; tune it for a better picture.
tsne = TSNE(n_components=2, perplexity=20, learning_rate=200)

# Reduce the data with t-SNE; the result has shape (n_samples, 2).
transformed_data = tsne.fit_transform(numpy_embeddings)

# Colour each point by its position within its group.
colors = ['magenta', 'blue', 'green', 'purple', 'orange', 'pink', 'brown',
          'gray', 'cyan', 'red', 'teal', 'lime', 'indigo', 'maroon', 'olive']
colors_for_points = [colors[i % group] for i in range(len(transformed_data))]

# Plot with matplotlib.
plt.figure(figsize=(10, 7))
plt.scatter(transformed_data[:, 0],
            transformed_data[:, 1], c=colors_for_points, s=8)

# Number the points of the first group (1-based) as a legend for the colours.
for i, (x, y) in enumerate(transformed_data[:group]):
    plt.text(x, y, str(i + 1), fontsize=12)

plt.xlabel("t-SNE Dimension 1")
plt.ylabel("t-SNE Dimension 2")
# Report the actual embedding width instead of a hard-coded "(512,)".
plt.title(f"t-SNE Visualization of ({numpy_embeddings.shape[1]},) Dimensional Vectors")
# plt.savefig("tsne_visualization.png")
plt.show()