# Vision Transformer (ViT) models for defect detection in leather samples

In [None]:
# importing part
import math
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
#TensorFlow version: 2.13.1
from tensorflow.keras.preprocessing import image
from keras.applications.imagenet_utils import preprocess_input
import matplotlib.pyplot as plt
import os, time
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

In [None]:
# Read the image array and its matching label array from the .npy exports.
img_data = np.load("Two_class/output_image.npy")
labels = np.load('Two_class/output_labels.npy')

# Report the shape of the loaded image array.
print(f"[INFO] TRAINING SET SHAPE: {img_data.shape}")
# Output recorded in the notebook: TRAINING SET SHAPE: (3540, 227, 227, 3)

### Preprocess the data and split it into training and test sets

In [None]:
# One-hot encode the labels with the Keras utility `to_categorical`, then
# shuffle the samples and split them into training and test sets.
num_classes = 2
# Take the model input shape from the loaded data instead of hard-coding it.
# The previous fixed value (224, 224, 3) disagrees with the 227x227x3 shape
# recorded for this dataset, and Keras rejects batches whose shape differs
# from the model's Input layer.
input_shape = tuple(img_data.shape[1:])

# Use LabelEncoder to convert string labels to numerical labels
label_encoder = LabelEncoder()
numerical_labels = label_encoder.fit_transform(labels)

# One-hot encoding of labels
Y = to_categorical(numerical_labels, num_classes)
# Shuffle images and labels together (fixed seed for reproducibility)
x, y = shuffle(img_data, Y, random_state=2)
# Split data - 80% train / 20% test
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=2)

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

print("[INFO] PRE-PROCESSING COMPLETE --")
# Output recorded in the notebook:
# x_train shape: (2832, 227, 227, 3) - y_train shape: (2832, 2)
# x_test shape: (708, 227, 227, 3) - y_test shape: (708, 2)

### Configure the hyperparameters

In [None]:
# --- Optimisation settings ---
learning_rate = 0.001
weight_decay = 0.0001
batch_size = 96  # earlier values noted by the author: 64 and 256
num_epochs = 100

# --- Patch geometry ---
image_size = 72  # input images are resized to image_size x image_size
patch_size = 6  # side length of the patches extracted from the resized image
num_patches = (image_size // patch_size) ** 2

# --- Transformer encoder ---
projection_dim = 64
num_heads = 4  # 8 was also considered
# Widths of the dense layers inside each Transformer block.
transformer_units = [2 * projection_dim, projection_dim]
transformer_layers = 8

# --- Classification head ---
# Widths of the dense layers of the final classifier (previously [2048, 1024]).
mlp_head_units = [512, 256]


### Implement patch creation as a layer

In [None]:
# Custom layer that subclasses `layers.Layer`
class Patches(layers.Layer):
    """Layer that cuts a batch of images into non-overlapping square patches.

    Args:
        patch_size: Side length, in pixels, of each square patch. It is used
            both as the window size and as the stride.
        **kwargs: Standard Keras layer arguments (for example ``name``),
            forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, patch_size, **kwargs):
        super().__init__(**kwargs)
        self.patch_size = patch_size

    def call(self, images):
        """Extract the patches of ``images`` and flatten each one.

        Args:
            images: Batched image tensor passed to ``tf.image.extract_patches``.

        Returns:
            A tensor reshaped to ``[batch_size, -1, patch_dims]``, where
            ``patch_dims`` is the last dimension of the extracted patches.
        """
        # The batch size is read dynamically so the layer works with any batch.
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

    def get_config(self):
        """Return the layer config so the layer can be serialised and rebuilt."""
        config = super().get_config()
        config.update({"patch_size": self.patch_size})
        return config


In [None]:
### Implement the patch encoding layer

In [None]:
class PatchEncoder(layers.Layer):
    """Project flattened patches and add a learned position embedding.

    Args:
        num_patches: Number of patch positions; size of the position
            embedding table.
        projection_dim: Output width of both the dense projection and the
            position embedding.
        **kwargs: Standard Keras layer arguments (for example ``name``),
            forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_patches = num_patches
        # Kept so that get_config() can reproduce the constructor arguments.
        self.projection_dim = projection_dim
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim)

    def call(self, patch):
        """Return the projected patches plus their position embeddings."""
        # One index per patch position: 0 .. num_patches - 1.
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

    def get_config(self):
        """Return the layer config so the layer can be serialised and rebuilt."""
        config = super().get_config()
        config.update(
            {
                "num_patches": self.num_patches,
                "projection_dim": self.projection_dim,
            }
        )
        return config


## Self-attention mechanism to capture context

In [None]:
class MultiHeadAttentionLSA(tf.keras.layers.MultiHeadAttention):
    """Multi-head attention that scales the query by a trainable temperature.

    Overrides the parent's ``_compute_attention`` so that the query is
    divided by ``self.tau`` before the attention scores are computed. All
    constructor keyword arguments are forwarded to the parent class.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # The trainable temperature term. The initial value is
        # the square root of the key dimension.
        self.tau = tf.Variable(math.sqrt(float(self._key_dim)), trainable=True)

    def _compute_attention(self, query, key, value, attention_mask=None, training=None):
        """Compute attention using the temperature-scaled query.

        Args:
            query: Query tensor; multiplied by ``1 / self.tau`` before use.
            key: Key tensor.
            value: Value tensor.
            attention_mask: Optional mask handed to the parent's
                ``_masked_softmax``.
            training: Forwarded to the attention dropout layer.

        Returns:
            A tuple ``(attention_output, attention_scores)``; the returned
            scores are the softmax output before dropout is applied.
        """
        # Scale the query by the learned temperature.
        query = tf.multiply(query, 1.0 / self.tau)
        # Query/key scores, using the einsum equation prepared by the parent.
        attention_scores = tf.einsum(self._dot_product_equation, key, query)
        attention_scores = self._masked_softmax(attention_scores, attention_mask)
        attention_scores_dropout = self._dropout_layer(
            attention_scores, training=training
        )
        # Combine the (dropped-out) scores with the values.
        attention_output = tf.einsum(
            self._combine_equation, attention_scores_dropout, value
        )
        return attention_output, attention_scores

## Data augmentation

In [None]:
# Preprocessing/augmentation pipeline: normalise, resize to the working
# resolution, then apply random flip, rotation and zoom.
data_augmentation = keras.Sequential(name="data_augmentation")
data_augmentation.add(layers.Normalization())
data_augmentation.add(layers.Resizing(image_size, image_size))
data_augmentation.add(layers.RandomFlip("horizontal"))
data_augmentation.add(layers.RandomRotation(factor=0.02))
data_augmentation.add(layers.RandomZoom(height_factor=0.2, width_factor=0.2))

# Fit the normalisation layer (first in the pipeline) to the training data so
# it learns the mean and variance used for normalisation.
data_augmentation.layers[0].adapt(x_train)


In [None]:
## Implement multilayer perceptron (MLP)

In [None]:
def mlp(x, hidden_units, dropout_rate):
    """Stack one GELU dense layer plus dropout per entry of ``hidden_units``.

    Args:
        x: Input tensor.
        hidden_units: Iterable of dense-layer widths, applied in order.
        dropout_rate: Rate used by the dropout layer after every dense layer.

    Returns:
        The output of the last dropout layer, or ``x`` unchanged when
        ``hidden_units`` is empty.
    """
    out = x
    for width in hidden_units:
        hidden = layers.Dense(width, activation=tf.nn.gelu)(out)
        out = layers.Dropout(dropout_rate)(hidden)
    return out

## Build the diagonal attention mask

In [None]:
# Mask with zeros on the diagonal and ones elsewhere, wrapped in a list to add
# a leading axis of size one, and stored as int8.
diag_attn_mask = tf.cast([1 - tf.eye(num_patches)], dtype=tf.int8)


## Build the ViT model

In [None]:
def create_vit_classifier(vanilla=False):
    """Assemble the ViT classifier as a Keras functional model.

    Args:
        vanilla: When true, every Transformer block uses the stock
            ``layers.MultiHeadAttention``. Otherwise each block uses
            ``MultiHeadAttentionLSA`` called with ``diag_attn_mask``.

    Returns:
        An uncompiled ``keras.Model`` that maps an input of shape
        ``input_shape`` to ``num_classes`` softmax scores.
    """
    inputs = layers.Input(shape=input_shape)

    # Augment, split into patches, then embed the patches.
    augmented = data_augmentation(inputs)
    patch_tokens = Patches(patch_size)(augmented)
    tokens = PatchEncoder(num_patches, projection_dim)(patch_tokens)

    # Stack of Transformer blocks.
    for _ in range(transformer_layers):
        # Pre-attention layer normalisation.
        normed = layers.LayerNormalization(epsilon=1e-6)(tokens)

        # Self-attention: stock layer or the LSA variant with the diagonal mask.
        if vanilla:
            attended = layers.MultiHeadAttention(
                num_heads=num_heads, key_dim=projection_dim, dropout=0.1
            )(normed, normed)
        else:
            print("[INFO] ~ MultiHeadAttentionLSA ~")
            attended = MultiHeadAttentionLSA(
                num_heads=num_heads, key_dim=projection_dim, dropout=0.1
            )(normed, normed, attention_mask=diag_attn_mask)

        # First residual connection.
        residual = layers.Add()([attended, tokens])

        # Layer normalisation followed by the block MLP.
        hidden = layers.LayerNormalization(epsilon=1e-6)(residual)
        hidden = mlp(hidden, hidden_units=transformer_units, dropout_rate=0.1)

        # Second residual connection; feeds the next block.
        tokens = layers.Add()([hidden, residual])

    # Normalise, flatten all patch tokens into one vector per image, and
    # regularise with dropout.
    flat = layers.LayerNormalization(epsilon=1e-6)(tokens)
    flat = layers.Flatten()(flat)
    flat = layers.Dropout(0.5)(flat)

    # Classification head: MLP followed by a softmax output layer.
    features = mlp(flat, hidden_units=mlp_head_units, dropout_rate=0.5)
    outputs = layers.Dense(num_classes, activation='softmax')(features)

    return keras.Model(inputs=inputs, outputs=outputs)


## Compile, train, and evaluate the model

In [None]:
def run_experiment(model):
    """Compile, train and evaluate ``model`` on the notebook's data split.

    Reads the module-level ``x_train``, ``y_train``, ``x_test``, ``y_test``,
    ``batch_size`` and ``num_epochs``. Training holds out 20% of the training
    data for validation, checkpoints the best weights by ``val_accuracy`` and
    stops early when ``val_accuracy`` has not improved for 12 epochs.

    Args:
        model: Keras model to compile and train (modified in place).

    Returns:
        A tuple ``(history, model)``: the object returned by ``model.fit`` and
        the trained model.
    """
    ##----------------------------------##
    ## ~~ MODEL EVALUATION FUNCTIONS ~~ ##
    ##----------------------------------##
    from keras import backend as K

    def recall_m(y_true, y_pred):
        """Rounded true positives divided by rounded actual positives."""
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
        return true_positives / (possible_positives + K.epsilon())

    def precision_m(y_true, y_pred):
        """Rounded true positives divided by rounded predicted positives."""
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        return true_positives / (predicted_positives + K.epsilon())

    def f1_m(y_true, y_pred):
        """Harmonic mean of ``precision_m`` and ``recall_m``."""
        precision = precision_m(y_true, y_pred)
        recall = recall_m(y_true, y_pred)
        return 2 * ((precision * recall) / (precision + recall + K.epsilon()))

    ##----------------------------------##
    ##      COMPILE THE MODEL           ##
    ##----------------------------------##
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy', f1_m, precision_m, recall_m, tf.keras.metrics.AUC(name='auc')])

    checkpoint_filepath = "/checkpoint/"    #"/tmp/checkpoint"
    checkpoint_callback = ModelCheckpoint(
        checkpoint_filepath,
        monitor="val_accuracy",
        save_best_only=True,
        save_weights_only=True,
    )

    earlystop = EarlyStopping(
        monitor='val_accuracy',
        patience=12,
        mode='max',
        verbose=1)

    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
    ##                          ~ MODEL FIT ~                       ##
    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
    start = time.time()
    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_split=0.2,
        # The early-stopping callback was previously created but never
        # registered, so it had no effect on training.
        callbacks=[checkpoint_callback, earlystop],
    )
    # Measure right after fit() so evaluation time is not counted, and
    # subtract in the right order so the duration is positive.
    training_seconds = time.time() - start

    ##----------------------------------##
    ##      EVALUATE THE MODEL          ##
    ##----------------------------------##
    # NOTE: evaluation uses the weights the model holds when fit() returns;
    # the checkpointed best weights are not reloaded here.
    loss, accuracy, f1_score, precision, recall, auc = model.evaluate(
        x_test, y_test, batch_size=batch_size, verbose=0)

    # View Results
    print('[INFO] Training time: %s' % training_seconds)
    print("[INFO] accuracy={:.4f}%".format(accuracy * 100))
    print("[INFO] loss={:.4f}".format(loss))
    # Scale the fractional metrics by 100 so the '%' suffix is correct.
    print("[INFO] precision={:.4f}%".format(precision * 100))
    print("[INFO] recall={:.4f}%".format(recall * 100))
    print("[INFO] f1_score={:.4f}%".format(f1_score * 100))
    # AUC is not a percentage, so it is printed without the '%' suffix.
    print("[INFO] AUC={:.4f}".format(auc))

    ##----------------------------------##
    ##      RETURN METRICS & MODEL      ##
    ##----------------------------------##
    return history, model


In [None]:
# Build the classifier with the MultiHeadAttentionLSA attention blocks
# (vanilla=False), then compile, train and evaluate it.
vit_classifier = create_vit_classifier(vanilla=False)
# `history` is the object returned by model.fit; `model` is the trained model.
history, model = run_experiment(vit_classifier)