# 1. 加载必要的库

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets,transforms

# 2. 定义超参数

In [3]:
BTACH_SIZE = 128 #每批处理的数据
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") #选择GPU or CPU训练
EPOCHS = 100 #训练的轮次

# 3. 构建pipeline，对图像做处理（transforms）

In [5]:
pipeline = transforms.Compose([
    transforms.ToTensor(), #将图片转化为tensor
    transforms.Normalize((0.1307,),(0.3081,)) #正则化，过拟合时候降低复杂度
])

# 4. 下载，加载数据集

In [6]:
from torch.utils.data import DataLoader
#下载数据集
train_set = datasets.MNIST("data",train=True,download=True,transform=pipeline)
test_set = datasets.MNIST("data",train=False,download=True,transform=pipeline)
#加载数据
train_loader = DataLoader(train_set,batch_size=BTACH_SIZE,shuffle=True)

test_loader = DataLoader(test_set,batch_size=BTACH_SIZE,shuffle=True)

# 5. 构建网络模型

In [7]:
class Digit(nn.Module):  # 继承父类
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 10, 5)  # 二维卷积、输入通道，输出通道，5*5 kernel
        self.conv2 = nn.Conv2d(10, 20, 3)
        self.fc1 = nn.Linear(20 * 10 * 10, 500)  # 全连接层，输入通道， 输出通道
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):  # 前馈
        input_size = x.size(0)  # 得到batch_size
        x = self.conv1(x)  # 输入：batch*1*28*28, 输出：batch*10*24*24(28-5+1)
        x = F.relu(x)  # 使表达能力更强大的激活函数, 输出batch*10*24*24
        x = F.max_pool2d(x, 2, 2)  # 最大池化层，输入batch*10*24*24，输出batch*10*12*12

        x = self.conv2(x)  # 输入batch*10*12*12，输出batch*20*10*10
        x = F.relu(x)

        x = x.view(input_size, -1)  # 拉平， 自动计算维度，20*10*10= 2000

        x = self.fc1(x)  # 输入batch*2000,输出batch*500
        x = F.relu(x)

        x = self.fc2(x)  # 输入batch*500 输出batch*10

        output = F.log_softmax(x, dim=1)  # 计算分类后每个数字的概率值

        return output

# 6. 定义优化器

In [8]:
model = Digit().to(DEVICE)
optimizer = optim.Adam(model.parameters())

# 7. 定义训练方法

In [9]:
def train_model(model,device,train_loader,optimizer,epoch):
    #训练模型
    model.train()
    for batch_index,(data,target) in enumerate(train_loader):
        #部署到DEVICE上去
        data,target = data.to(device),target.to(device)
        #初始化梯度为0
        optimizer.zero_grad()
        #训练后的结果
        output = model(data)
        #计算损失
        loss = F.cross_entropy(output,target)
        #找到概率最大的下表作为预测值
        pred = output.max(1,keepdim=True)
        #反向传播
        loss.backward()
        #参数优化
        optimizer.step()
        if batch_index % 3000 == 0: #每3000下打印一次
            print("Train Epoch:{}\t Loss:{:.6f}".format(epoch,loss.item()))

# 8. 定义测试方法

In [10]:
def test_model(model, device, test_loader):
    #模型验证
    model.eval()
    #统计正确率
    correct = 0.0
    #测试损失
    test_loss = 0.0
    with torch.no_grad():    # 不计算梯度，不反向传播
        for data, label in test_loader:
            data, label = data.to(device), label.to(device)
            #测试数据
            output = model(data)
            #计算测试损失
            test_loss += F.cross_entropy(output, label).item()
            #找到概率值最大的下标
            pred = output.argmax(dim=1)
            #累计正确率
            correct += pred.eq(label.view_as(pred)).sum().item()
        test_loss /= len(test_loader.dataset)
        print("Test —— Average loss : {:.4f}, Accuracy : {:.3f}\n".format(test_loss, 100.0 * correct / len(test_loader.dataset)))

# 9. 循环调用方法7，8，检查效果

In [None]:
for epoch in range(1,EPOCHS + 1):
    train_model(model,DEVICE,train_loader,optimizer,epoch)
    test_model(model,DEVICE,test_loader)