# Copyright (c) 2025 YI-AN YEH
# This project is licensed under the MIT License - see the LICENSE file for details.

In [None]:
import torch

# Probe for a CUDA-capable (NVIDIA) GPU and choose the compute device.
is_cuda_available = torch.cuda.is_available()

if not is_cuda_available:
    # No GPU visible to PyTorch: fall back to the CPU.
    device = torch.device("cpu")
    print("GPU not available, using CPU.")
else:
    # Use the first GPU and report its name.
    device = torch.device("cuda:0")
    print("GPU is available.")
    print("Device name:", torch.cuda.get_device_name(0))

print(f"Using device: {device}")

GPU is available.
Device name: NVIDIA GeForce RTX 2070 SUPER
Using device: cuda:0


In [2]:
import pandas as pd
import numpy as np
import os
import json

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image # Pillow 函式庫，用於讀取圖片
import torch.nn as nn
import torch.nn.functional as F

# --- Global configuration ---
IMG_SIZE = 224  # side length (pixels) of the crops produced by the transforms
BATCH_SIZE = 32  # batch size handed to the DataLoaders
NUM_CLASSES = 15  # number of output classes passed to the model
N_SPLITS = 5  # number of folds for StratifiedKFold cross-validation

In [3]:
# Load the train/validation and test splits produced in Notebook 01.
train_val_df = pd.read_csv('train_val_set.csv')
test_df = pd.read_csv('test_set.csv')

# Load the mapping between labels and their integer indices.
with open('label_mapping.json', 'r') as mapping_file:
    label_mapping = json.loads(mapping_file.read())

print(f"訓練驗證集資料量: {len(train_val_df)}")

訓練驗證集資料量: 16510


In [4]:
# 1. Data augmentation and preprocessing pipelines.
# Two pipelines are built: 'train' applies random augmentation, while 'val'
# is deterministic (resize + centre crop). Both end with the same
# ImageNet mean/std normalisation.
_IMAGENET_MEAN = [0.485, 0.456, 0.406]
_IMAGENET_STD = [0.229, 0.224, 0.225]

_train_steps = [
    transforms.RandomResizedCrop(IMG_SIZE),  # random crop, rescaled to IMG_SIZE
    transforms.RandomHorizontalFlip(),       # random left/right flip
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),  # mild colour jitter
    transforms.ToTensor(),                   # PIL image -> tensor with values in [0, 1]
    transforms.Normalize(mean=_IMAGENET_MEAN, std=_IMAGENET_STD),
]

_val_steps = [
    transforms.Resize(256),                  # resize to 256 first
    transforms.CenterCrop(IMG_SIZE),         # then take the central IMG_SIZE crop
    transforms.ToTensor(),
    transforms.Normalize(mean=_IMAGENET_MEAN, std=_IMAGENET_STD),
]

data_transforms = {
    'train': transforms.Compose(_train_steps),
    'val': transforms.Compose(_val_steps),
}

# 2. Custom Dataset serving (image, label) pairs described by a DataFrame.
class PlantDiseaseDataset(Dataset):
    """Dataset that loads images listed in a DataFrame.

    Args:
        dataframe: Table with a ``'filepath'`` column (image path) and a
            ``'label_idx'`` column (integer class index). Rows are accessed
            positionally, so the DataFrame index does not need to be reset.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, dataframe, transform=None):
        self.df = dataframe
        self.transform = transform

    def __len__(self):
        """Return the number of samples (rows) in the dataset."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``.

        ``label`` is a 0-dim ``torch.long`` tensor. ``image`` is the RGB PIL
        image, or whatever ``transform`` returns when one was supplied.
        """
        # Fetch the row once instead of performing two positional lookups.
        row = self.df.iloc[idx]
        img_path = row['filepath']
        label = row['label_idx']

        # The context manager guarantees the underlying file handle is
        # released; convert() returns an independent, fully loaded image.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        # Explicit None check: a transform object must not be skipped just
        # because it happens to evaluate as falsy.
        if self.transform is not None:
            image = self.transform(image)

        return image, torch.tensor(label, dtype=torch.long)

In [5]:
import torch.nn as nn
import torch.nn.functional as F

class BaselineModel(nn.Module):
    """Small three-block CNN classifier used as the baseline.

    Each block is ``Conv2d(3x3, padding='same') -> ReLU -> MaxPool2d(2)``,
    so the spatial size is halved three times. The flattened features go
    through ``Linear(512) -> ReLU -> Dropout(0.5) -> Linear(num_classes)``.

    Args:
        num_classes: Number of output logits.
        img_size: Side length of the (square) input images. Defaults to 224,
            which reproduces the original fixed ``128 * 28 * 28`` input size
            of ``fc1``. Must be at least 8 so that a feature map remains
            after three poolings.

    Raises:
        ValueError: If ``img_size`` is smaller than 8.
    """

    def __init__(self, num_classes, img_size=224):
        super().__init__()

        if img_size < 8:
            raise ValueError(
                f"img_size must be >= 8 to survive three 2x2 poolings, got {img_size}"
            )

        # Feature extractor (convolutional layers). A single pooling module
        # is reused after every convolution because it holds no parameters.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding='same')
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding='same')
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding='same')

        # Classification head (fully connected layers).
        self.flatten = nn.Flatten()

        # 'same' padding keeps the spatial size through each convolution and
        # each MaxPool floors it to half, so three poolings give
        # img_size // 8 (e.g. 224 -> 28). The flattened feature vector
        # therefore has 128 * side * side elements.
        feature_side = img_size // 8
        self.fc1 = nn.Linear(in_features=128 * feature_side * feature_side, out_features=512)
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(in_features=512, out_features=num_classes)

    def forward(self, x):
        """Return raw class logits for a batch of images.

        Expects ``x`` of shape ``(batch, 3, img_size, img_size)``. No softmax
        is applied: the logits are meant to be fed directly to a loss such
        as ``nn.CrossEntropyLoss``, which applies log-softmax internally.
        """
        # Three conv -> ReLU -> pool blocks.
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        # Classification head.
        x = self.flatten(x)
        x = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(x)

# Instantiate the model and move its parameters and buffers onto the
# previously selected device in a single expression (Module.to returns
# the module itself).
baseline_model = BaselineModel(NUM_CLASSES).to(device)

# Print the architecture as a sanity check.
print(baseline_model)

BaselineModel(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=same)
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=same)
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=same)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=100352, out_features=512, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc2): Linear(in_features=512, out_features=15, bias=True)
)


In [6]:
from tqdm import tqdm # 引入 tqdm 來顯示一個漂亮的進度條

# Train the model for a single epoch.
def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Network to train; it is switched to training mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
            The per-sample average below assumes it returns the batch mean
            (the default of ``nn.CrossEntropyLoss``) -- confirm if a
            different reduction is used.
        optimizer: Optimiser updating ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(epoch_loss, epoch_acc)`` as Python floats: the sample-weighted
        mean loss and the fraction of correctly classified samples.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.train()  # enable training behaviour (matters for Dropout/BatchNorm)

    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    # tqdm wraps the loader only to display a progress bar.
    for inputs, labels in tqdm(dataloader, desc="Training"):
        inputs, labels = inputs.to(device), labels.to(device)
        batch_size = labels.size(0)

        # Gradients accumulate by default, so clear them before each step.
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        # Weight the batch loss by its size so a smaller final batch does
        # not skew the epoch average.
        running_loss += loss.item() * batch_size
        _, preds = torch.max(outputs, 1)
        # Keep the counter a plain int throughout (no int/tensor switch).
        correct_predictions += (preds == labels).sum().item()
        total_samples += batch_size

    if total_samples == 0:
        raise ValueError("dataloader yielded no samples; cannot compute epoch metrics")

    epoch_loss = running_loss / total_samples
    epoch_acc = correct_predictions / total_samples

    return epoch_loss, epoch_acc


# Evaluate the model for a single epoch.
def validate_one_epoch(model, dataloader, criterion, device):
    """Evaluate ``model`` on every batch of ``dataloader`` without training.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
            The per-sample average below assumes it returns the batch mean
            (the default of ``nn.CrossEntropyLoss``) -- confirm if a
            different reduction is used.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(epoch_loss, epoch_acc)`` as Python floats: the sample-weighted
        mean loss and the fraction of correctly classified samples.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()  # disable training-only behaviour such as Dropout

    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    # No gradients are needed for evaluation, which saves memory and compute.
    with torch.no_grad():
        for inputs, labels in tqdm(dataloader, desc="Validating"):
            inputs, labels = inputs.to(device), labels.to(device)
            batch_size = labels.size(0)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Weight the batch loss by its size so a smaller final batch
            # does not skew the epoch average.
            running_loss += loss.item() * batch_size
            _, preds = torch.max(outputs, 1)
            # Keep the counter a plain int throughout (no int/tensor switch).
            correct_predictions += (preds == labels).sum().item()
            total_samples += batch_size

    if total_samples == 0:
        raise ValueError("dataloader yielded no samples; cannot compute epoch metrics")

    epoch_loss = running_loss / total_samples
    epoch_acc = correct_predictions / total_samples

    return epoch_loss, epoch_acc

In [None]:
import torch

# 1. Report whether PyTorch can see a CUDA device (GPU).
print(f"CUDA 是否可用: {torch.cuda.is_available()}")

if not torch.cuda.is_available():
    # No CUDA device found: warn and fall back to the CPU.
    print("警告: PyTorch 找不到 CUDA 設備，將使用 CPU。")
    device = torch.device("cpu")
else:
    # 2. Report the name of the first GPU.
    print(f"目前 GPU 名稱: {torch.cuda.get_device_name(0)}")

    # 3. Select the default CUDA device.
    device = torch.device("cuda")

# 4. Show the device the rest of the notebook will actually use.
print(f"目前程式將使用的設備: {device}")

In [None]:
from sklearn.model_selection import StratifiedKFold
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import time

# --- Heads-up: this cell runs for a long time! ---
# Make sure the machine is plugged in; GPU utilisation can be monitored
# from the system task manager while it runs.

# Stratified K-fold keeps the class distribution similar in every fold.
kfold = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=42)

# Inputs for the split: file paths as samples, class indices as strata.
X_kfold = train_val_df['filepath']
y_kfold = train_val_df['label_idx']

# Best validation accuracy reached in each fold.
fold_val_accuracies = []

# Pinned host memory only helps host-to-GPU copies, so enable it only when
# the target device is CUDA.
use_pinned_memory = torch.device(device).type == 'cuda'

# --- Main loop over folds ---
for fold, (train_ids, val_ids) in enumerate(kfold.split(X_kfold, y_kfold)):
    # Start the per-fold timer; the duration is reported after the fold ends.
    start_time = time.time()
    print(f"\n{'=' * 20} Fold {fold + 1}/{N_SPLITS} {'=' * 20}")

    # --- 1. Data for this fold ---
    train_df = train_val_df.iloc[train_ids]
    val_df = train_val_df.iloc[val_ids]

    train_dataset = PlantDiseaseDataset(train_df, transform=data_transforms['train'])
    val_dataset = PlantDiseaseDataset(val_df, transform=data_transforms['val'])

    # num_workers=0 loads data in the main process.
    # NOTE(review): presumably chosen to avoid DataLoader worker problems
    # when running inside a notebook -- confirm before raising it.
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                              num_workers=0, pin_memory=use_pinned_memory)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False,
                            num_workers=0, pin_memory=use_pinned_memory)

    # --- 2. Fresh model, loss and optimiser so folds do not share state ---
    model = BaselineModel(NUM_CLASSES).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    # Multiply the learning rate by 0.2 once the validation loss has not
    # improved for more than 3 consecutive epochs. `verbose` is deliberately
    # not passed, so the scheduler itself prints nothing.
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=3)

    # --- 3. Epoch loop with early stopping ---
    num_epochs = 30
    best_val_acc = 0.0
    epochs_no_improve = 0
    patience = 5

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch + 1}/{num_epochs}")

        # Train
        train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer, device)
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

        # Validate
        val_loss, val_acc = validate_one_epoch(model, val_loader, criterion, device)
        print(f"Validation Loss: {val_loss:.4f}, Validation Acc: {val_acc:.4f}")

        # Let the scheduler react to the validation loss.
        scheduler.step(val_loss)

        # Checkpoint on improvement, otherwise count towards early stopping.
        if val_acc > best_val_acc:
            print(f"Validation accuracy improved from {best_val_acc:.4f} to {val_acc:.4f}. Saving model...")
            best_val_acc = val_acc
            epochs_no_improve = 0
            torch.save(model.state_dict(), f'baseline_model_fold_{fold+1}_best.pth')
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    fold_val_accuracies.append(best_val_acc)
    end_time = time.time()
    print(f"Fold {fold+1} finished in {(end_time - start_time)/60:.2f} minutes.")


# --- 4. Summarise the K-fold cross-validation results ---
print("\n" + "="*50)
print("       CROSS-VALIDATION FINAL RESULTS       ")
print("="*50)

# Mean and (population) standard deviation of the per-fold best accuracies.
mean_accuracy = np.mean(fold_val_accuracies)
std_accuracy = np.std(fold_val_accuracies)

for i, acc in enumerate(fold_val_accuracies):
    print(f"Fold {i+1} Best Validation Accuracy: {acc:.4f}")

print("-" * 50)
# Report the number of folds actually evaluated instead of a hard-coded 5,
# so the summary stays correct if the fold count changes.
print(f"基準模型 {len(fold_val_accuracies)}-折交叉驗證結果")
print(f"平均驗證準確率: {mean_accuracy:.4f}")
print(f"驗證準確率標準差: {std_accuracy:.4f}")
print("-" * 50)