In [None]:
# 連接Google Drive
from google.colab import drive
drive.mount('/content/drive')

# 安裝必要套件
!pip install torch torchvision torchaudio
!pip install timm
!pip install matplotlib seaborn
!pip install scikit-learn

# 導入所有必要的函式庫
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
import timm
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import time
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, classification_report, roc_curve, auc,
    precision_recall_curve, average_precision_score, matthews_corrcoef,
    cohen_kappa_score, balanced_accuracy_score
)
from sklearn.utils.class_weight import compute_class_weight
import pandas as pd
import json
import zipfile
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

print("環境設置完成！")

# 複製並解壓資料集到本地
import shutil
import zipfile

print("正在複製資料集ZIP檔案到Colab...")
zip_source = '/content/drive/My Drive/DualStage-DefectAI/資料集/EfficientNet_dataset.zip'  # 你的ZIP檔案路徑
zip_local = '/content/EfficientNet_dataset.zip'

# 檢查是否已經解壓過
if not os.path.exists('/content/EfficientNet_dataset'):
    # 複製ZIP檔案
    shutil.copy2(zip_source, zip_local)
    print("✓ ZIP檔案複製完成")

    # 解壓縮到本地
    print("正在解壓縮資料集...")
    with zipfile.ZipFile(zip_local, 'r') as zip_ref:
        zip_ref.extractall('/content/')
    print("✓ 資料集解壓縮完成")

    # 清理ZIP檔案以節省空間
    os.remove(zip_local)
    print("✓ 已清理臨時ZIP檔案")
else:
    print("✓ 資料集已存在於本地")

# 設定本地資料集路徑
train_dir = '/content/EfficientNet_dataset/train'
val_dir = '/content/EfficientNet_dataset/valid'
test_dir = '/content/EfficientNet_dataset/test'

# 驗證路徑是否正確
print("檢查本地資料集路徑:")
for path, name in [(train_dir, '訓練'), (val_dir, '驗證'), (test_dir, '測試')]:
    if os.path.exists(path):
        print(f"✓ {name}資料夾存在: {path}")
    else:
        print(f"✗ {name}資料夾不存在: {path}")

# 詳細檢查資料集結構
def analyze_dataset(data_dir, name):
    print(f"\n{name}資料集分析:")
    print("="*50)
    if os.path.exists(data_dir):
        classes = [d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))]
        total_images = 0
        class_distribution = {}

        for cls in classes:
            cls_path = os.path.join(data_dir, cls)
            images = [f for f in os.listdir(cls_path) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff'))]
            count = len(images)
            class_distribution[cls] = count
            total_images += count
            print(f"  {cls}: {count} 張圖片")

        print(f"  總計: {total_images} 張圖片")
        print(f"  類別分布: {class_distribution}")

        # 計算類別平衡性
        if len(class_distribution) == 2:
            values = list(class_distribution.values())
            balance_ratio = min(values) / max(values) if max(values) > 0 else 0
            print(f"  類別平衡比例: {balance_ratio:.3f}")

        return class_distribution, total_images
    else:
        print(f"  資料夾不存在: {data_dir}")
        return {}, 0

# 分析所有資料集
train_dist, train_total = analyze_dataset(train_dir, "訓練")
val_dist, val_total = analyze_dataset(val_dir, "驗證")
test_dist, test_total = analyze_dataset(test_dir, "測試")

# 記憶體優化設定
import torch
import gc

os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
torch.cuda.empty_cache()
gc.collect()

# 修正後的資料前處理（減少過度增強）
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),  # 減少旋轉角度
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.05),  # 減少顏色變化
    transforms.RandomAffine(degrees=0, translate=(0.05, 0.05), scale=(0.95, 1.05)),  # 減少變形
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 建立資料集
train_dataset = ImageFolder(train_dir, transform=train_transforms)
val_dataset = ImageFolder(val_dir, transform=val_transforms)
test_dataset = ImageFolder(test_dir, transform=val_transforms)

# 建立資料載入器（適當調整批次大小）
batch_size = 24  # 適當增加批次大小
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

print(f"\n最終資料集統計:")
print(f"訓練樣本數: {len(train_dataset)}")
print(f"驗證樣本數: {len(val_dataset)}")
print(f"測試樣本數: {len(test_dataset)}")
print(f"類別數: {len(train_dataset.classes)}")
print(f"類別名稱: {train_dataset.classes}")
print(f"批次大小: {batch_size}")

# 建立模型
def create_efficientnetv2_model(num_classes, model_name='tf_efficientnetv2_s.in1k'):
    model = timm.create_model(model_name, pretrained=True, num_classes=num_classes)
    return model

def count_parameters(model):
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    return total_params, trainable_params

num_classes = len(train_dataset.classes)
model = create_efficientnetv2_model(num_classes, 'tf_efficientnetv2_s.in1k')

# 模型分析
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

total_params, trainable_params = count_parameters(model)
print(f"\n模型架構分析:")
print(f"模型: EfficientNetV2-S")
print(f"使用設備: {device}")
print(f"總參數數量: {total_params:,}")
print(f"可訓練參數: {trainable_params:,}")
print(f"類別映射: {dict(enumerate(train_dataset.classes))}")

# 早停機制
class EarlyStopping:
    def __init__(self, patience=7, min_delta=0, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.counter = 0
        self.best_loss = float('inf')
        self.best_weights = None

    def __call__(self, val_loss, model):
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            if self.restore_best_weights:
                self.best_weights = model.state_dict().copy()
        else:
            self.counter += 1

        if self.counter >= self.patience and self.restore_best_weights:
            model.load_state_dict(self.best_weights)

        return self.counter >= self.patience

# 修正後的指標計算函數
def calculate_comprehensive_metrics(y_true, y_pred, y_prob=None):
    """計算全面的分類指標"""
    metrics = {}

    # 基本指標
    metrics['accuracy'] = accuracy_score(y_true, y_pred)
    metrics['balanced_accuracy'] = balanced_accuracy_score(y_true, y_pred)

    # 二分類特殊處理
    if len(np.unique(y_true)) == 2:
        # 假設類別0是異物(F)，類別1是非異物(Not_F)
        metrics['precision'] = precision_score(y_true, y_pred, pos_label=0, zero_division=0)
        metrics['recall'] = recall_score(y_true, y_pred, pos_label=0, zero_division=0)
        metrics['f1'] = f1_score(y_true, y_pred, pos_label=0, zero_division=0)

        # 混淆矩陣元素
        cm = confusion_matrix(y_true, y_pred)
        if cm.shape == (2, 2):
            tn, fp, fn, tp = cm.ravel()
            metrics['true_negatives'] = tn
            metrics['false_positives'] = fp
            metrics['false_negatives'] = fn
            metrics['true_positives'] = tp

            # 特異性和敏感性
            metrics['specificity'] = tn / (tn + fp) if (tn + fp) > 0 else 0
            metrics['sensitivity'] = tp / (tp + fn) if (tp + fn) > 0 else 0

        # 其他指標
        metrics['matthews_corrcoef'] = matthews_corrcoef(y_true, y_pred)
        metrics['cohen_kappa'] = cohen_kappa_score(y_true, y_pred)

        # 修正ROC-AUC和PR-AUC計算
        if y_prob is not None:
            y_prob_array = np.array(y_prob)
            if y_prob_array.ndim == 2 and y_prob_array.shape[1] == 2:
                try:
                    # 使用類別0（異物）的概率
                    fpr, tpr, _ = roc_curve(y_true, y_prob_array[:, 0], pos_label=0)
                    metrics['roc_auc'] = auc(fpr, tpr)

                    # PR-AUC
                    precision_curve, recall_curve, _ = precision_recall_curve(
                        y_true, y_prob_array[:, 0], pos_label=0
                    )
                    metrics['pr_auc'] = auc(recall_curve, precision_curve)
                    metrics['average_precision'] = average_precision_score(
                        y_true, y_prob_array[:, 0], pos_label=0
                    )
                except:
                    metrics['roc_auc'] = 0
                    metrics['pr_auc'] = 0
                    metrics['average_precision'] = 0
    else:
        # 多分類
        metrics['precision'] = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        metrics['recall'] = recall_score(y_true, y_pred, average='weighted', zero_division=0)
        metrics['f1'] = f1_score(y_true, y_pred, average='weighted', zero_division=0)
        metrics['matthews_corrcoef'] = matthews_corrcoef(y_true, y_pred)
        metrics['cohen_kappa'] = cohen_kappa_score(y_true, y_pred)

    # 每個類別的指標
    try:
        metrics['precision_per_class'] = precision_score(y_true, y_pred, average=None, zero_division=0)
        metrics['recall_per_class'] = recall_score(y_true, y_pred, average=None, zero_division=0)
        metrics['f1_per_class'] = f1_score(y_true, y_pred, average=None, zero_division=0)
    except:
        metrics['precision_per_class'] = np.array([0])
        metrics['recall_per_class'] = np.array([0])
        metrics['f1_per_class'] = np.array([0])

    return metrics

# 修正後的訓練函數
def train_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    all_preds = []
    all_labels = []
    all_probs = []

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()

        # 添加梯度裁剪
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        running_loss += loss.item()

        # 收集預測結果（避免梯度計算）
        with torch.no_grad():
            probs = torch.softmax(output, dim=1)
            _, predicted = torch.max(output, 1)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(target.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

        # 清理記憶體
        del data, target, output, loss
        if batch_idx % 10 == 0:
            torch.cuda.empty_cache()

        if batch_idx % 20 == 0:
            print(f'  Batch {batch_idx}/{len(train_loader)}, Loss: {running_loss/(batch_idx+1):.4f}')

    epoch_loss = running_loss / len(train_loader)
    metrics = calculate_comprehensive_metrics(all_labels, all_preds, all_probs)

    return epoch_loss, metrics, all_labels, all_preds, all_probs

# 修正後的驗證函數
def validate_epoch(model, val_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    all_preds = []
    all_labels = []
    all_probs = []
    # 添加時間統計
    total_inference_time = 0.0
    total_samples = 0

    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
            # 測量推論時間
            start_time = time.time()
            output = model(data)
            inference_time = time.time() - start_time

            total_inference_time += inference_time
            total_samples += data.size(0)
            loss = criterion(output, target)

            running_loss += loss.item()

            probs = torch.softmax(output, dim=1)
            _, predicted = torch.max(output, 1)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(target.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

            # 清理記憶體
            del data, target, output, loss
            if batch_idx % 10 == 0:
                torch.cuda.empty_cache()

    epoch_loss = running_loss / len(val_loader)
    metrics = calculate_comprehensive_metrics(all_labels, all_preds, all_probs)

    # 計算速度統計
    avg_inference_time = total_inference_time / len(val_loader)
    fps = total_samples / total_inference_time
    ms_per_image = (total_inference_time / total_samples) * 1000

    speed_stats = {
        'total_inference_time': total_inference_time,
        'avg_batch_time': avg_inference_time,
        'fps': fps,
        'ms_per_image': ms_per_image,
        'total_samples': total_samples
    }


    return epoch_loss, metrics, all_labels, all_preds, all_probs, speed_stats

# 修正後的繪圖函數
def plot_roc_pr_curves_multiclass(test_labels, test_probs, class_names, save_path):
    """多分類ROC和PR曲線"""
    plt.figure(figsize=(15, 6))

    test_probs_array = np.array(test_probs)
    n_classes = len(class_names)

    # 為每個類別計算ROC曲線
    plt.subplot(1, 2, 1)
    for i in range(n_classes):
        # 將多分類問題轉為一對多的二分類
        y_binary = (np.array(test_labels) == i).astype(int)
        if test_probs_array.ndim == 2 and test_probs_array.shape[1] == n_classes:
            fpr, tpr, _ = roc_curve(y_binary, test_probs_array[:, i])
            roc_auc = auc(fpr, tpr)
            plt.plot(fpr, tpr, linewidth=2,
                    label=f'{class_names[i]} (AUC = {roc_auc:.3f})')

    plt.plot([0, 1], [0, 1], 'k--', alpha=0.5)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Multi-class ROC Curves')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # 為每個類別計算PR曲線
    plt.subplot(1, 2, 2)
    for i in range(n_classes):
        y_binary = (np.array(test_labels) == i).astype(int)
        if test_probs_array.ndim == 2 and test_probs_array.shape[1] == n_classes:
            precision_curve, recall_curve, _ = precision_recall_curve(
                y_binary, test_probs_array[:, i]
            )
            avg_precision = average_precision_score(y_binary, test_probs_array[:, i])
            plt.plot(recall_curve, precision_curve, linewidth=2,
                    label=f'{class_names[i]} (AP = {avg_precision:.3f})')

    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Multi-class Precision-Recall Curves')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.close()


# 修正後的混淆矩陣繪圖
def plot_confusion_matrix(y_true, y_pred, class_names, save_path):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))

    # 計算百分比
    cm_percent = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100

    # 創建標籤
    labels = []
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            labels.append(f'{cm[i,j]}\n({cm_percent[i,j]:.1f}%)')

    labels = np.asarray(labels).reshape(cm.shape)

    sns.heatmap(cm, annot=labels, fmt='', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.close()

# 模型保存函數
def save_model(model, optimizer, scheduler, epoch, best_metrics, save_path):
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'best_metrics': best_metrics,
        'model_architecture': 'tf_efficientnetv2_s.in1k',
        'num_classes': len(train_dataset.classes),
        'class_names': train_dataset.classes,
        'input_size': (224, 224),
        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }, save_path)

# 訓練設置
# 計算類別權重
class_weights = compute_class_weight(
    'balanced',
    classes=np.unique(train_dataset.targets),
    y=train_dataset.targets
)
class_weights = torch.FloatTensor(class_weights).to(device)
print(f"類別權重: {dict(zip(train_dataset.classes, class_weights.cpu().numpy()))}")

# 使用加權損失函數
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)

# 修正後的學習率調度器
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5, verbose=True
)

# 早停機制
early_stopping = EarlyStopping(patience=10, min_delta=0.001)

# 訓練參數
num_epochs = 30
best_val_acc = 0.0
best_val_f1 = 0.0
best_model_path = '/tmp/best_model.pth'

# 詳細指標記錄
training_history = {
    'train_loss': [], 'val_loss': [],
    'train_acc': [], 'val_acc': [],
    'train_balanced_acc': [], 'val_balanced_acc': [],
    'train_precision': [], 'val_precision': [],
    'train_recall': [], 'val_recall': [],
    'train_f1': [], 'val_f1': [],
    'train_specificity': [], 'val_specificity': [],
    'train_sensitivity': [], 'val_sensitivity': [],
    'train_matthews': [], 'val_matthews': [],
    'train_kappa': [], 'val_kappa': [],
    'train_roc_auc': [], 'val_roc_auc': [],
    'train_pr_auc': [], 'val_pr_auc': [],
    'learning_rate': [],
    'epoch_times': []
}

print("開始訓練異物檢測模型...")
print("="*70)
start_time = time.time()

# 修正後的訓練循環
for epoch in range(num_epochs):
    epoch_start_time = time.time()
    print(f'\nEpoch {epoch+1}/{num_epochs}')
    print('-' * 70)

    # 訓練
    train_loss, train_metrics, train_labels, train_preds, train_probs = train_epoch(
        model, train_loader, criterion, optimizer, device)

    # 驗證階段
    val_loss, val_metrics, val_labels, val_preds, val_probs, val_speed = validate_epoch(
    model, val_loader, criterion, device)

    # 更新學習率
    current_lr = optimizer.param_groups[0]['lr']
    scheduler.step(val_loss)

    epoch_time = time.time() - epoch_start_time

    # 記錄所有指標
    training_history['train_loss'].append(train_loss)
    training_history['val_loss'].append(val_loss)
    training_history['train_acc'].append(train_metrics['accuracy'])
    training_history['val_acc'].append(val_metrics['accuracy'])
    training_history['train_balanced_acc'].append(train_metrics['balanced_accuracy'])
    training_history['val_balanced_acc'].append(val_metrics['balanced_accuracy'])
    training_history['train_precision'].append(train_metrics['precision'])
    training_history['val_precision'].append(val_metrics['precision'])
    training_history['train_recall'].append(train_metrics['recall'])
    training_history['val_recall'].append(val_metrics['recall'])
    training_history['train_f1'].append(train_metrics['f1'])
    training_history['val_f1'].append(val_metrics['f1'])
    training_history['train_specificity'].append(train_metrics.get('specificity', 0))
    training_history['val_specificity'].append(val_metrics.get('specificity', 0))
    training_history['train_sensitivity'].append(train_metrics.get('sensitivity', 0))
    training_history['val_sensitivity'].append(val_metrics.get('sensitivity', 0))
    training_history['train_matthews'].append(train_metrics['matthews_corrcoef'])
    training_history['val_matthews'].append(val_metrics['matthews_corrcoef'])
    training_history['train_kappa'].append(train_metrics['cohen_kappa'])
    training_history['val_kappa'].append(val_metrics['cohen_kappa'])
    training_history['train_roc_auc'].append(train_metrics.get('roc_auc', 0))
    training_history['val_roc_auc'].append(val_metrics.get('roc_auc', 0))
    training_history['train_pr_auc'].append(train_metrics.get('pr_auc', 0))
    training_history['val_pr_auc'].append(val_metrics.get('pr_auc', 0))
    training_history['learning_rate'].append(current_lr)
    training_history['epoch_times'].append(epoch_time)

    # 詳細顯示結果
    print(f'訓練指標:')
    print(f'  Loss: {train_loss:.4f}, Acc: {train_metrics["accuracy"]:.4f}, F1: {train_metrics["f1"]:.4f}')
    print(f'  Precision: {train_metrics["precision"]:.4f}, Recall: {train_metrics["recall"]:.4f}')
    print(f'  ROC-AUC: {train_metrics.get("roc_auc", 0):.4f}')

    print(f'驗證指標:')
    print(f'  Loss: {val_loss:.4f}, Acc: {val_metrics["accuracy"]:.4f}, F1: {val_metrics["f1"]:.4f}')
    print(f'  Precision: {val_metrics["precision"]:.4f}, Recall: {val_metrics["recall"]:.4f}')
    print(f'  ROC-AUC: {val_metrics.get("roc_auc", 0):.4f}')

    print(f'學習率: {current_lr:.6f}, 時間: {epoch_time:.2f}秒')

    # 保存最佳模型（基於F1分數）
    if val_metrics['f1'] > best_val_f1:
        best_val_f1 = val_metrics['f1']
        best_val_acc = val_metrics['accuracy']
        save_model(model, optimizer, scheduler, epoch,
                  {'f1': best_val_f1, 'accuracy': best_val_acc},
                  best_model_path)
        print(f'★ 新的最佳模型! F1: {val_metrics["f1"]:.4f}, Acc: {val_metrics["accuracy"]:.4f}')

    # 早停檢查
    if early_stopping(val_loss, model):
        print(f"Early stopping triggered at epoch {epoch+1}")
        break

    # 清理記憶體
    torch.cuda.empty_cache()
    gc.collect()

total_time = time.time() - start_time
print(f'\n訓練完成! 總時間: {total_time/60:.2f} 分鐘')
print(f'最佳驗證F1分數: {best_val_f1:.4f}')
print(f'最佳驗證準確率: {best_val_acc:.4f}')

# 載入最佳模型進行測試
if os.path.exists(best_model_path):
    checkpoint = torch.load(best_model_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    print("已載入最佳模型進行測試")

# 測試集評估
print("\n進行測試集評估...")
# 測試階段
test_loss, test_metrics, test_labels, test_preds, test_probs, test_speed = validate_epoch(
    model, test_loader, criterion, device)

print(f"\n測試集最終結果:")
print("="*50)
print(f"Accuracy: {test_metrics['accuracy']:.4f}")
print(f"Balanced Accuracy: {test_metrics['balanced_accuracy']:.4f}")
print(f"Precision: {test_metrics['precision']:.4f}")
print(f"Recall: {test_metrics['recall']:.4f}")
print(f"F1-Score: {test_metrics['f1']:.4f}")
print(f"Specificity: {test_metrics.get('specificity', 0):.4f}")
print(f"Sensitivity: {test_metrics.get('sensitivity', 0):.4f}")


# 添加速度統計
print(f"\n推論速度統計:")
print(f"平均FPS: {test_speed['fps']:.2f}")
print(f"每張圖片推論時間: {test_speed['ms_per_image']:.2f} ms")
print(f"總推論時間: {test_speed['total_inference_time']:.4f} 秒")
print(f"總測試樣本數: {test_speed['total_samples']}")

Mounted at /content/drive
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading

model.safetensors:   0%|          | 0.00/86.5M [00:00<?, ?B/s]


模型架構分析:
模型: EfficientNetV2-S
使用設備: cuda
總參數數量: 20,181,331
可訓練參數: 20,181,331
類別映射: {0: 'F', 1: 'S', 2: 'V'}
類別權重: {'F': np.float32(1.0161439), 'S': np.float32(1.1649921), 'V': np.float32(0.8639216)}
開始訓練異物檢測模型...

Epoch 1/30
----------------------------------------------------------------------
  Batch 0/276, Loss: 2.3218
  Batch 20/276, Loss: 1.2437
  Batch 40/276, Loss: 1.0537
  Batch 60/276, Loss: 0.8364
  Batch 80/276, Loss: 0.7215
  Batch 100/276, Loss: 0.6526
  Batch 120/276, Loss: 0.5816
  Batch 140/276, Loss: 0.5453
  Batch 160/276, Loss: 0.4981
  Batch 180/276, Loss: 0.4576
  Batch 200/276, Loss: 0.4308
  Batch 220/276, Loss: 0.4107
  Batch 240/276, Loss: 0.3835
  Batch 260/276, Loss: 0.3694
訓練指標:
  Loss: 0.3582, Acc: 0.9098, F1: 0.9098
  Precision: 0.9098, Recall: 0.9098
  ROC-AUC: 0.0000
驗證指標:
  Loss: 0.1432, Acc: 0.9560, F1: 0.9562
  Precision: 0.9576, Recall: 0.9560
  ROC-AUC: 0.0000
學習率: 0.001000, 時間: 100.28秒
★ 新的最佳模型! F1: 0.9562, Acc: 0.9560

Epoch 2/30
-----------------

打包

In [None]:
# 模型打包與保存程式
import os
import json
import zipfile
import shutil
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# 創建結果目錄
results_dir = '/content/efficientnet_results'
os.makedirs(results_dir, exist_ok=True)

# 1. 保存訓練歷史為JSON和CSV
print("正在保存訓練歷史...")
history_json_path = os.path.join(results_dir, 'training_history.json')
history_csv_path = os.path.join(results_dir, 'training_history.csv')

# 保存為JSON
with open(history_json_path, 'w', encoding='utf-8') as f:
    json.dump(training_history, f, ensure_ascii=False, indent=2, default=str)

# 保存為CSV
df_history = pd.DataFrame(training_history)
df_history.to_csv(history_csv_path, index=False, encoding='utf-8')

# 2. 保存最終測試結果
print("正在保存測試結果...")
test_results = {
    'model_info': {
        'model_name': 'EfficientNetV2-S',
        'architecture': 'tf_efficientnetv2_s.in1k',
        'num_classes': len(train_dataset.classes),
        'class_names': train_dataset.classes,
        'input_size': [224, 224],
        'batch_size': batch_size,
        'total_epochs': len(training_history['train_loss']),
        'best_epoch': np.argmax(training_history['val_f1']) + 1,
        'training_time_minutes': total_time/60,
        'total_parameters': total_params,
        'trainable_parameters': trainable_params
    },
    'dataset_info': {
        'train_samples': len(train_dataset),
        'val_samples': len(val_dataset),
        'test_samples': len(test_dataset),
        'train_distribution': train_dist,
        'val_distribution': val_dist,
        'test_distribution': test_dist
    },
    'final_test_metrics': {
        'accuracy': float(test_metrics['accuracy']),
        'balanced_accuracy': float(test_metrics['balanced_accuracy']),
        'precision': float(test_metrics['precision']),
        'recall': float(test_metrics['recall']),
        'f1_score': float(test_metrics['f1']),
        'specificity': float(test_metrics.get('specificity', 0)),
        'sensitivity': float(test_metrics.get('sensitivity', 0)),
        'matthews_corrcoef': float(test_metrics['matthews_corrcoef']),
        'cohen_kappa': float(test_metrics['cohen_kappa']),
        'roc_auc': float(test_metrics.get('roc_auc', 0)),
        'pr_auc': float(test_metrics.get('pr_auc', 0)),
        'test_loss': float(test_loss)
    },
    'speed_statistics': {
    'fps': float(test_speed['fps']),
    'ms_per_image': float(test_speed['ms_per_image']),
    'total_inference_time': float(test_speed['total_inference_time']),
    'avg_batch_time': float(test_speed['avg_batch_time']),
    'total_samples': int(test_speed['total_samples'])
    },
    'confusion_matrix': {
        'matrix': confusion_matrix(test_labels, test_preds).tolist(),
        'true_labels': test_labels,
        'predicted_labels': test_preds
    },
    'per_class_metrics': {
        'precision': test_metrics['precision_per_class'].tolist(),
        'recall': test_metrics['recall_per_class'].tolist(),
        'f1': test_metrics['f1_per_class'].tolist()
    },
    'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}

test_results_path = os.path.join(results_dir, 'test_results.json')
with open(test_results_path, 'w', encoding='utf-8') as f:
    json.dump(test_results, f, ensure_ascii=False, indent=2, default=str)

# 3. 創建詳細的分類報告
print("正在生成分類報告...")
from sklearn.metrics import classification_report
class_report = classification_report(test_labels, test_preds,
                                   target_names=train_dataset.classes,
                                   output_dict=True)
class_report_path = os.path.join(results_dir, 'classification_report.json')
with open(class_report_path, 'w', encoding='utf-8') as f:
    json.dump(class_report, f, ensure_ascii=False, indent=2, default=str)

# 4. 生成訓練過程視覺化圖表
print("正在生成視覺化圖表...")

# 訓練過程曲線
plt.figure(figsize=(20, 12))

# 損失曲線
plt.subplot(3, 4, 1)
plt.plot(training_history['train_loss'], label='Training Loss')
plt.plot(training_history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True, alpha=0.3)

# 準確率曲線
plt.subplot(3, 4, 2)
plt.plot(training_history['train_acc'], label='Training Accuracy')
plt.plot(training_history['val_acc'], label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True, alpha=0.3)

# F1分數曲線
plt.subplot(3, 4, 3)
plt.plot(training_history['train_f1'], label='Training F1')
plt.plot(training_history['val_f1'], label='Validation F1')
plt.title('Training and Validation F1-Score')
plt.xlabel('Epoch')
plt.ylabel('F1-Score')
plt.legend()
plt.grid(True, alpha=0.3)

# 精確度曲線
plt.subplot(3, 4, 4)
plt.plot(training_history['train_precision'], label='Training Precision')
plt.plot(training_history['val_precision'], label='Validation Precision')
plt.title('Training and Validation Precision')
plt.xlabel('Epoch')
plt.ylabel('Precision')
plt.legend()
plt.grid(True, alpha=0.3)

# 召回率曲線
plt.subplot(3, 4, 5)
plt.plot(training_history['train_recall'], label='Training Recall')
plt.plot(training_history['val_recall'], label='Validation Recall')
plt.title('Training and Validation Recall')
plt.xlabel('Epoch')
plt.ylabel('Recall')
plt.legend()
plt.grid(True, alpha=0.3)

# 平衡準確率曲線
plt.subplot(3, 4, 6)
plt.plot(training_history['train_balanced_acc'], label='Training Balanced Acc')
plt.plot(training_history['val_balanced_acc'], label='Validation Balanced Acc')
plt.title('Training and Validation Balanced Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Balanced Accuracy')
plt.legend()
plt.grid(True, alpha=0.3)

# ROC-AUC曲線
plt.subplot(3, 4, 7)
plt.plot(training_history['train_roc_auc'], label='Training ROC-AUC')
plt.plot(training_history['val_roc_auc'], label='Validation ROC-AUC')
plt.title('Training and Validation ROC-AUC')
plt.xlabel('Epoch')
plt.ylabel('ROC-AUC')
plt.legend()
plt.grid(True, alpha=0.3)

# Matthews相關係數曲線
plt.subplot(3, 4, 8)
plt.plot(training_history['train_matthews'], label='Training Matthews')
plt.plot(training_history['val_matthews'], label='Validation Matthews')
plt.title('Training and Validation Matthews Correlation')
plt.xlabel('Epoch')
plt.ylabel('Matthews Correlation')
plt.legend()
plt.grid(True, alpha=0.3)

# 學習率曲線
plt.subplot(3, 4, 9)
plt.plot(training_history['learning_rate'], label='Learning Rate')
plt.title('Learning Rate Schedule')
plt.xlabel('Epoch')
plt.ylabel('Learning Rate')
plt.legend()
plt.grid(True, alpha=0.3)
plt.yscale('log')

# 每個epoch的時間
plt.subplot(3, 4, 10)
plt.plot(training_history['epoch_times'], label='Epoch Time')
plt.title('Training Time per Epoch')
plt.xlabel('Epoch')
plt.ylabel('Time (seconds)')
plt.legend()
plt.grid(True, alpha=0.3)

# 特異性和敏感性
plt.subplot(3, 4, 11)
plt.plot(training_history['train_specificity'], label='Training Specificity')
plt.plot(training_history['val_specificity'], label='Validation Specificity')
plt.plot(training_history['train_sensitivity'], label='Training Sensitivity')
plt.plot(training_history['val_sensitivity'], label='Validation Sensitivity')
plt.title('Specificity and Sensitivity')
plt.xlabel('Epoch')
plt.ylabel('Score')
plt.legend()
plt.grid(True, alpha=0.3)

# Cohen's Kappa
plt.subplot(3, 4, 12)
plt.plot(training_history['train_kappa'], label='Training Kappa')
plt.plot(training_history['val_kappa'], label='Validation Kappa')
plt.title('Cohen\'s Kappa Score')
plt.xlabel('Epoch')
plt.ylabel('Kappa')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
training_curves_path = os.path.join(results_dir, 'training_curves.png')
plt.savefig(training_curves_path, dpi=300, bbox_inches='tight')
plt.close()

# 5. 生成混淆矩陣圖
confusion_matrix_path = os.path.join(results_dir, 'confusion_matrix.png')
plot_confusion_matrix(test_labels, test_preds, train_dataset.classes, confusion_matrix_path)

# 6. 生成ROC和PR曲線
roc_pr_curves_path = os.path.join(results_dir, 'roc_pr_curves.png')
plot_roc_pr_curves_multiclass(test_labels, test_probs, train_dataset.classes, roc_pr_curves_path)


# 7. 生成每類別性能柱狀圖
plt.figure(figsize=(15, 10))

# 每類別精確度
plt.subplot(2, 2, 1)
plt.bar(train_dataset.classes, test_metrics['precision_per_class'])
plt.title('Precision per Class')
plt.ylabel('Precision')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)

# 每類別召回率
plt.subplot(2, 2, 2)
plt.bar(train_dataset.classes, test_metrics['recall_per_class'])
plt.title('Recall per Class')
plt.ylabel('Recall')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)

# 每類別F1分數
plt.subplot(2, 2, 3)
plt.bar(train_dataset.classes, test_metrics['f1_per_class'])
plt.title('F1-Score per Class')
plt.ylabel('F1-Score')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)

# 數據分布
plt.subplot(2, 2, 4)
class_counts = [train_dist.get(cls, 0) for cls in train_dataset.classes]
plt.bar(train_dataset.classes, class_counts)
plt.title('Training Data Distribution')
plt.ylabel('Number of Samples')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)

plt.tight_layout()
per_class_metrics_path = os.path.join(results_dir, 'per_class_metrics.png')
plt.savefig(per_class_metrics_path, dpi=300, bbox_inches='tight')
plt.close()

# 8. 複製最佳模型到結果目錄
final_model_path = os.path.join(results_dir, 'best_model.pth')
if os.path.exists(best_model_path):
    shutil.copy2(best_model_path, final_model_path)
    print("✓ 最佳模型已複製")

# 9. 生成README文件
readme_content = f"""# EfficientNetV2-S 異物檢測模型訓練結果

## 模型資訊
- **模型架構**: EfficientNetV2-S (tf_efficientnetv2_s.in1k)
- **類別數**: {len(train_dataset.classes)}
- **類別名稱**: {', '.join(train_dataset.classes)}
- **輸入尺寸**: 224x224
- **總參數數**: {total_params:,}
- **可訓練參數**: {trainable_params:,}

## 資料集資訊
- **訓練樣本數**: {len(train_dataset)}
- **驗證樣本數**: {len(val_dataset)}
- **測試樣本數**: {len(test_dataset)}

## 訓練設定
- **批次大小**: {batch_size}
- **總訓練時間**: {total_time/60:.2f} 分鐘
- **最佳epoch**: {np.argmax(training_history['val_f1']) + 1}
- **優化器**: AdamW
- **學習率調度**: ReduceLROnPlateau
- **早停機制**: 啟用 (patience=10)

## 最終測試結果
- **準確率**: {test_metrics['accuracy']:.4f}
- **平衡準確率**: {test_metrics['balanced_accuracy']:.4f}
- **精確度**: {test_metrics['precision']:.4f}
- **召回率**: {test_metrics['recall']:.4f}
- **F1分數**: {test_metrics['f1']:.4f}
- **特異性**: {test_metrics.get('specificity', 0):.4f}
- **敏感性**: {test_metrics.get('sensitivity', 0):.4f}
- **Matthews相關係數**: {test_metrics['matthews_corrcoef']:.4f}
- **Cohen's Kappa**: {test_metrics['cohen_kappa']:.4f}
- **ROC-AUC**: {test_metrics.get('roc_auc', 0):.4f}
- **PR-AUC**: {test_metrics.get('pr_auc', 0):.4f}
## 推論速度
- **平均FPS**: {test_speed['fps']:.2f}
- **每張圖片推論時間**: {test_speed['ms_per_image']:.2f} ms
- **總推論時間**: {test_speed['total_inference_time']:.4f} 秒

## 檔案說明
- `best_model.pth`: 最佳模型權重檔案
- `training_history.json`: 完整訓練歷史 (JSON格式)
- `training_history.csv`: 完整訓練歷史 (CSV格式)
- `test_results.json`: 詳細測試結果
- `classification_report.json`: 分類報告
- `training_curves.png`: 訓練過程曲線圖
- `confusion_matrix.png`: 混淆矩陣圖
- `roc_pr_curves.png`: ROC和PR曲線圖
- `per_class_metrics.png`: 每類別性能圖表

## 使用方法
```python
import torch
import timm

# 載入模型
checkpoint = torch.load('best_model.pth')
model = timm.create_model('tf_efficientnetv2_s.in1k', pretrained=False, num_classes={len(train_dataset.classes)})
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

# 進行推論
# ... (預處理圖像)
# output = model(input_tensor)
```

生成時間: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
"""

readme_path = os.path.join(results_dir, 'README.md')
with open(readme_path, 'w', encoding='utf-8') as f:
    f.write(readme_content)

# 10. 創建ZIP檔案
print("正在創建ZIP檔案...")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# 定義 ZIP 檔案的目標目錄路徑
zip_output_dir = os.path.join('/content/', 'efficientnetV2')
# 確保目標目錄存在，如果不存在則創建
os.makedirs(zip_output_dir, exist_ok=True)

# 組合完整的 ZIP 檔名和路徑
zip_filename = f'EfficientNetV2_ForeignObject_Detection_{timestamp}.zip'
zip_path = os.path.join(zip_output_dir, zip_filename)

with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
    for root, dirs, files in os.walk(results_dir):
        for file in files:
            file_path = os.path.join(root, file)
            arc_name = os.path.relpath(file_path, results_dir)
            zipf.write(file_path, arc_name)
            print(f"  已添加: {arc_name}")

print(f"✓ ZIP檔案已創建: {zip_path}")
print(f"檔案大小: {os.path.getsize(zip_path) / (1024*1024):.2f} MB")

# 11. 上傳到Google Drive
print("正在上傳到Google Drive...")
drive_save_path = f'/content/drive/My Drive/DualStage-DefectAI/輸出/EfficientNetv2s輸出/{zip_filename}'

try:
    shutil.copy2(zip_path, drive_save_path)
    print(f"✓ 檔案已成功上傳到Google Drive: {drive_save_path}")
except Exception as e:
    print(f"✗ 上傳失敗: {e}")
    print("請手動將檔案複製到Google Drive")

# 12. 顯示檔案清單
print(f"\n檔案清單:")
print("="*50)
for root, dirs, files in os.walk(results_dir):
    for file in files:
        file_path = os.path.join(root, file)
        file_size = os.path.getsize(file_path)
        print(f"{file:<30} {file_size:>10} bytes")

print(f"\n所有檔案已打包完成！")
print(f"ZIP檔案位置: {zip_path}")
print(f"Google Drive位置: {drive_save_path}")
print(f"結果目錄: {results_dir}")

# 13. 清理臨時檔案
print("\n正在清理臨時檔案...")
if os.path.exists(best_model_path):
    os.remove(best_model_path)
    print("✓ 臨時模型檔案已清理")

print("✓ 所有操作完成！")

正在保存訓練歷史...
正在保存測試結果...
正在生成分類報告...
正在生成視覺化圖表...
✓ 最佳模型已複製
正在創建ZIP檔案...
  已添加: README.md
  已添加: per_class_metrics.png
  已添加: training_history.json
  已添加: test_results.json
  已添加: training_curves.png
  已添加: training_history.csv
  已添加: roc_pr_curves.png
  已添加: classification_report.json
  已添加: confusion_matrix.png
  已添加: best_model.pth
✓ ZIP檔案已創建: /content/efficientnetV2/EfficientNetV2_ForeignObject_Detection_20250725_075942.zip
檔案大小: 217.19 MB
正在上傳到Google Drive...
✓ 檔案已成功上傳到Google Drive: /content/drive/My Drive/暑評yolotla/EfficientNetV2_ForeignObject_Detection_20250725_075942.zip

檔案清單:
README.md                            1812 bytes
per_class_metrics.png              183236 bytes
training_history.json               12044 bytes
test_results.json                  104570 bytes
training_curves.png               1096480 bytes
training_history.csv                 8636 bytes
roc_pr_curves.png                  177441 bytes
classification_report.json            695 bytes
confusion_matrix.png    