In [1]:
import torch
import torch.nn as nn
import time
import copy
import torch.optim as optim
import torchvision
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
import argparse

from PIL import Image
import cv2
from torchvision.datasets import ImageFolder
from sklearn.decomposition import PCA
from torchvision.models import resnet18, resnet34, resnet50, resnet101
from torch.utils.data import Dataset, DataLoader, TensorDataset
from skimage import io
import numpy as np

KeyboardInterrupt: 

In [None]:
BATCH_SIZE = 32
num_classes = 2
input_size = 224
num_epochs = 1
trainset_size = 0.8

In [None]:
import os
path = 'data/test/test/'
file_list = os.listdir(path)
 
for file in file_list:
    # 补0 10表示补0后名字共10位
    filename = file.zfill(9)
    # print(filename)
    new_name = ''.join(filename)
    os.rename(path + '\\' + file, path + '\\' + new_name)

In [None]:
import json
with open('label.json','r',encoding='utf8')as fp:
    json_data = json.load(fp)
test_label = []
for i in range(1047):
    test_label.append(int(json_data['data{}'.format(i)][-2:])) # pos 120 neg 300
test_label = np.array(test_label)

In [None]:
test_label[test_label==-1] = 0
test_label

In [None]:
img=io.imread('data/test/test/00001.jpg')
io.imshow(img)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

In [None]:
data_transform_norm = {
    'norm': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
}
norm_data = ImageFolder(root='data/train/', transform=data_transform_norm['norm'])

In [None]:
norm_matrix = []
for i in range(len(norm_data)):
    norm_matrix.append(np.array(norm_data[i][0]))
norm_matrix = np.array(norm_matrix)
norm_matrix.shape

In [None]:
def get_mean_var(norm_matrix):
    mean = np.mean(norm_matrix,(0,2,3))
    std = np.std(norm_matrix,(0,2,3))
    return mean, std
train_mean, train_var = get_mean_var(norm_matrix)

In [None]:
data_transform = {
    'train':transforms.Compose([
        transforms.Resize((224,224)),
        transforms.ToTensor()
#         ,
#         transforms.Normalize(train_mean, train_var)
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
#         ,
#         transforms.Normalize(train_mean, train_var)
    ])
}
train_data = ImageFolder(root='data/train/', transform=data_transform['train'])

train_size = int(0.8 * len(train_data))
val_size = len(train_data) - train_size
train_set, val_set = torch.utils.data.random_split(train_data, [train_size, val_size])

test_data = ImageFolder(root='data/test/', transform=data_transform['test'])

In [None]:

# nptestx = np.array(test_x).swapaxes(1,3)
list(test_data[0])
test_x = []
train_array = []
for i in range(len(test_data)):
    test_x.append(test_data[i][0].numpy())

for i in range(len(train_data)):
    train_array.append(train_data[i][0].numpy())
train_array = np.array(train_array)
    
    
test_x = torch.tensor(np.array(test_x))
test_y = torch.tensor(np.array(test_label),dtype=torch.long)

test_data_ = TensorDataset(test_x, test_y)

In [None]:
test_y.type()

In [None]:
train_data.class_to_idx

In [None]:
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
test_loader = DataLoader(test_data_, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
loader = {'train':train_loader, 'val':test_loader, 'test':test_loader}

In [None]:
test_data_

In [None]:
test_data

In [None]:
resnet = resnet34(pretrained = True).to(device)
# set_parameter_requires_grad(model_ft, feature_extract)
num_ftrs = resnet.fc.in_features
resnet.fc = nn.Linear(num_ftrs, num_classes).to(device)

In [None]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, is_inception=False):
    since = time.time()

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  
            else:
                model.eval()   

            running_loss = 0.0
            running_corrects = 0
            
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
                with torch.set_grad_enabled(phase == 'train'):
                    if is_inception and phase == 'train':
                        outputs, aux_outputs = model(inputs)
                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4*loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    model.load_state_dict(best_model_wts)
    return model, val_acc_history

In [None]:
num_epochs = 15
params_to_update = resnet.parameters()
optimizer_ft = optim.Adam(params_to_update, lr=0.001)
criterion = nn.CrossEntropyLoss()
model_ft, hist = train_model(resnet, 
                             loader, 
                             criterion, 
                             optimizer_ft, 
                             num_epochs=num_epochs)

In [None]:
# test
resnet.eval()
all_outputs = []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        test_outputs = resnet(inputs)
        _, preds = torch.max(test_outputs, 1)
        all_outputs = np.concatenate((all_outputs, preds.cpu().numpy()))
        

# print(np.mean(all_outputs == test_label))

all_outputs[all_outputs==0] = -1

In [None]:
all_outputs[979]

In [None]:
def store_txt(label,result_txt):
    with open(result_txt,"w") as w:
        for index,result in enumerate(label,1):#label后面的1,代表下标从1开始
            if index != len(label):
                w.write(str(index)+" "+str(int(result))+"\n")
            else:
                w.write(str(index)+" "+str(int(result)))
                
store_txt(all_outputs,'./task4_resnet101.txt')

In [None]:
def draw_features(width,height,x,savename):
    tic=time.time()
    fig = plt.figure(figsize=(16, 16))
    fig.subplots_adjust(left=0.05, right=0.95, bottom=0.05, top=0.95, wspace=0.05, hspace=0.05)
    for i in range(width*height):
        plt.subplot(height,width, i + 1)
        plt.axis('off')
        # plt.tight_layout()
        img = x[0, i, :, :]
        pmin = np.min(img)
        pmax = np.max(img)
        img = (img - pmin) / (pmax - pmin + 0.000001)
        plt.imshow(img, cmap='gray')
        print("{}/{}".format(i,width*height))
    fig.savefig(savename, dpi=100)
    fig.clf()
    plt.close()
    print("time:{}".format(time.time()-tic))

In [None]:
# 特征脸

# 绘图函数
def plot_faces(faces):
    fig, axes = plt.subplots(6, 6, figsize=(10,10),
                            subplot_kw ={'xticks':[], 'yticks':[]},
    gridspec_kw = dict(hspace=0.1,wspace=0.1))
    
    for i, ax in enumerate(axes.flat):
        ax.imshow(np.array(faces[i]).transpose(1,2,0))
        plt.show
def plot_faces_pca(faces):
    fig, axes = plt.subplots(6, 6, figsize=(10,10),
                            subplot_kw ={'xticks':[], 'yticks':[]},
    gridspec_kw = dict(hspace=0.1,wspace=0.1))
    for i, ax in enumerate(axes.flat):
        ax.imshow(faces[i].reshape(224,224),cmap = 'bone')
        plt.show
# 特征脸

unmask_x = train_array[:1100]
mask_x = train_array[1100:]
def plot_eigenface(data):
    pca = PCA()
    data = np.mean(data,axis = 1).reshape((len(data), -1))
    pca.fit(data)
    pca.components_.shape #36*361
    plot_faces_pca(pca.components_)
plot_eigenface(unmask_x)
plot_eigenface(mask_x)

In [None]:
test_x[0]
io.imshow(np.array(test_x[0]).transpose(1,2,0))

In [None]:
test_array = np.array(test_x)
test_array.shape

In [None]:
img_path = 'X:\\PatternRecognition\\4\\data\\test\\test\\00986.jpg'
img = Image.open(img_path).convert('RGB')


In [None]:
# features_grad = None
def extract(g):
    global features_grad
    features_grad = g

In [None]:
def draw_CAM(model, img_num, save_path, transform=None, visual_heatmap=False):
    '''
    绘制 Class Activation Map
    :param model: 加载好权重的Pytorch model
    :param img_path: 测试图片路径
    :param save_path: CAM结果保存路径
    :param transform: 输入图像预处理方法
    :param visual_heatmap: 是否可视化原始heatmap（调用matplotlib）
    :return:
    '''
    # 图像加载&预处理
#     img = Image.open(img_path).convert('RGB')
#     if transform:
#         img = transform(img)
#     img = img.unsqueeze(0)
    img = test_data[int(img_num)-1][0]
    img = img.unsqueeze(0).cuda()
    # 获取模型输出的feature/score
    model.eval()
    def my_forward(model, x):
#         print(*list(model.children())[:-2])
        mo1 = nn.Sequential(*list(model.children())[:-1])
        mo2 = nn.Sequential(*list(model.children())[:-2])
        feature1 = mo1(x).view(x.size(0), -1)
        feature2 = mo2(x)
        output= model.fc(feature1)
        return feature2, output

    myfeature, myoutput=my_forward(resnet, img)
    features = myfeature
    
    output = myoutput
#     output = model.fc(features)
#     print(features.shape)
    # 为了能读取到中间梯度定义的辅助函数
    def extract(g):
        global features_grad
        features_grad = g
 
    # 预测得分最高的那一类对应的输出score
    pred = torch.argmax(output).item()
    if pred == 1:
        print('masked')
    elif pred == 0:
        print('unmasked')
#     print(pred)
    pred_class = output[:, pred]
 
    features.register_hook(extract)
    pred_class.backward() # 计算梯度
 
    grads = features_grad   # 获取梯度
#     print(grads.shape)
 
    pooled_grads = torch.nn.functional.adaptive_avg_pool2d(grads, (1, 1))
 
    # 此处batch size默认为1，所以去掉了第0维（batch size维）
    pooled_grads = pooled_grads[0]
    features = features[0]
    # 512是最后一层feature的通道数
    for i in range(512):
        features[i, ...] *= pooled_grads[i, ...]
 
    # 以下部分同Keras版实现
    heatmap = features.detach().cpu().numpy()
#     print('1', heatmap.shape)
    heatmap = np.mean(heatmap, axis=0)
 
    heatmap = np.maximum(heatmap, 0)
#     print('2', heatmap.shape)
#     print(heatmap)
    heatmap /= np.max(heatmap)
    
    heatmap_ = heatmap
 
    # 可视化原始热力图
    if visual_heatmap:
        plt.matshow(heatmap)
        plt.show()
        
    img_path = '.\\data\\test\\test\\{}.jpg'.format(img_num)
    save_path = '.\\cam_output\\cam_output_{}.jpg'.format(img_num)
    
    img = cv2.imread(img_path)  # 用cv2加载原始图像
    heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))  # 将热力图的大小调整为与原始图像相同
    heatmap = np.uint8(255 * heatmap)  # 将热力图转换为RGB格式
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)  # 将热力图应用于原始图像
    superimposed_img = heatmap * 0.4 + img  # 这里的0.4是热力图强度因子
    cv2.imwrite(save_path, superimposed_img)  # 将图像保存到硬盘
    
    return heatmap_
    



heatmap_ = draw_CAM(resnet,'00980', save_path, transform=None, visual_heatmap=False)

In [None]:
io.imshow(heatmap_)