In [None]:
pip install numpy keras SimpleITK scikit-learn deap h5py

In [None]:
pip install tensorflow-gpu

In [None]:
pip install tensorflow

In [None]:
pip install opencv-python

In [None]:
from deap import base, creator, tools, algorithms

In [None]:
# Import necessary libraries
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GlobalAveragePooling3D, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import LearningRateScheduler, EarlyStopping
from sklearn.model_selection import train_test_split
from google.colab import drive
from deap import base, creator, tools, algorithms
import gc  # Import the garbage collection module

# Mount Google Drive
drive.mount('/content/drive')

# Define 3D data dimensions
width, height, depth = (224, 224, 64)

# Data paths
data_folder = "/content/drive/MyDrive/Normalization_output_file2"
output_folder = "/content/drive/MyDrive/Models"
num_epochs = 20
batch_size = 1

# Mapping class names to labels
class_name_to_label = {'VeryMildDemented': 0, 'MildDemented': 1, 'ModerateDemented': 2, 'NonDemented': 3}

# Load and preprocess data
data, labels = [], []
for class_folder in os.listdir(data_folder):
    if os.path.isdir(os.path.join(data_folder, class_folder)):
        class_label = class_name_to_label.get(class_folder, -1)
        if class_label != -1:
            for file_name in os.listdir(os.path.join(data_folder, class_folder)):
                if file_name.endswith(".npy"):
                    data.append(np.load(os.path.join(data_folder, class_folder, file_name)))
                    labels.append(class_label)

data = np.array(data)
labels = np.array(labels)

# Train-validation split
train_data, val_data, train_labels, val_labels = train_test_split(data, labels, test_size=0.2, random_state=42)

# Define data augmentation generator
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode="nearest",
    rescale=1.0 / 255
)

# Create the Xception-based 3D model
num_classes = len(np.unique(train_labels))

def create_model(learning_rate):
    input_shape = (width, height, depth, 1 if len(train_data.shape) == 4 else train_data.shape[4])
    base_model = tf.keras.applications.Xception(weights='imagenet', include_top=False, input_shape=input_shape)
    model = Sequential([
        base_model,
        GlobalAveragePooling3D(),
        Dense(16, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax')
    ])
    optimizer = Adam(lr=learning_rate)
    model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Distributed training strategy
strategy = tf.distribute.experimental.CentralStorageStrategy()

# Define data input pipelines for distributed training
with strategy.scope():
    train_dataset = tf.data.Dataset.from_tensor_slices((train_data, train_labels)).batch(batch_size)
    val_dataset = tf.data.Dataset.from_tensor_slices((val_data, val_labels)).batch(batch_size)

# Learning rate schedule and early stopping
def lr_schedule(epoch):
    return 0.001 * (0.1 ** int(epoch / 10))

early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Define a function to evaluate the model's fitness
def evaluate_model(individual):
    learning_rate = individual[0]
    model = create_model(learning_rate)

    # Training
    model.fit(train_dataset, epochs=num_epochs, validation_data=val_dataset, callbacks=[LearningRateScheduler(lr_schedule), early_stopping], verbose=0)

    # Evaluation
    _, val_accuracy = model.evaluate(val_dataset, verbose=0)

    # Clear unnecessary variables
    del model
    gc.collect()  # Manually trigger garbage collection to clear memory

    # Minimize the negative validation accuracy
    return -val_accuracy

# Clear unnecessary variables
del data
del labels
del train_data
del val_data

# Create the DEAP toolbox
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)
toolbox = base.Toolbox()
toolbox.register("attr_float", np.random.uniform, 1e-5, 1e-3)  # Learning rate range
toolbox.register("individual", tools.initCycle, creator.Individual, (toolbox.attr_float,), n=1)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# Define the genetic operators
toolbox.register("mate", tools.cxBlend, alpha=0.5)
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=0.1, indpb=0.1)
toolbox.register("select", tools.selTournament, tournsize=3)

# Create an initial population of 10 individuals
population = toolbox.population(n=10)

# Create the hall of fame to store the best individual
hall_of_fame = tools.HallOfFame(1)

# Create the statistics object
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("max", np.max)
stats.register("avg", np.mean)

# Run the genetic algorithm
NGEN = 10  # Number of generations
CXPB = 0.7  # Crossover probability
MUTPB = 0.2  # Mutation probability

# Create a logbook to keep track of the evolution
logbook = tools.Logbook()
logbook.header = ["gen", "evals", "max", "avg"]

for gen in range(NGEN):
    # Evaluate the entire population
    fitnesses = map(evaluate_model, population)
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    # Update the hall of fame with the best individual
    hall_of_fame.update(population)

    # Clone the population
    offspring = list(map(toolbox.clone, population))

    # Apply crossover and mutation on the offspring
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if np.random.rand() < CXPB:
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values

    for mutant in offspring:
        if np.random.rand() < MUTPB:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    # Evaluate the offspring
    fitnesses = map(evaluate_model, offspring)
    for ind, fit in zip(offspring, fitnesses):
        ind.fitness.values = fit

    # Replace the old population by the offspring
    population[:] = offspring

    # Record statistics
    record = stats.compile(population)
    logbook.record(gen=gen, evals=len(population), **record)
    print(logbook.stream)

# Get the best individual and its fitness value from the hall of fame
best_ind = hall_of_fame[0]
best_fitness = best_ind.fitness.values[0]
print("Best Individual:")
print("Learning Rate =", best_ind[0])
print("Best Fitness =", best_fitness)

# Use the best learning rate to create and train the final model
best_learning_rate = best_ind[0]
with strategy.scope():
    final_model = create_model(best_learning_rate)

# Training
final_model.fit(train_dataset, epochs=num_epochs, validation_data=val_dataset, callbacks=[LearningRateScheduler(lr_schedule), early_stopping])

# Evaluation
val_loss, val_accuracy = final_model.evaluate(val_dataset, verbose=0)
print(f"Validation Loss: {val_loss:.4f}")
print(f"Validation Accuracy: {val_accuracy * 100:.2f}%")

# Save the final model
final_model.save(os.path.join(output_folder, "Xception_model_3d.h5"))

# Clear unnecessary variables
del data
del labels
del train_data
del val_data
