# 2.CNN_Dogs_Cats.ipynb 說明
本 notebook 是一個完整的「貓狗影像分類」深度學習專案範例，主要內容與流程如下：
1. **套件與工具載入**：匯入 PyTorch、torchvision、sklearn、PIL、torchinfo 等深度學習與資料處理相關套件，並載入 EarlyStopping 工具。
2. **資料下載與解壓縮**：提供下載 cats_dogs.zip 的程式碼（註解掉），並用 zipfile 解壓縮資料集。
3. **裝置檢查**：檢查目前可用的運算裝置（GPU、MPS、CPU），並設定 device 變數。
4. **標籤與資料收集**：定義貓狗的標籤對應（dog:0, cat:1），並撰寫 collect_data 函數自動收集資料夾下所有圖片路徑與標籤。
5. **資料集切分**：使用 train_test_split 將資料分為訓練集與驗證集，並統計各類別數量。
6. **資料增強與正規化**：定義影像增強（隨機翻轉、旋轉等）與正規化（resize、toTensor、normalize）流程。
7. **自訂 Dataset 與 DataLoader**：建立 Image_dataset 類別，並用 DataLoader 將資料批次化，方便訓練。
8. **CNN Block、Inception Block、ResBlock 實作**：定義多種 CNN 卷積區塊，包括基本卷積、Inception 結構、ResNet 殘差結構，並用 markdown 圖示說明結構。
9. **模型架構設計**：組合上述區塊，設計一個多層次的 CNN 架構（CnnArchitecture），最後用 GAP（全域平均池化）與全連接層輸出分類結果。
10. **模型訓練設定**：設定 optimizer、loss function、early stopping、學習率調整（scheduler）等訓練相關參數。
11. **訓練迴圈**：執行多輪訓練與驗證，記錄 loss 與 accuracy，並根據 early stopping 儲存最佳模型。
12. **訓練過程視覺化**：用 matplotlib 畫出 loss 與 accuracy 的變化曲線。
13. **模型測試與評估**：載入最佳模型，對測試集進行預測，並用 sklearn 計算分類報告（precision, recall, f1-score）。
14. **單張圖片預測**：示範如何對單張圖片進行預測並顯示預測結果（貓或狗）。
本 notebook 涵蓋資料前處理、模型設計、訓練、驗證、測試與結果視覺化，並實作了多種 CNN 卷積區塊與現代深度學習訓練技巧。

In [34]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as transforms
import os
import numpy as np
import zipfile
from torchinfo import summary
from PIL import Image
from sklearn.model_selection import train_test_split
from pytorchtools import EarlyStopping #從其他檔案內取得EarlyStop func

In [35]:
#!pip install gdown

In [36]:
# import gdown

# url = "https://drive.google.com/u/1/uc?id=1ND85Qa01QNNirv9NxLS_L90O3mTBpaX6&export=download"
# output = "cats_dogs.zip"
# gdown.download(url, output, quiet=False)

In [37]:
# Check which hardware accelerators this PyTorch build can use.

# Apple Silicon (M1) GPU through the MPS backend
print(torch.backends.mps.is_available())
print(torch.backends.mps.is_built())

# NVIDIA GPU through CUDA
print(torch.cuda.is_available())
print(torch.backends.cuda.is_built())

False
False
False
False


In [38]:
# Run on the CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")  # alternative: use the Apple M1 (MPS) GPU

In [39]:
print(f'using:{device}')

using:cpu


In [40]:
# Smoke-test the MPS backend: allocate a one-element tensor on it when available.
if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)
else:
    print ("MPS device not found.")

MPS device not found.


In [41]:
# Unpack the dataset archive into the current working directory.
# NOTE(review): presumably this creates the ./dataset/ tree that collect_data
# reads from - confirm against the archive layout.
with zipfile.ZipFile('./cats_dogs.zip', 'r') as zip_ref:
    zip_ref.extractall('.')


In [42]:
# Class name -> integer label used as the classification target.
label_dict = {'dog':0, 'cat':1}
# Inverse mapping (integer label -> class name), used to decode predictions.
labelreverse = {v: k for k, v in label_dict.items()}
labelreverse

{0: 'dog', 1: 'cat'}

In [43]:
# File extensions (without the dot, matched case-sensitively) that collect_data accepts as images.
image_ext = ['jpg', 'png']

In [44]:
def collect_data(folder: str):
    """Gather image paths and integer labels for one dataset split.

    Scans ``./dataset/<folder>`` for the ``dogs`` and ``cats`` sub-folders and
    keeps every file whose extension (the text after the last dot) appears in
    the module-level ``image_ext`` list. Any other sub-folder is ignored.

    Args:
        folder: Name of the split directory under ``./dataset``.

    Returns:
        A ``(paths, labels)`` pair of equal-length lists. Dogs are labelled 0
        and cats 1; entries follow ``os.listdir`` order.
    """
    class_ids = {"dogs": 0, "cats": 1}
    paths = []
    labels = []

    for class_dir in os.listdir("./dataset/%s" % folder):
        class_id = class_ids.get(class_dir)
        if class_id is None:
            # Not one of the two known class folders.
            continue

        prefix = "./dataset/%s/%s/" % (folder, class_dir)
        for file_name in os.listdir(prefix):
            if file_name.split('.')[-1] in image_ext:
                paths.append(prefix + file_name)
                labels.append(class_id)

    return paths, labels



In [45]:
# Index every training image on disk; the two printed sizes should match.
all_data, all_label = collect_data("training_set")

print('All data size : ', len(all_data))
print('All label size : ', len(all_label))


All data size :  8005
All label size :  8005


In [46]:
# Hold out 20% of the training images for validation; stratify keeps the
# dog/cat ratio the same in both parts.
# NOTE(review): no random_state is passed, so the split changes between runs.
X_train, X_valid, y_train, y_valid = train_test_split(all_data,
                                                    all_label,
                                                    test_size=0.2,
                                                    stratify=all_label,
                                                    shuffle=True)

In [47]:
# Report the class balance of each split (label 0 = dog, label 1 = cat).
print("Train data Dog count : ", y_train.count(0))
print("Train data Cat count : ", y_train.count(1))
print("Valid data Dog count : ", y_valid.count(0))
print("Valid data Cat count : ", y_valid.count(1))

Train data Dog count :  3204
Train data Cat count :  3200
Valid data Dog count :  801
Valid data Cat count :  800


In [48]:
# Training-only augmentation. RandomOrder applies the listed transforms in a
# randomly chosen order on every call.
aug = transforms.RandomOrder([
    transforms.RandomHorizontalFlip(p=0.3),  # flip left-right with probability 0.3
    # transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomRotation(degrees=(-10, 10)),  # rotate by a random angle within +/-10 degrees
    # transforms.GaussianBlur(kernel_size=(5, 5)),
])

In [49]:
# Deterministic preprocessing applied to every image (training, validation and
# inference): resize to 224x224, convert to a tensor, then normalise per channel.
norm = transforms.Compose([
    transforms.Resize([224, 224]),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], # (input - mean) / std, per channel (R, G, B)
                         std=[0.229, 0.224, 0.225]) # statistics taken from the ImageNet dataset
])

In [50]:
class Image_dataset(Dataset):
    """Dataset that loads images from disk and yields ``(tensor, label)`` pairs.

    Every image is forced to 3-channel RGB, passed through the module-level
    ``norm`` pipeline and, when ``mode == 'train'``, through the module-level
    ``aug`` augmentation (applied to the already-normalised tensor).

    Args:
        X_data: Sequence of image file paths.
        y_data: Sequence of labels aligned with ``X_data``.
        mode: ``'train'`` enables augmentation; any other value disables it.
    """

    def __init__(self, X_data, y_data, mode):
        self.X_data = X_data        # image file paths
        self.y_data = y_data        # labels aligned with X_data
        self.mode = mode
        self.norm = norm
        # Define the attribute in every mode so it can always be inspected.
        self.augmentation = aug if mode == 'train' else None

    def __getitem__(self, index):
        """Return the preprocessed image tensor and its int64 label tensor."""
        data_path = self.X_data[index]
        # Grayscale, palette or RGBA files would otherwise produce 1- or
        # 4-channel tensors and fail in the 3-channel Normalize / batch collate.
        # The context manager closes the file handle once the pixels are read.
        with Image.open(data_path) as image_file:
            image = image_file.convert('RGB')
        image_tensor = self.norm(image)
        if self.mode == 'train' and self.augmentation is not None:
            image_tensor = self.augmentation(image_tensor)

        # torch.tensor already infers int64 for a Python int; the dtype is
        # spelled out for clarity because CrossEntropyLoss expects long targets.
        target = torch.tensor(int(self.y_data[index]), dtype=torch.long)

        return image_tensor, target

    def __len__(self):
        """Return the number of samples."""
        return len(self.X_data)

In [51]:
# Training loader: batches of 64, reshuffled every epoch; mode='train' turns on
# augmentation inside Image_dataset.
train_data = Image_dataset(X_train, y_train, mode='train')
train_loader = DataLoader(
    train_data,
    batch_size=64,
    shuffle=True
    )
# Dataloader parameter settings (sampler)
# GPU pytorch DDP
# Validation loader: batches of 64 in a fixed order, no augmentation.
valid_data = Image_dataset(X_valid, y_valid, mode='valid')
valid_loader = DataLoader(
    valid_data,
    batch_size=64,
    shuffle=False
    )

In [52]:
# Pull a single batch to sanity-check the shapes produced by the loader.
dataiter = iter(train_loader)   # iterator over batches
inputs,labels = next(dataiter)

print(inputs.size())
print(labels.size())


torch.Size([64, 3, 224, 224])
torch.Size([64])


In [53]:
def activation_func(activation):
    """Return a freshly constructed activation module selected by name.

    Args:
        activation: One of ``'relu'``, ``'leaky_relu'``, ``'sigmoid'``,
            ``'prelu'``, ``'softmax'`` or ``'gelu'``.

    Returns:
        A new ``nn.Module`` instance for the requested activation.

    Raises:
        KeyError: If ``activation`` is not a supported name.
    """
    # Factories rather than instances: only the requested module is built,
    # instead of constructing all six (including a parameterised PReLU) per call.
    factories = {
        'relu': lambda: nn.ReLU(inplace=True),
        'leaky_relu': lambda: nn.LeakyReLU(negative_slope=0.01, inplace=True),
        'sigmoid': lambda: nn.Sigmoid(),
        'prelu': lambda: nn.PReLU(),
        'softmax': lambda: nn.Softmax(dim=1),
        'gelu': lambda: nn.GELU(),
    }
    try:
        factory = factories[activation]
    except KeyError:
        # Same exception type as before, now listing the valid choices.
        raise KeyError(
            f"Unknown activation {activation!r}; expected one of {sorted(factories)}"
        ) from None
    return factory()

In [54]:
class CNNBlock(nn.Module):
    """Basic block: convolution -> activation -> batch normalisation.

    Args:
        in_channels: Number of input channels.
        out_channel: Number of output channels.
        k_size: Convolution kernel size.
        activation: Name understood by ``activation_func``.
        pad: Convolution padding.
        s: Convolution stride.
        dilation: Convolution dilation.
    """

    def __init__(self, in_channels, out_channel, k_size, activation='relu', pad=1, s=1, dilation=1):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channel,
            kernel_size=k_size,
            stride=s,
            padding=pad,
            dilation=dilation,
        )
        self.batchNorm = nn.BatchNorm2d(num_features=out_channel)
        self.actfunction = activation_func(activation)
        self.act_name = activation

    def forward(self, x):
        """Apply the block to ``x`` and return the result."""
        # Order matters: the activation runs before batch normalisation here.
        activated = self.actfunction(self.conv(x))
        return self.batchNorm(activated)

In [55]:
class InceptionBlock(nn.Module):
    """Four parallel convolution branches concatenated along the channel axis.

    Branches (each convolution is followed by an in-place ReLU):
        ConvA: 1x1 conv.
        ConvB: 1x1 conv -> 3x3 conv (padding 1).
        ConvC: 1x1 conv -> 5x5 conv (padding 2).
        ConvD: 3x3 max-pool (stride 1, padding 1) -> 1x1 conv.

    Args:
        in_channels: Number of input channels.
        Filter_List: Channel counts, indexed as
            ``[A_out, B_reduce, B_out, C_reduce, C_out, D_out]``.
    """

    def __init__(self, in_channels, Filter_List):
        super().__init__()

        def conv_relu(n_in, n_out, kernel, padding=0):
            # One convolution followed by an in-place ReLU, as a layer list.
            return [
                nn.Conv2d(in_channels=n_in, out_channels=n_out,
                          kernel_size=kernel, padding=padding),
                nn.ReLU(inplace=True),
            ]

        self.ConvA = nn.Sequential(*conv_relu(in_channels, Filter_List[0], 1))

        self.ConvB = nn.Sequential(
            *conv_relu(in_channels, Filter_List[1], 1),
            *conv_relu(Filter_List[1], Filter_List[2], 3, padding=1),
        )

        self.ConvC = nn.Sequential(
            *conv_relu(in_channels, Filter_List[3], 1),
            *conv_relu(Filter_List[3], Filter_List[4], 5, padding=2),
        )

        self.ConvD = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
            *conv_relu(in_channels, Filter_List[5], 1),
        )

    def forward(self, x):
        """Run all four branches on ``x`` and concatenate their outputs."""
        branches = (self.ConvA, self.ConvB, self.ConvC, self.ConvD)
        # dim=1 because the branch outputs are stacked along the channel axis.
        return torch.cat([branch(x) for branch in branches], dim=1)

```
                      +--------------+
                      |  Input (x)   |
                      +--------------+
                             │
        ┌────────────────────┼────────────────────┬────────────────────┐
        │                    │                    │                    │
        ▼                    ▼                    ▼                    ▼
   +-----------+        +-------------+      +-------------+      +--------------+
   |  ConvA    |        |   ConvB     |      |   ConvC     |      |    ConvD     |
   | (1x1 conv)|        | (1x1→3x3)   |      | (1x1→5x5)   |      | (MaxPool→1x1)|
   |   ReLU    |        |   ReLU      |      |   ReLU      |      |    ReLU      |
   +-----------+        +-------------+      +-------------+      +--------------+
        │                    │                    │                    │
        └────────────┬───────┴───────┬────────────┴───────┬────────────┘
                     │                   │                    │
                     ▼                   ▼                    ▼
            +-----------------------------------------------+
            |  Concatenation along channel dimension (dim=1)|
            +-----------------------------------------------+
                             │
                             ▼
                         +----------+
                         |  Output  |
                         +----------+
```

In [56]:
# There are two ResBlock variants; read the paper and extend these notes.
class ResBlock(nn.Module):
    """Bottleneck residual block: 1x1 reduce -> 3x3 -> 1x1 expand, plus skip.

    The three CNNBlocks shrink the channels to ``int(in_channels / 4)``,
    convolve at that width and expand back to ``in_channels``. The input is
    then added to that result and the sum passes through a final activation.

    Args:
        in_channels: Channel count of both input and output.
        activation: Name understood by ``activation_func``.
    """

    def __init__(self, in_channels, activation='relu'):
        super().__init__()
        # Why a quarter of the channels? It is what the original authors
        # arrived at experimentally.
        bottleneck = int(in_channels / 4)
        self.convR1 = CNNBlock(in_channels, bottleneck, 1, activation, 0)
        self.convR2 = CNNBlock(bottleneck, bottleneck, 3, activation, 1)
        self.convR3 = CNNBlock(bottleneck, in_channels, 1, activation, 0)
        self.actfunctionR = activation_func(activation)

    def forward(self, x):
        """Return ``activation(x + bottleneck_path(x))``."""
        transformed = self.convR3(self.convR2(self.convR1(x)))
        return self.actfunctionR(x + transformed)

```
                 ┌───────────────┐
                 │  x (原始輸入)  │
                 └───────────────┘
                         │
       ┌─────────────────┤
       │                 │
       │ (shortcut：      ▼
       │  原始輸入 x) ┌───────────────┐
       │         │CNNBlock convR1│   1×1 卷積 (降維：in_channels → in_channels/4)
       │         └───────────────┘
       │                 │
       │                 ▼
       │         ┌───────────────┐
       │         │CNNBlock convR2│   3×3 卷積 (保持通道數：in_channels/4)
       │         └───────────────┘
       │                 │
       │                 ▼
       │         ┌───────────────┐
       │         │CNNBlock convR3│   1×1 卷積 (升維：in_channels/4 → in_channels)
       │         └───────────────┘
       │                 │
       │                 ▼
       │         ┌───────────────┐
       └───────► │加法 (Residual) │   (將 convR3 的輸出與原始輸入 x 相加)
                 └───────────────┘
                         │
                         ▼
                 ┌───────────────┐
                 │激活函數 (activation)│   (例如 ReLU)
                 └───────────────┘
                         │
                         ▼
                 ┌───────────────┐
                 │     Output    │
                 └───────────────┘
```

In [57]:
# Model structure
class CnnArchitecture(nn.Module):
    """Two-class CNN assembled from CNNBlock, ResBlock and InceptionBlock.

    Pipeline: 7x7 stride-2 stem -> ResBlock -> pool -> Inception (16 -> 256
    channels) -> ResBlock -> 1x1 conv stages with pooling that narrow the
    channels to 32 -> 1x1 conv to ``self.cls`` channels followed by global
    average pooling. ``forward`` returns logits of shape ``(batch, self.cls)``.
    """

    def __init__(self):
        super().__init__()

        self.cls = 2  # number of output classes

        self.pool = nn.MaxPool2d(2)

        self.cnn1 = CNNBlock(3, 16, 7, s=2, pad=3)
        self.res1 = ResBlock(16)

        # Branch outputs 64 + 128 + 32 + 32 = 256 channels.
        self.inc = InceptionBlock(16, [64, 96, 128, 16, 32, 32])

        self.res2 = ResBlock(256)

        self.cnn2 = CNNBlock(256, 512, 1, pad=0)

        self.cnn3 = CNNBlock(512, 64, 1, pad=0)
        self.res3 = ResBlock(64)

        self.cnn4 = CNNBlock(64, 32, 1, pad=0)
        self.res4 = ResBlock(32)

        # Global-average-pooling (GAP) head. Newer models mostly use GAP in
        # place of Flatten; it is recommended only when the network is deep enough.
        self.output = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=self.cls, kernel_size=1, padding=0),
            nn.AdaptiveAvgPool2d(1),
        )

    def forward(self, x):
        """Map a batch of images to per-class logits."""
        stem = self.pool(self.res1(self.cnn1(x)))
        mixed = self.res2(self.inc(stem))
        stage2 = self.pool(self.cnn2(mixed))

        stage3 = self.pool(self.res3(self.cnn3(stage2)))
        stage4 = self.pool(self.res4(self.cnn4(stage3)))

        pooled = self.output(stage4)
        # pooled is (batch, cls, 1, 1); drop the two trailing singleton axes.
        return pooled.view(-1, self.cls)

# Build the network on the selected device.
cnn_model = CnnArchitecture().to(device)
# print(cnn_model)

# Smoke test: push one random 224x224 RGB image through the model.
test_tensor = torch.randn((1, 3, 224, 224)).to(device)
output = cnn_model(test_tensor)
# print(output.size())
# print(output)



In [58]:
# Print per-layer output shapes and parameter counts for the sample input.
summary(cnn_model, input_data=test_tensor)

Layer (type:depth-idx)                   Output Shape              Param #
CnnArchitecture                          [1, 2]                    --
├─CNNBlock: 1-1                          [1, 16, 112, 112]         --
│    └─Conv2d: 2-1                       [1, 16, 112, 112]         2,368
│    └─ReLU: 2-2                         [1, 16, 112, 112]         --
│    └─BatchNorm2d: 2-3                  [1, 16, 112, 112]         32
├─ResBlock: 1-2                          [1, 16, 112, 112]         --
│    └─CNNBlock: 2-4                     [1, 4, 112, 112]          --
│    │    └─Conv2d: 3-1                  [1, 4, 112, 112]          68
│    │    └─ReLU: 3-2                    [1, 4, 112, 112]          --
│    │    └─BatchNorm2d: 3-3             [1, 4, 112, 112]          8
│    └─CNNBlock: 2-5                     [1, 4, 112, 112]          --
│    │    └─Conv2d: 3-4                  [1, 4, 112, 112]          148
│    │    └─ReLU: 3-5                    [1, 4, 112, 112]          --
│    │    └─

In [59]:
# AdamW over all model parameters with an initial learning rate of 1e-3.
optimizer = torch.optim.AdamW(cnn_model.parameters(), lr=1e-3)
# Cross-entropy on raw logits; targets are the integer class ids (0 = dog, 1 = cat).
loss_fn = nn.CrossEntropyLoss()


In [60]:
# NOTE(review): EarlyStopping comes from the local pytorchtools module. It
# presumably stops after `patience` epochs without validation-loss improvement
# and checkpoints the best model to `path` - confirm against that module.
early_stopping = EarlyStopping(patience=5,
                               verbose=True,
                               path="save_model/best_weights.pth",
                              )

In [61]:
# Alternative: reduce the learning rate when the monitored loss plateaus.
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer,
#                                                        mode='min',
#                                                        factor=0.1,
#                                                        patience=3,
#                                                        verbose=True)

# Multiply the learning rate by 0.1 after every 30 calls to scheduler.step().
# NOTE(review): the training cell runs n_epochs = 20 with one step per epoch,
# so this decay never triggers with the current settings.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

# Warmup-Decay

In [62]:
# Make sure the checkpoint directory used by early_stopping exists.
os.makedirs('save_model', exist_ok=True)

In [None]:
n_epochs = 20
i_iter = 0  # number of epochs completed so far

# Per-epoch history, read later by the plotting cell.
train_losses = []
valid_losses = []
train_accs = []
valid_accs = []


for epoch in range(n_epochs):
    # ---- training pass ----
    # Set train mode once per epoch; the validation pass below switches to eval().
    cnn_model.train()
    train_loss_sum = 0.0
    n_train_correct = 0
    n_train_seen = 0
    for imgs, labels in train_loader:
        imgs = imgs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = cnn_model(imgs)
        train_loss = loss_fn(outputs, labels)
        train_loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        # loss_fn returns the batch mean; weighting by batch size turns the
        # epoch figure into a true per-sample mean, so a short final batch
        # no longer counts as much as a full one.
        train_loss_sum += train_loss.item() * batch_size
        n_train_correct += (torch.argmax(outputs, 1) == labels).sum().item()
        n_train_seen += batch_size

    # ---- validation pass ----
    val_loss_sum = 0.0
    n_val_correct = 0
    n_val_seen = 0
    cnn_model.eval()
    with torch.no_grad():
        for imgs, labels in valid_loader:
            imgs = imgs.to(device)
            labels = labels.to(device)
            outputs = cnn_model(imgs)
            val_loss = loss_fn(outputs, labels)

            batch_size = labels.size(0)
            val_loss_sum += val_loss.item() * batch_size
            n_val_correct += (torch.argmax(outputs, 1) == labels).sum().item()
            n_val_seen += batch_size

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    ep_train_loss = train_loss_sum / max(n_train_seen, 1)
    ep_valid_loss = val_loss_sum / max(n_val_seen, 1)
    ep_train_acc = n_train_correct / max(n_train_seen, 1)
    ep_valid_acc = n_val_correct / max(n_val_seen, 1)

    train_losses.append(ep_train_loss)
    valid_losses.append(ep_valid_loss)
    train_accs.append(ep_train_acc)
    valid_accs.append(ep_valid_acc)

    # Adjacent f-strings instead of backslash continuations, which leaked the
    # source indentation into the printed line.
    n_batches = len(train_loader)
    print(f'{epoch + 1:2d}/{n_epochs:2d} {n_batches:3d}/{n_batches:3d}, '
          f'train loss: {ep_train_loss:8.5f}, '
          f'train acc: {ep_train_acc:7.5f}, '
          f'val loss: {ep_valid_loss:8.5f}, '
          f'val acc: {ep_valid_acc:7.5f}')

    scheduler.step()
    # To check the current learning rate
    print("Current learning rate:", scheduler.get_last_lr())

    # Invokes EarlyStopping.__call__ with this epoch's validation loss.
    early_stopping(ep_valid_loss, cnn_model)
    i_iter += 1

    if early_stopping.early_stop:
        print("[INFO] Early stopping")
        break

In [None]:
import matplotlib.pyplot as plt
# Visualise the training history (one point per completed epoch).
plt.figure(figsize=(15, 7))

# Left panel: training loss vs. validation loss
plt.subplot(121)
plt.plot(range(len(train_losses)), train_losses, label='Training Loss')
plt.plot(range(len(valid_losses)), valid_losses, label='Validation Loss')
plt.legend(loc='upper left')
plt.title('Loss')

# Right panel: training accuracy vs. validation accuracy
plt.subplot(122)
plt.plot(range(len(train_accs)), train_accs, label='Training Accuracy')
plt.plot(range(len(valid_accs)), valid_accs, label='Validation Accuracy')
# NOTE(review): tick marks are fixed to 0.80-0.95; accuracies below 0.8 will
# have no labelled ticks - confirm this range suits the run.
plt.yticks(np.arange(0.8, 1, 0.05))
plt.legend(loc='upper left')
plt.title('Accuracy')

plt.show()

In [None]:
test_preds = []

# Load the best checkpoint written during training.
# - map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
# - weights_only=False is required to unpickle a whole nn.Module on recent
#   PyTorch releases. Unpickling executes arbitrary code, so this is only
#   acceptable because the file is produced locally by this notebook.
checkpoint = torch.load("save_model/best_weights.pth",
                        map_location=device,
                        weights_only=False)
if isinstance(checkpoint, nn.Module):
    # The checkpoint holds the pickled model object itself.
    cnn_model = checkpoint
else:
    # The checkpoint holds a state_dict; rebuild the architecture around it.
    cnn_model = CnnArchitecture()
    cnn_model.load_state_dict(checkpoint)
cnn_model.to(device)
# Inference mode: BatchNorm must use its running statistics, not the
# statistics of each single-image batch.
cnn_model.eval()

test_data, test_label = collect_data("test_set")
print("test image size:" , len(test_data))
with torch.no_grad():  # no gradients needed for prediction
    for path in test_data:
        # Force 3-channel RGB so grayscale / RGBA files match the model input.
        with Image.open(path) as image_file:
            image = image_file.convert('RGB')
        image_tensor = norm(image)  # same preprocessing as during training
        image_tensor = torch.unsqueeze(image_tensor, dim=0)  # add the batch axis
        image_tensor = image_tensor.to(device)
        outputs = cnn_model(image_tensor)
        pred = torch.argmax(outputs, 1).item()
        test_preds.append(pred)

In [None]:
from sklearn import metrics
# Names are listed in label-id order (0 = dog, 1 = cat) to match label_dict.
target_names = ['dog', 'cat']
# Per-class precision, recall and F1 on the test set.
print(metrics.classification_report(test_label, test_preds, target_names=target_names))

In [None]:
# Predict the class of one image.
# Force 3-channel RGB so the tensor matches the model input; the context
# manager closes the file handle once the pixels are read.
with Image.open("./dataset/test_set/cats/cat.4003.jpg") as image_file:
    image = image_file.convert('RGB')
image_tensor = norm(image)  # same preprocessing as during training

image_tensor = image_tensor.to(device)
image_tensor = torch.unsqueeze(image_tensor, dim=0)  # add the batch axis

# Inference mode: BatchNorm must use its running statistics, which matters
# for a batch of one; no gradients are needed.
cnn_model.eval()
with torch.no_grad():
    outputs = cnn_model(image_tensor)
pred = torch.argmax(outputs, 1).item()
labelreverse[int(pred)]