# Image classification with custom dataset

This notebook demonstrates how to use the Hugging Face framework with a custom dataset-loading approach to perform image classification.

The Hugging Face workflow:
![](https://hackmd.io/_uploads/SyirxbiP3.png)

## Import packages

In [None]:
# Install the required packages
!pip -q install torchio
!pip -q install transformers==4.30.0 datasets evaluate accelerate

In [None]:
# Import packages for basic operations
import torchio as tio
import glob
import numpy as np
import matplotlib.pyplot as plt

# Import PyTorch-related packages
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

## Load dataset

* ### Check data pipeline

In [None]:
# Download and unzip the NIH-3.zip dataset
!wget https://github.com/TA-aiacademy/CMU_Course/releases/download/image_classification_data/NIH-3.zip
!unzip -q NIH-3.zip

In [None]:
# Collect the image file paths of each split
train_dir = glob.glob("NIH-3/train/*/*.png")
valid_dir = glob.glob("NIH-3/valid/*/*.png")

In [None]:
img = tio.ScalarImage(train_dir[0])

In [None]:
img

In [None]:
img.data

In [None]:
plt.figure(figsize=(8, 8))
plt.imshow(img.data.permute((3, 2, 1, 0))[0, :, :, 0], cmap='gray')

* ### Check image preprocessing

In [None]:
# Data augmentation
transform = tio.Compose([
    tio.RandomAffine(scales=0.1,
                     degrees=10)
])

In [None]:
plt.figure(figsize=(8, 8))
plt.imshow(transform(img).data.permute((3, 2, 1, 0))[0, :, :, 0], cmap='gray')

In [None]:
from transformers import AutoImageProcessor
# checkpoint = "google/efficientnet-b6"
checkpoint = "microsoft/cvt-13"
image_processor = AutoImageProcessor.from_pretrained(checkpoint)

In [None]:
image_processor

In [None]:
image_processor(img.as_pil())

In [None]:
image_processor(img.as_pil())['pixel_values'][0].shape

* ### Build custom dataset

In [None]:
class NIHDataset(Dataset):
    def __init__(self, data_dir, label2id, image_processor=None, is_train=True):
        self.data_dir = data_dir
        self.transform = tio.Compose([
            tio.RandomAffine(scales=0.1, degrees=10)
        ])
        self.label2id = label2id
        self.image_processor = image_processor
        self.is_train = is_train
    def __getitem__(self, idx):
        path = self.data_dir[idx]
        label = self.label2id[path.split('/')[-2]]
        image = tio.ScalarImage(path)

        # some files load with several channels; keep only the first one
        # so that every image is single-channel (grayscale)
        if image.data.shape[0] != 1:
            image.set_data(torch.unsqueeze(image.data[0], 0))

        # data augmentation
        if self.is_train:
            image = self.transform(image)

        # preprocessing
        if self.image_processor:
            # use the processor passed to this dataset, not the global variable
            image = self.image_processor(image.as_pil())['pixel_values'][0]
            return {'pixel_values': torch.tensor(image), 'label': label}
        else:
            image = tio.Resize((224, 224, 1))(image)
            # torchio layout (C, W, H, D) -> (C, D, H, W) -> (D, H, W); here C == D == 1
            image = image.data.permute(0, 3, 2, 1).squeeze(0)
            image = image.repeat((3, 1, 1))  # gray to rgb
            image = image.float()/255.
            return {'pixel_values': image, 'label': label}

    def __len__(self):
        return len(self.data_dir)

In [None]:
labels = [i.split('/')[-1] for i in sorted(glob.glob("NIH-3/train/*"))]

In [None]:
label2id = {c: c_idx for c_idx, c in enumerate(labels)}
id2label = {str(c_idx): c for c_idx, c in enumerate(label2id)}

In [None]:
label2id

In [None]:
id2label
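
The label of each image comes from the name of the folder that contains it, and the two dictionaries map between class names and integer ids. The sketch below reproduces that idea with `pathlib` instead of splitting on `/`. The file paths and class names are placeholders, not the real NIH-3 classes.

```python
from pathlib import PurePosixPath

# Hypothetical file paths that follow the NIH-3/<split>/<class>/<file>.png layout;
# the class names are placeholders chosen for illustration.
paths = [
    "NIH-3/train/class_b/0002.png",
    "NIH-3/train/class_a/0001.png",
    "NIH-3/train/class_c/0003.png",
]

# The label is the name of the folder that directly contains the image.
labels = sorted({PurePosixPath(p).parent.name for p in paths})

# Same construction as in the notebook: sorted names get consecutive ids.
label2id = {name: idx for idx, name in enumerate(labels)}
id2label = {str(idx): name for name, idx in label2id.items()}

print(label2id)
print(id2label)
```

Sorting the class names first keeps the ids stable between runs, because `glob` does not guarantee any particular order.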

In [None]:
train_dataset = NIHDataset(train_dir, label2id, image_processor)
valid_dataset = NIHDataset(valid_dir, label2id, image_processor, is_train=False)

* ### Check dataset item

In [None]:
iter_dataset = iter(train_dataset)

In [None]:
item = next(iter_dataset)
item

In [None]:
plt.imshow(item['pixel_values'].permute((1, 2, 0)))

* ### Create data collator

In [None]:
from transformers import DefaultDataCollator

data_collator = DefaultDataCollator()

## Evaluate

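Including a metric during training usually helps you evaluate how the model is performing. You can quickly load an evaluation method with the Hugging Face [Evaluate](https://huggingface.co/docs/evaluate/index) library. For this task, load the [accuracy](https://huggingface.co/spaces/evaluate-metric/accuracy) metric (see the Hugging Face Evaluate [quick tour](https://huggingface.co/docs/evaluate/a_quick_tour) for details on how to load and compute a metric):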

In [None]:
import evaluate

accuracy = evaluate.load("accuracy")

Then create a function that passes the predictions and labels to [compute](https://huggingface.co/docs/evaluate/v0.4.0/en/package_reference/main_classes#evaluate.EvaluationModule.compute) to calculate the accuracy:

In [None]:
def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    return accuracy.compute(predictions=predictions, references=labels)
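
To make the arithmetic inside `compute_metrics` concrete, here is a minimal plain-Python sketch of the same two steps: take the argmax of each row of logits, then count how many predictions match the labels. The helper names and the logits are made up for illustration. The notebook itself relies on NumPy and the Evaluate library.

```python
def argmax(row):
    """Return the index of the largest value in a list of scores."""
    return max(range(len(row)), key=lambda i: row[i])


def accuracy_from_logits(logits, references):
    """Fraction of rows whose highest-scoring class equals the reference label."""
    predictions = [argmax(row) for row in logits]
    correct = sum(p == r for p, r in zip(predictions, references))
    return {"accuracy": correct / len(references)}


# Made-up logits for 4 images and 3 classes (illustrative values only).
logits = [
    [2.0, 0.1, -1.0],  # argmax is class 0
    [0.3, 1.5, 0.2],   # argmax is class 1
    [-0.5, 0.0, 0.7],  # argmax is class 2
    [1.2, 0.9, 0.1],   # argmax is class 0
]
references = [0, 1, 2, 1]

result = accuracy_from_logits(logits, references)
print(result)  # {'accuracy': 0.75}
```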

## Build model

You are now ready to start training the model! Load the model with [AutoModelForImageClassification](https://huggingface.co/docs/transformers/v4.28.1/en/model_doc/auto#transformers.AutoModelForImageClassification). Specify the number of labels and the label mappings:

In [None]:
from transformers import AutoModelForImageClassification

model = AutoModelForImageClassification.from_pretrained(
    checkpoint,
    num_labels=len(labels),
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True  # needed when the pretrained head's class count differs from the custom dataset
)

In [None]:
# Freeze the first stage of the CvT encoder so its weights are not updated during training
for param in model.cvt.encoder.stages[:1].parameters():
    param.requires_grad = False

## Train model

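From here, only three steps remain:

1. Define the training hyperparameters in [TrainingArguments](https://huggingface.co/docs/transformers/v4.28.1/en/main_classes/trainer#transformers.TrainingArguments). Pay attention to columns of the dataset that the model does not use: setting remove_unused_columns=False prevents them from being dropped. For example, dropping image would make it impossible to produce pixel_values. The other required argument is output_dir, which specifies where the model is saved. To upload the model to the Hub, set push_to_hub=True (you must be logged in to Hugging Face to upload a model). At the end of each epoch, the [Trainer](https://huggingface.co/docs/transformers/v4.28.1/en/main_classes/trainer#transformers.Trainer) evaluates the accuracy and saves a training checkpoint.
2. Pass the training arguments, model, datasets, data collator, and metric function to the [Trainer](https://huggingface.co/docs/transformers/v4.28.1/en/main_classes/trainer#transformers.Trainer). The image processor is already applied inside NIHDataset, so it is not passed here.
3. Call [train](https://huggingface.co/docs/transformers/v4.28.1/en/main_classes/trainer#transformers.Trainer.train) to fine-tune the model.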

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="my_cvt_model",
    remove_unused_columns=False,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=10,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy")
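
The batch size and epoch count above determine how many optimisation steps the run takes, and therefore the step numbers that appear in the `checkpoint-<step>` folder names. The sketch below shows the arithmetic, assuming a single device and no gradient accumulation. The dataset size of 1200 is a hypothetical value, since the notebook does not state the real one, and `total_training_steps` is a helper written for this example.

```python
import math


def total_training_steps(num_examples, batch_size, num_epochs):
    """Steps for a run on one device without gradient accumulation."""
    # The last, smaller batch of each epoch still counts as one step.
    steps_per_epoch = math.ceil(num_examples / batch_size)
    return steps_per_epoch * num_epochs


# num_examples is hypothetical; batch_size and num_epochs match the cell above.
steps = total_training_steps(num_examples=1200, batch_size=32, num_epochs=10)
print(steps)  # 380
```

With `save_strategy="epoch"`, a checkpoint is written after every epoch, so under these assumptions the folders would be numbered in multiples of 38.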

In [None]:
from transformers import Trainer

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    compute_metrics=compute_metrics,
)

In [None]:
from transformers.integrations import MLflowCallback
trainer.remove_callback(MLflowCallback)
trainer.train()

## Visualization

In [None]:
train_loss = []
valid_loss = []
valid_acc = []
for i in range(0, len(trainer.state.log_history)-1, 2):
    train_loss.append(trainer.state.log_history[i]['loss'])
    valid_loss.append(trainer.state.log_history[i+1]['eval_loss'])
    valid_acc.append(trainer.state.log_history[i+1]['eval_accuracy'])
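
The loop above relies on training and evaluation entries strictly alternating in `trainer.state.log_history`. A variant that selects entries by the keys they contain is less sensitive to extra entries, for example the summary added at the end of training. The sketch below uses a hand-written list that imitates the assumed shape of the log, with made-up numbers.

```python
# Hypothetical log_history imitating per-epoch logging and evaluation;
# all numbers are invented for illustration.
log_history = [
    {"loss": 0.92, "learning_rate": 4.5e-5, "epoch": 1.0, "step": 10},
    {"eval_loss": 0.80, "eval_accuracy": 0.61, "epoch": 1.0, "step": 10},
    {"loss": 0.71, "learning_rate": 4.0e-5, "epoch": 2.0, "step": 20},
    {"eval_loss": 0.66, "eval_accuracy": 0.70, "epoch": 2.0, "step": 20},
    {"train_runtime": 120.0, "train_loss": 0.815, "epoch": 2.0, "step": 20},
]

# Select entries by key instead of by position in the list.
train_loss = [entry["loss"] for entry in log_history if "loss" in entry]
valid_loss = [entry["eval_loss"] for entry in log_history if "eval_loss" in entry]
valid_acc = [entry["eval_accuracy"] for entry in log_history if "eval_accuracy" in entry]

print(train_loss, valid_loss, valid_acc)
```

Note that every later call to `trainer.evaluate` would add another evaluation entry, so it is simplest to collect the curves right after training.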

In [None]:
plt.figure(figsize=(15, 4))
plt.subplot(1, 2, 1)
plt.plot(range(len(train_loss)), train_loss, label='train')
plt.plot(range(len(valid_loss)), valid_loss, label='val')
plt.legend()
plt.title('Loss')
plt.xlabel('Epoch')
plt.ylabel('Cross entropy')
plt.subplot(1, 2, 2)
plt.plot(range(len(valid_acc)), valid_acc)
plt.title('Validation accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.show()

In [None]:
cfm_metric = evaluate.load("BucketHeadP65/confusion_matrix")

In [None]:
trainer.evaluate(eval_dataset=valid_dataset)

In [None]:
outputs = trainer.predict(test_dataset=valid_dataset)
cm = cfm_metric.compute(predictions=np.argmax(outputs.predictions, axis=1),
                        references=outputs.label_ids)['confusion_matrix']

In [None]:
import seaborn as sns
plt.figure(figsize=(6, 4))
ax = sns.heatmap(cm, annot=True)
ax.set(xlabel="prediction", ylabel="label")
ax.set_xticklabels(labels)
ax.set_yticklabels(labels, rotation=0)
plt.show()
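
To read the heatmap, it helps to see how a confusion matrix is counted. The sketch below is a plain-Python version that assumes the common convention of rows for true labels and columns for predictions, which is what the axis labels above suggest. The function and the sample ids are illustrative and are not the implementation behind the loaded metric.

```python
def confusion_matrix(references, predictions, num_classes):
    """Count matrix with true labels as rows and predicted labels as columns."""
    matrix = [[0] * num_classes for _ in range(num_classes)]
    for true_id, pred_id in zip(references, predictions):
        matrix[true_id][pred_id] += 1
    return matrix


# Made-up label ids for 7 images and 3 classes.
references = [0, 0, 1, 1, 2, 2, 2]
predictions = [0, 1, 1, 1, 2, 0, 2]

cm_example = confusion_matrix(references, predictions, num_classes=3)
for row in cm_example:
    print(row)

# Correct predictions sit on the diagonal.
correct = sum(cm_example[i][i] for i in range(3))
print(correct / len(references))
```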

## Inference

The fine-tuned model is now stored at the specified path, and you can use it for inference!

Load the images you want to run inference on:

In [None]:
test_dir = glob.glob("NIH-3/test/*/*.png")
test_dataset = NIHDataset(test_dir, label2id, image_processor, is_train=False)

In [None]:
test_loader = iter(DataLoader(test_dataset, batch_size=1))

In [None]:
item = next(test_loader)
test_x = item['pixel_values']
test_y = item['label']

Pass the input to the model and get back the logits (softmax has not been applied yet):

In [None]:
from transformers import AutoModelForImageClassification

model = AutoModelForImageClassification.from_pretrained("my_cvt_model/checkpoint-380/")
with torch.no_grad():
    logits = model(test_x).logits
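
The step number in the checkpoint folder name depends on the dataset size and batch size, so `checkpoint-380` may not exist in another run. One option is to look up the checkpoint folder with the largest step number, as in the sketch below. `latest_checkpoint` is a helper written for this example and the folders are created in a temporary directory. Keep in mind that the latest checkpoint is not necessarily the best one. While the `trainer` object is still in memory, `trainer.state.best_model_checkpoint` holds the path selected by `metric_for_best_model`.

```python
import re
import tempfile
from pathlib import Path


def latest_checkpoint(output_dir):
    """Return the checkpoint-<step> subfolder with the largest step, or None."""
    pattern = re.compile(r"^checkpoint-(\d+)$")
    candidates = []
    for child in Path(output_dir).iterdir():
        match = pattern.match(child.name)
        if child.is_dir() and match:
            candidates.append((int(match.group(1)), child))
    if not candidates:
        return None
    # Compare step numbers as integers; a plain string sort would rank 90 above 1200.
    return max(candidates, key=lambda item: item[0])[1]


# Demonstration with hypothetical checkpoint folders in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    for step in (90, 380, 1200):
        (Path(tmp) / f"checkpoint-{step}").mkdir()
    latest_name = latest_checkpoint(tmp).name

print(latest_name)  # checkpoint-1200
```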

In [None]:
predicted_label = logits.argmax(-1).item()
model.config.id2label[predicted_label]

In [None]:
model.config.id2label[test_y.item()]
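
Because the logits are unnormalised scores, they cannot be read as probabilities directly. Applying softmax turns them into values that sum to 1 without changing which class scores highest. The sketch below shows this in plain Python with made-up logits. In the notebook, `torch.softmax(logits, dim=-1)` would do the same on the tensor.

```python
import math


def softmax(logits):
    """Convert a list of logits into probabilities that sum to 1."""
    shift = max(logits)  # subtracting the max avoids overflow in exp
    exps = [math.exp(x - shift) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


# Made-up logits for one image and 3 classes.
example_logits = [2.0, 0.5, -1.0]
probs = softmax(example_logits)

# The most probable class is the same as the argmax of the raw logits.
predicted = max(range(len(probs)), key=lambda i: probs[i])
print([round(p, 3) for p in probs], predicted)
```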

* ### Evaluate

In [None]:
trainer.evaluate(eval_dataset=test_dataset)

In [None]:
outputs = trainer.predict(test_dataset=test_dataset)
cm = cfm_metric.compute(predictions=np.argmax(outputs.predictions, axis=1),
                        references=outputs.label_ids)['confusion_matrix']

In [None]:
import seaborn as sns
plt.figure(figsize=(6, 4))
ax = sns.heatmap(cm, annot=True)
ax.set(xlabel="prediction", ylabel="label")
ax.set_xticklabels(labels)
ax.set_yticklabels(labels, rotation=0)
plt.show()