# Week 2 homework
Please train a model with PyTorch to classify the Fashion MNIST dataset, and submit your training source code, your best model, and a screenshot of the training log (not required if your Jupyter notebook contains the training log).

Your source code must contain a custom dataset, data normalization, data augmentation, and loss and accuracy calculations on the training, validation, and testing sets.
  
Deadline: 2024/08/13 23:59:59

The files used by this program are in my GitHub repository:  
https://github.com/cjzrv/summer-course/tree/main/week2

fashion-mnist_train.csv and fashion-mnist_test.csv must be downloaded manually and placed in the fashion directory:  
https://www.kaggle.com/datasets/zalando-research/fashionmnist

## Import the required packages and modules

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, random_split
import torchvision.models as models
from torchvision import transforms
from torchvision.datasets import ImageFolder
import numpy as np
import pandas as pd
from PIL import Image, ImageOps, ImageEnhance
import os

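## Convert the dataset to images

Because the Fashion MNIST dataset is distributed as CSV files, I first convert the data back into image files, to simulate building my own dataset later with images as input.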

In [2]:
label_names = [
    'T-shirt', 'Trouser', 'Pullover', 'Dress', 'Coat',
    'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot'
]

# Convert the rows of the training set to image files, encoding the label in the filename
csv_file_path = './fashion/fashion-mnist_train.csv'
output_dir = 'images/train'

os.makedirs(output_dir, exist_ok=True)
data = pd.read_csv(csv_file_path)

labels = data.iloc[:, 0].values
images = data.iloc[:, 1:].values

for i in range(len(images)):
    image_array = images[i].reshape(28, 28)
    label = labels[i]
    
    image = Image.fromarray(image_array.astype(np.uint8))
    label_name = label_names[label]
    
    image.save(os.path.join(output_dir, f'label_{label}_{label_name}_{i}.png'))

# Convert the rows of the testing set to image files, encoding the label in the filename
csv_file_path = './fashion/fashion-mnist_test.csv'
output_dir = 'images/test'

os.makedirs(output_dir, exist_ok=True)
data = pd.read_csv(csv_file_path)

labels = data.iloc[:, 0].values
images = data.iloc[:, 1:].values

for i in range(len(images)):
    image_array = images[i].reshape(28, 28)
    label = labels[i]
    
    image = Image.fromarray(image_array.astype(np.uint8))
    label_name = label_names[label]
    
    image.save(os.path.join(output_dir, f'label_{label}_{label_name}_{i}.png'))
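
Both loops above apply the same row-to-image step. A minimal sketch of that step in isolation, using a synthetic row rather than the real CSV (the helper name `row_to_image` and the demo data are illustrative and not part of the notebook):

```python
import numpy as np

def row_to_image(row):
    """Split one CSV row (label followed by 784 pixel values) into (label, 28x28 uint8 array)."""
    row = np.asarray(row)
    label = int(row[0])
    pixels = row[1:].reshape(28, 28).astype(np.uint8)
    return label, pixels

# Synthetic row standing in for one line of the CSV (hypothetical data)
demo_row = np.concatenate(([9], np.arange(784) % 256))
label, pixels = row_to_image(demo_row)
print(label, pixels.shape, pixels.dtype)
```

The reshape is row-major, so the first 28 pixel columns of the CSV become the top row of the image.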

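## Add extra training data

I took a few photos of my own shirts, trousers, and slippers. They may not be very fashionable, but I added them to the training set anyway.

(The filenames of these photos already carry their labels.)

Before conversion: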

<img src="./showcase/before.png" alt="before" width="500px"/>

After conversion:

<img src="./showcase/after.png" alt="after" width="500px"/>

In [3]:
input_folder = './pic'
output_folder = './images/train'      # write to the same directory as the training set

os.makedirs(output_folder, exist_ok=True)

for filename in os.listdir(input_folder):
    if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
        img_path = os.path.join(input_folder, filename)
        img = Image.open(img_path).convert('RGBA')
        
        # Make near-white background pixels fully transparent
        np_img = np.array(img)
        r, g, b, a = np.rollaxis(np_img, axis=-1)
        mask = ((r > 240) & (g > 240) & (b > 240))
        np_img[mask] = [0, 0, 0, 0]
        
        img_no_bg = Image.fromarray(np_img, 'RGBA')

        # Convert to grayscale and composite onto a black background
        img_gray = ImageOps.grayscale(img_no_bg)
        img_black_bg = Image.new("L", img_gray.size, 0)
        img_black_bg.paste(img_gray, (0, 0), img_no_bg)
        
        # Shrink to the 28x28 size used by Fashion MNIST
        img_resized = img_black_bg.resize((28, 28))
        
        # Invert the tones, then double the contrast
        img_inverted = ImageOps.invert(img_resized)
        enhancer = ImageEnhance.Contrast(img_inverted)
        img_contrast = enhancer.enhance(2.0)

        # Save as PNG, because FashionDataset only picks up files ending in '.png'
        output_name = os.path.splitext(filename)[0] + '.png'
        output_path = os.path.join(output_folder, output_name)
        img_contrast.save(output_path)
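
To see what the background mask in this cell does, here is a small sketch on a hand-made 2x2 RGBA array. The function name `remove_near_white` and the demo pixels are assumptions made for illustration; the threshold of 240 is the one used above.

```python
import numpy as np

def remove_near_white(rgba, threshold=240):
    """Return a copy of an RGBA array with near-white pixels made transparent, plus the mask."""
    out = rgba.copy()
    r, g, b = out[..., 0], out[..., 1], out[..., 2]
    # A pixel counts as background only if all three colour channels exceed the threshold
    mask = (r > threshold) & (g > threshold) & (b > threshold)
    out[mask] = [0, 0, 0, 0]
    return out, mask

demo = np.array([
    [[255, 255, 255, 255], [250, 250, 250, 255]],   # pure white, near white
    [[ 30,  60,  90, 255], [255, 255, 100, 255]],   # dark blue, yellow
], dtype=np.uint8)

cleaned, mask = remove_near_white(demo)
print(mask)
print(cleaned[..., 3])   # alpha channel after masking
```

Only the two top pixels are cleared. The yellow pixel survives because its blue channel is below the threshold.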

## Define the model used for this training run

It is just a simple CNN.

In [4]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(256)
        
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        self.dropout = nn.Dropout(0.5)
        
        self.fc1 = nn.Linear(in_features=256 * 3 * 3, out_features=512)
        self.fc2 = nn.Linear(in_features=512, out_features=10)
        
    def forward(self, x):
        x = self.pool(torch.nn.functional.relu(self.bn1(self.conv1(x))))
        x = self.pool(torch.nn.functional.relu(self.bn2(self.conv2(x))))
        x = self.pool(torch.nn.functional.relu(self.bn3(self.conv3(x))))
        
        x = x.view(-1, 256 * 3 * 3)
        
        x = torch.nn.functional.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

model = CNN()
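
The `256 * 3 * 3` input size of `fc1` follows from the layer arithmetic: each 3x3 convolution with `padding=1` keeps the spatial size, and each 2x2 max-pool with stride 2 halves it, rounding down. A short sketch of that calculation (the helper names `conv_out` and `pool_out` are illustrative):

```python
def conv_out(size, kernel, padding=0, stride=1):
    """Output width/height of a convolution layer."""
    return (size + 2 * padding - kernel) // stride + 1

def pool_out(size, kernel=2, stride=2):
    """Output width/height of a max-pooling layer without padding."""
    return (size - kernel) // stride + 1

size = 28                 # Fashion MNIST images are 28x28
sizes = [size]
for _ in range(3):        # three conv + pool stages, as in the CNN above
    size = conv_out(size, kernel=3, padding=1)
    size = pool_out(size)
    sizes.append(size)

flat_features = 256 * size * size   # 256 channels after conv3
print(sizes, flat_features)         # [28, 14, 7, 3] 2304
```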

## Define a custom dataset class

It loads the dataset directly from image files.

In [5]:
class FashionDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir                  # directory containing the image files
        self.transform = transform                  # optional transform applied to each image

        # Collect the filenames of all PNG images
        self.image_files = [f for f in os.listdir(image_dir) if f.endswith('.png')]

        # The second underscore-separated field of the filename is the label (see the naming scheme above)
        self.labels = [int(f.split('_')[1]) for f in self.image_files]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.image_dir, self.image_files[idx])
        image = Image.open(img_name)
        
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label
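
The label parsing in `__init__` depends on the `label_{label}_{label_name}_{i}.png` naming scheme from the conversion step. A minimal round-trip sketch in plain Python (the helper names `make_filename` and `parse_label` are illustrative):

```python
label_names = [
    'T-shirt', 'Trouser', 'Pullover', 'Dress', 'Coat',
    'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot'
]

def make_filename(label, index):
    """Build a filename the same way the conversion cell does."""
    return f'label_{label}_{label_names[label]}_{index}.png'

def parse_label(filename):
    """Recover the integer label the same way FashionDataset does."""
    return int(filename.split('_')[1])

name = make_filename(9, 42)
print(name, parse_label(name))   # label_9_Ankle boot_42.png 9
```

Class names containing a space or a hyphen do not break the parse, because only the field at index 1 is read. Any extra photo whose name does not start with `label_<digit>_` would raise a `ValueError` or `IndexError` here.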

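## Data preprocessing

After repeated testing, I found that overly aggressive augmentation parameters severely reduce the model's prediction accuracy, so only small values are used here.

When choosing augmentation methods, also check whether they suit the dataset. In handwritten digit recognition, for example, a vertical flip makes a 6 look much like a 9.

Likewise, horizontally flipping an asymmetric digit such as 5 produces extra samples that do not help training.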

In [6]:
train_transform = transforms.Compose([
    transforms.RandomRotation(degrees=10),                      # random rotation of up to ±10 degrees
    transforms.RandomResizedCrop(size=28, scale=(0.9, 1.1)),    # random crop, resized back to 28x28
    transforms.RandomHorizontalFlip(p=0.3),                     # random horizontal flip
    transforms.ColorJitter(brightness=0.1, contrast=0.1),       # random brightness and contrast jitter
    transforms.ToTensor(),                  # convert [0, 255] pixel values to a tensor in [0, 1]
    transforms.Normalize((0.5,), (0.5,)),   # then normalize [0, 1] to [-1, 1], which can help the model converge faster
])

test_transform = transforms.Compose([
    transforms.ToTensor(),                  # no augmentation for the test set
    transforms.Normalize((0.5,), (0.5,))    # but still convert to a tensor and normalize so the input format matches training
])
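
The mapping performed by `ToTensor()` followed by `Normalize((0.5,), (0.5,))` is plain arithmetic: divide by 255, then compute `(value - mean) / std`. A sketch of that calculation in pure Python, with illustrative helper names:

```python
def to_unit_range(pixel):
    """Scale an 8-bit pixel value from [0, 255] to [0, 1], as ToTensor does for uint8 images."""
    return pixel / 255.0

def normalize(value, mean=0.5, std=0.5):
    """Apply the per-channel normalization (value - mean) / std."""
    return (value - mean) / std

for pixel in (0, 128, 255):
    print(pixel, round(normalize(to_unit_range(pixel)), 4))
```

Black (0) maps to -1, white (255) maps to 1, and mid-gray lands near 0.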

## Load the datasets

Load the training and test sets, then split a validation set off the test set.

In [7]:
train_path = 'images/train'
test_path = 'images/test'

train_data = FashionDataset(image_dir=train_path, transform=train_transform)
test_data = FashionDataset(image_dir=test_path, transform=test_transform)
testLen = int(len(test_data) * 0.5)             # Fashion MNIST provides no validation set,
valLen = len(test_data) - testLen               # so half of the test set is split off for validation
test_data, val_data = random_split(test_data, [testLen, valLen])
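
The split above is drawn at random each time the cell runs. If a repeatable split is wanted, `random_split` accepts a seeded `torch.Generator`. The sketch below uses a toy ten-element dataset, and the seed value and helper name `split_half` are assumptions for illustration, not something the notebook does:

```python
import torch
from torch.utils.data import TensorDataset, random_split

# Toy dataset standing in for the image test set (hypothetical data)
dataset = TensorDataset(torch.arange(10))

def split_half(dataset, seed):
    """Split a dataset into two halves using a seeded generator."""
    test_len = int(len(dataset) * 0.5)
    val_len = len(dataset) - test_len
    generator = torch.Generator().manual_seed(seed)
    return random_split(dataset, [test_len, val_len], generator=generator)

test_a, val_a = split_half(dataset, seed=0)
test_b, val_b = split_half(dataset, seed=0)
print(list(test_a.indices) == list(test_b.indices))   # same seed, same split
```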

## Create the DataLoaders

In [8]:
train_loader = DataLoader(train_data, batch_size=256, shuffle=True)
val_loader = DataLoader(val_data, batch_size=256, shuffle=False)   # evaluation does not depend on sample order
test_loader = DataLoader(test_data, batch_size=256, shuffle=False)

## Choose the compute device

Use the GPU if one is available; otherwise fall back to the CPU.

In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


## Choose the loss function and optimizer

In [10]:
model = CNN()

criterion = nn.CrossEntropyLoss()                       # use CrossEntropyLoss as the loss function
optimizer = optim.AdamW(model.parameters(), lr=0.0001)  # use AdamW as the optimizer; lr is the learning rate

model.to(device)

CNN(
  (conv1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc1): Linear(in_features=2304, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=10, bias=True)
)
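
`nn.CrossEntropyLoss` takes raw logits (the output of `fc2`) and a class index, and applies log-softmax internally. For a single sample, the quantity it computes can be sketched in pure Python as follows (the function name `cross_entropy` and the example logits are illustrative):

```python
import math

def cross_entropy(logits, target):
    """Cross-entropy for one sample: log(sum(exp(logits))) - logits[target]."""
    max_logit = max(logits)   # subtract the max for numerical stability
    log_sum_exp = max_logit + math.log(sum(math.exp(z - max_logit) for z in logits))
    return log_sum_exp - logits[target]

logits = [2.0, 1.0, 0.1]
print(round(cross_entropy(logits, target=0), 4))   # small loss: class 0 has the largest logit
print(round(cross_entropy(logits, target=2), 4))   # larger loss: class 2 has the smallest logit
```

This is why the model's `forward` does not end with a softmax layer.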

## Train the model

In [11]:
num_epochs = 20                             # one epoch is one full pass over the training set
best_accuracy = 0.0
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()               # reset gradients so they do not accumulate across batches
        outputs = model(inputs)
        loss = criterion(outputs, labels)   # compute the loss between the outputs and the labels
        loss.backward()                     # backpropagation: compute the gradient of every parameter
        optimizer.step()                    # update the model parameters using those gradients
        
        running_loss += loss.item() * inputs.size(0)    # loss.item() is the mean loss of the current batch
                                                        # inputs.size(0) is the number of samples in this batch
                                                        # so the product adds this batch's summed loss to the epoch total
        _, predicted = torch.max(outputs, 1)            # torch.max(outputs, 1) returns each row's largest logit and its index
                                                        # the largest value itself is not needed, so it is assigned to _
                                                        # predicted holds the index of the maximum, i.e. the predicted class
        total_train += labels.size(0)                           # add this batch's size to the running sample count
        correct_train += (predicted == labels).sum().item()     # number of correct predictions so far in this epoch

    train_loss = running_loss / len(train_loader.dataset)       # mean loss per sample for this epoch
    train_accuracy = correct_train / total_train                # accuracy over this epoch

    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {train_loss:.4f}, Training Accuracy: {train_accuracy:.4f}")

    # Evaluate the model on the validation set
    model.eval()
    running_val_loss = 0.0
    correct_val = 0
    total_val = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_val_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

        val_loss = running_val_loss / len(val_loader.dataset)
        val_accuracy = correct_val / total_val

        if val_accuracy > best_accuracy:
            # Save the parameters of the best-performing model so far
            torch.save(model.state_dict(), 'best.pth')
            best_accuracy = val_accuracy

        print(f"** Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f} **")

Epoch 1/20, Training Loss: 0.7157, Training Accuracy: 0.7495
** Validation Loss: 0.3850, Validation Accuracy: 0.8608 **
Epoch 2/20, Training Loss: 0.4533, Training Accuracy: 0.8363
** Validation Loss: 0.3362, Validation Accuracy: 0.8736 **
Epoch 3/20, Training Loss: 0.3960, Training Accuracy: 0.8563
** Validation Loss: 0.2917, Validation Accuracy: 0.8924 **
Epoch 4/20, Training Loss: 0.3639, Training Accuracy: 0.8668
** Validation Loss: 0.2720, Validation Accuracy: 0.8958 **
Epoch 5/20, Training Loss: 0.3414, Training Accuracy: 0.8757
** Validation Loss: 0.2757, Validation Accuracy: 0.8946 **
Epoch 6/20, Training Loss: 0.3238, Training Accuracy: 0.8822
** Validation Loss: 0.2548, Validation Accuracy: 0.9032 **
Epoch 7/20, Training Loss: 0.3117, Training Accuracy: 0.8860
** Validation Loss: 0.2342, Validation Accuracy: 0.9126 **
Epoch 8/20, Training Loss: 0.3003, Training Accuracy: 0.8896
** Validation Loss: 0.2366, Validation Accuracy: 0.9120 **
Epoch 9/20, Training Loss: 0.2906, Train
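
The training loop multiplies each batch loss by `inputs.size(0)` before accumulating because the last batch of an epoch is usually smaller than the rest, so a plain mean over batches would weight its samples too heavily. A small sketch with made-up numbers (the batch losses and sizes below are hypothetical):

```python
# Mean loss reported for each batch, and the number of samples in each batch
batch_losses = [0.5, 0.3]
batch_sizes = [256, 96]      # the final batch is smaller than batch_size

# Per-sample average, as computed in the training loop
running_loss = sum(loss * size for loss, size in zip(batch_losses, batch_sizes))
weighted_mean = running_loss / sum(batch_sizes)

# Naive average over batches, which over-weights the small final batch
naive_mean = sum(batch_losses) / len(batch_losses)

print(round(weighted_mean, 4), round(naive_mean, 4))
```

The two values differ whenever batch sizes are unequal. The weighted form matches the true mean loss over all samples.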

## Evaluate the model

Evaluate the best saved model parameters on the test set to assess their performance.

In [12]:
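# A state_dict contains only tensors, so the safer weights_only=True is sufficient
model.load_state_dict(torch.load('best.pth', map_location=device, weights_only=True))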

model.eval()
running_test_loss = 0.0
correct_test = 0
total_test = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        running_test_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs, 1)
        total_test += labels.size(0)
        correct_test += (predicted == labels).sum().item()

test_loss = running_test_loss / len(test_loader.dataset)
test_accuracy = correct_test / total_test

print(f'Testing Loss: {test_loss:.4f}, Testing Accuracy: {test_accuracy:.4f}')

Testing Loss: 0.1895, Testing Accuracy: 0.9340
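
The accuracy figure comes from comparing the index of the largest logit in each row with the true label. A minimal sketch of that step on four made-up samples (the logits and labels below are hypothetical, not model output):

```python
import torch

# Four samples, three classes (hypothetical logits)
logits = torch.tensor([[2.0, 0.5, 0.1],
                       [0.2, 0.3, 1.5],
                       [0.9, 1.1, 0.4],
                       [1.2, 0.8, 0.7]])
labels = torch.tensor([0, 2, 0, 0])

# torch.max along dim 1 returns (largest value per row, index of that value)
max_values, predicted = torch.max(logits, 1)

correct = (predicted == labels).sum().item()
accuracy = correct / labels.size(0)
print(predicted.tolist(), accuracy)   # [0, 2, 1, 0] 0.75
```

`logits.argmax(dim=1)` gives the same indices when the maximum values are not needed.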
