In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.nn.functional as F
from torchsummary import summary

In [None]:
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2,contrast=0.2, saturation=0.2) ,
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

In [None]:
Batch_Size = 256

In [None]:
trainset = datasets.CIFAR10(root='./data', train=True,download=True, transform=transform)
testset = datasets.CIFAR10(root='./data',train=False,download=True,transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=Batch_Size,shuffle=True, num_workers=8)
testloader = torch.utils.data.DataLoader(testset, batch_size=Batch_Size,shuffle=True, num_workers=8)
classes = ('plane', 'car', 'bird', 'cat','deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [None]:
classes = trainset.classes

In [None]:
import numpy as np
import matplotlib.pyplot as plt
plt.imshow(trainset.data[0])
im,label = next(iter(trainloader))

In [None]:
def imshow(img):
    img = img / 2 + 0.5
    img = np.transpose(img.numpy(),(1,2,0))
    plt.imshow(img)

In [None]:
imshow(im[0])
im[0].shape
plt.figure(figsize=(8,12))
imshow(torchvision.utils.make_grid(im[:32]))

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu' # 判断是否用GPU

In [None]:
class BasicBlock(nn.Module):
    expansion = 1  # 扩展因子，用于指定短路连接的维度倍数

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        # 特征提取序列，包括两个3x3卷积层、批归一化和ReLU激活函数
        self.features = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )
        
        # 短路连接，根据条件判断是否需要调整维度
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels),
            )

    def forward(self, x):
        out = self.features(x)
        out += self.shortcut(x)
        out = torch.relu(out)
        return out

In [None]:
# 测试
basic_block = BasicBlock(64, 128)
x = torch.randn(2, 64, 32, 32)
y = basic_block(x)

In [None]:
class Bottleneck(nn.Module):
    expansion = 4  # 扩展因子，用于指定短路连接的维度倍数

    def __init__(self, in_channels, zip_channels, stride=1):
        super(Bottleneck, self).__init__()
        out_channels = self.expansion * zip_channels
        # 深度残差块的特征提取序列，包括1x1、3x3、1x1卷积层、批归一化和ReLU激活函数
        self.features = nn.Sequential(
            nn.Conv2d(in_channels, zip_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(zip_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(zip_channels, zip_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(zip_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(zip_channels, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )
        # 短路连接，根据条件判断是否需要调整维度
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.features(x)
        out += self.shortcut(x)
        out = torch.relu(out)
        return out

In [None]:
# 测试
bottleneck = Bottleneck(256, 128)
x = torch.randn(2, 256, 32, 32)
y = bottleneck(x)

In [None]:
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10, verbose=False):
        super(ResNet, self).__init__()
        self.verbose = verbose
        self.in_channels = 64
        
        # 初始的卷积层、批归一化和ReLU激活函数
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )

        # 构建不同层次的残差块
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        # 全局平均池化层和分类器
        self.avg_pool = nn.AvgPool2d(kernel_size=4)
        self.classifer = nn.Linear(512 * block.expansion, num_classes)
        
    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        
        # 构建残差块
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)
    
    def forward(self, x):
        out = self.features(x)
        if self.verbose:
            print('block 1 output: {}'.format(out.shape))
        out = self.layer1(out)        
        if self.verbose:
            print('block 2 output: {}'.format(out.shape))
        out = self.layer2(out)
        if self.verbose:
            print('block 3 output: {}'.format(out.shape))
        out = self.layer3(out)
        if self.verbose:
            print('block 4 output: {}'.format(out.shape))
        out = self.layer4(out)
        if self.verbose:
            print('block 5 output: {}'.format(out.shape))
        
        # 全局平均池化、展平和分类
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.classifer(out)
        return out

In [None]:
# 定义ResNet18模型
def ResNet18(verbose=False):
    return ResNet(BasicBlock, [2, 2, 2, 2], verbose=verbose)

# 定义ResNet34模型
def ResNet34(verbose=False):
    return ResNet(BasicBlock, [3, 4, 6, 3], verbose=verbose)

# 定义ResNet50模型
def ResNet50(verbose=False):
    return ResNet(Bottleneck, [3, 4, 6, 3], verbose=verbose)

# 定义ResNet101模型
def ResNet101(verbose=False):
    return ResNet(Bottleneck, [3, 4, 23, 3], verbose=verbose)

# 定义ResNet152模型
def ResNet152(verbose=False):
    return ResNet(Bottleneck, [3, 8, 36, 3], verbose=verbose)

# 创建ResNet18模型实例并将其移动到指定的设备上
net = ResNet18(True).to(device)


In [None]:
summary(net,(3,32,32))

In [None]:
# 测试
x = torch.randn(2, 3, 32, 32).to(device)
y = net(x)

In [None]:
# 创建ResNet34模型实例并将其移动到指定的设备上
net = ResNet34().to(device)

# 如果设备为cuda，则使用DataParallel进行多GPU训练，并启用cudnn的benchmark模式
if device == 'cuda':
    net = nn.DataParallel(net)
    torch.backends.cudnn.benchmark = True


In [None]:
import torch.optim as optim
import torch.nn as nn

# 定义优化器，损失函数和学习率调度器
optimizer = optim.SGD(net.parameters(), lr=1e-1, momentum=0.9, weight_decay=5e-4)
criterion = nn.CrossEntropyLoss()
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, verbose=True, patience=5, min_lr=0.000001)  # 动态更新学习率

# 导入时间库
import time

# 设置训练的总轮数
epoch = 50

In [None]:
#训练
import os
if not os.path.exists('./model'):
    os.makedirs('./model')
else:
    print('文件已存在')
save_path = './model/ResNet.pth'

In [None]:
from utils import train
from utils import plot_history
Acc, Loss, Lr = train(net, trainloader, testloader, epoch, optimizer, criterion, scheduler, save_path, verbose = True)

In [None]:
plot_history(epoch ,Acc, Loss, Lr)

In [None]:
# 测试代码
correct = 0  # 用于记录正确分类的图像数量
total = 0    # 用于记录总图像数量

# 清理GPU缓存，确保有足够的GPU内存进行测试
torch.cuda.empty_cache()

# 将模型设置为评估模式
net.eval()

# 在评估模式下，不计算梯度，以节省内存和加速计算
with torch.no_grad():
    # 遍历测试数据加载器中的图像和标签
    for data in testloader:
        images, labels = data
        images = images.to(device)  # 将图像移动到设备（GPU或CPU）
        labels = labels.to(device)  # 将标签移动到设备（GPU或CPU）

        # 将网络应用于输入图像，获取输出
        outputs = net(images)

        # 使用 argmax 函数获取每个图像的预测类别
        _, predicted = torch.max(outputs.data, 1)

        # 计算正确分类的图像数量
        correct += (predicted == labels).sum().item()

        # 计算总图像数量
        total += labels.size(0)

# 打印网络在测试数据集上的准确率
print('Accuracy of the network on the 10000 test images: %.2f %%' % (100 * correct / total))


In [None]:
# 定义两个列表，用于存储每个类别中测试正确的图像数量和总图像数量，初始化为0
class_correct = list(0. for i in range(10))
class_total = list(0. for i in range(10))

# 将模型设置为评估模式
net.eval()

# 在评估模式下，不计算梯度，以节省内存和加速计算
with torch.no_grad():
    # 遍历测试数据加载器中的图像和标签
    for data in testloader:
        images, labels = data
        images = images.to(device)  # 将图像移动到设备（GPU或CPU）
        labels = labels.to(device)  # 将标签移动到设备（GPU或CPU）

        # 将网络应用于输入图像，获取输出
        outputs = net(images)

        # 使用 argmax 函数获取每个图像的预测类别
        _, predicted = torch.max(outputs.data, 1)

        # 检查预测是否与真实标签相匹配，并将结果转换为布尔张量
        correct = (predicted == labels).squeeze()

        # 遍历当前批次中的每个图像
        for i in range(len(images)):
            label = labels[i]  # 获取当前图像的真实标签
            class_correct[label] += correct[i].item()  # 如果预测正确，增加该类别的正确计数
            class_total[label] += 1  # 增加该类别的总计数

# 打印每个类别在测试数据集上的准确率
for i in range(10):
    accuracy = 100 * class_correct[i] / class_total[i]
    print('Accuracy of %5s : %.2f %%' % (classes[i], accuracy))


In [None]:
# 从测试数据加载器中获取一个批次的图像和标签
dataiter = iter(testloader)
images, labels = next(dataiter)

# 将图像复制一份以备后用，并将其移动到设备（GPU或CPU）
images_ = images
images_ = images_.to(device)

# 将标签移动到设备（GPU或CPU）
labels = labels.to(device)

# 使用神经网络对图像进行预测
val_output = net(images_)
_, val_preds = torch.max(val_output, 1)

# 创建一个图表，用于显示图像和预测结果
fig = plt.figure(figsize=(25,4))

# 计算当前批次中预测正确的图像数量
correct = torch.sum(val_preds == labels.data).item()

# 打印准确率
print("Accuracy Rate = {}%".format(correct / len(images) * 100))

# 创建一个图表，显示64张图像及其真实类别和预测类别
fig = plt.figure(figsize=(25,25))
for idx in np.arange(64):    
    ax = fig.add_subplot(8, 8, idx+1, xticks=[], yticks=[])
    imshow(images[idx])  # 显示图像
    # 设置图像标题，使用绿色表示预测正确，红色表示预测错误
    ax.set_title("{}, ({})".format(classes[val_preds[idx].item()], classes[labels[idx].item()]), 
                 color=("green" if val_preds[idx].item() == labels[idx].item() else "red"))


In [None]:
#模型保存
torch.save(net,save_path[:-4]+'_'+str(epoch)+'.pth')