In [1]:
# vgg_fashion_mnist.py
"""
VGGNet - Fashion-MNIST
基于2014年Simonyan和Zisserman的论文实现
"Very Deep Convolutional Networks for Large-Scale Image Recognition"

VGG设计原则:
1. 使用非常小的(3x3)卷积滤波器
2. 网络深度增加到16-19层
3. 使用2x2最大池化，步长为2
4. 使用ReLU激活函数
5. 完全使用3x3卷积核（stride=1, padding=1保持尺寸）

本文件实现VGG-11, VGG-13, VGG-16, VGG-19
"""

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import time

# Fix the global PyTorch RNG seed so repeated runs start from the same state.
torch.manual_seed(42)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device('cuda') if use_cuda else torch.device('cpu')
print(f"Using device: {device}")

# ==================== VGG configurations ====================
# Each integer is the output channel count of a 3x3 convolution; 'M' marks a
# 2x2 max-pooling layer.  Every line below is one stage ending in a pool.
# Note that 'VGG11' here is a reduced-width variant (8-64 channels), while
# the other entries use the 64-512 channel widths.
VGG_CONFIGS = {
    'VGG11': [
        8, 'M',
        16, 'M',
        32, 32, 'M',
        64, 64, 'M',
        64, 64, 'M',
    ],
    'VGG13': [
        64, 64, 'M',
        128, 128, 'M',
        256, 256, 'M',
        512, 512, 'M',
        512, 512, 'M',
    ],
    'VGG16': [
        64, 64, 'M',
        128, 128, 'M',
        256, 256, 256, 'M',
        512, 512, 512, 'M',
        512, 512, 512, 'M',
    ],
    'VGG19': [
        64, 64, 'M',
        128, 128, 'M',
        256, 256, 256, 256, 'M',
        512, 512, 512, 512, 'M',
        512, 512, 512, 512, 'M',
    ],
}

# ==================== 数据加载 ====================
def load_data(batch_size=64, num_workers=4, root='./data'):
    """Build the Fashion-MNIST train and test loaders with 224x224 inputs.

    Both splits are resized to 224, converted to tensors and normalised with
    mean 0.2860 / std 0.3530.  The training split additionally applies a
    random horizontal flip and is shuffled; the test split is not.

    Args:
        batch_size: Number of samples per batch for both loaders.
        num_workers: Number of DataLoader worker processes per loader.
        root: Directory the dataset is downloaded to / read from.

    Returns:
        A ``(train_loader, test_loader)`` tuple of ``DataLoader`` objects.
    """
    # Steps shared by both splits, applied after any augmentation.
    to_normalized_tensor = [
        transforms.ToTensor(),
        transforms.Normalize((0.2860,), (0.3530,)),
    ]

    transform_train = transforms.Compose([
        transforms.Resize(224),
        transforms.RandomHorizontalFlip(),
        *to_normalized_tensor,
    ])
    transform_test = transforms.Compose([
        transforms.Resize(224),
        *to_normalized_tensor,
    ])

    train_dataset = datasets.FashionMNIST(
        root=root, train=True, download=True, transform=transform_train
    )
    test_dataset = datasets.FashionMNIST(
        root=root, train=False, download=True, transform=transform_test
    )

    # Page-locked host memory only helps host-to-GPU copies; requesting it on
    # a CPU-only machine just triggers a warning, so enable it only with CUDA.
    loader_kwargs = dict(
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=torch.cuda.is_available(),
    )
    train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
    test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

    return train_loader, test_loader

# ==================== VGG模型定义 ====================
class VGG(nn.Module):
    """VGG-style convolutional classifier built from a ``VGG_CONFIGS`` entry.

    The feature extractor is a stack of 3x3 convolutions (padding 1), each
    followed by batch normalisation and ReLU, with a 2x2 / stride-2 max pool
    wherever the configuration lists ``'M'``.  The resulting feature map is
    adaptively average-pooled to 7x7, flattened, and fed to a three-layer
    fully connected head with dropout.

    Feature-map sizes for a 224x224 input:

    VGG-16:
        Block 1: 64, 64, MaxPool          -> 112x112x64
        Block 2: 128, 128, MaxPool        -> 56x56x128
        Block 3: 256, 256, 256, MaxPool   -> 28x28x256
        Block 4: 512, 512, 512, MaxPool   -> 14x14x512
        Block 5: 512, 512, 512, MaxPool   -> 7x7x512

    VGG-11 (reduced-width variant used in this file):
        Block 1: 8, MaxPool               -> 112x112x8
        Block 2: 16, MaxPool              -> 56x56x16
        Block 3: 32, 32, MaxPool          -> 28x28x32
        Block 4: 64, 64, MaxPool          -> 14x14x64
        Block 5: 64, 64, MaxPool          -> 7x7x64

    Classifier (C = channels produced by the last convolution,
    e.g. 64 -> 3136 inputs for VGG-11, 512 -> 25088 inputs for VGG-16):
        - FC: C * 7 * 7 -> 256
        - FC: 256 -> 256
        - FC: 256 -> num_classes

    Args:
        vgg_name: Key into ``VGG_CONFIGS`` selecting the layer layout.
        num_classes: Number of output logits.
        in_channels: Number of channels in the input images.

    Raises:
        KeyError: If ``vgg_name`` is not a key of ``VGG_CONFIGS``.
    """
    def __init__(self, vgg_name='VGG16', num_classes=10, in_channels=1):
        super(VGG, self).__init__()

        config = VGG_CONFIGS[vgg_name]

        self.vgg_name = vgg_name
        self.features = self._make_layers(config, in_channels)
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        # The first linear layer must match the width of the last conv layer
        # of the selected configuration (64 for VGG11, 512 for the others);
        # hard-coding 64 made every variant except VGG11 fail in forward().
        feature_channels = self._output_channels(config, in_channels)

        self.classifier = nn.Sequential(
            nn.Linear(feature_channels * 7 * 7, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(256, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes),
        )

        self._initialize_weights()

    @staticmethod
    def _output_channels(config, in_channels):
        """Return the channel count leaving the feature extractor.

        This is the last non-``'M'`` entry of ``config``; if the
        configuration contains no convolution at all the input channel
        count passes through unchanged.
        """
        for v in reversed(config):
            if v != 'M':
                return v
        return in_channels

    def _make_layers(self, config, in_channels):
        """Translate a configuration list into the feature-extractor module.

        Each integer ``v`` becomes Conv2d(3x3, padding=1) -> BatchNorm2d ->
        ReLU with ``v`` output channels; each ``'M'`` becomes a 2x2 max pool
        with stride 2.
        """
        layers = []
        for v in config:
            if v == 'M':
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            else:
                layers.append(nn.Conv2d(in_channels, v, kernel_size=3, padding=1))
                layers.append(nn.BatchNorm2d(v))
                layers.append(nn.ReLU(inplace=True))
                # The next convolution consumes what this one produced.
                in_channels = v
        return nn.Sequential(*layers)

    def _initialize_weights(self):
        """Initialise conv, batch-norm and linear parameters in place.

        Convolutions use Kaiming-normal (fan_out, ReLU) weights with zero
        bias, batch-norm layers start as identity (weight 1, bias 0), and
        linear layers use N(0, 0.01) weights with zero bias.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return class logits for a batch of images ``x``."""
        x = self.features(x)
        x = self.avgpool(x)
        # Keep the batch dimension and flatten everything else.
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

# ==================== 训练函数 ====================
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Network to train; switched to training mode.
        train_loader: Iterable of ``(data, target)`` batches supporting ``len()``.
        criterion: Loss function called as ``criterion(output, target)``.
        optimizer: Optimizer stepping the model's parameters.
        device: Device every batch is moved to before the forward pass.

    Returns:
        ``(mean_loss, accuracy)`` where ``mean_loss`` is the average of the
        per-batch losses and ``accuracy`` is a percentage over all samples.
    """
    model.train()

    loss_sum = 0.0
    num_correct = 0
    num_seen = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()

        # The predicted class is the index of the largest logit per sample.
        guesses = logits.argmax(dim=1)
        num_seen += labels.shape[0]
        num_correct += (guesses == labels).sum().item()

    mean_loss = loss_sum / len(train_loader)
    accuracy = 100. * num_correct / num_seen
    return mean_loss, accuracy

# ==================== 测试函数 ====================
def test(model, test_loader, criterion, device):
    model.eval()
    test_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)

            test_loss += loss.item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

    return test_loss / len(test_loader), 100. * correct / total

# ==================== 绘图函数 ====================
def plot_results(train_losses, train_accs, test_losses, test_accs, model_name):
    """Draw loss and accuracy curves side by side, save them, and show them.

    Args:
        train_losses: Per-epoch training losses.
        train_accs: Per-epoch training accuracies (percent).
        test_losses: Per-epoch test losses.
        test_accs: Per-epoch test accuracies (percent).
        model_name: Used in the panel titles and, lower-cased, in the name of
            the PNG written to the current directory.
    """
    # Epochs are numbered from 1 on the x axis.
    epochs = range(1, len(train_losses) + 1)

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(14, 5))

    # One row per panel:
    # (axis, train series, test series, train label, test label, y label, title)
    panels = (
        (loss_ax, train_losses, test_losses,
         'Train Loss', 'Test Loss', 'Loss', f'{model_name} - Loss Curve'),
        (acc_ax, train_accs, test_accs,
         'Train Accuracy', 'Test Accuracy', 'Accuracy (%)', f'{model_name} - Accuracy Curve'),
    )

    for ax, train_series, test_series, train_label, test_label, y_label, title in panels:
        ax.plot(epochs, train_series, 'b-', label=train_label, linewidth=2)
        ax.plot(epochs, test_series, 'r-', label=test_label, linewidth=2)
        ax.set_xlabel('Epoch', fontsize=12)
        ax.set_ylabel(y_label, fontsize=12)
        ax.set_title(title, fontsize=14)
        ax.legend(fontsize=10)
        ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(f'{model_name.lower()}_results.png', dpi=150)
    plt.show()

# ==================== 主函数 ====================
def main():
    """Train and evaluate a VGG variant on Fashion-MNIST.

    Builds the data loaders and model, trains for ``num_epochs`` epochs with
    Adam and a cosine-annealed learning rate, evaluates on the test split
    after every epoch, checkpoints the best-scoring weights, plots the
    training curves and finally saves the last-epoch weights.
    """
    # Which VGG variant to train.  Options: VGG11, VGG13, VGG16, VGG19
    vgg_name = 'VGG11'

    batch_size = 32  # VGG models are parameter-heavy, so keep the batch small
    learning_rate = 0.0001
    num_epochs = 20

    print(f"Building {vgg_name}...")
    print("Loading Fashion-MNIST dataset (resized to 224x224)...")
    train_loader, test_loader = load_data(batch_size)
    print(f"Training samples: {len(train_loader.dataset)}")
    print(f"Test samples: {len(test_loader.dataset)}")

    model = VGG(vgg_name=vgg_name, num_classes=10, in_channels=1).to(device)

    # Print a short summary of the model size.
    print(f"\n{vgg_name} Architecture:")
    print("="*50)
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")
    print("="*50)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=5e-4)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

    train_losses, train_accs = [], []
    test_losses, test_accs = [], []

    print("\n" + "="*60)
    print(f"Starting Training {vgg_name}...")
    print("="*60)

    start_time = time.time()
    best_acc = 0.0

    for epoch in range(1, num_epochs + 1):
        # Capture the learning rate this epoch actually trains with.  Reading
        # it after scheduler.step() would report the *next* epoch's rate (and
        # 0.000000 on the final epoch).
        current_lr = optimizer.param_groups[0]['lr']

        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_loss, test_acc = test(model, test_loader, criterion, device)
        test_losses.append(test_loss)
        test_accs.append(test_acc)

        scheduler.step()

        # Checkpoint whenever the test accuracy improves.
        # NOTE(review): the checkpoint is selected on the test split, so
        # best_acc is an optimistic estimate of generalisation.
        if test_acc > best_acc:
            best_acc = test_acc
            torch.save(model.state_dict(), f'{vgg_name.lower()}_best.pth')

        print(f"Epoch [{epoch:2d}/{num_epochs}] | LR: {current_lr:.6f} | "
              f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | "
              f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc:.2f}%")

    total_time = time.time() - start_time
    print("="*60)
    print(f"Training completed in {total_time:.2f} seconds")
    print(f"Best Test Accuracy: {best_acc:.2f}%")
    print("="*60)

    plot_results(train_losses, train_accs, test_losses, test_accs, vgg_name)

    # Also keep the weights from the final epoch, separate from the best ones.
    torch.save(model.state_dict(), f'{vgg_name.lower()}_fashion_mnist.pth')
    print(f"Model saved to {vgg_name.lower()}_fashion_mnist.pth")

# Run the training pipeline only when executed as a script, not on import.
if __name__ == '__main__':
    main()

Using device: cpu
Building VGG11...
Loading Fashion-MNIST dataset (resized to 224x224)...


100%|██████████| 26.4M/26.4M [00:00<00:00, 114MB/s]
100%|██████████| 29.5k/29.5k [00:00<00:00, 4.23MB/s]
100%|██████████| 4.42M/4.42M [00:00<00:00, 66.6MB/s]
100%|██████████| 5.15k/5.15k [00:00<00:00, 13.7MB/s]


Training samples: 60000
Test samples: 10000

VGG11 Architecture:
Total parameters: 1,016,538
Trainable parameters: 1,016,538

Starting Training VGG11...




KeyboardInterrupt: 