# Imports
Installation and import of all the required dependencies.

In [None]:
pip install keras_cv

In [None]:
import numpy as np

import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
# for splitting training, validation and test set
from sklearn.model_selection import train_test_split
# mathplot lib
from matplotlib import pyplot as plt
from sklearn.utils import class_weight as cw



# contains randAugmentation, cutMix and MixUp
from keras_cv import layers as kcvl


# Load and process dataset

Load the dataset and, optionally, preprocess it.

In [None]:
import numpy as np

dataToLoad = '/kaggle/input/public-data-cleaned/public_data_cleaned.npz'
# Conditional check for unzipping and eventual conversion to categorical label
unzip = False
toCat = True

if unzip:
    !unzip public_data.zip

data = np.load(dataToLoad, allow_pickle = True)
X = data['data']
y = data['labels']

# shape of the dataset
print(X.shape)

# shape of the label
print(y.shape)
print(y)

if toCat:
  # Create labels: 0 for 'healthy', 1 for 'unhealthy'
  label_dict = {'healthy': 0, 'unhealthy': 1}
  numerical_labels = [label_dict[label] for label in y]
  labels = y
  # Convert labels to one-hot encoding format
  y = tfk.utils.to_categorical(numerical_labels,2)

print(y)


## Clear data - Only performed Once
Script to remove the fake images from the dataset. We noticed that all the fake images contain non-integer values, which makes no sense because image pixels are integer values in the range 0-255.

In [None]:
# Collect the index of every image that holds at least one value with a
# non-zero fractional part (np.modf returns the fractional part first).
toDelete = [
    image_index
    for image_index in range(X.shape[0])
    if np.any(np.modf(X[image_index])[0] != 0)
]

print(len(toDelete))
print(toDelete)

Once the indices to delete were found, we created a dataset without the fake data.

In [None]:
# Drop the flagged images from the image array.
X = np.delete(X, toDelete, axis=0)
# Drop the matching entries from the raw string labels (`labels`, not the one-hot `y`).
Y = np.delete(labels, toDelete, axis=0)

# Persist the cleaned arrays under the keys 'data' and 'labels'
# (np.savez appends the .npz extension to the file name).
np.savez("public_data_cleaned", data=X,labels=Y)

## Split data
Creates the training, validation and test sets.

In [None]:
# Hold out 10% of the samples as the test set, stratified on the class index
# recovered from the one-hot labels
class_index = np.argmax(y, axis=1)
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    stratify=class_index,
)

# Carve a validation set with as many samples as the test set out of the rest
n_validation = len(X_test)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val,
    y_train_val,
    test_size=n_validation,
    stratify=np.argmax(y_train_val, axis=1),
)

# Report the shape of every split
print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"X_val shape: {X_val.shape}, y_val shape: {y_val.shape}")
print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

X_train shape: (4002, 96, 96, 3), y_train shape: (4002, 2)
X_val shape: (501, 96, 96, 3), y_val shape: (501, 2)
X_test shape: (501, 96, 96, 3), y_test shape: (501, 2)


## Label Weights - optional


We tried assigning a different weight to each class during training to compensate for the class imbalance, but the network did not perform better. To enable it, it is enough to pass the **class_weight = class_weight_vect** parameter to the training call.



In [None]:
# Balanced class weights computed from the raw string labels.
# `classes` is passed as a NumPy array, the type scikit-learn documents for it
# (a plain tuple is rejected by versions that validate parameters). The order
# matches label_dict: index 0 -> 'healthy', index 1 -> 'unhealthy'.
class_names = np.array(['healthy', 'unhealthy'])
class_weight_vect = cw.compute_class_weight('balanced', classes=class_names, y=labels)
# Re-key the weights by class index so they can be passed as `class_weight`
class_weight_vect = {i: weight for i, weight in enumerate(class_weight_vect)}
print(class_weight_vect)

# Data Augmentation
> Here we provide the transformations we used during the challenge to augment our dataset.

When using the EfficientNetV2 model, the RandAugment transformation was implemented as a layer of the model. When we switched to ConvNeXtBase, the RandAugment layer raised problems during the submission procedure on CodaLab; since we were not able to solve them, we decided to apply it as a pre-training transformation instead (the parameters did not change).

In [None]:
# Decide which transformations to apply to the training set.
# Each enabled transformation is applied to the ORIGINAL training set and its
# output is appended to the training data.
cutMix = True
mixUp = True
randAugment = True

# Outputs are collected in lists and concatenated once at the end: concatenating
# onto an empty Python list fails because its rank (1-D) does not match the
# 4-D image batches.
augmented_image_batches = []
augmented_label_batches = []

if cutMix:
    cutmix = kcvl.CutMix(alpha=0.5)
    # apply cutmix to the original training set
    temp = cutmix({"images": X_train, "labels": y_train})
    augmented_image_batches.append(np.asarray(temp["images"]))
    augmented_label_batches.append(np.asarray(temp["labels"]))
if mixUp:
    # the layer gets its own name so the boolean switch above is not overwritten
    mixup_layer = kcvl.MixUp(0.3)
    # apply mixup to the original training set
    temp = mixup_layer({"images": X_train, "labels": y_train})
    augmented_image_batches.append(np.asarray(temp["images"]))
    augmented_label_batches.append(np.asarray(temp["labels"]))
if randAugment:
    # define the rand augment layer (own name, same reason as for mixup)
    rand_augment_layer = kcvl.RandAugment(
        value_range=(0, 255),
        augmentations_per_image=3,
        magnitude=0.2,
        magnitude_stddev=0.2,
        rate=0.5,
    )
    # apply the rand augment layer on the original training set
    temp = rand_augment_layer(X_train)
    augmented_image_batches.append(np.asarray(temp))
    # randAugment preserves the labels so we simply duplicate the original ones
    augmented_label_batches.append(np.asarray(y_train))

# With every switch off, fall back to empty slices so the concatenations below
# are no-ops.
if augmented_image_batches:
    X_augmented = np.concatenate(augmented_image_batches, axis=0)
    y_augmented = np.concatenate(augmented_label_batches, axis=0)
else:
    X_augmented = X_train[:0]
    y_augmented = y_train[:0]

# concatenate the original training set with the augmented images, same for the
# labels (the labels are built from y_train, not from X_train)
X_train = np.concatenate((X_train, X_augmented), axis=0)
y_train = np.concatenate((y_train, y_augmented), axis=0)
print(X_train.shape)
print(y_train.shape)

# Model

## Transfer Learning
Due to the limited size of the dataset and the complexity of building a good feature extraction network (FEN) from scratch, we decided to use transfer learning.

In [None]:
def buildModel(model):
    """Build and compile a classifier on top of a frozen pre-trained backbone.

    Args:
        model: Backbone selector, either "effnet" (EfficientNetV2S) or
            "convnext" (ConvNeXtBase).

    Returns:
        The compiled Keras model, or None when `model` is not one of the two
        known selectors.
    """
    # Download the pre-trained feature extraction network (FEN)
    if model == "effnet":
        FEN = tf.keras.applications.EfficientNetV2S(
            include_top=False,
            weights="imagenet",
            input_shape=(96, 96, 3),
            pooling="avg",
            include_preprocessing=True)
    elif model == "convnext":
        FEN = tf.keras.applications.ConvNeXtBase(
            include_top=False,
            weights="imagenet",
            input_shape=(96, 96, 3),
            pooling="avg",
            include_preprocessing=True)
    else:
        # Unknown selector: nothing to build
        return None

    # Sequential data augmentation, applied inside the model to improve
    # generalization given the scarcity of the dataset
    data_augmentation = tf.keras.Sequential([
        tf.keras.layers.RandomFlip("horizontal_and_vertical"),
        tf.keras.layers.RandomRotation(factor=0.35, fill_mode='reflect'),
        tf.keras.layers.RandomZoom(height_factor=-0.2),
        tf.keras.layers.RandomContrast(0.125),
    ], name='data_augmentation')

    # Freeze all the weights of the FEN
    FEN.trainable = False
    # Create an input layer with shape (96, 96, 3)
    inputs = tfk.Input(shape=(96, 96, 3))
    # Augment the input, then feed it to the FEN
    prep = data_augmentation(inputs)
    x = FEN(prep)

    # Classification head: dropout and batch normalization are interleaved
    # with the dense layers to help the network generalize.
    # NOTE(review): L1L2(1e-3) passes a single positional factor; check against
    # the Keras docs whether this sets only the L1 term.
    x = tfkl.Dropout(0.3)(x)
    x = tfkl.BatchNormalization()(x)

    x = tfkl.Dense(512, activation='relu', kernel_regularizer=tf.keras.regularizers.L1L2(1e-3))(x)
    x = tfkl.Dense(256, activation='relu', kernel_regularizer=tf.keras.regularizers.L1L2(1e-3))(x)

    x = tfkl.Dropout(0.3)(x)
    x = tfkl.BatchNormalization()(x)

    x = tfkl.Dense(128, activation='relu', kernel_regularizer=tf.keras.regularizers.L1L2(1e-3))(x)
    x = tfkl.Dense(64, activation='relu', kernel_regularizer=tf.keras.regularizers.L1L2(1e-3))(x)
    x = tfkl.Dropout(0.3)(x)
    x = tfkl.Dense(32, activation='relu', kernel_regularizer=tf.keras.regularizers.L1L2(1e-3))(x)

    outputs = tfkl.Dense(2, activation='softmax')(x)

    # Create a Model connecting input and output (kept in its own variable so
    # the `model` selector argument is not overwritten)
    classifier = tfk.Model(inputs=inputs, outputs=outputs, name='model')

    # Compile with Categorical Cross-Entropy loss and the Nadam optimizer
    classifier.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Nadam(), metrics=['accuracy'])

    # Display model summary
    classifier.summary()

    return classifier

Train the model with the FEN weights frozen. We use an early-stopping callback in order to find the best epoch.

In [None]:
# Train the model
chosenModel = "convnext" # 'convnext' or 'effnet'
model = buildModel(chosenModel)

patience = 30
batch_size = 1024
epochs = 300

history = model.fit(
    x = X_train, #the preprocessing is already included in the FEN so no operation is needed
    y = y_train,
    batch_size = batch_size,
    epochs = epochs,
    validation_data = (X_val, y_val),
    # `patience` is the variable defined above (the undefined name `pat` raised a NameError)
    callbacks = [tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=patience, restore_best_weights=True)]
).history

Get the best epoch, in order to re-train the network on the validation set as well.

In [None]:
# Best epoch = number of epochs up to (and including) the first one that reached
# the highest validation accuracy. When early stopping fired this equals
# len(history['loss']) - patience; unlike that formula it is also correct when
# training ran through all the epochs without stopping early.
best_epoch = int(np.argmax(history['val_accuracy'])) + 1
print(best_epoch)

Retrain with validation set

In [None]:
# Build a fresh model of the same kind and fit it on the training and
# validation splits together, for the number of epochs found above.
final_model = buildModel(chosenModel)

batch_size = 1024

retrain_run = final_model.fit(
    x=np.concatenate((X_train, X_val), axis=0),
    y=np.concatenate((y_train, y_val), axis=0),
    batch_size=batch_size,
    epochs=best_epoch,
)
# No validation data is passed here, so this history holds training metrics only
history = retrain_run.history

In [None]:
# Save the re-trained model (frozen backbone, before fine-tuning) in .h5 format;
# the fine-tuning cells reload this file as their starting point.
final_model.save("conv13-retrained-preFt-2-14-3.h5")

# Fine Tuning

Function used to unlock the layers of the FEN for fine-tuning the network.

In [None]:
def unlock_layers(ft_model, N, layer_name):
    """Make a nested backbone trainable except for its first N layers.

    Args:
        ft_model: Model that contains the backbone as a nested layer.
        N: Number of leading backbone layers to keep frozen.
        layer_name: Name under which the backbone is registered in `ft_model`.

    Returns:
        The same `ft_model` object, modified in place.
    """
    backbone = ft_model.get_layer(layer_name)

    def show_trainable_flags():
        # One line per backbone layer: position, name, trainable flag
        for position, sublayer in enumerate(backbone.layers):
            print(position, sublayer.name, sublayer.trainable)

    # Mark the whole backbone as trainable and log the resulting state
    backbone.trainable = True
    show_trainable_flags()

    # Freeze the first N layers again, then log the final state
    for sublayer in backbone.layers[:N]:
        sublayer.trainable = False
    show_trainable_flags()

    ft_model.summary()
    return ft_model

In [None]:
# Keras had problems with ConvNeXt during save/load, so the custom LayerScale
# class is passed through `custom_objects` when reloading the model saved in .h5.
# LayerScale must already be defined when this cell runs (it is defined in the
# Utilities section of this notebook).
ft_model = tfk.models.load_model('/kaggle/working/conv13-retrained-preFt-2-14-3.h5',custom_objects={'LayerScale': LayerScale})
ft_model.summary()

In [None]:
# Pick the backbone's layer name and the number of leading layers to keep
# frozen, depending on the chosen architecture.
if chosenModel == "convnext":
    layer_name = "convnext_base"
    layer_number = 189
else:  # the colon was missing here, which made the cell a SyntaxError
    layer_name = "efficientnetv2-s"
    layer_number = 343

ft_model = unlock_layers(ft_model, layer_number, layer_name)


In [None]:
# Re-compile after changing the trainable flags, with a lower learning rate
# (1e-4) for fine-tuning. `metrics` is given as a list, as in buildModel; newer
# Keras versions reject a bare string.
ft_model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Nadam(1e-4), metrics=['accuracy'])

Train the FEN via fine tuning

In [None]:
patience = 30
batch_size = 1024
epochs = 300
# Fine-tune the model
ft_history = ft_model.fit(
    x = X_train,
    y = y_train,
    batch_size = batch_size,
    epochs = epochs,
    validation_data = (X_val, y_val),
    # `patience` is the variable defined above (the undefined name `pat` raised a NameError)
    callbacks = [tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=patience, restore_best_weights=True)]
).history

In [None]:
# Save the fine-tuned model (fitted on X_train, validated on X_val) in .h5 format.
ft_model.save('conv_13_ft-14-3.h5')

Re-train the FEN on both the training set and the validation set, for the best number of epochs.

In [None]:
# Best epoch of the fine-tuning run = number of epochs up to (and including)
# the first one that reached the highest validation accuracy. When early
# stopping fired this equals len(ft_history['loss']) - patience; it is also
# correct when training ran through all the epochs without stopping early.
best_epoch = int(np.argmax(ft_history['val_accuracy'])) + 1

# Pick the backbone's layer name and the number of leading layers to keep frozen
if chosenModel == "convnext":
    layer_name = "convnext_base"
    layer_number = 189
else:  # the colon was missing here, which made the cell a SyntaxError
    layer_name = "efficientnetv2-s"
    layer_number = 343

# Start again from the saved pre-fine-tuning model; LayerScale is passed as a
# custom object so the ConvNeXt layers can be deserialized from .h5
final_model_ft = tfk.models.load_model('/kaggle/working/conv13-retrained-preFt-2-14-3.h5', custom_objects={'LayerScale': LayerScale})
final_model_ft = unlock_layers(final_model_ft, layer_number, layer_name)
# Compile the model (`metrics` as a list, as in buildModel)
final_model_ft.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Nadam(1e-4), metrics=['accuracy'])

batch_size = 1024

# NOTE(review): this fit uses X_train_val / y_train_val from the split cell,
# i.e. the data WITHOUT the offline augmentation that was concatenated into
# X_train. Confirm this is intended.
history_final_ft = final_model_ft.fit(
    x = X_train_val,
    y = y_train_val,
    batch_size = batch_size,
    epochs = best_epoch,
).history

# View results
Display the training plots and compare the different versions of the network, before and after fine-tuning.

In [None]:
# Plot the training curves of the re-trained and the fine-tuned networks to
# compare them. Both `history` and `history_final_ft` come from fits that were
# run without validation data, so they may carry no 'val_*' entries: validation
# curves are drawn only when present instead of raising a KeyError.
def _plot_history_comparison(metric, title):
    """Draw the training (dashed) and, if recorded, validation (solid) curves of `metric`."""
    plt.figure(figsize=(15,5))
    for run_history, run_label, run_color in (
        (history, 'Re-trained', '#ff7f0e'),
        (history_final_ft, 'Fine Tuning', '#408537'),
    ):
        val_key = 'val_' + metric
        has_validation = val_key in run_history
        # The legend entry goes on the validation curve when there is one,
        # otherwise on the training curve so the run is still identified
        train_kwargs = {} if has_validation else {'label': run_label}
        plt.plot(run_history[metric], alpha=.3, color=run_color, linestyle='--', **train_kwargs)
        if has_validation:
            plt.plot(run_history[val_key], label=run_label, alpha=.8, color=run_color)
    plt.legend(loc='upper left')
    plt.title(title)
    plt.grid(alpha=.3)


_plot_history_comparison('loss', 'Categorical Crossentropy')
_plot_history_comparison('accuracy', 'Accuracy')

plt.show()

# Test
Run predictions on the test set and report the accuracy of those predictions.

In [None]:
# Evaluate the final model on the held-out test set; the last value returned by
# evaluate() is read as the accuracy.
test_metrics = final_model_ft.evaluate(X_test, y_test, verbose=0)
test_accuracy = test_metrics[-1]
print('Test set accuracy %.4f' % test_accuracy)

# Save
Save the final model

In [None]:
# Save the final fine-tuned model (the one evaluated on the test set above).
final_model_ft.save('conv_ft_268_x')


# Confusion matrix


In [None]:

# Predict the model outputs for the entire test set; they are reduced to class
# indices with argmax in the next cell.
predictions = final_model_ft.predict(X_test, verbose=0)

# Display the shape of the predictions
print("Predictions Shape:", predictions.shape)

In [None]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
import seaborn as sns

# Collapse the one-hot targets and the model outputs to class indices once
true_classes = np.argmax(y_test, axis=-1)
predicted_classes = np.argmax(predictions, axis=-1)

# Compute the confusion matrix
cm = confusion_matrix(true_classes, predicted_classes)

# Compute classification metrics (macro-averaged over the two classes)
accuracy = accuracy_score(true_classes, predicted_classes)
precision = precision_score(true_classes, predicted_classes, average='macro')
recall = recall_score(true_classes, predicted_classes, average='macro')
f1 = f1_score(true_classes, predicted_classes, average='macro')

# Display the computed metrics. The built-in round() is used because it works
# whether scikit-learn returns a Python float or a NumPy scalar; `.round()`
# only exists on the latter.
print('Accuracy:', round(accuracy, 4))
print('Precision:', round(precision, 4))
print('Recall:', round(recall, 4))
print('F1:', round(f1, 4))

# Plot the confusion matrix. cm is transposed so that rows (y axis) are the
# predictions and columns (x axis) are the true labels; fmt='d' prints the
# counts as plain integers instead of scientific notation.
class_names = ['healthy', 'unhealthy']
plt.figure(figsize=(10, 8))
sns.heatmap(cm.T, xticklabels=class_names, yticklabels=class_names, cmap='Blues', annot=True, fmt='d')
plt.xlabel('True labels')
plt.ylabel('Predicted labels')
plt.show()

# Utilities
Because of some bugs we faced when using newer FENs such as ConvNeXt, we had to explicitly define the LayerScale class in order to reload the model between two training stages.

In [None]:
from keras import layers
from keras import initializers

class LayerScale(layers.Layer):
    """Learnable scaling layer.

    Multiplies its input by a trainable vector `gamma` of length
    `projection_dim`, whose entries are all initialized to `init_values`.

    References:
      - https://arxiv.org/abs/2103.17239

    Args:
      init_values (float): Initial value for layer scale. Should be within
        [0, 1].
      projection_dim (int): Projection dimensionality.

    Returns:
      The input tensor multiplied by the scale.
    """

    def __init__(self, init_values, projection_dim, **kwargs):
        super().__init__(**kwargs)
        # Kept as attributes so build() and get_config() can read them back
        self.projection_dim = projection_dim
        self.init_values = init_values

    def build(self, input_shape):
        # One trainable scale per projection channel, constant-initialized
        gamma_initializer = initializers.Constant(self.init_values)
        self.gamma = self.add_weight(
            shape=(self.projection_dim,),
            initializer=gamma_initializer,
            trainable=True,
        )

    def call(self, x):
        scaled = x * self.gamma
        return scaled

    def get_config(self):
        # Extend the base config with the two constructor arguments so the
        # layer can be re-created from its config
        base_config = super().get_config()
        return {
            **base_config,
            "init_values": self.init_values,
            "projection_dim": self.projection_dim,
        }
