In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
from torchvision import datasets,transforms
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
import matplotlib.pyplot as plt
import numpy as np

# 配置超参数

In [None]:
epochs = 5
num_classes = 10
lr = 1e-3
batch_size = 32

# 加载数据集

In [None]:
def load_data():
    custom_transform = {
        "train": transforms.Compose([transforms.Resize([224, 224]),
                                     transforms.ToTensor(),
                                     transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                          std=[0.229, 0.224, 0.225])]),
        "test": transforms.Compose([transforms.Resize([224, 224]), transforms.ToTensor(),
                                    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                         std=[0.229, 0.224, 0.225])])
    }
    train_ds = datasets.CIFAR10(root="./data",
                                train=True, transform=custom_transform["train"],
                                download=True)
    test_ds = datasets.CIFAR10(root="./data",
                            train=False, transform=custom_transform["test"], download=True)

    val_size = 0.2  # 设置验证集的大小
    num_train = len(train_ds)
    indices = list(range(num_train))  # 获取所有的索引

    np.random.shuffle(indices)  # 打乱索引

    # 开始划分数据集
    split = int(np.floor(val_size * num_train))
    train_idx, val_idx = indices[split:], indices[:split]

    # 使用PyTorch的SubsetRandomSampler采样器采样
    train_sampler, val_sampler = SubsetRandomSampler(
        train_idx), SubsetRandomSampler(val_idx)

    # 制作DataLoader
    train_dl = DataLoader(
        dataset=train_ds, batch_size=batch_size, sampler=train_sampler)
    val_dl = DataLoader(dataset=train_ds, batch_size=batch_size,
                        sampler=val_idx)
    test_dl = DataLoader(test_ds, batch_size)

    return train_dl, val_dl, test_dl

In [None]:
class BasicBlock(nn.Module):
    """
    基本残差块
    """
    def __init__(self, in_channels, out_channels, stride=[1,1], padding=1):
        super(BasicBlock, self).__init__()
        # 残差部分
        self.layer = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3,
                      stride=stride[0], padding=padding),
                      nn.BatchNorm2d(out_channels),
                      nn.ReLU(inplace=True),
                      nn.Conv2d(out_channels,out_channels,kernel_size=3,stride=stride[1], padding=padding),
                      nn.BatchNorm2d(out_channels)  
        )
        #shortcut部分
        self.shortcut = nn.Sequential()
        if in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels,out_channels, kernel_size=1, stride=2, padding=0),
                nn.BatchNorm2d(out_channels)
            )
    
    def forward(self, x):
        out = self.layer(x) #残差部分
        out += self.shortcut(x) #shortcut+残差部分
        out = F.relu(out) #最后进行ReLU激活
        return out


In [None]:
class ResNet18(nn.Module):
    """
    构建残差网络
    """
    def __init__(self, num_classes):
        super(ResNet18, self).__init__()
        self.in_channels = 64
        # 第一层conv1
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64,kernel_size=7, stride=2,padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(3, 2, 1)
        )

        # 叠加若干个BasicBlock
        self.conv2 = self.make_layers(BasicBlock, [[1,1],[1,1]],64)
        self.conv3 = self.make_layers(BasicBlock, [[2,1],[1,1]],128)
        self.conv4 = self.make_layers(BasicBlock, [[2,1],[1,1]], 256)
        self.conv5 = self.make_layers(BasicBlock, [[2,1],[1,1]], 512)

        # 全连接层
        self.fc = nn.Sequential(
            nn.AdaptiveAvgPool2d((1,1)),
            nn.Flatten(),
            nn.Linear(512, num_classes)
        )
    
    def make_layers(self, block, strides, out_channels):
        """
        用于创建同一个残差块
        """
        layers = []
        # 每个conv模块都是2个BasicBlock组成
        for stride in strides:
            # 添加BasicBLock
            layers.append(block(in_channels=self.in_channels, 
            out_channels=out_channels, stride=stride))
            # 输出的维度变为下一个输入的维度
            self.in_channels = out_channels

        # 返回组成好的网络，*表示将列表打开
        return nn.Sequential(*layers)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        logits = self.fc(x)
        return logits       

In [None]:
model = ResNet18(num_classes)
model

# 设置优化器和损失函数

In [None]:
optimizer = optim.Adam(model.parameters(),lr=lr)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

In [None]:
def train(model, train_dl, val_dl):
    for epoch in range(epochs):
        val_acc, val_loss = 0., 0.
        val_acc_list, val_loss_list = [], []
        train_acc, train_loss = 0., 0.
        train_acc_list, train_loss_list = [], []
        total_num, total_correct = 0, 0
        model.train() # 开始训练
        for i, (features, targets) in enumerate(train_dl):
            # 将数据放到GPU上
            features = features.to(device)
            targets = targets.to(device)
            # 梯度清零
            optimizer.zero_grad()
            # 前向传播
            logits = model(features)
            # 计算损失
            loss = F.cross_entropy(logits, targets)
            # softmax分类得到类别
            probas = F.softmax(logits, dim=1)
            # 获取分类的结果
            preds = torch.max(probas, dim=1)[0]
            # 计算正确的个数
            total_correct += (targets == preds).sum().item()
            # 计算总数
            total_num += targets.size(0)
            train_loss += loss.item()
            loss.backward()
            optimizer.step()

            # 打印输出信息
            if i % 100 == 0:
                print("epoch:{}/{}, iter:{}/{}, loss:{}"
                .format(epoch+1, epochs, i, len(train_dl), loss))
        
        # 添加损失值
        train_loss_list.append(train_loss)
        # 添加准确率
        train_acc = float(total_correct) / total_num
        train_acc_list.append(train_acc)
        # 输出准确率以及损失值
        print("train_acc{: .2f}, train_loss: {: .2f}".format(train_acc, train_loss))

        # 验证
        model.eval()
        with torch.no_grad():
            total_loss = 0.  # 总损失
            total_num = 0  # 总数
            total_correct = 0  # 预测正确的个数

            for features, targets in val_dl:
                features = features.to(device)
                targets = targets.to(device)

                logits = model(features)
                # 使用softmax得到分类的类别概率
                probas = F.softmax(logits, dim=1)
                preds = torch.max(probas, dim=1)[0]

                total_correct += (preds == targets).sum().item()
                total_num += targets.size(0)

                loss = F.cross_entropy(targets, logits)
                total_loss += loss.item()
            # 添加到列表中
            val_acc = float(total_correct) / total_num
            val_acc_list.append(val_acc)
            val_loss_list.append(val_loss)
            print("val_acc{:.2f}, val_loss:{:.2f}".format(val_acc, val_loss))


In [None]:
a = torch.tensor([1,2,3])
b = torch.tensor([3,2,1])
print(type((a == b).sum()))


In [None]:
# 生成随机数测试
temp_num = torch.rand([32, 3, 32, 32]) #batch_size, channels, height, width
temp_num = temp_num.to(device)
temp_num.size()


In [None]:
out = model.forward(temp_num)
probas = F.softmax(out, dim=1) # 使用softmax得到各类别的概率


In [None]:
max_class = torch.max(probas,dim=1)
max_class

In [None]:
# 调用编写的方法进行测试
train_dl, val_dl, test_dl = load_data()
# train(model=model, train_dl=train_dl, val_dl=val_dl)


# 加载训练好的模型和参数

In [None]:
# net = torchvision.models.resnet18(pretrained=True) # 使用定义好的网络，使用预训练模型
# # 修改网络架构
# net.fc = nn.Linear(512, num_classes)