In [None]:
# Download the dataset archive (food-11.zip).
# Three mirrors are listed below; keep exactly one of them uncommented.

# Google Drive (active mirror): fetch the file by its Drive ID and save it as food-11.zip.
!gdown --id '1awF7pZ9Dz7X1jn1_QAiKN-_v56veCEKy' --output food-11.zip

# Dropbox (alternative mirror)
# !wget https://www.dropbox.com/s/m9q6273jl3djall/food-11.zip -O food-11.zip

# MEGA (alternative mirror; installs megatools first)
# !sudo apt install megatools
# !megadl "https://mega.nz/#!zt1TTIhK!ZuMbg5ZjGWzWX1I6nEUbfjMZgCmAgeqJlwDkqdIryfg"

# Unzip the dataset quietly (-q) into the current working directory.
# This may take some time.
!unzip -q food-11.zip

# 导入包

In [4]:
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms
import numpy as np
import torch
import sys
import os
import random
from PIL import Image
from torch.utils.data import Dataset
import matplotlib.pyplot as plt
sys.path.append('../tools')
import pandas as pd

# 构建ResNet-18模型

In [2]:
# Basic residual block
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 conv + batch-norm layers and an additive shortcut.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional module applied to the input before it is added to
            the main branch; when ``None`` the input is added unchanged.
    """

    # Channel multiplier of the block output relative to ``out_channels``.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        # Settings shared by both 3x3 convolutions.
        conv_kwargs = dict(kernel_size=3, padding=1, bias=False)

        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        # Main branch: conv -> bn -> relu -> conv -> bn
        main = self.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))

        # Shortcut branch: the input itself unless a downsample module was given
        shortcut = x if self.downsample is None else self.downsample(x)

        main += shortcut
        return self.relu(main)


# ResNet-18 model
class ResNet18(nn.Module):
    """Image classifier built from a convolutional stem, four residual stages and a linear head.

    Args:
        block: Residual block class. It is called as
            ``block(in_channels, out_channels, stride, downsample)`` and must
            expose an ``expansion`` class attribute.
        layers: Indexable with at least four entries; ``layers[i]`` is the
            number of blocks in stage ``i + 1``.
        num_classes: Output size of the final linear layer.
        num_channels: Number of channels of the input images.
    """

    def __init__(self, block, layers, num_classes=11, num_channels=3):
        super().__init__()
        # Channel count entering the next residual stage; advanced by _make_layer.
        self.in_channels = 64

        # Stem: 7x7 stride-2 convolution followed by a stride-2 max pool
        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages layer1..layer4 as (width, stride) pairs
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for idx, (width, stride) in enumerate(stage_specs):
            stage = self._make_layer(block, width, layers[idx], stride=stride)
            setattr(self, f"layer{idx + 1}", stage)

        # Classifier head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # Weight initialisation
        self._initialize_weights()

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one residual stage and update ``self.in_channels`` to its output width.

        Only the first block receives ``stride`` and, when the shortcut would
        not match the main branch, a 1x1 conv + batch-norm projection.
        """
        stage_width = out_channels * block.expansion
        shortcut_matches = stride == 1 and self.in_channels == stage_width

        projection = None
        if not shortcut_matches:
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_width, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(stage_width),
            )

        stage = [block(self.in_channels, out_channels, stride, projection)]
        self.in_channels = stage_width
        stage.extend(block(self.in_channels, out_channels) for _ in range(blocks - 1))

        return nn.Sequential(*stage)

    def _initialize_weights(self):
        """Kaiming-normal init for every Conv2d; weight 1 / bias 0 for every BatchNorm2d."""
        for module in self.modules():
            if isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, x):
        """Run stem, residual stages and pooling, then flatten and apply the linear head."""
        feature_pipeline = (
            self.conv1, self.bn1, self.relu, self.maxpool,
            self.layer1, self.layer2, self.layer3, self.layer4,
            self.avgpool,
        )
        for stage in feature_pipeline:
            x = stage(x)

        return self.fc(torch.flatten(x, 1))


def resnet18(num_classes=11):
    """Create the custom ResNet-18 model: ``BasicBlock`` with two blocks in each of the four stages."""
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet18(BasicBlock, blocks_per_stage, num_classes=num_classes)


## 测试ResNet-18

In [3]:
# Create the model and print its module structure
model = resnet18(num_classes=11)
print(model)

ResNet18(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)


# 导入common_tools

In [5]:
def plot_loss_curves(train_losses, val_losses, save_path=None):
    """
    Plot training and validation loss against the (1-based) epoch number.

    Args:
        train_losses: Sequence of training losses, one per epoch.
        val_losses: Sequence of validation losses, one per recorded epoch.
        save_path: If given, the figure is written to this path and then
            closed; otherwise it is displayed with ``plt.show()``.
    """
    plt.figure(figsize=(10, 6))

    # Use 1-based epoch numbers on the x axis, like the other plotting helpers.
    # Each curve gets its own range so histories of different length still plot.
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Training Loss', linewidth=2)
    plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss', linewidth=2)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        print(f"图片已保存到: {save_path}")
        # Release the figure once it is on disk so repeated calls do not
        # accumulate open figures.
        plt.close()
    else:
        plt.show()


In [6]:
def plot_accuracy_curves(train_accuracies, val_accuracies, save_path=None):
    """
    Plot training and validation accuracy per epoch and mark the best validation epoch.

    Args:
        train_accuracies: Sequence of training accuracies (in percent), one per epoch.
        val_accuracies: Sequence of validation accuracies (in percent), one per epoch.
        save_path: If given, the figure is written to this path and closed;
            otherwise it is displayed with ``plt.show()``.
    """
    # Copy into lists so any sequence type works (``.index`` below does not
    # exist on e.g. NumPy arrays) and emptiness can be tested directly.
    train_accuracies = list(train_accuracies)
    val_accuracies = list(val_accuracies)

    plt.figure(figsize=(10, 6))

    epochs = range(1, len(train_accuracies) + 1)

    plt.plot(epochs, train_accuracies, 'b-', label='Training Accuracy', linewidth=2)
    plt.plot(epochs, val_accuracies, 'r-', label='Validation Accuracy', linewidth=2)

    plt.title('Training and Validation Accuracy', fontsize=14, fontweight='bold')
    plt.xlabel('Epochs', fontsize=12)
    plt.ylabel('Accuracy (%)', fontsize=12)
    plt.legend(fontsize=12)
    plt.grid(True, alpha=0.3)

    # Start the y axis at 0
    plt.ylim(bottom=0)

    # Annotate the best validation accuracy. Skipped for an empty history,
    # where max() would raise.
    if val_accuracies:
        best_val_acc = max(val_accuracies)
        best_epoch = val_accuracies.index(best_val_acc) + 1  # first epoch reaching the maximum
        plt.axvline(x=best_epoch, color='gray', linestyle='--', alpha=0.7)
        plt.text(best_epoch, best_val_acc / 2, f'Best: {best_val_acc:.2f}%\nEpoch: {best_epoch}',
                 ha='center', va='center', fontsize=10,
                 bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8))

    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close()
    else:
        plt.show()

In [7]:
def plot_training_curves(train_losses, val_losses, train_accuracies, val_accuracies, save_path=None):
    """
    Plot loss (top panel) and accuracy (bottom panel) curves in a single figure.

    Args:
        train_losses: Sequence of training losses, one per epoch.
        val_losses: Sequence of validation losses, one per epoch.
        train_accuracies: Sequence of training accuracies (in percent), one per epoch.
        val_accuracies: Sequence of validation accuracies (in percent), one per epoch.
        save_path: If given, the figure is written to this path and closed;
            otherwise it is displayed with ``plt.show()``.
    """
    # Copy into a list so any sequence type works (``.index`` below does not
    # exist on e.g. NumPy arrays) and emptiness can be tested directly.
    val_accuracies = list(val_accuracies)

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

    epochs = range(1, len(train_losses) + 1)

    # Loss curves
    ax1.plot(epochs, train_losses, 'b-', label='Training Loss', linewidth=2)
    ax1.plot(epochs, val_losses, 'r-', label='Validation Loss', linewidth=2)
    ax1.set_title('Training and Validation Loss', fontsize=14, fontweight='bold')
    ax1.set_ylabel('Loss', fontsize=12)
    ax1.legend(fontsize=12)
    ax1.grid(True, alpha=0.3)

    # Accuracy curves
    ax2.plot(epochs, train_accuracies, 'b-', label='Training Accuracy', linewidth=2)
    ax2.plot(epochs, val_accuracies, 'r-', label='Validation Accuracy', linewidth=2)
    ax2.set_title('Training and Validation Accuracy', fontsize=14, fontweight='bold')
    ax2.set_xlabel('Epochs', fontsize=12)
    ax2.set_ylabel('Accuracy (%)', fontsize=12)
    ax2.legend(fontsize=12)
    ax2.grid(True, alpha=0.3)
    ax2.set_ylim(bottom=0)

    # Annotate the best validation accuracy. Skipped for an empty history,
    # where max() would raise.
    if val_accuracies:
        best_val_acc = max(val_accuracies)
        best_epoch = val_accuracies.index(best_val_acc) + 1  # first epoch reaching the maximum
        ax2.axvline(x=best_epoch, color='gray', linestyle='--', alpha=0.7)
        ax2.text(best_epoch, best_val_acc / 2, f'Best: {best_val_acc:.2f}%',
                 ha='center', va='center', fontsize=10,
                 bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8))

    fig.tight_layout()

    if save_path:
        fig.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close(fig)
    else:
        plt.show()

# 训练模型

In [9]:
import os
from datetime import datetime

import torch.nn as nn
import torch
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import DatasetFolder
import torch.optim as optim
from tools.hw3_model import resnet18
from tools.hw3_common_tools import plot_loss_curves, plot_accuracy_curves, plot_training_curves
from PIL import Image



In [11]:
BASE_DIR = os.getcwd()  # directory the notebook is executed from
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # use the GPU when available, otherwise the CPU

In [12]:
parent_dir = os.path.dirname(BASE_DIR)  # parent folder of BASE_DIR

In [13]:
# Create a time-stamped results directory (month-day_hour-minute) next to the
# working directory; checkpoints, history and figures are written there.
now_time = datetime.now()
time_str = now_time.strftime('%m-%d_%H-%M')
log_dir = os.path.join(BASE_DIR, "..", "results", time_str)
# exist_ok=True makes re-running the cell within the same minute a no-op and
# avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(log_dir, exist_ok=True)

In [14]:
# Training hyperparameters
MAX_EPOCH = 256  # maximum number of training epochs
BATCH_SIZE = 128  # batch size
LR = 0.001  # initial learning rate
log_interval = 1  # print the metrics every `log_interval` epochs
val_interval = 1  # validation interval
PATIENCE = 20  # early-stopping patience (epochs without validation improvement)


In [15]:
# ============================ step 1/5 data loading ============================
# Training pipeline: resize, random augmentation, tensor conversion, normalisation.
train_tfm = transforms.Compose([
    transforms.Resize((128, 128)),
    # Data augmentation
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.ToTensor(),
    # Normalisation (important!); these are the widely used ImageNet channel statistics
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Evaluation pipeline: same resize and normalisation, but no random augmentation.
test_tfm = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

In [16]:
def load_rgb_image(path):
    """Open the image at ``path`` and return it as a fully loaded RGB PIL image.

    ``Image.open`` is lazy and keeps the file open until the pixel data is
    read; the ``with`` block closes the file as soon as ``convert`` has loaded
    it. Converting to RGB guarantees three channels, which the three-value
    ``Normalize`` mean/std and the model's 3-channel first convolution require
    (grayscale, palette or CMYK files would otherwise fail downstream).
    """
    with Image.open(path) as img:
        return img.convert("RGB")


# One shared loader replaces four identical lambdas.
# Labeled training data and unlabeled data use the augmenting transform;
# validation and test data use the deterministic one.
train_set = DatasetFolder("food-11/training/labeled", loader=load_rgb_image, extensions="jpg",
                          transform=train_tfm)
valid_set = DatasetFolder("food-11/validation", loader=load_rgb_image, extensions="jpg",
                          transform=test_tfm)
unlabeled_set = DatasetFolder("food-11/training/unlabeled", loader=load_rgb_image, extensions="jpg",
                              transform=train_tfm)
test_set = DatasetFolder("food-11/testing", loader=load_rgb_image, extensions="jpg", transform=test_tfm)


In [17]:
# Only the training loader shuffles; validation and test keep the dataset order.
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=2, pin_memory=True)
valid_loader = DataLoader(valid_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

# Report the number of samples in the training and validation sets
print(f"训练集: {len(train_set)}")
print(f"验证集: {len(valid_set)}")

训练集: 3080
验证集: 660


In [18]:
# ============================ step 2/5 model ============================
model = resnet18(num_classes=11)
model.to(device)  # move the model to the selected device (GPU/CPU)

ResNet18(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)


In [19]:
# Print model information: target device and total number of parameters
print(f"模型已创建，移动到设备: {device}")
print(f"模型参数量: {sum(p.numel() for p in model.parameters()):,}")

模型已创建，移动到设备: cpu
模型参数量: 11,182,155


In [20]:
# ============================ step 3/5 loss function ============================
criterion = nn.CrossEntropyLoss()  # cross-entropy loss for the classification task

In [21]:
# ============================ step 4/5 optimizer ============================
# Use SGD, which usually works better for ResNet
optimizer = optim.SGD(model.parameters(), lr=LR, momentum=0.9, weight_decay=1e-4)
# Cosine learning-rate schedule over MAX_EPOCH scheduler steps, with floor eta_min
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=MAX_EPOCH, eta_min=1e-6)


In [None]:
# ============================ step 5/5 training loop ============================
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean loss per sample, accuracy in percent)``.

    When ``optimizer`` is given the model is put in training mode and updated
    after every batch; otherwise the model is put in eval mode and gradients
    are disabled.

    The loss is weighted by batch size, so a short final batch does not count
    as much as a full one (averaging the per-batch means would over-weight it).
    This assumes ``criterion`` returns the mean over the batch.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    correct = 0
    seen = 0

    with torch.set_grad_enabled(training):
        for data, target in loader:
            data = data.to(device)
            # CrossEntropyLoss expects class indices of dtype long
            target = target.to(device).long()

            if training:
                optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            if training:
                loss.backward()
                optimizer.step()

            batch_size = target.size(0)
            loss_sum += loss.item() * batch_size
            correct += (output.argmax(dim=1) == target).sum().item()
            seen += batch_size

    if seen == 0:
        raise ValueError("loader produced no samples")

    return loss_sum / seen, 100.0 * correct / seen


# Metrics recorded during training (one entry per completed epoch)
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []
learning_rates = []
best_val_accuracy = 0.0
early_stop_counter = 0
best_epoch = 0

print("开始训练...")
for epoch in range(MAX_EPOCH):
    # ===== training phase =====
    avg_train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
    train_losses.append(avg_train_loss)
    train_accuracies.append(train_accuracy)

    # Record the learning rate used for this epoch, then advance the schedule
    current_lr = scheduler.get_last_lr()[0]
    learning_rates.append(current_lr)
    scheduler.step()

    # ===== validation phase =====
    avg_val_loss, val_accuracy = _run_epoch(model, valid_loader, criterion, device)
    val_losses.append(avg_val_loss)
    val_accuracies.append(val_accuracy)

    # ===== early-stopping bookkeeping and checkpointing =====
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        best_epoch = epoch
        early_stop_counter = 0

        # Save the best model so far
        checkpoint = {
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "scheduler_state_dict": scheduler.state_dict(),
            "epoch": epoch,
            "best_val_accuracy": best_val_accuracy,
            "val_loss": avg_val_loss,
            "train_accuracy": train_accuracy,
            "train_loss": avg_train_loss
        }
        path_checkpoint = os.path.join(log_dir, "checkpoint_best.pkl")
        torch.save(checkpoint, path_checkpoint)
        print(f"✅ 保存最佳模型，验证准确率: {best_val_accuracy:.2f}%")
    else:
        early_stop_counter += 1

    # Print training and validation metrics
    if epoch % log_interval == 0:
        print(f'Epoch: {epoch:03d}/{MAX_EPOCH}, '
              f'Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
              f'Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%, '
              f'LR: {current_lr:.6f}, '
              f'EarlyStop: {early_stop_counter}/{PATIENCE}')

    # Stop once validation accuracy has not improved for PATIENCE consecutive epochs
    if early_stop_counter >= PATIENCE:
        print(f"🚨 早停触发！在 epoch {epoch} 停止训练")
        print(f"🏆 最佳模型在 epoch {best_epoch}, 验证准确率: {best_val_accuracy:.2f}%")
        break


In [None]:
# ===== training finished =====
# Print the completion time (month-day_hour-minute) and the best validation accuracy
now_time = datetime.now()
time_str = datetime.strftime(now_time, '%m-%d_%H-%M')
print(f"训练完成时间: {time_str}")
print(f"最终最佳验证准确率: {best_val_accuracy:.2f}%")

In [None]:
# Save the training history next to the checkpoint
training_history = {
    'train_losses': train_losses,
    'val_losses': val_losses,
    'train_accuracies': train_accuracies,
    'val_accuracies': val_accuracies,
    'learning_rates': learning_rates,
    'best_val_accuracy': best_val_accuracy,
    'best_epoch': best_epoch
}
torch.save(training_history, os.path.join(log_dir, 'training_history.pth'))

# Plot the loss, accuracy and combined curves into the log directory
picture_path_loss = os.path.join(log_dir, 'loss_curves.png')
picture_path_acc = os.path.join(log_dir, 'accuracy_curves.png')
picture_path_combined = os.path.join(log_dir, 'training_curves.png')

plot_loss_curves(train_losses, val_losses, picture_path_loss)
plot_accuracy_curves(train_accuracies, val_accuracies, picture_path_acc)
plot_training_curves(train_losses, val_losses, train_accuracies, val_accuracies, picture_path_combined)

print(f"损失曲线已保存至: {picture_path_loss}")
print(f"准确率曲线已保存至: {picture_path_acc}")
# The closing quote and parenthesis were missing here (unterminated f-string).
print(f"综合训练曲线已保存至: {picture_path_combined}")