<a href="https://colab.research.google.com/github/changdaeoh/DACON_Dirty-MNIST/blob/main/0227_ViT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 구글드라이브와 연동
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install tensorflow_addons
import tensorflow_addons as tfa

# common modules
import tensorflow as tf
from tensorflow import  keras

import numpy as np
import pandas as pd 
import random

from tensorflow.keras.preprocessing.image import ImageDataGenerator

import os
os.chdir('/content/drive/MyDrive/project_dataset/dacon_v2')

# 데이터 경로
train_dir = "/content/drive/MyDrive/project_dataset/dacon_v2/dirty_mnist_2nd"
test_dir = "/content/drive/MyDrive/project_dataset/dacon_v2/test_route"

# GPU 확인
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
    raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

# global seed 고정
SEED = 227

def seed_everything(seed = 42):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    tf.random.set_seed(seed)

seed_everything(SEED)

Collecting tensorflow_addons
[?25l  Downloading https://files.pythonhosted.org/packages/74/e3/56d2fe76f0bb7c88ed9b2a6a557e25e83e252aec08f13de34369cd850a0b/tensorflow_addons-0.12.1-cp37-cp37m-manylinux2010_x86_64.whl (703kB)
[K     |▌                               | 10kB 17.2MB/s eta 0:00:01[K     |█                               | 20kB 20.8MB/s eta 0:00:01[K     |█▍                              | 30kB 24.6MB/s eta 0:00:01[K     |█▉                              | 40kB 21.6MB/s eta 0:00:01[K     |██▎                             | 51kB 16.7MB/s eta 0:00:01[K     |██▉                             | 61kB 15.1MB/s eta 0:00:01[K     |███▎                            | 71kB 14.4MB/s eta 0:00:01[K     |███▊                            | 81kB 14.6MB/s eta 0:00:01[K     |████▏                           | 92kB 13.6MB/s eta 0:00:01[K     |████▋                           | 102kB 13.1MB/s eta 0:00:01[K     |█████▏                          | 112kB 13.1MB/s eta 0:00:01[K     |████

In [None]:
from tensorflow.keras import layers

# Configure the hyperparameters

In [None]:
learning_rate = 0.001
weight_decay = 0.0001
batch_size = 128
num_epochs = 100
image_size = 72  # We'll resize input images to this size
patch_size = 6  # Size of the patches to be extract from the input images
num_patches = (image_size // patch_size) ** 2
projection_dim = 64
num_heads = 4
transformer_units = [
    projection_dim * 2,
    projection_dim,
]  # Size of the transformer layers
transformer_layers = 8
mlp_head_units = [2048, 1024]  # Size of the dense layers of the final classifier

# Prepare the data

In [None]:
num_classes = 26
input_shape = (256, 256, 3)


In [None]:
meta_df = pd.read_csv('dirty_mnist_2nd_answer.csv')
meta_df['index'] = meta_df['index'].apply(lambda x: str("{0:05d}".format(x))+'.png')
columns = list(meta_df.columns[1:])


datagen = ImageDataGenerator(rescale=1./255.,
                             rotation_range = 10,
                             width_shift_range = 0.1,
                             height_shift_range = 0.1,
                             horizontal_flip = True,
                             vertical_flip = True,
                             validation_split = 0.1)

# generator
train_gen = datagen.flow_from_dataframe(dataframe = meta_df,        
                                        directory = train_dir,       
                                        x_col='index',               
                                        y_col=columns,                
                                        batch_size = batch_size,               
                                        seed = SEED,
                                        color_mode = "rgb",           
                                        class_mode='raw',
                                        target_size=(256, 256),       
                                        subset='training')

valid_gen = datagen.flow_from_dataframe(dataframe = meta_df,        
                                        directory = train_dir,       
                                        x_col='index',               
                                        y_col=columns,                
                                        batch_size = batch_size,               
                                        seed = SEED,
                                        color_mode = "rgb",           
                                        class_mode='raw',
                                        target_size=(256, 256),       
                                        subset='validation')

Found 45000 validated image filenames.
Found 5000 validated image filenames.


# Use data augmentation

In [None]:
data_augmentation = keras.Sequential(
    [
        # layers.experimental.preprocessing.Normalization(),
        layers.experimental.preprocessing.Resizing(image_size, image_size)
        # layers.experimental.preprocessing.RandomFlip("horizontal"),
        # layers.experimental.preprocessing.RandomRotation(factor=0.02),
        # layers.experimental.preprocessing.RandomZoom(
        #     height_factor=0.2, width_factor=0.2
        # ),
    ],
    name="data_augmentation",
)
# Compute the mean and the variance of the training data for normalization.
# data_augmentation.layers[0].adapt(x_train)

# Implement multilayer perceptron (MLP)

In [None]:
def mlp(x, hidden_units, dropout_rate):
    for units in hidden_units:
        x = layers.Dense(units, activation=tf.nn.gelu)(x)
        x = layers.Dropout(dropout_rate)(x)
    return x

# Implement patch creation as a layer

In [None]:
class Patches(layers.Layer):
    def __init__(self, patch_size):
        super(Patches, self).__init__()
        self.patch_size = patch_size

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

# Implement the patch encoding layer

In [None]:
class PatchEncoder(layers.Layer):
    def __init__(self, num_patches, projection_dim):
        super(PatchEncoder, self).__init__()
        self.num_patches = num_patches
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

# Build the ViT model

In [None]:
def create_vit_classifier():
    inputs = layers.Input(shape=input_shape)
    # Augment data.
    augmented = data_augmentation(inputs) 

    # Create patches.
    patches = Patches(patch_size)(augmented)
    # Encode patches.
    encoded_patches = PatchEncoder(num_patches, projection_dim)(patches)

    # Create multiple layers of the Transformer block.
    for _ in range(transformer_layers):
        # Layer normalization 1.
        x1 = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
        # Create a multi-head attention layer.
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(x1, x1)
        # Skip connection 1.
        x2 = layers.Add()([attention_output, encoded_patches])
        # Layer normalization 2.
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        # MLP.
        x3 = mlp(x3, hidden_units=transformer_units, dropout_rate=0.1)
        # Skip connection 2.
        encoded_patches = layers.Add()([x3, x2])

    # Create a [batch_size, projection_dim] tensor.
    representation = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0.5)(representation)
    # Add MLP.
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0.5)
    # Classify outputs.
    logits = layers.Dense(num_classes)(features)
    # Create the Keras model.
    model = keras.Model(inputs=inputs, outputs=logits)
    return model

# Compile, train, and evaluate the mode

In [None]:
def run_experiment(model):
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    model.compile(
        optimizer=optimizer,
        loss=keras.losses.BinaryCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.BinaryAccuracy(name='binary_accuracy')
        ],
    )

    checkpoint_filepath = "/model/ViT_vanilla_0227.h5"
    checkpoint_callback = keras.callbacks.ModelCheckpoint(
        checkpoint_filepath,
        monitor="val_binary_accuracy",
        save_best_only=True
        # save_weights_only=True,
    )

    history = model.fit(
        train_gen,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_data = valid_gen,
        callbacks=[checkpoint_callback],
    )

    # model.load_weights(checkpoint_filepath)
    # _, accuracy, top_5_accuracy = model.evaluate(x_test, y_test)
    # print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    # print(f"Test top 5 accuracy: {round(top_5_accuracy * 100, 2)}%")

    return history


vit_classifier = create_vit_classifier()
history = run_experiment(vit_classifier)

Epoch 1/100