Makhloughi

In [None]:
import os
import numpy as np
import math
import random
from collections import Counter
from google.colab import files
from PIL import Image
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import KFold
from tensorflow.keras.models import Sequential, Model, clone_model
from keras.optimizers import Adam
from tensorflow.keras.layers import Conv2D, Input, ConvLSTM2D,MaxPooling3D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, Activation, GlobalAveragePooling2D, MaxPool2D
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, Callback, LearningRateScheduler
from tensorflow.keras.regularizers import l2
from tensorflow.keras.applications import VGG16, VGG19, ResNet50, InceptionV3, DenseNet121, EfficientNetB2
from tensorflow.keras.layers import LSTM, GRU, TimeDistributed, Input
from tensorflow.keras.models import load_model
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
import shutil

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Occipital

In [None]:
data_dir = '/content/drive/MyDrive/Final_1Hz/lobes/Occipital/'


def extract_numeric_part(folder_name):
    return int(folder_name.split('_')[-1])

folders = sorted([f for f in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, f))], key=extract_numeric_part)

subject_folders, labels = [], []

for folder_name in folders:
    subject_num = int(folder_name.split('_')[-1])
    subject_folder = os.path.join(data_dir, folder_name)
    subject_folders.append(subject_folder)

    if subject_num <= 36:
        labels.append(0)  # AD group
    elif 37 <= subject_num <= 65:
        labels.append(1)  # HC group

subject_folders = np.array(subject_folders)
labels = np.array(labels)


In [None]:
def load_images_and_labels(subjects, labels):
    images, labels_t = [], []
    for subject_folder, label in zip(subjects, labels):
        for filename in os.listdir(subject_folder):
            img_path = os.path.join(subject_folder, filename.decode())
            try:
                img = Image.open(img_path)

                width, height = img.size
                left = 99
                top = 50
                right = width - 176
                bottom = height - 73
                img = img.crop((left, top, right, bottom))

                img = img.resize((224, 224))
                img_array = img_to_array(img)
                img_array = img_array / 255.0

                images.append(img_array)
                labels_t.append(label)
            except Exception as e:
                print(f"Error loading image: {img_path}, {e}")

    return np.array(images), np.array(labels_t)


In [None]:
def load_and_preprocess_data(subject_folders, labels):
    train_subjects, test_subjects, train_labels, test_labels = train_test_split(subject_folders, labels, test_size=0.2, stratify=labels)

    train_subjects, val_subjects, train_labels, val_labels = train_test_split(train_subjects, train_labels, test_size=0.1, stratify=train_labels)

    train_images, train_labels_t = load_images_and_labels(train_subjects, train_labels)
    val_images, val_labels_t = load_images_and_labels(val_subjects, val_labels)

    test_image_counts_per_subject = [len(os.listdir(folder)) for folder in test_subjects]

    test_images, test_labels_t = load_images_and_labels(test_subjects, test_labels)

    return train_images, train_labels_t, val_images, val_labels_t, test_images, test_labels_t, test_subjects, test_image_counts_per_subject


In [None]:
def create_new_model(base_model):
    model = clone_model(base_model)

    for layer in model.layers:
        layer.trainable = False

    x = model.output
    x = Flatten()(x)
    x = Dense(128)(x)
    x = Dropout(0.2)(x)
    x = Dense(256)(x)
    x = Dropout(0.5)(x)
    x = Dense(512)(x)
    x = Dropout(0.2)(x)

    predictions = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=model.input, outputs=predictions)
    model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.01), metrics=['accuracy'])

    return model

In [None]:
# Define a custom callback to start saving checkpoints from epoch 30
class CustomCheckpoint(ModelCheckpoint):
    def __init__(self, start_epoch, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_epoch = start_epoch

    def on_epoch_end(self, epoch, logs=None):
        if epoch >= self.start_epoch:
            super().on_epoch_end(epoch, logs)

def train_and_evaluate_model(base_model, subject_folders, labels, num_epochs=50, num_iterations=5, model_name='model'):
    accuracy_list, precision_list, recall_list, f1_list = [], [], [], []

    for iteration in range(num_iterations):
        print(f"\nIteration {iteration + 1}:")

        model = create_new_model(base_model)

        train_images, train_labels_t, val_images, val_labels_t, test_images, test_labels_t, test_subjects, test_image_counts_per_subject = load_and_preprocess_data(subject_folders, labels)

        train_data = list(zip(train_images, train_labels_t))
        np.random.shuffle(train_data)
        train_images, train_labels_t = zip(*train_data)

        val_data = list(zip(val_images, val_labels_t))
        np.random.shuffle(val_data)
        val_images, val_labels_t = zip(*val_data)

        train_images, train_labels_t = np.array(train_images), np.array(train_labels_t)
        val_images, val_labels_t = np.array(val_images), np.array(val_labels_t)

        checkpoint_dir = '/content'
        os.makedirs(checkpoint_dir, exist_ok=True)

        local_checkpoint_filepath = os.path.join(checkpoint_dir, f"{model_name}_best_weights_iteration_{iteration + 1}.h5")
        drive_checkpoint_filepath = f'/content/drive/My Drive/{model_name}_best_weights_iteration_{iteration + 1}.h5'

        checkpoint_callback = CustomCheckpoint(
            start_epoch=30,
            filepath=local_checkpoint_filepath,
            monitor='val_accuracy',
            mode='max',
            save_best_only=True,
            save_weights_only=True,
            verbose=1
        )

        class LearningRateLogger(Callback):
            def on_epoch_end(self, epoch, logs=None):
                logs = logs or {}
                logs['learning_rate'] = self.model.optimizer.learning_rate.numpy()
        lr_logger = LearningRateLogger()

        early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min', baseline=None, restore_best_weights=False, start_from_epoch=30)

        history = model.fit(train_images, train_labels_t, epochs=num_epochs, batch_size=32, validation_data=(val_images, val_labels_t), callbacks=[checkpoint_callback, early_stopping, lr_logger], verbose=1)

        if os.path.exists(local_checkpoint_filepath):
            print(f"Best weights saved locally at: {local_checkpoint_filepath}")

            shutil.copy(local_checkpoint_filepath, drive_checkpoint_filepath)
            if os.path.exists(drive_checkpoint_filepath):
                print(f"Best weights also copied to Google Drive at: {drive_checkpoint_filepath}")
            else:
                print(f"Failed to copy best weights to Google Drive at: {drive_checkpoint_filepath}")
        else:
            print(f"Failed to save best weights locally at: {local_checkpoint_filepath}")

        model.load_weights(local_checkpoint_filepath)

        test_predictions = model.predict(test_images)
        test_predictions = (test_predictions > 0.5).astype(int)

        accuracy = accuracy_score(test_labels_t, test_predictions)
        precision = precision_score(test_labels_t, test_predictions)
        recall = recall_score(test_labels_t, test_predictions)
        f1 = f1_score(test_labels_t, test_predictions)

        accuracy_list.append(accuracy)
        precision_list.append(precision)
        recall_list.append(recall)
        f1_list.append(f1)

        print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")

        print("ModelCheckpoint callback triggered and saved the best weights.")

        plt.plot(history.history['accuracy'])
        plt.plot(history.history['val_accuracy'])
        plt.title('Model accuracy')
        plt.ylabel('Accuracy')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        file_name = f'{model_name}_accuracy_iteration_{iteration + 1}.png'
        plt.savefig(file_name)
        plt.close()
        files.download(file_name)

        plt.plot(history.history['loss'])
        plt.plot(history.history['val_loss'])
        plt.title('Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        file_name = f'{model_name}_loss_iteration_{iteration + 1}.png'
        plt.savefig(file_name)
        plt.close()
        files.download(file_name)

        cm = confusion_matrix(test_labels_t, test_predictions)
        plt.figure(figsize=(6, 5))
        sns.heatmap(cm, annot=True, fmt='d', xticklabels=['AD', 'HC'], yticklabels=['AD', 'HC'])
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.title(f'Confusion Matrix - {model_name} - Iteration {iteration + 1}')
        file_name = f'{model_name}_confusion_matrix_iteration_{iteration + 1}.png'
        plt.savefig(file_name)
        plt.close()
        files.download(file_name)


        print("\nPredictions for Each Subject's Images:")
        start_idx = 0
        for i, (subject_folder, num_images) in enumerate(zip(test_subjects, test_image_counts_per_subject)):
            end_idx = start_idx + num_images
            subject_images = test_images[start_idx:end_idx]
            subject_predictions = (model.predict(subject_images) > 0.5).astype(int)
            ad_count = sum(subject_predictions == 0)
            hc_count = sum(subject_predictions == 1)
            print(f"Subject {i + 1} ({subject_folder}):")
            print(f"AD count: {ad_count}, HC count: {hc_count}")
            start_idx = end_idx




    print("\nAverage Metrics:")
    print(f"Average Accuracy: {np.mean(accuracy_list):.4f}")
    print(f"Average Precision: {np.mean(precision_list):.4f}")
    print(f"Average Recall: {np.mean(recall_list):.4f}")
    print(f"Average F1 Score: {np.mean(f1_list):.4f}")



## VGG16

In [None]:
vgg16_base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

In [None]:
    # x = model.output
    # x = Flatten()(x)
    # x = Dense(128)(x)
    # x = Dropout(0.2)(x)
    # x = Dense(256)(x)
    # x = Dropout(0.2)(x)
model_name = 'vgg16_Occipital'
train_and_evaluate_model(vgg16_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

## ResNet50

In [None]:
ResNet50_base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

In [None]:
model_name = 'ResNet50_Occipital'
train_and_evaluate_model(ResNet50_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

## InceptionV3

In [None]:
Incep_base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

In [None]:
model_name = 'InceptionV3_Occipital'
train_and_evaluate_model(Incep_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

## DenseNet121

In [None]:
Dense_base_model = DenseNet121(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/densenet/densenet121_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'DenseNet121_Occipital'
train_and_evaluate_model(Dense_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

# Temporal

In [None]:
data_dir = '/content/drive/MyDrive/Final_1Hz/lobes/Temporal/'


def extract_numeric_part(folder_name):
    return int(folder_name.split('_')[-1])

folders = sorted([f for f in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, f))], key=extract_numeric_part)

subject_folders, labels = [], []

for folder_name in folders:
    subject_num = int(folder_name.split('_')[-1])
    subject_folder = os.path.join(data_dir, folder_name)
    subject_folders.append(subject_folder)

    if subject_num <= 36:
        labels.append(0)  # AD group
    elif 37 <= subject_num <= 65:
        labels.append(1)  # HC group

subject_folders = np.array(subject_folders)
labels = np.array(labels)


In [None]:
def load_images_and_labels(subjects, labels):
    images, labels_t = [], []
    for subject_folder, label in zip(subjects, labels):
        for filename in os.listdir(subject_folder):
            img_path = os.path.join(subject_folder, filename.decode())
            try:
                img = Image.open(img_path)

                width, height = img.size
                left = 99
                top = 50
                right = width - 176
                bottom = height - 73
                img = img.crop((left, top, right, bottom))

                img = img.resize((224, 224))
                img_array = img_to_array(img)
                img_array = img_array / 255.0

                images.append(img_array)
                labels_t.append(label)
            except Exception as e:
                print(f"Error loading image: {img_path}, {e}")

    return np.array(images), np.array(labels_t)


In [None]:
def load_and_preprocess_data(subject_folders, labels):
    train_subjects, test_subjects, train_labels, test_labels = train_test_split(subject_folders, labels, test_size=0.2, stratify=labels)

    train_subjects, val_subjects, train_labels, val_labels = train_test_split(train_subjects, train_labels, test_size=0.1, stratify=train_labels)

    train_images, train_labels_t = load_images_and_labels(train_subjects, train_labels)
    val_images, val_labels_t = load_images_and_labels(val_subjects, val_labels)

    test_image_counts_per_subject = [len(os.listdir(folder)) for folder in test_subjects]

    test_images, test_labels_t = load_images_and_labels(test_subjects, test_labels)

    return train_images, train_labels_t, val_images, val_labels_t, test_images, test_labels_t, test_subjects, test_image_counts_per_subject


In [None]:
def create_new_model(base_model):
    model = clone_model(base_model)

    for layer in model.layers:
        layer.trainable = False

    x = model.output
    x = Flatten()(x)
    x = Dense(128)(x)
    x = Dropout(0.2)(x)
    x = Dense(256)(x)
    x = Dropout(0.5)(x)
    x = Dense(512)(x)
    x = Dropout(0.2)(x)

    predictions = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=model.input, outputs=predictions)
    model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])

    return model

In [None]:

class CustomCheckpoint(ModelCheckpoint):
    def __init__(self, start_epoch, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_epoch = start_epoch

    def on_epoch_end(self, epoch, logs=None):
        if epoch >= self.start_epoch:
            super().on_epoch_end(epoch, logs)

# def lr_schedule(epoch, lr):
#     if epoch > 0 and epoch % 10 == 0:
#         return lr * (1/3)
#     return lr

def train_and_evaluate_model(base_model, num_epochs=50, num_iterations=5, model_name='model'):
    accuracy_list, precision_list, recall_list, f1_list = [], [], [], []

    for iteration in range(num_iterations):
        print(f"\nIteration {iteration + 1}:")

        model = create_new_model(base_model)

        train_images, train_labels_t, val_images, val_labels_t, test_images, test_labels_t, test_subjects, test_image_counts_per_subject = load_and_preprocess_data(subject_folders, labels)

        train_data = list(zip(train_images, train_labels_t))
        np.random.shuffle(train_data)
        train_images, train_labels_t = zip(*train_data)

        val_data = list(zip(val_images, val_labels_t))
        np.random.shuffle(val_data)
        val_images, val_labels_t = zip(*val_data)

        test_data = list(zip(val_images, test_labels_t))
        np.random.shuffle(test_data)
        test_images, test_labels_t = zip(*test_data)

        train_images, train_labels_t = np.array(train_images), np.array(train_labels_t)
        val_images, val_labels_t = np.array(val_images), np.array(val_labels_t)
        test_images, test_labels_t = np.array(test_images), np.array(test_labels_t)


        checkpoint_dir = '/content'
        os.makedirs(checkpoint_dir, exist_ok=True)

        local_checkpoint_filepath = os.path.join(checkpoint_dir, f"{model_name}_best_weights_iteration_{iteration + 1}.h5")
        drive_checkpoint_filepath = f'/content/drive/My Drive/{model_name}_best_weights_iteration_{iteration + 1}.h5'

        checkpoint_callback = CustomCheckpoint(
            start_epoch=30,
            filepath=local_checkpoint_filepath,
            monitor='val_accuracy',
            mode='max',
            save_best_only=True,
            save_weights_only=True,
            verbose=1
        )

        class LearningRateLogger(Callback):
            def on_epoch_end(self, epoch, logs=None):
                logs = logs or {}
                logs['learning_rate'] = self.model.optimizer.learning_rate.numpy()
        lr_logger = LearningRateLogger()

        early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min', baseline=None, restore_best_weights=False, start_from_epoch=30)

        # lr_scheduler = LearningRateScheduler(lr_schedule)

        history = model.fit(train_images, train_labels_t, epochs=num_epochs, batch_size=128, validation_data=(val_images, val_labels_t), callbacks=[checkpoint_callback, early_stopping, lr_logger], verbose=1)

        if os.path.exists(local_checkpoint_filepath):
            print(f"Best weights saved locally at: {local_checkpoint_filepath}")

            shutil.copy(local_checkpoint_filepath, drive_checkpoint_filepath)
            if os.path.exists(drive_checkpoint_filepath):
                print(f"Best weights also copied to Google Drive at: {drive_checkpoint_filepath}")
            else:
                print(f"Failed to copy best weights to Google Drive at: {drive_checkpoint_filepath}")
        else:
            print(f"Failed to save best weights locally at: {local_checkpoint_filepath}")

        model.load_weights(local_checkpoint_filepath)

        test_predictions = model.predict(test_images)
        test_predictions = (test_predictions > 0.5).astype(int)

        accuracy = accuracy_score(test_labels_t, test_predictions)
        precision = precision_score(test_labels_t, test_predictions)
        recall = recall_score(test_labels_t, test_predictions)
        f1 = f1_score(test_labels_t, test_predictions)

        accuracy_list.append(accuracy)
        precision_list.append(precision)
        recall_list.append(recall)
        f1_list.append(f1)

        print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")

        print("ModelCheckpoint callback triggered and saved the best weights.")

        plt.plot(history.history['accuracy'])
        plt.plot(history.history['val_accuracy'])
        plt.title('Model accuracy')
        plt.ylabel('Accuracy')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        file_name = f'{model_name}_accuracy_iteration_{iteration + 1}.png'
        plt.savefig(file_name)
        plt.close()
        files.download(file_name)

        plt.plot(history.history['loss'])
        plt.plot(history.history['val_loss'])
        plt.title('Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        file_name = f'{model_name}_loss_iteration_{iteration + 1}.png'
        plt.savefig(file_name)
        plt.close()
        files.download(file_name)

        cm = confusion_matrix(test_labels_t, test_predictions)
        plt.figure(figsize=(6, 5))
        sns.heatmap(cm, annot=True, fmt='d', xticklabels=['AD', 'HC'], yticklabels=['AD', 'HC'])
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.title(f'Confusion Matrix - {model_name} - Iteration {iteration + 1}')
        file_name = f'{model_name}_confusion_matrix_iteration_{iteration + 1}.png'
        plt.savefig(file_name)
        plt.close()
        files.download(file_name)

        print("\nPredictions for Each Subject's Images:")
        start_idx = 0
        for i, (subject_folder, num_images) in enumerate(zip(test_subjects, test_image_counts_per_subject)):
            end_idx = start_idx + num_images
            subject_images = test_images[start_idx:end_idx]
            subject_predictions = (model.predict(subject_images) > 0.5).astype(int)
            ad_count = sum(subject_predictions == 0)
            hc_count = sum(subject_predictions == 1)
            print(f"Subject {i + 1} ({subject_folder}):")
            print(f"AD count: {ad_count}, HC count: {hc_count}")
            start_idx = end_idx

    print("\nAverage Metrics:")
    print(f"Average Accuracy: {np.mean(accuracy_list):.4f}")
    print(f"Average Precision: {np.mean(precision_list):.4f}")
    print(f"Average Recall: {np.mean(recall_list):.4f}")
    print(f"Average F1 Score: {np.mean(f1_list):.4f}")


## VGG16

In [None]:
vgg16_base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'vgg16_Temporal'
train_and_evaluate_model(vgg16_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## ResNet50

In [None]:
resnet_base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'resnet50_Temporal_1'
train_and_evaluate_model(resnet_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## InceptionV3

In [None]:
model_name = 'InceptionV3_Temporal'
train_and_evaluate_model(incep_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## DenseNet121

In [None]:
Dense_base_model = DenseNet121(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/densenet/densenet121_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'DenseNet121_Temporal'
train_and_evaluate_model(Dense_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

# Parietal

## VGG16

In [None]:
vgg16_base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

In [None]:
    # x = model.output
    # x = Flatten()(x)
    # x = Dense(128)(x)
    # x = Dropout(0.2)(x)
    # x = Dense(256)(x)
    # x = Dropout(0.2)(x)
model_name = 'vgg16_Parietal'
train_and_evaluate_model(vgg16_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

## ResNet50

In [None]:
resnet_base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'resnet50_Parietal'
train_and_evaluate_model(resnet_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## Inception v3

In [None]:
incep_base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'InceptionV3_Parietal'
train_and_evaluate_model(incep_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## DenseNet121

In [None]:
Dense_base_model = DenseNet121(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/densenet/densenet121_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'DenseNet121_Parietal'
train_and_evaluate_model(Dense_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

# Central

## VGG16

In [None]:
vgg16_base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

In [None]:
    # x = model.output
    # x = Flatten()(x)
    # x = Dense(128)(x)
    # x = Dropout(0.2)(x)
    # x = Dense(256)(x)
    # x = Dropout(0.2)(x)
model_name = 'vgg16_Central'
train_and_evaluate_model(vgg16_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

## ResNet50

In [None]:
resnet_base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'resnet50_Centrall'
train_and_evaluate_model(resnet_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## Inception v3

In [None]:
incep_base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'InceptionV3_Central'
train_and_evaluate_model(incep_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## DenseNet121

In [None]:
Dense_base_model = DenseNet121(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/densenet/densenet121_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'DenseNet121_Central'
train_and_evaluate_model(Dense_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

# Frontal

## VGG16

In [None]:
vgg16_base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

In [None]:
    # x = model.output
    # x = Flatten()(x)
    # x = Dense(128)(x)
    # x = Dropout(0.2)(x)
    # x = Dense(256)(x)
    # x = Dropout(0.2)(x)
model_name = 'vgg16_Frontal'
train_and_evaluate_model(vgg16_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

## ResNet50

In [None]:
resnet_base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'resnet50_Frontal'
train_and_evaluate_model(resnet_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## Inception v3

In [None]:
incep_base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'InceptionV3_Frontal'
train_and_evaluate_model(incep_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## DenseNet121

In [None]:
Dense_base_model = DenseNet121(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/densenet/densenet121_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'DenseNet121_Frontal'
train_and_evaluate_model(Dense_base_model, subject_folders, labels, num_epochs=100, num_iterations=5, model_name=model_name)

# Concatenated

In [None]:
data_dir = '/content/drive/MyDrive/Final_1Hz/Final_1Hz_con/'


def extract_numeric_part(folder_name):
    return int(folder_name.split('_')[-1])

folders = sorted([f for f in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, f))], key=extract_numeric_part)

subject_folders, labels = [], []

for folder_name in folders:
    subject_num = int(folder_name.split('_')[-1])
    subject_folder = os.path.join(data_dir, folder_name)
    subject_folders.append(subject_folder)

    if subject_num <= 36:
        labels.append(0)  # AD group
    elif 37 <= subject_num <= 65:
        labels.append(1)  # HC group

subject_folders = np.array(subject_folders)
labels = np.array(labels)


In [None]:
def load_images_and_labels(subjects, labels):
    images, labels_t = [], []
    for subject_folder, label in zip(subjects, labels):
        for filename in os.listdir(subject_folder):
                img_path = os.path.join(subject_folder, filename.decode())
                try:
                    img = load_img(img_path, target_size=(224, 224))
                    img_array = img_to_array(img)
                    img_array = img_array / 255.0
                    images.append(img_array)
                    labels_t.append(label)
                except Exception as e:
                    print(f"Error loading image: {img_path}, {e}")

    return np.array(images), np.array(labels_t)

In [None]:
def load_and_preprocess_data(subject_folders, labels):
    train_subjects, test_subjects, train_labels, test_labels = train_test_split(subject_folders, labels, test_size=0.2, stratify=labels)

    train_subjects, val_subjects, train_labels, val_labels = train_test_split(train_subjects, train_labels, test_size=0.1, stratify=train_labels)

    train_images, train_labels_t = load_images_and_labels(train_subjects, train_labels)
    val_images, val_labels_t = load_images_and_labels(val_subjects, val_labels)

    test_image_counts_per_subject = [len(os.listdir(folder)) for folder in test_subjects]

    test_images, test_labels_t = load_images_and_labels(test_subjects, test_labels)

    return train_images, train_labels_t, val_images, val_labels_t, test_images, test_labels_t, test_subjects, test_image_counts_per_subject


In [None]:
def create_new_model(base_model):
    model = clone_model(base_model)

    for layer in model.layers:
        layer.trainable = False

    x = model.output
    x = Flatten()(x)
    x = Dense(128)(x)
    x = Dropout(0.2)(x)
    x = Dense(256)(x)
    x = Dropout(0.5)(x)
    x = Dense(512)(x)
    x = Dropout(0.2)(x)

    predictions = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=model.input, outputs=predictions)
    model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.01), metrics=['accuracy'])

    return model

In [None]:
class CustomCheckpoint(ModelCheckpoint):
    def __init__(self, start_epoch, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_epoch = start_epoch

    def on_epoch_end(self, epoch, logs=None):
        if epoch >= self.start_epoch:
            super().on_epoch_end(epoch, logs)

# Define a learning rate scheduler function
def lr_schedule(epoch, lr):
    if epoch > 0 and epoch % 10 == 0:
        return lr * (1/3)
    return lr

def train_and_evaluate_model(base_model, num_epochs=50, num_iterations=5, model_name='model'):
    accuracy_list, precision_list, recall_list, f1_list = [], [], [], []

    for iteration in range(num_iterations):
        print(f"\nIteration {iteration + 1}:")

        model = create_new_model(base_model)

        train_images, train_labels_t, val_images, val_labels_t, test_images, test_labels_t, test_subjects, test_image_counts_per_subject = load_and_preprocess_data(subject_folders, labels)

        train_data = list(zip(train_images, train_labels_t))
        np.random.shuffle(train_data)
        train_images, train_labels_t = zip(*train_data)

        val_data = list(zip(val_images, val_labels_t))
        np.random.shuffle(val_data)
        val_images, val_labels_t = zip(*val_data)

        train_images, train_labels_t = np.array(train_images), np.array(train_labels_t)
        val_images, val_labels_t = np.array(val_images), np.array(val_labels_t)

        checkpoint_dir = '/content'
        os.makedirs(checkpoint_dir, exist_ok=True)

        local_checkpoint_filepath = os.path.join(checkpoint_dir, f"{model_name}_best_weights_iteration_{iteration + 1}.h5")
        drive_checkpoint_filepath = f'/content/drive/My Drive/{model_name}_best_weights_iteration_{iteration + 1}.h5'

        checkpoint_callback = CustomCheckpoint(
            start_epoch=30,
            filepath=local_checkpoint_filepath,
            monitor='val_accuracy',
            mode='max',
            save_best_only=True,
            save_weights_only=True,
            verbose=1
        )

        class LearningRateLogger(Callback):
            def on_epoch_end(self, epoch, logs=None):
                logs = logs or {}
                logs['learning_rate'] = self.model.optimizer.learning_rate.numpy()
        lr_logger = LearningRateLogger()

        early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min', baseline=None, restore_best_weights=False, start_from_epoch=30)

        # Add the learning rate scheduler callback
        lr_scheduler = LearningRateScheduler(lr_schedule)

        history = model.fit(train_images, train_labels_t, epochs=num_epochs, batch_size=128, validation_data=(val_images, val_labels_t), callbacks=[checkpoint_callback, early_stopping, lr_scheduler], verbose=1)

        if os.path.exists(local_checkpoint_filepath):
            print(f"Best weights saved locally at: {local_checkpoint_filepath}")

            shutil.copy(local_checkpoint_filepath, drive_checkpoint_filepath)
            if os.path.exists(drive_checkpoint_filepath):
                print(f"Best weights also copied to Google Drive at: {drive_checkpoint_filepath}")
            else:
                print(f"Failed to copy best weights to Google Drive at: {drive_checkpoint_filepath}")
        else:
            print(f"Failed to save best weights locally at: {local_checkpoint_filepath}")

        model.load_weights(local_checkpoint_filepath)

        test_predictions = model.predict(test_images)
        test_predictions = (test_predictions > 0.5).astype(int)

        accuracy = accuracy_score(test_labels_t, test_predictions)
        precision = precision_score(test_labels_t, test_predictions)
        recall = recall_score(test_labels_t, test_predictions)
        f1 = f1_score(test_labels_t, test_predictions)

        accuracy_list.append(accuracy)
        precision_list.append(precision)
        recall_list.append(recall)
        f1_list.append(f1)

        print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")

        print("ModelCheckpoint callback triggered and saved the best weights.")

        plt.plot(history.history['accuracy'])
        plt.plot(history.history['val_accuracy'])
        plt.title('Model accuracy')
        plt.ylabel('Accuracy')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        file_name = f'{model_name}_accuracy_iteration_{iteration + 1}.png'
        plt.savefig(file_name)
        plt.close()
        files.download(file_name)

        plt.plot(history.history['loss'])
        plt.plot(history.history['val_loss'])
        plt.title('Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        file_name = f'{model_name}_loss_iteration_{iteration + 1}.png'
        plt.savefig(file_name)
        plt.close()
        files.download(file_name)

        cm = confusion_matrix(test_labels_t, test_predictions)
        plt.figure(figsize=(6, 5))
        sns.heatmap(cm, annot=True, fmt='d', xticklabels=['AD', 'HC'], yticklabels=['AD', 'HC'])
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.title(f'Confusion Matrix - {model_name} - Iteration {iteration + 1}')
        file_name = f'{model_name}_confusion_matrix_iteration_{iteration + 1}.png'
        plt.savefig(file_name)
        plt.close()
        files.download(file_name)

        print("\nPredictions for Each Subject's Images:")
        start_idx = 0
        for i, (subject_folder, num_images) in enumerate(zip(test_subjects, test_image_counts_per_subject)):
            end_idx = start_idx + num_images
            subject_images = test_images[start_idx:end_idx]
            subject_predictions = (model.predict(subject_images) > 0.5).astype(int)
            ad_count = sum(subject_predictions == 0)
            hc_count = sum(subject_predictions == 1)
            print(f"Subject {i + 1} ({subject_folder}):")
            print(f"AD count: {ad_count}, HC count: {hc_count}")
            start_idx = end_idx

    print("\nAverage Metrics:")
    print(f"Average Accuracy: {np.mean(accuracy_list):.4f}")
    print(f"Average Precision: {np.mean(precision_list):.4f}")
    print(f"Average Recall: {np.mean(recall_list):.4f}")
    print(f"Average F1 Score: {np.mean(f1_list):.4f}")


## VGG16

In [None]:
vgg16_base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
    # x = model.output
    # x = Flatten()(x)
    # x = Dense(128)(x)
    # x = Dropout(0.2)(x)
    # x = Dense(256)(x)
    # x = Dropout(0.2)(x)
model_name = 'vgg16_con'
train_and_evaluate_model(vgg16_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

In [None]:
model_name = 'vgg16_con_2'
train_and_evaluate_model(vgg16_base_model, num_epochs=100, num_iterations=1, model_name=model_name)

## ResNet50

In [None]:
ResNet50_base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'ResNet50_con'
train_and_evaluate_model(ResNet50_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## InceptionV3

In [None]:
Incep_base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
model_name = 'InceptionV3_con'
train_and_evaluate_model(Incep_base_model, num_epochs=100, num_iterations=5, model_name=model_name)

## DenseNet121

In [None]:
model_name = 'DenseNet121_con'
train_and_evaluate_model(resnet_base_model, num_epochs=100, num_iterations=5, model_name=model_name)