# LeNet Reproduction Experiment

## Objective
Choose a deep learning framework, train a LeNet model on the MNIST dataset, and report the average recognition accuracy.

## Environment
- PyTorch 2.4.0
- CUDA 12.4

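## Method

### Choosing a deep learning framework
PyTorch is used for this experiment mainly because its dynamic computation graph makes code easier to debug and follow. It also has an active community and plenty of learning resources, which helps when problems come up during development. In addition, PyTorch integrates closely with CUDA, so on a machine with a CUDA-capable card such as the 3060 Laptop GPU, training can run on the GPU and finish much faster.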

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
```

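### The MNIST dataset
MNIST (Modified National Institute of Standards and Technology) is a well-known handwritten digit dataset and a common benchmark in machine learning and computer vision. It contains 60000 training samples and 10000 test samples; each sample is a 28x28 grayscale image of a digit from 0 to 9. The `datasets` module of `torchvision` has built-in MNIST support, so `datasets.MNIST` can download, load, and preprocess the data. For preprocessing, `transforms.ToTensor()` converts each image to a tensor with values in [0, 1], and `transforms.Normalize((0.5,), (0.5,))` then rescales those values to [-1, 1].

The data should be divided into training, validation, and test sets. The test set is used only to measure final model performance and must not take part in training, so the test set shipped with MNIST is used as is. The training set is used to fit the model. The validation set is used during training to check the model and tune hyperparameters; it is taken from the MNIST training set with `random_split` from `torch.utils.data`, using an 8:2 ratio.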
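
The preprocessing range and the split sizes can be checked by hand. The sketch below assumes the `Normalize((0.5,), (0.5,))` transform and the 0.8 ratio used in this notebook; `normalize` is a hypothetical helper that only mirrors the per-pixel formula `(x - mean) / std`.

```python
def normalize(x, mean=0.5, std=0.5):
    """Per-pixel formula applied by transforms.Normalize: (x - mean) / std."""
    return (x - mean) / std

# ToTensor scales pixels to [0, 1]; Normalize then maps that range to [-1, 1].
lo, hi = normalize(0.0), normalize(1.0)

# 8:2 split of the 60000 MNIST training images.
total = 60000
train_size = int(0.8 * total)
val_size = total - train_size
print(lo, hi, train_size, val_size)
```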

```python
from torch.utils.data import random_split, DataLoader
from torchvision import datasets, transforms

# Pre-processing
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Load MNIST dataset
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# Split train dataset into train and validation
train_ratio = 0.8
train_size = int(train_ratio * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# Create dataloaders
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
```
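
The validation and test loaders exist to measure accuracy, so a helper along the following lines could be used with them. This is a minimal sketch, not part of the original notebook: `accuracy` is a hypothetical function, and the random tensors and the small linear model are stand-ins so that the block runs without MNIST or LeNet.

```python
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

@torch.no_grad()
def accuracy(model, loader, device="cpu"):
    """Fraction of samples whose arg-max prediction matches the label."""
    model.eval()
    correct, total = 0, 0
    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)
        preds = model(inputs).argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
    return correct / total

# Stand-in data and model so the sketch runs on its own (not MNIST, not LeNet).
torch.manual_seed(0)
fake_images = torch.randn(32, 1, 28, 28)
fake_labels = torch.randint(0, 10, (32,))
fake_loader = DataLoader(TensorDataset(fake_images, fake_labels), batch_size=8)
fake_model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
acc = accuracy(fake_model, fake_loader)
print(f"accuracy on random data: {acc:.3f}")
```

With the notebook's own objects, the same helper would be called as `accuracy(model, valid_loader, device)` during training and `accuracy(model, test_loader, device)` once at the end.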

### LeNet
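LeNet is a classic convolutional neural network. The LeNet-5 variant was proposed by Yann LeCun et al. in the 1998 paper "Gradient-Based Learning Applied to Document Recognition". The architecture was designed for handwritten character recognition and was used successfully to read digits on postal codes and bank checks. The structure described here follows the original LeNet-5 paper.

#### C1
The first layer (C1) is a convolutional layer with six 5x5 kernels that extract basic image features. In the paper the input layer takes 32x32 images, whereas MNIST images are 28x28, so C1 uses padding=2 here to obtain the same 28x28 output size.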
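
The padding choice follows from the usual convolution output-size formula. A small sketch, where `conv_out_size` is a hypothetical helper written for this check:

```python
def conv_out_size(in_size, kernel, padding=0, stride=1):
    """Spatial output size of a convolution: floor((in + 2p - k) / s) + 1."""
    return (in_size + 2 * padding - kernel) // stride + 1

paper_c1 = conv_out_size(32, kernel=5)             # 32x32 input, no padding
mnist_c1 = conv_out_size(28, kernel=5, padding=2)  # 28x28 MNIST input, padding=2
print(paper_c1, mnist_c1)
```

Both cases give the same spatial size, so the later layers see the feature-map dimensions the paper describes.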

#### S2
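The paper describes the second layer (S2) as follows:
```
The four inputs to a unit in S2 are added, then multiplied by a trainable coefficient, and then added to a trainable bias. The result is passed through a sigmoidal function.
```
Up to a constant factor that the trainable coefficient can absorb, this is a 2x2 average pooling layer followed by an activation function, where $a^{(2)}$ is the pooled input and $w$, $b$ are the trainable coefficient and bias of each feature map:
$$y^{(2)}=\sigma(wa^{(2)}+b)$$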
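
As a minimal sketch, the subsampling step can be wrapped in its own module. `Subsample` is a hypothetical name, and initialising the coefficient to 1 and the bias to 0 is an assumption made here for a predictable check; the notebook's `LeNet` class draws both from `torch.randn` instead.

```python
import torch
import torch.nn as nn

class Subsample(nn.Module):
    """2x2 average pooling, then a per-map coefficient and bias, then sigmoid."""

    def __init__(self, channels):
        super().__init__()
        self.pool = nn.AvgPool2d(2, 2)
        # One trainable coefficient and one bias per feature map.
        self.w = nn.Parameter(torch.ones(1, channels, 1, 1))
        self.b = nn.Parameter(torch.zeros(1, channels, 1, 1))

    def forward(self, x):
        return torch.sigmoid(self.w * self.pool(x) + self.b)

s2 = Subsample(6)
out = s2(torch.zeros(2, 6, 28, 28))  # two all-zero C1 outputs
n_params = sum(p.numel() for p in s2.parameters())
print(out.shape, n_params)
```

The layer halves each spatial dimension and carries two parameters per feature map, 12 in total for S2.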

#### C3
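The third layer (C3) is a convolutional layer with sixteen 5x5 kernels. Notably, each kernel is not connected to all six S2 feature maps. Specifically:
- The first six C3 feature maps take input from every contiguous subset of three S2 feature maps.
- The next six C3 feature maps take input from every contiguous subset of four S2 feature maps.
- The next three C3 feature maps take input from some discontinuous subsets of four S2 feature maps.
- The last C3 feature map takes input from all S2 feature maps.

Each kernel convolves its connected feature maps channel by channel and sums the results element-wise, so the layer outputs sixteen 10x10 feature maps.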
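
The connection scheme can be written out as index lists and used to count parameters. The sketch below assumes one 5x5 kernel per connected input map plus one bias per output map, and compares the result with a fully connected 6-to-16 convolution.

```python
n_maps = 6
connections = (
    [[(i + j) % n_maps for j in range(3)] for i in range(n_maps)]    # maps 0-5
    + [[(i + j) % n_maps for j in range(4)] for i in range(n_maps)]  # maps 6-11
    + [[0, 1, 3, 4], [1, 2, 4, 5], [0, 2, 3, 5]]                     # maps 12-14
    + [list(range(n_maps))]                                          # map 15
)

# Each output map has one 5x5 kernel per connected input map, plus one bias.
params = sum(len(c) * 5 * 5 + 1 for c in connections)
full = 16 * (n_maps * 5 * 5 + 1)  # a fully connected 6 -> 16 convolution
print(len(connections), params, full)
```

The sparse scheme gives 1516 trainable parameters, the figure the paper reports for C3, against 2416 for the fully connected layer.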

This requires a new layer that inherits from `nn.Module`:

```python
class LeNetC3(nn.Module):
    def __init__(self, num_s2_feature_maps=6, out_channels=16, kernel_size=(5, 5), stride=1, padding=0):
        super(LeNetC3, self).__init__()

        self.connections = [
                [(i + j) % num_s2_feature_maps for j in range(3)] for i in range(num_s2_feature_maps) # 0-5
            ] + [
                [(i + j) % num_s2_feature_maps for j in range(4)] for i in range(num_s2_feature_maps) # 6-11
            ] + [
                [0, 1, 3, 4], [1, 2, 4, 5], [0, 2, 3, 5] # 12-14
            ] + [
                list(range(num_s2_feature_maps)) # 15
            ]

        self.conv_layers = nn.ModuleList()
        for i in range(out_channels):
            conv = nn.Conv2d(len(self.connections[i]), 1, kernel_size=kernel_size, stride=stride, padding=padding)
            self.conv_layers.append(conv)

    def forward(self, x):
        outputs = []
        for i, conv in enumerate(self.conv_layers):
            indices = self.connections[i]
            conv_input = x[:, indices, :, :]
            conv_output = conv(conv_input)
            outputs.append(conv_output)
        return torch.cat(outputs, dim=1)
```

#### S4
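The fourth layer (S4) performs the same kind of subsampling as S2, equivalent to 2x2 average pooling followed by an activation function. It further compresses spatial information and outputs sixteen 5x5 feature maps.

#### C5
The fifth layer (C5) is a convolutional layer with 120 5x5 kernels. Each kernel is connected to all 16 S4 feature maps, and the layer outputs 120 feature maps of size 1x1, which amounts to a 120-dimensional vector.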
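
Because the kernel covers the whole 5x5 input, C5 behaves like a fully connected layer. A short shape check, using an all-zero batch as a stand-in for real S4 outputs:

```python
import torch
import torch.nn as nn

c5 = nn.Conv2d(16, 120, kernel_size=5)
s4_out = torch.zeros(4, 16, 5, 5)   # a batch of four stand-in S4 outputs
c5_out = c5(s4_out)                 # each 5x5 map collapses to 1x1
flat = c5_out.flatten(start_dim=1)  # 120-dimensional vector per sample
n_params = sum(p.numel() for p in c5.parameters())
print(c5_out.shape, flat.shape, n_params)
```

The parameter count is 120 x (16 x 25 + 1) = 48120.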

#### F6
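The sixth layer (F6) is a fully connected layer of width 84 with the activation function:
$$f(a)=A\tanh(Sa)$$
The authors choose $A=1.7159$ and $S=2/3\approx 0.6667$ so that $f(1)=1$ and $f(-1)=-1$.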
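
The two constants can be checked numerically. A plain-Python sketch, where `squash` is a hypothetical name for the scaled tanh:

```python
import math

A, S = 1.7159, 2 / 3

def squash(a):
    """Scaled hyperbolic tangent f(a) = A * tanh(S * a)."""
    return A * math.tanh(S * a)

pos, neg = squash(1.0), squash(-1.0)
print(pos, neg)
```

Both values agree with 1 and -1 to about five decimal places, which is the precision the four-digit constant $A$ allows.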

#### OUTPUT
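The output layer (OUTPUT) is a fully connected radial basis function (RBF) layer of width 10, implemented here with a Gaussian basis function:
$$\phi_i(y^{(6)})=\exp\left(-\frac{\sum_{j}\left(y_j^{(6)}-w_{ij}\right)^2}{\sigma_i^2}\right)$$
The RBF layer is adapted from [torchrbf](https://github.com/ArmanMaesumi/torchrbf):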

```python
class RBF(nn.Module):
    """
    Transforms incoming data using a given radial basis function:
    u_{i} = rbf(||x - c_{i}|| / s_{i})

    Arguments:
        in_features: size of each input sample
        out_features: size of each output sample

    Shape:
        - Input: (N, in_features) where N is an arbitrary batch size
        - Output: (N, out_features) where N is an arbitrary batch size

    Attributes:
        centres: the learnable centres of shape (out_features, in_features).
            The values are initialised from a standard normal distribution.
            Normalising inputs to have mean 0 and standard deviation 1 is
            recommended.

        log_sigmas: logarithm of the learnable scaling factors of shape (out_features).

        basis_func: the radial basis function used to transform the scaled
            distances.
    """

    def __init__(self, in_features, out_features, basis_func):
        super(RBF, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.centres = nn.Parameter(torch.Tensor(out_features, in_features))
        self.log_sigmas = nn.Parameter(torch.Tensor(out_features))
        self.basis_func = basis_func
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.normal_(self.centres, 0, 1)
        nn.init.constant_(self.log_sigmas, 0)

    def forward(self, input):
        size = (input.size(0), self.out_features, self.in_features)
        x = input.unsqueeze(1).expand(size)
        c = self.centres.unsqueeze(0).expand(size)
        distances = (x - c).pow(2).sum(-1).pow(0.5) / torch.exp(self.log_sigmas).unsqueeze(0)
        return self.basis_func(distances)
```

Putting it all together, the LeNet structure can be written as:

```python
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        # C1 Convolution Layer
        self.conv1 = nn.Conv2d(1, 6, 5, padding=2)  # Input channels=1 for grayscale, output channels=6, kernel size=5x5
        # 2x2 average pooling, shared by S2 and S4 (it has no parameters of its own)
        self.pool = nn.AvgPool2d(2, 2)
        # S2 Subsampling Layer: trainable coefficient and bias per feature map
        self.w1 = nn.Parameter(torch.randn(1, 6, 1, 1))
        self.b1 = nn.Parameter(torch.randn(1, 6, 1, 1))
        # C3 Convolution Layer with custom connections
        self.conv2 = LeNetC3(num_s2_feature_maps=6, out_channels=16, kernel_size=(5, 5))
        # S4 Subsampling Layer: trainable coefficient and bias per feature map
        self.w2 = nn.Parameter(torch.randn(1, 16, 1, 1))
        self.b2 = nn.Parameter(torch.randn(1, 16, 1, 1))
        # C5 Convolution Layer
        self.conv3 = nn.Conv2d(16, 120, 5)  # Input channels=16 from S4, output channels=120, kernel size=5x5
        # F6 Fully Connected Layer
        self.fc1 = nn.Linear(120, 84)
        self.A = 1.7159
        self.S = 2/3
        # Output Layer
        self.fc2 = RBF(84, 10, lambda x: torch.exp(-x**2))

    def forward(self, x):
        x = self.conv1(x)
        x = self.pool(x)
        x = self.w1 * x + self.b1
        x = torch.sigmoid(x)
        x = self.conv2(x)
        x = self.pool(x)
        x = self.w2 * x + self.b2
        x = torch.sigmoid(x)
        x = self.conv3(x)
        x = x.view(-1, 120)  # Flatten the tensor for the fully connected layer
        x = self.fc1(x)
        x = self.A * torch.tanh(self.S * x)
        x = self.fc2(x)
        return x
```

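### Training
The model is trained for 10 epochs with the Adam optimizer (learning rate 0.001) and `nn.CrossEntropyLoss`. The average loss over every 100 mini-batches is printed, and the loss of the last mini-batch is written to TensorBoard at the end of each epoch.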

```python
# Define the device to use
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate the model
model = LeNet().to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Initialize TensorBoard writer
writer = SummaryWriter(log_dir='logs')

# Training loop
num_epochs = 10  # Number of epochs to train the model
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Print statistics
        running_loss += loss.item()
        if i % 100 == 99:  # print every 100 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 100:.3f}')
            running_loss = 0.0

    # Log the scalar values to TensorBoard
    writer.add_scalar('Training Loss', loss.item(), epoch)

print('Finished Training')

# Close the writer
writer.close()
```

```
[1,   100] loss: 2.303
[1,   200] loss: 2.303

KeyboardInterrupt: 
```
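
One plausible reading of the log above, offered as an assumption rather than a diagnosis confirmed in the notebook: 2.303 is ln 10, the cross-entropy of ten equal scores. With 84 inputs and centres drawn from a standard normal distribution, the squared distance to every centre is typically large, so `exp(-x**2)` returns values close to zero for all ten classes and its gradient shrinks with it. The sketch below reproduces the loss value with all-zero scores and evaluates the Gaussian at an assumed squared distance of 84.

```python
import math
import torch
import torch.nn.functional as F

# Ten identical scores per sample, as produced when every RBF output is ~0.
logits = torch.zeros(4, 10)
labels = torch.tensor([0, 3, 5, 9])
loss = F.cross_entropy(logits, labels).item()

# Assumed typical case: 84 inputs, each about 1 away from its centre, sigma = 1.
squared_distance = 84.0
rbf_output = math.exp(-squared_distance)
print(f"{loss:.3f}", math.log(10), rbf_output)
```

If this is indeed the cause, passing the negative squared distance (`lambda x: -x**2`) to `nn.CrossEntropyLoss` instead of the Gaussian would keep the class scores distinct. This is a suggestion that has not been tested here.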