# 迁移学习微调训练图像分类模型

在自己的图像分类数据集上，使用ImageNet预训练图像分类模型初始化，改动分类层，迁移学习微调训练

同济子豪兄：https://space.bilibili.com/1900783

[代码运行云GPU环境](https://featurize.cn/?s=d7ce99f842414bfcaea5662a97581bd1)：GPU RTX 3060、CUDA v11.2

## 导入工具包

In [16]:
import time
import os
from tqdm import tqdm

import pandas as pd
import numpy as np

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt
%matplotlib inline

# 忽略烦人的红色提示
import warnings
warnings.filterwarnings("ignore")

# 获取计算硬件
# 有 GPU 就用 GPU，没有就用 CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('device', device)

device cuda:0


## 图像预处理

In [17]:
from torchvision import transforms

# 训练集图像预处理：缩放裁剪、图像增强、转 Tensor、归一化
train_transform = transforms.Compose([transforms.RandomResizedCrop(224),
                                      transforms.RandomHorizontalFlip(),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                     ])

# 测试集图像预处理-RCTN：缩放、裁剪、转 Tensor、归一化
test_transform = transforms.Compose([transforms.Resize(256),
                                     transforms.CenterCrop(224),
                                     transforms.ToTensor(),
                                     transforms.Normalize(
                                         mean=[0.485, 0.456, 0.406], 
                                         std=[0.229, 0.224, 0.225])
                                    ])

## 载入图像分类数据集

In [18]:
dataset_dir = r'D:\dataset\sr'

In [20]:
train_path = os.path.join(dataset_dir, 'train')
test_path = os.path.join(dataset_dir, 'val')
print('训练集路径', train_path)
print('测试集路径', test_path)

from torchvision import datasets
# 载入训练集
train_dataset = datasets.ImageFolder(train_path, train_transform)
# 载入测试集
test_dataset = datasets.ImageFolder(test_path, test_transform)

print('训练集图像数量', len(train_dataset))
print('类别个数', len(train_dataset.classes))
print('各类别名称', train_dataset.classes)
print('测试集图像数量', len(test_dataset))
print('类别个数', len(test_dataset.classes))
print('各类别名称', test_dataset.classes)

训练集路径 D:\dataset\sr\train
测试集路径 D:\dataset\sr\val
训练集图像数量 22046
类别个数 2
各类别名称 ['parasitized', 'uninfected']
测试集图像数量 5512
类别个数 2
各类别名称 ['parasitized', 'uninfected']


## 类别和索引号 映射字典

In [21]:
# 各类别名称
class_names = train_dataset.classes
n_class = len(class_names)
# 映射关系：类别 到 索引号
train_dataset.class_to_idx
# 映射关系：索引号 到 类别
idx_to_labels = {y:x for x,y in train_dataset.class_to_idx.items()}

In [22]:
idx_to_labels

{0: 'parasitized', 1: 'uninfected'}

In [23]:
# 保存为本地的 npy 文件
np.save('idx_to_labels.npy', idx_to_labels)
np.save('labels_to_idx.npy', train_dataset.class_to_idx)

OSError: [Errno 28] No space left on device: 'idx_to_labels.npy'

## 定义数据加载器DataLoader

In [24]:
from torch.utils.data import DataLoader

BATCH_SIZE = 32

# 训练集的数据加载器
train_loader = DataLoader(train_dataset,
                          batch_size=BATCH_SIZE,
                          shuffle=True,
                          num_workers=4
                         )

# 测试集的数据加载器
test_loader = DataLoader(test_dataset,
                         batch_size=BATCH_SIZE,
                         shuffle=False,
                         num_workers=4
                        )

## 导入训练需使用的工具包

In [25]:
from torchvision import models
import torch.optim as optim
from torch.optim import lr_scheduler

## 选择迁移学习训练方式

斯坦福CS231N【迁移学习】中文精讲：https://www.bilibili.com/video/BV1K7411W7So

斯坦福CS231N【迁移学习】官方笔记：https://cs231n.github.io/transfer-learning

如果你的数据集和MS COCO数据集的图像域**类似**（街景、动植物、生活用品），可以保留预训练模型权重，在自己的数据集上迁移学习微调分类输出层或所有层。站在巨人的肩膀上，复用预训练模型在MS COCO数据集上学习到的图像特征。（Transfer Learning, Fine Tuning）

如果你的数据集和MS COCO数据集的图像域**不类似**（医疗影像、显微镜图像、工业检测、天文照片、动画、油画），可以随机初始化模型权重，在自己的数据集上重新训练所有层。（From Scratch）。或者冻结底层权重，只重新训练顶层，复用预训练模型在MS COCO数据集上学习到的底层图像特征。

### 选择一：只微调训练模型最后一层（全连接分类层）

In [12]:
# 设置模型路径
model_path = r'E:\MV-Code-202018010103-Lucy\Model\full_model.pth'

# 加载模型
model = torch.load(model_path)


In [15]:
model.fc = nn.Linear(model.fc.in_features, n_class)

AttributeError: 'ConvertModel' object has no attribute 'dense'

In [26]:
model = models.resnet18(pretrained=True) # 载入预训练模型

# 修改全连接层，使得全连接层的输出与当前数据集类别数对应
# 新建的层默认 requires_grad=True
model.fc = nn.Linear(model.fc.in_features, n_class)

In [27]:
model.fc

Linear(in_features=512, out_features=2, bias=True)

In [28]:
# 只微调训练最后一层全连接层的参数，其它层冻结
optimizer = optim.Adam(model.fc.parameters())

### 选择二：微调训练所有层

In [13]:
# model = models.resnet18(pretrained=True) # 载入预训练模型

# model.fc = nn.Linear(model.fc.in_features, n_class)

# optimizer = optim.Adam(model.parameters())

### 选择三：随机初始化模型全部权重，从头训练所有层

In [14]:
# model = models.resnet18(pretrained=False) # 只载入模型结构，不载入预训练权重参数

# model.fc = nn.Linear(model.fc.in_features, n_class)

# optimizer = optim.Adam(model.parameters())

## 训练配置

In [29]:
model = model.to(device)

# 交叉熵损失函数
criterion = nn.CrossEntropyLoss() 

# 训练轮次 Epoch
EPOCHS = 10

# 学习率降低策略
lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

## 函数：在训练集上训练

In [30]:
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score

In [31]:
def train_one_batch(images, labels):
    '''
    运行一个 batch 的训练，返回当前 batch 的训练日志
    '''
    
    # 获得一个 batch 的数据和标注
    images = images.to(device)
    labels = labels.to(device)
    
    outputs = model(images) # 输入模型，执行前向预测
    loss = criterion(outputs, labels) # 计算当前 batch 中，每个样本的平均交叉熵损失函数值
    
    # 优化更新权重
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # 获取当前 batch 的标签类别和预测类别
    _, preds = torch.max(outputs, 1) # 获得当前 batch 所有图像的预测类别
    preds = preds.cpu().numpy()
    loss = loss.detach().cpu().numpy()
    outputs = outputs.detach().cpu().numpy()
    labels = labels.detach().cpu().numpy()
    
    log_train = {}
    log_train['epoch'] = epoch
    log_train['batch'] = batch_idx
    # 计算分类评估指标
    log_train['train_loss'] = loss
    log_train['train_accuracy'] = accuracy_score(labels, preds)
    # log_train['train_precision'] = precision_score(labels, preds, average='macro')
    # log_train['train_recall'] = recall_score(labels, preds, average='macro')
    # log_train['train_f1-score'] = f1_score(labels, preds, average='macro')
    
    return log_train

## 函数：在整个测试集上评估

In [32]:
def evaluate_testset():
    '''
    在整个测试集上评估，返回分类评估指标日志
    '''

    loss_list = []
    labels_list = []
    preds_list = []
    
    with torch.no_grad():
        for images, labels in test_loader: # 生成一个 batch 的数据和标注
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images) # 输入模型，执行前向预测

            # 获取整个测试集的标签类别和预测类别
            _, preds = torch.max(outputs, 1) # 获得当前 batch 所有图像的预测类别
            preds = preds.cpu().numpy()
            loss = criterion(outputs, labels) # 由 logit，计算当前 batch 中，每个样本的平均交叉熵损失函数值
            loss = loss.detach().cpu().numpy()
            outputs = outputs.detach().cpu().numpy()
            labels = labels.detach().cpu().numpy()

            loss_list.append(loss)
            labels_list.extend(labels)
            preds_list.extend(preds)
        
    log_test = {}
    log_test['epoch'] = epoch
    
    # 计算分类评估指标
    log_test['test_loss'] = np.mean(loss_list)
    log_test['test_accuracy'] = accuracy_score(labels_list, preds_list)
    log_test['test_precision'] = precision_score(labels_list, preds_list, average='macro')
    log_test['test_recall'] = recall_score(labels_list, preds_list, average='macro')
    log_test['test_f1-score'] = f1_score(labels_list, preds_list, average='macro')
    
    return log_test

## 训练开始之前，记录日志

In [33]:
epoch = 0
batch_idx = 0
best_test_accuracy = 0

In [35]:
# 训练日志-训练集
# 初始化空的 DataFrame

df_train_log = pd.DataFrame()

# 创建和更新 log_train 字典
log_train = {'epoch': 0, 'batch': 0}
images, labels = next(iter(train_loader))
log_train.update(train_one_batch(images, labels))

# 将 log_train 字典转换为 DataFrame
log_train_df = pd.DataFrame([log_train])

# 使用 pd.concat 合并 DataFrame
df_train_log = pd.concat([df_train_log, log_train_df], ignore_index=True)

In [36]:
df_train_log

Unnamed: 0,epoch,batch,train_loss,train_accuracy
0,0,0,0.7195293,0.53125


In [37]:
# 训练日志-测试集
# 初始化空的 DataFrame
df_test_log = pd.DataFrame()

# 创建和更新 log_test 字典
log_test = {'epoch': 0}
log_test.update(evaluate_testset())

# 将 log_test 字典转换为 DataFrame
log_test_df = pd.DataFrame([log_test])

# 使用 pd.concat 合并 DataFrame
df_test_log = pd.concat([df_test_log, log_test_df], ignore_index=True)

In [38]:
df_test_log

Unnamed: 0,epoch,test_loss,test_accuracy,test_precision,test_recall,test_f1-score
0,0,0.746294,0.503084,0.508355,0.503084,0.410038


## 登录wandb

1.安装 wandb：pip install wandb

2.登录 wandb：在命令行中运行wandb login

3.按提示复制粘贴API Key至命令行中

## 创建wandb可视化项目

In [41]:
import wandb

wandb.init(project='SR-malaria', name=time.strftime('%m%d%H%M%S'))

VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.011111111111111112, max=1.0…

## 运行训练

In [43]:
# 初始化 DataFrames
df_train_log = pd.DataFrame()
df_test_log = pd.DataFrame()

for epoch in range(1, EPOCHS+1):
    print(f'Epoch {epoch}/{EPOCHS}')
    
    # 训练阶段
    model.train()
    batch_idx = 0
    for images, labels in tqdm(train_loader): # 获得一个 batch 的数据和标注
        batch_idx += 1
        log_train = train_one_batch(images, labels)
        
        # 使用 pd.concat 而不是 append
        log_train_df = pd.DataFrame([log_train])
        df_train_log = pd.concat([df_train_log, log_train_df], ignore_index=True)

        wandb.log(log_train)
        
    lr_scheduler.step()

    # 测试阶段
    model.eval()
    log_test = evaluate_testset()
    
    # 使用 pd.concat 而不是 append
    log_test_df = pd.DataFrame([log_test])
    df_test_log = pd.concat([df_test_log, log_test_df], ignore_index=True)

    wandb.log(log_test)
    
    # 保存最新的最佳模型文件
    if log_test['test_accuracy'] > best_test_accuracy:
        # 删除旧的最佳模型文件(如有)
        old_best_checkpoint_path = f'checkpoint/best-{best_test_accuracy:.3f}.pth'
        if os.path.exists(old_best_checkpoint_path):
            os.remove(old_best_checkpoint_path)
        # 保存新的最佳模型文件
        best_test_accuracy = log_test['test_accuracy']
        new_best_checkpoint_path = f'checkpoint/best-{best_test_accuracy:.3f}.pth'
        torch.save(model, new_best_checkpoint_path)
        print(f'保存新的最佳模型 checkpoint/best-{best_test_accuracy:.3f}.pth')

# 将日志保存为 CSV 文件
df_train_log.to_csv('训练日志-训练集.csv', index=False)
df_test_log.to_csv('训练日志-测试集.csv', index=False)

Epoch 1/10


100%|██████████| 689/689 [01:25<00:00,  8.03it/s]


RuntimeError: Parent directory checkpoint does not exist.

## 在测试集上评价

In [28]:
# 载入最佳模型作为当前模型
model = torch.load('checkpoint/best-{:.3f}.pth'.format(best_test_accuracy))

In [29]:
model.eval()
print(evaluate_testset())

{'epoch': 30, 'test_loss': 0.26338762, 'test_accuracy': 0.8766233766233766, 'test_precision': 0.8780937321959233, 'test_recall': 0.8752925121116021, 'test_f1-score': 0.8748063404834624}


## 参考文档

https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html

https://www.bilibili.com/video/BV14J411X7Bb

https://www.bilibili.com/video/BV1w4411u7ay