# Info
RiJoshin, 2400013201;

人工智能基础第三次作业的实现。为方便图像显示与分模块修改，采用 Notebook 的形式编写。

起于2025-04-03


## 第二课作业
用 PyTorch 实现卷积神经网络，对 CIFAR-10 数据集进行分类。
要求：
1. 使用 PyTorch 的 nn.Module 和 Conv2d 等相关 API 实现卷积神经网络
2. 使用 PyTorch 的 DataLoader 和 Dataset 等相关 API 实现数据集的加载
3. 修改网络结构和参数，观察训练效果
4. 使用数据增强，提高模型的泛化能力



In [None]:
# import necessary packages
import os
import torch
import torchvision

from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision import transforms

from tqdm import tqdm
# plt
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import datetime
import random
import string
import numpy as np

## 超参数

In [None]:
# Global switches read by the training loop and by Net.forward.
EARLY_STOP = True   # enable the early-stopping check in the main training loop
DEBUG = False       # when True, Net.forward prints intermediate tensor shapes

# Training hyperparameters, looked up by key in the cells below.
hyperparameters = dict(
    batch_size=128,          # mini-batch size used by both data loaders
    learning_rate=1e-4,      # learning rate handed to the optimizer
    num_epochs=100,          # maximum number of training epochs
    early_stop_patience=15,  # non-improving epochs tolerated before stopping
)


## 数据预处理与数据增强

In [None]:

# Custom Gaussian-noise transform
class AddGaussianNoise(object):
    """Randomly add element-wise Gaussian noise to a tensor.

    With probability ``p`` a call returns ``tensor + noise * std + mean``
    where ``noise`` is standard-normal and has the same shape as the input;
    otherwise the input object itself is returned unchanged.

    Args:
        mean: Offset added to the noise.
        std: Scale applied to the standard-normal noise.
        p: Probability of applying the noise on a given call.
    """

    def __init__(self, mean=0., std=0.1, p=0.5):
        self.mean = mean
        self.std = std
        self.p = p

    def __call__(self, tensor):
        # The coin flip uses torch's RNG (not NumPy's global RNG) so that all
        # randomness of this transform is controlled by torch seeding, which
        # is what DataLoader workers re-seed individually.
        if torch.rand(1).item() < self.p:
            # Create the noise on the input's device; a CPU-only noise tensor
            # cannot be added to a tensor living on another device.
            noise = torch.randn(tensor.shape, device=tensor.device)
            return tensor + noise * self.std + self.mean
        return tensor

    def __repr__(self):
        return f"{self.__class__.__name__}(mean={self.mean}, std={self.std}, p={self.p})"


# Per-channel mean / std handed to Normalize in both pipelines.
_NORMALIZE_MEAN = (0.4914, 0.4822, 0.4465)
_NORMALIZE_STD = (0.2023, 0.1994, 0.2010)

# Evaluation pipeline: tensor conversion followed by normalisation, no augmentation.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_NORMALIZE_MEAN, _NORMALIZE_STD),
])

# Training pipeline with data augmentation. The commented entries are
# alternative augmentations that are currently switched off.
_train_steps = [
    # transforms.RandomResizedCrop(32, scale=(0.8, 1.0)),  # random scaled crop
    transforms.RandomHorizontalFlip(p=0.5),
    # transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),  # colour jitter
    # transforms.RandomRotation(15),  # small rotation
    transforms.ToTensor(),
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.1), ratio=(0.3, 3.3)),  # random erasing
    # AddGaussianNoise(mean=0., std=0.1, p=0.5),
    transforms.Normalize(_NORMALIZE_MEAN, _NORMALIZE_STD),
]
train_transform = transforms.Compose(_train_steps)


## 可视化训练过程

In [None]:

def _write_training_info(info_filename, timestamp, description, hyperparameters,
                         model_architecture, train_acc_list, train_loss_list,
                         test_acc_list):
    """Write the plain-text run summary that accompanies the saved plot."""
    lines = [
        f"Training Progress Summary - {description}\n",
        f"Timestamp: {timestamp}\n\n",
    ]

    if hyperparameters:
        lines.append("Hyperparameters:\n")
        lines.extend(f"  {key}: {value}\n" for key, value in hyperparameters.items())
        lines.append("\n")

    if model_architecture:
        lines.append("Model Architecture:\n")
        lines.append(f"{model_architecture}\n")
        lines.append("\n")

    metric_sections = (
        ("Training Accuracy", train_acc_list),
        ("Training Loss", train_loss_list),
        ("Validation Accuracy", test_acc_list),
    )
    for title, values in metric_sections:
        lines.append(f"{title}: \n")
        lines.append(f"{values}\n\n")

    # Explicit UTF-8: the free-form description may contain non-ASCII text,
    # which the platform default encoding cannot always represent.
    with open(info_filename, 'w', encoding='utf-8') as f:
        f.writelines(lines)


def visualize_training_progress(train_acc_list, train_loss_list, test_acc_list,
                                hyperparameters=None, model_architecture=None,
                                description="", save_dir="training_logs",
                                show_plot=True):
    """
    Plot per-epoch accuracy and loss curves and save them with a text summary.

    Two files sharing a unique ``<timestamp>_<random>`` prefix are written to
    ``save_dir``: ``*_training_progress.png`` (the figure) and
    ``*_training_info.txt`` (description, hyperparameters, architecture and
    the raw metric lists).

    Args:
        train_acc_list: Training accuracy, one value per epoch.
        train_loss_list: Training loss, one value per epoch.
        test_acc_list: Test accuracy, one value per epoch (plotted as
            "Validation Accuracy").
        hyperparameters: Optional dict of hyperparameters to log.
        model_architecture: Optional textual description of the model.
        description: Free-form description used in the figure title and log.
        save_dir: Directory for the plot and the log; created if missing.
        show_plot: Whether to display the figure after saving it.

    Raises:
        ValueError: If the three metric lists do not have the same length.
    """
    # Fail early with a clear message instead of deep inside matplotlib.
    num_epochs = len(train_acc_list)
    if len(train_loss_list) != num_epochs or len(test_acc_list) != num_epochs:
        raise ValueError(
            "train_acc_list, train_loss_list and test_acc_list must have the same length, "
            f"got {num_epochs}, {len(train_loss_list)} and {len(test_acc_list)}"
        )

    # Create the output directory; exist_ok avoids the check-then-create race.
    os.makedirs(save_dir, exist_ok=True)

    # Build a unique file prefix from the current time plus a random suffix.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    random_str = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
    file_prefix = f"{timestamp}_{random_str}"

    # Create the figure: accuracy on top, loss below.
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10), constrained_layout=True)

    # Accuracy curves.
    epochs = np.arange(1, num_epochs + 1)
    ax1.plot(epochs, train_acc_list, 'b-', label='Training Accuracy', linewidth=2)
    ax1.plot(epochs, test_acc_list, 'r--', label='Validation Accuracy', linewidth=2)
    ax1.set_title('Training and Validation Accuracy', fontsize=14, fontweight='bold')
    ax1.set_xlabel('Epochs', fontsize=12)
    ax1.set_ylabel('Accuracy', fontsize=12)
    ax1.legend(loc='best')
    ax1.grid(True, linestyle='--', alpha=0.7)
    ax1.xaxis.set_major_locator(MaxNLocator(integer=True))

    # Loss curve.
    ax2.plot(epochs, train_loss_list, 'g-', label='Training Loss', linewidth=2)
    ax2.set_title('Training Loss', fontsize=14, fontweight='bold')
    ax2.set_xlabel('Epochs', fontsize=12)
    ax2.set_ylabel('Loss', fontsize=12)
    ax2.legend(loc='best')
    ax2.grid(True, linestyle='--', alpha=0.7)
    ax2.xaxis.set_major_locator(MaxNLocator(integer=True))

    # Overall title carrying the experiment description.
    fig.suptitle(f'Training Progress - {description}', fontsize=16, fontweight='bold')

    # Save through the figure object rather than the implicit "current figure".
    plot_filename = os.path.join(save_dir, f"{file_prefix}_training_progress.png")
    fig.savefig(plot_filename, dpi=300, bbox_inches='tight')

    # Save hyperparameters, architecture and raw metrics next to the plot.
    info_filename = os.path.join(save_dir, f"{file_prefix}_training_info.txt")
    _write_training_info(info_filename, timestamp, description, hyperparameters,
                         model_architecture, train_acc_list, train_loss_list,
                         test_acc_list)

    if show_plot:
        plt.show()
    else:
        # Nothing will display this figure; close it so repeated calls do not
        # accumulate open figures.
        plt.close(fig)

    print(f"Training progress visualization saved to: {plot_filename}")
    print(f"Training information saved to: {info_filename}")

## load data

In [None]:

# Datasets: the CIFAR-10 train / test splits stored under ./data
# (download=True), each wired to its own preprocessing pipeline.
train_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=train_transform,
)
test_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=test_transform,
)

# Data loaders: only the training loader shuffles; both use the configured batch size.
train_loader = DataLoader(
    train_dataset,
    batch_size=hyperparameters['batch_size'],
    shuffle=True,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=hyperparameters['batch_size'],
    shuffle=False,
)

In [None]:

# train
# Mean: tensor([-0.3301, -0.3376, -0.3116])
# Std: tensor([1.3983, 1.3937, 1.4152])

# test
# Mean: tensor([0.0139, 0.0147, 0.0194])
# Std: tensor([1.2192, 1.2181, 1.3015])

# # 初始化变量
# mean = 0.0
# std = 0.0

# # 计算均值
# for images, _ in test_loader:
#     batch_samples = images.size(0)  # 当前批次的样本数量
#     images = images.view(batch_samples, images.size(1), -1)  # 展平每个通道
#     mean += images.mean(2).sum(0)  # 按通道计算均值并累加

# mean = mean / len(test_loader.dataset)  # 计算全局均值

# # 计算标准差
# for images, _ in test_loader:
#     batch_samples = images.size(0)
#     images = images.view(batch_samples, images.size(1), -1)
#     std += ((images - mean.unsqueeze(1)) ** 2).sum([0, 2])  # 按通道计算方差并累加

# std = torch.sqrt(std / (len(test_loader.dataset) * 32 * 32))  # 计算全局标准差

# print("Mean:", mean)
# print("Std:", std)

## Models


### Normal My Structure

In [None]:

# Model definition
class Net(nn.Module):
    '''
    VGG-style convolutional network with 10 output classes.

    The feature extractor has five stages; each stage is two
    (3x3 conv -> BatchNorm -> ReLU) units followed by a 2x2 stride-2 pooling
    (max pooling in stages 1-4, average pooling in stage 5), so the spatial
    size is divided by 32 overall. The head is two fully connected layers and
    returns raw class scores (logits): no softmax is applied, because
    ``nn.CrossEntropyLoss`` expects unnormalised logits and applies
    log-softmax itself.

    Args:
        height: Height of the input images; must be at least 32.
        width: Width of the input images; must be at least 32.

    Raises:
        ValueError: If ``height`` or ``width`` is smaller than 32, in which
            case the feature map would be empty after the five poolings.
    '''

    # (in_channels, out_channels) of the first convolution of every stage;
    # the second convolution of a stage keeps out_channels.
    _STAGES = ((3, 64), (64, 128), (128, 128), (128, 256), (256, 512))

    def __init__(self, height, width):
        super(Net, self).__init__()

        if height < 32 or width < 32:
            raise ValueError(
                f"Net needs inputs of at least 32x32, got {height}x{width}"
            )

        self.image_height = height
        self.image_width = width

        # Layer widths are fixed for now.
        # TODO: build the stages from constructor arguments (at most 5 stages).
        layers = []
        last_stage = len(self._STAGES) - 1
        for index, (in_channels, out_channels) in enumerate(self._STAGES):
            # Channels/size after each stage for a (3, H, W) input:
            # (64, H/2, W/2) -> (128, H/4, W/4) -> (128, H/8, W/8)
            # -> (256, H/16, W/16) -> (512, H/32, W/32)
            layers += self._conv_bn_relu(in_channels, out_channels)
            layers += self._conv_bn_relu(out_channels, out_channels)
            if index == last_stage:
                layers.append(nn.AvgPool2d(kernel_size=2, stride=2, padding=0))
            else:
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2, padding=0))
        self.conv = nn.Sequential(*layers)

        self.fc = nn.Sequential(
            nn.Linear(512 * (height // 32) * (width // 32), 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 10),  # raw logits, one per class
        )

    @staticmethod
    def _conv_bn_relu(in_channels, out_channels):
        '''Return the modules of one 3x3 conv -> BatchNorm -> ReLU unit.'''
        return [
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                      kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]

    def forward(self, x: torch.Tensor):
        '''
        Args:
            x: tensor, shape [batch_size, 3, H, W]

        Returns:
            tensor of shape [batch_size, 10] holding unnormalised class scores.
        '''
        res = self.conv(x)
        if DEBUG:
            print("conv output shape: ", res.shape)
        # Flatten everything except the batch dimension for the linear head.
        res = res.view(res.size(0), -1)
        if DEBUG:
            print("conv flatten output shape: ", res.shape)
        res = self.fc(res)
        return res

### VGG16

In [None]:

class VGG16(nn.Module):
    """VGG16-style network with batch normalisation and a compact classifier.

    The feature extractor is 13 (3x3 conv -> BatchNorm -> ReLU) units grouped
    into five blocks, each ending in a 2x2 stride-2 max-pool. The feature map
    is then average-pooled to 1x1, so the classifier always receives 512
    features. For 32x32 inputs the feature map is already 1x1 and the pooling
    is an identity; larger inputs are supported as well. The output holds raw
    class scores (no softmax).

    Args:
        num_classes: Number of output classes.
    """

    # Output channels of each 3x3 convolution; "M" marks a 2x2 max-pool.
    _CONFIG = (
        64, 64, "M",                # Block 1
        128, 128, "M",              # Block 2
        256, 256, 256, "M",         # Block 3
        512, 512, 512, "M",         # Block 4
        512, 512, 512, "M",         # Block 5
    )

    def __init__(self, num_classes=10):
        super(VGG16, self).__init__()
        self.features = self._make_features(self._CONFIG)

        # Parameter-free pooling to 1x1 so the classifier input size does not
        # depend on the image resolution.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        self.classifier = nn.Sequential(
            nn.Linear(512 * 1 * 1, 2048),  # 512 channels x 1 x 1 after pooling
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(2048, num_classes),
        )

    @staticmethod
    def _make_features(config, in_channels=3):
        """Build the flat conv/BN/ReLU/max-pool stack described by ``config``."""
        layers = []
        for entry in config:
            if entry == "M":
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
                continue
            layers.append(nn.Conv2d(in_channels, entry, kernel_size=3, padding=1))
            layers.append(nn.BatchNorm2d(entry))
            layers.append(nn.ReLU(inplace=True))
            in_channels = entry
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = self.classifier(x)
        return x

### ResNet for fun

In [None]:

# ResNet building block
class BasicBlockForResNet(nn.Module):
    """Residual block made of two 3x3 convolutions plus a shortcut branch.

    The main branch is conv -> BN -> ReLU -> conv -> BN. The shortcut is an
    empty ``nn.Sequential`` (identity) unless the stride is not 1 or the
    channel count changes, in which case it is a strided 1x1 convolution
    followed by batch normalisation. Both branches are summed and passed
    through a final ReLU.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution and of the projection shortcut.
    """

    expansion = 1  # the shortcut projects to expansion * out_channels channels

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        shortcut_channels = self.expansion * out_channels

        # Main branch; only the first convolution may change the resolution.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut branch: identity when shapes already match, otherwise a
        # 1x1 projection that brings the input to the main branch's shape.
        shapes_match = stride == 1 and in_channels == shortcut_channels
        projection = []
        if not shapes_match:
            projection = [
                nn.Conv2d(in_channels, shortcut_channels, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(shortcut_channels),
            ]
        self.shortcut = nn.Sequential(*projection)

        # inplace=True overwrites the activation's input buffer to save memory.
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += self.shortcut(x)  # residual connection
        return self.relu(out)

class ResNetTest(nn.Module):
    """ResNet-style classifier assembled from a configurable residual block.

    Layout: 3x3 stem convolution (64 channels) -> four stages of residual
    blocks with 64/128/256/512 base channels (stages 2-4 start with a
    stride-2 block) -> global average pooling -> dropout -> linear classifier.

    Args:
        block: Block class; called as ``block(in_channels, out_channels,
            stride)`` and expected to expose an ``expansion`` attribute.
        num_blocks: Sequence whose first four entries give the number of
            blocks in stages 1-4.
        num_classes: Number of output classes.
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super().__init__()

        # Channel count fed to the next block; _make_layer advances it.
        self.in_channels = 64

        # Stem.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        # Residual stages.
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        # Head.
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU(inplace=True)

        # Weight initialisation: Kaiming-normal for convolutions, unit scale
        # and zero shift for batch normalisation.
        for module in self.modules():
            if isinstance(module, nn.BatchNorm2d):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
            elif isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """Stack the blocks of one stage; only the first block uses ``stride``."""
        block_strides = [stride]
        block_strides.extend(1 for _ in range(num_blocks - 1))

        stage = []
        for block_stride in block_strides:
            stage.append(block(self.in_channels, out_channels, block_stride))
            # The next block consumes what this one produced.
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*stage)

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            out = stage(out)
        out = self.avg_pool(out)
        out = torch.flatten(out, 1)  # (batch, channels, 1, 1) -> (batch, channels)
        return self.fc(self.dropout(out))

### Model的实例化

In [None]:

# Instantiate the network to train (the commented line is the ResNet alternative)
model = VGG16()
# model = ResNetTest(BasicBlockForResNet, [2, 2, 2, 2])


## Device

In [None]:

# Pick the compute device: MLU if available, otherwise CUDA, otherwise CPU.
use_mlu = False
try:
    # `torch.mlu` may not exist in this torch build, so the probe itself can
    # fail; any such failure simply means "no MLU".
    use_mlu = torch.mlu.is_available()
except Exception:
    # Deliberately broad best-effort probe, but unlike a bare `except:` it
    # lets KeyboardInterrupt / SystemExit propagate.
    use_mlu = False

if use_mlu:
    device = torch.device('mlu:0')
else:
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    print(f'MLU is not available, use {device} instead.')

# Move the model onto the selected device
model = model.to(device)

# Model initialisation
def weights_init(m):
    """Initialise a single module in place; intended for ``model.apply``.

    - ``nn.Conv2d``: Kaiming-normal weights (fan_out, ReLU gain), zero bias.
    - ``nn.BatchNorm2d``: weight 1, bias 0.
    - ``nn.Linear``: weights drawn from N(0, 0.01**2), zero bias.

    Modules of any other type are left untouched. Parameters that are absent
    (``bias=False`` layers, ``affine=False`` batch norm) are skipped instead
    of being passed to ``nn.init`` as ``None``.

    Args:
        m: The module to initialise.
    """
    if isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)
    elif isinstance(m, nn.BatchNorm2d):
        # weight and bias are None when the layer was built with affine=False
        if m.weight is not None:
            nn.init.constant_(m.weight, 1)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)
    elif isinstance(m, nn.Linear):
        nn.init.normal_(m.weight, 0, 0.01)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)
# Run weights_init on the model and each of its submodules
model.apply(weights_init)

## 选择lossFunction 以及 optimizer

In [None]:

# Loss function: cross-entropy between the model outputs and the class labels
criterion = nn.CrossEntropyLoss()

# Optimizer: Adam over all model parameters with the configured learning rate
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=hyperparameters['learning_rate'],
    weight_decay=5e-4,  # TODO
)


## Training!

### train function


In [None]:
def train_model(epoch):
    '''
    Train the global ``model`` for one pass over ``train_loader``.

    Uses the module-level ``model``, ``train_loader``, ``criterion``,
    ``optimizer``, ``device`` and ``hyperparameters``.

    Args:
        epoch (int): Zero-based index of the current epoch; only used for the
            progress-bar label.
    Returns:
        (accuracy, loss) as two Python floats, each averaged over every
        sample seen during this epoch. Accuracy is measured on the
        training-mode forward passes used for the updates. Both values are
        NaN if ``train_loader`` yields no batches.
    '''
    # Training mode
    model.train()

    # Wrap the loader in tqdm to show training progress
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{hyperparameters['num_epochs']}", unit="batch")

    # Running totals stay on the device as tensors so that no host
    # synchronisation is forced on every step.
    correct_sum = torch.zeros((), dtype=torch.long, device=device)
    loss_sum = torch.zeros((), device=device)
    samples_seen = 0

    for i, (images, labels) in enumerate(progress_bar):

        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        hits = outputs.argmax(1) == labels
        samples_seen += batch_size
        correct_sum += hits.sum()
        # NOTE: assumes `criterion` returns the batch mean (the default
        # reduction of nn.CrossEntropyLoss); weighting by batch size keeps a
        # smaller final batch from being over-represented.
        loss_sum += loss.detach() * batch_size

        # Refresh the progress-bar readout with the current batch's metrics
        if (i + 1) % 100 == 0:
            progress_bar.set_postfix({
                "Loss": f"{loss.item():.4f}",
                "Accuracy": f"{hits.float().mean().item():.4f}"
            })

    if samples_seen == 0:
        return float('nan'), float('nan')

    return correct_sum.item() / samples_seen, loss_sum.item() / samples_seen
        

### test function

In [None]:
def test_model(epoch):
    '''
    Evaluate the global ``model`` on ``test_loader`` and print the accuracy.

    The model is read from module scope rather than passed in, which keeps
    the cell easy to copy back into a plain .py file.

    Args:
        epoch (int): Zero-based index of the current epoch; only used in the
            printed message.
    Returns:
        test_acc (float): fraction of correctly classified test samples
    '''
    # Evaluation mode
    model.eval()

    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            scores = model(batch_images)
            # Predicted class = index of the largest score in each row
            predicted = torch.max(scores, 1).indices
            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()

    test_acc = n_correct / n_seen
    print( f'    Epoch {epoch + 1}/{ hyperparameters["num_epochs"] } TestAccuracy: {test_acc:.2f}' )
    return test_acc

### train的主循环

In [None]:

# Early-stopping state: best test accuracy so far and the number of
# consecutive epochs without improvement.
best_accuracy = 0.0
counter = 0

# Per-epoch history handed to visualize_training_progress afterwards.
epoch_train_acc_list = []
epoch_train_loss_list = []
epoch_test_acc_list = []

# Train the model
for epoch in range(hyperparameters['num_epochs']):
    train_acc, train_loss = train_model(epoch)
    test_acc = test_model(epoch)

    # Record the metrics before the early-stopping check, so the epoch that
    # triggers the stop is still part of the saved history.
    epoch_train_acc_list.append(train_acc)
    epoch_train_loss_list.append(train_loss)
    epoch_test_acc_list.append(test_acc)

    # Early stopping
    if EARLY_STOP:
        if test_acc > best_accuracy:
            best_accuracy = test_acc
            counter = 0
        else:
            counter += 1
            print(f"Early stopping counter: {counter}")
            if counter > hyperparameters['early_stop_patience']:
                print("Early stopping")
                break


# Ask for a free-form description of this run. Training has already finished
# here, so a failing prompt must not prevent the results from being saved.
try:
    message = input("Please type some information here as description:\n    ")
except (EOFError, NotImplementedError):
    # EOFError: stdin is closed or exhausted (non-interactive run).
    # NotImplementedError: presumably what notebook frontends without stdin
    # support raise — TODO confirm for the frontend in use.
    message = ""

# Save the curves and the run summary under training_logs/
visualize_training_progress(
        train_acc_list=epoch_train_acc_list,
        train_loss_list=epoch_train_loss_list,
        test_acc_list=epoch_test_acc_list,
        hyperparameters=hyperparameters,
        model_architecture=str(model),
        description=message,
        save_dir="training_logs"
)
    