# Project Setup


## Import Libraries


In [1]:
import os

import numpy as np

import tensorflow as tf
from tensorflow import keras
from keras import layers

import tensorflow_addons as tfa

from datetime import datetime

#from models.transformer import Patches

from vit_keras import vit

from hyperas import optim
from hyperopt import Trials, tpe

import cv2
from keras_preprocessing.image import ImageDataGenerator

from sklearn.metrics import confusion_matrix

%load_ext tensorboard

In [None]:
## Setup Variables

In [2]:
weight_decay = 0.0001
batch_size = 32
num_epochs = 50
dropout_rate = 0.2
image_size = 256  # We'll resize input images to this size.
patch_size = 16  # Size of the patches to be extracted from the input images.
num_patches = (image_size // patch_size) ** 2  # Size of the data array.
embedding_dim = 256  # Number of hidden units.
num_blocks = 4  # Number of blocks.
input_shape = (256, 256, 3)
num_classes = 2
x_test = []

path = '/Users/alexk/Documents/GitHub/Deep_Learning_for_Binary_Classification_of_Infectious_Keratitis/data'

test_path = "/Users/alexk/Documents/GitHub/Deep_Learning_for_Binary_Classification_of_Infectious_Keratitis/data"

## Tensorboard Setup

In [3]:
# Load the TensorBoard notebook extension
logdir = "/Users/alexk/Documents/GitHub/Deep_Learning_for_Binary_Classification_of_Infectious_Keratitis/Model Logs" + datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)

# Data Preprocessing

## Setup Training and Validation Datasets

In [4]:
train_ds = tf.keras.utils.image_dataset_from_directory(
    path,
    validation_split=0.2,
    subset='training',
    seed=42,
    batch_size=batch_size,
    shuffle=True,
    label_mode='categorical',
    image_size=(image_size, image_size),
    class_names=['Bacterial', 'Fungal'])

val_ds = keras.preprocessing.image_dataset_from_directory(
    path,
    validation_split=0.2,
    subset='validation',
    seed=42,
    label_mode='categorical',
    shuffle=True,
    batch_size=batch_size,
    image_size=(image_size, image_size),
    class_names=['Bacterial', 'Fungal'])

Found 671 files belonging to 2 classes.
Using 537 files for training.
Metal device set to: Apple M1 Max

systemMemory: 64.00 GB
maxCacheSize: 24.00 GB

Found 671 files belonging to 2 classes.
Using 134 files for validation.


## Generate Test Dataset

In [5]:
def listdir_nohidden(path):
    for f in os.listdir(path):
        if not f.startswith('.'):
            yield f

for folder in listdir_nohidden(test_path):

    sub_path=test_path+"/"+folder

    for img in listdir_nohidden(sub_path):

        image_path=sub_path+"/"+img

        img_arr=cv2.imread(image_path)

        img_arr=cv2.resize(img_arr,(256,256))

        x_test.append(img_arr)


In [6]:
#predicted_classes = np.argmax(mlpmixer_classifier.predict(X, axis = 1))
test_datagen = ImageDataGenerator(rescale = 1./255)

test_x=np.array(x_test)

test_set = test_datagen.flow_from_directory(test_path,
                                            target_size = (256, 256),
                                            batch_size = 32,
                                            class_mode = 'categorical')

test_y=test_set.classes

print(test_y.shape)

Found 671 images belonging to 2 classes.
(671,)


In [7]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(test_x, test_y, test_size=0.2, random_state=42)

### Load Test Images

In [8]:
image = tf.keras.utils.load_img("/Users/alexk/Documents/GitHub/Deep_Learning_for_Binary_Classification_of_Infectious_Keratitis/Test_Images/Fungal_test_image_1.JPG", target_size=(256, 256))
input_arr = tf.keras.utils.img_to_array(image)
input_arr = np.array([input_arr])

## Data Normalization

In [9]:
normalization_layer = tf.keras.layers.Rescaling(1./255)

normalized_ds = train_ds.map(lambda x, y: (normalization_layer(x), y))
image_batch, labels_batch = next(iter(normalized_ds))
first_image = image_batch[0]

2023-04-21 21:42:10.310947: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


## Autotuning

In [10]:
AUTOTUNE = tf.data.AUTOTUNE

train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.prefetch(buffer_size=AUTOTUNE)

## Data Augmentation

In [11]:
with tf.device("/cpu:0"):
    data_augmentation = keras.Sequential(
        [
            layers.Normalization(),
            layers.Resizing(image_size, image_size),
            layers.RandomFlip("horizontal"),
            layers.RandomRotation(factor=0.02),
            layers.RandomZoom(height_factor=0.2, width_factor=0.2),
        ],
        name="data_augmentation",
    )

# Model Setup

## VIT Classification Model

In [12]:
def build_classifier(blocks, positional_encoding=False):
    inputs = layers.Input(shape=input_shape)
    # Augment data.
    inputs = data_augmentation(inputs)
    # Create patches.
    inputs = vit.preprocess_inputs(inputs)
    patches = Patches(patch_size, num_patches)(inputs)
    # Encode patches to generate a [batch_size, num_patches, embedding_dim] tensor.
    x = layers.Dense(units=embedding_dim, input_shape=(256, 256, 3))(patches)
    if positional_encoding:
        positions = tf.range(start=0, limit=num_patches, delta=1)
        position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=embedding_dim
        )(positions)
        x = x + position_embedding
    # Process x using the module blocks.
    x = blocks(x)
    # Apply global average pooling to generate a [batch_size, embedding_dim] representation tensor.
    representation = layers.GlobalAveragePooling1D()(x)
    # Apply dropout.
    representation = layers.Dropout(rate=dropout_rate)(representation)
    # Compute logits outputs.
    logits = layers.Dense(num_classes)(representation)
    # Create the Keras model.
    return keras.Model(inputs=inputs, outputs=logits)

## Run Model Function

In [13]:
def run_experiment(model):
    # Create Adam optimizer with weight decay.
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay,
    )
    # Compile the model.
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.CategoricalCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.CategoricalCrossentropy(name="acc"),
            keras.metrics.AUC(name='auc', from_logits=True)
        ]
    )
    # Create a learning rate scheduler callback.
    reduce_lr = keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=5
    )
    # Create an early stopping callback.
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=10, restore_best_weights=True
    )
    # Fit the model.
    history = model.fit(
        train_ds,
        batch_size=batch_size,
        shuffle=True,
        epochs=num_epochs,
        validation_data=val_ds,
        callbacks=[early_stopping, reduce_lr, tensorboard_callback],
    )

    # Return history to plot learning curves.
    return history

## Patches Class

In [14]:
'''
This layer will extract patches of given size from an input image.

It takes two parameters -
1. patch_size : Size of patch that should be extracted.
2. num_patches : Number of patches we want to extract from a given image.

It returns a tensor of shape [batch_size, num_patches, patch_dims] containing
the patches as its elements.

For example if we pass an image with shape [batch_size, height, width , channels]
as input to the layer then it will extract patches of size given by the parameter
patch_size and create a tensor of shape [batch_size, num_patches, patch_size*patch_size*channels].
'''

class Patches(layers.Layer):
    def __init__(self, patch_size, num_patches):
        super(Patches, self).__init__()
        self.patch_size = patch_size
        self.num_patches = num_patches

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, self.num_patches, patch_dims])
        return patches

## MLP Mixer Class

In [15]:
class MLPMixerLayer(layers.Layer):
    def __init__(self, num_patches, hidden_units, dropout_rate, *args, **kwargs):
        super(MLPMixerLayer, self).__init__(*args, **kwargs)

        self.mlp1 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=num_patches),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.mlp2 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=embedding_dim),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.normalize = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        # Apply layer normalization.
        x = self.normalize(inputs)
        # Transpose inputs from [num_batches, num_patches, hidden_units] to [num_batches, hidden_units, num_patches].
        x_channels = tf.linalg.matrix_transpose(x)
        # Apply mlp1 on each channel independently.
        mlp1_outputs = self.mlp1(x_channels)
        # Transpose mlp1_outputs from [num_batches, hidden_dim, num_patches] to [num_batches, num_patches, hidden_units].
        mlp1_outputs = tf.linalg.matrix_transpose(mlp1_outputs)
        # Add skip connection.
        x = mlp1_outputs + inputs
        # Apply layer normalization.
        x_patches = self.normalize(x)
        # Apply mlp2 on each patch independently.
        mlp2_outputs = self.mlp2(x_patches)
        # Add skip connection.
        x = x + mlp2_outputs

        return x

# Run the Model

## MLP Mixer Model

In [16]:
mlpmixer_blocks = keras.Sequential(
    [MLPMixerLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.005
mlpmixer_classifier = build_classifier(mlpmixer_blocks)
print(mlpmixer_classifier.summary())
history = run_experiment(mlpmixer_classifier)

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 256, 256, 3)]     0         
                                                                 
 patches (Patches)           (None, 256, 768)          0         
                                                                 
 dense_16 (Dense)            (None, 256, 256)          196864    
                                                                 
 sequential_8 (Sequential)   (None, 256, 256)          1054720   
                                                                 
 global_average_pooling1d (G  (None, 256)              0         
 lobalAveragePooling1D)                                          
                                                                 
 dropout_8 (Dropout)         (None, 256)               0         
                                                             

Layer Patches has arguments ['self', 'patch_size', 'num_patches']
in `__init__` and therefore must override `get_config()`.

Example:

class CustomLayer(keras.layers.Layer):
    def __init__(self, arg1, arg2):
        super().__init__()
        self.arg1 = arg1
        self.arg2 = arg2

    def get_config(self):
        config = super().get_config()
        config.update({
            "arg1": self.arg1,
            "arg2": self.arg2,
        })
        return config


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50


In [None]:
# Evaluate the Model

## TensorBoard Visualization

In [50]:
%tensorboard --logdir logs/scalars


NOTE: Using experimental fast data loading logic. To disable, pass
    "--load_fast=false" and report issues on GitHub. More details:
    https://github.com/tensorflow/tensorboard/issues/4784

Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.8.0 at http://localhost:6006/ (Press CTRL+C to quit)
^C

NOTE: Using experimental fast data loading logic. To disable, pass
    "--load_fast=false" and report issues on GitHub. More details:
    https://github.com/tensorflow/tensorboard/issues/4784

Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.8.0 at http://localhost:6006/ (Press CTRL+C to quit)


## Model Predictions

### MLP Mixer Model Prediction

In [483]:
mlp_predicted_classes = np.argmax(mlpmixer_classifier.predict(test_x), axis=1)

## Confusion Matrices

### MLP Confusion Matrix

In [484]:
mlp_cm = confusion_matrix(test_y, mlp_predicted_classes)
print(mlp_cm)

[[154  77]
 [278 162]]


## Confusion Matrix Computed Values

### MLP Computed Values

In [487]:
mlp_TN = mlp_cm[0][0]
mlp_FN = mlp_cm[0][1]
mlp_FP = mlp_cm[1][0]
mlp_TP = mlp_cm[1][1]
mlp_sensitivity = mlp_TP / (mlp_TP + mlp_FN)
mlp_specificity = mlp_TN / (mlp_TN + mlp_FP)
print(mlp_sensitivity)
print(mlp_specificity)
print((mlp_TP + mlp_TN) / 671)

0.6778242677824268
0.35648148148148145
0.47093889716840537
0.6778242677824268
0.35648148148148145
0.47093889716840537
