In [147]:
from common_lib import DataManager
from common_lib import AnnotationManager

In [148]:
import random
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from tqdm.notebook import tqdm
import timm
import cv2

## 参数设置

In [149]:
batch_size = 16
seed = 42
lr = 0.003
epochs = 5

In [150]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# device =  torch.device("cpu")
device

device(type='cuda', index=0)

In [151]:
def seed_everything(seed): #将所有的随机变量设置其随机种子
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

seed_everything(seed)

## 数据加载和类型设置

In [152]:
img_root = "F:/mountMl_fan/bearing/"

In [153]:
train_data = DataManager.load("./jsons/train.json")
val_data = DataManager.load("./jsons/val.json")
test_data = DataManager.load("./jsons/test.json")

In [154]:
am = AnnotationManager(train_data.class_dict)
am.setup_distribution_type(
    distribution_classes = am._all_classes,
    distribution_type = 'multilabel'
)

In [155]:
# rec = train_data[0]
# print(rec)
# am.get_distribution(rec)

In [156]:
# 多标签分类
class CharacterDataset(Dataset): #定义数据集 定义len getitem函数 重写len 和getitem魔法方法
    def __init__(self, data,am,_root,transform=None):  #设置文件和对应的转换操作
        self.file= data
        self.transform = transform
        self.img_root = img_root
        self.am = am
    def __len__(self):
        self.filelen = len(self.file)
        return self.filelen

    def __getitem__(self, idx): #p[key] 取值，当实例对象做p[key] 运算时，会调用类中的方法__getitem__
        rec = self.file[idx]
        img_path = self.img_root+rec['info']['image_path']
        img = Image.open(img_path)
#         img = img.convert("RGB")
        img_transformed = self.transform(img)
#         am = AnnotationManager(self.file.class_dict)
        label = self.am.get_distribution(rec) #使用传入的am 形式为[0] 选取[0]
        label = torch.tensor(label)
#         label = img_path.split(os.path.sep)[-1].split(".")[0]
#         label = 1 if label == "dog" else 0 #如果标签为dog 则设置为1 否则为0（狗为1 猫为0）
#         print(label)
        return img_transformed, label

In [157]:
#训练集 验证集 测试集 处理为相同的图像增强操作
train_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),  # 将给定图像随机裁剪为不同的大小和宽高比，然后缩放所裁剪得到的图像为制定的大小；
#         transforms.RandomHorizontalFlip(),  # 以给定的概率随机水平旋转给定的PIL的图像，默认为0.5；
        transforms.ToTensor(),              # 归一化 并且转化为张量
    ]
)

val_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
#         transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ]
)


test_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
#         transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ]
)

In [158]:
train_dataset = CharacterDataset(train_data,am,img_root, transform=train_transforms)  
valid_dataset = CharacterDataset(val_data,am,img_root, transform=test_transforms)
test_dataset = CharacterDataset(test_data,am,img_root, transform=test_transforms)

In [159]:
train_loader = DataLoader(dataset = train_dataset, batch_size=batch_size, shuffle=True )
valid_loader = DataLoader(dataset = valid_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset = test_dataset, batch_size=batch_size, shuffle=False)

## 预览数据集

In [160]:
# for data,label in train_loader:
#     fig, axes = plt.subplots(4, 4, figsize=(12, 12))
#     print(data.shape)
#     print(label)
#     print(len(label),len(label[0]))
#     for idx, ax in enumerate(axes.ravel()):
#         img = data[idx].permute(1, 2, 0)
#         ax.set_title(label[idx]) #轴添加
#         ax.imshow(img,cmap="gray")
#     break

## 模型构建

### 多标签的评估

In [161]:
y =  torch.tensor([1.,0.,1.,1])
pre = torch.tensor([0.1,0.2,0.4,0.6])>0.5
pre = pre.float()
pre,pre*y

(tensor([0., 0., 0., 1.]), tensor([0., 0., 0., 1.]))

In [162]:
torch.tensor([[0.1,0.2,0.4,0.6],[0.9,0.2,0.4,0.6]])

tensor([[0.1000, 0.2000, 0.4000, 0.6000],
        [0.9000, 0.2000, 0.4000, 0.6000]])

In [163]:
# 计算准确率——方式1
# 设定一个阈值，当预测的概率值大于这个阈值，则认为这幅图像中含有这类标签
def calculate_acuracy_mode_one(model_pred, labels,accuracy_th): 
    # 注意这里的model_pred是经过sigmoid处理的，sigmoid处理后可以视为预测是这一类的概率
    # 预测结果，大于这个阈值则视为预测正确
#     accuracy_th = 0.5
    pred_result = model_pred > accuracy_th
    pred_result = pred_result.float()      #将bool的变量转化为float类型 
    pred_one_num = torch.sum(pred_result)  #预测为1的个数
    if pred_one_num == 0:
        return 0, 0
    target_one_num = torch.sum(labels)
    true_predict_num = torch.sum(pred_result * labels) #做交集再求和 也就是TP的个数
    # 模型预测的结果中有多少个是正确的
    precision = true_predict_num / pred_one_num
    # 模型预测正确的结果中，占所有真实标签的数量
    recall = true_predict_num / target_one_num
#     return precision, recall
 
    return precision.item(), recall.item()
 


In [164]:
# 计算准确率——方式2
# 取预测概率最大的前top个标签，作为模型的预测结果
def calculate_acuracy_mode_two_for_item(model_pred, labels,top):  #整体
    # 取前top个预测结果作为模型的预测结果
    precision = 0
    recall = 0
    # 对预测结果进行按概率值进行降序排列，取概率最大的top个结果作为模型的预测结果
    pred_label_locate  = torch.argsort(model_pred, descending=True)[0:top]  #按照降序排序 排序的结果是其下标 默认按照最后一个维度排序
    print(pred_label_locate )
    temp_label = torch.zeros(model_pred.shape[0])
    temp_label[pred_label_locate ] = 1  #将其一张图片对象的预测结果输出
    print(temp_label)
    target_one_num = torch.sum(labels)
    true_predict_num = torch.sum(temp_label * labels)
        # 对每一幅图像进行预测准确率的计算
    precision += true_predict_num / top #预测的T中对了几个
        # 对每一幅图像进行预测查全率的计算
    recall += true_predict_num / target_one_num
    return precision, recall

In [165]:
# 计算准确率——方式2
# 取预测概率最大的前top个标签，作为模型的预测结果
def calculate_acuracy_mode_two(model_pred, labels,top):
    # 取前top个预测结果作为模型的预测结果
    precision = 0
    recall = 0
#     top = 5
    # 对预测结果进行按概率值进行降序排列，取概率最大的top个结果作为模型的预测结果
    pred_label_locate = torch.argsort(model_pred, descending=True)[:, 0:top]
    for i in range(model_pred.shape[0]):
        temp_label = torch.zeros(1, model_pred.shape[1])
        temp_label[0,pred_label_locate[i]] = 1
        target_one_num = torch.sum(labels[i])
        true_predict_num = torch.sum(temp_label * labels[i])
        # 对每一幅图像进行预测准确率的计算
        print(precision,recall)
        precision += true_predict_num / top
        # 对每一幅图像进行预测查全率的计算
        recall += true_predict_num / target_one_num
    return (precision/model_pred.shape[0]).item(), (recall/model_pred.shape[0]).item()


In [166]:
## 测试
# model_pred = torch.tensor([[0.6,0.55,0.2,0.1],[0.6,0.1,0.7,0.1]])
# labels = torch.tensor([[1,0,0,0],[1,0,1,0]])

# # model_pred = torch.tensor([0.6,0.6,0.2,0.1])
# # labels = torch.tensor([1,0,0,0])
# # model_pred.shape[0]
# calculate_acuracy_mode_one(model_pred,labels,0.5),calculate_acuracy_mode_two(model_pred,labels,2)

In [178]:
timm.list_models("*deit*")

['vit_deit_base_distilled_patch16_224',
 'vit_deit_base_distilled_patch16_384',
 'vit_deit_base_patch16_224',
 'vit_deit_base_patch16_384',
 'vit_deit_small_distilled_patch16_224',
 'vit_deit_small_patch16_224',
 'vit_deit_tiny_distilled_patch16_224',
 'vit_deit_tiny_patch16_224']

In [179]:
class multiLabel_net(nn.Module):
    def __init__(self,n_classes,pretrained = False):
        super(multiLabel_net,self).__init__()
        self.model = timm.create_model("vit_deit_tiny_distilled_patch16_224",pretrained=pretrained,in_chans =1,num_classes=n_classes)
        
    def forward(self,x):
        x,_= self.model(x)
        x = torch.sigmoid(x)   #方法
        return x
    def train_one_epoch(self,train_loader,criterion,optimizer,device,accuracy_th=0.5):
        epoch_loss = 0.0
        epoch_precision = 0.0
        epoch_recall = 0.0
        self.model.train()
        for i,(data,target) in enumerate(train_loader): #每一个批次的最后进行参数更新
            if device.type=="cuda":
                data,target = data.cuda(),target.cuda()
            optimizer.zero_grad()  #变量的梯度清零
#             output,aux = self.forward(data) #训练输出结果
            output = self.forward(data) #训练输出结果
    
#             print(target[0],output[0])
#             temp = ClassifierEvalMultilabel.compute_ap(target.detach().numpy(),output.detach().numpy())
#             print(temp)
#             for y_true,y_score in zip(target,output):
#                 y_true = y_true.detach().numpy()
#                 y_score = y_score.detach().numpy()
#                 print(y_true,y_score)
#                 temp = ClassifierEvalMultilabel.compute_ap(y_true,y_score) 
#                 print(temp)

            loss = criterion(output,target) #求解loss
            precision, recall = calculate_acuracy_mode_one(output,target,accuracy_th) 
            epoch_loss += loss
            epoch_precision += precision
            epoch_recall += recall   
            
            loss.backward() #计算每个参数的梯度值
            optimizer.step()  #参数根据梯度进行更新
            
            if(i%100==99):
                print(f"第{i+1}批次 loss: {loss:.4f} precision: {precision:.4f} recall: {recall:.4f} \n")
        return epoch_loss/len(train_loader),epoch_precision/len(train_loader),epoch_recall/len(train_loader)
    def validation_one_epoch(self,valid_loader,criterion,device,accuracy_th=0.5):
        valid_loss = 0.0
        valid_precision = 0.0
        valid_recall = 0.0
        self.model.eval()   #可以屏蔽BN层和dropout
        for data,target in valid_loader:
            if device.type=="cuda":
                data,target = data.cuda(),target.cuda()
            with torch.no_grad():
                output = self.model(data)
                output = torch.sigmoid(output)
#                 print(output)
                loss = criterion(output,target)
                valid_loss += loss
                precision, recall = calculate_acuracy_mode_one(output,target,accuracy_th) 
                valid_precision += precision
                valid_recall += recall   
        return valid_loss/len(valid_loader),valid_precision/len(valid_loader),valid_recall/len(valid_loader)

In [180]:
model = multiLabel_net(n_classes=len(am._all_classes),pretrained=True)
model.to(device)
criterion = nn.BCELoss ()  #BCELoss 需要先进行sigmoid  BCEWithLogitsLoss会包含sigmoid和BCE两部
optimizer = torch.optim.Adam(model.parameters(), lr=lr) #传入模型参数

## 训练模型

In [None]:
for epoch in range(epochs):
    print(f"第{epoch+1}轮---------------------------------》》》》》》\n")
    epoch_loss,epoch_precision,epoch_recall = model.train_one_epoch(train_loader, criterion, optimizer, device,0.5)
    epoch_val_loss,epoch_val_precision,epoch_val_recall = model.validation_one_epoch(valid_loader,criterion,device,0.5)
    print(
        f"Epoch : {epoch+1} - loss : {epoch_loss:.4f} - acc: {epoch_precision:.4f} -train_reacall: {epoch_recall:.4f} - val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_precision:.4f} -val_recall: {epoch_val_recall:.4f}\n"
    )

第1轮---------------------------------》》》》》》

第100批次 loss: 0.1491 precision: 0.7500 recall: 0.3750 

第200批次 loss: 0.1581 precision: 0.8125 recall: 0.4062 

第300批次 loss: 0.1511 precision: 0.7500 recall: 0.3750 

第400批次 loss: 0.1574 precision: 0.6875 recall: 0.3438 

第500批次 loss: 0.1511 precision: 0.7500 recall: 0.3750 

第600批次 loss: 0.1523 precision: 0.7500 recall: 0.3750 

第700批次 loss: 0.1553 precision: 0.6875 recall: 0.3438 

第800批次 loss: 0.1407 precision: 0.9375 recall: 0.4688 

第900批次 loss: 0.1497 precision: 0.8125 recall: 0.4062 

第1000批次 loss: 0.1576 precision: 0.6250 recall: 0.3125 

第1100批次 loss: 0.1514 precision: 0.8125 recall: 0.4062 

第1200批次 loss: 0.1364 precision: 0.8750 recall: 0.4375 

第1300批次 loss: 0.1351 precision: 0.6875 recall: 0.3438 

第1400批次 loss: 0.1306 precision: 0.8125 recall: 0.4062 

第1500批次 loss: 0.1368 precision: 0.8125 recall: 0.4062 

第1600批次 loss: 0.1286 precision: 0.6842 recall: 0.4062 

第1700批次 loss: 0.1265 precision: 0.8125 recall: 0.4062 

第1800批次 loss:

In [None]:
## 类别和评估

In [None]:
from classifier_eval import ClassifierEvalBinary

In [None]:
def cal_Ap_Adraw(batch_pred,batch_label,class_name):
    ap_list = []
    num_classes = batch_label.shape[1]
    for i in range(num_classes):
        y_true = batch_label[:,i].detach().cpu().numpy()
        y_score = batch_pred[:,i].detach().cpu().numpy()
        ap = ClassifierEvalBinary.compute_ap(y_true,y_score)
        ap_list.append(ap)
        pathname = "./ap_curve/"+class_name[i]+".png"
        ceb = ClassifierEvalBinary()
        ceb.draw_pr_curve(y_true,y_score,pathname,class_name[i])
    return np.array(ap_list)

In [None]:
def test_model(model,test_loader,criterion,device,accuracy_th,class_name):
    test_loss = 0.0
    test_precision = 0.0
    test_recall = 0.0
    model.eval()   #可以屏蔽BN层和dropout
    flag = 0
    all_pred = []
    for data,target in test_loader:
        if device.type=="cuda":
            data,target = data.cuda(),target.cuda()
        with torch.no_grad():
            output = model(data)
            output = torch.sigmoid(output)
            loss = criterion(output,target)
            test_loss += loss
            precision, recall = calculate_acuracy_mode_one(output,target,accuracy_th) 
            test_precision += precision
            test_recall += recall 
            if flag==0:
                all_pred = output
                all_true = target
                flag=1
            else:
                all_pred = torch.cat((all_pred,output))
                all_true = torch.cat((all_true,target))
    ap = cal_Ap_Adraw(all_pred,all_true,class_name)
    test_loss = test_loss/len(test_loader)
    test_precision = test_precision/len(test_loader)
    test_recall = test_recall/len(test_loader)
    print(f"ap_list is {ap} \n loss : {test_loss:.4f} - precision: {test_precision:.4f} - recall: {test_recall:.4f}\n")
    return all_pred.tolist()

In [None]:
class_name = am._all_classes

In [None]:
test_result = test_model(model,test_loader,criterion,device,0.5,class_name)

In [None]:
torch.save(model.state_dict(),"./models/deit_multilabel.pt")

In [None]:
# # create json
#
# test_result = []
# for batch in output_test:
#     batch_result = torch.softmax(batch,dim =  -1)
#     test_result += batch_result.tolist()
# print(len(test_result))    
record_list = list()
class_dict = test_data.class_dict
am = AnnotationManager(class_dict)
am.setup_distribution_type(
#     distribution_classes = ['0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'],
    distribution_classes = am._all_classes,
    distribution_type = 'multilabel'
)
for i,record in enumerate(test_data):
#     dist = [0.3, 0.7]  # dist is the output of your model
    dist = test_result[i]
    instance = am.create_instance(dist)
    record_list.append(
        {
            'info': record['info'],
            'instances': [instance]
        }
    )


data_out = DataManager(record_list, class_dict)
data_out.save('./jsons/deit_test_pred.json')