In [None]:
import os
import numpy as np
import cv2
from glob import glob
from tqdm import tqdm
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from patchify import patchify
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger, EarlyStopping

In [None]:
# Colab only: mount Google Drive at /content/drive. Later cells read the dataset
# archive from, and write the checkpoint/CSV log to, paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Unpack the flower_photos archive stored on Drive (no -C given, so it extracts into
# the current working directory — assumes that is /content so the files land at
# /content/flower_photos; TODO confirm).
!tar -xvzf /content/drive/MyDrive/flower_photos.tgz

In [None]:
def create_dir(path):
    """Create the directory `path`, including missing parents, if it is absent.

    Args:
        path: Directory path to create.

    Calling this on a directory that already exists is a no-op.
    """
    # exist_ok=True removes the check-then-create race of a separate
    # os.path.exists() test followed by os.makedirs().
    os.makedirs(path, exist_ok=True)

In [None]:
""" Hyperparameters """
# Input geometry: square images cut into square, non-overlapping patches.
hp = {
    "image_size": 200,
    "num_channels": 3,
    "patch_size": 25,
}
# Derived values: number of patches per image and the shape of one image once
# every patch has been flattened to a vector.
hp["num_patches"] = hp["image_size"] ** 2 // hp["patch_size"] ** 2
hp["flat_patches_shape"] = (
    hp["num_patches"],
    hp["patch_size"] * hp["patch_size"] * hp["num_channels"],
)

# Training setup, label set, and transformer dimensions.
hp.update({
    "batch_size": 8,
    "learning_rate": 1e-4,
    "num_epochs": 500,
    "num_classes": 5,
    "class_names": ["daisy", "dandelion", "roses", "sunflowers", "tulips"],
    "num_layers": 12,
    "hidden_dim": 768,
    "mlp_dim": 3072,
    "num_heads": 12,
    "dropout_rate": 0.1,
})

In [None]:
# Root of the extracted dataset. load_data() globs "<dataset_path>/*/*.jpg" and
# process_image_label() uses each image's parent directory name as its class label.
dataset_path = "/content/flower_photos"

In [None]:
def load_data(dataset_path, split=0.1, seed=42):
    """Collect image paths and partition them into train / validation / test lists.

    Args:
        dataset_path: Root directory; images are matched by "<root>/*/*.jpg".
        split: Fraction of all images placed in the validation set and, again,
            in the test set (so training keeps roughly 1 - 2 * split).
        seed: Seed used for the shuffle and both splits. With the same files on
            disk, the same seed always yields the same three lists.

    Returns:
        (X_train, X_val, X_test): three lists of image file paths.

    Raises:
        ValueError: If `split` selects no images (e.g. nothing matched the glob).
    """
    # glob() yields paths in filesystem order, which is not guaranteed to be stable.
    # Sorting first and seeding the shuffle makes the partition independent of both
    # directory order and the global NumPy RNG state, so a later call (e.g. from the
    # evaluation cell) gets the same held-out test set as the training run.
    images = sorted(glob(os.path.join(dataset_path, "*", "*.jpg")))
    images = shuffle(images, random_state=seed)

    split_size = int(len(images) * split)
    if split_size < 1:
        raise ValueError(
            f"split={split} selects no images out of {len(images)} found under {dataset_path!r}"
        )

    X_train, X_val = train_test_split(images, test_size=split_size, random_state=seed)
    X_train, X_test = train_test_split(X_train, test_size=split_size, random_state=seed)

    return X_train, X_val, X_test

In [None]:
def process_image_label(path):
    """Load one image and turn it into flattened patches plus an integer class index.

    Args:
        path: Image file path as bytes (the form in which tf.numpy_function
            passes string tensors); the parent directory name is the class name.

    Returns:
        (patches, class_index):
            patches: float32 array of shape hp["flat_patches_shape"], pixel
                values scaled by 1/255.
            class_index: int32 scalar array, the position of the class name in
                hp["class_names"].

    Raises:
        ValueError: If the image cannot be read, or if the parent directory name
            is not listed in hp["class_names"].
    """
    path = path.decode()

    image = cv2.imread(path, cv2.IMREAD_COLOR)
    # cv2.imread signals failure by returning None instead of raising; fail here
    # with the offending path rather than with an opaque error inside cv2.resize.
    if image is None:
        raise ValueError(f"Could not read image: {path}")
    image = cv2.resize(image, (hp["image_size"], hp["image_size"]))
    image = image / 255.0

    # Cut the image into non-overlapping patches (step == patch size), then
    # flatten each patch into a single vector.
    patch_shape = (hp["patch_size"], hp["patch_size"], hp["num_channels"])
    patches = patchify(image, patch_shape, hp["patch_size"])
    patches = np.reshape(patches, hp["flat_patches_shape"])
    patches = patches.astype(np.float32)

    # The label is the name of the directory that contains the image; os.path is
    # used so this does not depend on "/" being the path separator.
    class_name = os.path.basename(os.path.dirname(path))
    class_index = hp["class_names"].index(class_name)
    class_index = np.array(class_index, dtype=np.int32)

    return patches, class_index

In [None]:
def parse(path):
    """Map one image-path tensor to a (patches, one-hot label) pair.

    Runs process_image_label through tf.numpy_function, then pins the static
    shapes of both outputs from the values in `hp`.
    """
    output_types = [tf.float32, tf.int32]
    patches, class_index = tf.numpy_function(process_image_label, [path], output_types)

    # Declare the patch tensor's static shape explicitly.
    patches.set_shape(hp["flat_patches_shape"])

    # Integer class index -> one-hot vector of length num_classes.
    one_hot_label = tf.one_hot(class_index, hp["num_classes"])
    one_hot_label.set_shape(hp["num_classes"])

    return patches, one_hot_label

In [None]:
def tf_dataset(images, batch=32):
    """Build a batched tf.data pipeline from a list of image paths.

    Args:
        images: Sequence of image file paths.
        batch: Number of examples per batch.

    Returns:
        A tf.data.Dataset yielding (patches, one-hot label) batches.
    """
    path_ds = tf.data.Dataset.from_tensor_slices(images)
    example_ds = path_ds.map(parse)
    batched_ds = example_ds.batch(batch)
    return batched_ds.prefetch(8)

In [None]:
class ClassToken(Layer):
    """Layer that emits one trainable class-token vector per batch element.

    The output has shape (batch_size, 1, hidden_dim), where hidden_dim is the
    last dimension of the input. The input's values are not used; only its
    batch size, last dimension and dtype are.
    """

    def __init__(self, **kwargs):
        # Forward the standard Layer arguments (name, dtype, trainable, ...) so the
        # layer can be named and re-created from its config.
        super().__init__(**kwargs)

    def build(self, input_shape):
        # Create the token through add_weight rather than a bare tf.Variable so
        # that Keras registers it as a trainable weight of this layer (and thus
        # trains it and includes it in saved weights).
        self.w = self.add_weight(
            name="cls_token",
            shape=(1, 1, input_shape[-1]),
            initializer="random_normal",
            dtype="float32",
            trainable=True,
        )

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        hidden_dim = self.w.shape[-1]

        # Repeat the single token along the batch axis and match the input dtype.
        cls = tf.broadcast_to(self.w, [batch_size, 1, hidden_dim])
        cls = tf.cast(cls, dtype=inputs.dtype)
        return cls

In [None]:
def mlp(x, cf):
    """Feed-forward sub-block: Dense(gelu) -> Dropout -> Dense -> Dropout.

    Args:
        x: Input tensor.
        cf: Config dict providing "mlp_dim", "hidden_dim" and "dropout_rate".

    Returns:
        Tensor whose last dimension is cf["hidden_dim"].
    """
    rate = cf["dropout_rate"]

    expanded = Dense(cf["mlp_dim"], activation="gelu")(x)
    expanded = Dropout(rate)(expanded)

    projected = Dense(cf["hidden_dim"])(expanded)
    return Dropout(rate)(projected)

In [None]:
def transformer_encoder(x, cf):
    """One Transformer encoder block: self-attention and MLP, each with a residual.

    Layer normalisation is applied before each sub-block and the sub-block's
    input is added back to its output.

    Args:
        x: Input tensor of token embeddings.
        cf: Config dict providing "num_heads", "hidden_dim", "mlp_dim" and
            "dropout_rate".

    Returns:
        Tensor with the same shape as `x`.
    """
    # MultiHeadAttention's key_dim is the size of EACH head, so the model width is
    # divided across the heads; passing the full hidden_dim here would make every
    # head hidden_dim wide and multiply the attention parameters by num_heads.
    head_dim = max(1, cf["hidden_dim"] // cf["num_heads"])

    skip_1 = x
    x = LayerNormalization()(x)
    x = MultiHeadAttention(
        num_heads=cf["num_heads"], key_dim=head_dim
    )(x, x)
    x = Add()([x, skip_1])

    skip_2 = x
    x = LayerNormalization()(x)
    x = mlp(x, cf)
    x = Add()([x, skip_2])

    return x

In [None]:
class _PositionEmbedding(Layer):
    """Adds one trainable embedding vector per patch position to its input.

    Expects input of shape (batch, num_patches, hidden_dim) and returns a tensor
    of the same shape.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        # One learned vector per position; registered with add_weight so it is
        # trained and saved together with the rest of the model.
        self.pos_embed = self.add_weight(
            name="pos_embed",
            shape=(input_shape[-2], input_shape[-1]),
            initializer="random_uniform",
            dtype="float32",
            trainable=True,
        )

    def call(self, inputs):
        # (num_patches, hidden_dim) broadcasts over the batch axis.
        return inputs + tf.cast(self.pos_embed, inputs.dtype)


def ViT(cf):
    """Build a Vision Transformer classifier as a Keras functional Model.

    Args:
        cf: Config dict providing "num_patches", "patch_size", "num_channels",
            "hidden_dim", "num_layers", "num_classes", plus the keys read by
            transformer_encoder ("num_heads", "mlp_dim", "dropout_rate").

    Returns:
        A Model mapping flattened patches of shape
        (batch, num_patches, patch_size * patch_size * num_channels) to softmax
        class probabilities of shape (batch, num_classes).
    """
    input_shape = (cf["num_patches"], cf["patch_size"] * cf["patch_size"] * cf["num_channels"])
    inputs = Input(input_shape)

    # Patch + position embeddings.
    patch_embed = Dense(cf["hidden_dim"])(inputs)
    # The position embedding is a layer inside the model graph. Calling an
    # Embedding layer on a constant tf.range() outside the graph would only add a
    # fixed random tensor whose weights the Model never trains or checkpoints.
    embed = _PositionEmbedding()(patch_embed)

    # Adding class token: prepend it to the sequence of patch embeddings.
    token = ClassToken()(embed)
    x = Concatenate(axis=1)([token, embed])

    for _ in range(cf["num_layers"]):
        x = transformer_encoder(x, cf)

    # Classification head: normalise, keep only the class-token position, classify.
    x = LayerNormalization()(x)
    x = x[:, 0, :]
    x = Dense(cf["num_classes"], activation="softmax")(x)

    model = Model(inputs, x)
    return model

In [None]:
if __name__ == "__main__":
    # Build the network once from a stand-alone config and print its layer summary.
    config = {
        "num_layers": 12,
        "hidden_dim": 768,
        "mlp_dim": 3072,
        "num_heads": 12,
        "dropout_rate": 0.1,
        "num_patches": 256,
        "patch_size": 32,
        "num_channels": 3,
        "num_classes": 5,
    }

    model = ViT(config)
    model.summary()

In [None]:
if __name__ == "__main__":

    # Seeding: fix the NumPy and TensorFlow global RNGs before data or model exist.
    np.random.seed(42)
    tf.random.set_seed(42)

    # Directory for storing files
    create_dir("/content/drive/MyDrive/files" + "/flowers")

    # Paths for the best-model checkpoint and the per-epoch training log.
    model_path = os.path.join("/content/drive/MyDrive/files", "flowers", "Image_Classification_using_Vision_Transformer_Vit.h5")
    # The log file name must end in ".csv" with no trailing whitespace.
    csv_path = os.path.join("/content/drive/MyDrive/files", "flowers", "Image_Classification_using_Vision_Transformer_Vit.csv")

    # Dataset: X_test is held out here and only consumed by the evaluation cell.
    X_train, X_val, X_test = load_data(dataset_path)

    train_dataset = tf_dataset(X_train, batch=hp["batch_size"])
    val_dataset = tf_dataset(X_val, batch=hp["batch_size"])

    # Model
    model = ViT(hp)
    model.compile(
        loss="categorical_crossentropy",
        optimizer = tf.keras.optimizers.Adam(learning_rate=hp["learning_rate"], clipvalue=1.0),
        metrics = ["accuracy"]
    )

    callbacks = [
        # Keep only the checkpoint with the lowest validation loss.
        ModelCheckpoint(model_path, monitor="val_loss", verbose=1, save_best_only=True),
        # min_lr is the floor for the reduced learning rate, so it has to be below
        # the starting rate (hp["learning_rate"] = 1e-4); a floor above the
        # starting rate would prevent the callback from ever lowering it.
        ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=10, min_lr=1e-7),
        CSVLogger(csv_path),
        # The best weights are already on disk via ModelCheckpoint, so they are
        # not restored into the in-memory model when training stops early.
        EarlyStopping(monitor="val_loss", patience=50, restore_best_weights=False)
    ]

In [None]:
# Train the compiled model. Relies on `model`, `train_dataset`, `val_dataset` and
# `callbacks` created by the training-setup cell, so that cell must be run first.
# Runs for at most hp["num_epochs"] epochs; the EarlyStopping callback in
# `callbacks` can end training sooner.
model.fit(
    train_dataset,
    epochs=hp["num_epochs"],
    validation_data=val_dataset,
    callbacks=callbacks
)

In [None]:
# Evaluation cell: rebuild the model, load the checkpoint written during training,
# and report loss/accuracy on the held-out test split.
if __name__ == "__main__":
    """ Seeding """
    # Re-seed exactly as the training cell does before load_data() is called.
    # NOTE(review): the test split only matches the one held out during training if
    # load_data() returns the same partition in both cells — confirm.
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Paths """
    # Same checkpoint path the training cell passes to ModelCheckpoint.
    model_path = os.path.join("/content/drive/MyDrive/files", "flowers", "Image_Classification_using_Vision_Transformer_Vit.h5")

    """ Dataset """
    X_train, X_val, X_test = load_data(dataset_path)
    print(f"Train - {len(X_train)}, Validation - {len(X_val)}, Test - {len(X_test)}")

    # Only the test paths are turned into a dataset here.
    test_ds = tf_dataset(X_test, batch=hp["batch_size"])

    """ Model """
    # Rebuild the architecture from `hp`, then load the saved weights into it.
    model = ViT(hp)
    model.load_weights(model_path)
    # Compiled with the same loss, optimizer settings and metric as in training so
    # that evaluate() reports categorical cross-entropy and accuracy.
    model.compile(
        loss="categorical_crossentropy",
        optimizer = tf.keras.optimizers.Adam(learning_rate=hp["learning_rate"], clipvalue=1.0),
        metrics = ["accuracy"]
    )

    model.evaluate(test_ds)