在自己的图像分类数据集上，使用ImageNet预训练图像分类模型初始化，改动分类层，迁移学习微调训练

In [1]:
# 导入工具包
import time
import os
from tqdm import tqdm

import pandas as pd
import numpy as np

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt
%matplotlib inline

# 忽略烦人的红色提示
import warnings
warnings.filterwarnings("ignore")

In [2]:
torch.cuda.is_available() 

True

In [3]:
# 获取计算硬件
# 有 GPU 就用 GPU，没有就用 CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('device', device)

device cuda:0


In [4]:
#图像预处理
from torchvision import transforms

# 训练集图像预处理：缩放裁剪、图像增强、转 Tensor、归一化
train_transform = transforms.Compose([transforms.RandomResizedCrop(224),
                                      transforms.RandomHorizontalFlip(),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                     ])

# 测试集图像预处理-RCTN：缩放、裁剪、转 Tensor、归一化
test_transform = transforms.Compose([transforms.Resize(256),
                                     transforms.CenterCrop(224),
                                     transforms.ToTensor(),
                                     transforms.Normalize(
                                         mean=[0.485, 0.456, 0.406], 
                                         std=[0.229, 0.224, 0.225])
                                    ])

In [5]:
# 数据集文件夹路径
dataset_dir = 'F:/jupyterNotebook/code/lungImageSet_split'
train_path = os.path.join(dataset_dir, 'train')
test_path = os.path.join(dataset_dir, 'test')
print('训练集路径', train_path)
print('测试集路径', test_path)

from torchvision import datasets
# 载入训练集
train_dataset = datasets.ImageFolder(train_path, train_transform)
# 载入测试集
test_dataset = datasets.ImageFolder(test_path, test_transform)

print('训练集图像数量', len(train_dataset))
print('类别个数', len(train_dataset.classes))
print('各类别名称', train_dataset.classes)
print('测试集图像数量', len(test_dataset))
print('类别个数', len(test_dataset.classes))
print('各类别名称', test_dataset.classes)

训练集路径 F:/jupyterNotebook/code/lungImageSet_split\train
测试集路径 F:/jupyterNotebook/code/lungImageSet_split\test
训练集图像数量 12000
类别个数 3
各类别名称 ['lung_aca', 'lung_n', 'lung_scc']
测试集图像数量 3000
类别个数 3
各类别名称 ['lung_aca', 'lung_n', 'lung_scc']


In [6]:
train_dataset

Dataset ImageFolder
    Number of datapoints: 12000
    Root location: F:/jupyterNotebook/code/lungImageSet_split\train
    StandardTransform
Transform: Compose(
               RandomResizedCrop(size=(224, 224), scale=(0.08, 1.0), ratio=(0.75, 1.3333), interpolation=bilinear), antialias=None)
               RandomHorizontalFlip(p=0.5)
               ToTensor()
               Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
           )

In [7]:
# 各类别名称
class_names = train_dataset.classes
n_class = len(class_names)
# 映射关系：类别 到 索引号
train_dataset.class_to_idx
# 映射关系：索引号 到 类别
idx_to_labels = {y:x for x,y in train_dataset.class_to_idx.items()}

In [8]:
idx_to_labels

{0: 'lung_aca', 1: 'lung_n', 2: 'lung_scc'}

In [9]:
#定义数据加载器
from torch.utils.data import DataLoader

BATCH_SIZE = 8

# 训练集的数据加载器
train_loader = DataLoader(train_dataset,
                          batch_size=BATCH_SIZE,
                          shuffle=True,
                          num_workers=4
                         )

# 测试集的数据加载器
test_loader = DataLoader(test_dataset,
                         batch_size=BATCH_SIZE,
                         shuffle=False,
                         num_workers=4
                        )

In [10]:
# 导入训练所需工具包
from torchvision import models
import torch.optim as optim
from torch.optim import lr_scheduler

In [11]:
print(models)

<module 'torchvision.models' from 'F:\\Anaconda3\\lib\\site-packages\\torchvision\\models\\__init__.py'>


In [12]:
model = models.resnet50(pretrained=True) # 载入预训练模型
model.fc = nn.Linear(model.fc.in_features, n_class) 
# model = models.v(pretrained=True) # 载入预训练模型
# model.fc = nn.Linear(model.classifier[0].in_features, n_class) 
optimizer = optim.Adam(model.parameters()) # 微调训练所有层

In [13]:
device

device(type='cuda', index=0)

In [14]:
model = model.to(device)

# 交叉熵损失函数
criterion = nn.CrossEntropyLoss() 

# 训练轮次 Epoch
EPOCHS = 15

# 学习率降低策略
lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

In [15]:
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score

In [16]:
def train_one_batch(images, labels):
    
    # 运行一个 batch 的训练，返回当前 batch 的训练日志
    
    # 获得一个 batch 的数据和标注
    images = images.to(device)
    labels = labels.to(device)
    
    outputs = model(images) # 输入模型，执行前向预测
    loss = criterion(outputs, labels) # 计算当前 batch 中，每个样本的平均交叉熵损失函数值
    
    # 优化更新权重
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # 获取当前 batch 的标签类别和预测类别
    _, preds = torch.max(outputs, 1) # 获得当前 batch 所有图像的预测类别
    preds = preds.cpu().numpy()
    loss = loss.detach().cpu().numpy()
    outputs = outputs.detach().cpu().numpy()
    labels = labels.detach().cpu().numpy()
    
    log_train = {}
    log_train['epoch'] = epoch
    log_train['batch'] = batch_idx
    # 计算分类评估指标 用于动态监测loss与accuracy
    log_train['train_loss'] = loss
    log_train['train_accuracy'] = accuracy_score(labels, preds)
    # log_train['train_precision'] = precision_score(labels, preds, average='macro')
    # log_train['train_recall'] = recall_score(labels, preds, average='macro')
    # log_train['train_f1-score'] = f1_score(labels, preds, average='macro')
    
    return log_train

In [17]:
def evaluate_testset():
    # 在整个测试集上评估，返回分类评估指标日志

    loss_list = []
    labels_list = []
    preds_list = []
    
    with torch.no_grad():
        for images, labels in test_loader: # 生成一个 batch 的数据和标注
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images) # 输入模型，执行前向预测

            # 获取整个测试集的标签类别和预测类别
            _, preds = torch.max(outputs, 1) # 获得当前 batch 所有图像的预测类别
            preds = preds.cpu().numpy()
            loss = criterion(outputs, labels) # 由 logit，计算当前 batch 中，每个样本的平均交叉熵损失函数值
            loss = loss.detach().cpu().numpy()
            outputs = outputs.detach().cpu().numpy()
            labels = labels.detach().cpu().numpy()

            loss_list.append(loss)
            labels_list.extend(labels)
            preds_list.extend(preds)
        
    log_test = {}
    log_test['epoch'] = epoch
    
    # 计算分类评估指标
    log_test['test_loss'] = np.mean(loss)
    log_test['test_accuracy'] = accuracy_score(labels_list, preds_list)
    log_test['test_precision'] = precision_score(labels_list, preds_list, average='macro')
    log_test['test_recall'] = recall_score(labels_list, preds_list, average='macro')
    log_test['test_f1-score'] = f1_score(labels_list, preds_list, average='macro')
    
    return log_test

In [18]:
epoch = 0
batch_idx = 0
best_test_accuracy = 0

In [19]:
# 训练日志-训练集
df_train_log = pd.DataFrame()
log_train = {}
log_train['epoch'] = 0
log_train['batch'] = 0
images, labels = next(iter(train_loader))
log_train.update(train_one_batch(images, labels))
df_train_log = df_train_log.append(log_train, ignore_index=True)

In [20]:
df_train_log

Unnamed: 0,epoch,batch,train_loss,train_accuracy
0,0,0,1.3718551,0.25


In [21]:
# 训练日志-测试集
df_test_log = pd.DataFrame()
log_test = {}
log_test['epoch'] = 0
log_test.update(evaluate_testset())
df_test_log = df_test_log.append(log_test, ignore_index=True)

In [22]:
df_test_log

Unnamed: 0,epoch,test_loss,test_accuracy,test_precision,test_recall,test_f1-score
0,0.0,1.931125,0.332667,0.408674,0.332667,0.258047


In [23]:
import wandb

wandb.init(project='lung cancer', name=time.strftime('%m%d%H%M%S'))

[34m[1mwandb[0m: Currently logged in as: [33mjyjy2001lfx[0m ([33mjnjy[0m). Use [1m`wandb login --relogin`[0m to force relogin


VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.016916666666899498, max=1.0…

In [24]:
for epoch in range(1, EPOCHS+1):
    
    print(f'Epoch {epoch}/{EPOCHS}') # 打印当前训练轮数
    
    ## 训练阶段
    model.train()
    for images, labels in tqdm(train_loader): # 获得一个 batch 的数据和标注
        batch_idx += 1
        log_train = train_one_batch(images, labels) # 一次训练一个batch
        df_train_log = df_train_log.append(log_train, ignore_index=True)
        wandb.log(log_train)
        
    lr_scheduler.step()

    ## 测试阶段
    model.eval()
    log_test = evaluate_testset()
    df_test_log = df_test_log.append(log_test, ignore_index=True)
    wandb.log(log_test)
    
    # 保存最新的最佳模型文件
    if log_test['test_accuracy'] > best_test_accuracy: 
        # 删除旧的最佳模型文件(如有)
        old_best_checkpoint_path = 'F:/jupyterNotebook/code/TL_demo_result/resnet50/checkpoints/2/best-{:.3f}.pth'.format(best_test_accuracy)
        if os.path.exists(old_best_checkpoint_path):
            os.remove(old_best_checkpoint_path)
        # 保存新的最佳模型文件
        new_best_checkpoint_path = 'F:/jupyterNotebook/code/TL_demo_result/resnet50/checkpoints/2/best-{:.3f}.pth'.format(log_test['test_accuracy'])
        torch.save(model, new_best_checkpoint_path)
        print('保存新的最佳模型', 'F:/jupyterNotebook/code/TL_demo_result/resnet50/checkpoints/2/best-{:.3f}.pth'.format(best_test_accuracy))
        best_test_accuracy = log_test['test_accuracy']

df_train_log.to_csv('训练日志-训练集.csv', index=False)
df_test_log.to_csv('训练日志-测试集.csv', index=False)

Epoch 1/15


 32%|████████████████████████▉                                                      | 474/1500 [02:57<06:24,  2.67it/s]


KeyboardInterrupt: 

In [25]:
# 载入最佳模型作为当前模型
model = torch.load('F:/jupyterNotebook/code/TL_demo_result/efficientnet_b5/checkpoints/2/best-{:.3f}.pth'.format(1.000))
# model = torch.load('F:/jupyterNotebook/code/TL_demo_result/resnet50/checkpoints/2/best-{:.3f}.pth'.format(best_test_accuracy))
model.eval()
print(evaluate_testset())

{'epoch': 1, 'test_loss': 0.00016193783, 'test_accuracy': 1.0, 'test_precision': 1.0, 'test_recall': 1.0, 'test_f1-score': 1.0}


In [None]:
# print(model)

In [26]:
from torchsummary import summary
summary(model, input_size=(3, 32, 32),batch_size=256)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [256, 48, 16, 16]           1,296
       BatchNorm2d-2          [256, 48, 16, 16]              96
              SiLU-3          [256, 48, 16, 16]               0
            Conv2d-4          [256, 48, 16, 16]             432
       BatchNorm2d-5          [256, 48, 16, 16]              96
              SiLU-6          [256, 48, 16, 16]               0
 AdaptiveAvgPool2d-7            [256, 48, 1, 1]               0
            Conv2d-8            [256, 12, 1, 1]             588
              SiLU-9            [256, 12, 1, 1]               0
           Conv2d-10            [256, 48, 1, 1]             624
          Sigmoid-11            [256, 48, 1, 1]               0
SqueezeExcitation-12          [256, 48, 16, 16]               0
           Conv2d-13          [256, 24, 16, 16]           1,152
      BatchNorm2d-14          [256, 24,

In [None]:
# pred = evaluate_testset()

In [None]:
# pred