In [1]:
import math

import os
import glob as gb
import cv2
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from keras.models import load_model
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Add, Dropout, Activation,\
               BatchNormalization, Conv2D, MaxPooling2D, GlobalMaxPooling2D,MaxPool2D
from tensorflow.keras.optimizers import SGD
from sklearn.metrics import classification_report, confusion_matrix
from mlxtend.plotting import plot_confusion_matrix                  # pip install mlxtend
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, precision_score, recall_score 
from sklearn.metrics import precision_recall_fscore_support

from keras.callbacks import EarlyStopping, ModelCheckpoint, LearningRateScheduler
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image_dataset_from_directory
from operator import truediv


## Import and prepare the dataset

In [2]:
def image_datagenerator(trainpath, img_height=224, img_width=224, batch_size=20):
    """Build the training and validation generators for one dataset folder.

    Images under ``trainpath`` (one sub-directory per class) are rescaled by
    1/255 and split 80/20 into a training and a validation subset.

    Args:
        trainpath: Directory that contains one sub-directory per class.
        img_height: Height the images are resized to.
        img_width: Width the images are resized to.
        batch_size: Number of images per generated batch.

    Returns:
        A tuple ``(train_generator, validation_generator, training_class)``
        where ``training_class`` is the sorted list of class folder names.
    """
    # validation_split reserves 20% of the images for the 'validation' subset.
    train_datagen = ImageDataGenerator(rescale=1./255, validation_split=0.2)

    print("The data is being split into training and validation set")

    # Both subsets share every option except `subset`.
    flow_options = dict(
        target_size=(img_height, img_width),
        batch_size=batch_size,
        class_mode='categorical',
        shuffle=True)

    train_generator = train_datagen.flow_from_directory(
        trainpath, subset='training', **flow_options)
    print("----------------------------------------------------------------")

    validation_generator = train_datagen.flow_from_directory(
        trainpath, subset='validation', **flow_options)

    # Report the number of images per class. Only sub-directories are classes:
    # a stray file in `trainpath` would otherwise make os.listdir() raise.
    # Sorting mirrors the alphanumeric class order used by flow_from_directory.
    training_class = sorted(
        entry for entry in os.listdir(trainpath)
        if os.path.isdir(os.path.join(trainpath, entry)))

    for class_name in training_class:
        image_count = len(os.listdir(os.path.join(trainpath, class_name)))
        print('Number of images in {} = {} \n'.format(class_name, image_count))

    return train_generator, validation_generator, training_class

### Import the test set

In [3]:
# Initialize the ImageDataGenerator to rescale pixel values to the range [0, 1]
test_datagen = ImageDataGenerator(rescale=1./255)

# Set image dimensions for resizing
img_height = 224  # Height of the image after resizing
img_width = 224   # Width of the image after resizing

# Define the path to the test dataset
testpath= ('V:/......../')

# shuffle=False keeps the images in a fixed order, so predictions made from
# this generator can be matched with the ground-truth labels afterwards.
test_generator = test_datagen.flow_from_directory(
        testpath,# This is the target directory
        target_size=(img_height, img_width),
        batch_size=1,
        class_mode='categorical',
        shuffle=False,
        seed=42)


# Check the number of images in each class in the test dataset.
# Only sub-directories are classes: a stray file in `testpath` would otherwise
# make os.listdir() raise. Sorting mirrors the generator's class order.
test_class = sorted(
    entry for entry in os.listdir(testpath)
    if os.path.isdir(os.path.join(testpath, entry)))
No_images_per_class = [
    len(os.listdir(os.path.join(testpath, class_name)))
    for class_name in test_class]

for class_name, image_count in zip(test_class, No_images_per_class):
    print('Number of images in {} = {} \n'.format(class_name, image_count))

# Mapping of class name -> label index, as assigned by the generator.
test_classes = test_generator.class_indices
print('test_classes: ',test_classes)

    


Found 615 images belonging to 3 classes.
Number of images in glioma = 286 

Number of images in meningioma = 143 

Number of images in pituitary tumor = 186 

test_classes:  {'glioma': 0, 'meningioma': 1, 'pituitary tumor': 2}


In [4]:
"""
This class allows the user to choose and run different training processes for the model.
It includes options for:
1. Traditional transfer learning (pretrained-ImageNet) training
2. ASG process training
3. DEG process training
4. CLOG-CD with Δ=1 training
5. CLOG-CD with Δ=2 training
6. CLOG-CD with Δ=4 training

The user is prompted to select one of the options, and based on their input, the corresponding
training function is executed. Each function trains the model using different configurations 
and processes, with varying levels and delta values.

"""

class The_proccess():
    """Interactive launcher for the available training processes.

    Creating an instance prompts for a number from 1 to 6 and immediately runs
    the matching training process. The training functions and the
    ``Dataset_path``, ``g`` and ``index`` values are read from module-level
    names, so the cells that define them must have been run first.

    Attributes:
        method: The option entered by the user, with surrounding whitespace
            removed.
    """

    def first(self):
        """Run traditional transfer learning on the last entry of ``g``."""
        print("Training the dataset based on a pretrained-imagenet")
        Level=-1
        ImageNet_model(Dataset_path, g, index, Level, Process= 'pretrained-imagenet')

    def second(self):
        """Run the ASG process, walking ``g`` from its last entry backwards."""
        print("Training ASG Model")
        Level=-1
        delta= -1
        ASG(g,index, Level, delta, Process='ASG')

    def third(self):
        """Run the DEG process, walking ``g`` from its first entry forwards."""
        print("Training DEG Model")
        Level=0
        delta=1
        DEG(g,index, Level, delta, Process='DEG')

    def forth(self):
        """Run CLOG-CD with an oscillation step of 1."""
        print("Training CLOG-CD based on (\u0394=1) process")
        Level=0
        delta=1
        CLOG_CD (g, index, Level, delta, Process='CLOG_CD (\u0394=1)')

    def fifth(self):
        """Run CLOG-CD with an oscillation step of 2."""
        print("Training CLOG-CD based on (\u0394=2) process")
        Level=0
        delta=2
        CLOG_CD (g, index, Level, delta, Process='CLOG_CD (\u0394=2)')

    def sixth(self):
        """Run CLOG-CD with an oscillation step of 4."""
        print("Training CLOG-CD based on (\u0394=4) process")
        Level=0
        delta=4
        CLOG_CD (g, index, Level, delta, Process='CLOG_CD (\u0394=4)')

    def __init__(self):
        """Prompt for a process number and run the selected process."""
        # Strip whitespace so an answer such as "2 " is still recognised.
        self.method = input("Which process do you want to use? \n\n 1) Traditional transfer learning. \n\n 2) ASG. \n\n 3) DEG. \n\n 4) CLOG_CD (\u0394=1). \n\n 5) CLOG_CD (\u0394=2). \n\n 6) CLOG_CD (\u0394=4).  \n\n Please enter the corresponding number and hit enter >>>>> ").strip()

        handlers = {
            '1': self.first,
            '2': self.second,
            '3': self.third,
            '4': self.forth,
            '5': self.fifth,
            '6': self.sixth,
        }

        handler = handlers.get(self.method)
        if handler is None:
            # Tell the user nothing was started instead of failing silently.
            print("Unrecognised option {!r}: no training was started. "
                  "Please enter a number from 1 to 6.".format(self.method))
        else:
            handler()


### Traditional transfer learning

In [5]:
def ImageNet_model(Dataset_path, granularity,index,Level, Process):
    """Fine-tune an ImageNet-pretrained ResNet50 on one granularity level.

    Args:
        Dataset_path: Root directory holding one folder per granularity level.
        granularity: Sequence of granularity folder names.
        index: Sequence of index values, parallel to ``granularity``.
        Level: Position in ``granularity`` / ``index`` to train on.
        Process: Name of the training process, forwarded to ``Training_model``.

    Returns:
        The ``(model, best_weights_path)`` pair returned by ``Training_model``.
    """
    level_folder = granularity[Level]
    level_index = index[Level]

    train_generator, validation_generator, training_class = image_datagenerator(
        os.path.join(Dataset_path, level_folder))

    # ResNet50 convolutional base with ImageNet weights; include_top=False
    # leaves out the original fully connected classification head.
    backbone = ResNet50(include_top=False, input_shape=(224, 224, 3), weights = 'imagenet')

    # New head: flatten -> 2048-unit ReLU layer -> dropout -> softmax with one
    # output per class folder found for this level.
    head = layers.Flatten()(backbone.output)
    head = layers.Dense(2048, activation='relu')(head)
    head = layers.Dropout(0.5)(head)
    class_probabilities = layers.Dense(len(training_class), activation='softmax')(head)

    classifier = Model(inputs=backbone.input, outputs=class_probabilities)

    return Training_model(classifier, train_generator, validation_generator,
                          training_class, level_folder, level_index, Process)

### ASG Model

In [6]:
"""
This function implements the ASG process which utilises the curriculum learning strategy
from ascending to descending order with one single iteration. 
"""

def ASG(granularity,index,Level, delta, Process):
    """Run a single pass of the ASG process over the granularity levels.

    The model is first trained on ``granularity[Level]`` starting from
    ImageNet weights. ``Level`` is then moved by ``delta`` repeatedly, and at
    each new level the previous weights are transferred and fine-tuned.

    Args:
        granularity: Sequence of granularity folder names.
        index: Sequence of index values, parallel to ``granularity``.
        Level: Position of the starting level (negative positions count from
            the end of the sequence).
        delta: Step added to ``Level`` after each stage.
        Process: Name of the process, used for the output folder.

    Returns:
        A tuple ``(finetuned_model, best_learnetweights)`` for the last
        trained level.
    """
    finetuned_model, best_learnetweights = ImageNet_model(
        Dataset_path, granularity, index, Level, Process)

    # Number of moves that stay inside the list; a fixed len-1 steps would
    # overrun it whenever abs(delta) > 1.
    remaining_levels = len(granularity) - 1
    steps = remaining_levels // abs(delta) if delta else remaining_levels

    for step in range(steps):
        Level = Level + delta
        next_level = granularity[Level]
        next_index = index[Level]
        print('level=',Level,'g=',step,'next_level= ',next_level,'next_index=',next_index)

        granularity_path = os.path.join(Dataset_path, next_level)
        train_generator, validation_generator, training_class = image_datagenerator(granularity_path)

        finetuned_model, best_learnetweights = TransferLearning(
            finetuned_model, best_learnetweights, train_generator,
            validation_generator, training_class,
            next_level, next_index, Process)

    # Hand the final model back so it is not lost when the function ends.
    return finetuned_model, best_learnetweights

### DEG Model

In [7]:
"""
This function implements the DEG process from descending to ascending order with one single iteration. It takes in a list of granularity levels,
an index, the current level, a delta value, and the process type. The function follows these steps:

1. Fine-tunes the model using a pretrained ImageNet model.
2. Iteratively moves through the granularity levels, adjusting the level and index based on the delta value.
3. For each new level, the function loads the corresponding granularity dataset, applies transfer learning, 
   and updates the model weights.

Returns:
- finetuned_model: The model after transfer learning.
- best_learnetweights: The best model weights found during training.
"""

def DEG(granularity,index,Level, delta, Process):   # granularity=[gmax,g_4,...,g_1], level=0, delta=1, index=5
    """Run a single pass of the DEG process over the granularity levels.

    The model is first trained on ``granularity[Level]`` starting from
    ImageNet weights. ``Level`` is then moved by ``delta`` repeatedly, and at
    each new level the previous weights are transferred and fine-tuned.

    Args:
        granularity: Sequence of granularity folder names.
        index: Sequence of index values, parallel to ``granularity``.
        Level: Position of the starting level.
        delta: Step added to ``Level`` after each stage.
        Process: Name of the process, used for the output folder.

    Returns:
        A tuple ``(finetuned_model, best_learnetweights)`` for the last
        trained level.
    """
    finetuned_model, best_learnetweights = ImageNet_model(
        Dataset_path, granularity, index, Level, Process)

    # Number of moves that stay inside the list; a fixed len-1 steps would
    # overrun it whenever abs(delta) > 1.
    remaining_levels = len(granularity) - 1
    steps = remaining_levels // abs(delta) if delta else remaining_levels

    for step in range(steps):
        Level = Level + delta
        next_level = granularity[Level]
        next_index = index[Level]
        print('level=',Level,'g=',step,'next_level= ',next_level,'next_index=',next_index)

        granularity_path = os.path.join(Dataset_path, next_level)
        train_generator, validation_generator, training_class = image_datagenerator(granularity_path)

        finetuned_model, best_learnetweights = TransferLearning(
            finetuned_model, best_learnetweights, train_generator,
            validation_generator, training_class,
            next_level, next_index, Process)

    # Return the final model and weights so the caller can use them.
    return finetuned_model, best_learnetweights
        

### CLOG-CD based on different oscillation steps

In [8]:
"""
This function implements the CLOG-CD model over I iterations with different oscillation steps.
Returns:
- finetuned_model: The model after transfer learning.
- best_learnetweights: The best model weights found during training.
"""

def CLOG_CD(granularity,index,Level, delta, Process, num_iterations=20):
    """Train the CLOG-CD model by oscillating over the granularity levels.

    The model is first trained on ``granularity[Level]`` starting from
    ImageNet weights. Each cycle then makes a forward sweep (``Level`` moves by
    ``+delta``) followed by a backward sweep (``-delta``) back to the starting
    level. Each sweep counts as one iteration and saves its results in its own
    numbered sub-folder of the process folder.

    Args:
        granularity: Sequence of granularity folder names.
        index: Sequence of index values, parallel to ``granularity``.
        Level: Position of the starting level.
        delta: Oscillation step between consecutive levels.
        Process: Name of the process, used for the output folder.
        num_iterations: Iteration count at which the oscillation stops. Sweeps
            run in forward/backward pairs, so an odd value is rounded up.

    Returns:
        A tuple ``(finetuned_model, best_learnetweights)`` for the last
        trained level.
    """
    finetuned_model, best_learnetweights = ImageNet_model(
        Dataset_path, granularity, index, Level, Process)

    # Number of moves per sweep that stay inside the list. A fixed len-1 steps
    # overruns it for delta > 1 (e.g. 5 levels with delta=2 would reach 6).
    remaining_levels = len(granularity) - 1
    steps_per_sweep = remaining_levels // abs(delta) if delta else remaining_levels
    if steps_per_sweep == 0:
        # Nothing to oscillate over: only the initial level can be trained.
        return finetuned_model, best_learnetweights

    start_level = Level
    num_iter = 0

    while num_iter < num_iterations:

        # ---- forward sweep -------------------------------------------------
        num_iter += 1
        # Kept relative to save_to_dir: Training_model joins it with
        # save_to_dir itself, so this names the same per-iteration folder.
        iteration_process = os.path.join(Process, str(num_iter))
        os.makedirs(os.path.join(save_to_dir, iteration_process), exist_ok=True)

        for i in range(steps_per_sweep):
            Level = Level + delta
            next_level = granularity[Level]
            next_index = index[Level]
            print('level=',Level,'g=',i,'next_level= ',next_level,'next_index=',next_index)

            granularity_path = os.path.join(Dataset_path, next_level)
            train_generator, validation_generator, training_class = image_datagenerator(granularity_path)

            finetuned_model, best_learnetweights = TransferLearning(
                finetuned_model, best_learnetweights, train_generator,
                validation_generator, training_class,
                next_level, next_index, Process=iteration_process)

        # ---- backward sweep ------------------------------------------------
        beta = Level   # walk back from the level reached by the forward sweep
        num_iter += 1
        iteration_process = os.path.join(Process, str(num_iter))
        os.makedirs(os.path.join(save_to_dir, iteration_process), exist_ok=True)

        for j in range(steps_per_sweep):
            beta = beta - delta
            backward_level = granularity[beta]
            backward_index = index[beta]
            print('beta=',beta,'g=',j,'backward_level= ',backward_level,'backward_index=',backward_index)

            granularity_path = os.path.join(Dataset_path, backward_level)
            train_generator, validation_generator, training_class = image_datagenerator(granularity_path)

            finetuned_model, best_learnetweights = TransferLearning(
                finetuned_model, best_learnetweights, train_generator,
                validation_generator, training_class,
                backward_level, backward_index, Process=iteration_process)

        # The next cycle starts again from the initial level.
        Level = start_level

    return finetuned_model, best_learnetweights

### Transfer the learnt weight

In [9]:
def TransferLearning(TransfereLearned_Model, best_learnetweights, train_generator, validation_generator, 
                      training_class, next_level, index,  Process):
    """Transfer the learnt weights to a new classification task and train it.

    Args:
        TransfereLearned_Model: Model trained on the previous level.
        best_learnetweights: Path of the weights file to load into that model.
        train_generator: Training data generator of the new level.
        validation_generator: Validation data generator of the new level.
        training_class: Class names of the new level; its length sets the size
            of the new output layer.
        next_level: Folder name of the new level.
        index: Index value of the new level, forwarded to ``Training_model``.
        Process: Name of the training process, forwarded to ``Training_model``.

    Returns:
        The ``(model, best_weights_path)`` pair returned by ``Training_model``.
    """
    # Restore the best checkpoint of the previous stage before reusing it.
    TransfereLearned_Model.load_weights(best_learnetweights)

    # Cut off the last layer: keep everything up to the second-to-last layer.
    feature_extractor = Model(TransfereLearned_Model.input,
                              TransfereLearned_Model.layers[-2].output)

    # Attach a fresh softmax output layer for the new downstream task.
    new_task_output = layers.Dense(len(training_class), activation='softmax',
                                   name="new_task")(feature_extractor.output)
    next_stage_model = Model(inputs=feature_extractor.input, outputs=new_task_output)

    return Training_model(next_stage_model, train_generator, validation_generator,
                          training_class, next_level, index, Process)

In [10]:
def Training_model(finetuning_model, train_generator, validation_generator, training_class, folder, index, Process, epochs=50):
    """Compile, train and evaluate a model on one granularity level.

    The weights with the best validation accuracy are checkpointed to
    ``<save_to_dir>/<Process>/weights_<folder>.h5``. After training, the
    learning curves are plotted, the best weights are loaded for prediction on
    the test set, and the confusion matrix and metrics are reported.

    Args:
        finetuning_model: Keras model to train.
        train_generator: Training data generator.
        validation_generator: Validation data generator.
        training_class: Class names of this level (not used by this function).
        folder: Granularity folder name, used in the output file names.
        index: Index value of this level, forwarded to the evaluation.
        Process: Name of the process; sub-folder of ``save_to_dir`` for outputs.
        epochs: Maximum number of training epochs.

    Returns:
        A tuple ``(finetuning_model, best_learnetweights)`` where the second
        item is the path of the checkpointed weights file.
    """
    save_here = os.path.join(save_to_dir, Process)
    os.makedirs(save_here, exist_ok=True)

    # Weights file named after the granularity folder.
    best_learnetweights = os.path.join(save_here,'weights_'+folder+'.h5')

    # Keep only the weights of the epoch with the best validation accuracy.
    checkpoint = ModelCheckpoint(filepath= best_learnetweights,
                                 monitor='val_accuracy',save_best_only=True,
                                 save_weights_only=True,
                                 mode='max', verbose=1)
    # Stop once validation accuracy has not improved for 5 epochs.
    earlystop = EarlyStopping (monitor="val_accuracy",
                                patience=5,
                                mode="auto")

    def lr_scheduler(epoch):
        """Step decay: scale the learning rate by 0.85 every 10 epochs."""
        initial_lrate = 0.001
        drop = 0.85
        epochs_drop = 10.0
        lrate = initial_lrate * math.pow(drop, math.floor((1+epoch)/epochs_drop))
        return lrate

    lrscheduler = LearningRateScheduler(lr_scheduler)
    callbacks = [checkpoint, earlystop, lrscheduler]

    # Print the number of samples
    print(f"Number of training samples: {train_generator.samples}")
    print(f"Number of validation samples: {validation_generator.samples}")

    # One epoch = one pass over the data, so the step count must use the batch
    # size the generators actually yield. A hard-coded batch size that differs
    # from the generators' makes every epoch cover the data several times and,
    # depending on the Keras version, can exhaust the iterator.
    steps_per_epoch = int(np.ceil(train_generator.samples / train_generator.batch_size))
    validation_steps = int(np.ceil(validation_generator.samples / validation_generator.batch_size))

    # Print steps per epoch and validation steps
    print(f"Steps per epoch: {steps_per_epoch}")
    print(f"Validation steps: {validation_steps}")

    # NOTE(review): "mse" is an unusual loss for a softmax classifier
    # ("categorical_crossentropy" is the usual choice); left unchanged because
    # it affects the experimental results -- confirm it is intentional.
    finetuning_model.compile( optimizer=SGD(), loss="mse", metrics=["accuracy"])

    history=finetuning_model.fit (train_generator,
                                    steps_per_epoch=steps_per_epoch,
                                    validation_data=validation_generator,
                                    validation_steps=validation_steps, epochs=epochs,
                                    callbacks= callbacks, verbose=1, shuffle= True)

    visualize_results(history, save_here, folder)

    # model_prediction loads the checkpointed weights before predicting.
    y_true, y_predict= model_prediction(finetuning_model, best_learnetweights, save_here)

    Evaluation.ConfusionMatrix (y_true, y_predict, save_here, folder, index)

    return finetuning_model, best_learnetweights

In [11]:
# Utility function for plotting of the model results

def visualize_results(history, save_here,folder_name):
    """Plot and save the accuracy and loss curves of a training run.

    The figure is written to ``<save_here>/LearningCurve_<folder_name>.png``
    and then displayed.

    Args:
        history: History object returned by ``Model.fit``; its ``history``
            dict must hold 'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
        save_here: Directory the figure is saved in.
        folder_name: Granularity folder name, used in the file name.
    """
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']

    epochs = range(len(acc))

    fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(10, 4))

    acc_ax.plot(epochs, acc, 'b', label='Training acc')
    acc_ax.plot(epochs, val_acc, 'r', label='Validation acc')
    acc_ax.set_title('Training and validation Accuracy')
    acc_ax.legend(loc='lower right')
    # The grid is enabled per panel: a grid requested before the subplots
    # exist does not apply to them.
    acc_ax.grid(True)

    loss_ax.plot(epochs, loss, 'b', label='Training loss')
    loss_ax.plot(epochs, val_loss, 'r', label='Validation loss')
    loss_ax.set_title('Training and validation Loss')
    loss_ax.legend(loc='upper right')
    loss_ax.grid(True)

    # Save once, inside save_here. Concatenating the strings without a path
    # separator would drop a second, misnamed copy next to the folder.
    fig.savefig(os.path.join(save_here, 'LearningCurve_'+folder_name+'.png'))

    plt.show()
    # Release the figure; this function is called once per training stage.
    plt.close(fig)


### Make predictions on the test data

In [12]:
def model_prediction( model, best_learnetweights, save_here, batch_size=1):
    """Load the best weights and predict the classes of the test set.

    Uses the module-level ``test_generator``.

    Args:
        model: Keras model to evaluate.
        best_learnetweights: Weights file; either a usable path on its own or
            a name relative to ``save_here``.
        save_here: Directory used to resolve ``best_learnetweights`` when it is
            not a usable path by itself.
        batch_size: Unused; kept for backward compatibility.

    Returns:
        A tuple ``(y_true, y_prediction)`` of integer class labels.
    """
    # Start from the first test sample.
    test_generator.reset()

    # Callers may pass a path that already contains save_here; joining it
    # again would duplicate a relative directory, so use the path as given
    # when it exists.
    weights_path = best_learnetweights
    if not os.path.exists(weights_path):
        weights_path = os.path.join(save_here, best_learnetweights)

    # loading the convergence weights
    model.load_weights(weights_path)

    # make prediction
    print('Make a prediction on a test set:')
    # len(test_generator) covers every sample, including a final partial batch.
    y_test_pred = model.predict(test_generator, steps=len(test_generator), verbose=1)
    y_prediction = np.argmax(y_test_pred, axis=1)   # predicted class indices

    # Ground-truth labels straight from the generator, so the test images are
    # not all loaded into memory just to read their labels. They line up with
    # the predictions only if the generator does not shuffle.
    y_true = np.asarray(test_generator.classes)

    return y_true, y_prediction

### Evaluate the model performance

In [13]:
"""
This class handles the evaluation of a classification model, including generating the confusion matrix,
computing accuracy, precision, recall, and F1 score.

Attributes:
- test_classes: List of class names used in the classification task.
- y_predict: Array of predicted labels.
- y_true: Array of true labels.
- folder_name: Identifier for the current experiment or granularity level.
- index: Correction factor applied to predictions when needed.
- save_to_dir: Directory where evaluation results will be saved.
"""

class Evaluation:
    """Evaluation of a classification model on the test set.

    Attributes:
        test_classes: Class names used in the classification task.
        y_predict: Predicted labels.
        y_true: True labels.
        folder_name: Identifier of the current granularity level.
        index: Correction factor applied to the predictions.
        save_to_dir: Directory where evaluation results are saved.
    """

    def __init__(self, test_classes=None, y_predict=None, y_true=None,
                 folder_name=None, index=None, save_here=None):
        """Store the evaluation inputs; every argument is optional."""
        self.test_classes = test_classes
        self.y_predict = y_predict
        self.y_true = y_true
        self.folder_name = folder_name
        self.index = index
        # Taken from the argument rather than from a module-level variable.
        self.save_to_dir = save_here

    @staticmethod
    def _safe_divide(numerator, denominator):
        """Element-wise division that yields 0.0 where the denominator is 0."""
        numerator = np.asarray(numerator, dtype=float)
        denominator = np.asarray(denominator, dtype=float)
        result = np.zeros(np.broadcast(numerator, denominator).shape, dtype=float)
        np.divide(numerator, denominator, out=result, where=denominator != 0)
        return result

    @staticmethod
    def ConfusionMatrix (y_true, y_predict,save_here, folder_name, index):
        """Plot the confusion matrix and print the classification metrics.

        The figure is saved to ``<save_here>/ConfusionMatrix_<folder_name>.png``.
        Class names are read from the module-level ``test_classes``.

        Args:
            y_true: True class labels.
            y_predict: Predicted labels at the current granularity level.
            save_here: Directory the figure is saved in.
            folder_name: Granularity folder name, used in the file name.
            index: Divisor that maps a predicted label back to its class.
        """
        # Refine the final classification using the error-correction criteria:
        # integer division by `index` maps a predicted label to its class.
        # NOTE(review): the folders in this notebook are named g_<k>, so the
        # 'k_1' guard probably never triggers; dividing by index 1 is a no-op.
        if folder_name !='k_1':
            y_predict = np.asarray(y_predict) // index

        # test_classes may be a name -> index mapping; list() yields the names.
        class_names = list(test_classes)

        # get confusion matrix
        cm = confusion_matrix(y_true, y_predict)
        # plot
        fig, ax = plot_confusion_matrix(conf_mat=cm,  figsize=(6, 6),
                                colorbar=False,
                                show_absolute=True,
                                show_normed=False,
                                class_names=class_names,cmap="Blues")
        # save the figure
        plt.savefig(os.path.join(save_here, 'ConfusionMatrix_'+ folder_name +'.png'))
        plt.show()

        # get classification report
        print(classification_report(y_true, y_predict, target_names = class_names, digits=4))

        print('Overall accuracy= ', accuracy_score(y_true, y_predict))

        # Precision and recall for each class. A class that is never predicted
        # (or never present) scores 0 instead of NaN.
        tp = np.diag(cm)
        precision = Evaluation._safe_divide(tp, np.sum(cm, axis=0))
        recall = Evaluation._safe_divide(tp, np.sum(cm, axis=1))
        print ('\nPrecision: {}\nRecall: {}'.format(precision.tolist(), recall.tolist()))

        F1_score = Evaluation._safe_divide(2 * precision * recall, precision + recall)
        print('F1_score= ', F1_score)

        print('================================================')
        # Overall measures: unweighted mean over the classes.
        PR = np.mean(precision)
        RE = np.mean(recall)
        print('overall_ Precision= ', PR)
        print('overall_Recall= ', RE)

        F1 = 2 * ((PR * RE) / (PR + RE)) if (PR + RE) else 0.0
        print('overall F1_score= ', F1)

In [14]:
# directory where the data is located

Dataset_path= ('V:/CLOG_CD/brain/k_means - Copy/')  #data path
save_to_dir=('V:/CLOG_CD/brain/New folder/')

# Determine the decomposition granularity: one sub-folder per level.
# Stray files and hidden folders (e.g. .ipynb_checkpoints) are not levels and
# are skipped, so they cannot shift the folder/index pairing built below.
decomposition_granularity = [
    entry for entry in os.listdir(Dataset_path)
    if os.path.isdir(os.path.join(Dataset_path, entry)) and not entry.startswith('.')
]
print('The decomposition granularity is: ', decomposition_granularity )

# In our work, the CLOG_CD model starts training from the high-level
# granularity (g=5), then moves towards the lower-level granularity (g=1).

# Sort the levels in descending order.
# NOTE(review): the sort is lexicographic, so with ten or more levels 'g_10'
# would sort before 'g_2' -- confirm the folder naming if more levels are used.
decomposition_granularity.sort(reverse=True)

print('The descending-ascending order is: ', decomposition_granularity )

g=[]
index=[]
for i, folder in enumerate(decomposition_granularity,1):
    print('Index= ',i, '  The granularity decomposition class is: ',folder)
    g.append(folder)     # if max=5 then: g=[g_5, g_4, g_3, g_2, g_1]
    index.append(i)

# Reverse the counters so index[k] pairs with g[k]: [5, 4, 3, 2, 1] for 5 levels.
index.sort(reverse=True)
print(index)

The decomposition granularity is:  ['g_1', 'g_2', 'g_3', 'g_4', 'g_5']
The descending-ascending order is:  ['g_5', 'g_4', 'g_3', 'g_2', 'g_1']
Index=  1   The granularity decomposition class is:  g_5
Index=  2   The granularity decomposition class is:  g_4
Index=  3   The granularity decomposition class is:  g_3
Index=  4   The granularity decomposition class is:  g_2
Index=  5   The granularity decomposition class is:  g_1
[5, 4, 3, 2, 1]


In [None]:
# Start the interactive launcher. The instance gets its own name: binding it
# to `CLOG_CD` would overwrite the CLOG_CD() training function, so options
# 4-6 would fail the next time this cell is run.
training_process = The_proccess()
