**LINK GOOGLE DRIVE**

In [None]:
from google.colab import drive
drive.mount('/gdrive')

In [None]:
%cd /gdrive/My Drive/Colab Notebook

**IMPORT LIBRARIES AND INITIALIZATION RANDOM SEED**

In [None]:
import tensorflow as tf
import numpy as np
import os
import random
import pandas as pd
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix
from PIL import Image
from tensorflow.keras.preprocessing.image import ImageDataGenerator

tfk = tf.keras
tfkl = tf.keras.layers
print(tf.__version__)

In [None]:
# Random seed for reproducibility
seed = 42

random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

**ConvNextLarge model uses tensorflow version 2.10**

In [None]:
!pip uninstall tensorflow
!pip install tensorflow==2.10

**IMPORTING DATASET AND SPLITTING DATA**

In [None]:
#We use the libraries splitfolders to generate new folders with a random splitting of the data in train and validation part
!pip install split-folders
import splitfolders

!unzip training_dataset_homework1.zip
dataset_dir = 'training_data_final'
labels = ['Species1',       
          'Species2',          
          'Species3',    
          'Species4',             
          'Species5',          
          'Species6',         
          'Species7',   
          'Species8']

splitfolders.ratio('training_data_final', output="train_data_final_80_20", seed=42, ratio=(0.80, 0.20, 0.0), group_prefix=None, move=False) 

**DEFINITION OF CLASS AND FUNCTIONS FOR MIXUP AND CUTOUT IMPLEMENTATION**

- *Class defintion for MixUp Implementation*

In [None]:
class MixupImageDataGenerator():
    def __init__(self, generator, directory, batch_size, img_height, img_width, alpha=0.1):
        """Constructor for mixup image data generator.
        Arguments:
            generator {object} -- An instance of Keras ImageDataGenerator.
            directory {str} -- Image directory.
            batch_size {int} -- Batch size.
            img_height {int} -- Image height in pixels.
            img_width {int} -- Image width in pixels.
        Keyword Arguments:
            alpha {float} -- Mixup beta distribution alpha parameter. (default: {0.2})
            subset {str} -- 'training' or 'validation' if validation_split is specified in
            `generator` (ImageDataGenerator).(default: {None})
        """

        self.batch_index = 0
        self.batch_size = batch_size
        self.alpha = alpha

        # First iterator yielding tuples of (x, y)
        self.generator1 = generator.flow_from_directory(directory,
                                                        target_size=(
                                                            img_height, img_width),
                                                        class_mode="categorical",
                                                        batch_size=batch_size,
                                                        shuffle=True)

        # Second iterator yielding tuples of (x, y)
        self.generator2 = generator.flow_from_directory(directory,
                                                        target_size=(
                                                            img_height, img_width),
                                                        class_mode="categorical",
                                                        batch_size=batch_size,
                                                        shuffle=True)

        # Number of images across all classes in image directory.
        self.n = self.generator1.samples

    def reset_index(self):
        """Reset the generator indexes array.
        """

        self.generator1._set_index_array()
        self.generator2._set_index_array()

    def on_epoch_end(self):
        self.reset_index()

    def reset(self):
        self.batch_index = 0

    def __len__(self):
        # round up
        return (self.n + self.batch_size - 1) // self.batch_size

    def get_steps_per_epoch(self):
        """Get number of steps per epoch based on batch size and
        number of images.
        Returns:
            int -- steps per epoch.
        """

        return self.n // self.batch_size

    def __next__(self):
        """Get next batch input/output pair.
        Returns:
            tuple -- batch of input/output pair, (inputs, outputs).
        """

        if self.batch_index == 0:
            self.reset_index()

        current_index = (self.batch_index * self.batch_size) % self.n
        if self.n > current_index + self.batch_size:
            self.batch_index += 1
        else:
            self.batch_index = 0

        # Get a pair of inputs and outputs from two iterators.
        X1, y1 = self.generator1.next()
        X2, y2 = self.generator2.next()

        # random sample the lambda value from beta distribution.
        l = np.random.beta(self.alpha, self.alpha, X1.shape[0])

        X_l = l.reshape(X1.shape[0], 1, 1, 1)
        y_l = l.reshape(X1.shape[0], 1)

        # Perform the mixup.
        X = X1 * X_l + X2 * (1 - X_l)
        y = y1 * y_l + y2 * (1 - y_l)
        return X, y

    def __iter__(self):
        while True:
            yield next(self)

- *Function definition for CutOut implementation*

In [None]:
def cut_out(p=0.8, s_l=0.02, s_h=0.4, r_1=0.3, r_2=1/0.3, v_l=0, v_h=255, pixel_level=False):
    def eraser(input_img):
        img_h, img_w, img_c = input_img.shape
        p_1 = np.random.rand()

        if p_1 > p:
            return input_img

        while True:
            s = np.random.uniform(s_l, s_h) * img_h * img_w
            r = np.random.uniform(r_1, r_2)
            w = int(np.sqrt(s / r))
            h = int(np.sqrt(s * r))
            left = np.random.randint(0, img_w)
            top = np.random.randint(0, img_h)

            if left + w <= img_w and top + h <= img_h:
                break

        if pixel_level:
            c = np.random.uniform(v_l, v_h, (h, w, img_c))
        else:
            c = np.random.uniform(v_l, v_h)

        input_img[top:top + h, left:left + w, :] = c

        return input_img

    return eraser

**PRE-PROCESSING OF THE DATA AND AUGMENTATION**


> *Initialization of different data generators*



In [None]:
#Preprocess_input is usefull to preprocess the data so we'll have pixel in range of values [0,1]
from tensorflow.keras.applications.convnext import preprocess_input

#This function is usefull to do cutout augmentation
def custom_function(input_image):
    input_image = preprocess_input(input_image)
    return cut_out(v_l=0, v_h=1)(input_image)

#Validation Generator
val_data_gen = ImageDataGenerator(preprocessing_function = preprocess_input)

#Train Generator with classical Augmentation techniques
aug_train_data_gen = ImageDataGenerator(
                                        height_shift_range=10,
                                        width_shift_range=10,
                                        horizontal_flip=True,
                                        vertical_flip=True, 
                                        fill_mode='nearest',
                                        preprocessing_function = preprocess_input)

#Train Generator with CutOut
aug_train_data_gen_cut_out = ImageDataGenerator(
                                        height_shift_range=10,
                                        width_shift_range=10,
                                        horizontal_flip=True,
                                        vertical_flip=True, 
                                        fill_mode='nearest',
                                        preprocessing_function = custom_function) 

#Train Generator with MixUp
train_gen_mixup = MixupImageDataGenerator(generator=aug_train_data_gen,
                                    directory='train_data_final_80_20/train',
                                    batch_size=8,
                                    img_height=96,
                                    img_width=96)                                  


train_gen = aug_train_data_gen.flow_from_directory(directory='train_data_final_80_20/train',
                                               target_size=(96,96),
                                               color_mode='rgb',
                                               classes=labels, # can be set to labels
                                               class_mode='categorical',
                                               batch_size=8,
                                               shuffle=True) 

train_gen_cut_out = aug_train_data_gen_cut_out.flow_from_directory(directory='train_data_final_80_20/train',
                                               target_size=(96,96),
                                               color_mode='rgb',
                                               classes=labels, # can be set to labels
                                               class_mode='categorical',
                                               batch_size=8,
                                               shuffle=True)                                             

val_gen = val_data_gen.flow_from_directory(directory='train_data_final_80_20/val',
                                               target_size=(96,96),
                                               color_mode='rgb',
                                               classes=labels, # can be set to labels
                                               class_mode='categorical',
                                               batch_size=8,
                                               shuffle=False)

**INITIALIZATION MODEL**

In [None]:
#Useful for implementation of Quasi-SVM
from tensorflow.keras.layers.experimental import RandomFourierFeatures

input_shape = (96, 96, 3)
epochs = 200

#Load ConvNextLarge Model from Keras
supernet = tfk.applications.convnext.ConvNeXtLarge(
    include_top=False,
    weights="imagenet",
    input_shape=(96,96,3)
)

# Use the supernet as feature extractor
supernet.trainable = False
elastic_lambda = 1e-5

**TRAINING TWO DIFFERENT MODELS**

- *First Model: Implementation QUASI-SVM classier and training + Fine tuning with CutOut*

In [None]:
inputs = tfk.Input(shape=(96,96,3))
x = tfkl.Resizing(96, 96, interpolation="bicubic")(inputs)
x = supernet(x)
x = tfkl.Flatten(name='Flattening')(x)
x = tfkl.Dense(
  512, 
  activation='relu',
  kernel_initializer = tfk.initializers.HeUniform(seed),
  kernel_regularizer=tf.keras.regularizers.L1(elastic_lambda))(x)
x = tfkl.Dense(
  256, 
  activation='relu',
  kernel_initializer = tfk.initializers.HeUniform(seed),
  kernel_regularizer=tf.keras.regularizers.L1(elastic_lambda))(x)
x = tfkl.Dense(
  256, 
  activation='relu',
  kernel_initializer = tfk.initializers.HeUniform(seed),
  kernel_regularizer=tf.keras.regularizers.L1(elastic_lambda))(x)
x = tfkl.Dense(
  256, 
  activation='relu',
  kernel_initializer = tfk.initializers.HeUniform(seed),
  kernel_regularizer=tf.keras.regularizers.L1(elastic_lambda))(x)
x = tfkl.Dense(
  128, 
  activation='relu',
  kernel_initializer = tfk.initializers.HeUniform(seed),
  kernel_regularizer=tf.keras.regularizers.L1(elastic_lambda))(x)
x = RandomFourierFeatures(
            output_dim=4096, scale=10.0, kernel_initializer="gaussian"
        )(x)
outputs = tfkl.Dense(
    units=8)(x)

# Connect input and output through the Model class
tl_model = tfk.Model(inputs=inputs, outputs=outputs, name='model')

# Compile the model
tl_model.compile(loss=tfk.losses.hinge, optimizer=tfk.optimizers.Adam(1e-4), metrics='accuracy')
tl_model.summary()

In [None]:
#Implementation of callbacks with EarlyStopping
callbacks_a = [tfk.callbacks.EarlyStopping(monitor='val_loss', mode='auto', patience=15, restore_best_weights=True)]
rlr_callback = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', 
                                                        factor=0.1, 
                                                        patience=10, 
                                                        verbose=1, 
                                                        mode='auto', 
                                                        min_delta=0.000001)
callbacks_a.append(rlr_callback)

In [None]:
# Define the weights to give more relevance to species less supported in the unbalanced dataset 
from collections import Counter
counter = Counter(train_gen.classes)
max_val = float(max(counter.values()))
class_weights = {class_id : max_val/n_images for class_id, n_images in counter.items()}

#Train the model with classical Augmentated train data
t1_history = tl_model.fit(
    x = train_gen,
    epochs = 200,
    class_weight = class_weights,
    validation_data = val_gen,
    callbacks = callbacks_a
).history

In [None]:
tl_model.save("convNextModel_first_model_tl")

In [None]:
ft_model = tfk.models.load_model('convNextModel_first_model_tl')
ft_model.summary()

In [None]:
#Freezing only the first 250 layers so the others now are trainable 
ft_model.get_layer('convnext_large').trainable = True
for i, layer in enumerate(ft_model.get_layer('convnext_large').layers):
   print(i, layer.name, layer.trainable)

for i, layer in enumerate(ft_model.get_layer('convnext_large').layers[:250]):
  layer.trainable=False
for i, layer in enumerate(ft_model.get_layer('convnext_large').layers):
   print(i, layer.name, layer.trainable)
ft_model.summary()

In [None]:
ft_model.compile(loss=tfk.losses.hinge, optimizer=tfk.optimizers.Adam(1e-5), metrics='accuracy')

#Fine tuning training with CutOut Augmentation
t1_history = ft_model.fit(
    x = train_gen_cut_out,
    epochs = 200,
    class_weight = class_weights,
    validation_data = val_gen,
    callbacks = callbacks_a
).history

In [None]:
tl_model.save("convNextModel_first_model_cutout_ft")
#END FIRST MODEL

- *Second Model: Implementation Dense Layers with SoftMax and training + Fine tuning with MixUp*

In [None]:
inputs = tfk.Input(shape=(96,96,3))
x = tfkl.Resizing(96, 96, interpolation="bicubic")(inputs)
x = supernet(x)
x = tfkl.Flatten(name='Flattening')(x)
x = Dropout(0.2, seed=seed)(x)
x = tfkl.Dense(
  512, 
  activation='relu',
  kernel_initializer = tfk.initializers.HeUniform(seed),
  kernel_regularizer=tf.keras.regularizers.L1(elastic_lambda))(x)
x = tfkl.Dense(
  512, 
  activation='relu',
  kernel_initializer = tfk.initializers.HeUniform(seed),
  kernel_regularizer=tf.keras.regularizers.L1(elastic_lambda))(x)
x = GaussianNoise(0.01)(x)
x = tfkl.Dense(
  512, 
  activation='relu',
  kernel_initializer = tfk.initializers.HeUniform(seed),
  kernel_regularizer=tf.keras.regularizers.L1(elastic_lambda))(x)
outputs = tfkl.Dense(
  8, 
  activation='softmax',
  kernel_initializer = tfk.initializers.GlorotUniform(seed),
  kernel_regularizer=tf.keras.regularizers.L1(elastic_lambda))(x)

# Connect input and output through the Model class
tl_model = tfk.Model(inputs=inputs, outputs=outputs, name='model')

# Compile the model
tl_model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(1e-4), metrics='accuracy')
tl_model.summary()

In [None]:
callbacks_a = [tfk.callbacks.EarlyStopping(monitor='val_loss', mode='auto', patience=15, restore_best_weights=True)]
rlr_callback = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', 
                                                        factor=0.1, 
                                                        patience=10, 
                                                        verbose=1, 
                                                        mode='auto', 
                                                        min_delta=0.000001)
callbacks_a.append(rlr_callback)

In [None]:
# Define the weights to give more relevance to species less supported in the unbalanced dataset
from collections import Counter
counter = Counter(train_gen.classes)
max_val = float(max(counter.values()))
class_weights = {class_id : max_val/n_images for class_id, n_images in counter.items()}

#Train the model with classical Augmentated train data
t1_history = tl_model.fit(
    x = train_gen,
    epochs = 200,
    class_weight = class_weights,
    validation_data = val_gen,
    callbacks = callbacks_a)
.history

In [None]:
tl_model.save("convNextModel_second_model_tl")

In [None]:
ft_model = tfk.models.load_model('convNextModel_second_model_tl')
ft_model.summary()

In [None]:
#Freezing only the first 240 layers so the others now are trainable 
ft_model.get_layer('convnext_large').trainable = True
for i, layer in enumerate(ft_model.get_layer('convnext_large').layers):
   print(i, layer.name, layer.trainable)

for i, layer in enumerate(ft_model.get_layer('convnext_large').layers[:240]):
  layer.trainable=False
for i, layer in enumerate(ft_model.get_layer('convnext_large').layers):
   print(i, layer.name, layer.trainable)
ft_model.summary()

In [None]:
ft_model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(1e-5), metrics='accuracy')

from collections import Counter
counter = Counter(train_gen.classes)
max_val = float(max(counter.values()))
class_weights = {class_id : max_val/n_images for class_id, n_images in counter.items()}

#Train the model with MixUp Augmentation
ft_history = ft_model.fit(
    x = train_gen_mixup,
    epochs = 200,
    steps_per_epoch = train_gen_mixup.get_steps_per_epoch(),
    class_weight = class_weights,
    validation_data = val_gen,
    callbacks = callbacks_a
).history

In [None]:
tl_model.save("convNextModel_second_model_mixup_ft")
#END SECOND MODEL

**BUILD A NEW MODEL WITH ENSEMBLE TECHNIQUES**

In [None]:
#Load two models previously trained
models=[]

ft_model1 = tfk.models.load_model('convNextModel_first_model_cutout_ft')
ft_model1._name="model_cutout_ft"
ft_model1.summary()
models.append(ft_model1)

ft_model2 = tfk.models.load_model('convNextModel_second_model_mixup_ft')
ft_model2._name="model_mixup_ft"
ft_model2.summary()
models.append(ft_model2)

In [None]:
inputs = tfk.Input(shape=(96,96,3))

#Define the function for ensembling the models previously loaded
def ensembleModels(models, model_input):
    # collect outputs of models in a list
    yModels=[model(model_input) for model in models] 
    # averaging outputs
    yAvg=tfk.layers.average(yModels) 
    # build model from same input and avg output
    modelEns = tfk.Model(inputs=model_input, outputs=yAvg, name='ensemble')  
   
    return modelEns

modelEns = ensembleModels(models, inputs)

#Evaluation of the new model in the validation set to check if it has better performance than the previous ones
modelEns.compile(loss=tfk.losses.hinge, optimizer=tfk.optimizers.Adam(1e-5), metrics='accuracy')
modelEns.evaluate(val_gen, verbose=1)

In [None]:
#Save ensemble model
modelEns.save("convNextModel_ensemble")