<a href="https://colab.research.google.com/github/WoojinJeonkr/DeepLearning/blob/main/Image_Classification_with_Perceiver.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Perceiver를 이용한 이미지 분류
- 내용 출처: https://keras.io/examples/vision/perceiver_image_classification/
- 사용 데이터 세트: CIFAR-100 데이터 세트
- 목표: Andrew Jaegle et al. 의 Perceiver: Iterative Attention을 사용한 일반 인식 모델 구현

## Perceiver란?
- 내용 출처: https://yhkim4504.tistory.com/14
- 구글 딥마인드에서 제안한 기존 Perceiver의 단점을 보완한 모델
- 비대칭 주의 메커니즘을 활용하여 입력을 반복적으로 좁은 잠재 병목 상태로 증류하여 매우 큰 입력을 처리하도록 확장 가능
- 어떠한 데이터 형태도 처리할 수 있으면서 계산과 메모리 사용이 입력 사이즈에 선형적으로 작동하지만(기존의 transformer는 데이터가 클수록 느려짐) 간단한 output 형태만 출력 가능
- NLP, Vision, Audio, Multi-modal

In [2]:
!pip install -U tensorflow-addons

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## 01. 라이브러리 불러오기

In [3]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

## 02. 데이터 로드

In [4]:
num_classes = 100
input_shape = (32, 32, 3)

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar100.load_data()

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

x_train shape: (50000, 32, 32, 3) - y_train shape: (50000, 1)
x_test shape: (10000, 32, 32, 3) - y_test shape: (10000, 1)


## 03. 하이퍼파라미터 구성

In [5]:
learning_rate = 0.001
weight_decay = 0.0001
batch_size = 64
num_epochs = 50
dropout_rate = 0.2
image_size = 64
patch_size = 2
num_patches = (image_size // patch_size) ** 2
latent_dim = 256
projection_dim = 256
num_heads = 8
ffn_units = [
    projection_dim,
    projection_dim,
]
num_transformer_blocks = 4
num_iterations = 2
classifier_units = [
    projection_dim,
    num_classes,
]

In [6]:
print(f"Image size: {image_size} X {image_size} = {image_size ** 2}")
print(f"Patch size: {patch_size} X {patch_size} = {patch_size ** 2} ")
print(f"Patches per image: {num_patches}")
print(f"Elements per patch (3 channels): {(patch_size ** 2) * 3}")
print(f"Latent array shape: {latent_dim} X {projection_dim}")
print(f"Data array shape: {num_patches} X {projection_dim}")

Image size: 64 X 64 = 4096
Patch size: 2 X 2 = 4 
Patches per image: 1024
Elements per patch (3 channels): 12
Latent array shape: 256 X 256
Data array shape: 1024 X 256


## 04. 데이터 보강

In [7]:
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)
data_augmentation.layers[0].adapt(x_train)

## 05. 피드포워드 네트워크(FFN) 구성
- 내용 출처: [딥러닝 피드 포워드 (feed forward)](https://m.blog.naver.com/beyondlegend/221373971859)
- 피드포워드(Feed Forward): 입력 층으로 데이터가 입력되고, 1개 이상으로 구성되는 은닉층을 거쳐서 마지막에 있는 출력층으로 출력값을 내보내는 과정
=> 이전 층에서 나온 출력값이 층과 층 사이에 적용되는 가중치 영향을 받은 다음 다음층의 입력 값으로 들어 가는 것


In [8]:
def create_ffn(hidden_units, dropout_rate):
    ffn_layers = []
    for units in hidden_units[:-1]:
        ffn_layers.append(layers.Dense(units, activation=tf.nn.gelu))

    ffn_layers.append(layers.Dense(units=hidden_units[-1]))
    ffn_layers.append(layers.Dropout(dropout_rate))

    ffn = keras.Sequential(ffn_layers)
    return ffn

## 06. 레이어로 패치 생성 구현

In [9]:
class Patches(layers.Layer):
    def __init__(self, patch_size):
        super(Patches, self).__init__()
        self.patch_size = patch_size

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

## 07. 패치 인코딩 계층 구현

In [10]:
class PatchEncoder(layers.Layer):
    def __init__(self, num_patches, projection_dim):
        super(PatchEncoder, self).__init__()
        self.num_patches = num_patches
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patches):
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patches) + self.position_embedding(positions)
        return encoded

## 08. Perceiver 모델 구현
- 교차주의 모듈과 자가주의가 있는 표준 변환기의 두 가지 모듈로 구성

### 교차주의 모듈
1. (latent_dim, projection_dim)잠재 배열을 예상하고 (data_dim, projection_dim)데이터 배열을 입력으로 예상하여 잠재 배열 (latent_dim, projection_dim)을 출력으로 생성
2. 교차 주의를 적용하기 위해 query벡터는 잠재 배열에서 생성 key되고 및 value벡터는 인코딩된 이미지에서 생성

In [11]:
def create_cross_attention_module(
    latent_dim, data_dim, projection_dim, ffn_units, dropout_rate
):

    inputs = {
        "latent_array": layers.Input(shape=(latent_dim, projection_dim)),
        "data_array": layers.Input(shape=(data_dim, projection_dim)),
    }

    latent_array = layers.LayerNormalization(epsilon=1e-6)(inputs["latent_array"])
    data_array = layers.LayerNormalization(epsilon=1e-6)(inputs["data_array"])

    query = layers.Dense(units=projection_dim)(latent_array)
    key = layers.Dense(units=projection_dim)(data_array)
    value = layers.Dense(units=projection_dim)(data_array)

    attention_output = layers.Attention(use_scale=True, dropout=0.1)(
        [query, key, value], return_attention_scores=False
    )

    attention_output = layers.Add()([attention_output, latent_array])

    attention_output = layers.LayerNormalization(epsilon=1e-6)(attention_output)
    ffn = create_ffn(hidden_units=ffn_units, dropout_rate=dropout_rate)
    outputs = ffn(attention_output)
    outputs = layers.Add()([outputs, attention_output])

    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

### 변압기 모듈
- 교차 주의 모듈의 출력 잠재 벡터를 입력으로 예상하고 다중 헤드 자체 주의를 해당 latent_dim요소에 적용한 다음 피드포워드 네트워크를 적용하여 다른 (latent_dim, projection_dim)잠재 배열을 생성

In [12]:
def create_transformer_module(
    latent_dim,
    projection_dim,
    num_heads,
    num_transformer_blocks,
    ffn_units,
    dropout_rate,
):

    inputs = layers.Input(shape=(latent_dim, projection_dim))
    x0 = inputs

    for _ in range(num_transformer_blocks):
        x1 = layers.LayerNormalization(epsilon=1e-6)(x0)
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(x1, x1)
        x2 = layers.Add()([attention_output, x0])
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        ffn = create_ffn(hidden_units=ffn_units, dropout_rate=dropout_rate)
        x3 = ffn(x3)
        x0 = layers.Add()([x3, x2])

    model = keras.Model(inputs=inputs, outputs=x0)
    return model

## 감지기 모델
- 잠재적인 배열이 필요에 따라 반복적으로 입력 이미지에서 정보를 추출할 수 있도록 공유 가중치 및 건너뛰기 연결을 사용 하여 교차 주의 및 변환기 모듈 시간을 반복

In [13]:
class Perceiver(keras.Model):
    def __init__(
        self,
        patch_size,
        data_dim,
        latent_dim,
        projection_dim,
        num_heads,
        num_transformer_blocks,
        ffn_units,
        dropout_rate,
        num_iterations,
        classifier_units,
    ):
        super(Perceiver, self).__init__()

        self.latent_dim = latent_dim
        self.data_dim = data_dim
        self.patch_size = patch_size
        self.projection_dim = projection_dim
        self.num_heads = num_heads
        self.num_transformer_blocks = num_transformer_blocks
        self.ffn_units = ffn_units
        self.dropout_rate = dropout_rate
        self.num_iterations = num_iterations
        self.classifier_units = classifier_units

    def build(self, input_shape):
        # latent array 생성
        self.latent_array = self.add_weight(
            shape=(self.latent_dim, self.projection_dim),
            initializer="random_normal",
            trainable=True,
        )

        # 패치 모델 생성
        self.patcher = Patches(self.patch_size)

        # 패치 인코더 생성
        self.patch_encoder = PatchEncoder(self.data_dim, self.projection_dim)


       # 교차주의 어텐션 모델 생성
        self.cross_attention = create_cross_attention_module(
            self.latent_dim,
            self.data_dim,
            self.projection_dim,
            self.ffn_units,
            self.dropout_rate,
        )

        # 트랜스포머 모델 생성
        self.transformer = create_transformer_module(
            self.latent_dim,
            self.projection_dim,
            self.num_heads,
            self.num_transformer_blocks,
            self.ffn_units,
            self.dropout_rate,
        )

        self.global_average_pooling = layers.GlobalAveragePooling1D()
        self.classification_head = create_ffn(
            hidden_units=self.classifier_units, dropout_rate=self.dropout_rate
        )

        super(Perceiver, self).build(input_shape)

    def call(self, inputs):
        augmented = data_augmentation(inputs)
        patches = self.patcher(augmented)
        encoded_patches = self.patch_encoder(patches)
        cross_attention_inputs = {
            "latent_array": tf.expand_dims(self.latent_array, 0),
            "data_array": encoded_patches,
        }
        for _ in range(self.num_iterations):
            latent_array = self.cross_attention(cross_attention_inputs)
            latent_array = self.transformer(latent_array)
            cross_attention_inputs["latent_array"] = latent_array

        representation = self.global_average_pooling(latent_array)
        logits = self.classification_head(representation)
        return logits

## 09. 모드 컴파일, 훈련 및 평가

In [14]:
def run_experiment(model):
    optimizer = tfa.optimizers.LAMB(
        learning_rate=learning_rate, weight_decay_rate=weight_decay,
    )

    # 모델 컴파일
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="acc"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top5-acc"),
        ],
    )

    reduce_lr = keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.2, patience=3
    )

    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=15, restore_best_weights=True
    )

    # Fit the model.
    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_split=0.1,
        callbacks=[early_stopping, reduce_lr],
    )

    _, accuracy, top_5_accuracy = model.evaluate(x_test, y_test)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print(f"Test top 5 accuracy: {round(top_5_accuracy * 100, 2)}%")

    return history

In [15]:
# 모델 훈련
perceiver_classifier = Perceiver(
    patch_size,
    num_patches,
    latent_dim,
    projection_dim,
    num_heads,
    num_transformer_blocks,
    ffn_units,
    dropout_rate,
    num_iterations,
    classifier_units,
)


history = run_experiment(perceiver_classifier)