# Import libraries

In [1]:
import torch.nn as nn
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
import torch
from tqdm import tqdm
import random
import numpy as np
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
import time

# Select device (CUDA, MPS, or CPU)

In [2]:
# Choose the compute backend in order of preference: CUDA, then Apple MPS,
# then plain CPU. The MPS check is only evaluated when CUDA is unavailable.
device = torch.device(
    "cuda" if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available()
    else "cpu"
)

print(f"Using device {device}")

Using device cuda


# Hyperparameters (超参数)

In [None]:
# Official parameters, kept as a reference set of values.
hyper_params_default = {
    # learning rate
    "learning_rate": 0.001,
    # batch size
    "batch_size": 64,
    # number of training epochs
    "epochs": 15,
    # optimizer
    "optimizer": "Adam",
    # loss function
    "loss_function": "CrossEntropyLoss",
    # dropout rate
    "dropout_rate": 0.5,
    # weight decay
    "weight_decay": 0.01,
}

In [3]:
# Hyperparameters for this training run. The whole dict is also written to
# TensorBoard as a table, so every entry must describe what the run really uses.
hyper_params = {
    "learning_rate": 0.00001,       # learning rate
    "batch_size": 128,             # batch size
    "epochs": 50,                # number of training epochs
    # The model-creation cell instantiates torch.optim.AdamW, so record "AdamW"
    # (not "Adam") to keep the logged configuration truthful.
    "optimizer": "AdamW",         # optimizer
    "loss_function": "CrossEntropyLoss",  # loss function
    "dropout_rate": 0.5,         # dropout rate
    "weight_decay": 0.0001,       # weight decay
}

# CNN Architecture

In [4]:
class cnn(nn.Module):
    """Six-stage convolutional image classifier.

    Every stage is Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU ->
    MaxPool2d(2, stride 2), so each stage halves the spatial size while the
    channel count grows 3 -> 16 -> 32 -> 64 -> 128 -> 256 -> 512. The head
    flattens the final feature map, applies dropout and maps it to class
    logits with a single linear layer.

    The linear layer is sized for a 4x4x512 feature map, which is what a
    256x256 input yields after six halvings (256 / 2**6 = 4).

    Layers are registered in the same order and under the same indices as
    before (``conv.0`` ... ``conv.23``, ``head.0`` ... ``head.2``), so existing
    checkpoints keep loading.

    Args:
        num_classes: Number of output logits. Defaults to 38, the value that
            used to be hard-coded.
        dropout_rate: Dropout probability of the head. When ``None`` (the
            default) the value is read from the module-level
            ``hyper_params["dropout_rate"]``, which is what the constructor
            always did before this parameter existed.
    """

    def __init__(self, num_classes=38, dropout_rate=None):
        super().__init__()
        if dropout_rate is None:
            # Backward-compatible fallback to the notebook-wide configuration.
            dropout_rate = hyper_params["dropout_rate"]

        # Channel count entering the first stage, then leaving each stage.
        channels = (3, 16, 32, 64, 128, 256, 512)
        layers = []
        for in_ch, out_ch in zip(channels[:-1], channels[1:]):
            layers += [
                nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ]
        self.conv = nn.Sequential(*layers)

        self.head = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(dropout_rate),
            nn.Linear(4*4*channels[-1], num_classes)
        )

    def forward(self, x):
        """Return class logits for a batch of images ``x``."""
        x = self.conv(x)
        x = self.head(x)
        return x

# Load dataset

In [None]:
# Preprocessing applied to every image of both splits:
#   1. resize to 256x256 (the size the network's linear head is built for),
#   2. convert to a tensor with values in the 0-1 range,
#   3. normalise per channel: normalised = (pixel - mean) / std.
t = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

path = "New Plant Diseases Dataset"

# Keep the datasets under their own names instead of rebinding `train`/`test`
# from Dataset to DataLoader: the class mapping (class_to_idx) stays reachable
# and each name refers to one kind of object only.
train_dataset = datasets.ImageFolder(path + "/train", transform=t)
valid_dataset = datasets.ImageFolder(path + "/valid", transform=t)
print(f'train.class_to_idx: {train_dataset.class_to_idx}')

# Pinned host memory is only requested when the selected device is CUDA; the
# device cell can also pick MPS or CPU, where the flag is of no use.
use_pinned_memory = device.type == "cuda"

# `train` and `test` remain the DataLoaders that the later cells iterate over.
# Note that `test` reads the "valid" directory.
train = DataLoader(train_dataset, batch_size=hyper_params["batch_size"], shuffle=True, num_workers=4, pin_memory=use_pinned_memory)
test = DataLoader(valid_dataset, batch_size=hyper_params["batch_size"], shuffle=False, num_workers=4, pin_memory=use_pinned_memory)

# Create model

In [None]:
# Build the network, wrap it in DataParallel when more than one CUDA device is
# visible, then move it to the selected device.
model = cnn()
model = nn.DataParallel(model) if torch.cuda.device_count() > 1 else model
model = model.to(device)

# Optimizer and loss used by the training loop.
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=hyper_params["learning_rate"],
    weight_decay=hyper_params["weight_decay"],
)
criterion = nn.CrossEntropyLoss()

# Training loop

In [None]:
# Initialise TensorBoard. The timestamp is taken once so that the run
# directory name and the recorded start time can never disagree.
run_timestamp = time.strftime("%Y%m%d-%H%M%S")
writer = SummaryWriter(log_dir=f'runs/{run_timestamp}')

hyper_params["start_time"] = run_timestamp  # training start time

# Render the hyperparameters as a Markdown table.
table_content = """
| 超参数名称 | 参数值 |
|------------|--------|
"""
for param_name, param_value in hyper_params.items():
    table_content += f"| {param_name} | {param_value} |\n"

# try/finally guarantees the writer is closed even when training raises or the
# cell is interrupted by hand, so already-logged scalars are not lost.
try:
    # Write the table to TensorBoard at step 0, i.e. before training starts.
    writer.add_text(
        tag="Experiment_Config/Hyperparameters",
        text_string=table_content,
        global_step=0
    )
    print("✅ 训练开始前已记录超参数表格到TensorBoard")

    epochs = hyper_params["epochs"]
    global_step = 0  # optimisation steps taken so far, across all epochs
    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        loop = tqdm(train, desc=f"Epoch {epoch+1}/{epochs}", leave=True)
        total = 0        # samples seen in this epoch
        correct = 0      # correctly classified samples in this epoch
        loss_sum = 0.0   # sum of per-sample losses, for the epoch mean
        train_acc = 0    # running accuracy in percent
        for features, labels in loop:
            features, labels = features.to(device), labels.to(device)
            optimizer.zero_grad()

            pred = model(features)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()

            _, pred = torch.max(pred, 1)
            batch_size = labels.size(0)
            batch_loss = loss.item()
            total += batch_size
            correct += (pred == labels).sum().item()
            # The criterion returns the batch mean, so weight it by the batch
            # size; a smaller final batch then counts proportionally.
            loss_sum += batch_loss * batch_size
            train_acc = (correct/total)*100

            loop.set_postfix(loss=batch_loss, acc=train_acc)
            writer.add_scalar('Loss/train-step', batch_loss, global_step)
            writer.add_scalar('Accuracy/train-step', train_acc, global_step)
            global_step += 1

        # Per-epoch training metrics. The loss is the mean over the whole epoch
        # rather than the value of the last minibatch, which is already
        # available under 'Loss/train-step'.
        writer.add_scalar('Loss/train-epoch', loss_sum / max(total, 1), epoch)
        writer.add_scalar('Accuracy/train-epoch', train_acc, epoch)

        # ---- evaluation pass over the `test` loader ----
        # Only the accuracy is needed here; the separate testing cell collects
        # the individual predictions for the confusion matrix.
        correct = 0
        total = 0
        model.eval()
        with torch.no_grad():
            for features, labels in test:
                features, labels = features.to(device), labels.to(device)
                preds = model(features)
                _, preds = torch.max(preds, 1)
                total += labels.size(0)
                correct += (preds == labels).sum().item()
            test_acc = (correct / total) * 100
            writer.add_scalar('Accuracy/valid-epoch', test_acc, epoch)
finally:
    writer.close()
        
        
    

# Testing

In [None]:
# Evaluate the trained model on the `test` loader, plot the confusion matrix
# and save the weights.
all_preds = []    # predicted class index of every sample
all_labels = []   # true class index of every sample
correct = 0
total = 0
model.eval()
with torch.no_grad():
    for features, labels in test:
        features, labels = features.to(device), labels.to(device)
        preds = model(features)
        _, preds = torch.max(preds, 1)
        total += labels.size(0)
        correct += (preds == labels).sum().item()
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

cm = confusion_matrix(all_labels, all_preds)
plt.figure(figsize=(20, 20))
sns.heatmap(cm, annot=False, fmt='d', cmap='Blues')
plt.title("Confusion matrix")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()

accuracy = (correct/total)*100
print(f"Test accuracy: {accuracy}")

# When the model was wrapped in nn.DataParallel, its state-dict keys carry a
# "module." prefix that a plain cnn() cannot load. Save the wrapped network's
# own weights so the checkpoint loads the same way on any number of GPUs.
model_to_save = model.module if isinstance(model, nn.DataParallel) else model
torch.save(model_to_save.state_dict(), f"plant_disease_{int(accuracy)}.pth")

# Export ONNX model (导出onnx模型)

In [6]:
import os

# Rebuild the bare architecture and load the trained weights into it.
model = cnn()
model = model.to(device)
# Checkpoint with the trained weights
model_path = "plant_disease_98-20260103-114257.pth"

if not os.path.exists(model_path):
    # Raise instead of calling exit(1): an exception stops the cell right here
    # and leaves the kernel alive, so nothing below can run without weights.
    raise FileNotFoundError(f"模型文件不存在: {model_path}")

# weights_only=True restricts unpickling to tensors and plain containers,
# which is all a state dict contains.
state_dict = torch.load(model_path, map_location=device, weights_only=True)

# A checkpoint saved from an nn.DataParallel wrapper prefixes every key with
# "module."; strip it so the keys match the bare network.
if state_dict and all(key.startswith("module.") for key in state_dict):
    state_dict = {key[len("module."):]: value for key, value in state_dict.items()}

# Strict loading on purpose: strict=False ignores missing and unexpected keys,
# so a mismatching checkpoint would leave the network at its random
# initialisation and still be exported as if it were trained.
model.load_state_dict(state_dict)
print(f"成功加载预训练模型: {model_path}")

# Switch to evaluation mode
model.eval()

# Example input matching the shape the model expects:
# [batch_size, channels, height, width] = [1, 3, 256, 256]
dummy_input = torch.randn(1, 3, 256, 256, device=device)

# Export to ONNX. The output directory is created first because the export
# does not create missing parent directories.
os.makedirs("onnx", exist_ok=True)
onnx_path = f"onnx/{os.path.splitext(os.path.basename(model_path))[0]}.onnx"
torch.onnx.export(
    model,
    dummy_input,
    onnx_path,
    export_params=True,  # store the trained weights in the file
    opset_version=18,    # ONNX operator-set version
    do_constant_folding=True,  # apply constant-folding optimisation
    input_names=['input'],  # name of the input tensor
    output_names=['output'],  # name of the output tensor
    # Leave the batch dimension dynamic
    dynamic_axes ={
        'input': {0: 'batch_size'},
        'output': {0: 'batch_size'}
    },
    # Disable the dynamo exporter
    dynamo=False,
    # Do not expose the weights as graph inputs
    keep_initializers_as_inputs=False
)

print(f"成功导出ONNX模型到: {onnx_path}")

成功加载预训练模型: plant_disease_98-20260103-114257.pth


  torch.onnx.export(


成功导出ONNX模型到: onnx/plant_disease_98-20260103-114257.onnx


In [7]:
# Validate the exported ONNX file
import onnx

try:
    # Load the file, then let the checker validate the model structure.
    onnx_model = onnx.load(onnx_path)
    onnx.checker.check_model(onnx_model)
except Exception as e:
    print(f"ONNX模型验证失败: {e}")
else:
    print("ONNX模型验证通过")

ONNX模型验证通过


In [None]:
# Scratch commands, kept commented out so that "Run All" skips them.
# `test` is a DataLoader, so the class mapping lives on its underlying dataset:
# print(test.dataset.class_to_idx)
# print(test.num_workers)
# Run in a terminal. The training cell writes each run to runs/<timestamp>,
# so point TensorBoard at the parent directory:
# tensorboard --logdir=runs --port=6006

命令模式下：

Shift➕回车 运行当前代码块，并跳到下一个代码块

Ctrl➕回车，只会运行当前代码块

Alt➕回车，运行当前代码块，并向下新建一个代码块

按b，向下新建一个代码块

按a，向上新建一个代码块

按c，复制当前代码块（单元格）

按x，剪切掉当前代码块

按v，粘贴到当前代码块下方；按shift➕v，粘贴到当前代码块上方

按z，撤回操作

对于多行代码，在代码块命令模式下，按L，可以对代码标行数

按dd（两次），删除代码块

按h键，可以调出键盘快捷键介绍表格