In [1]:
import os
import cv2
import copy
import torch
import torchvision.datasets
import albumentations

import pandas as pd
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms

from tqdm.auto import tqdm
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ExponentialLR
from sklearn.model_selection import train_test_split, StratifiedKFold
from PIL import Image
from albumentations.pytorch.transforms import ToTensorV2

  backends.update(_get_backends("networkx.backends"))
  data = fetch_version_info()


In [2]:
# Create the directory where per-fold model checkpoints are saved
# (exist_ok=True makes this cell safe to re-run).
os.makedirs('../models', exist_ok=True)

In [3]:
# Load the training and test tables from CSV
train = pd.read_csv('../dataset/train.csv')
test = pd.read_csv('../dataset/test.csv')

# Encode the text labels as integer codes; `labels_unique` holds the distinct
# labels, indexed by code, and is used later to map predictions back to text.
label_codes, labels_unique = pd.factorize(train['label'])
train['number'] = label_codes

# Optional exports (disabled): the encoded training table and the label mapping
# train.to_csv('../dataset/train_add_number.csv', index=False)
# pd.Series(labels_unique).to_csv('../dataset/label_mapping.csv', index=False)

In [4]:
# Training pipeline: resize, random geometric / photometric augmentation,
# then normalisation and conversion to a PyTorch tensor.
train_steps = [
    # Resize every image to 320x320
    albumentations.Resize(height=320, width=320),
    # Horizontal flip with probability 0.5
    albumentations.HorizontalFlip(p=0.5),
    # Vertical flip with probability 0.5
    albumentations.VerticalFlip(p=0.5),
    # Random rotation within +/-180 degrees, applied with probability 0.7
    albumentations.Rotate(limit=180, p=0.7),
    # Random brightness / contrast adjustment (library defaults)
    albumentations.RandomBrightnessContrast(),
    # Random shift (up to +/-25%) and scale (0.9x-1.1x), no extra rotation,
    # applied with probability 0.5
    albumentations.Affine(
        translate_percent=(-0.25, 0.25),
        scale=(0.9, 1.1),
        rotate=0,
        p=0.5,
    ),
    albumentations.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
        max_pixel_value=255.0,
    ),
    # numpy array -> PyTorch tensor
    ToTensorV2(p=1.0),
]
transforms_train = albumentations.Compose(train_steps)

# Evaluation pipeline: no random augmentation, so results are repeatable.
eval_steps = [
    albumentations.Resize(height=320, width=320),
    albumentations.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
        max_pixel_value=255.0,
    ),
    ToTensorV2(p=1.0),
]
transforms_test = albumentations.Compose(eval_steps)

In [5]:
class Leaf_Dataset(Dataset):
    """Dataset that loads images listed in a table and optionally their labels.

    Each item is read from disk with OpenCV, converted from BGR to RGB and,
    when a transform is given, passed through it as ``transform(image=...)``
    with the ``'image'`` entry of the result being returned.
    """

    def __init__(self, train_csv, transform=None, test_bool=False, root='../dataset'):
        '''
        train_csv: table with an 'image' column (relative image paths) and,
                   unless test_bool is True, a 'number' column (integer labels)
        transform: callable invoked as transform(image=img)['image'], or None
        test_bool: True for a test set (no labels are loaded or returned)
        root: directory that the paths in the 'image' column are relative to
        '''
        self.train_csv = train_csv
        self.transform = transform
        self.test_bool = test_bool
        self.root = root
        # Materialise the columns as plain lists so items are addressed by
        # position, independent of the table's own index.
        self.image_path = list(self.train_csv['image'])
        # Labels are only loaded when this is not a test set
        if not test_bool:
            self.label_nums = list(self.train_csv['number'])

    def __getitem__(self, idx):
        '''
        Fetch a single sample.
        idx: positional sample index
        return: (image, label) for train/validation sets, image alone for test sets
        raises: FileNotFoundError if the image file cannot be read
        '''
        path = os.path.join(self.root, self.image_path[idx])
        image = cv2.imread(path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        if image is None:
            raise FileNotFoundError(f'Could not read image: {path}')
        # OpenCV loads images as BGR; convert to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform is not None:
            image = self.transform(image=image)['image']
        if self.test_bool:
            return image
        return image, self.label_nums[idx]

    def __len__(self):
        """Number of samples (rows in the 'image' column)."""
        return len(self.image_path)

In [12]:
def train_model(train_loader, valid_loader, test, fold_n, device=torch.device('cuda:0'),
                num_classes=176, num_epochs=30, early_stopping_round=3):
    """Fine-tune an ImageNet-pretrained ResNet-50 for one fold and predict the test set.

    The model is trained with Adam and an exponentially decaying learning rate.
    After every epoch it is evaluated on ``valid_loader``; the weights with the
    highest number of correct validation predictions are kept, written to
    ``../models/fold_{fold_n}_best_model.pth`` and used for the test predictions.

    Args:
        train_loader: iterable yielding (image batch, label batch) for training.
        valid_loader: iterable yielding (image batch, label batch) for validation.
        test: table of test images, passed to ``Leaf_Dataset(..., test_bool=True)``.
        fold_n: fold identifier, used in log messages and the checkpoint filename.
        device: device the model and batches are moved to. If a CUDA device is
            requested but CUDA is unavailable, training falls back to the CPU.
        num_classes: output size of the replaced final fully connected layer.
        num_epochs: maximum number of training epochs.
        early_stopping_round: stop once this many epochs have passed since the
            best validation epoch.

    Returns:
        list: predicted class index for every test sample, in test order.
    """
    device = torch.device(device)
    if device.type == 'cuda' and not torch.cuda.is_available():
        print('CUDA is not available, falling back to CPU')
        device = torch.device('cpu')

    # Model: pretrained ResNet-50 with the classifier head replaced
    net = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.IMAGENET1K_V1)
    in_features = net.fc.in_features
    net.fc = nn.Linear(in_features, num_classes)
    net = net.to(device)

    best_epoch = 0
    # Start below any possible correct-count so the first epoch is always
    # checkpointed, even when validation accuracy is exactly zero.
    best_score = -1
    best_model_state = None

    # Optimizer, loss function and learning-rate scheduler
    optimizer = optim.Adam(net.parameters(), lr=0.0001, weight_decay=1e-5)
    loss = nn.CrossEntropyLoss(reduction='mean')
    scheduler = ExponentialLR(optimizer, gamma=0.9)

    for i in range(num_epochs):
        train_correct = 0  # correctly classified training samples this epoch
        train_seen = 0     # training samples actually processed this epoch
        loss_sum = 0.0     # sum of per-batch mean losses this epoch

        # ---- training phase ----
        net.train()
        for x, y in tqdm(train_loader):
            x = torch.as_tensor(x, dtype=torch.float).to(device)
            y = y.to(device)

            y_hat = net(x)
            loss_temp = loss(y_hat, y)

            optimizer.zero_grad()
            loss_temp.backward()
            optimizer.step()

            # Detach before accumulating so the running sum does not keep
            # every batch's autograd history alive for the whole epoch.
            loss_sum += loss_temp.detach()
            train_correct += (y_hat.argmax(dim=1) == y).sum()
            train_seen += y.size(0)

        # Decay the learning rate once per epoch
        scheduler.step()

        # Divide by the number of samples actually seen: the last batch may be
        # smaller than batch_size when drop_last is False.
        print('epoch: ', i,
              'loss: ', float(loss_sum),
              'train acc: ', int(train_correct) / max(train_seen, 1), end='')

        # ---- validation phase ----
        valid_correct = 0
        valid_seen = 0
        net.eval()
        with torch.no_grad():
            for x, y in tqdm(valid_loader):
                x = torch.as_tensor(x, dtype=torch.float).to(device)
                y = y.to(device)

                y_hat = net(x)

                valid_correct += (y_hat.argmax(dim=1) == y).sum()
                valid_seen += y.size(0)
        valid_correct = int(valid_correct)

        print('valid acc: ', valid_correct / max(valid_seen, 1))

        # ---- checkpointing and early stopping ----
        if valid_correct > best_score:
            best_model_state = copy.deepcopy(net.state_dict())
            best_score = valid_correct
            best_epoch = i
            print('best epoch save!')

        if i - best_epoch >= early_stopping_round:
            print(f'Early stopping at epoch {i}')
            break

    # No epoch ran (num_epochs == 0): fall back to the current weights so that
    # saving and loading below still work.
    if best_model_state is None:
        best_model_state = copy.deepcopy(net.state_dict())

    # Persist the best weights for this fold
    model_path = f'../models/fold_{fold_n}_best_model.pth'
    torch.save({'model_state_dict': best_model_state}, model_path)
    print(f'Saved model for fold {fold_n} at {model_path}')

    # Restore the best weights before predicting
    net.load_state_dict(best_model_state)
    net.eval()  # make inference mode explicit instead of relying on the last epoch

    # Test data: deterministic transforms, original order preserved
    testset = Leaf_Dataset(test, transform=transforms_test, test_bool=True)
    test_loader = DataLoader(testset, batch_size=64, shuffle=False, drop_last=False)

    predictions = []
    with torch.no_grad():
        for x in tqdm(test_loader):
            x = torch.as_tensor(x, dtype=torch.float).to(device)
            y_hat = net(x)
            # Predicted class index per sample
            predict = torch.argmax(y_hat, dim=1).reshape(-1)
            predictions.extend(predict.cpu().tolist())
    return predictions

In [None]:
# Stratified 5-fold splitter (shuffled, fixed seed for repeatable splits)
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=2025)

# Collects the test-set predictions of every fold, one column per fold
prediction_KFold = pd.DataFrame()

# Loader settings shared by the training and validation loaders
loader_kwargs = dict(batch_size=32, drop_last=False)

# Run cross-validation, stratifying on the encoded label
fold_splits = skf.split(train, train['number'])
for fold_n, (train_idx, val_idx) in enumerate(fold_splits):
    print(f'fold {fold_n} training...')

    # Rows belonging to this fold's training / validation parts
    train_data, valid_data = train.iloc[train_idx], train.iloc[val_idx]

    # Augmentation is applied to the training part only
    trainset = Leaf_Dataset(train_data, transform=transforms_train)
    validset = Leaf_Dataset(valid_data, transform=transforms_test)

    train_loader = DataLoader(trainset, shuffle=True, **loader_kwargs)
    valid_loader = DataLoader(validset, shuffle=False, **loader_kwargs)

    # Train on this fold and predict the test set with its best weights
    predictions = train_model(train_loader, valid_loader, test, fold_n)

    # Keep this fold's predictions as a new column
    prediction_KFold[f'fold {fold_n}'] = predictions

In [None]:
# Inspect the per-fold test predictions (one column per fold)
print(prediction_KFold)

In [None]:
# Majority vote across folds: take the first row-wise mode as the final code
voted_codes = prediction_KFold.mode(axis=1)[0].astype(int)
prediction_final = list(voted_codes)

# Map the integer codes back to their text labels
label_texts = []
for code in prediction_final:
    label_texts.append(labels_unique[code])
test['label'] = label_texts

# Write the result file
test.to_csv('../dataset/result_practice3.csv', index=False)