In [4]:
import math

import os
import glob as gb
import cv2
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from keras.models import load_model
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.applications.resnet50 import ResNet50
from keras.applications.densenet import DenseNet121
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Add, Dropout, Activation,\
               BatchNormalization, Conv2D, MaxPooling2D, GlobalMaxPooling2D,MaxPool2D
from tensorflow.keras.optimizers import SGD
from sklearn.metrics import classification_report, confusion_matrix
from mlxtend.plotting import plot_confusion_matrix                  # pip install mlxtend
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, precision_score, recall_score 
from sklearn.metrics import precision_recall_fscore_support

from keras.callbacks import EarlyStopping, ModelCheckpoint, LearningRateScheduler
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image_dataset_from_directory
from operator import truediv


## Import and prepare the dataset

In [5]:
def image_datagenerator(trainpath, img_height=224, img_width=224, batch_size=20):
    """
    Build the training and validation generators for one dataset folder.

    The images under ``trainpath`` (one sub-folder per class) are rescaled to
    [0, 1] and split 80/20 into a training and a validation subset.

    Parameters:
    trainpath (str): Directory that contains one sub-directory per class.
    img_height (int): Height the images are resized to.
    img_width (int): Width the images are resized to.
    batch_size (int): Number of images per generated batch.

    Returns:
    tuple: (train_generator, validation_generator, training_class) where
    training_class is the sorted list of class sub-directory names.
    """

    # One generator object serves both subsets through ``validation_split``.
    train_datagen = ImageDataGenerator(rescale=1./255, validation_split=0.2)

    print("The data is being split into training and validation set")

    # Options shared by the two subsets; only ``subset`` differs.
    flow_options = dict(
        target_size=(img_height, img_width),
        batch_size=batch_size,
        class_mode='categorical',
        shuffle=True,
    )

    train_generator = train_datagen.flow_from_directory(
        trainpath, subset='training', **flow_options)
    print("----------------------------------------------------------------")

    validation_generator = train_datagen.flow_from_directory(
        trainpath, subset='validation', **flow_options)

    # Only sub-directories are classes: a stray file in ``trainpath`` would
    # otherwise crash the inner ``os.listdir`` and inflate the class count.
    # Sorting makes the order deterministic (Keras documents an alphanumeric
    # class order for ``flow_from_directory`` as well).
    training_class = sorted(
        entry for entry in os.listdir(trainpath)
        if os.path.isdir(os.path.join(trainpath, entry))
    )

    # Report the number of images in each class of the training dataset.
    for class_name in training_class:
        n_images = len(os.listdir(os.path.join(trainpath, class_name)))
        print('Number of images in {} = {} \n'.format(class_name, n_images))

    return train_generator, validation_generator, training_class

### Import the test set

In [6]:
# Test images are only rescaled; no augmentation and no validation split.
test_datagen = ImageDataGenerator(rescale=1./255)
img_height=224
img_width=224

testpath= ('V:/brain_dataset/test_set/')

# shuffle=False keeps the batches in a fixed order, so the predictions made
# later can be compared with the ground-truth labels position by position.
test_generator = test_datagen.flow_from_directory(
        testpath,# This is the target directory
        target_size=(img_height, img_width),
        batch_size=1,
        class_mode='categorical',
        shuffle=False, 
        seed=42) 



# check the number of images in each class in the test dataset
No_images_per_class = []
test_class = []

# Sorted for a deterministic order; non-directory entries are skipped because
# they are not classes and would make the inner os.listdir() fail.
for i in sorted(os.listdir(testpath)):
    class_dir = os.path.join(testpath, i)
    if not os.path.isdir(class_dir):
        continue
    Class_name = os.listdir(class_dir)
    No_images_per_class.append(len(Class_name))
    test_class.append(i)
    print('Number of images in {} = {} \n'.format(i, len(Class_name)))
        
# Mapping {class name: integer label} as assigned by the generator.
test_classes = test_generator.class_indices
print('test_classes: ',test_classes)

    


Found 39 images belonging to 3 classes.
Number of images in glioma = 19 

Number of images in meningioma = 8 

Number of images in pituitary tumor = 12 

test_classes:  {'glioma': 0, 'meningioma': 1, 'pituitary tumor': 2}


### Transfer Learning 

In [7]:
def ImageNet_model(Dataset_path, granularity,index,Level, Process):     
    """
    Fine-tune an ImageNet-initialised DenseNet121 on one granularity level.

    Parameters:
    Dataset_path (str): Root directory holding one folder per granularity level.
    granularity (list): Folder names of the granularity levels.
    index (list): Indices paired with the entries of ``granularity``.
    Level (int): Position in ``granularity``/``index`` to train on.
    Process (str): Name of the process; forwarded to ``Training_model``.

    Returns:
    tuple: (fine-tuned model, weights path) as returned by ``Training_model``.
    """
    level_folder = granularity[Level]
    level_index = index[Level]

    # Generators for the selected granularity folder.
    train_generator, validation_generator, training_class = image_datagenerator(
        os.path.join(Dataset_path, level_folder))

    # DenseNet121 backbone without its classifier, followed by a new head:
    # Flatten -> Dense(1024, relu) -> Dense(number of classes, softmax).
    backbone = DenseNet121(include_top=False, input_shape=(224, 224, 3), weights = 'imagenet')
    flat_features = layers.Flatten()(backbone.output)
    hidden = layers.Dense(1024, activation='relu')(flat_features)
    class_probabilities = layers.Dense(len(training_class), activation='softmax')(hidden)
    classifier = Model(inputs=backbone.input, outputs=class_probabilities)

    # The output directory returned as third element is not needed here.
    finetuned_model, learned_weights, _ = Training_model(
        classifier, train_generator, validation_generator, training_class,
        level_folder, level_index, Process)

    return finetuned_model, learned_weights

#### CURVET (WO/CL, W/CD)

In [8]:
def SampleDecomp_model(Dataset_path, granularity,index,Level, Process):      
    """
    Fine-tune a previously trained model on one granularity level.

    A saved model and its weights are loaded, the last layer is dropped, and a
    new softmax layer sized for the current classes is attached before
    training.

    Parameters:
    Dataset_path (str): Root directory holding one folder per granularity level.
    granularity (list): Folder names of the granularity levels.
    index (list): Indices paired with the entries of ``granularity``.
    Level (int): Position in ``granularity``/``index`` to train on.
    Process (str): Name of the process; forwarded to ``Training_model``.

    Returns:
    tuple: (fine-tuned model, weights path) as returned by ``Training_model``.
    """
    level_folder = granularity[Level]
    level_index = index[Level]

    train_generator, validation_generator, training_class = image_datagenerator(
        os.path.join(Dataset_path, level_folder))

    ## load the learned weights after training the self-supervised model with sample decomposition using k=5
    pretrained = load_model('.../.../model.h5')
    pretrained.load_weights('.../.../weights.h5', by_name=True)

    # Cut the network at its second-to-last layer and flatten that output.
    trunk = Model(pretrained.input, pretrained.layers[-2].output)
    flat_features = layers.Flatten()(trunk.output)

    # New classification head, one unit per class found by the generator.
    n_classes = len(train_generator.class_indices)
    new_head = layers.Dense(n_classes, activation='softmax', name="new_task")
    new_model = Model(inputs=trunk.input, outputs=new_head(flat_features))

    # The output directory returned as third element is not needed here.
    finetuned_model, best_learnetweights, _ = Training_model(
        new_model, train_generator, validation_generator, training_class,
        level_folder, level_index, Process)

    return finetuned_model, best_learnetweights

#### CURVET (W/CLCD)

In [9]:
def pretext_model(Dataset_path, granularity,index,Level, Process):        
    """
    Fine-tune a previously trained pretext model on one granularity level.

    A saved model and its weights are loaded, the last layer is dropped, and a
    new softmax layer sized for the current classes is attached before
    training.

    Parameters:
    Dataset_path (str): Root directory holding one folder per granularity level.
    granularity (list): Folder names of the granularity levels.
    index (list): Indices paired with the entries of ``granularity``.
    Level (int): Position in ``granularity``/``index`` to train on.
    Process (str): Name of the process; forwarded to ``Training_model``.

    Returns:
    tuple: (fine-tuned model, weights path) as returned by ``Training_model``.
    """
    level_folder = granularity[Level]
    level_index = index[Level]

    train_generator, validation_generator, training_class = image_datagenerator(
        os.path.join(Dataset_path, level_folder))

    ## load the learned weights after training the model with granularity component of k=5
    ## (for the k=10 variant, point both paths below at the k=10 model/weights files)
    pretrained = load_model('.../.../model.h5')
    pretrained.load_weights('.../.../weights.h5', by_name=True)

    # Cut the network at its second-to-last layer and flatten that output.
    trunk = Model(pretrained.input, pretrained.layers[-2].output)
    flat_features = layers.Flatten()(trunk.output)

    # New classification head, one unit per class found by the generator.
    n_classes = len(train_generator.class_indices)
    new_head = layers.Dense(n_classes, activation='softmax', name="new_task")
    new_model = Model(inputs=trunk.input, outputs=new_head(flat_features))

    # The output directory returned as third element is not needed here.
    finetuned_model, best_learnetweights, _ = Training_model(
        new_model, train_generator, validation_generator, training_class,
        level_folder, level_index, Process)

    return finetuned_model, best_learnetweights

In [10]:
def CURVET(granularity, index, Level, Process, max_iterations=20):
    
    """
    This function manages the training of CURVET model using different levels of granularity
    and applies different strategies based on the specified process. After an initial training
    at ``granularity[Level]``, the model is refined by alternating passes: a descending pass
    that steps through the following entries of ``granularity`` and a backward pass that steps
    back to the first entry. Each pass is transferred level by level with ``TransferLearning``.

    Parameters:
    granularity (list): A list of granularity levels used for training sorted in descending order.
    index (list): A list of indices corresponding to the granularity levels.
    Level (int): Position in ``granularity`` of the level used for the initial training.
                 It is reset to 0 after every descending/backward cycle.
    Process (str): The type of training process to be executed, determining which model to use.
                   One of 'CURVET (W_CLCD)', 'CURVET (WO_CL W_CD)', 'CURVET (WO_CLCD)'.
    max_iterations (int): Upper bound on the pass counter. Each descending pass and each
                   backward pass increments the counter by one, so the default of 20
                   yields 10 descending/backward cycles.

    Returns:
    None: The function performs training and does not return a value. 
    Outputs of pass number n are written under ``save_to_dir/Process/n``.

    Raises:
    ValueError: If ``Process`` is not one of the supported process names.

    Note: reads the module-level ``Dataset_path`` and ``save_to_dir``.
    """
    
    if Process == 'CURVET (W_CLCD)':
        finetuned_model, learned_weights = pretext_model(Dataset_path, granularity, index, Level, Process)
    elif Process == 'CURVET (WO_CL W_CD)':
        finetuned_model, learned_weights = SampleDecomp_model(Dataset_path, granularity, index, Level, Process)
    elif Process == 'CURVET (WO_CLCD)':
        finetuned_model, learned_weights = ImageNet_model(Dataset_path, granularity, index, Level, Process)
    else:
        # Fail early with a clear message instead of hitting an undefined
        # ``finetuned_model`` further down.
        raise ValueError(
            "Unknown Process {!r}; expected 'CURVET (W_CLCD)', "
            "'CURVET (WO_CL W_CD)' or 'CURVET (WO_CLCD)'".format(Process)
        )
    
    # Number of level-to-level transitions in one pass.
    transitions = len(granularity) - 1
    num_iter = 0

    # Repeat the process for a specified number of iterations
    while num_iter < max_iterations:

        # Descending process
        num_iter += 1
        # Sub-directory relative to save_to_dir. Training_model joins it with
        # save_to_dir itself, so the relative form is passed on; this also
        # works when save_to_dir is a relative path.
        pass_dir = os.path.join(Process, str(num_iter))
        os.makedirs(os.path.join(save_to_dir, pass_dir), exist_ok=True)

        for _ in range(transitions):  # [g_4,...,g_1]
            Level += 1          
            next_level = granularity[Level]
            next_index = index[Level]
            print('Move towards the lower level.... ', next_level, 'next_index=', next_index)
            granularity_path = os.path.join(Dataset_path, next_level)

            train_generator, validation_generator, training_class = image_datagenerator(granularity_path)

            finetuned_model, learned_weights, save_here = TransferLearning(
                finetuned_model, learned_weights, train_generator,
                validation_generator, training_class, next_level,
                next_index, Process=pass_dir
            )  

        # Backward process
        beta = Level  # backward from the lowest granularity to the highest granularity
        num_iter += 1
        pass_dir = os.path.join(Process, str(num_iter))
        os.makedirs(os.path.join(save_to_dir, pass_dir), exist_ok=True)
        
        for _ in range(transitions):  
            beta -= 1    
            backward_level = granularity[beta]
            backward_index = index[beta]
            print('Move towards the higher level....', backward_level, 'backward_index=', backward_index)

            granularity_path = os.path.join(Dataset_path, backward_level)
            train_generator, validation_generator, training_class = image_datagenerator(granularity_path)

            finetuned_model, learned_weights, save_here = TransferLearning(
                finetuned_model, learned_weights, train_generator,
                validation_generator, training_class, backward_level,
                backward_index, Process=pass_dir
            )
        
        # The next descending pass starts again from the first entry.
        Level = 0


In [11]:
def TransferLearning(TransfereLearned_Model, learned_weights, train_generator, validation_generator,
                      training_class, next_level, index, Process):
    """
    Re-target a trained model to a new set of classes and train it.

    The weights stored at ``learned_weights`` are loaded into the given model,
    its last layer is removed, a fresh softmax layer with one unit per entry of
    ``training_class`` is attached, and the result is handed to
    ``Training_model``.

    Returns:
    The fine-tuned model, the learned weights, and the path to save the model
    (the tuple produced by ``Training_model``).
    """
    # Restore the weights produced by the previous level.
    TransfereLearned_Model.load_weights(learned_weights)

    # Keep everything up to the second-to-last layer as the feature trunk.
    trunk = Model(TransfereLearned_Model.input, TransfereLearned_Model.layers[-2].output)

    # Fresh output layer for the classes of the current level.
    task_head = layers.Dense(len(training_class), activation='softmax', name="new_task")
    retargeted_model = Model(inputs=trunk.input, outputs=task_head(trunk.output))

    # Train on the provided generators and pass the result straight back.
    return Training_model(
        retargeted_model, train_generator, validation_generator,
        training_class, next_level, index, Process
    )


In [12]:
def Training_model(finetuning_model, train_generator, validation_generator, training_class, folder_name, index, Process):
    """
    Compile, train, save and evaluate a model for one granularity level.

    Parameters:
    finetuning_model: Keras model to train; it is compiled inside this function.
    train_generator: Training batch iterator exposing ``samples`` and ``batch_size``.
    validation_generator: Validation batch iterator exposing ``samples`` and ``batch_size``.
    training_class (list): Class names of the current level (not used in the body).
    folder_name (str): Name of the current level; used in the output file names.
    index (int): Index of the current level; forwarded to ``ConfusionMatrix``.
    Process (str): Sub-directory of the module-level ``save_to_dir`` that receives the outputs.

    Returns:
    tuple: (trained model, path of the best-weights checkpoint file, output directory).
    """
    
    # Define the directory to save the weights based on the process type
    save_here = os.path.join(save_to_dir, Process)
    os.makedirs(save_here, exist_ok=True)
            
    learned_weights = os.path.join(save_here, 'weights_' + folder_name + '.h5')
   
    # Define checkpoint for saving the best model weights
    checkpoint = ModelCheckpoint(filepath=learned_weights,
                                 monitor='val_accuracy', save_best_only=True,
                                 save_weights_only=True, 
                                 mode='max', verbose=1)
    
    # Early stopping callback to prevent overfitting
    earlystop = EarlyStopping(monitor="val_accuracy", patience=8, mode="auto")

    # Learning rate scheduler to adjust learning rate during training:
    # start at 0.001 and multiply by 0.85 every 15 epochs.
    def lr_scheduler(epoch):
        initial_lrate = 0.001
        drop = 0.85
        epochs_drop = 15.0
        lrate = initial_lrate * math.pow(drop, math.floor((1 + epoch) / epochs_drop))
        return lrate
  
    lrscheduler = LearningRateScheduler(lr_scheduler)
    callbacks = [checkpoint, earlystop, lrscheduler]

    # Display the number of training and validation samples
    print(f"Number of training samples: {train_generator.samples}")
    print(f"Number of validation samples: {validation_generator.samples}")

    # Steps are derived from each generator's own batch size, rounded up, so
    # that one epoch covers every sample. A fixed batch size that differs from
    # the generator's would silently skip part of the data in every epoch.
    steps_per_epoch = max(1, math.ceil(train_generator.samples / train_generator.batch_size))
    validation_steps = max(1, math.ceil(validation_generator.samples / validation_generator.batch_size))

    print(f"Steps per epoch: {steps_per_epoch}")
    print(f"Validation steps: {validation_steps}")
    
    # Compile the model with an optimizer, loss function, and metrics
    # NOTE(review): "mse" on a softmax output is unusual for classification;
    # categorical cross-entropy is the common choice - confirm this is intended.
    finetuning_model.compile(optimizer=SGD(), loss="mse", metrics=["accuracy"])

    # Train the model using the training and validation data generators
    finetuning_model.fit(train_generator,
                         steps_per_epoch=steps_per_epoch,
                         validation_data=validation_generator,
                         validation_steps=validation_steps, 
                         epochs=50,
                         callbacks=callbacks, 
                         verbose=1, 
                         shuffle=True)
    
    # Full model as it stands after the last epoch; the best weights are in
    # the separate checkpoint file written by ModelCheckpoint.
    finetuning_model.save(os.path.join(save_here, 'model_' + folder_name + '.h5'))
    
    # Get predictions and generate confusion matrix
    y_true, y_predict = model_prediction(finetuning_model, learned_weights, save_here)
    ConfusionMatrix(y_true, y_predict, save_here, folder_name, index)

    return finetuning_model, learned_weights, save_here


##### Make predictions on the test data

In [13]:
def model_prediction( model, learned_weights, save_here, batch_size=1):
    """
    Predict the classes of the test set with the best saved weights.

    Parameters:
    model: Keras model; the weights file is loaded into it before predicting.
    learned_weights (str): Weights file, joined onto ``save_here``
        (an absolute path is used as is by ``os.path.join``).
    save_here (str): Directory that holds the weights file.
    batch_size (int): Unused; kept for backward compatibility. The batch size
        of the module-level ``test_generator`` applies.

    Returns:
    tuple: (y_true, y_prediction) integer class labels, one entry per test image.
    """
    # Start from the first batch so predictions line up with the labels.
    test_generator.reset()

    # Ground-truth labels in generator order. This relies on the generator
    # being created with shuffle=False; it avoids pulling every image into
    # memory and does not depend on the generator's batch size.
    y_true = np.asarray(test_generator.classes)

    # Load the best learned weights from the directory
    model.load_weights(os.path.join(save_here,learned_weights) )       
    print('Make a prediction on a test set:')

    # len(generator) is the number of batches including a final partial one,
    # so no test image is dropped.
    y_test_pred= model.predict(test_generator,steps=len(test_generator),verbose=1)
    y_prediction = np.argmax(y_test_pred, axis=1)## predicted_class_indices

    return y_true, y_prediction

##### Evaluate the model performance

In [14]:
def ConfusionMatrix(y_true, y_predict, save_here, folder_name, index):  
    """
    Evaluate predictions: plot and save the confusion matrix, print the
    metrics, and append a summary row to ``confusion_reports.csv``.

    Parameters:
    y_true: Ground-truth integer class labels.
    y_predict: Predicted integer class labels at the current granularity level.
    save_here (str): Directory that receives the PNG and the CSV.
    folder_name (str): Name of the current level; used in file names and the report.
    index (int): Divisor that maps a predicted label back to a test class
        via integer division.

    Returns:
    None

    Note: class names are read from the module-level ``test_classes``.
    """
    # Fixed label set so the matrix always has one row/column per test class,
    # even when a class is never predicted.
    class_names = list(test_classes)
    class_ids = list(range(len(class_names)))

    # Refine predictions for levels other than 'k_1' using error-correction
    # NOTE(review): the folders printed by the notebook are named 'g_<k>', so
    # this condition is always true there; it is harmless because dividing by
    # an index of 1 leaves the labels unchanged - confirm the intended name.
    if folder_name != 'k_1':
        y_predict = np.asarray(y_predict) // index

    # Calculate confusion matrix
    cm = confusion_matrix(y_true, y_predict, labels=class_ids)
    
    # Plot confusion matrix
    fig, ax = plot_confusion_matrix(conf_mat=cm, figsize=(6, 6),
                                    colorbar=False,
                                    show_absolute=True,
                                    show_normed=False,
                                    class_names=class_names, cmap="Blues")
    plt.savefig(os.path.join(save_here, 'ConfusionMatrix_' + folder_name + '.png'))
    plt.show()
    # This function runs once per training round; close the figure so open
    # figures do not pile up in memory.
    plt.close(fig)

    # Classification report
    print(classification_report(y_true, y_predict, labels=class_ids,
                                target_names=class_names, digits=4))

    # Print overall accuracy
    accuracy = accuracy_score(y_true, y_predict)
    print('Overall accuracy= ', accuracy)

    # Precision and recall per class; a class with an empty column/row gives
    # 0/0, which is mapped to 0 without emitting a runtime warning.
    tp = np.diag(cm).astype(float)  # True positives from confusion matrix
    with np.errstate(divide='ignore', invalid='ignore'):
        precision = np.nan_to_num(tp / np.sum(cm, axis=0))
        recall = np.nan_to_num(tp / np.sum(cm, axis=1))

    print(f'\nPrecision: {precision}\nRecall: {recall}')

    # Calculate F1 score
    F1_score = 2 * ((precision * recall) / np.clip(precision + recall, a_min=1e-10, a_max=None))  # Avoid div/0
    print('F1_score= ', F1_score)

    # Overall precision and recall are the class means; the overall F1 is
    # computed from these two means.
    PR = np.mean(precision)
    RE = np.mean(recall)
    F1 = 2 * ((PR * RE) / np.clip(PR + RE, a_min=1e-10, a_max=None))
    
    print(f'Overall Precision= {PR}\nOverall Recall= {RE}\nOverall F1_score= {F1}')
    
    # Save results to CSV
    report= {'g':[folder_name], 'ACC':[accuracy], 
             'precision':[PR], 'recall':[RE], 'F1_score':[F1]}             
    
    df = pd.DataFrame(report) 

    # Append one row per call; write the header only when the file is created
    # so the CSV does not contain a header line before every row.
    csv_path = os.path.join(save_here, 'confusion_reports.csv')
    df.to_csv(csv_path, mode='a', index=False, header=not os.path.exists(csv_path))


In [15]:
class The_proccess():
    """
    Interactive launcher: asks which training process to run and starts it.

    Creating an instance prompts on standard input; the answer is stored in
    ``self.method`` and the matching method is called. Any other answer
    starts nothing.
    """

    def __init__(self):
        self.method = input("Which process do you want to use? \n\n 1) pretrained-imagenet. \n\n 2) CURVET (WO/CLCD). \n\n 3) CURVET (WO/CL, W/CD). \n\n 4) CURVET (W/CLCD) \n\n Please enter the corresponding number and hit enter >>>>> ")

        # Menu number (as typed) -> method that starts the process.
        menu = {
            str(1): self.first,
            str(2): self.second,
            str(3): self.third,
            str(4): self.forth,
        }
        selected = menu.get(self.method)
        if selected is not None:
            selected()

    def first(self):
        """Transfer learning from ImageNet on the last entry of ``G``."""
        print("Transfer learning with a pretrained-imagenet")
        ImageNet_model(Dataset_path, G, index, -1, Process= 'pretrained-imagenet')

    def second(self):
        """Run CURVET with the 'CURVET (WO_CLCD)' process."""
        print("Training CURVET (WO/CLCD)")
        CURVET(G, index, 0, Process='CURVET (WO_CLCD)')

    def third(self):
        """Run CURVET with the 'CURVET (WO_CL W_CD)' process."""
        print("Training CURVET (WO/CL, W/CD)")
        CURVET(G, index, 0, Process='CURVET (WO_CL W_CD)')

    def forth(self):
        """Run CURVET with the 'CURVET (W_CLCD)' process."""
        print("Training CURVET (W/CLCD)")
        CURVET(G, index, 0, Process='CURVET (W_CLCD)')

In [16]:
# Locations of the granularity datasets (input) and of the results (output).
Dataset_path = 'V:/....../brain dataset/'  
save_to_dir = 'C:/....../granularity datasets/' 

# Every entry of the dataset directory is taken as one granularity level,
# ordered from the last name to the first (e.g. g_5, g_4, ..., g_1).
# NOTE(review): the ordering is a plain string sort, so a name such as 'g_10'
# would be placed after 'g_2' - confirm folder names stay single-digit.
decomposition_granularity = sorted(os.listdir(Dataset_path), reverse=True)

# G holds the level names; index counts down from len(G) to 1 so that each
# position of G is paired with its number.
G = list(decomposition_granularity)
index = list(range(len(G), 0, -1))

# Show the resulting pairing.
print('Granularity Levels (G):', G)
print('Corresponding Indices (index):', index)


Granularity Levels (G): ['g_5', 'g_4', 'g_3', 'g_2', 'g_1']
Corresponding Indices (index): [5, 4, 3, 2, 1]


In [None]:
# Creating the launcher prompts for a process number and immediately runs the chosen process.
CURVETE = The_proccess()