# 神经网络实践

## MNIST

In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
import time

In [2]:
class ConvNet(nn.Module):
    """Small convolutional classifier for single-channel images.

    Three identical stages (3x3 convolution -> batch norm -> ReLU -> 2x2
    max-pool) widen the channels 1 -> 32 -> 64 -> 128 while halving the
    spatial size each time. The result is flattened and passed through a
    256-unit hidden layer with dropout to 10 output logits.

    The first linear layer expects ``128 * 3 * 3`` features, which is what
    28x28 inputs (e.g. MNIST) produce after the three poolings
    (28 -> 14 -> 7 -> 3).
    """

    def __init__(self):
        super().__init__()

        # Feature extractor: one conv/bn/relu/pool group per channel pair.
        modules = []
        for c_in, c_out in ((1, 32), (32, 64), (64, 128)):
            modules.extend((
                nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(2, 2),
            ))

        # Classifier head operating on the flattened feature map.
        modules.extend((
            nn.Flatten(),
            nn.Linear(128 * 3 * 3, 256),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(256, 10),
        ))

        self.layer = nn.Sequential(*modules)

    def forward(self, x):
        """Return the 10 class logits for a batch of images ``x``."""
        logits = self.layer(x)
        return logits

In [3]:
# Training-time preprocessing. A mirrored digit is no longer a valid sample
# of its class, so a horizontal flip is not a label-preserving augmentation
# for MNIST; a small random rotation is used instead.
transform_train = transforms.Compose([
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])
# Evaluation-time preprocessing: no augmentation, same normalisation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

B = 64          # mini-batch size
epoches = 20    # number of passes over the training set
lr = 1e-3       # initial learning rate

# Single place to change where the dataset is stored / downloaded to.
data_root = "E:/Datasets"

train_dataset = MNIST(root=data_root, train=True, transform=transform_train, download=True)
train_data_loader = DataLoader(dataset=train_dataset, batch_size=B, shuffle=True, num_workers=2)
test_dataset = MNIST(root=data_root, train=False, transform=transform_test, download=True)
test_data_loader = DataLoader(dataset=test_dataset, batch_size=B, shuffle=False, num_workers=2)
net = ConvNet()
print(net)

ConvNet(
  (layer): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU()
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (12): Flatten(start_dim=1, end_dim=-1)
    (13): Linear(in_features=1152, out_features=256, bias=True)
    (14): ReLU()
    (15): Dropout(p=0.5, inplace=False)
    (16): Linear(in_features=256, out_features=10, bias=True)
  )
)

In [4]:
# Train `net` on MNIST for `epoches` epochs, evaluating on the test split
# after every epoch and checkpointing both the latest and the best model.

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net.to(device)
criterion = nn.CrossEntropyLoss()
criterion.to(device)
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=5e-4)
# The scheduler is fed the mean test loss once per epoch (see below).
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3, min_lr=1e-6)

# exist_ok avoids the check-then-create race of os.path.exists + makedirs.
os.makedirs("./models", exist_ok=True)

start_time = time.time()
max_accuracy = 0.
for epoch in range(epoches):
    # ---- training pass ----
    net.train()
    train_loss = 0.
    train_start_time = time.time()
    correct = 0
    total = 0
    for inputs, labels in train_data_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        # Predicted class = index of the largest logit.
        predictions = outputs.argmax(dim=1)
        correct += (predictions == labels).sum().item()
        total += labels.size(0)
    train_accuracy = correct / total
    print(f"Train Epoch {epoch+1}:\n\tloss: {train_loss/len(train_data_loader)}\n\taccuracy: {train_accuracy * 100:.2f}%\n\ttime: {time.time()-train_start_time:.2f}s\n")

    # ---- evaluation pass (no gradients, eval-mode batch norm / dropout) ----
    net.eval()
    test_loss = 0.
    test_start_time = time.time()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
            predictions = outputs.argmax(dim=1)
            correct += (predictions == labels).sum().item()
            total += labels.size(0)
    test_accuracy = correct / total
    print(f"Test Epoch {epoch+1}:\n\tloss: {test_loss/len(test_data_loader)}\n\taccuracy: {test_accuracy * 100:.2f}%\n\ttime: {time.time()-test_start_time:.2f}s")

    # Drive the plateau scheduler with the per-batch mean test loss.
    scheduler.step(test_loss / len(test_data_loader))
    checkpoint = {
        'net': net.state_dict(),
        'epoch': epoch,
        'lr': optimizer.param_groups[0]['lr'],
    }
    # Keep a separate copy of the best-scoring weights seen so far.
    if test_accuracy > max_accuracy:
        max_accuracy = test_accuracy
        torch.save(checkpoint, './models/best_model.pth')
    torch.save(checkpoint, './models/latest_model.pth')
    print(f"Model saved at epoch {epoch + 1}")
    print("-" * 50)
print(f"Training finished in {time.time() - start_time:.2f}s")
print(f'Best accuracy is {100 * max_accuracy:.2f}%')

Train Epoch 1:
	loss: 0.25313163458653637
	accuracy: 91.91%
	time: 31.87s

Test Epoch 1:
	loss: 0.0760077171318328
	accuracy: 97.40%
	time: 8.05s
Model saved at epoch 1
--------------------------------------------------
Train Epoch 2:
	loss: 0.11279274138566385
	accuracy: 96.63%
	time: 38.24s

Test Epoch 2:
	loss: 0.048405801089258814
	accuracy: 98.31%
	time: 6.86s
Model saved at epoch 2
--------------------------------------------------
Train Epoch 3:
	loss: 0.09156499265768588
	accuracy: 97.25%
	time: 29.56s

Test Epoch 3:
	loss: 0.06627883450098478
	accuracy: 97.91%
	time: 8.02s
Model saved at epoch 3
--------------------------------------------------
Train Epoch 4:
	loss: 0.08452711139928733
	accuracy: 97.49%
	time: 33.09s

Test Epoch 4:
	loss: 0.044414292755507075
	accuracy: 98.59%
	time: 6.06s
Model saved at epoch 4
--------------------------------------------------
Train Epoch 5:
	loss: 0.07512623016615269
	accuracy: 97.72%
	time: 28.74s

Test Epoch 5:
	loss: 0.06683639753431936

## ResNet 18

In [5]:
class BasicBlock(nn.Module):
    """Two-convolution residual block.

    Computes ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))`` where the
    shortcut is the identity, or ``downsample(x)`` when a ``downsample``
    module is supplied.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first 3x3 convolution (the second always uses 1).
        downsample: Optional module applied to the input so that it matches
            the main branch in shape before the addition.
    """

    # Ratio between the block's output channels and ``out_channels``.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # One stateless ReLU module is shared by both activation sites.
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        # Projection for the shortcut, used when the shapes do not match.
        self.downsample = downsample

    def forward(self, x):
        """Apply the residual block to ``x`` and return the activated sum."""
        identity = x
        out = self.relu(self.bn1(self.conv1(x)))
        if self.downsample is not None:
            identity = self.downsample(x)
        out = self.bn2(self.conv2(out)) + identity
        out = self.relu(out)
        return out

class ResNet(nn.Module):
    """ResNet-style image classifier assembled from residual blocks.

    The network is a 7x7 stride-2 stem convolution with batch norm, ReLU and
    a 3x3 stride-2 max-pool, followed by four stages of ``block`` modules
    (64, 128, 256 and 512 base channels), global average pooling and a linear
    classifier.

    Args:
        block: Residual block class. It must expose an ``expansion`` class
            attribute and be constructible as
            ``block(in_channels, out_channels, stride, downsample)`` and
            ``block(in_channels, out_channels)``.
        layers: Sequence of four integers giving the number of blocks in
            each stage, e.g. ``[2, 2, 2, 2]``.
        num_classes: Size of the output logit vector.
    """

    def __init__(self, block, layers, num_classes=10):
        super().__init__()
        # Channel count entering the next stage; updated by _make_layer.
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # The four main stages, each made of several blocks.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage consisting of ``blocks`` residual blocks.

        Only the first block receives ``stride`` and, when needed, a
        projection shortcut; the remaining blocks keep the shape unchanged.
        ``self.in_channels`` is updated to the stage's output channel count.

        Returns:
            An ``nn.Sequential`` holding the stage's blocks.
        """
        downsample = None
        if stride != 1 or self.in_channels != out_channels * block.expansion:
            # Spatial size or channel count changes: project the shortcut
            # with a strided 1x1 convolution so it can be added.
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * block.expansion)
            )

        # The stage must be built whether or not a projection is required;
        # otherwise a shape-preserving stage (such as layer1) would be None.
        layers = []
        # First block carries the stride and the optional projection.
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion

        # Remaining blocks: matching input/output channels, stride 1.
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits for a batch of 3-channel images ``x``."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

## 自注意力机制

In [6]:
class SingleHeadSelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Queries, keys and values are linear projections of the same input, each
    of size ``embed_dim``. The output is
    ``softmax(Q K^T / sqrt(embed_dim)) V``.

    Args:
        embed_dim: Size of the last dimension of the input and of the output.
    """

    def __init__(self, embed_dim):
        super().__init__()
        self.embed_dim = embed_dim
        self.query = nn.Linear(embed_dim, embed_dim)
        self.key = nn.Linear(embed_dim, embed_dim)
        self.value = nn.Linear(embed_dim, embed_dim)
        # Normalise over the key positions (last dimension of the scores).
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Attend over the sequence dimension of ``x``.

        Args:
            x: Tensor of shape ``[batch_size, seq_len, embed_dim]``.

        Returns:
            Tensor of shape ``[batch_size, seq_len, embed_dim]``.
        """
        Q = self.query(x)
        K = self.key(x)
        V = self.value(x)
        # Swap only the last two dimensions. ``K.T`` would reverse every
        # dimension of a batched tensor, including the batch dimension.
        scores = Q @ K.transpose(-2, -1) / (self.embed_dim ** 0.5)  # [batch_size, seq_len, seq_len]
        attn_weights = self.softmax(scores)
        out = torch.bmm(attn_weights, V)  # [batch_size, seq_len, embed_dim]
        return out