# (KOR) Train a Vision Transformer on small datasets

**저자:** [Aritra Roy Gosthipaty](https://twitter.com/ariG23498)<br>
**만든 날:** 2022/01/07<br>
**최종수정:** 2022/01/10<br>
**간단설명:** shifted patch tokenizaiton과 locality self-attention을 적용하여 작은 크기의 데이터셋을 활용한 ViT학습을 밑바닥부터 수행해 봅니다.<br>
**한글번역:** [모두의연구소 박은수](https://www.linkedin.com/in/eunsoo/) 

## Introduction
Vision Transformer(ViT)를 설명하는 논문인 [An Image is Worth 16x16 Words: Transformers for Image Recognition at Scale](https://arxiv.org/abs/2010.11929)에서 ViT는 엄청난 수의 데이터를 필요로 한다고 합니다. 따라서 ViT를 JFM300M과 같은 큰 사이즈의 데이터로 사전학습(pretraining)하고 이를 ImageNet과 같은 중간크기 사이즈의 데이터로 미세조정(fine-tuning)하는 방법이 State-of-the-art 성능의 Convolutional Neural Networks(CNNs)모델의 성능을 넘어서는 유일한 방법이라고 했습니다.

ViT의 self-attention레이어는 이미지가 **locality inductive bias** (이미지 픽셀들이 가까운곳끼리(locally) 관련되어있으며, 그것의 상관맵(correlation maps)이 이동불변)하다는 특징을 활용하지 않습니다. 바로 이점이 ViT가 더 많은 데이터를 필요로하는 이유입니다. 반면에 CNNs은 슬라이딩 윈도우 형태로 이미지를 처리하기 때문에 더 적은 수의 데이터를 갖고도 좋은 성능을 얻을 수 있습니다.

[Vision Transformer for Small-Size Datasets](https://arxiv.org/abs/2112.13492v1)논문의 저자들은 ViT가 갖고있는 locality inductive bias문제를 해결하고자 합니다. 

핵심 아이디어는 다음과 같습니다 : 
- **Shifted Patch Tokenization**
- **Locality Self Attention** 

이 예제는 이 논문의 아이디어를 구현한 것입니다. 이 코드의 많은 부분을 [Image classification with Vision Transformer](https://keras.io/examples/vision/image_classification_with_vision_transformer/)에서 영감을 받아 작성하였습니다.

_참고_ : 이 예제는 TensorFlow 2.6 이상의 버젼과 [TensorFlow Addons](https://www.tensorflow.org/addons)이 필요합니다. TensorFlow Addons의 경우 다음 명령어로 설치할 수 있습니다. 

```python
pip install -qq -U tensorflow-addons
```

## 셋업 (Setup)

In [None]:
import math
import numpy as np
import tensorflow as tf
from tensorflow import keras
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
from tensorflow.keras import layers

# 재현할 수 있도록 시드 값 설정
SEED = 42
keras.utils.set_random_seed(SEED)

## 데이터 준비 (Prepare the data)

In [None]:
NUM_CLASSES = 100
INPUT_SHAPE = (32, 32, 3)

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar100.load_data()

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

## 하이퍼 파라미터 구성 (Configure the hyperparameters)
이 예제의 하이퍼파라미터는 논문과 다릅니다. 하이퍼파라미터로 자유롭게 바꿔보셔도 됩니다.

In [None]:
# 데이터
BUFFER_SIZE = 512
BATCH_SIZE = 256

# 데이터 증강 (AUGMENTATION)
IMAGE_SIZE = 72
PATCH_SIZE = 6
NUM_PATCHES = (IMAGE_SIZE // PATCH_SIZE) ** 2

# 옵티마이저 (OPTIMIZER)
LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.0001

# 헉습 (TRAINING)
EPOCHS = 50

# 구조 (ARCHITECTURE)
LAYER_NORM_EPS = 1e-6
TRANSFORMER_LAYERS = 8
PROJECTION_DIM = 64
NUM_HEADS = 4
TRANSFORMER_UNITS = [
    PROJECTION_DIM * 2,
    PROJECTION_DIM,
]
MLP_HEAD_UNITS = [2048, 1024]

## 데이터 증강 (Use data augmentation)

논문에서 언급한 부분 : 
*"DeiT논문에서 ViT를 효과적으로 학습하기위해서는 다양한 방법이 필요하다고한다. 따라서 CutMix, Mixup, Auto Augment, Repeated Augment 같은 데이터 증강방법을 모든 모델에 적용하였다."*

이 예제에서는 논문을 그대로 재현하는것이 아니라 논문에서 제시한 참신성(novelty)에만 초점을 둘것 입니다. 따라서 위 논문에서 언급되었던 데이터증강을 사용하지 않을 것입니다. 그렇기에 자유롭게 데이터증강을 위한 파이프라인을 추가하거나 삭제하셔도 됩니다.

In [None]:
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(IMAGE_SIZE, IMAGE_SIZE),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(factor=0.02),
        layers.RandomZoom(height_factor=0.2, width_factor=0.2),
    ],
    name="data_augmentation",
)
# 노말라이제이션(normalizaiton)을 위해서 학습데이터의 mean과 variance를 계산
data_augmentation.layers[0].adapt(x_train)

## Patch Tokenization의 구현 (Implement Shifted Patch Tokenization)
ViT의 파이프라인에서 입력 이미지는 선형투영(linear projection)을 통해 토큰으로 바뀝니다. ViT는 작은 크기의 recpetive field를 갖고 있기 때문에 이를 극복하고자 Shifted Patch Tokenizaiton (STP)가 도입되었습니다. Shifted Patch Tokenization은 다음 단계를 따릅니다. 

- 이미지로 시작
- 이미지를 대각선 방향으로 이동
- 대각선으로 이동한 이미지들과 원본 이미지를 채널 방향으로 연결(Concat)
- 연결된 이미지에서 패치들을 추출
- 모든 패치의 공간차원(spatial dimension)을 벡터형태로 변환 (flatten)
- 벡터형태의 패치에 레이어 노말라이제이션을 적용한 후 다시 선형투영 (linear projection)

| ![Shifted Patch Toekenization](https://i.imgur.com/bUnHxd0.png) |
| :--: |
| Shifted Patch Tokenization [Source](https://arxiv.org/abs/2112.13492v1) |

In [None]:
# from IPython.core.debugger import set_trace
class ShiftedPatchTokenization(layers.Layer):
    def __init__(
        self,
        image_size=IMAGE_SIZE,
        patch_size=PATCH_SIZE,
        num_patches=NUM_PATCHES,
        projection_dim=PROJECTION_DIM,
        vanilla=False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.vanilla = vanilla  # 바닐라 패치 추출기로 전환할 수 있는 플래그
        self.image_size = image_size
        self.patch_size = patch_size
        self.half_patch = patch_size // 2
        self.flatten_patches = layers.Reshape((num_patches, -1))
        self.projection = layers.Dense(units=projection_dim)
        self.layer_norm = layers.LayerNormalization(epsilon=LAYER_NORM_EPS)

    def crop_shift_pad(self, images, mode):
        # 대각방향으로 이동된 이미지 만들기
        if mode == "left-up":
            crop_height = self.half_patch
            crop_width = self.half_patch
            shift_height = 0
            shift_width = 0
        elif mode == "left-down":
            crop_height = 0
            crop_width = self.half_patch
            shift_height = self.half_patch
            shift_width = 0
        elif mode == "right-up":
            crop_height = self.half_patch
            crop_width = 0
            shift_height = 0
            shift_width = self.half_patch
        else:
            crop_height = 0
            crop_width = 0
            shift_height = self.half_patch
            shift_width = self.half_patch

        # 대각이동 이미지 만들고 자른 후 패딩하기
        crop = tf.image.crop_to_bounding_box(
            images,
            offset_height=crop_height,
            offset_width=crop_width,
            target_height=self.image_size - self.half_patch,
            target_width=self.image_size - self.half_patch,
        )
        shift_pad = tf.image.pad_to_bounding_box(
            crop,
            offset_height=shift_height,
            offset_width=shift_width,
            target_height=self.image_size,
            target_width=self.image_size,
        )
        return shift_pad

    def call(self, images):
        if not self.vanilla:
            # 원본 이미지와 대각이동 이미지들의 concat
            images = tf.concat(
                [
                    images,
                    self.crop_shift_pad(images, mode="left-up"),
                    self.crop_shift_pad(images, mode="left-down"),
                    self.crop_shift_pad(images, mode="right-up"),
                    self.crop_shift_pad(images, mode="right-down"),
                ],
                axis=-1,
            )
        # 이미지를 패치로 만들고 패치들을 벡터형태로 변환하기 (flatten)
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        flat_patches = self.flatten_patches(patches)
        if not self.vanilla:
            # 레이어 노말라이제이션 적용 후 선형투영
            tokens = self.layer_norm(flat_patches)
            tokens = self.projection(tokens)
        else:
            # 선형투영 
            tokens = self.projection(flat_patches)
        return (tokens, patches)


### 패치를 시각화 하기 (Visualize the patches)

In [None]:
# 학습데이터셋에서 랜덤 이미지를 갖고 온 후 
# 이미지 리사이즈 적용
image = x_train[np.random.choice(range(x_train.shape[0]))]
resized_image = tf.image.resize(
    tf.convert_to_tensor([image]), size=(IMAGE_SIZE, IMAGE_SIZE)
)
# 바닐라 패치메이커 : ViT 논문에서 처럼 이미지를 받아서 패치로 변환함. 
(token, patch) = ShiftedPatchTokenization(vanilla=True)(resized_image / 255.0)
(token, patch) = (token[0], patch[0])
n = patch.shape[0]

# 번역자 주석추가 : 바닐라 패치메이커로 한장 추출 후 패치로 만들어 보기
count = 1
plt.figure(figsize=(4, 4))
for row in range(n):
    for col in range(n):
        plt.subplot(n, n, count)
        count = count + 1
        image = tf.reshape(patch[row][col], (PATCH_SIZE, PATCH_SIZE, 3))
        plt.imshow(image)
        plt.axis("off")
plt.show()

In [None]:
# Shifted Patch Tokenization : 입력 이미지에 4개의 대각선 이동을 추가한 후 
# 이를 채널 방향으로 연결(concat) 한 뒤 패치를 추출
(token, patch) = ShiftedPatchTokenization(vanilla=False)(resized_image / 255.0)
(token, patch) = (token[0], patch[0])
n = patch.shape[0]
shifted_images = ["ORIGINAL", "LEFT-UP", "LEFT-DOWN", "RIGHT-UP", "RIGHT-DOWN"]
for index, name in enumerate(shifted_images):
    print(name)
    count = 1
    plt.figure(figsize=(4, 4))
    for row in range(n):
        for col in range(n):
            plt.subplot(n, n, count)
            count = count + 1
            image = tf.reshape(patch[row][col], (PATCH_SIZE, PATCH_SIZE, 5 * 3))
            plt.imshow(image[..., 3 * index : 3 * index + 3])
            plt.axis("off")
    plt.show()

## Patch Encoding 레이어의 구현 (Implement the patch encoding layer)
프로젝션 된 패치를 입력으로 받은 후 위치정보(positional information)를 추가합니다.

In [None]:
class PatchEncoder(layers.Layer):
    def __init__(
        self, num_patches=NUM_PATCHES, projection_dim=PROJECTION_DIM, **kwargs
    ):
        super().__init__(**kwargs)
        self.num_patches = num_patches
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )
        self.positions = tf.range(start=0, limit=self.num_patches, delta=1)

    def call(self, encoded_patches):
        encoded_positions = self.position_embedding(self.positions)
        encoded_patches = encoded_patches + encoded_positions
        return encoded_patches


## Locality Self Attention의 구현 (Implement Locality Self Attention)

어텐션(attention)의 일반수식은 아래와 같습니다. 

| ![Equation of attention](https://miro.medium.com/max/396/1*P9sV1xXM10t943bXy_G9yg.png) |
| :--: |
| [Source](https://towardsdatascience.com/attention-is-all-you-need-discovering-the-transformer-paper-73e5ff5e0634) |

어텐션 모듈은 쿼리(query), 키(key), 벨류(value) 값을 사용합니다. 먼저 내적(dot prodcut)를 이용하여 쿼리와 키의 유사도를 계산합니다. 그 값은 키(key) 차원의 제곱근(squre root)으로 그 값이 조정됩니다. 이 조정을 통해 softmax함수가 굉장히 작은 기울기 값을 갖는 것을 방지할 수 있습니다. 이제 이 값은 Softmax 계산을 통해 어텐션을 위한 가중치 값으로 변하게 되고 이 어텐션 가중치가 벨류 값에 곱해져서 값의 조정이 발생합니다.

셀프 어텐션에서는 쿼리, 키, 벨류 값이 전부 동일한 입력을 갖습니다. 
이 내적의 계산을 통해서 토큰 간의 관계값(inter-token relations)보다는 큰 값의 셀프토큰 관계값(self-token relations)를 얻을 수 있습니다.
이것은 softmax가 토큰 간의 관계값 보다 셀프토큰 관계값에 더 큰 확률을 부여한다는 것을 의미합니다. 
이를 방지하기 위하여 저자들은 **내적의 대각선 값들을 마스킹**하는 것을 제안합니다. 
이 방법으로 어텐션 모듈이 토큰 간의 관계값 계산에 더 집중하도록 할 수 있습니다. 

일반적인 어텐션 모듈에서 스케일링 팩터(sacling factor)는 상수(constant) 입니다. 
이것은 sofmax 함수를 조절할 수 있는 온도 항처럼 사용 됩니다. 
저자들은 **온도 항을 상수항이 아닌 학습 가능한 형태로 사용**하는 것을 제안합니다. 


| ![Implementation of LSA](https://i.imgur.com/GTV99pk.png) |
| :--: |
| Locality Self Attention [Source](https://arxiv.org/abs/2112.13492v1) |

위의 두 아이디어가 바로 Locality Self Attention을 만듭니다. 예제에서는 이를 위해서 [`layers.MultiHeadAttention`](https://www.tensorflow.org/api_docs/python/tf/keras/layers/MultiHeadAttention)를 서브클래싱하고 학습 가능한 온도 항을 만들었습니다. 어텐션 마스크(attnetion mask)는 다음 스테이지에서 구현됩니다.

In [None]:

class MultiHeadAttentionLSA(tf.keras.layers.MultiHeadAttention):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # 학습 가능한 온도 텀. 초기 값은 키(key) 차원의 제곱근 값 (squre root)
        self.tau = tf.Variable(math.sqrt(float(self._key_dim)), trainable=True)

    def _compute_attention(self, query, key, value, attention_mask=None, training=None):
        query = tf.multiply(query, 1.0 / self.tau)
        attention_scores = tf.einsum(self._dot_product_equation, key, query)
        attention_scores = self._masked_softmax(attention_scores, attention_mask)
        attention_scores_dropout = self._dropout_layer(
            attention_scores, training=training
        )
        attention_output = tf.einsum(
            self._combine_equation, attention_scores_dropout, value
        )
        return attention_output, attention_scores


## MLP 구현하기 (Implement the MLP)

In [None]:
def mlp(x, hidden_units, dropout_rate):
    for units in hidden_units:
        x = layers.Dense(units, activation=tf.nn.gelu)(x)
        x = layers.Dropout(dropout_rate)(x)
    return x


# Build the diagonal attention mask
diag_attn_mask = 1 - tf.eye(NUM_PATCHES)
diag_attn_mask = tf.cast([diag_attn_mask], dtype=tf.int8)

## ViT 만들기 (Build the ViT)

In [None]:

def create_vit_classifier(vanilla=False):
    inputs = layers.Input(shape=INPUT_SHAPE)
    # 데이터 증강
    augmented = data_augmentation(inputs)
    # 패치 생성
    (tokens, _) = ShiftedPatchTokenization(vanilla=vanilla)(augmented)
    # 패치 인코딩
    encoded_patches = PatchEncoder()(tokens)

    # 다중 레이어의 트렌스포머 블록 만들기
    for _ in range(TRANSFORMER_LAYERS):
        # 레이어 노말라이제이션 1.
        x1 = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
        # 멀티-헤드 어텐션 레이어 만들기.
        if not vanilla:
            attention_output = MultiHeadAttentionLSA(
                num_heads=NUM_HEADS, key_dim=PROJECTION_DIM, dropout=0.1
            )(x1, x1, attention_mask=diag_attn_mask)
        else:
            attention_output = layers.MultiHeadAttention(
                num_heads=NUM_HEADS, key_dim=PROJECTION_DIM, dropout=0.1
            )(x1, x1)
        # 스캡 커넥션 1.
        x2 = layers.Add()([attention_output, encoded_patches])
        # 레이어 노말라이제이션 2.
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        # MLP.
        x3 = mlp(x3, hidden_units=TRANSFORMER_UNITS, dropout_rate=0.1)
        # 스킵 커넥션 2.
        encoded_patches = layers.Add()([x3, x2])

    # [batch_size, projection_dim] 크기의 텐서(tensor) 만들기.
    representation = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0.5)(representation)
    # MLP 추가
    features = mlp(representation, hidden_units=MLP_HEAD_UNITS, dropout_rate=0.5)
    # 출력을 분류하기
    logits = layers.Dense(NUM_CLASSES)(features)
    # Keras 모델로 만들기 
    model = keras.Model(inputs=inputs, outputs=logits)
    return model


## 컴파일, 학습, 모드 평가 (Compile, train, and evaluate the mode)

In [None]:
# 몇몇 코드는 아래의 링크에서 가져 왔습니다 : 
# https://www.kaggle.com/ashusma/training-rfcx-tensorflow-tpu-effnet-b2.
class WarmUpCosine(keras.optimizers.schedules.LearningRateSchedule):
    def __init__(
        self, learning_rate_base, total_steps, warmup_learning_rate, warmup_steps
    ):
        super(WarmUpCosine, self).__init__()

        self.learning_rate_base = learning_rate_base
        self.total_steps = total_steps
        self.warmup_learning_rate = warmup_learning_rate
        self.warmup_steps = warmup_steps
        self.pi = tf.constant(np.pi)

    def __call__(self, step):
        if self.total_steps < self.warmup_steps:
            raise ValueError("Total_steps must be larger or equal to warmup_steps.")

        cos_annealed_lr = tf.cos(
            self.pi
            * (tf.cast(step, tf.float32) - self.warmup_steps)
            / float(self.total_steps - self.warmup_steps)
        )
        learning_rate = 0.5 * self.learning_rate_base * (1 + cos_annealed_lr)

        if self.warmup_steps > 0:
            if self.learning_rate_base < self.warmup_learning_rate:
                raise ValueError(
                    "Learning_rate_base must be larger or equal to "
                    "warmup_learning_rate."
                )
            slope = (
                self.learning_rate_base - self.warmup_learning_rate
            ) / self.warmup_steps
            warmup_rate = slope * tf.cast(step, tf.float32) + self.warmup_learning_rate
            learning_rate = tf.where(
                step < self.warmup_steps, warmup_rate, learning_rate
            )
        return tf.where(
            step > self.total_steps, 0.0, learning_rate, name="learning_rate"
        )


def run_experiment(model):
    total_steps = int((len(x_train) / BATCH_SIZE) * EPOCHS)
    warmup_epoch_percentage = 0.10
    warmup_steps = int(total_steps * warmup_epoch_percentage)
    scheduled_lrs = WarmUpCosine(
        learning_rate_base=LEARNING_RATE,
        total_steps=total_steps,
        warmup_learning_rate=0.0,
        warmup_steps=warmup_steps,
    )

    optimizer = tfa.optimizers.AdamW(
        learning_rate=LEARNING_RATE, weight_decay=WEIGHT_DECAY
    )

    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="accuracy"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top-5-accuracy"),
        ],
    )

    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        validation_split=0.1,
    )
    _, accuracy, top_5_accuracy = model.evaluate(x_test, y_test, batch_size=BATCH_SIZE)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print(f"Test top 5 accuracy: {round(top_5_accuracy * 100, 2)}%")

    return history


# 바닐라 ViT로 실험.
vit = create_vit_classifier(vanilla=True)
history = run_experiment(vit)

# Shifted Patch Tokenizaiton과 
# Locality Self Attention을 적용한 ViT 실험
vit_sl = create_vit_classifier(vanilla=False)
history = run_experiment(vit_sl)

# 마무리 하며
Shifted Patch Tokenization과 Locality Self Attention을 이용하여 CIFAR100에서 ~**3-4%** top-1 정확도를 얻을 수 있었습니다. 

Shifted Patch Tokenization과 Locality Self Attention에 대한 아이디어는 매우 직관적이고 구현하기 쉽습니다. 저자는 논문의 부록에서 Shifted Patch Tokenization에 대한 서로 다른 shit전략에 대한 실험결과도 제시하였습니다. 

GPU 크레딧으로 실험할 수 있는 환경을 제공한 [Jarvislabs.ai](https://jarvislabs.ai/)에 감사 드립니다.

학습 완료된 모델을 [Hugging Face Hub](https://huggingface.co/keras-io/vit_small_ds_v2)에서 사용해 볼 수 있으며 데모 또한 [Hugging Face Spaces](https://huggingface.co/spaces/keras-io/vit-small-ds)에서 확인하실 수 있습니다.