<a href="https://colab.research.google.com/github/GzpTez0514/-/blob/main/Pytorch%E6%B7%B1%E5%BA%A6%E5%AD%A6%E4%B9%A012_%E6%A8%A1%E5%9E%8B%E8%AE%AD%E7%BB%83.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# 一、设置库，导入环境
!pip install torchinfo
import os
import torch
torch.backends.cudnn.benchmark=True # 用于加速GPU计算的代码

# 导入pytorch一个完整流程可能所需全部的包
import torchvision
from torch import nn, optim
from torch.nn import functional as F
from torchvision import transforms as T
from torchvision import models as m
from torch.utils.data import DataLoader

# 导入作为辅助工具的各类包
import matplotlib.pyplot as plt
from time import time
import datetime
import random
import numpy as np
import pandas as pd
import gc

# Seed every random number generator in use (torch, random, numpy).
# Seeding gives only limited control and cannot make training fully deterministic.
torch.manual_seed(1412)
random.seed(1412)
np.random.seed(1412)

# Evaluates to True when a CUDA GPU is usable and to False on a CPU-only system
torch.cuda.is_available()

# Selects device 'cuda' when a GPU is available and device 'cpu' otherwise
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchinfo
  Downloading torchinfo-1.7.0-py3-none-any.whl (22 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.7.0


device(type='cuda')

In [None]:
# Upload files from the local machine (uses the google.colab helper)
from google.colab import files

uploaded = files.upload()
from google.colab import files
# NOTE(review): the import and the upload call are repeated; this second call rebinds
# `uploaded`, so the loop below only lists the files returned by the second upload.
uploaded = files.upload()
for fn in uploaded.keys():
    print('User uploaded file "{name}" with length {length} bytes'.format(name=fn, length=len(uploaded[fn])))  

Saving test_32x32.mat to test_32x32.mat


Saving train_32x32.mat to train_32x32.mat
User uploaded file "train_32x32.mat" with length 182040794 bytes


In [None]:
# Part 2: data loading, data exploration, data augmentation
# On the very first load we normally use no augmentation at all and simply load with ToTensor() to look at the data
# Load the data
# NOTE(review): the transform for `train` is commented out below, so `train` yields untransformed
# samples while `test` yields tensors -- confirm this is intended before relying on `x.shape`.
train = torchvision.datasets.SVHN(root=r'C:\学习资料文件夹\深兰资料\数据&论文\WEEK10-WEEK14-CV数据包\datasets\SVHN',
                  split='train',
                  download=False,
                  #transform=T.ToTensor()
                  )

test = torchvision.datasets.SVHN(root=r'C:\学习资料文件夹\深兰资料\数据&论文\WEEK10-WEEK14-CV数据包\datasets\SVHN',
                  split='test',
                  download=False,
                  transform=T.ToTensor())
# Pull out a single image to have a look
train[0][0]

# Check the amount of data
train
test

# Inspect the shape and label of the first sample only
for x, y in train:
  print(x.shape)
  print(y)
  break

# Label classes
print(np.unique(train.labels))
# 让每个数据集随机显示5张图像
def plotsample(data):
  '''
  Display five randomly chosen samples of a dataset in a single row.

  data: an indexable dataset with a length whose items are (image, label, ...);
        the image must be accepted by torchvision.utils.make_grid
        (presumably a (channels, height, width) tensor -- confirm against the caller)

  Draws on the current matplotlib figure machinery and returns nothing.
  '''
  _, axs = plt.subplots(1, 5, figsize=(10, 10))
  for ax in axs:
    num = random.randint(0, len(data) - 1) # pick a random sample index
    # Index the dataset once: every data[num] lookup loads the sample again (and re-applies
    # the dataset's transform, if it has one), so image and label are taken from one lookup.
    sample = data[num]
    image, label = sample[0], sample[1]
    # make_grid yields a 3-channel (C, H, W) tensor; see its documentation for details
    npimg = torchvision.utils.make_grid(image).numpy()
    # imshow expects (height, width, channels), so move the channel axis last
    ax.imshow(np.transpose(npimg, (1, 2, 0)))
    ax.set_title(label) # label shown as the subplot title
    ax.axis('off') # hide the axes of each subplot

# Preview five random samples drawn from the training set
plotsample(train)

In [None]:
# Define the transforms used to preprocess the images.
# The training set may need data augmentation while the test set never gets any,
# so the two pipelines are defined separately.
# Augmentation is usually not something to add right at the start.

# SVHN consists of real-world photographs, so the ImageNet channel statistics are used
# for normalisation. They are defined once so that the training and test pipelines cannot
# drift apart (the test pipeline previously used 0.458 instead of 0.485 for the first channel).
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

trainT = T.Compose([T.RandomCrop(28), # random crop
                    #T.RandomRotation(degrees=[-30, 30]), # random rotation
                    T.ToTensor(),
                    T.Normalize(mean=imagenet_mean, std=imagenet_std)])
testT = T.Compose([T.CenterCrop(28),
                   T.ToTensor(),
                   T.Normalize(mean=imagenet_mean, std=imagenet_std)])

# Load the data for real, this time with the transforms attached
train = torchvision.datasets.SVHN(root=r'C:\学习资料文件夹\深兰资料\数据&论文\WEEK10-WEEK14-CV数据包\datasets\SVHN',
                  split='train',
                  download=False,
                  transform=trainT)

test = torchvision.datasets.SVHN(root=r'C:\学习资料文件夹\深兰资料\数据&论文\WEEK10-WEEK14-CV数据包\datasets\SVHN',
                  split='test',
                  download=False,
                  transform=testT)

plotsample(train) # look at the data after the transforms

In [None]:
# Part 3: build our own networks on top of classic architectures
# For a small dataset, first consider the shallower classic architectures that still learn well,
# e.g. ResNet18 or VGG16; Inception can be considered too
torch.manual_seed(1412)
resnet18_ = m.resnet18()
vgg16_ = m.vgg16() # VGG already has a very large parameter count, so I personally rarely use vgg16_bn
print(resnet18_)
print(vgg16_)


# 小图像尺寸意味着池化层/步长为2的卷积层出现的次数有限，惯例来说只能出现2次，最终的特征图尺寸是7x7
class MyResNet(nn.Module):
  '''
  Small-image classifier assembled from pieces of the module-level resnet18_.

  block1: a freshly created 3x3, stride-1 convolution (3 -> 64 channels) followed by
          resnet18_.bn1 and resnet18_.relu; no pooling layer follows the stem
  block2 / block3: resnet18_.layer2 and resnet18_.layer3
  avgpool: resnet18_.avgpool
  fc: a freshly created linear layer mapping 256 features to 10 outputs

  The reused parts are the very same module objects held by resnet18_, not copies,
  so they are shared with resnet18_ and with every other MyResNet instance.
  '''
  def __init__(self):
    super().__init__()

    # New stem convolution; the normalisation and activation come from resnet18_
    stem_conv = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
    self.block1 = nn.Sequential(stem_conv, resnet18_.bn1, resnet18_.relu)

    self.block2, self.block3 = resnet18_.layer2, resnet18_.layer3

    # Pooling is taken from resnet18_; the output layer is rebuilt for 10 outputs
    self.avgpool = resnet18_.avgpool
    self.fc = nn.Linear(in_features=256, out_features=10, bias=True)

  def forward(self, x):
    '''Run x through block1, block2, block3 and avgpool, reshape to (-1, 256) and apply fc.'''
    for stage in (self.block1, self.block2, self.block3, self.avgpool):
      x = stage(x)
    flat = x.view(-1, 256)
    return self.fc(flat)

class MyVGG(nn.Module):
  '''
  Small-image classifier assembled from pieces of the module-level vgg16_.

  features: the first nine modules of vgg16_.features followed by one additional
            128 -> 128 convolution, a ReLU and a 2x2 max pooling layer
  avgpool: vgg16_.avgpool
  fc: a new linear layer (7*7*128 -> 4096), then modules 1 to 5 of vgg16_.classifier,
      then a new output layer (4096 -> 10)

  The reused modules are the same objects held by vgg16_, not copies.
  '''
  def __init__(self):
    super().__init__()

    # Extra convolution + pooling appended after the reused VGG layers
    extra_conv = nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1)
    extra_relu = nn.ReLU(inplace=True)
    extra_pool = nn.MaxPool2d(2, 2, padding=0, dilation=1, ceil_mode=False)
    self.features = nn.Sequential(*vgg16_.features[0:9], extra_conv, extra_relu, extra_pool)

    self.avgpool = vgg16_.avgpool

    # The first linear layer is rewritten for the 7*7*128 input, the last one for 10 outputs
    head = [nn.Linear(in_features=7*7*128, out_features=4096, bias=True)]
    head.extend(vgg16_.classifier[1:6])
    head.append(nn.Linear(in_features=4096, out_features=10, bias=True))
    self.fc = nn.Sequential(*head)

  def forward(self, x):
    '''Apply features and avgpool, reshape to (-1, 7*7*128) and apply fc.'''
    pooled = self.avgpool(self.features(x))
    return self.fc(pooled.view(-1, 7*7*128))
  
from torchinfo import summary
# Print a depth-1 summary of each custom network for an input of size (10, 3, 28, 28) on the CPU
print(summary(MyResNet(), (10, 3, 28, 28), depth=1, device='cpu'))
print(summary(MyVGG(), (10, 3, 28, 28), depth=1, device='cpu'))

# In this process the layers are taken directly from classes that are already instantiated,
# so the parts reused from the classic architectures already have their parameters,
# and instantiating a concrete MyResNet() does not generate parameters for them
print([*MyResNet().block2[0].parameters()][0][0][0])
print([*resnet18_.layer2[0].conv1.parameters()][0][0][0])

# The parts that are not reused from the classic architecture only get parameters when our network is instantiated
print([*resnet18_.fc.parameters()][0][0])
print([*MyResNet().fc.parameters()][0][0])

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [None]:
# 提前停止
class EarlyStopping():
  '''
  Signals that training should stop once the test/validation loss has failed to
  improve for several consecutive epochs.

  Call the instance once per epoch with that epoch's loss; it returns True as soon
  as the stop condition has been reached (and keeps returning True afterwards).
  '''
  def __init__(self, patience=5, tol=0.0005):
    '''
    patience: number of consecutive non-improving epochs after which stopping is triggered
    tol: threshold; an epoch only counts as an improvement when its loss is more than
         tol below the lowest loss recorded so far
    '''
    self.patience = patience
    self.tol = tol
    self.counter = 0 # consecutive epochs without an improvement larger than tol
    self.lowest_loss = None # lowest loss seen so far; None until the first call
    self.early_stop = False # whether early stopping has been triggered

  def __call__(self, val_loss):
    '''
    val_loss: the loss of the current epoch, supplied by the caller

    Returns self.early_stop (True once the counter has reached patience).
    '''
    if self.lowest_loss is None:
      # First call: just record the loss as the baseline
      self.lowest_loss = val_loss
    elif self.lowest_loss - val_loss > self.tol:
      # Real improvement: remember the new best and restart the count
      self.lowest_loss = val_loss
      self.counter = 0
    else:
      # Everything else counts as "no improvement", including a difference of exactly tol
      # and a NaN loss, both of which previously matched neither comparison
      self.counter += 1
      print(f'\t NOTICE: Early Stopping counter {self.counter} of {self.patience}')
    if self.counter >= self.patience:
      print('\t NOTICE: Early Stopping Actived')
      self.early_stop = True
    return self.early_stop


In [11]:
# Training, testing, monitoring, saving weights, plotting
# Everything written so far is brought together here: training, testing, monitoring and
# weight saving are all wrapped up in a single function

# Warm-up for the training function: how predictions and accuracy are read off the outputs
sigma = torch.ones([3, 3]) + np.random.normal(size=(3, 3))
print(sigma)
yhat = torch.max(sigma, 1)
print(yhat[0])
print(yhat[1]) # index of the maximum in each row (the values depend on the random draw above)

y = torch.tensor([1, 1, 2])
print(yhat[1] == y)
print((yhat[1] == y).sum())
# NOTE(review): torch.float32 is passed as a second argument to print here, so it is
# printed after the accuracy rather than being used as a dtype conversion.
print(((yhat[1] == y).sum()/3), torch.float32)

tensor([[ 2.4554, -0.0274, -0.4746],
        [ 0.1598,  1.0367,  0.4442],
        [-1.1785, -0.2464,  0.0895]], dtype=torch.float64)
tensor([2.4554, 1.0367, 0.0895], dtype=torch.float64)
tensor([0, 1, 2])
tensor([False,  True,  True])
tensor(2)
tensor(0.6667) torch.float32


In [None]:
def IterOnce(net, criterion, opt, x, y):
  '''
  Run one optimisation step of the model on a single batch.

  net: the instantiated network
  criterion: the loss function, called as criterion(output, y)
  opt: the optimiser
  x: all samples of this batch
  y: the true labels of all samples in this batch

  Returns (correct, loss):
    correct: number of samples whose predicted class (argmax of the network output
             over dimension 1) equals y
    loss: the loss of this batch, detached from the autograd graph

  Note that the order is (correct, loss), whereas TestOnce returns (loss, correct).
  '''
  # Call the module itself rather than net.forward so that registered hooks run
  sigma = net(x)
  loss = criterion(sigma, y)
  loss.backward()
  opt.step()
  opt.zero_grad(set_to_none=True) # setting gradients to None saves more memory than zeroing them
  yhat = torch.max(sigma, 1)[1]
  correct = torch.sum(yhat == y)
  # Detach so callers that accumulate the loss do not keep the autograd graph alive
  return correct, loss.detach()


def TestOnce(net, criterion, x, y):
  '''
  Evaluate the model on one batch without updating it.

  net: the trained network
  criterion: the loss function, called as criterion(output, y)
  x: the samples to evaluate
  y: the true labels of those samples

  Gradient tracking is switched off for the whole evaluation, which saves memory
  and speeds up the computation.

  Returns (loss, correct):
    loss: the loss of this batch
    correct: number of samples whose predicted class (argmax of the network output
             over dimension 1) equals y

  Note that the order is (loss, correct), whereas IterOnce returns (correct, loss).
  '''
  with torch.no_grad():
    # Call the module itself rather than net.forward so that registered hooks run
    sigma = net(x)
    loss = criterion(sigma, y)
    yhat = torch.max(sigma, 1)[1]
    correct = torch.sum(yhat == y)
  return loss, correct


def fit_test(net, batchdata, testdata, criterion, opt, epochs, tol, modelname, PATH):
  '''
  Train the model and, after every epoch, report accuracy and loss on both the
  training set and the test set so the model can be monitored; the best weights
  are saved along the way and training stops early when the test loss stalls.

  net: the instantiated network
  batchdata: the training data wrapped in a DataLoader
  testdata: the test data wrapped in a DataLoader
  criterion: the loss function
  opt: the optimiser
  epochs: how many passes over the complete training set to run at most
  tol: threshold for early stopping; once the test loss fails to drop by more than tol
       for 5 consecutive epochs (the EarlyStopping default patience) training stops
  modelname: name of the model being run, used as the file name of the saved weights
  PATH: directory in which the weight file is saved

  Returns (trainlosslist, testlosslist): the per-epoch training and test losses.

  NOTE(review): x and y are passed on exactly as the loaders yield them; nothing here
  moves them to a device, so they are assumed to already match the device of net.
  '''
  SamplePerEpoch = len(batchdata.dataset) # number of samples in one epoch
  TestSample = len(testdata.dataset) # number of samples in the test set
  allsamples = SamplePerEpoch * epochs
  trainedsamples = 0
  trainlosslist = []
  testlosslist = []
  early_stopping = EarlyStopping(tol=tol)
  highestacc = None # best test accuracy so far; None until the first evaluation

  for epoch in range(epochs):
    net.train()
    correct_train = 0
    loss_train = 0
    for batch_idx, (x, y) in enumerate(batchdata):
      y = y.view(x.shape[0])
      correct, loss = IterOnce(net, criterion, opt, x, y)
      trainedsamples += x.shape[0]
      # Accumulate a detached value so the running sum does not hold on to autograd graphs
      loss_train += loss.detach()
      correct_train += correct
      if batch_idx % 125 == 0:
        # Which epoch we are in,
        # how many samples have been trained so far,
        # how many samples will be trained in total,
        # and the share of the total that is already done
        print('Epoch{}:[{}/{}({:.0f}%)]'.format(epoch,
                                                trainedsamples,
                                                allsamples,
                                                100*trainedsamples/allsamples))

    TrainAccThisEpoch = float(correct_train*100)/SamplePerEpoch # training accuracy of this epoch
    TrainLossThisEpoch = float(loss_train*100)/SamplePerEpoch # training loss of this epoch
    trainlosslist.append(TrainLossThisEpoch)

    # After every training epoch, check the current model on the test set
    net.eval()
    correct_test = 0
    loss_test = 0

    for x, y in testdata:
      y = y.view(x.shape[0])
      # TestOnce returns (loss, correct) -- the reverse of IterOnce
      loss, correct = TestOnce(net, criterion, x, y)
      loss_test += loss
      correct_test += correct

    TestAccThisEpoch = float(correct_test*100)/TestSample
    TestLossThisEpoch = float(loss_test*100)/TestSample
    testlosslist.append(TestLossThisEpoch)

    # For every epoch print the results of training and testing:
    # training loss, test loss, training accuracy, test accuracy
    print('\t Train Loss:{:.6f}, Test Loss:{:.6f}, TrainAcc:{:.3f}%, TestAcc:{:.3f}%'.format(TrainLossThisEpoch,
                                                                                             TestLossThisEpoch,
                                                                                             TrainAccThisEpoch,
                                                                                             TestAccThisEpoch))
    # Save the current weights whenever the test accuracy reaches a new high. The first
    # evaluation counts as a new high too, so a checkpoint exists even if epoch 0 is the best.
    if highestacc is None or highestacc < TestAccThisEpoch:
      highestacc = TestAccThisEpoch
      torch.save(net.state_dict(), os.path.join(PATH, modelname + '.pt'))
      print('\t Weight Saved')

    # Early stopping: EarlyStopping returns a bool, so test its truth value directly
    early_stop = early_stopping(TestLossThisEpoch)
    if early_stop:
      break

  print('Complete')
  return trainlosslist, testlosslist