# 任务1：
微调在ImageNet上预训练的卷积神经网络实现Caltech-101分类

基本要求：

(1) 训练集测试集按照 [Caltech-101]( https://data.caltech.edu/records/mzrjq-6wc02) 标准；
(2) 修改现有的 CNN 架构（如AlexNet，ResNet-18）用于 Caltech-101 识别，通过将其输出层大小设置为 101 以适应数据集中的类别数量，其余层使用在ImageNet上预训练得到的网络参数进行初始化；
(3) 在 Caltech-101 数据集上从零开始训练新的输出层，并对其余参数使用较小的学习率进行微调；
(4) 观察不同的超参数，如训练步数、学习率，及其不同组合带来的影响，并尽可能提升模型性能；
(5) 与仅使用 Caltech-101 数据集、从随机初始化的网络参数开始训练得到的结果进行对比，观察预训练带来的提升。

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import os
import shutil
from sklearn.model_selection import train_test_split
from torch.utils.tensorboard import SummaryWriter
import itertools

## 数据预处理，划分训练集、验证集和测试集

In [2]:


# Directory holding the raw dataset (one sub-folder per category)
original_data_dir = "101_ObjectCategories"
# Root of the generated split; train/val/test trees are created beneath it
output_dir = "caltech101_split"
train_dir = os.path.join(output_dir, "train")
val_dir = os.path.join(output_dir, "val")
test_dir = os.path.join(output_dir, "test")

# Create the output folders
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# Walk the category folders in sorted order. os.listdir() returns entries in
# arbitrary order, so without sorting the seeded split below would not be
# reproducible, and a re-run could copy the same image into a different split
# next to the copy left by the previous run (train/val/test leakage).
for category in sorted(os.listdir(original_data_dir)):
    if category == 'BACKGROUND_Google':
        continue  # skip the background folder
    category_path = os.path.join(original_data_dir, category)
    if not os.path.isdir(category_path):
        continue

    # Collect the image paths of this category; sorted for determinism,
    # extension matched case-insensitively so e.g. ".JPG" is not dropped.
    images = sorted(
        os.path.join(category_path, img)
        for img in os.listdir(category_path)
        if img.lower().endswith(('.jpg', '.png'))
    )

    # 70% train, 15% validation, 15% test (the 30% remainder is halved)
    train_images, temp_images = train_test_split(images, test_size=0.3, random_state=42)
    val_images, test_images = train_test_split(temp_images, test_size=0.5, random_state=42)

    # Copy (not move) every image into the category folder of its split
    for split_dir, split_images in (
        (train_dir, train_images),
        (val_dir, val_images),
        (test_dir, test_images),
    ):
        category_dest = os.path.join(split_dir, category)
        os.makedirs(category_dest, exist_ok=True)
        for img in split_images:
            shutil.copy(img, category_dest)

print("数据集划分完成！")

数据集划分完成！


In [3]:
# Root folder of the split dataset; its train/, val/ and test/ sub-folders
# are what the ImageFolder datasets are built from.
data_dir = "caltech101_split"

In [4]:

# Step 1: Data Preparation
batch_size = 32
image_size = 224


def _make_transform(augment):
    """Return the preprocessing pipeline; `augment` inserts a random horizontal flip."""
    steps = [transforms.Resize((image_size, image_size))]
    if augment:
        steps.append(transforms.RandomHorizontalFlip())
    steps.append(transforms.ToTensor())
    # Per-channel mean / std used for input normalisation
    steps.append(transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]))
    return transforms.Compose(steps)


# Only the training pipeline is augmented; 'val' is deterministic
data_transforms = {
    'train': _make_transform(augment=True),
    'val': _make_transform(augment=False),
}

# One ImageFolder per split; the test split reuses the 'val' pipeline
train_dataset, val_dataset, test_dataset = (
    datasets.ImageFolder(root=f"{data_dir}/{split}", transform=data_transforms[pipeline])
    for split, pipeline in (("train", "train"), ("val", "val"), ("test", "val"))
)

# Training batches are shuffled, evaluation batches keep the dataset order
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)
    for eval_dataset in (val_dataset, test_dataset)
)

## Load Pretrained Model：ResNet-18
微调并进行超参数选择

In [7]:
# Hyperparameter grid: every (lr, momentum, num_epochs) combination is run once
learning_rates = [0.01, 0.001]
momentums = [0.9, 0.95]
num_epochs_list = [10]

# Fixed small learning rate for the pretrained layers; only the learning rate
# of the freshly initialised output layer is varied by the grid above.
backbone_lr = 0.0001

# Neither the device nor the loss function depends on the hyperparameters,
# so both are created once instead of once per experiment.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
criterion = nn.CrossEntropyLoss()

# Run one full fine-tuning experiment per hyperparameter combination
for lr, momentum, num_epochs in itertools.product(learning_rates, momentums, num_epochs_list):
    print(f"实验: lr={lr}, momentum={momentum}, num_epochs={num_epochs}")

    # Step 2: load the pretrained ResNet-18 and replace its output layer
    # with a new 101-way linear layer.
    model = models.resnet18(pretrained=True)
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, 101)
    print(num_ftrs)

    # Step 3: nothing is frozen - every layer is fine-tuned
    for param in model.parameters():
        param.requires_grad = True

    # Move the model to the target device before the optimizer is built
    model = model.to(device)
    print(device)

    # Step 4: two parameter groups - the new head uses the grid learning rate,
    # all remaining layers use the small backbone learning rate. The groups
    # are split by parameter identity instead of by matching a substring of
    # the parameter name.
    head_params = list(model.fc.parameters())
    head_ids = {id(p) for p in head_params}
    backbone_params = [p for p in model.parameters() if id(p) not in head_ids]
    optimizer = optim.SGD([
        {'params': head_params, 'lr': lr},
        {'params': backbone_params, 'lr': backbone_lr},
    ], momentum=momentum)

    # Step 5: training loop. The context manager closes the TensorBoard
    # writer even if training raises or is interrupted.
    with SummaryWriter(f"runs/resnet/exp_lr{lr}_mom{momentum}_ep{num_epochs}") as writer:
        for epoch in range(num_epochs):
            # ---- one pass over the training set ----
            model.train()
            running_loss = 0.0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()

            # Mean of the per-batch losses
            train_loss = running_loss / len(train_loader)
            writer.add_scalar('Loss/train', train_loss, epoch)

            # ---- loss and accuracy on the validation set ----
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    val_loss += loss.item()
                    _, preds = torch.max(outputs, 1)
                    correct += (preds == labels).sum().item()
                    total += labels.size(0)
            val_loss = val_loss / len(val_loader)
            val_acc = correct / total

            writer.add_scalar('Loss/val', val_loss, epoch)
            writer.add_scalar('Accuracy/val', val_acc, epoch)

            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")



实验: lr=0.01, momentum=0.9, num_epochs=10
512
cuda:0
Epoch 1/10, Train Loss: 1.4283, Val Loss: 0.4701, Val Acc: 0.8932
Epoch 2/10, Train Loss: 0.3494, Val Loss: 0.2985, Val Acc: 0.9262
Epoch 3/10, Train Loss: 0.2073, Val Loss: 0.2475, Val Acc: 0.9400
Epoch 4/10, Train Loss: 0.1535, Val Loss: 0.2300, Val Acc: 0.9385
Epoch 5/10, Train Loss: 0.1137, Val Loss: 0.2120, Val Acc: 0.9416
Epoch 6/10, Train Loss: 0.0829, Val Loss: 0.1989, Val Acc: 0.9408
Epoch 7/10, Train Loss: 0.0664, Val Loss: 0.1857, Val Acc: 0.9462
Epoch 8/10, Train Loss: 0.0604, Val Loss: 0.1884, Val Acc: 0.9508
Epoch 9/10, Train Loss: 0.0494, Val Loss: 0.1814, Val Acc: 0.9454
Epoch 10/10, Train Loss: 0.0432, Val Loss: 0.1886, Val Acc: 0.9439
实验: lr=0.01, momentum=0.95, num_epochs=10
512
cuda:0
Epoch 1/10, Train Loss: 1.2438, Val Loss: 0.3966, Val Acc: 0.8978
Epoch 2/10, Train Loss: 0.2305, Val Loss: 0.2375, Val Acc: 0.9347
Epoch 3/10, Train Loss: 0.1347, Val Loss: 0.2432, Val Acc: 0.9354
Epoch 4/10, Train Loss: 0.0726, Val 

### 随机初始化

In [11]:
# Step 7: Compare with Random Initialization

# Build ResNet-18 without pretrained weights and give it a 101-way output layer
random_model = models.resnet18(pretrained=False)
num_ftrs = random_model.fc.in_features
random_model.fc = nn.Linear(num_ftrs, 101)

# Defined here so the cell does not depend on a variable left over from
# another cell.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Move the model to the device before the optimizer is built
random_model = random_model.to(device)

# Loss function and optimizer: a single learning rate for all parameters
lr = 0.01
momentum = 0.9
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(random_model.parameters(), lr=lr, momentum=momentum)

# Training loop. The writer is opened once for the whole run and closed when
# the run ends; closing it inside the epoch loop makes the next add_scalar
# re-open it and splits the run across several event files.
num_epochs = 10
with SummaryWriter(f"runs/resnet_random/exp_lr{lr}_mom{momentum}_ep{num_epochs}") as writer:
    for epoch in range(num_epochs):
        # ---- one pass over the training set ----
        random_model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = random_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        # Mean of the per-batch losses
        train_loss = running_loss / len(train_loader)
        writer.add_scalar('Loss/train', train_loss, epoch)

        # ---- loss and accuracy on the validation set ----
        random_model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = random_model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        val_loss = val_loss / len(val_loader)
        val_acc = correct / total

        writer.add_scalar('Loss/val', val_loss, epoch)
        writer.add_scalar('Accuracy/val', val_acc, epoch)

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

Epoch 1/10, Train Loss: 3.6639, Val Loss: 4.2416, Val Acc: 0.2475
Epoch 2/10, Train Loss: 2.8497, Val Loss: 2.6851, Val Acc: 0.4174
Epoch 3/10, Train Loss: 2.3866, Val Loss: 2.5757, Val Acc: 0.4427
Epoch 4/10, Train Loss: 2.0714, Val Loss: 2.2823, Val Acc: 0.4804
Epoch 5/10, Train Loss: 1.8297, Val Loss: 2.0167, Val Acc: 0.5250
Epoch 6/10, Train Loss: 1.6034, Val Loss: 2.0346, Val Acc: 0.5204
Epoch 7/10, Train Loss: 1.3896, Val Loss: 1.7018, Val Acc: 0.5757
Epoch 8/10, Train Loss: 1.1804, Val Loss: 1.8173, Val Acc: 0.5780
Epoch 9/10, Train Loss: 1.0334, Val Loss: 1.6412, Val Acc: 0.6011
Epoch 10/10, Train Loss: 0.8914, Val Loss: 1.6070, Val Acc: 0.6218


## Load Pretrained Model： AlexNet
微调并进行超参数选择

In [9]:
# Hyperparameter grid: every (lr, momentum, num_epochs) combination is run once
learning_rates = [0.01, 0.001]
momentums = [0.9, 0.95]
num_epochs_list = [10]

# Fixed small learning rate for the pretrained layers; only the learning rate
# of the freshly initialised output layer is varied by the grid above.
backbone_lr = 0.0001

# Neither the device nor the loss function depends on the hyperparameters,
# so both are created once instead of once per experiment.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
criterion = nn.CrossEntropyLoss()

# Run one full fine-tuning experiment per hyperparameter combination
for lr, momentum, num_epochs in itertools.product(learning_rates, momentums, num_epochs_list):
    print(f"实验: lr={lr}, momentum={momentum}, num_epochs={num_epochs}")

    # Step 2: load the pretrained AlexNet and replace the last classifier
    # layer (index 6) with a new 101-way linear layer.
    model = models.alexnet(pretrained=True)
    num_ftrs = model.classifier[6].in_features
    model.classifier[6] = nn.Linear(num_ftrs, 101)

    # Step 3: nothing is frozen - every layer is fine-tuned
    for param in model.parameters():
        param.requires_grad = True

    # Move the model to the target device before the optimizer is built
    model = model.to(device)
    print(device)

    # Step 4: two parameter groups - the new head uses the grid learning rate,
    # all remaining layers use the small backbone learning rate. The groups
    # are split by parameter identity instead of by matching a substring of
    # the parameter name.
    head_params = list(model.classifier[6].parameters())
    head_ids = {id(p) for p in head_params}
    backbone_params = [p for p in model.parameters() if id(p) not in head_ids]
    optimizer = optim.SGD([
        {'params': head_params, 'lr': lr},
        {'params': backbone_params, 'lr': backbone_lr},
    ], momentum=momentum)

    # Step 5: training loop. The context manager closes the TensorBoard
    # writer even if training raises or is interrupted.
    with SummaryWriter(f"runs/alexnet/exp_lr{lr}_mom{momentum}_ep{num_epochs}") as writer:
        for epoch in range(num_epochs):
            # ---- one pass over the training set ----
            model.train()
            running_loss = 0.0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()

            # Mean of the per-batch losses
            train_loss = running_loss / len(train_loader)
            writer.add_scalar('Loss/train', train_loss, epoch)

            # ---- loss and accuracy on the validation set ----
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    val_loss += loss.item()
                    _, preds = torch.max(outputs, 1)
                    correct += (preds == labels).sum().item()
                    total += labels.size(0)
            val_loss = val_loss / len(val_loader)
            val_acc = correct / total

            writer.add_scalar('Loss/val', val_loss, epoch)
            writer.add_scalar('Accuracy/val', val_acc, epoch)

            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")



实验: lr=0.01, momentum=0.9, num_epochs=10




cuda:0
Epoch 1/10, Train Loss: 1.2685, Val Loss: 0.6792, Val Acc: 0.8378
Epoch 2/10, Train Loss: 0.2640, Val Loss: 0.6221, Val Acc: 0.8493
Epoch 3/10, Train Loss: 0.1215, Val Loss: 0.5896, Val Acc: 0.8678
Epoch 4/10, Train Loss: 0.0800, Val Loss: 0.5784, Val Acc: 0.8724
Epoch 5/10, Train Loss: 0.0578, Val Loss: 0.5915, Val Acc: 0.8647
Epoch 6/10, Train Loss: 0.0472, Val Loss: 0.5844, Val Acc: 0.8755
Epoch 7/10, Train Loss: 0.0299, Val Loss: 0.6044, Val Acc: 0.8762
Epoch 8/10, Train Loss: 0.0221, Val Loss: 0.6240, Val Acc: 0.8709
Epoch 9/10, Train Loss: 0.0274, Val Loss: 0.6241, Val Acc: 0.8632
Epoch 10/10, Train Loss: 0.0298, Val Loss: 0.5957, Val Acc: 0.8762
实验: lr=0.01, momentum=0.95, num_epochs=10
cuda:0
Epoch 1/10, Train Loss: 1.4452, Val Loss: 0.6685, Val Acc: 0.8324
Epoch 2/10, Train Loss: 0.2942, Val Loss: 0.5648, Val Acc: 0.8632
Epoch 3/10, Train Loss: 0.1170, Val Loss: 0.5378, Val Acc: 0.8762
Epoch 4/10, Train Loss: 0.0864, Val Loss: 0.5619, Val Acc: 0.8716
Epoch 5/10, Train L

In [10]:
# Step 7: Compare with Random Initialization

# Build AlexNet without pretrained weights and give it a 101-way output layer
random_model = models.alexnet(pretrained=False)
num_ftrs = random_model.classifier[6].in_features
random_model.classifier[6] = nn.Linear(num_ftrs, 101)

# Defined here so the cell does not depend on a variable left over from
# another cell.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Move the model to the device before the optimizer is built
random_model = random_model.to(device)

# Loss function and optimizer: a single learning rate for all parameters
lr = 0.01
momentum = 0.9
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(random_model.parameters(), lr=lr, momentum=momentum)

# Training loop. The writer is opened once for the whole run and closed when
# the run ends; closing it inside the epoch loop makes the next add_scalar
# re-open it and splits the run across several event files.
num_epochs = 10
with SummaryWriter(f"runs/alexnet_random/exp_lr{lr}_mom{momentum}_ep{num_epochs}") as writer:
    for epoch in range(num_epochs):
        # ---- one pass over the training set ----
        random_model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = random_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        # Mean of the per-batch losses
        train_loss = running_loss / len(train_loader)
        writer.add_scalar('Loss/train', train_loss, epoch)

        # ---- loss and accuracy on the validation set ----
        random_model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = random_model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        val_loss = val_loss / len(val_loader)
        val_acc = correct / total

        writer.add_scalar('Loss/val', val_loss, epoch)
        writer.add_scalar('Accuracy/val', val_acc, epoch)

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

Epoch 1/10, Train Loss: 4.2034, Val Loss: 3.8013, Val Acc: 0.1914
Epoch 2/10, Train Loss: 3.5713, Val Loss: 3.4007, Val Acc: 0.2813
Epoch 3/10, Train Loss: 3.2530, Val Loss: 3.1371, Val Acc: 0.3313
Epoch 4/10, Train Loss: 2.9010, Val Loss: 2.8036, Val Acc: 0.3928
Epoch 5/10, Train Loss: 2.5475, Val Loss: 2.4326, Val Acc: 0.4481
Epoch 6/10, Train Loss: 2.3359, Val Loss: 2.3194, Val Acc: 0.4781
Epoch 7/10, Train Loss: 2.0919, Val Loss: 2.1419, Val Acc: 0.5088
Epoch 8/10, Train Loss: 1.9695, Val Loss: 2.0779, Val Acc: 0.5342
Epoch 9/10, Train Loss: 1.8045, Val Loss: 1.9818, Val Acc: 0.5388
Epoch 10/10, Train Loss: 1.6841, Val Loss: 2.0089, Val Acc: 0.5496
