# Training and Fine Tuning of ConvNeXtLarge with augmentation layers


# Base imports


In [None]:
# Fix randomness and hide warnings
seed = 90

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd()+'/configs/'

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

import numpy as np

import logging

import random

# Seed the Python and NumPy generators as well: the notebook draws random
# indices through np.random, so without this those draws change on every run.
random.seed(seed)
np.random.seed(seed)

!pip install tensorflow

In [None]:
# Import tensorflow
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
from tensorflow.keras.applications import ConvNeXtLarge
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.models import Model, Sequential

import tensorflow as tf

from keras.models import Model
from keras.layers import Conv2D, MaxPool2D,  \
    Dropout, Dense, Input, concatenate,      \
    GlobalAveragePooling2D, AveragePooling2D,\
    Flatten
import numpy as np

from tensorflow.keras import layers

# Logging: silence AutoGraph and keep only errors from both TensorFlow loggers
tf.autograph.set_verbosity(0)
tf_logger = tf.get_logger()
tf_logger.setLevel(logging.ERROR)
compat_logging = tf.compat.v1.logging
compat_logging.set_verbosity(compat_logging.ERROR)

# Reproducibility: seed TensorFlow through both the TF2 and the TF1-compat API
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

# Record the TensorFlow version used for this run
print(tf.__version__)

In [None]:
# Import other libraries
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
import seaborn as sns

# Train

## Load dataset with rotate/flip augmentation

In [None]:
# Open the .npz archive.
# NOTE: allow_pickle=True lets NumPy unpickle object arrays, which can execute
# arbitrary code -- only load archives that come from a trusted source.
dataset_path = 'public_data/OptimizedDatasets/train_dataset_aug_mix.npz'
dataset = np.load(dataset_path, allow_pickle=True)

# Names of the arrays stored in the archive
keys = dataset.keys()
print("Keys in the dataset:", keys)

# Shape of every stored array
for key in keys:
    print(f"Shape of {key}: {dataset[key].shape}")


In [None]:
# Standardization of dataset and conversion to int
# WARNING: dictionary makes a mess, you must create a new data structure
def standardization(images):
    """Min-max rescale `images` to integers in the range [0, 255].

    The global minimum of the array is mapped to 0 and the global maximum to
    255; results are truncated by the integer cast.

    Args:
        images: NumPy array of numeric values (any shape).

    Returns:
        An integer array with the same shape as `images`. When every value in
        `images` is identical there is no range to stretch, so an all-zero
        array is returned instead of dividing by zero.
    """
    min_value = images.min()
    value_range = images.max() - min_value

    # A constant input would give 0/0 = NaN, and casting NaN to int is
    # meaningless; map it to 0 (the value the minimum always gets).
    if value_range == 0:
        return np.zeros(np.shape(images), dtype=int)

    images = ((images - min_value) / value_range) * 255
    return images.astype(int)

# images = standardization(dataset['data'])
images = dataset['data']

# Map the string class names to integer targets in a plain NumPy array.
labels_dict = {'healthy': 0, 'unhealthy': 1}

# Read the label array ONCE: every `dataset[...]` lookup on an .npz archive
# loads the array from the file again, so indexing it inside the loop made
# this step quadratic in the number of samples.
raw_labels = dataset['labels']
labels = np.array([labels_dict[raw_label] for raw_label in raw_labels])


In [None]:
# Display a random sample of images, each titled with its class
num_img = 20
fig, axes = plt.subplots(1, num_img, figsize=(20,20))

# Integer target -> class name, for readable titles
class_names = {target: class_name for class_name, target in labels_dict.items()}

# np.ravel also covers num_img == 1, where plt.subplots returns a single Axes
for ax in np.ravel(axes):
    # Select a random index
    idx = np.random.randint(0, len(images))

    ax.imshow(images[idx])
    # Pass a plain string: the previous `{labels[idx]}` built a set, so the
    # title was rendered as "{0}" / "{1}"
    ax.set_title(class_names[int(labels[idx])])

# Adjust layout and display the images
plt.tight_layout()
plt.show()

In [None]:
# Count how many samples belong to each class
label_counts = pd.Series(labels, name='label').value_counts()
print(label_counts)

# The classes are unbalanced (many more healthy than unhealthy samples),
# so the splits below must be stratified.

In [None]:
# First split: hold out 10% of the samples as the test set (stratified by class).
# NOTE(review): the archive name suggests the data already contains augmented
# copies; if so, copies of one source image may land in different splits -- confirm.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    images,
    labels,
    test_size=0.1,
    stratify=labels,
    random_state=seed,
)

# Second split: 20% of the remaining samples become the validation set.
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval,
    y_trainval,
    test_size=0.2,
    stratify=y_trainval,
    random_state=seed,
)


In [None]:
# One-hot encode the integer targets of every split.
# The names are rebound in place, so running this cell twice would encode the
# already one-hot arrays again.
y_train, y_val, y_test, y_trainval = [
    tfk.utils.to_categorical(targets)
    for targets in (y_train, y_val, y_test, y_trainval)
]

In [None]:
# Report the shape of the data and label arrays of every split
shape_report = (
    ("Training Data Shape:", X_train),
    ("Training Label Shape:", y_train),
    ("Validation Data Shape:", X_val),
    ("Validation Label Shape:", y_val),
    ("Test Data Shape:", X_test),
    ("Test Label Shape:", y_test),
)
for caption, array in shape_report:
    print(caption, array.shape)

## Build the baseline model

In [None]:
# Key model parameters, derived from the training split
input_shape = X_train.shape[1:]  # shape of a single input sample
output_shape = y_train.shape[1]  # width of the one-hot targets
batch_size = 128                 # batch size for training
epochs = 200                     # number of training epochs

# Echo the parameters
for caption, value in (
    ("Epochs:", epochs),
    ("Batch Size:", batch_size),
    ("Input Shape:", input_shape),
    ("Output Shape:", output_shape),
):
    print(caption, value)

## Download and customize the pre-trained model

In [None]:
def build_ConvNeXtLarge(input_shape, output_shape, lr, seed=seed):
    """Assemble and compile a ConvNeXtLarge classifier with a frozen backbone.

    Args:
        input_shape: shape of one input image, forwarded to `ConvNeXtLarge`.
        output_shape: number of units of the final softmax layer.
        lr: learning rate given to the AdamW optimizer.
        seed: value passed to `tf.random.set_seed` before any layer is created.

    Returns:
        The compiled model, named 'ConvNeXtLarge_custom'.
    """
    tf.random.set_seed(seed)

    # ImageNet-pretrained ConvNeXtLarge backbone, without its classification top
    backbone = ConvNeXtLarge(
        include_top=False,
        weights='imagenet',
        input_shape=input_shape,
    )

    # Freeze every pre-trained layer; the layers added below stay trainable
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    # Custom classification head on top of the backbone output
    head = GlobalAveragePooling2D()(backbone.output)
    head = tfkl.Dropout(0.2)(head)
    head = Dense(256, activation='relu')(head)
    head = tfkl.Dropout(0.2)(head)
    head = Dense(128, activation='relu')(head)
    head = tfkl.BatchNormalization()(head)

    # One softmax unit per class
    predictions = Dense(output_shape, activation='softmax')(head)

    model = Model(inputs=backbone.input, outputs=predictions, name = 'ConvNeXtLarge_custom')
    model.compile(
        optimizer=tfk.optimizers.AdamW(learning_rate=lr),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model


## Create support data structures


In [None]:

# Candidate learning rates for the search
lrs = [0.001, 0.0001, 0.01] # the best was 0.001

# Candidate batch sizes for the search
bss = [64, 32] # the best was 64

# Containers filled while cycling through the candidates
histories = []       # training histories
models_scores = []   # one [model name, accuracy, val_accuracy] entry per model

# Placeholder slots for the names of the saved models, overwritten during the search
model_names = [""] * 12




## Cycle through the support data structures to select the best hyperparameters

In [None]:
# Callbacks, all driven by the validation accuracy

# Stop after 10 epochs without improvement and restore the best weights
early_stopping = tfk.callbacks.EarlyStopping(
    monitor="val_accuracy",
    patience=10,
    restore_best_weights=True,
)

# Halve the learning rate after 5 epochs without improvement (floor: 1e-6)
reduce_lr = tfk.callbacks.ReduceLROnPlateau(
    monitor="val_accuracy",
    factor=0.5,
    patience=5,
    min_lr=1e-6,
    verbose=1,
)

# Write the model to disk every time the monitored value improves
checkpoint = tfk.callbacks.ModelCheckpoint(
    filepath="ConvNeXtLarge.h5",
    monitor="val_accuracy",
    save_best_only=True,
    verbose=1,
)

callbacks = [early_stopping, reduce_lr, checkpoint]

In [None]:
# Grid search: train one frozen-backbone model for every
# (learning rate, batch size) combination, save it, plot its curves and record
# its scores in `models_scores`.
import gc
import itertools

# How many initial epochs to skip in the plots and in the best-epoch search
begin_plot = 2

# itertools.product iterates in the same order as the nested loops
# "for lr in lrs: for bs in bss"
for i, (lr, bs) in enumerate(itertools.product(lrs, bss)):

    # Release the Keras global state left by the previous run: several
    # ConvNeXtLarge models are created in this loop and `del` alone does not
    # free the memory held by the backend.
    tfk.backend.clear_session()
    gc.collect()

    ConvNeXtLarge_model = build_ConvNeXtLarge(input_shape, output_shape, lr)

    # Store the model name; grow the list if the grid is larger than the
    # number of pre-allocated slots
    model_name = 'ConvNeXtLarge_lr_' + str(lr)+ '_bs_' + str(bs)
    if i < len(model_names):
        model_names[i] = model_name
    else:
        model_names.append(model_name)

    # Fresh callbacks for every run, so no early-stopping / checkpoint state
    # carries over from the previous model.
    # NOTE: every run writes its checkpoint to the same "ConvNeXtLarge.h5", so
    # that file only ever holds the most recent run's best weights; the model
    # of each run is saved under its own name further down.
    callbacks = [
        tfk.callbacks.EarlyStopping(
            monitor="val_accuracy",
            patience=10,
            restore_best_weights=True,
        ),
        tfk.callbacks.ReduceLROnPlateau(
            monitor="val_accuracy",
            factor=0.5,
            patience=5,
            min_lr=1e-6,
            verbose=1,
        ),
        tfk.callbacks.ModelCheckpoint(
            filepath="ConvNeXtLarge.h5",
            monitor="val_accuracy",
            save_best_only=True,
            verbose=1,
        ),
    ]

    # Fit the model
    history = ConvNeXtLarge_model.fit(
        x=X_train,
        y=y_train,
        batch_size=bs,
        epochs=epochs,
        validation_data=(X_val, y_val),
        callbacks=callbacks
    ).history

    # Skip the first epochs only when the run is long enough to allow it
    # (otherwise argmax would be called on an empty list)
    skip = begin_plot if len(history['val_accuracy']) > begin_plot else 0

    # Epoch with the highest validation accuracy, relative to `skip`
    best_epoch = np.argmax(history['val_accuracy'][skip:])
    best_val_accuracy = history['val_accuracy'][best_epoch + skip]

    # Save the trained model; the best validation accuracy is part of the name
    ConvNeXtLarge_model.save("All_Colab_Scripts/ConvNeXtLarge/Cycle_Trained/" + model_name + "_val_" + str(best_val_accuracy))

    # Plot training and validation loss
    plt.figure(figsize=(20, 5))
    plt.plot(history['loss'][skip:], label='Training', alpha=0.8, color='#ff7f0e', linewidth=3)
    plt.plot(history['val_loss'][skip:], label='Validation', alpha=0.8, color='#4D61E2', linewidth=3)
    plt.legend(loc='upper left')
    plt.title('Categorical Crossentropy')
    plt.grid(alpha=0.3)

    # Plot training and validation accuracy, highlighting the best epoch
    plt.figure(figsize=(20, 5))
    plt.plot(history['accuracy'][skip:], label='Training', alpha=0.8, color='#ff7f0e', linewidth=3)
    plt.plot(history['val_accuracy'][skip:], label='Validation', alpha=0.8, color='#4D61E2', linewidth=3)
    plt.plot(best_epoch, best_val_accuracy, marker='*', alpha=0.8, markersize=10, color='#4D61E2')
    plt.legend(loc='upper left')
    plt.title('Accuracy')
    plt.grid(alpha=0.3)

    plt.show()
    # Two figures are opened per run; close them so they do not pile up
    plt.close('all')

    # Record the model name with the accuracy and val_accuracy of its best epoch
    models_scores.append([str(model_name), history['accuracy'][best_epoch + skip], best_val_accuracy])

    del history

    del ConvNeXtLarge_model

## Plot the results

In [None]:
# Summarise the search: one row per trained model
df = pd.DataFrame(models_scores, columns = ['model', 'accuracy', 'val_accuracy'])
print(df)

# Reshape to long format (one row per model and metric) so that the two
# metrics are drawn as side-by-side bars with a legend. Drawing two bar plots
# on the same axes put the bars on top of each other and hid the smaller value.
scores_long = df.melt(
    id_vars='model',
    value_vars=['accuracy', 'val_accuracy'],
    var_name='metric',
    value_name='score',
)

plt.figure(figsize=(20, 5))
sns.barplot(
    x='model',
    y='score',
    hue='metric',
    data=scores_long,
    palette=['#ff7f0e', '#4D61E2'],  # same colours as the training curves
    alpha=0.8,
)
plt.title('Accuracy')
plt.grid(alpha=0.3)
plt.xticks(rotation=90)
plt.show()


# Fine Tuning

In [None]:
# Reload the best model of the search (lr 0.001, batch size 64); the directory
# name ends with the validation accuracy it reached
best_model_path = 'All_Colab_Scripts/ConvNeXtLarge/Cycle_Trained/ConvNeXtLarge_lr_0.001_bs_64_val_0.9717361927032471'
ConvNeXtLarge_model = tfk.models.load_model(best_model_path)

# Suffix used in the file name of the fine-tuned model
name = 'ConvNeXtLarge_FineTuned'

ConvNeXtLarge_model.summary()

In [None]:
# Make only the last `trainable_layers` layers of the model trainable
trainable_layers = 15
all_layers = ConvNeXtLarge_model.layers
split_at = len(all_layers) - trainable_layers

for layer in all_layers[split_at:]:
    layer.trainable = True    # tail of the network: trainable
for layer in all_layers[:split_at]:
    layer.trainable = False   # everything before the tail: frozen

# List every layer with its resulting trainable flag
for i, layer in enumerate(all_layers):
    print(i, layer.name, layer.trainable)

# Print the model summary
ConvNeXtLarge_model.summary()

In [None]:
!pip install keras-cv tensorflow --upgrade
import keras_cv
import keras_core as keras
import numpy as np

# Random augmentation layers placed in front of the network
augmentation_layers = [
    keras_cv.layers.RandomSaturation((0.0, 0.5)),
    layers.RandomTranslation(0.2, 0.2),
    layers.RandomFlip("horizontal_and_vertical"),
    layers.RandomRotation(0.15),
    layers.RandomContrast(0.5),
    layers.RandomBrightness(0.2),
    layers.RandomZoom(.3, .3),
    keras_cv.layers.RandomShear(.1, .1),
]

# Wrap the existing model: augmentation first, the network as the last layer.
# The name is rebound, so running this cell again would wrap the wrapper.
ConvNeXtLarge_model = tf.keras.Sequential(augmentation_layers + [ConvNeXtLarge_model])



In [None]:
# Compile the augmented model for fine-tuning with AdamW at learning rate 1e-4
fine_tuning_optimizer = tfk.optimizers.AdamW(learning_rate=0.0001)
ConvNeXtLarge_model.compile(
    optimizer=fine_tuning_optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:

# Callbacks for the fine-tuning run.
# Both monitor the validation accuracy: validation data is passed to fit(), the
# hyper-parameter search used the same metric, and the plot below marks the
# best validation-accuracy epoch -- so the weights restored by EarlyStopping
# now correspond to that marked epoch. Monitoring the training loss would
# ignore the validation set entirely.
# ReduceLROnPlateau needs a shorter patience than EarlyStopping: with equal
# patience both fire on the same epoch and no epoch is ever trained with the
# reduced learning rate.
callbacks = [
    tfk.callbacks.EarlyStopping(
        monitor="val_accuracy",
        patience=15,
        restore_best_weights=True,
    ),
    tfk.callbacks.ReduceLROnPlateau(
        monitor="val_accuracy",
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1,
    ),
]

# Fit the model
history = ConvNeXtLarge_model.fit(
    x=X_train,
    y=y_train,
    batch_size=64,
    epochs=100,
    validation_data=(X_val, y_val),
    callbacks=callbacks
).history

# Save the trained model
ConvNeXtLarge_model.save("All_Colab_Scripts/ConvNeXtLarge/Fine_Tuned/Final_" + name)

# How many initial epochs to skip in the plot
begin_plot = 2

# Skip them only when the run is long enough (otherwise argmax would be
# called on an empty list)
skip = begin_plot if len(history['val_accuracy']) > begin_plot else 0

# Find the epoch with the highest validation accuracy (relative to `skip`)
best_epoch = np.argmax(history['val_accuracy'][skip:])

# Plot training and validation loss
plt.figure(figsize=(20, 5))
plt.plot(history['loss'][skip:], label='Training', alpha=0.8, color='#ff7f0e', linewidth=3)
plt.plot(history['val_loss'][skip:], label='Validation', alpha=0.8, color='#4D61E2', linewidth=3)
plt.legend(loc='upper left')
plt.title('Categorical Crossentropy')
plt.grid(alpha=0.3)

# Plot training and validation accuracy, highlighting the best epoch
plt.figure(figsize=(20, 5))
plt.plot(history['accuracy'][skip:], label='Training', alpha=0.8, color='#ff7f0e', linewidth=3)
plt.plot(history['val_accuracy'][skip:], label='Validation', alpha=0.8, color='#4D61E2', linewidth=3)
plt.plot(best_epoch, history['val_accuracy'][best_epoch+skip], marker='*', alpha=0.8, markersize=10, color='#4D61E2')
plt.legend(loc='upper left')
plt.title('Accuracy')
plt.grid(alpha=0.3)

plt.show()

del history

del ConvNeXtLarge_model

## Make inference on the test set

In [None]:
# Path of the fine-tuned model. The fine-tuning cell saves to
# ".../Fine_Tuned/Final_" + "ConvNeXtLarge_FineTuned", so that exact name must
# be loaded here ("Final_ConvNeXtLarge_fine" is never written by this notebook).
name = "All_Colab_Scripts/ConvNeXtLarge/Fine_Tuned/Final_ConvNeXtLarge_FineTuned"

ConvNeXtLarge_model = tfk.models.load_model(name)

# Evaluate the model on the test set
score = ConvNeXtLarge_model.evaluate(X_test, y_test, verbose=0)

# Print test accuracy (score[0] is the loss, score[1] the accuracy metric)
print("Test accuracy:", score[1])

# plot the confusion matrix
from sklearn.metrics import confusion_matrix
import itertools

# Predict class probabilities for the test set
Y_pred = ConvNeXtLarge_model.predict(X_test)
# Convert the predicted probabilities to class indices
Y_pred_classes = np.argmax(Y_pred, axis=1)
# Convert the one-hot test targets to class indices
Y_true = np.argmax(y_test, axis=1)
# compute the confusion matrix
confusion_mtx = confusion_matrix(Y_true, Y_pred_classes)

# plot the confusion matrix; the cells are integer counts, hence fmt='d'
f,ax = plt.subplots(figsize=(8, 8))
sns.heatmap(confusion_mtx, annot=True, linewidths=0.01,cmap="Greens",linecolor="gray", fmt= 'd',ax=ax)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()


# Re-train on the training and validation sets combined

In [None]:
# Key model parameters, now derived from the combined training+validation split
input_shape = X_trainval.shape[1:]  # shape of a single input sample
output_shape = y_trainval.shape[1]  # width of the one-hot targets
batch_size = 64                     # batch size for training
epochs = 200                        # number of training epochs

# Echo the parameters
for caption, value in (
    ("Epochs:", epochs),
    ("Batch Size:", batch_size),
    ("Input Shape:", input_shape),
    ("Output Shape:", output_shape),
):
    print(caption, value)

## Train

In [None]:
# Reload the fine-tuned model. The fine-tuning cell saves to
# ".../Fine_Tuned/Final_" + "ConvNeXtLarge_FineTuned", so that exact name must
# be loaded here ("Final_ConvNeXtLarge_fine" is never written by this notebook).
name = "All_Colab_Scripts/ConvNeXtLarge/Fine_Tuned/Final_ConvNeXtLarge_FineTuned"

ConvNeXtLarge_model = tfk.models.load_model(name)

ConvNeXtLarge_model.summary()

In [None]:

# Keep only the trained network, dropping the augmentation layers in front of it.
# The network was appended as the LAST layer of the Sequential wrapper, so it is
# taken from the end instead of a hard-coded index that silently breaks when an
# augmentation layer is added or removed.
trained_network = ConvNeXtLarge_model.layers[-1]

print(str(trained_network))

model = Sequential()

model.add(trained_network)






In [None]:
# Choose which layers of the wrapped network are trainable, THEN compile.
# Keras documents that `trainable` changes made after compile() are only
# guaranteed to be taken into account once compile() is called again, so the
# flags are set first.

print(model.layers[0].name)

# Unfreeze only the last `trainable_layers` layers of the wrapped network
trainable_layers = 11
inner_layers = model.layers[0].layers
split_at = len(inner_layers) - trainable_layers

for layer in inner_layers[split_at:]:
    layer.trainable = True    # tail of the network: trainable
for layer in inner_layers[:split_at]:
    layer.trainable = False   # everything before the tail: frozen

# List every layer with its resulting trainable flag
for i, layer in enumerate(inner_layers):
    print(i, layer.name, layer.trainable)

model.compile(optimizer=tfk.optimizers.AdamW(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

model.layers[0].summary()

In [None]:
# Callbacks driven by the training loss (no validation data is passed to this fit)

# Stop after 10 epochs without improvement and restore the best weights
stop_on_plateau = tfk.callbacks.EarlyStopping(
    monitor="loss",
    patience=10,
    restore_best_weights=True,
)

# Divide the learning rate by 10 after 3 epochs without improvement (floor: 1e-6)
lower_lr_on_plateau = tfk.callbacks.ReduceLROnPlateau(
    monitor="loss",
    factor=0.1,
    patience=3,
    min_lr=1e-6,
    verbose=1,
)

callbacks = [stop_on_plateau, lower_lr_on_plateau]

# Train on the combined training+validation data
history = model.fit(
    x=X_trainval,
    y=y_trainval,
    batch_size=64,
    epochs=30,
    callbacks=callbacks,
)

# Persist the final model
model.save("All_Colab_Scripts/ConvNeXtLarge/Final/FinalModel")


# Final Evaluation

In [None]:
# Evaluate the model on the test set
score = model.evaluate(X_test, y_test, verbose=0)

# Print test accuracy (score[0] is the loss, score[1] the accuracy metric)
print("Test accuracy:", score[1])

# plot the confusion matrix
from sklearn.metrics import confusion_matrix
import itertools

# Predict class probabilities for the test set
Y_pred = model.predict(X_test)
# Convert the predicted probabilities to class indices
Y_pred_classes = np.argmax(Y_pred, axis=1)
# Convert the one-hot test targets to class indices
Y_true = np.argmax(y_test, axis=1)
# compute the confusion matrix
confusion_mtx = confusion_matrix(Y_true, Y_pred_classes)

# plot the confusion matrix; the cells are integer counts, hence fmt='d'
f,ax = plt.subplots(figsize=(8, 8))
sns.heatmap(confusion_mtx, annot=True, linewidths=0.01,cmap="Greens",linecolor="gray", fmt= 'd',ax=ax)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()