# Model

## Import libraries

In [None]:
# Fix randomness and hide warnings
SEED = 42

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['PYTHONHASHSEED'] = str(SEED)
os.environ['MPLCONFIGDIR'] = os.getcwd()+'/configs/'

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

import numpy as np
np.random.seed(SEED)

import logging

import random
random.seed(SEED)

# Import tensorflow
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
tf.random.set_seed(SEED)
tf.compat.v1.set_random_seed(SEED)

# Import other libraries
import matplotlib.pyplot as plt
from keras.applications.convnext import ConvNeXtLarge, preprocess_input as convnext_l_preprocess
from keras.applications.convnext import ConvNeXtXLarge, preprocess_input as convnext_xl_preprocess
from keras.applications.efficientnet_v2 import EfficientNetV2L, preprocess_input as efficientnet_preprocess
from tensorflow.keras.applications.resnet_v2 import preprocess_input as inceptionresnet_preprocess
from tensorflow.keras.applications.xception import Xception, preprocess_input as xception_preprocess
from sklearn.model_selection import train_test_split, KFold
from sklearn.utils import class_weight, shuffle
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
import seaborn as sns
from keras.preprocessing.image import ImageDataGenerator
from tqdm import tqdm
from math import ceil
import keras_tuner as kt
import keras_cv
import os
import shutil

## Set Platform

In [None]:
def setPlatform(platform):
    """Prepare the runtime for the selected platform and return its paths.

    Args:
        platform: Either "Colab" or "Kaggle".

    Returns:
        A tuple ``(dataset_path, logfile_dir)`` for the selected platform.

    Raises:
        Exception: If ``platform`` is neither "Colab" nor "Kaggle".
    """
    if platform == "Colab":
        # Imported only on this branch, so other platforms never need google.colab.
        from google.colab import drive
        drive.mount('/gdrive')
        # Plain-Python equivalent of the `%cd` magic: keeps the function valid
        # Python and makes the relative paths returned below resolve correctly.
        os.chdir('/gdrive/My Drive/AN2DL challenges/challenge1')
        return "dataset_clean.npz", "logfiles/"
    elif platform == "Kaggle":
        try:
            # Output directory cleaning
            shutil.rmtree("/kaggle/working")
        except OSError:
            # Only filesystem errors are expected here; anything else
            # (e.g. KeyboardInterrupt) must not be silently swallowed.
            print("Output directory is empty.")
        return "/kaggle/input/dataset/dataset_clean.npz", "/kaggle/working/logfiles/"
    else:
        raise Exception("Error: platform should be Colab or Kaggle.")

# platform : "Colab" | "Kaggle"
PLATFORM = "Kaggle"
DATASET_PATH, LOGFILE_PATH = setPlatform(PLATFORM)

In [None]:
# Start every run from an empty log directory.
if os.path.exists(LOGFILE_PATH):
    try:
        shutil.rmtree(LOGFILE_PATH)
    except OSError as e:
        # Report the failure instead of discarding it: the previous bare
        # f-string expression was evaluated and thrown away.
        print(f"Could not clear the folder {LOGFILE_PATH}: {e}")
# exist_ok covers the case where the removal above failed and the folder is still there.
os.makedirs(LOGFILE_PATH, exist_ok=True)

## Load and process the dataset

In [None]:
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects;
# only load dataset files from a trusted source.
items = np.load(DATASET_PATH, allow_pickle=True)
leaves = items['data']
labels = items['labels']

print(f'Input shape: {leaves.shape[1:]}\n')

# Calculate the unique target labels and their counts
unique, count = np.unique(labels, return_counts=True)
print('Target labels:', unique)
# Pair each label with its count positionally; indexing the arrays by the
# label value only works when the labels happen to be the integers 0..n-1.
for label, n_samples in zip(unique, count):
    print(f'Class {label} has {n_samples} samples')

In [None]:
# Convert labels to one-hot encoding format
labels = tfk.utils.to_categorical(labels, 2)

# Random shuffle data. The seed is passed explicitly so the permutation does
# not depend on how much of the global NumPy RNG state was consumed earlier.
leaves, labels = shuffle(leaves, labels, random_state=SEED)

# Split data into train_val and test sets (10% test, stratified by class)
X_train_val, X_test, y_train_val, y_test = train_test_split (
    leaves,
    labels,
    random_state = SEED,
    test_size = 0.1,
    stratify = np.argmax(labels, axis=1)
)

# Further split train_val into train and validation sets.
# The validation set gets as many samples as the test set.
X_train, X_val, y_train, y_val = train_test_split (
    X_train_val,
    y_train_val,
    random_state = SEED,
    test_size = len(X_test),
    stratify = np.argmax(y_train_val, axis=1)
)

# Print shapes of the datasets
print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"X_val shape: {X_val.shape}, y_val shape: {y_val.shape}")
print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

## Training parameters

In [None]:
# --- Optimisation settings ---
BATCH_SIZE = 128
EPOCHS = 1000
DROPOUT_RATE = 0.5
LR = 1e-4

# Epochs without val_accuracy improvement before EarlyStopping halts training
ES_PATIENCE = 20

# Epochs without val_accuracy improvement before ReduceLROnPlateau lowers the LR
RLROP_PATIENCE = 15

# Per-sample input shape and number of output units, read from the training split
INPUT_SHAPE = X_train.shape[1:]
OUTPUT_SHAPE = y_train.shape[-1]

# Both callbacks track validation accuracy (higher is better)
callbacks = [
    tfk.callbacks.EarlyStopping(
        monitor='val_accuracy',
        mode='max',
        patience=ES_PATIENCE,
        restore_best_weights=True,
    ),
    tfk.callbacks.ReduceLROnPlateau(
        monitor="val_accuracy",
        factor=0.1,
        patience=RLROP_PATIENCE,
        min_lr=1e-6,
        mode='max',
    ),
]

# Backbone selector. Supported values:
# ConvNeXtLarge, ConvNeXtXLarge, EfficientNetV2L, Xception, InceptionResNetV2
KERAS_APP = "ConvNeXtLarge"

## Data Augmentation

In [None]:
# Random geometric augmentation: flips, small rotations and small zooms.
dataAugmentation = ImageDataGenerator(
    horizontal_flip = True,
    vertical_flip = True,
    rotation_range = 10,
    zoom_range = 0.1,
    fill_mode = "nearest"
)

# dataAugmentation.fit(X_train) is intentionally not called: fit() only
# computes statistics for featurewise_center, featurewise_std_normalization
# and zca_whitening, none of which is enabled above, so it would just copy the
# whole training set in memory without changing the generator.

## Data Augmentation for Imbalanced Classes

In [None]:
def dataAugImbalanced(X_train, y_train):
    """Oversample the minority class with augmented copies until both classes match.

    Args:
        X_train: Training images; each sample must have shape INPUT_SHAPE.
        y_train: One-hot labels with two columns (one per class).

    Returns:
        A tuple ``(X, y)``: the original samples followed by the augmented
        minority-class samples.
    """
    train_class_counts = [
        len(np.where(y_train[:, 0] == 1)[0]),
        len(np.where(y_train[:, 1] == 1)[0])
    ]
    min_class = int(np.argmin(train_class_counts))
    class_difference = max(train_class_counts) - train_class_counts[min_class]

    print(f"class_counts {train_class_counts}\n")

    minority_class_indices = np.where(y_train[:, min_class] == 1)[0]

    # Sampling without replacement is impossible when more copies are needed
    # than there are minority samples (imbalance above 2:1); only then allow repeats.
    with_replacement = class_difference > len(minority_class_indices)
    idx = np.random.choice(minority_class_indices, class_difference, replace=with_replacement)

    # Pre-allocate the output instead of growing it with vstack on every
    # iteration, which re-copies the whole array each time.
    oversampled_X = np.empty((class_difference,) + INPUT_SHAPE)
    oversampled_y = np.empty((class_difference, OUTPUT_SHAPE))

    for k, i in enumerate(idx):
        # A distinct seed per sample: reusing the same seed would draw the
        # same transformation parameters for every image.
        oversampled_X[k] = dataAugmentation.random_transform(X_train[i], seed=SEED + k)
        oversampled_y[k] = y_train[i]

    X_train_oversampled = np.vstack([X_train, oversampled_X])
    y_train_oversampled = np.vstack([y_train, oversampled_y])

    print(f"Classe 0: {len(np.where(y_train_oversampled[:, 0] == 1)[0])}")
    print(f"Classe 1: {len(np.where(y_train_oversampled[:, 1] == 1)[0])}")

    return X_train_oversampled, y_train_oversampled

## Class Weights for Imbalanced Classes

In [None]:
def compute_class_weights(labels, y_train):
    """Compute 'balanced' class weights for the training labels.

    Args:
        labels: Labels of the whole dataset, used to determine the set of
            classes. May be a 1-D array of class ids or a one-hot matrix.
        y_train: One-hot training labels the weights are computed for.

    Returns:
        Dict mapping the position of each class (0, 1, ...) to its weight.
    """
    labels = np.asarray(labels)
    # With one-hot labels, np.unique() on the raw matrix yields the values
    # [0., 1.] rather than the class ids, which is only correct by coincidence
    # for two classes. Recover the class ids first.
    class_ids = np.argmax(labels, axis=1) if labels.ndim > 1 else labels

    class_weights = class_weight.compute_class_weight(
        class_weight = 'balanced',
        classes = np.unique(class_ids),
        y = np.argmax(y_train, axis=1)
    )
    class_weights = dict(enumerate(class_weights))
    return class_weights

## Fine Tuning

### Create and return the base model with specified settings

In [None]:
def getBaseModel(preprocessing):
    """Apply the backbone selected by KERAS_APP to an already augmented tensor.

    The backbone is created without its classification head, with ImageNet
    weights and global average pooling. Xception and InceptionResNetV2 are
    built for a fixed input size, so the tensor is resized (bicubic) first.

    Args:
        preprocessing: Tensor produced by the augmentation stage.

    Returns:
        The output tensor of the backbone.

    Raises:
        Exception: If KERAS_APP is not one of the supported applications.
    """
    # name -> (constructor, preprocess_input function, fixed side length or None)
    supported_apps = {
        "ConvNeXtLarge": (ConvNeXtLarge, convnext_l_preprocess, None),
        "ConvNeXtXLarge": (ConvNeXtXLarge, convnext_xl_preprocess, None),
        "EfficientNetV2L": (EfficientNetV2L, efficientnet_preprocess, None),
        "Xception": (Xception, xception_preprocess, 299),
        "InceptionResNetV2": (tfk.applications.InceptionResNetV2, inceptionresnet_preprocess, 224),
    }

    if KERAS_APP not in supported_apps:
        raise Exception("Error: look for the keras applications supported.")

    build_backbone, preprocess_fn, fixed_side = supported_apps[KERAS_APP]

    if fixed_side is None:
        backbone_input_shape = INPUT_SHAPE
    else:
        backbone_input_shape = (fixed_side, fixed_side, 3)

    backbone = build_backbone(
        include_top = False,
        weights = "imagenet",
        input_shape = backbone_input_shape,
        pooling = "avg",
        classifier_activation = "softmax"
    )

    backbone_input = preprocessing
    if fixed_side is not None:
        backbone_input = tfkl.Resizing(
            fixed_side, fixed_side, interpolation='bicubic', name='resizing'
        )(backbone_input)

    return backbone(preprocess_fn(backbone_input))

def getLayerToFreeze():
    """Return the layer name of the backbone selected by KERAS_APP.

    Raises:
        Exception: If KERAS_APP is not one of the supported applications.
    """
    backbone_layer_names = {
        "ConvNeXtLarge": "convnext_large",
        "ConvNeXtXLarge": "convnext_xlarge",
        "EfficientNetV2L": "efficientnetv2-l",
        "Xception": "xception",
        "InceptionResNetV2": "inception_resnet_v2",
    }
    layer_name = backbone_layer_names.get(KERAS_APP)
    if layer_name is None:
        raise Exception("Error: look for the keras applications supported.")
    return layer_name

def getPreprocessToUse():
    """Return the preprocess_input function of the backbone selected by KERAS_APP.

    Raises:
        Exception: If KERAS_APP is not one of the supported applications.
    """
    preprocess_by_app = {
        "ConvNeXtLarge": convnext_l_preprocess,
        "ConvNeXtXLarge": convnext_xl_preprocess,
        "EfficientNetV2L": efficientnet_preprocess,
        "Xception": xception_preprocess,
        "InceptionResNetV2": inceptionresnet_preprocess,
    }
    preprocess_fn = preprocess_by_app.get(KERAS_APP)
    if preprocess_fn is None:
        raise Exception("Error: look for the keras applications supported.")
    return preprocess_fn

### Hypertuning

In [None]:
def build_model_hypertuning(hp):
    """Build and compile the classifier from hyperparameters sampled via `hp`.

    Tuned values: augmentation settings (flip mode, rotation, zoom and
    translation factors), units and activation of the two hidden dense
    layers, and the learning rate.

    Args:
        hp: Hyperparameter container exposing `Choice` and `Float`
            (as passed by the keras_tuner tuner).

    Returns:
        The compiled model.
    """
    tf.random.set_seed(SEED)

    inputs = tfk.Input(shape=INPUT_SHAPE)

    # Augmentation hyperparameters
    flip_mode = hp.Choice('RandomFlip', values=['horizontal_and_vertical', 'horizontal', 'vertical'])
    rotation_factor = hp.Float('RandomRotation', min_value=0.1, max_value=0.5, step=0.1)
    zoom_factor = hp.Float('RandomZoom', min_value=0.1, max_value=0.3, step=0.1)
    translation_factor1 = hp.Float('RandomTranslation1', min_value=0.1, max_value=0.4, step=0.1)
    translation_factor2 = hp.Float('RandomTranslation2', min_value=0.1, max_value=0.4, step=0.1)

    # In-model image augmentation
    augmentation = tf.keras.Sequential(
        [
            tfkl.RandomFlip(flip_mode),
            tfkl.RandomRotation(rotation_factor),
            tfkl.RandomZoom(zoom_factor),
            tfkl.RandomTranslation(translation_factor1, translation_factor2),
        ],
        name='preprocessing'
    )
    augmented = augmentation(inputs)

    backbone_output = getBaseModel(augmented)

    # Hidden-layer hyperparameters: units first, then activations
    dense1_units = hp.Choice('units-HiddenDense1', values=[128, 256, 512, 1024, 2048])
    dense2_units = hp.Choice('units-HiddenDense2', values=[128, 256, 512, 1024, 2048])
    dense1_activation = hp.Choice('activation1', values=['swish','elu','relu'])
    dense2_activation = hp.Choice('activation-HiddenDense2', values=['swish','elu','relu'])

    # Classification head: two Dense -> BatchNormalization -> Activation stages
    head_stages = (
        ("HiddenDense1", dense1_units, dense1_activation),
        ("HiddenDense2", dense2_units, dense2_activation),
    )
    x = backbone_output
    for dense_name, n_units, activation_name in head_stages:
        x = tfkl.Dense(
            units = n_units,
            kernel_initializer = tfk.initializers.HeUniform(seed=SEED),
            name = dense_name,
        )(x)
        x = tfkl.BatchNormalization()(x)
        x = tfkl.Activation(activation_name)(x)

    outputs = tfkl.Dense(
        2,
        kernel_initializer = tfk.initializers.GlorotUniform(seed=SEED),
        activation = 'softmax'
    )(x)

    model = tfk.Model(inputs, outputs, name='model')

    # Freeze the first 20% (rounded up) of the backbone's layers
    backbone_layers = model.get_layer(getLayerToFreeze()).layers
    n_frozen = ceil(len(backbone_layers) * 0.2)
    for frozen_layer in backbone_layers[:n_frozen]:
        frozen_layer.trainable = False

    # Learning-rate hyperparameter
    learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4, 1e-5, 1e-6])

    # Categorical cross-entropy loss with the AdamW optimizer
    model.compile(
        loss = tfk.losses.CategoricalCrossentropy(),
        optimizer = tfk.optimizers.AdamW(learning_rate=learning_rate),
        metrics = ['accuracy']
    )
    return model


def start_hypertuning(X_train, y_train, batch_size=BATCH_SIZE, callbacks=callbacks):
    """Run a Bayesian hyperparameter search and return the best values found.

    The search maximises 'val_accuracy' over 10 trials of up to 50 epochs
    each, validating on the module-level X_val / y_val. Inputs are multiplied
    by 255 before being fed to the model.

    Args:
        X_train: Training images.
        y_train: One-hot training labels.
        batch_size: Batch size used during the search.
        callbacks: Keras callbacks passed to every trial.

    Returns:
        Dict with the values of the best hyperparameter set.
    """
    bayes_tuner = kt.BayesianOptimization(
        build_model_hypertuning,
        objective = 'val_accuracy',
        max_trials = 10,
        directory = LOGFILE_PATH
    )

    train_inputs = X_train*255
    validation_pair = (X_val*255, y_val)

    bayes_tuner.search(
        train_inputs,
        y_train,
        validation_data = validation_pair,
        epochs=50,
        batch_size=batch_size,
        callbacks=callbacks
    )

    top_hyperparameters = bayes_tuner.get_best_hyperparameters()[0]
    best_hp = top_hyperparameters.values
    print(best_hp)
    return best_hp

In [None]:
def build_model():
    """Build and compile the classifier with the fixed configuration.

    Pipeline: in-model augmentation -> backbone selected by KERAS_APP ->
    two Dense/BatchNormalization/swish stages (256 and 128 units) ->
    2-unit softmax output.

    Returns:
        The compiled model.
    """
    tf.random.set_seed(SEED)

    inputs = tfk.Input(shape=INPUT_SHAPE)

    # In-model image augmentation
    augmentation = tf.keras.Sequential(
        [
            tfkl.RandomFlip(),
            tfkl.RandomRotation((-0.2, 0.2)),
            tfkl.RandomZoom(0.1),
            tfkl.RandomTranslation(0.2, 0.2)
        ],
        name='preprocessing'
    )
    augmented = augmentation(inputs)

    backbone_output = getBaseModel(augmented)

    # Classification head: two Dense -> BatchNormalization -> swish stages
    x = backbone_output
    for dense_name, n_units in (("HiddenDense1", 256), ("HiddenDense2", 128)):
        x = tfkl.Dense(
            units = n_units,
            kernel_initializer = tfk.initializers.HeUniform(seed=SEED),
            name = dense_name,
        )(x)
        x = tfkl.BatchNormalization()(x)
        x = tfkl.Activation('swish')(x)

    outputs = tfkl.Dense(
        2,
        kernel_initializer = tfk.initializers.GlorotUniform(seed=SEED),
        activation='softmax'
    )(x)

    model = tfk.Model(inputs, outputs, name='model')

    # Freeze the first 20% (rounded up) of the backbone's layers
    backbone_layers = model.get_layer(getLayerToFreeze()).layers
    n_frozen = ceil(len(backbone_layers) * 0.2)
    for frozen_layer in backbone_layers[:n_frozen]:
        frozen_layer.trainable = False

    # Categorical cross-entropy loss with the AdamW optimizer
    model.compile(
        loss = tfk.losses.CategoricalCrossentropy(),
        optimizer = tfk.optimizers.AdamW(learning_rate=LR),
        metrics = ['accuracy']
    )
    return model

## Model Fitting

In [None]:
def fitModel(X_train, y_train, X_val, y_val, use_datagen, epochs):
    """Build a fresh model and train it with class weights.

    Inputs are multiplied by 255 before being fed to the model.

    Args:
        X_train: Training images.
        y_train: One-hot training labels.
        X_val: Validation images, or None to train without validation.
        y_val: One-hot validation labels (ignored when X_val is None).
        use_datagen: If True, train on batches produced by the
            ImageDataGenerator `dataAugmentation`; otherwise train on the
            arrays directly.
        epochs: Maximum number of training epochs.

    Returns:
        The trained model. The training history is available afterwards as
        `model.history.history`.
    """
    model = build_model()

    has_validation = X_val is not None
    validation_data = (X_val*255, y_val) if has_validation else None

    # The callbacks monitor 'val_accuracy', which only exists when validation
    # data is provided; without it they cannot act, so they are left out.
    fit_callbacks = callbacks if has_validation else None

    class_weights = compute_class_weights(labels, y_train)

    # check whether use ImageDataGenerator or not
    if not use_datagen:
        model.fit(
            x = X_train*255,
            y = y_train,
            validation_data = validation_data,
            epochs = epochs,
            batch_size = BATCH_SIZE,
            class_weight = class_weights,
            callbacks = fit_callbacks
        )

    else:
        # Model.fit accepts generators directly; fit_generator is deprecated.
        model.fit(
            dataAugmentation.flow(
                X_train*255,
                y_train,
                batch_size = BATCH_SIZE,
                shuffle = True
            ),
            validation_data = validation_data,
            epochs = epochs,
            class_weight = class_weights,
            # Whole number of batches per epoch (the last one may be partial)
            steps_per_epoch = ceil(len(X_train)/BATCH_SIZE),
            callbacks = fit_callbacks
        )
    return model

## K-Fold Cross-Validation

In [None]:
def KFoldCrossValidation(X_train_val, y_train_val, use_datagen):
    """Estimate the best number of epochs with 5-fold CV, then retrain on all data.

    Each fold trains a fresh model with early stopping on its validation
    split. The epoch with the highest validation accuracy is recorded per
    fold, and the final model is trained on the whole set for the average of
    those epochs.

    Args:
        X_train_val: Images used for cross-validation and the final training.
        y_train_val: One-hot labels matching X_train_val.
        use_datagen: Forwarded to fitModel (train with ImageDataGenerator or not).

    Returns:
        The model trained on the full X_train_val / y_train_val.
    """
    # Define the number of folds for cross-validation
    num_folds = 5

    # Per-fold validation accuracy and best epoch
    scores = []
    best_epochs = []

    # Create a KFold cross-validation object
    kfold = KFold(n_splits=num_folds, shuffle=True, random_state=SEED)

    # Loop through each fold
    for fold_idx, (train_idx, valid_idx) in enumerate(kfold.split(X_train_val, y_train_val)):
        print(f"Starting training on fold num: {fold_idx+1}")

        X_fold_val = X_train_val[valid_idx]
        # Validation labels must come from valid_idx (not train_idx) to match X_fold_val
        y_fold_val = y_train_val[valid_idx]

        # Build and train a new model for each fold
        k_model = fitModel(
            X_train_val[train_idx],
            y_train_val[train_idx],
            X_fold_val,
            y_fold_val,
            use_datagen=use_datagen,
            epochs=EPOCHS
        )

        # Evaluate on the fold's validation data, scaled like the training inputs
        score = k_model.evaluate(X_fold_val*255, y_fold_val, verbose=0)
        scores.append(score[1])

        # model.history is a History callback; the per-epoch metrics live in its .history dict
        fold_history = k_model.history.history

        # Best epoch = first epoch reaching the highest validation accuracy.
        # This also holds when early stopping never triggered, unlike
        # `len(history) - ES_PATIENCE`.
        best_epoch = int(np.argmax(fold_history['val_accuracy'])) + 1
        best_epochs.append(best_epoch)

    print(f"Mean validation accuracy over {num_folds} folds: {np.mean(scores):.4f}")

    # Calculate the average best epoch (at least one epoch)
    avg_epochs = max(1, int(np.mean(best_epochs)))
    print(f"Best average epoch: {avg_epochs}")

    return fitModel(X_train_val, y_train_val, None, None, use_datagen=use_datagen, epochs=avg_epochs)

In [None]:
# Run-mode switches:
#   hypertuning - set to True when the hyperparameter search is run instead of
#                 a training; the cells below skip saving/evaluation in that case
#   use_datagen - forwarded to the training functions
hypertuning = False
use_datagen = False

# Enable exactly one of the following three lines:
# plain training with a validation split, K-fold cross-validation, or hyperparameter search.
model = fitModel(X_train, y_train, X_val, y_val, use_datagen, epochs=EPOCHS)
# model = KFoldCrossValidation(X_train_val, y_train_val, use_datagen)
# best_hp = start_hypertuning(X_train, y_train); hypertuning = True

In [None]:
# Nothing to save after a hyperparameter search
if not hypertuning:
    # Save the trained model under the relative path 'model'
    model.save('model')

In [None]:
if PLATFORM == "Kaggle":
    if not hypertuning:
        os.chdir(r'/kaggle/working')
        !zip -r file.zip /kaggle/working

## Test Time Augmentation

In [None]:
if not hypertuning:
    tta_steps = 10
    predictions = []

    # Same input scaling as used for training and validation
    X_test_scaled = X_test*255
    # One pass over the test set = this many batches (the last may be partial)
    n_batches = ceil(X_test.shape[0] / BATCH_SIZE)

    for _ in tqdm(range(tta_steps)):
        # shuffle=False keeps predictions aligned with y_test and across
        # passes; the random augmentation is still applied to every image.
        preds = model.predict(
            dataAugmentation.flow(
                X_test_scaled,
                batch_size = BATCH_SIZE,
                shuffle = False
            ),
            steps=n_batches
        )
        predictions.append(preds)

    # Average the class probabilities over all augmented passes
    pred = np.mean(predictions, axis=0)

    # Accuracy of the averaged predictions
    print(np.mean(np.equal(np.argmax(y_test, axis=-1), np.argmax(pred, axis=-1))))

## Testing

In [None]:
def predict(model, X_test, y_test):
    """Evaluate the model on a test set and plot the normalised confusion matrix.

    Prints accuracy and macro-averaged precision, recall and F1.

    Args:
        model: Trained model.
        X_test: Test images (multiplied by 255 before prediction, like the
            training inputs).
        y_test: One-hot test labels.
    """
    # The backbone's preprocess_input is already applied inside the model
    # (see getBaseModel), so the model is fed the same scaled inputs as during
    # training; applying preprocess_input here as well would preprocess twice.
    predictions = model.predict(X_test*255)

    y_true = np.argmax(y_test, axis=-1)
    y_pred = np.argmax(predictions, axis=-1)

    # Compute classification metrics
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='macro')
    recall = recall_score(y_true, y_pred, average='macro')
    f1 = f1_score(y_true, y_pred, average='macro')

    # Display the computed metrics. Built-in round() works for both NumPy
    # scalars and plain Python floats.
    print('Accuracy:', round(accuracy, 4))
    print('Precision:', round(precision, 4))
    print('Recall:', round(recall, 4))
    print('F1:', round(f1, 4))

    # Compute the confusion matrix, normalised over the true labels
    cm = confusion_matrix(y_true, y_pred, normalize="true")

    # Plot the confusion matrix (transposed: true labels on x, predictions on y)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm.T, xticklabels=list(('healthy','unhealthy')), yticklabels=list(('healthy','unhealthy')), cmap='Blues', annot=True)
    plt.xlabel('True labels')
    plt.ylabel('Predicted labels')
    plt.show()

In [None]:
# Evaluate on the held-out test set (skipped after a hyperparameter search)
if not hypertuning:
    predict(model, X_test, y_test)