## Setup

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator,load_img, img_to_array
import tensorflow_addons as tfa
import pandas as pd
import os
from sklearn.model_selection import train_test_split


from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense

In [None]:
import warnings
# Suppress every warning category for the rest of the session so that library
# deprecation notices do not clutter the notebook output.
warnings.filterwarnings('ignore')

In [None]:
# Report how many GPUs TensorFlow can see, then the installed TensorFlow version.
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
print(tf.__version__)

In [None]:
def get_filepaths(directory):
    """Recursively list every file found under *directory*.

    Returns two parallel lists: the full path of each file, and the part of
    each file's name that precedes its first ``.``.
    """
    file_paths = []
    file_image = []

    # os.walk visits the directory itself first and then each sub-directory.
    for root, _subdirs, names in os.walk(directory):
        file_paths.extend(os.path.join(root, name) for name in names)
        file_image.extend(name.split('.')[0] for name in names)

    return file_paths, file_image

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that loads images from disk one batch at a time.

    Parameters
    ----------
    df_X : DataFrame whose first column holds the image file paths.
    arr_Y : array of targets aligned row-for-row (by position) with ``df_X``.
    batch_size : number of samples in every batch.
    shuffle : when truthy, the sample order is re-drawn each time
        ``on_epoch_end`` runs (including once at construction).
    image_size : images are loaded as ``image_size x image_size`` RGB.
    """

    def __init__(self, df_X, arr_Y, batch_size=32, shuffle=False, image_size=72):
        super().__init__()
        self.batch_size = batch_size
        self.df_X = df_X
        self.arr_Y = arr_Y
        # Row labels of df_X; only their count is used (see ``n``).
        self.indices = self.df_X.index.tolist()
        self.shuffle = shuffle
        self.image_size = image_size
        self.on_epoch_end()

    def __len__(self):
        # Only full batches are served; a trailing partial batch is dropped.
        return int(np.floor(len(self.indices) / self.batch_size))

    def __getitem__(self, index):
        """Return the ``(X, y)`` pair for batch number *index*."""
        start = index * self.batch_size
        positions = self.index[start:start + self.batch_size]
        return self.__get_data(positions)

    def n(self):
        """Return the total number of samples."""
        return len(self.indices)

    def on_epoch_end(self):
        """Reset the visiting order, shuffling it when requested."""
        # ``self.index`` holds row *positions*, so the generator does not rely
        # on the DataFrame carrying a 0..n-1 index.
        self.index = np.arange(len(self.indices))
        if self.shuffle:
            np.random.shuffle(self.index)

    def __get_data(self, batch):
        """Load the images and targets at the row positions in *batch*.

        Returns the image array stacked along a new leading axis and the
        targets as a column vector with one row per sample.
        """
        X = []
        y = []

        for pos in batch:
            # Both the frame and the target array are addressed by position.
            file = self.df_X.iloc[pos, 0]
            img = load_img(
                file,
                color_mode='rgb',
                target_size=(self.image_size, self.image_size),
                interpolation='nearest',
            )
            # Divide by 255 (presumably 8-bit pixels, giving values in [0, 1]).
            X.append(img_to_array(img).astype(np.float32) / 255.)
            y.append(self.arr_Y[pos])

        # -1 keeps the reshape valid for any number of samples in the batch.
        return np.array(X), np.array(y).reshape(-1, 1)

## Prepare the data

In [None]:
# Load the training table from train.csv in the working directory.
data = pd.read_csv('train.csv')

In [None]:
# Keep only the rows whose `bolts` value is present.
data = data.loc[data.bolts.notna()]

In [None]:
# NOTE(review): num_classes is not referenced by any other visible cell.
num_classes = 1
# (height, width, channels) of the model input; read by create_vit_classifier().
input_shape = (72, 72, 3)

In [None]:
# Enumerate every file under ./images (full paths plus name stems).
# NOTE(review): neither result is used by the visible cells - confirm whether
# a merge into `data` was intended here.
file_path, image_id = get_filepaths('images')

In [None]:
# Hold out 20% of the rows for validation; the fixed seed keeps the split
# reproducible. The features are the whole table, the target is `bolts`.
X_train, X_test, y_train, y_test = train_test_split(
    data,
    data.bolts.values,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Renumber the rows of each split 0..n-1 (the old index is discarded) so that
# label-based and position-based row access agree.
X_train = X_train.reset_index(drop=True)
X_test = X_test.reset_index(drop=True)

## Configure the hyperparameters

In [None]:
# Optimiser settings.
learning_rate = 1e-3
weight_decay = 1e-4

# Training schedule.
batch_size = 8
num_epochs = 30

# Patch geometry: input images are resized to image_size x image_size and cut
# into non-overlapping patch_size x patch_size patches.
image_size = 72
patch_size = 6
num_patches = (image_size // patch_size) ** 2

# Transformer encoder.
projection_dim = 64
num_heads = 4
transformer_units = [projection_dim * 2, projection_dim]  # widths of the per-block MLP
transformer_layers = 16

# Widths of the dense layers in the final prediction head.
mlp_head_units = [2048, 1024]


## Implement multilayer perceptron (MLP)

In [None]:
def mlp(x, hidden_units, dropout_rate):
    """Stack one GELU ``Dense`` + ``Dropout`` pair per entry of *hidden_units*.

    *x* is the input tensor; the output of the last pair is returned (or *x*
    itself when *hidden_units* is empty).
    """
    out = x
    for width in hidden_units:
        hidden = layers.Dense(width, activation=tf.nn.gelu)(out)
        out = layers.Dropout(dropout_rate)(hidden)
    return out

## Implement patch creation as a layer

In [None]:
class Patches(layers.Layer):
    """Layer that cuts a batch of images into non-overlapping square patches.

    Parameters
    ----------
    patch_size : side length, in pixels, of each square patch.
    **kwargs : forwarded to ``layers.Layer`` (e.g. ``name``, ``dtype``).
    """

    def __init__(self, patch_size, **kwargs):
        super().__init__(**kwargs)
        self.patch_size = patch_size

    def call(self, images):
        """Return the flattened patches of *images*, one row per patch."""
        batch_size = tf.shape(images)[0]
        # Stride equal to the patch size means the patches tile the image
        # without overlap; "VALID" drops any partial patch at the border.
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        # Collapse the patch grid into a single sequence axis per image.
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"patch_size": self.patch_size})
        return config

In [None]:
# Visual sanity check: show one random training image and the patches that
# the Patches layer extracts from it.
import matplotlib.pyplot as plt

plt.figure(figsize=(4, 4))

# Pick a random row label and load the image named in its 'file_path' column
# at the model's input resolution.
# NOTE(review): this requires X_train to have a 'file_path' column - confirm
# against the CSV schema.
image = img_to_array(load_img(X_train.loc[np.random.choice(range(X_train.shape[0])),'file_path'],
                     color_mode='rgb', 
                     target_size=(image_size,image_size),
                     interpolation='nearest'))

plt.imshow(image.astype("uint8"))
plt.axis("off")

# Wrapping the image in a list adds the leading batch axis; the image was
# already loaded at (image_size, image_size), so the resize keeps that size.
resized_image = tf.image.resize(
    tf.convert_to_tensor([image]), size=(image_size, image_size)
)

patches = Patches(patch_size)(resized_image)
print(f"Image size: {image_size} X {image_size}")
print(f"Patch size: {patch_size} X {patch_size}")
print(f"Patches per image: {patches.shape[1]}")
print(f"Elements per patch: {patches.shape[-1]}")

# Lay the patches out on an n x n grid (n = square root of the patch count).
n = int(np.sqrt(patches.shape[1]))
plt.figure(figsize=(4, 4))
for i, patch in enumerate(patches[0]):
    ax = plt.subplot(n, n, i + 1)
    # Each patch row is reshaped back to (patch_size, patch_size, 3) for display.
    patch_img = tf.reshape(patch, (patch_size, patch_size, 3))
    plt.imshow(patch_img.numpy().astype("uint8"))
    plt.axis("off")

## Implement the patch encoding layer

The `PatchEncoder` layer will linearly transform a patch by projecting it into a
vector of size `projection_dim`. In addition, it adds a learnable position
embedding to the projected vector.

In [None]:
class PatchEncoder(layers.Layer):
    """Project each patch to ``projection_dim`` and add a position embedding.

    Parameters
    ----------
    num_patches : number of patches per image (size of the position table).
    projection_dim : width of the projected patch vectors.
    **kwargs : forwarded to ``layers.Layer`` (e.g. ``name``, ``dtype``).
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_patches = num_patches
        # Kept so that get_config() can report it.
        self.projection_dim = projection_dim
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        """Return the projected patches plus their learned position embeddings."""
        # One embedding row per patch position 0..num_patches-1; the sum
        # broadcasts the position embeddings across the batch.
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "num_patches": self.num_patches,
                "projection_dim": self.projection_dim,
            }
        )
        return config


## Build the ViT model

The ViT model consists of multiple Transformer blocks,
which use the `layers.MultiHeadAttention` layer as a self-attention mechanism
applied to the sequence of patches. The Transformer blocks produce a
`[batch_size, num_patches, projection_dim]` tensor, which is processed via an
MLP head ending in a single linear output unit to produce the final regression
output (this notebook predicts one scalar target rather than softmax class probabilities).

Unlike the technique described in the [paper](https://arxiv.org/abs/2010.11929),
which prepends a learnable embedding to the sequence of encoded patches to serve
as the image representation, all the outputs of the final Transformer block are
reshaped with `layers.Flatten()` and used as the image
representation input to the classifier head.
Note that the `layers.GlobalAveragePooling1D` layer
could also be used instead to aggregate the outputs of the Transformer block,
especially when the number of patches and the projection dimensions are large.

In [None]:

def create_vit_classifier():
    """Build the ViT model from the module-level hyperparameters.

    The image is split into patches, the patches are embedded, and the
    sequence passes through ``transformer_layers`` pre-norm Transformer
    blocks. The final sequence is flattened and fed to an MLP head that ends
    in a single linear output unit.
    """
    inputs = layers.Input(shape=input_shape)

    # Patch extraction followed by linear projection + position embedding.
    patch_sequence = Patches(patch_size)(inputs)
    tokens = PatchEncoder(num_patches, projection_dim)(patch_sequence)

    for _ in range(transformer_layers):
        # Self-attention sub-block with a residual connection.
        normed = layers.LayerNormalization(epsilon=1e-6)(tokens)
        attended = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1)(normed, normed)
        residual = layers.Add()([attended, tokens])

        # Feed-forward sub-block with a second residual connection.
        hidden = layers.LayerNormalization(epsilon=1e-6)(residual)
        hidden = mlp(hidden, hidden_units=transformer_units, dropout_rate=0.1)
        tokens = layers.Add()([hidden, residual])

    # Flatten the whole token sequence into one vector per image
    # (num_patches * projection_dim values).
    representation = layers.LayerNormalization(epsilon=1e-6)(tokens)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0.5)(representation)

    # Prediction head: MLP followed by one linear unit.
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0.1)
    output = layers.Dense(1,activation='linear')(features)

    return keras.Model(inputs=inputs, outputs=output)



In [None]:
def get_model(input_shape=(72, 72, 3)):
    """Build an untrained EfficientNetB0 backbone with a one-unit linear head.

    Parameters
    ----------
    input_shape : ``(height, width, channels)`` of the input images. The
        default matches the shape that was previously hard-coded here.

    Returns
    -------
    A Keras ``Model`` mapping an image batch to one linear output per image.
    """
    # NOTE(review): Keras documents that its EfficientNet models include input
    # rescaling and expect pixel values in [0, 255]; confirm that feeding
    # inputs already divided by 255 is intended.
    base_model = EfficientNetB0(
        include_top=False,
        weights=None,  # random initialisation, no pretrained weights
        input_tensor=None,
        input_shape=input_shape)

    # Average the backbone's spatial feature map into one vector per image.
    pooled = GlobalAveragePooling2D()(base_model.output)
    output = layers.Dense(1, activation='linear')(pooled)

    return Model(inputs=base_model.input, outputs=output)

## Compile, train, and evaluate the model

In [None]:

def run_experiment(model):
    """Compile, train and evaluate *model* on the train/test split.

    Uses the module-level hyperparameters (``learning_rate``, ``weight_decay``,
    ``batch_size``, ``num_epochs``) and the module-level ``X_train``,
    ``y_train``, ``X_test`` and ``y_test``. The weights with the lowest
    validation loss are checkpointed, reloaded after training, and used for
    the final evaluation, whose loss is printed.

    Returns the ``History`` object produced by ``model.fit``.
    """
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.MeanSquaredError(name='mse'))

    checkpoint_filepath = "tmp/checkpoint_cnn.h5"
    # Make sure the target directory exists before the first checkpoint write.
    os.makedirs(os.path.dirname(checkpoint_filepath), exist_ok=True)
    checkpoint_callback = keras.callbacks.ModelCheckpoint(
        checkpoint_filepath,
        monitor="val_loss",
        save_best_only=True,
        save_weights_only=True,
    )

    train_generator = DataGenerator(df_X=X_train, arr_Y=y_train, batch_size=batch_size, shuffle=True)
    test_generator = DataGenerator(df_X=X_test, arr_Y=y_test, batch_size=batch_size)

    # Number of full batches each generator can serve per pass.
    train_steps = train_generator.n() // train_generator.batch_size
    valid_steps = test_generator.n() // test_generator.batch_size

    # The generators already yield batches, so `batch_size` must not be passed
    # to fit(); Keras documents it as unsupported for Sequence inputs.
    history = model.fit(
        x=train_generator,
        steps_per_epoch=train_steps,
        validation_data=test_generator,
        validation_steps=valid_steps,
        epochs=num_epochs,
        callbacks=[checkpoint_callback],
    )

    # Evaluate with the best checkpoint rather than the last-epoch weights.
    model.load_weights(checkpoint_filepath)
    loss = model.evaluate(x=test_generator, steps=valid_steps)
    print(f"loss : {loss}")

    return history


# NOTE: despite the variable name, this currently trains the EfficientNetB0
# baseline built by get_model(); call create_vit_classifier() instead to
# train the ViT.
vit_classifier = get_model() #create_vit_classifier()
history = run_experiment(vit_classifier)
