# Loss 测试


In [23]:
import os

import torch
from jupyter_server.services.contents.checkpoints import Checkpoints
from torch import nn
# Example of target with class indices
loss = nn.CrossEntropyLoss()
input = torch.randn(3, 5, requires_grad=True)
target = torch.empty(3, dtype=torch.long).random_(5)
print(input)
print(target)

output = loss(input, target)
print(output)
output.backward()
print(output)

tensor([[ 0.2906,  0.7886, -0.8071,  0.6975, -0.4337],
        [ 0.8174,  0.0636, -1.0646,  0.6757,  0.4634],
        [-1.0926, -0.4392, -1.2230,  0.7750, -0.1133]], requires_grad=True)
tensor([1, 1, 3])
tensor(1.2372, grad_fn=<NllLossBackward0>)
tensor(1.2372, grad_fn=<NllLossBackward0>)


In [24]:
# Example of target with class probabilities
# If containing class probabilities, same shape as the input and each value should be between [0,1]
input = torch.randn(3, 5, requires_grad=True)
target = torch.randn(3, 5).softmax(dim=1)
print(input)
print(target)
output = loss(input, target)
output.backward()
print(output)

tensor([[ 1.1117,  0.2177,  0.2326, -1.2596,  0.7871],
        [-0.9317,  0.7030,  0.2557,  0.8344, -0.0866],
        [-1.0184,  0.4584, -0.3879,  1.5941,  1.2739]], requires_grad=True)
tensor([[0.2037, 0.0211, 0.1943, 0.0515, 0.5295],
        [0.1375, 0.0930, 0.4348, 0.2061, 0.1286],
        [0.0290, 0.1263, 0.1008, 0.6414, 0.1025]])
tensor(1.4823, grad_fn=<DivBackward1>)


# 优化器使用重点：
```
optimizer.zero_grad()
loss.backward()
optimizer.step()
```


# 训练模板

导包

In [36]:
import torch

from src.model import AntBeeClassifier
from src.dataset import ClassDirectoryDataset
from torch.optim import Adam
from torch.utils.data import DataLoader,random_split

from tqdm import tqdm
from tqdm import trange
import numpy as np

参数配置

In [37]:
# ==== 训练配置 ====
checkpoints_dir = "checkpoints"
os.makedirs(checkpoints_dir, exist_ok=True)
# ==== 参数设定 ====
num_epochs = 100
batch_size = 32
num_workers = 2
learning_rate = 1e-3

patience = 10
best_val_loss = np.inf
epochs_without_improvement = 0 # 记录val_loss已经多少个epoch没有下降了

设备选择

In [38]:
# ==== 设备选择 ====
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


数据

In [39]:
# ==== 创建dataset 和 dataloader ====
dataset = ClassDirectoryDataset("../data/hymenoptera_data/train", ["jpg"])
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_set, val_set = random_split(dataset, [train_size, val_size],
                                  generator=torch.Generator().manual_seed(42)
)


train_loader = DataLoader(train_set,
                          batch_size=batch_size,
                          num_workers=num_workers,
                          shuffle=True,
                          pin_memory=True  # 如果使用GPU，启用此选项加速数据传输
)
val_loader = DataLoader(val_set,
                        batch_size=batch_size,
                        num_workers=num_workers,
                        shuffle=True,
                        pin_memory=True
)

In [40]:
img0, label0 = dataset[5]
print("img0.shape:", img0.shape)
print("label0_id:", label0)
print("label0:", dataset.classes[label0])

img0.shape: torch.Size([3, 224, 224])
label0_id: 0
label0: ants


模型实例、损失函数、Optimizer

In [41]:
# ==== 模型实例 ====
model = AntBeeClassifier(dropout_rate=0.2).to(device) # 1. build a model
# ==== 损失函数 & 优化器 ====
criterion = torch.nn.CrossEntropyLoss() # 2. define the loss
optimizer = Adam(model.parameters(), lr=learning_rate , weight_decay=1e-4) # 3. do the optimize work
# 添加权重衰减防止过拟合

Training + Validation loop

In [46]:
# ==== Training + Validation loop ====
for epoch in range(num_epochs):
    # --- Training ---
    model.train()
    train_loss = 0.0 # 1个 epoch 中 所有 batch 的损失之和
    correct, total = 0, 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels) # 计算 交叉熵loss

        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        predicted = outputs.argmax(1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    avg_train_loss = train_loss / len(train_loader) # 把总损失除以 batch 数，就是平均每个 batch 的损失
    train_acc = correct / total

    # --- Validation ---
    model.eval()
    val_loss = 0.0
    correct, total = 0, 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            predicted = outputs.argmax(1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    avg_val_loss = val_loss / len(val_loader)
    val_acc = correct / total

    # --- 模型保存 ---
    # 保存最新模型
    checkpoint = {
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "train_loss": avg_train_loss,
        "val_loss": avg_val_loss,
        "train_acc": train_acc,
        "val_acc": val_acc,
    }
    torch.save(checkpoint, os.path.join(checkpoints_dir, "latest_model.pth"))

    # 保存最佳模型（基于验证损失）
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(checkpoint, os.path.join(checkpoints_dir, "best_model.pth"))
        epochs_without_improvement = 0
        print(f"New best model saved with val_loss: {avg_val_loss:.4f}")
    else:
        epochs_without_improvement += 1

    # --- 早停检查 ---
    if epochs_without_improvement >= patience:
        print(f"Early stopping triggered after {epoch+1} epochs!")
        break

    # --- Log ---
    print(f"Epoch [{epoch+1}/{num_epochs}] "
          f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.4f}")

Early stopping triggered after 1 epochs!


In [45]:
# 训练完成后加载最佳模型
best_checkpoint = torch.load(os.path.join(checkpoints_dir, "best_model.pth"), weights_only=True)
best_model = AntBeeClassifier()
best_model.load_state_dict(best_checkpoint["model_state_dict"])

<All keys matched successfully>