本实验同时在视觉空间和语义空间中进行预测；并行与串行两种实验方案的流程，可以参考论文《DUAL COLLABORATIVE VISUAL-SEMANTIC MAPPING FOR MULTI-LABEL ZERO-SHOT IMAGE RECOGNITION》。

In [1]:
# First obtain the labels predicted in the visual space and in the semantic space.
# Keep only the samples for which both spaces predict the same label; samples whose
# two predictions differ are discarded (left unpredicted) -- their share is the drop (rejection) rate.
# Then compute the accuracy over the samples on which the two spaces agree.

In [2]:
import sys
sys.path.append('/mnt/workspace')

In [3]:
from py_file.M_attri import Att
from py_file.Get_Data import DATA
from py_file.data_set import MyDataSet
from torch.utils.data import DataLoader,Dataset,random_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
import numpy as np
import matplotlib.pyplot as plt
import os
import math

In [4]:
# Select the compute device: the first CUDA GPU when one is available,
# otherwise the CPU. (With a single GPU, 'cuda' and 'cuda:0' are the same.)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

print(f'使用的设备为：{device}')

使用的设备为：cuda:0


In [5]:
trans = transforms.Resize(224)  # ResNet-style models are fed 224x224 images
# The input tensor must carry both the batch dimension and the channel dimension.

# 准备模型以及数据

In [6]:
# Load the trained model (the semantic attributes are loaded further below).
import pickle

# map_location keeps the load working on the CPU fallback chosen above even if
# the checkpoint was saved from a CUDA device.
# NOTE(review): torch.load unpickles the file -- only load trusted checkpoints.
# NOTE(review): on PyTorch >= 2.6 the default weights_only=True rejects fully
# pickled modules; pass weights_only=False there if needed -- confirm version.
model = torch.load('../model_saved_pseudo/train_all.pth', map_location=device)
model = model.to(device)
model.eval()

# Feature extractor: every child module of the model except the last two.
# [:-2] is used because a sigmoid layer was appended after the original head.
model_fea = torch.nn.Sequential(*list(model.children())[:-2])
model_fea = model_fea.to(device)
model_fea.eval()

'''
attri = Att()
attri.compute_mul_defect_att()

single_defect_att = attri.single_defect_att
two_defect_att = attri.two_defect_att
three_defect_att = attri.three_defect_att
four_defect_att = attri.four_defect_att
mul_defect_att = attri.mul_defect_att
total_defect_att = attri.total_defect_att
'''

# Load the three pickled semantic dictionaries (single / multi / total), in
# that order.
_semantic_tables = []
for _pkl_path in (
    '../updated_semantic_all/updated_single_dict.pkl',
    '../updated_semantic_all/updated_mul_dict.pkl',
    '../updated_semantic_all/updated_total_dict.pkl',
):
    with open(_pkl_path, 'rb') as file:
        _semantic_tables.append(pickle.load(file))
single_defect_att, mul_defect_att, total_defect_att = _semantic_tables

from collections import Counter

# Split the multi-defect dictionary into two-, three- and four-defect
# dictionaries according to the number of '+' characters in each label.
two_defect_att = {}
three_defect_att = {}
four_defect_att = {}
_by_plus_count = {1: two_defect_att, 2: three_defect_att, 3: four_defect_att}
for label, att in mul_defect_att.items():
    _target = _by_plus_count.get(Counter(label)['+'])
    if _target is not None:  # labels with any other '+' count are not copied
        _target[label] = att

In [7]:
# Load the pickled visual centres, one file per label group.
# NOTE(review): each pickle is later used as a dict mapping a label to a
# feature vector -- confirm against the script that wrote these files.
_vc_tables = []
for _vc_path in ('single_vc.pkl', 'two_vc.pkl', 'three_vc.pkl', 'four_vc.pkl', 'total_vc.pkl'):
    with open(_vc_path, 'rb') as file:
        _vc_tables.append(pickle.load(file))
single_vc, two_vc, three_vc, four_vc, total_vc = _vc_tables

In [8]:
# Rebuild the multi-defect dictionaries as the union of the two-, three- and
# four-defect dictionaries (later groups win on duplicate keys).
mul_defect_att = {}
for _att_group in (two_defect_att, three_defect_att, four_defect_att):
    mul_defect_att.update(_att_group)

mul_vc = {}
for _vc_group in (two_vc, three_vc, four_vc):
    mul_vc.update(_vc_group)

In [9]:
# Open the test-set archive (.npz); individual arrays are read from it below.
test_data_path = '/mnt/workspace/DATA/test_WM.npz'
test_data = np.load(test_data_path)

# Length of one attribute vector, taken from the 'Center' entry.
att_dimen = len(single_defect_att['Center'])
print('att_dimen:', att_dimen)

att_dimen: 20


In [10]:
# Label names and denoised maps of the test set.
test_label = test_data['label_name']
test_wm = test_data['denoise_wm']
# Reshape the maps to (N, 1, 52, 52) float32 and resize them with `trans`.
test_wm_tensor = trans(torch.reshape(torch.tensor(test_wm, dtype=torch.float32),(len(test_wm),1,52,52)))
print(test_wm_tensor.shape, test_label.shape)

torch.Size([7405, 1, 224, 224]) (7405,)


In [11]:
test_label_oh = test_data['label_one_hot']
# Use the one-hot labels to split the data into single-, two-, three- and
# four-defect groups. A row whose one-hot sum is <= 1 goes to the single group.

# Plain lists for now; they are converted to tensors later.
test_single_wm, test_single_label = [], []
test_two_wm, test_two_label = [], []
test_three_wm, test_three_label = [], []
test_four_wm, test_four_label = [], []

_multi_defect_buckets = {
    2: (test_two_wm, test_two_label),
    3: (test_three_wm, test_three_label),
    4: (test_four_wm, test_four_label),
}

for i, _one_hot in enumerate(test_label_oh):
    _n_defects = _one_hot.sum()
    if _n_defects <= 1:
        _bucket = (test_single_wm, test_single_label)
    else:
        _bucket = _multi_defect_buckets.get(_n_defects)
        if _bucket is None:
            continue  # any other defect count is not collected
    _wm_list, _label_list = _bucket
    _wm_list.append(np.array(test_wm_tensor[i]))
    _label_list.append(test_label[i])


del test_data

In [12]:
# Total number of test samples (length of the first tensor dimension).
test_size = len(test_wm_tensor)

# 定义需要的函数

In [13]:
def euclidean_distance(v1, v2):
    """Return the Euclidean (L2) distance between two vectors.

    :param v1: first vector, expected to be a numpy array
    :param v2: second vector, expected to be a numpy array
    :return: square root of the summed squared element-wise differences
    """
    diff = v1 - v2
    squared_sum = np.sum(diff * diff)
    return np.sqrt(squared_sum)

In [14]:
def cosine_similarity(v1, v2):
    """Return the cosine similarity of two vectors.

    :param v1: first vector; must be a numpy array, not a tensor
        (convert a tensor with ``np.array()`` first)
    :param v2: second vector; same requirement as ``v1``
    :return: ``dot(v1, v2) / (||v1|| * ||v2||)``; ``0.0`` when either vector
        has zero norm, for which the cosine is undefined
    """
    dot_product = np.dot(v1, v2)
    norm_product = np.linalg.norm(v1) * np.linalg.norm(v2)
    # A zero-norm vector would give 0/0, i.e. NaN plus a RuntimeWarning; NaN
    # never compares greater than anything, so callers that pick the best
    # match would silently ignore the candidate. Return a neutral 0.0 instead.
    if norm_product == 0:
        return 0.0
    return dot_product / norm_product

In [15]:
def predict_label_visual(fea, vc):
    """Predict a label by nearest visual centre (Euclidean distance).

    :param fea: feature vector of one sample (numpy.ndarray)
    :param vc: dictionary mapping each candidate label to its visual centre
    :return: the label whose centre is closest to ``fea``; on ties the label
        that comes first in ``vc`` wins
    :raises ValueError: if ``vc`` is empty or no distance is comparable
        (for example every distance is NaN)
    """
    found = False
    best_label = None
    min_dis = math.inf
    for c, center in vc.items():
        dis = euclidean_distance(fea, center)
        if dis < min_dis:
            min_dis = dis
            best_label = c
            found = True
    # Without this guard the function used to fail with an UnboundLocalError
    # on an unassigned result variable, which hides the real cause.
    if not found:
        raise ValueError('predict_label_visual: no usable visual center in vc')
    return best_label

In [16]:
def predict_label_semantic(att, defect_dict):
    """Predict a label by highest cosine similarity in the semantic space.

    :param att: attribute vector predicted for one sample (numpy array)
    :param defect_dict: dictionary mapping each candidate label to its
        attribute vector (converted with ``np.array`` before comparison)
    :return: the label whose attribute vector is most similar to ``att``; on
        ties the label that comes first in ``defect_dict`` wins
    :raises ValueError: if ``defect_dict`` is empty or no similarity is
        comparable (for example every similarity is NaN)
    """
    found = False
    best_label = None
    max_sim = -math.inf  # lower than any cosine similarity
    for c, class_att in defect_dict.items():
        sim = cosine_similarity(att, np.array(class_att))
        if sim > max_sim:
            max_sim = sim
            best_label = c
            found = True
    # Without this guard the function used to fail with an UnboundLocalError
    # on an unassigned result variable, which hides the real cause.
    if not found:
        raise ValueError('predict_label_semantic: no usable attribute vector in defect_dict')
    return best_label

In [17]:
# Sanity check: print every three-defect label (with its index) that also
# occurs among the single-, two- or four-defect labels.
_other_label_groups = (test_single_label, test_two_label, test_four_label)
for i, _three_label in enumerate(test_three_label):
    if any(_three_label in _group for _group in _other_label_groups):
        print(i, _three_label)


In [18]:
def get_outputs(model, dataloader, device):
    """Run ``model`` over ``dataloader`` and collect outputs and labels.

    :param model: callable applied to each image batch
    :param dataloader: iterable yielding ``(imgs, labels)`` batches
    :param device: device the image batches are moved to before the forward pass
    :return: tuple ``(x, y)`` of numpy arrays; ``x`` has one flattened output
        row per sample and ``y`` holds the labels in iteration order. Both are
        empty arrays when the loader yields no batch.
    """
    batch_outputs = []
    y = []
    with torch.no_grad():
        for imgs, labels in dataloader:
            outputs = model(imgs.to(device))
            # Flatten everything but the batch axis (e.g. a (512, 1, 1) feature
            # map becomes a length-512 vector) and convert once per batch;
            # calling np.array on a list of per-sample tensors is far slower.
            flat = outputs.reshape(outputs.shape[0], -1)
            batch_outputs.append(flat.detach().cpu().numpy())
            y.extend(labels)

    # np.concatenate rejects an empty list, so keep the empty-array result.
    x = np.concatenate(batch_outputs, axis=0) if batch_outputs else np.array([])
    y = np.array(y)

    return x, y

In [19]:
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or NaN when the denominator is zero."""
    if denominator == 0:
        return float('nan')
    return numerator / denominator


def two_space_predict(fea_arr, vc, att_arr, defect_dict, real_label):
    """Predict in the visual and semantic spaces and keep only agreeing samples.

    A sample is kept when both spaces predict the same label; otherwise it is
    dropped (left unpredicted).

    :param fea_arr: numpy array, shape (n_samples, n_features)
    :param vc: dictionary of visual centres, passed to ``predict_label_visual``
    :param att_arr: numpy array, shape (n_samples, n_attributes)
    :param defect_dict: dictionary of attribute vectors, passed to
        ``predict_label_semantic``
    :param real_label: array-like of true labels, shape (n_samples,)
    :return: tuple ``(acc, drop_rate, drop_acc_semantic, drop_acc_visual)``:
        accuracy on the kept samples, fraction of dropped samples, and the
        semantic / visual accuracy on the dropped samples. A ratio whose
        denominator is zero is returned as NaN.
    :raises ValueError: if the three inputs do not describe the same number of
        samples
    """
    label_visual = np.array([predict_label_visual(fea, vc) for fea in fea_arr])
    label_semantic = np.array(
        [predict_label_semantic(att, defect_dict) for att in att_arr]
    )
    # Accept plain lists as well: boolean-mask indexing needs an ndarray.
    real_label = np.asarray(real_label)

    if not (len(label_visual) == len(label_semantic) == len(real_label)):
        raise ValueError(
            'fea_arr, att_arr and real_label must contain the same number of samples'
        )

    # Compute the agreement mask once and reuse it for every selection.
    agree = label_visual == label_semantic
    disagree = ~agree
    n_total = len(label_visual)
    n_agree = int(agree.sum())
    n_drop = n_total - n_agree

    drop_rate = 1 - _safe_ratio(n_agree, n_total)

    # Accuracy on the samples where both spaces agree.
    acc = _safe_ratio((real_label[agree] == label_visual[agree]).sum(), n_agree)

    # Accuracy each space would have reached on the dropped samples.
    drop_real_label = real_label[disagree]
    drop_acc_semantic = _safe_ratio(
        (drop_real_label == label_semantic[disagree]).sum(), n_drop
    )
    drop_acc_visual = _safe_ratio(
        (drop_real_label == label_visual[disagree]).sum(), n_drop
    )

    return acc, drop_rate, drop_acc_semantic, drop_acc_visual


# 开始测试

## 单故障

In [20]:
# Stack the single-defect maps into one float32 tensor, pair it with the label
# names in a dataset, and iterate it in fixed order in batches of 32. The
# stacked tensor is passed straight in, so no temporary name is left behind.
single_dataset = MyDataSet(
    torch.tensor(np.array(test_single_wm), dtype=torch.float32),
    test_single_label,
)
single_loader = DataLoader(single_dataset, batch_size=32, shuffle=False)

In [21]:
# x_att: outputs of the full model (used below as attribute predictions).
# x_fea / y_real: outputs of the truncated feature model and the true labels.
x_att, _ = get_outputs(model=model, dataloader=single_loader, device=device)
x_fea, y_real = get_outputs(model=model_fea, dataloader=single_loader, device=device)

In [22]:
# Narrow space: candidates come from the single-defect dictionaries only.
acc1, drop1, drop_semantic_acc1, drop_visual_acc1 = two_space_predict(
    fea_arr=x_fea, vc=single_vc, att_arr=x_att,
    defect_dict=single_defect_att, real_label=y_real,
)
# Generalized space: candidates come from the total dictionaries.
acc2, drop2, drop_semantic_acc2, drop_visual_acc2 = two_space_predict(
    fea_arr=x_fea, vc=total_vc, att_arr=x_att,
    defect_dict=total_defect_att, real_label=y_real,
)

print(f'在狭义空间中，丢弃率为{drop1:.4f}的情况下，准确率为{acc1:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc1:.4f}，视觉准确率为{drop_visual_acc1:.4f}')
print(f'在广义空间中，丢弃率为{drop2:.4f}的情况下，准确率为{acc2:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc2:.4f}，视觉准确率为{drop_visual_acc2:.4f}')

在狭义空间中，丢弃率为0.0212的情况下，准确率为0.9949，丢弃的样本的语义准确率为0.4412，视觉准确率为0.3824
在广义空间中，丢弃率为0.0243的情况下，准确率为0.9962，丢弃的样本的语义准确率为0.2564，视觉准确率为0.3846


In [23]:
# Drop the single-defect dataset and loader names before the next section.
del single_dataset, single_loader

## 二故障

In [24]:
# Stack the two-defect maps into one float32 tensor, pair it with the label
# names in a dataset, and iterate it in fixed order in batches of 32. The
# stacked tensor is passed straight in, so no temporary name is left behind.
two_dataset = MyDataSet(
    torch.tensor(np.array(test_two_wm), dtype=torch.float32),
    test_two_label,
)
two_loader = DataLoader(two_dataset, batch_size=32, shuffle=False)

In [25]:
# x_att: outputs of the full model (used below as attribute predictions).
# x_fea / y_real: outputs of the truncated feature model and the true labels.
x_att, _ = get_outputs(model=model, dataloader=two_loader, device=device)
x_fea, y_real = get_outputs(model=model_fea, dataloader=two_loader, device=device)

In [26]:
# Narrow space: candidates come from the two-defect dictionaries only.
acc1, drop1, drop_semantic_acc1, drop_visual_acc1 = two_space_predict(
    fea_arr=x_fea, vc=two_vc, att_arr=x_att,
    defect_dict=two_defect_att, real_label=y_real,
)
# Generalized space: candidates come from the total dictionaries.
acc2, drop2, drop_semantic_acc2, drop_visual_acc2 = two_space_predict(
    fea_arr=x_fea, vc=total_vc, att_arr=x_att,
    defect_dict=total_defect_att, real_label=y_real,
)

print(f'在狭义空间中，丢弃率为{drop1:.4f}的情况下，准确率为{acc1:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc1:.4f}，视觉准确率为{drop_visual_acc1:.4f}')
print(f'在广义空间中，丢弃率为{drop2:.4f}的情况下，准确率为{acc2:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc2:.4f}，视觉准确率为{drop_visual_acc2:.4f}')

在狭义空间中，丢弃率为0.0519的情况下，准确率为0.9501，丢弃的样本的语义准确率为0.6444，视觉准确率为0.2593
在广义空间中，丢弃率为0.1288的情况下，准确率为0.8967，丢弃的样本的语义准确率为0.8000，视觉准确率为0.1224


In [27]:
# Drop the two-defect dataset and loader names before the next section.
del two_dataset, two_loader

## 三故障

In [28]:
# Stack the three-defect maps into one float32 tensor, pair it with the label
# names in a dataset, and iterate it in fixed order in batches of 32. The
# stacked tensor is passed straight in, so no temporary name is left behind.
three_dataset = MyDataSet(
    torch.tensor(np.array(test_three_wm), dtype=torch.float32),
    test_three_label,
)
three_loader = DataLoader(three_dataset, batch_size=32, shuffle=False)

In [29]:
# x_att: outputs of the full model (used below as attribute predictions).
# x_fea / y_real: outputs of the truncated feature model and the true labels.
x_att, _ = get_outputs(model=model, dataloader=three_loader, device=device)
x_fea, y_real = get_outputs(model=model_fea, dataloader=three_loader, device=device)

In [30]:
# Narrow space: candidates come from the three-defect dictionaries only.
acc1, drop1, drop_semantic_acc1, drop_visual_acc1 = two_space_predict(
    fea_arr=x_fea, vc=three_vc, att_arr=x_att,
    defect_dict=three_defect_att, real_label=y_real,
)
# Generalized space: candidates come from the total dictionaries.
acc2, drop2, drop_semantic_acc2, drop_visual_acc2 = two_space_predict(
    fea_arr=x_fea, vc=total_vc, att_arr=x_att,
    defect_dict=total_defect_att, real_label=y_real,
)

print(f'在狭义空间中，丢弃率为{drop1:.4f}的情况下，准确率为{acc1:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc1:.4f}，视觉准确率为{drop_visual_acc1:.4f}')
print(f'在广义空间中，丢弃率为{drop2:.4f}的情况下，准确率为{acc2:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc2:.4f}，视觉准确率为{drop_visual_acc2:.4f}')

在狭义空间中，丢弃率为0.0904的情况下，准确率为0.8140，丢弃的样本的语义准确率为0.4240，视觉准确率为0.2350
在广义空间中，丢弃率为0.1421的情况下，准确率为0.6129，丢弃的样本的语义准确率为0.4809，视觉准确率为0.1994


In [31]:
# Drop the three-defect dataset and loader names before the next section.
del three_dataset, three_loader

## 四故障

In [32]:
# Stack the four-defect maps into one float32 tensor, pair it with the label
# names in a dataset, and iterate it in fixed order in batches of 32. The
# stacked tensor is passed straight in, so no temporary name is left behind.
four_dataset = MyDataSet(
    torch.tensor(np.array(test_four_wm), dtype=torch.float32),
    test_four_label,
)
four_loader = DataLoader(four_dataset, batch_size=32, shuffle=False)

In [33]:
# x_att: outputs of the full model (used below as attribute predictions).
# x_fea / y_real: outputs of the truncated feature model and the true labels.
x_att, _ = get_outputs(model=model, dataloader=four_loader, device=device)
x_fea, y_real = get_outputs(model=model_fea, dataloader=four_loader, device=device)

In [34]:
# Narrow space: candidates come from the four-defect dictionaries only.
acc1, drop1, drop_semantic_acc1, drop_visual_acc1 = two_space_predict(
    fea_arr=x_fea, vc=four_vc, att_arr=x_att,
    defect_dict=four_defect_att, real_label=y_real,
)
# Generalized space: candidates come from the total dictionaries.
acc2, drop2, drop_semantic_acc2, drop_visual_acc2 = two_space_predict(
    fea_arr=x_fea, vc=total_vc, att_arr=x_att,
    defect_dict=total_defect_att, real_label=y_real,
)

print(f'在狭义空间中，丢弃率为{drop1:.4f}的情况下，准确率为{acc1:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc1:.4f}，视觉准确率为{drop_visual_acc1:.4f}')
print(f'在广义空间中，丢弃率为{drop2:.4f}的情况下，准确率为{acc2:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc2:.4f}，视觉准确率为{drop_visual_acc2:.4f}')

在狭义空间中，丢弃率为0.1375的情况下，准确率为0.7101，丢弃的样本的语义准确率为0.7727，视觉准确率为0.2273
在广义空间中，丢弃率为0.2312的情况下，准确率为0.3577，丢弃的样本的语义准确率为0.3351，视觉准确率为0.3568


In [35]:
# Drop the four-defect dataset and loader names before the next section.
del four_dataset, four_loader

## 混合故障

In [36]:
# Mixed-defect set: the two-, three- and four-defect samples concatenated in
# that order (maps and labels stay aligned).
test_mul_wm = [*test_two_wm, *test_three_wm, *test_four_wm]
test_mul_label = [*test_two_label, *test_three_label, *test_four_label]

In [37]:
# Stack the mixed-defect maps into one float32 tensor, pair it with the label
# names in a dataset, and iterate it in fixed order in batches of 32. The
# stacked tensor is passed straight in, so no temporary name is left behind.
mul_dataset = MyDataSet(
    torch.tensor(np.array(test_mul_wm), dtype=torch.float32),
    test_mul_label,
)
mul_loader = DataLoader(mul_dataset, batch_size=32, shuffle=False)

In [38]:
# x_att: outputs of the full model (used below as attribute predictions).
# x_fea / y_real: outputs of the truncated feature model and the true labels.
x_att, _ = get_outputs(model=model, dataloader=mul_loader, device=device)
x_fea, y_real = get_outputs(model=model_fea, dataloader=mul_loader, device=device)

In [39]:
# Narrow space: candidates come from the multi-defect dictionaries only.
acc1, drop1, drop_semantic_acc1, drop_visual_acc1 = two_space_predict(
    fea_arr=x_fea, vc=mul_vc, att_arr=x_att,
    defect_dict=mul_defect_att, real_label=y_real,
)
# Generalized space: candidates come from the total dictionaries.
acc2, drop2, drop_semantic_acc2, drop_visual_acc2 = two_space_predict(
    fea_arr=x_fea, vc=total_vc, att_arr=x_att,
    defect_dict=total_defect_att, real_label=y_real,
)

print(f'在狭义空间中，丢弃率为{drop1:.4f}的情况下，准确率为{acc1:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc1:.4f}，视觉准确率为{drop_visual_acc1:.4f}')
print(f'在广义空间中，丢弃率为{drop2:.4f}的情况下，准确率为{acc2:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc2:.4f}，视觉准确率为{drop_visual_acc2:.4f}')

在狭义空间中，丢弃率为0.1364的情况下，准确率为0.7357，丢弃的样本的语义准确率为0.5234，视觉准确率为0.2326
在广义空间中，丢弃率为0.1484的情况下，准确率为0.7113，丢弃的样本的语义准确率为0.5738，视觉准确率为0.2033


In [40]:
# Drop the mixed-defect dataset and loader names before the next section.
del mul_dataset, mul_loader

## 所有故障

In [41]:
# Wrap the complete test set (all maps with their label names) in a dataset
# and iterate it in fixed order in batches of 32.
total_dataset = MyDataSet(test_wm_tensor,test_label)
total_loader = DataLoader(total_dataset, batch_size=32, shuffle=False)
# The module-level name is no longer needed once the dataset has been built.
del test_wm_tensor

In [42]:
# x_att: outputs of the full model (used below as attribute predictions).
# x_fea / y_real: outputs of the truncated feature model and the true labels.
x_att, _ = get_outputs(model=model, dataloader=total_loader, device=device)
x_fea, y_real = get_outputs(model=model_fea, dataloader=total_loader, device=device)

In [43]:
# Whole test set evaluated against the total dictionaries.
acc, drop, drop_semantic_acc, drop_visual_acc = two_space_predict(
    fea_arr=x_fea, vc=total_vc, att_arr=x_att,
    defect_dict=total_defect_att, real_label=y_real,
)

print(f'丢弃率为{drop:.4f}的情况下，准确率为{acc:.4f}，丢弃的样本的语义准确率为{drop_semantic_acc:.4f}，视觉准确率为{drop_visual_acc:.4f}')

丢弃率为0.1215的情况下，准确率为0.7799，丢弃的样本的语义准确率为0.5600，视觉准确率为0.2111


In [44]:
# Drop the full-test-set dataset and loader names at the end of the run.
del total_dataset, total_loader