# VGG16 Cats vs Dogs (PyTorch, Colab-ready)

這份 Notebook 會：
1. 安裝必要套件
2. 掛載 Google Drive
3. 使用你在 Drive 中的資料夾結構 `data/train|validation|test`
4. 以 VGG16（ImageNet 預訓練）訓練、驗證、（可選）測試
5. 存下最佳模型到 `output_vgg16/best.pth`（每個 epoch 的最新權重另存為 `output_vgg16/last.pth`）

資料夾結構應長這樣：
```
data/
├─ train/
│  ├─ cats/
│  └─ dogs/
├─ validation/
│  ├─ cats/
│  └─ dogs/
└─ test/
   ├─ cats/
   └─ dogs/
```


In [None]:
# Install PyTorch and torchvision (Colab usually ships with both; reinstall only if needed).
!pip -q install torch torchvision

In [None]:
# Mount Google Drive at /content/drive (only works inside a Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# ---- Paths (make sure DATA_ROOT exists on your Drive) ----
DATA_ROOT = "/content/drive/MyDrive/data"          # change this if your data lives elsewhere
OUT_DIR = "/content/drive/MyDrive/output_vgg16"    # where checkpoints are written

# ---- Reproducibility ----
SEED = 42

# ---- Data loading ----
IMG_SIZE = 224
BATCH_SIZE = 32
NUM_WORKERS = 2          # 2-4 is fine on Colab; set to 0 if the DataLoader misbehaves

# ---- Model ----
FREEZE_BACKBONE = True   # freeze the VGG feature extractor and train only the classifier;
                         # set to False to train the whole network
DROPOUT_P = 0.5

# ---- Optimisation ----
EPOCHS = 10
LR = 1e-4
WEIGHT_DECAY = 1e-4
USE_AMP = True           # mixed precision (only takes effect on a GPU)
MAX_GRAD_NORM = 0.0      # > 0 enables gradient clipping
EARLY_STOP = 0           # > 0 enables early stopping with that patience, e.g. 5

In [None]:
# 🔧 核心訓練與評估程式（功能等同於獨立腳本 train_vgg16_catsdogs.py）
import os
import random
import time
from pathlib import Path
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms, models

def set_seed(seed: int):
    """Seed every RNG this notebook relies on and make cuDNN deterministic.

    Seeds Python's ``random`` module, the PyTorch CPU generator and all CUDA
    generators with ``seed``, then turns off cuDNN benchmarking and switches
    on its deterministic mode.

    Args:
        seed: Value passed to each seeding call.
    """
    for seed_fn in (random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

def build_transforms(img_size: int):
    """Build the training and evaluation preprocessing pipelines.

    Both pipelines start with ``Resize(int(img_size * 1.15))`` and end with
    ``ToTensor`` followed by normalisation with the ImageNet mean/std.
    In between, training applies ``RandomResizedCrop(img_size)`` and a random
    horizontal flip, while evaluation applies ``CenterCrop(img_size)``.

    Args:
        img_size: Side length of the final crop.

    Returns:
        A ``(train_tf, eval_tf)`` tuple of ``transforms.Compose`` objects.
    """
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    resize_target = int(img_size * 1.15)

    def assemble(*spatial_steps):
        # Every pipeline shares the same tensor conversion + normalisation tail.
        return transforms.Compose([
            *spatial_steps,
            transforms.ToTensor(),
            transforms.Normalize(mean, std),
        ])

    train_tf = assemble(
        transforms.Resize(resize_target),
        transforms.RandomResizedCrop(img_size),
        transforms.RandomHorizontalFlip(),
    )
    eval_tf = assemble(
        transforms.Resize(resize_target),
        transforms.CenterCrop(img_size),
    )
    return train_tf, eval_tf

def create_dataloaders_with_val(data_root: str, img_size: int, batch_size: int,
                               num_workers: int, seed: int):
    """Create train / validation / (optional) test loaders from an existing split.

    Expects ``data_root`` to contain ``train/`` and ``validation/`` folders in
    ``ImageFolder`` layout (one sub-folder per class). A ``test/`` folder is
    used when present.

    Args:
        data_root: Directory holding the ``train``, ``validation`` and
            optionally ``test`` sub-directories.
        img_size: Crop size forwarded to ``build_transforms``.
        batch_size: Batch size used by every loader.
        num_workers: Number of DataLoader worker processes.
        seed: Seed for the generator that drives training-set shuffling, so
            the batch order is reproducible for a given seed.

    Returns:
        ``(train_loader, val_loader, test_loader, class_names)`` where
        ``test_loader`` is ``None`` if ``data_root/test`` does not exist and
        ``class_names`` comes from the training set.

    Raises:
        ValueError: If the validation or test split maps class folders to
            indices differently from the training split (labels would then
            be silently inconsistent).
    """
    train_tf, eval_tf = build_transforms(img_size)
    root = Path(data_root)
    train_ds = datasets.ImageFolder(root / "train", transform=train_tf)
    val_ds = datasets.ImageFolder(root / "validation", transform=eval_tf)
    test_ds = None
    if (root / "test").exists():
        test_ds = datasets.ImageFolder(root / "test", transform=eval_tf)

    # ImageFolder derives label indices from folder names per split; make sure
    # every split agrees with the training split before any metric is computed.
    for split_name, split_ds in (("validation", val_ds), ("test", test_ds)):
        if split_ds is not None and split_ds.class_to_idx != train_ds.class_to_idx:
            raise ValueError(
                f"Class folders of '{split_name}' ({split_ds.classes}) do not match "
                f"those of 'train' ({train_ds.classes})."
            )

    class_names = train_ds.classes

    # Dedicated generator so the shuffle order depends only on `seed`.
    shuffle_generator = torch.Generator()
    shuffle_generator.manual_seed(seed)

    common_kwargs = {
        "batch_size": batch_size,
        "num_workers": num_workers,
        # Pinned memory only helps host-to-GPU copies; skip it on CPU-only runtimes.
        "pin_memory": torch.cuda.is_available(),
    }
    train_loader = DataLoader(train_ds, shuffle=True, generator=shuffle_generator,
                              **common_kwargs)
    val_loader = DataLoader(val_ds, shuffle=False, **common_kwargs)
    test_loader = None
    if test_ds is not None:
        test_loader = DataLoader(test_ds, shuffle=False, **common_kwargs)
    return train_loader, val_loader, test_loader, class_names

def build_model(num_classes: int, freeze_backbone: bool, dropout_p: float):
    """Build an ImageNet-pretrained VGG16 with a new classification head.

    Args:
        num_classes: Number of output logits of the replaced final layer.
        freeze_backbone: When true, gradients are disabled for every parameter
            in ``vgg.features``; the classifier stays trainable.
        dropout_p: Dropout probability applied to every ``nn.Dropout`` layer
            in the classifier.

    Returns:
        The modified ``torchvision`` VGG16 module. The classifier keeps its
        original layer positions, so state-dict keys are unchanged.
    """
    vgg = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)
    if freeze_backbone:
        for param in vgg.features.parameters():
            param.requires_grad = False

    # Rebuild the classifier so that *all* dropout layers honour `dropout_p`
    # (previously only the one at index 5 was replaced).
    head_layers = [
        nn.Dropout(p=dropout_p) if isinstance(layer, nn.Dropout) else layer
        for layer in vgg.classifier
    ]
    # Swap the final ImageNet layer for one sized to this task.
    head_layers[-1] = nn.Linear(head_layers[-1].in_features, num_classes)
    vgg.classifier = nn.Sequential(*head_layers)
    return vgg

@torch.no_grad()
def evaluate(model, loader, device):
    """Compute accuracy and a confusion matrix for ``model`` over ``loader``.

    The model is switched to eval mode and is left in that mode on return.

    Args:
        model: Module whose ``classifier[-1].out_features`` gives the number
            of classes and whose forward pass returns per-class logits.
        loader: Iterable yielding ``(images, labels)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(acc, cm)`` where ``acc`` is the fraction of correct predictions
        (``0.0`` for an empty loader) and ``cm`` is a CPU ``torch.long``
        tensor of shape ``(num_classes, num_classes)`` with rows indexed by
        the true label and columns by the predicted label.
    """
    model.eval()
    num_classes = model.classifier[-1].out_features
    cm = torch.zeros((num_classes, num_classes), dtype=torch.long)
    for images, labels in loader:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        preds = model(images).argmax(dim=1)
        # Encode each (true, pred) pair as one flat index and count them all at
        # once, instead of one Python-level update (and device sync) per sample.
        flat_idx = (labels.view(-1).long() * num_classes + preds.view(-1).long()).cpu()
        batch_counts = torch.bincount(flat_idx, minlength=num_classes * num_classes)
        cm += batch_counts.view(num_classes, num_classes)
    # Correct predictions lie on the diagonal; every sample is counted once.
    total = int(cm.sum().item())
    correct = int(cm.diag().sum().item())
    acc = correct / max(total, 1)
    return acc, cm

def save_checkpoint(state: dict, is_best: bool, out_dir: str):
    """Write ``state`` to ``out_dir`` as ``last.pth`` (and ``best.pth`` if best).

    Args:
        state: Checkpoint dictionary handed to ``torch.save``.
        is_best: When true, the same state is also saved as ``best.pth``.
        out_dir: Target directory; created (including parents) if missing.
    """
    target_dir = Path(out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    filenames = ["last.pth", "best.pth"] if is_best else ["last.pth"]
    for filename in filenames:
        torch.save(state, target_dir / filename)

def export_onnx(model, device, img_size, num_classes, out_path: Path):
    """Export ``model`` to an ONNX file with a dynamic batch dimension.

    The model is put in eval mode and traced with a random
    ``(1, 3, img_size, img_size)`` input created on ``device``. The graph's
    input is named ``"input"`` and its output ``"logits"``; axis 0 of both is
    marked dynamic. Opset 12 is used.

    Args:
        model: Module to export.
        device: Device on which the example input is created.
        img_size: Height and width of the example input.
        num_classes: Accepted for interface compatibility; not used here.
        out_path: Destination file path.
    """
    model.eval()
    example_input = torch.randn(1, 3, img_size, img_size, device=device)
    tensor_names = ("input", "logits")
    torch.onnx.export(
        model,
        example_input,
        str(out_path),
        input_names=[tensor_names[0]],
        output_names=[tensor_names[1]],
        dynamic_axes={name: {0: "batch"} for name in tensor_names},
        opset_version=12,
    )
    print(f"[Export] ONNX -> {out_path}")

def export_torchscript(model, device, out_path: Path):
    """Script ``model`` with ``torch.jit.script`` and save it to ``out_path``.

    Args:
        model: Module to export; switched to eval mode first.
        device: Accepted for interface compatibility; not used here.
        out_path: Destination file path.
    """
    model.eval()
    torch.jit.script(model).save(str(out_path))
    print(f"[Export] TorchScript -> {out_path}")

@torch.no_grad()
def predict_image(model_path: str, image_path: str, img_size: int = 224, cpu: bool = False):
    """Classify a single image with a checkpoint written by this notebook.

    Args:
        model_path: Path to a checkpoint containing ``"class_names"`` and
            ``"state_dict"`` entries.
        image_path: Path of the image to classify; it is converted to RGB.
        img_size: Crop size passed to ``build_transforms``.
        cpu: Force CPU inference even when CUDA is available.

    Returns:
        ``(class_name, probability)`` for the class with the highest softmax
        probability.
    """
    use_cuda = not cpu and torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    checkpoint = torch.load(model_path, map_location=device)
    labels = checkpoint["class_names"]

    # Rebuild the architecture, then overwrite its weights with the checkpoint.
    net = build_model(num_classes=len(labels), freeze_backbone=False, dropout_p=0.5)
    net.load_state_dict(checkpoint["state_dict"])
    net.to(device)
    net.eval()

    # Only the evaluation pipeline (second element) is needed for inference.
    preprocess = build_transforms(img_size)[1]
    from PIL import Image
    rgb_image = Image.open(image_path).convert("RGB")
    batch = preprocess(rgb_image).unsqueeze(0).to(device)

    probabilities = torch.softmax(net(batch), dim=1)[0]
    top_index = int(torch.argmax(probabilities).item())
    return labels[top_index], float(probabilities[top_index].item())

def _train_one_epoch(model, loader, optimizer, criterion, scaler, device,
                     use_amp: bool, max_grad_norm: float):
    """Run one optimisation pass over ``loader``; return ``(mean_loss, accuracy)``."""
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for images, labels in loader:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        optimizer.zero_grad(set_to_none=True)
        with torch.cuda.amp.autocast(enabled=use_amp):
            logits = model(images)
            loss = criterion(logits, labels)
        scaler.scale(loss).backward()
        if max_grad_norm is not None and max_grad_norm > 0:
            # Gradients must be unscaled before their norm is clipped.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        scaler.step(optimizer)
        scaler.update()
        batch_len = labels.size(0)
        # Weight the batch loss by its size so the epoch mean is per-sample.
        loss_sum += loss.item() * batch_len
        n_correct += (logits.argmax(dim=1) == labels).sum().item()
        n_seen += batch_len
    return loss_sum / max(n_seen, 1), n_correct / max(n_seen, 1)


def train_loop(data_root: str, out_dir: str, img_size: int, batch_size: int, epochs: int,
               freeze_backbone: bool, amp: bool, lr: float, weight_decay: float,
               dropout: float, workers: int, early_stop: int, max_grad_norm: float, seed: int):
    """Train VGG16 on ``data_root``, checkpoint every epoch, then test the best model.

    Each epoch trains on the training split, evaluates on the validation
    split, steps a cosine learning-rate schedule and writes ``last.pth``
    (plus ``best.pth`` when validation accuracy improves) into ``out_dir``.
    After training, if ``best.pth`` exists and ``data_root`` has a test split,
    the best weights are loaded and test accuracy and the confusion matrix
    are printed.

    Args:
        data_root: Dataset directory with ``train``/``validation`` and
            optionally ``test`` sub-directories.
        out_dir: Directory receiving the checkpoints.
        img_size: Input crop size.
        batch_size: Batch size for all loaders.
        epochs: Maximum number of epochs (also the cosine schedule's ``T_max``).
        freeze_backbone: Freeze ``vgg.features`` and train only the classifier.
        amp: Use mixed precision; only takes effect when running on CUDA.
        lr: AdamW learning rate.
        weight_decay: AdamW weight decay.
        dropout: Dropout probability forwarded to ``build_model``.
        workers: Number of DataLoader workers.
        early_stop: Stop after this many epochs without improvement; ``0``
            disables early stopping.
        max_grad_norm: Gradient-norm clipping threshold; ``<= 0`` or ``None``
            disables clipping.
        seed: Random seed.
    """
    set_seed(seed)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[Device] {device}")
    train_loader, val_loader, test_loader, class_names = create_dataloaders_with_val(
        data_root=data_root, img_size=img_size, batch_size=batch_size,
        num_workers=workers, seed=seed
    )
    print(f"[Classes] {class_names}")
    model = build_model(num_classes=len(class_names), freeze_backbone=freeze_backbone, dropout_p=dropout)
    model.to(device)

    # Only hand trainable parameters to the optimiser (the backbone may be frozen).
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.AdamW(params, lr=lr, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    criterion = nn.CrossEntropyLoss()
    use_amp = device.type == "cuda" and amp
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

    best_acc = 0.0
    epochs_no_improve = 0
    since = time.time()
    for epoch in range(1, epochs + 1):
        train_loss, train_acc = _train_one_epoch(
            model, train_loader, optimizer, criterion, scaler, device,
            use_amp, max_grad_norm,
        )
        val_acc, val_cm = evaluate(model, val_loader, device)
        scheduler.step()

        is_best = val_acc > best_acc
        if is_best:
            best_acc = val_acc
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        save_checkpoint({
            "epoch": epoch,
            "state_dict": model.state_dict(),
            "optimizer": optimizer.state_dict(),
            "scheduler": scheduler.state_dict(),
            "best_acc": best_acc,
            "class_names": class_names,
            "args": {
                "img_size": img_size, "batch_size": batch_size, "epochs": epochs,
                "freeze_backbone": freeze_backbone, "amp": amp, "lr": lr,
                "weight_decay": weight_decay, "dropout": dropout, "workers": workers,
                "early_stop": early_stop, "max_grad_norm": max_grad_norm, "seed": seed
            }
        }, is_best=is_best, out_dir=out_dir)
        print(f"[Epoch {epoch:03d}/{epochs}] train_loss={train_loss:.4f} train_acc={train_acc:.4f} val_acc={val_acc:.4f} best_val_acc={best_acc:.4f}")
        if early_stop > 0 and epochs_no_improve >= early_stop:
            print(f"[EarlyStop] No improvement for {early_stop} epochs.")
            break

    elapsed = time.time() - since
    print(f"[Done] Time: {elapsed/60.0:.1f} min, Best Val Acc: {best_acc:.4f}")

    # Evaluate the best checkpoint on the test split of *this* call's data_root,
    # reusing the loader built above (it is None when no test folder exists).
    best_path = Path(out_dir) / "best.pth"
    if best_path.exists() and test_loader is not None:
        best_ckpt = torch.load(best_path, map_location=device)
        model.load_state_dict(best_ckpt["state_dict"])
        test_acc, test_cm = evaluate(model, test_loader, device)
        print(f"[Test] acc={test_acc:.4f}")
        print("[Test] Confusion Matrix (rows=true, cols=pred):")
        print(test_cm.cpu().numpy())


In [None]:
# ▶️ Start training, forwarding each notebook-level configuration constant
# to the matching keyword argument of train_loop.
train_loop(
    data_root=DATA_ROOT,
    out_dir=OUT_DIR,
    img_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    freeze_backbone=FREEZE_BACKBONE,
    amp=USE_AMP,
    lr=LR,
    weight_decay=WEIGHT_DECAY,
    dropout=DROPOUT_P,
    workers=NUM_WORKERS,
    early_stop=EARLY_STOP,
    max_grad_norm=MAX_GRAD_NORM,
    seed=SEED,
)

## 推論（單張圖片）
把 `IMAGE_PATH` 指向你的測試影像，例如：
`/content/drive/MyDrive/data/test/cats/xxx.jpg`

In [None]:
from pathlib import Path

# Point IMAGE_PATH at the picture to classify; MODEL_PATH is the best
# checkpoint inside OUT_DIR.
IMAGE_PATH = "/content/drive/MyDrive/data/test/cats/your_image.jpg"  # <- change to your image path
MODEL_PATH = str(Path(OUT_DIR) / "best.pth")

# Guard first: report the paths when either file is missing, otherwise predict.
if not (Path(MODEL_PATH).exists() and Path(IMAGE_PATH).exists()):
    print("請先確認 MODEL_PATH 與 IMAGE_PATH 是否存在：\n", MODEL_PATH, "\n", IMAGE_PATH)
else:
    pred, prob = predict_image(MODEL_PATH, IMAGE_PATH, img_size=IMG_SIZE, cpu=False)
    print(f"Predicted: {pred} ({prob:.3f})")

---
### 備註
- 若遇到 DataLoader 問題，可把 `NUM_WORKERS` 設為 0。
- 想要全網路一起訓練，請把 `FREEZE_BACKBONE` 設為 `False`，並可把 `LR` 稍微調小一點、多訓練幾個 epoch。
- 這份 Notebook 也支援 ONNX/TorchScript 匯出（有對應函式），你可以自行呼叫 `export_onnx` 或 `export_torchscript`。