In [1]:
#Step 1: Import Required Libraries and Define Functions

In [None]:
import numpy as np
import random
import cv2
import os
os.environ["PATH"] += os.pathsep + r"C:\Program Files\Graphviz\bin"
from imutils import paths
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer
from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score
from imblearn.over_sampling import SMOTE
import pydot
import graphviz
from tensorflow.keras.utils import plot_model
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras import backend as K

def load_image_paths(img_path):
    """Return the paths of all image files found under ``img_path`` as a list.

    The search is delegated to ``paths.list_images``; this wrapper only
    materialises whatever that call yields into a list.
    """
    return [image_file for image_file in paths.list_images(img_path)]

def load_data(image_paths, verbose=-1):
    """Load images as normalised 96x96 grayscale arrays together with their labels.

    Each file is read in grayscale with OpenCV, resized to 96x96 and divided
    by 255.0. The label of an image is the name of the directory that
    directly contains it.

    Files that OpenCV cannot decode (``cv2.imread`` returns ``None``) are
    skipped with a warning instead of crashing inside ``cv2.resize``; data and
    labels therefore always stay aligned.

    args:
        image_paths: sequence of image file paths
        verbose: when > 0, print a progress line every ``verbose`` files
    return:
        (data, labels) - two lists of equal length holding the image arrays
        and the corresponding label strings
    """
    data = []
    labels = []
    total = len(image_paths)
    for i, imgpath in enumerate(image_paths):
        im_gray = cv2.imread(imgpath, cv2.IMREAD_GRAYSCALE)
        if im_gray is None:
            # Unreadable / corrupt / non-image file: report it and move on.
            print("[WARN] could not read image, skipping: {}".format(imgpath))
        else:
            image = cv2.resize(im_gray, (96, 96))
            # Parent directory name; os.path handles every separator the
            # platform accepts, unlike a plain split on os.path.sep.
            label = os.path.basename(os.path.dirname(imgpath))
            data.append(image / 255.0)
            labels.append(label)
        if verbose > 0 and i > 0 and (i + 1) % verbose == 0:
            print("[INFO] processed {}/{}".format(i + 1, total))
    return data, labels

def count_images(directory):
    """Count the entries inside each class sub-directory of ``directory``.

    Only sub-directories are treated as classes; plain files that sit next
    to the class folders (e.g. ``Thumbs.db``) are ignored rather than causing
    ``os.listdir`` to fail on them.

    args:
        directory: folder whose immediate sub-folders are the classes
    return:
        dict mapping sub-folder name -> number of entries it contains
    """
    counts = {}
    for label in sorted(os.listdir(directory)):
        class_dir = os.path.join(directory, label)
        if not os.path.isdir(class_dir):
            continue
        counts[label] = len(os.listdir(class_dir))
    return counts

In [2]:
#Step 2: Define Class Names

In [None]:
# The two class names used in this notebook.
CLASSES = ['NPD', 'PD']

In [3]:
#Step 3: Load Data, Encode Labels, Split, and Print Shapes

In [None]:
import os
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer

# Root folder of the training images. load_data() takes each image's label
# from the name of the folder that directly contains it.
img_path = r'C:\Users\ArnaB\Downloads\Compressed\ntua-parkinson-dataset-master_2\NTUA\FINAL DATASET(PREPROCESSED)\train'

# List the folder first: a wrong path fails right here instead of much later
print("Train Directory Contents:", os.listdir(img_path))

# Collect every image path below the training folder
image_paths = load_image_paths(img_path)
if not image_paths:
    # Without this guard an empty folder only surfaces as an obscure
    # error inside LabelBinarizer / train_test_split.
    raise FileNotFoundError("No images found under {!r}".format(img_path))

# Read, resize and normalise the images
image_list, label_list = load_data(image_paths, verbose=2)

# Binarize the labels
lb = LabelBinarizer()
label_list = lb.fit_transform(label_list)

# With exactly two classes LabelBinarizer returns a single 0/1 column;
# expand it to two one-hot columns
if label_list.shape[1] == 1:
    label_list = np.hstack((1 - label_list, label_list))

# Hold out 10% for testing. Stratifying keeps the class proportions the same
# in both parts, so the minority class is not under-represented in the test set.
X_train, X_test, y_train, y_test = train_test_split(
    image_list, label_list, test_size=0.1, random_state=42, stratify=label_list
)

# Define classes (presumably the names of the class folders - verify against the listing above)
CLASSES = ['NPD', 'PD']

# Shapes of the split
print(np.shape(X_train))  # (n_train, 96, 96)
print(np.shape(X_test))   # (n_test, 96, 96)
print(np.shape(y_train))  # (n_train, n_classes)
print(np.shape(y_test))   # (n_test, n_classes)

In [None]:
#Step 4: Convert One-Hot Labels to Single Labels 

In [None]:
# Collapse every one-hot row to the index of its largest entry (the class id)
y_train_single, y_test_single = (
    np.argmax(one_hot, axis=1) for one_hot in (y_train, y_test)
)

# One class id per sample, so both shapes are 1-D: (n_train,) and (n_test,)
print(np.shape(y_train_single))
print(np.shape(y_test_single))

In [None]:
#Step 5: Apply SMOTE with Reduced Number of Neighbors

In [None]:
# Work on a NumPy array so the images can be reshaped
X_train = np.array(X_train)

# Flatten each 96x96 image into one row of 96*96 features for SMOTE
flat_train = X_train.reshape(-1, 96*96)

# k_neighbors=1: adjust this to the size of the smallest class
sm = SMOTE(random_state=42, k_neighbors=1)
X_train_resampled, y_train_resampled = sm.fit_resample(flat_train, y_train_single)

# Back to image layout, with a trailing channel axis
X_train_resampled = X_train_resampled.reshape(-1, 96, 96, 1)

# Expected: (new_size, 96, 96, 1) followed by (new_size,)
for resampled in (X_train_resampled, y_train_resampled):
    print(np.shape(resampled))

In [None]:
#Step 6: Convert Single Labels Back to One-Hot Encoding

In [None]:
# Number of label columns in the one-hot training labels
num_classes = y_train.shape[1]

# Row k of the identity matrix is the one-hot vector of class k,
# so indexing it with class ids converts them back to one-hot rows
one_hot_lookup = np.eye(num_classes)
y_train_resampled_one_hot = one_hot_lookup[y_train_resampled]
y_test_one_hot = one_hot_lookup[y_test_single]

# Expected: (new_size, num_classes) followed by (n_test, num_classes)
for encoded in (y_train_resampled_one_hot, y_test_one_hot):
    print(np.shape(encoded))

In [None]:
#Step 7: Create Clients

In [None]:
def create_clients(image_list, label_list, num_clients=6, initial='clients', seed=None):
    ''' Shuffle the data and split it into equally sized shards, one per client.

        return: a dictionary with keys clients' names and value as
                data shards - a list of (image, label) tuples. Every shard
                holds len(data) // num_clients samples; samples left over
                after the even split are not assigned to any client.
        args:
            image_list: a list of numpy arrays of training images
            label_list: a list of binarized labels for each image
            num_clients: number of federated members (clients), at least 1
            initial: the clients' name prefix, e.g. 'clients' gives 'clients_1'
            seed: optional seed for a private random generator so the
                  partition can be reproduced; None (default) shuffles with
                  the module-level ``random`` state
        raises:
            ValueError: if num_clients < 1 or there are fewer samples than clients
    '''
    if num_clients < 1:
        raise ValueError("num_clients must be at least 1, got {}".format(num_clients))

    # pair every image with its label so they stay together while shuffling
    data = list(zip(image_list, label_list))

    size = len(data) // num_clients
    if size == 0:
        raise ValueError(
            "cannot split {} samples across num_clients={} clients".format(len(data), num_clients)
        )

    # create a list of client names
    client_names = ['{}_{}'.format(initial, i + 1) for i in range(num_clients)]

    # randomize the data
    shuffler = random if seed is None else random.Random(seed)
    shuffler.shuffle(data)

    # shard data and place at each client
    shards = [data[i:i + size] for i in range(0, size * num_clients, size)]

    # number of clients must equal the number of shards
    assert len(shards) == len(client_names)

    return dict(zip(client_names, shards))

# Partition the resampled training set across six clients
clients = create_clients(
    X_train_resampled,
    y_train_resampled_one_hot,
    num_clients=6,
    initial='client',
)

# Report how many samples each client received
for client_name in clients:
    data_shard = clients[client_name]
    print(f"{client_name}: {len(data_shard)} samples")

In [None]:
#Step 8: Batch Data

In [None]:
def batch_data(data_shard, bs=4):
    '''Turn one client's shard into a shuffled, batched tf.data dataset.
    args:
        data_shard: sequence of (data, label) pairs belonging to one client
        bs: batch size
    return:
        the dataset, shuffled with a buffer covering the whole shard and
        grouped into batches of ``bs``'''
    # unzip the pairs into one sequence of inputs and one of labels
    inputs, targets = zip(*data_shard)
    n_samples = len(targets)
    tensors = (list(inputs), list(targets))
    shard_ds = tf.data.Dataset.from_tensor_slices(tensors)
    shard_ds = shard_ds.shuffle(n_samples)
    return shard_ds.batch(bs)

# Build one batched dataset per client
clients_batched = {name: batch_data(shard) for name, shard in clients.items()}

# The whole test set goes into a single batch
test_batch_size = len(y_test_one_hot)
test_batched = tf.data.Dataset.from_tensor_slices((X_test, y_test_one_hot))
test_batched = test_batched.batch(test_batch_size)

# Report the number of batches each client ended up with
for client_name, data in clients_batched.items():
    n_batches = tf.data.experimental.cardinality(data).numpy()
    print(f"{client_name}: {n_batches} batches")

In [None]:
#Step 9: Define CNN Model

In [None]:
class CNN:
    """Factory for the convolutional classifier used in this notebook."""

    @staticmethod
    def build(width, height, channels, classes):
        """Build and compile the network.

        Three Conv2D(3x3, relu) + MaxPooling2D(2x2) stages with 16, 32 and
        128 filters, followed by Dense(128), Dense(64) and a softmax layer
        with ``classes`` units. Compiled with Adam and categorical
        cross-entropy, tracking accuracy.

        args:
            width: image width in pixels (number of columns)
            height: image height in pixels (number of rows)
            channels: number of image channels
            classes: number of output classes
        return:
            the compiled Sequential model
        """
        # Keras' channels-last layout is (rows, cols, channels), i.e.
        # (height, width, channels). Passing (width, height, ...) only
        # works by accident for square images.
        input_shape = (height, width, channels)

        model = Sequential()

        model.add(Conv2D(filters=16, kernel_size=(3, 3), strides=(1, 1), activation="relu", kernel_initializer='he_normal',
                         input_shape=input_shape))
        model.add(MaxPooling2D(pool_size=(2, 2)))

        model.add(Conv2D(filters=32, kernel_size=(3, 3), strides=(1, 1), activation="relu", kernel_initializer='he_normal'))
        model.add(MaxPooling2D(pool_size=(2, 2)))

        model.add(Conv2D(filters=128, kernel_size=(3, 3), strides=(1, 1), activation="relu", kernel_initializer='he_normal'))
        model.add(MaxPooling2D(pool_size=(2, 2)))

        model.add(Flatten())
        model.add(Dense(128, activation="relu", kernel_initializer='he_normal'))
        model.add(Dense(64, activation="relu"))
        model.add(Dense(classes, activation="softmax"))

        model.compile(optimizer='adam', loss="categorical_crossentropy", metrics=['accuracy'])

        return model

In [None]:
#Step 10: Instantiate the CNN Model and Print Its Summary

In [None]:
# Instantiate the network for 96x96 single-channel images and two output classes
model = CNN.build(width=96, height=96, channels=1, classes=2)

# Print the layer-by-layer structure to verify it
model.summary()

In [None]:
#Step 11: Train the Federated Model (Genetic Algorithm) for 10 Generations

In [None]:
import numpy as np
import tensorflow as tf

# --- Genetic-algorithm settings ---
num_generations = 10
population_size = len(clients_batched)   # one individual per client
num_parents = population_size // 2       # how many top performers are kept as parents
crossover_rate, mutation_rate = 0.8, 0.02

# Starting point shared by all clients: the current weights of the global model
global_weights = model.get_weights()

# (Re)compile the global model with explicit optimizer / loss objects
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=['accuracy'],
)

def select_parents(population, fitness_scores, num_parents):
    """Return the ``num_parents`` fittest individuals, ordered from least to most fit.

    args:
        population: sequence of individuals
        fitness_scores: one score per individual, higher is better
        num_parents: how many individuals to keep; values <= 0 select nobody,
                     values above the population size select everybody
    return:
        list of the selected individuals
    """
    if num_parents <= 0:
        # A plain [-0:] slice would return the *whole* population.
        return []
    ranked = np.argsort(fitness_scores)
    return [population[i] for i in ranked[-num_parents:]]

def crossover(parent1, parent2, crossover_rate=0.8):
    """Create a child model whose weights are an element-wise mix of two parents.

    For every weight tensor a random mask is drawn; where the draw is below
    ``crossover_rate`` the child takes the value from ``parent1``, elsewhere
    from ``parent2``. The child itself is obtained with ``clone_model`` on
    ``parent1``.

    args:
        parent1, parent2: models exposing get_weights() with matching shapes
        crossover_rate: probability that an element is taken from parent1
    return:
        the new child model
    """
    weights_a = parent1.get_weights()
    weights_b = parent2.get_weights()
    mixed = [
        np.where(np.random.rand(*wa.shape) < crossover_rate, wa, wb)
        for wa, wb in zip(weights_a, weights_b)
    ]
    offspring_model = tf.keras.models.clone_model(parent1)
    offspring_model.set_weights(mixed)
    return offspring_model

def mutate(model, mutation_rate=0.02, noise_scale=0.005):
    """Add small Gaussian noise to randomly chosen weight tensors of ``model``.

    Each weight tensor is mutated as a whole with probability
    ``mutation_rate``: zero-mean normal noise with standard deviation
    ``noise_scale`` is added to every element. Tensors that are not selected
    are left untouched. The model is modified in place.

    args:
        model: object exposing get_weights() / set_weights()
        mutation_rate: per-tensor probability of being mutated
        noise_scale: standard deviation of the added noise (default 0.005)
    return:
        the same ``model`` object, after mutation
    """
    mutated_weights = []
    for w in model.get_weights():
        if np.random.rand() < mutation_rate:
            noise = np.random.normal(scale=noise_scale, size=w.shape)
            # keep the tensor's original dtype instead of promoting to float64
            w = (w + noise).astype(w.dtype)
        mutated_weights.append(w)
    model.set_weights(mutated_weights)
    return model

def replace_population(population, offspring, fitness_scores):
    """Overwrite the least fit individuals with the given offspring.

    Slots are visited in order of increasing fitness and filled with the
    offspring one by one; replacement stops when either the offspring or the
    slots run out. ``population`` is modified in place and also returned.
    """
    weakest_first = np.argsort(fitness_scores)
    for slot, newcomer in zip(weakest_first, offspring):
        population[slot] = newcomer
    return population

# Build the population: one compiled clone of the global model per slot
population = []
for slot in range(population_size):
    individual = tf.keras.models.clone_model(model)
    individual.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=['accuracy'],
    )
    population.append(individual)

# Training with GA over multiple generations.
# Each generation: (1) every individual is trained on its client's data starting
# from the shared global weights and scored, (2) the best are selected as parents,
# (3) offspring are produced by crossover + mutation, (4) offspring overwrite the
# weakest slots, (5) the global weights become the average of the top slots.
for generation in range(num_generations):
    print(f"Generation {generation + 1}/{num_generations}")
    
    # Step 1: Evaluate fitness of each client model in the population.
    # Individuals and clients are paired by position (zip).
    fitness_scores = []
    for individual, (client_name, client_data) in zip(population, clients_batched.items()):
        print(f"Evaluating Client: {client_name}")
        
        # Start from global weights; whatever weights this slot held before
        # (including offspring placed here last generation) are overwritten.
        individual.set_weights(global_weights)
        
        # Compile model after setting weights (a new Adam instance is created every generation)
        individual.compile(optimizer=tf.keras.optimizers.Adam(),
                           loss=tf.keras.losses.CategoricalCrossentropy(),
                           metrics=['accuracy'])
        
        for epoch in range(5):  # Local epochs per generation; adjust the number here
            for x_batch, y_batch in client_data:
                individual.train_on_batch(x_batch, y_batch)
        
        # Fitness is the accuracy on the same client data that was just trained on.
        # NOTE(review): this is training accuracy, not a held-out score - confirm that is intended.
        _, accuracy = individual.evaluate(client_data, verbose=0)
        fitness_scores.append(accuracy)
    
    # Step 2: Selection - Select top-performing parents
    parents = select_parents(population, fitness_scores, num_parents)
    
    # Step 3: Crossover and Mutation - Generate offspring.
    # Parents are paired (0,1), (2,3), ...; with an odd number of parents the
    # modulo pairs the last parent with parents[0].
    offspring = []
    for i in range(0, len(parents), 2):
        parent1, parent2 = parents[i], parents[(i + 1) % len(parents)]
        child = crossover(parent1, parent2, crossover_rate)
        child = mutate(child, mutation_rate)
        offspring.append(child)
    
    # Step 4: Replacement - Replace less fit individuals with new offspring
    population = replace_population(population, offspring, fitness_scores)
    
    # Step 5: Update global weights as the average of top models.
    # top_indices uses the fitness scores from Step 1, i.e. the highest-scoring slots.
    # NOTE(review): offspring were written to the lowest-scoring slots in Step 4 and
    # every slot is reset to global_weights at the start of the next generation, so
    # crossover/mutation do not appear to influence global_weights - confirm intent.
    top_indices = np.argsort(fitness_scores)[-num_parents:]
    top_weights = [population[i].get_weights() for i in top_indices]
    # Element-wise mean, tensor by tensor, across the selected models
    new_global_weights = [np.mean([weights[layer] for weights in top_weights], axis=0) for layer in range(len(global_weights))]
    global_weights = new_global_weights

    # Log the average fitness of the top individuals for stability tracking
    avg_fitness = np.mean([fitness_scores[i] for i in top_indices])
    print(f"Average top fitness in generation {generation + 1}: {avg_fitness}")

# After training, evaluate the global model on test data.
# The test images were batched without a channel axis, while the model was built
# for inputs with one. Add the axis only while it is still missing (rank 3 =
# batch, rows, cols) so that re-running this cell does not stack extra axes.
if test_batched.element_spec[0].shape.rank == 3:
    test_batched = test_batched.map(lambda x, y: (tf.expand_dims(x, -1), y))

# Set global weights to model for final evaluation
model.set_weights(global_weights)

# Compile the model again before evaluating
model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=['accuracy'])

# Evaluate the global model on the test data
test_loss, test_accuracy = model.evaluate(test_batched, verbose=0)
print(f"Test loss: {test_loss}")
print(f"Test accuracy: {test_accuracy}")

In [None]:
#Step 12: Save Model

In [None]:
# Destination of the trained global model (.h5 file)
model_path = r'C:\Users\ArnaB\Downloads\Compressed\ntua-parkinson-dataset-master_2\NTUA\FINAL DATASET(PREPROCESSED)\Model\my_model_GA4(94.83).h5'
model.save(model_path)