In [1]:
# main.ipynb

# 导入必要的库
import os
import pandas as pd
import numpy as np
from PIL import Image
import time
import copy

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms, models
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelEncoder

# 检查是否可以使用MPS（Metal Performance Shaders）进行GPU加速
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f'使用设备: {device}')


  Referenced from: <FB2FD416-6C4D-3621-B677-61F07C02A3C5> /Users/williamjing/opt/anaconda3/envs/kaggle/lib/python3.9/site-packages/torchvision/image.so
  warn(


使用设备: mps


In [2]:
# 数据预处理和增强
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],   # ImageNet的均值和标准差
                             [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],   # ImageNet的均值和标准差
                             [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])
}


In [3]:
# 自定义数据集类
class LeafDataset(Dataset):
    def __init__(self, csv_file, transform=None, mode='train'):
        self.data = pd.read_csv(csv_file)
        self.transform = transform
        self.mode = mode  # 'train' or 'test'

        if self.mode == 'train':
            # 提取标签并进行编码
            self.le = LabelEncoder()
            self.data['label'] = self.le.fit_transform(self.data['label'])
            self.classes = self.le.classes_
        else:
            self.classes = None  # 测试集没有标签

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = os.path.join('..', self.data.iloc[idx, 0])
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        if self.mode == 'train':
            label = self.data.iloc[idx, 1]
            return image, label
        else:
            return image, self.data.iloc[idx, 0]  # 返回图像和文件名


In [4]:
# 加载训练数据集
train_csv = '../train.csv'  # 请根据实际情况调整路径
train_dataset = LeafDataset(csv_file=train_csv, transform=data_transforms['train'], mode='train')

num_classes = len(train_dataset.classes)
print(f'类别数量: {num_classes}')


类别数量: 176


In [5]:
# 准备K折交叉验证
k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)

# 训练参数
num_epochs = 25
batch_size = 32
learning_rate = 0.001
criterion = nn.CrossEntropyLoss()


In [6]:
# 定义训练和验证函数
def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs, fold):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Fold {fold}, Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # 每个epoch都有训练和验证阶段
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # 设置模型为训练模式
            else:
                model.eval()   # 设置模型为评估模式

            running_loss = 0.0
            running_corrects = 0

            # 遍历数据
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # 清零参数梯度
                optimizer.zero_grad()

                # 前向传播
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # 训练阶段反向传播+优化
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # 统计
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            # 学习率调整
            if phase == 'train':
                scheduler.step()

            # 计算损失和准确率
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.float() / len(dataloaders[phase].dataset)

            print(f'{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # 深拷贝模型
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Fold {fold} 训练完成于 {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'最佳验证准确率: {best_acc:.4f}')

    # 加载最佳模型权重
    model.load_state_dict(best_model_wts)
    return model, best_acc


In [7]:
# 开始K折交叉验证
fold_results = {}
for fold, (train_idx, val_idx) in enumerate(kfold.split(train_dataset)):
    print(f'Fold {fold}')
    print('-' * 20)

    # 创建数据子集
    train_subsampler = Subset(train_dataset, train_idx)
    val_subsampler = Subset(train_dataset, val_idx)

    # 数据加载器
    dataloaders = {
        'train': DataLoader(train_subsampler, batch_size=batch_size, shuffle=True),
        'val': DataLoader(val_subsampler, batch_size=batch_size, shuffle=False)
    }

    # 初始化模型
    model = models.resnet18(pretrained=True)

    # 修改最后的全连接层
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, num_classes)

    model = model.to(device)

    # 设置优化器和学习率调度器
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
    # 每7个epoch学习率降低0.1倍
    exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    # 训练和评估
    model, best_acc = train_model(model, dataloaders, criterion, optimizer, exp_lr_scheduler, num_epochs, fold)

    # 保存每个fold的最佳模型
    torch.save(model.state_dict(), f'model_fold_{fold}.pth')

    # 存储结果
    fold_results[fold] = {'model': model, 'best_acc': best_acc}


Fold 0
--------------------




Fold 0, Epoch 0/24
----------
Train Loss: 3.8880 Acc: 0.2087
Val Loss: 2.5578 Acc: 0.4315

Fold 0, Epoch 1/24
----------
Train Loss: 2.1098 Acc: 0.5214
Val Loss: 1.6132 Acc: 0.6012

Fold 0, Epoch 2/24
----------
Train Loss: 1.4136 Acc: 0.6746
Val Loss: 1.1402 Acc: 0.7126

Fold 0, Epoch 3/24
----------
Train Loss: 1.0270 Acc: 0.7680
Val Loss: 0.9042 Acc: 0.7671

Fold 0, Epoch 4/24
----------
Train Loss: 0.7926 Acc: 0.8192
Val Loss: 0.7302 Acc: 0.8107

Fold 0, Epoch 5/24
----------
Train Loss: 0.6285 Acc: 0.8560
Val Loss: 0.5704 Acc: 0.8515

Fold 0, Epoch 6/24
----------
Train Loss: 0.5087 Acc: 0.8835
Val Loss: 0.5023 Acc: 0.8687

Fold 0, Epoch 7/24
----------
Train Loss: 0.4028 Acc: 0.9198
Val Loss: 0.4443 Acc: 0.8905

Fold 0, Epoch 8/24
----------
Train Loss: 0.3856 Acc: 0.9209
Val Loss: 0.4352 Acc: 0.8905

Fold 0, Epoch 9/24
----------
Train Loss: 0.3737 Acc: 0.9280
Val Loss: 0.4354 Acc: 0.8867

Fold 0, Epoch 10/24
----------
Train Loss: 0.3606 Acc: 0.9303
Val Loss: 0.4253 Acc: 0.8913

KeyboardInterrupt: 

In [None]:
# 所有fold训练完成
print('所有fold训练完成。')

In [None]:
# 选择最佳模型
best_fold = max(fold_results, key=lambda x: fold_results[x]['best_acc'])
best_model = fold_results[best_fold]['model']
print(f'最佳模型来自于Fold {best_fold}，验证准确率为{fold_results[best_fold]["best_acc"]:.4f}')

In [None]:
# 加载测试集
test_csv = '../test.csv'  # 请根据实际情况调整路径
test_dataset = LeafDataset(csv_file=test_csv, transform=data_transforms['test'], mode='test')
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 使用最佳模型进行预测
best_model.eval()
all_preds = []
image_names = []
with torch.no_grad():
    for inputs, img_paths in test_loader:
        inputs = inputs.to(device)
        outputs = best_model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        image_names.extend(img_paths)

# 将预测的标签整数映射回原始标签
label_encoder = train_dataset.le  # 从训练集获取LabelEncoder
predicted_labels = label_encoder.inverse_transform(all_preds)

# 创建提交文件
submission = pd.DataFrame({
    'image': image_names,
    'label': predicted_labels
})

# 保存为CSV文件
submission.to_csv('sample_submission.csv', index=False)
print('预测结果已保存到sample_submission.csv')