Let's convert the entire script to use Flower, a popular framework for federated learning. Flower will handle all the complex server-client communication, so our code will become much simpler and more organized.

We'll follow the same step-by-step process, keeping your original model, data loading functions, and variable names so you can easily see what's changed.

## Step 1: The Foundation (Imports and Setup)
Every Python script starts with importing the necessary libraries and setting up the environment. This is like laying the foundation for a house.

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from collections import OrderedDict
import flwr as fl
import numpy as np
import pandas as pd
import os
import random
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm
from flwr.common import Context # <-- ADD THIS LINE


# --- Set Device ---
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Define dataset loader

PyTorch uses a Dataset object to handle data loading. Since our model will take three different kinds of input (sensor data, image 1, and image 2), we need to create a special class that tells PyTorch how to retrieve one sample of each, along with its corresponding label.

This class will have three essential methods:

__init__: Initializes the dataset by storing our feature and label arrays.

__len__: Returns the total number of samples in the dataset.

__getitem__: Fetches a single data sample at a given index.

Here is the code for it. Add this to your script:

In [15]:
# Define dataset loader
class CustomDatasetRes(Dataset):
    def __init__(self, features1, features2, features3, labels):
        self.features1 = features1
        self.features2 = features2
        self.features3 = features3
        self.labels = labels

    def __len__(self):
        return len(self.features1)
    
    def __getitem__(self, index):
        return self.features1[index], self.features2[index], self.features3[index], self.labels[index]

### Helper Functions
Next, we'll add a few helper functions. These functions will perform common tasks that we'll need later, like displaying results, scaling data, and ensuring our experiments are reproducible.

1. display_result

This function takes the true labels (y_test) and the model's predicted labels (y_pred) and prints out standard performance metrics like accuracy, precision, recall, and F1-score.

In [16]:
def display_result(y_test, y_pred):
    print('Accuracy score : ', accuracy_score(y_test, y_pred))
    print('Precision score : ', precision_score(y_test, y_pred, average='weighted'))
    print('Recall score : ', recall_score(y_test, y_pred, average='weighted'))
    print('F1 score : ', f1_score(y_test, y_pred, average='weighted'))

2. scaled_data

This function uses Scikit-learn's StandardScaler to normalize the sensor (CSV) data. Scaling is crucial because it ensures that features with larger value ranges don't dominate the learning process. Notice there are two functions with the same name in the original code. In Python, the last definition of a function is the one that gets used. We will add both for completeness, but just know that the first one is effectively overwritten by the second.

In [17]:
def scale_data(X_train, X_test):
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    return X_train_scaled, X_test_scaled

def scaled_data(X_train):
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    return X_train_scaled

3. set_seed

This is a very important function for reproducibility. Machine learning involves a lot of randomness (e.g., initializing model weights, shuffling data). By setting a "seed," we ensure that the sequence of random numbers is the same every time we run the code, which means we'll get the exact same results.

In [18]:
def set_seed(seed=0):
    # Sets the environment variable for Python's hash seed
    os.environ['PYTHONHASHSEED'] = str(seed)
    # Sets the seed for NumPy's random number generator
    np.random.seed(seed)
    # Sets the seed for Python's built-in random module
    random.seed(seed)
    # Sets the seed for PyTorch's random number generator
    torch.manual_seed(seed)
    # If using a GPU, sets the seed for all CUDA devices
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # For multi-GPU setups
    # Ensures deterministic behavior in cuDNN (CUDA Deep Neural Network library)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

### loading and preprocessing the data.

The function loadClientsData is designed for a federated learning scenario. It reads data from separate files for each participant (or "client"), cleans it, aligns the different data types (sensor vs. image), and splits it into training and testing sets for each client.

Because this function is quite long, we'll build it in a few parts.

#### Part 1: Initializing and Processing Training Data
First, we'll define the function, list the subject IDs we want to load, and create empty dictionaries to store each client's data. Then, we'll start a loop to process each subject one by one. Inside the loop, we'll begin by loading and cleaning the training data.

This involves:

Reading the sensor data from a CSV file.

Removing rows with missing values and any duplicate rows.

Dropping columns that we don't need (like the 'Infrared' sensor readings).

Loading the corresponding image, label, and timestamp data from .npy files.

#### Part 2: Aligning and Preparing Training Data
After loading the raw data, we face a common problem: the datasets don't perfectly match. Because we dropped rows with missing values from the sensor (CSV) data, there are now timestamps in our image data that no longer have a corresponding entry in the sensor data.

We need to align them by removing the image samples that don't have a matching sensor reading.

After alignment, we'll prepare the data for the model:

Set the seed for reproducibility.

Separate features from labels.

One-hot encode the labels, converting them into a format suitable for the model's output layer (e.g., class 3 becomes [0, 0, 0, 1, 0, ...]).

Scale the numeric sensor data and the image pixel values.

Reshape the images to the format expected by the convolutional layers.

#### Part 3: Processing the Test Data and Finalizing the Function
The logic here is identical to what we just did for the training data:

Load the test sensor data (_test.csv) and test image data (_test.npy).

Clean the sensor data by removing missing values and unnecessary columns.

Align the test image data with the cleaned test sensor data.

Prepare the aligned test data (one-hot encode labels, scale features, reshape images).

Store all the processed training and test arrays into our dictionaries.

Increment the clint_index and repeat the process for the next subject.

After the loop finishes, the function returns all the dictionaries containing the data for every client.

In [19]:
def loadClientsData():
    #subs = [1, 3, 4, 7, 10, 11, 12, 13, 14, 15, 16, 17]
    subs = [1, 4, 7, 11, 12, 14, 15, 17]
    X_train_csv_scaled_splits = {}
    X_test_csv_scaled_splits = {}
    Y_train_csv_splits = {}
    Y_test_csv_splits = {}
    X_train_1_scaled_splits = {}
    X_test_1_scaled_splits = {}
    Y_train_1_splits = {}
    Y_test_1_splits = {}
    X_train_2_scaled_splits = {}
    X_test_2_scaled_splits = {}
    Y_train_2_splits = {}
    Y_test_2_splits = {}
    clint_index = 0
    for sub_ in subs:
        # --- Load and clean TRAINING sensor data (CSV) ---
        SUB_train = pd.read_csv('./dataset/Sensor + Image/{}_sensor_train.csv'.format(sub_), skiprows=1)
        SUB_train.head()
        
        SUB_train.isnull().sum()
        NA_cols = SUB_train.columns[SUB_train.isnull().any()]
        SUB_train.dropna(inplace=True)
        SUB_train.drop_duplicates(inplace=True)
        
        times_train = SUB_train['Time']
        list_DROP = ['Infrared 1',
                     'Infrared 2',
                     'Infrared 3',
                     'Infrared 4',
                     'Infrared 5',
                     'Infrared 6']
        SUB_train.drop(list_DROP, axis=1, inplace=True)
        SUB_train.drop(NA_cols, axis=1, inplace=True)  # drop NAN COLS

        SUB_train.set_index('Time', inplace=True)
        SUB_train.head()

        # --- Load TRAINING image data from both cameras ---
        cam = '1'
        image_train = './dataset/Sensor + Image' + '/' + '{}_image_1_train.npy'.format(sub_)
        name_train = './dataset/Sensor + Image' + '/' + '{}_name_1_train.npy'.format(sub_)
        label_train = './dataset/Sensor + Image' + '/' + '{}_label_1_train.npy'.format(sub_)

        img_1_train = np.load(image_train)
        label_1_train = np.load(label_train)
        name_1_train = np.load(name_train)

        cam = '2'
        image_train = './dataset/Sensor + Image' + '/' + '{}_image_2_train.npy'.format(sub_)
        name_train = './dataset/Sensor + Image' + '/' + '{}_name_2_train.npy'.format(sub_)
        label_train = './dataset/Sensor + Image' + '/' + '{}_label_2_train.npy'.format(sub_)

        img_2_train = np.load(image_train)
        label_2_train = np.load(label_train)
        name_2_train = np.load(name_train)

        # --- Align the training data by removing samples not present in the cleaned CSV ---
        redundant_1 = list(set(name_1_train) - set(times_train))
        redundant_2 = list(set(name_2_train) - set(times_train))
        
        ind = np.arange(0, len(img_1_train))

        red_in1 = ind[np.isin(name_1_train, redundant_1)]
        name_1_train = np.delete(name_1_train, red_in1)
        img_1_train = np.delete(img_1_train, red_in1, axis=0)
        label_1_train = np.delete(label_1_train, red_in1)

        red_in2 = ind[np.isin(name_2_train, redundant_2)]
        name_2_train = np.delete(name_2_train, red_in2)
        img_2_train = np.delete(img_2_train, red_in2, axis=0)
        label_2_train = np.delete(label_2_train, red_in2)
        
        # --- Prepare the final aligned training data ---
        data_train = SUB_train.loc[name_1_train].values

        set_seed()
        X_csv_train, y_csv_train = data_train[:, :-1], data_train[:, -1]
        
        # Remap label 20 to 0 for consistency
        y_csv_train = np.where(y_csv_train == 20, 0, y_csv_train)
        label_1_train = np.where(label_1_train == 20, 0, label_1_train)
        label_2_train = np.where(label_2_train == 20, 0, label_2_train)

        # One-hot encode the labels for PyTorch
        Y_csv_train = torch.nn.functional.one_hot(torch.from_numpy(y_csv_train).long(), 12).float()
        Y_train_1 = torch.nn.functional.one_hot(torch.from_numpy(label_1_train).long(), 12).float()
        Y_train_2 = torch.nn.functional.one_hot(torch.from_numpy(label_2_train).long(), 12).float()

        

        X_train_1 = img_1_train
        y_train_1 = label_1_train
        
        X_train_2 = img_2_train
        y_train_2 = label_2_train

        # Reshape images to (samples, height, width, channels)
        shape1, shape2 = 32, 32
        X_train_1 = X_train_1.reshape(X_train_1.shape[0], shape1, shape2, 1)
        X_train_2 = X_train_2.reshape(X_train_2.shape[0], shape1, shape2, 1)

        # Scale image pixel values to be between 0 and 1
        X_train_1_scaled = X_train_1 / 255.0
        X_train_2_scaled = X_train_2 / 255.0

        # --- Load and clean TEST sensor data (CSV) ---
        SUB_test = pd.read_csv('./dataset/Sensor + Image/{}_sensor_test.csv'.format(sub_), skiprows=1)
        SUB_test.head()
        
        SUB_test.isnull().sum()
        NA_cols = SUB_test.columns[SUB_test.isnull().any()]
        SUB_test.dropna(inplace=True)
        SUB_test.drop_duplicates(inplace=True)

        times_test = SUB_test['Time']
        SUB_test.drop(list_DROP, axis=1, inplace=True)
        SUB_test.drop(NA_cols, axis=1, inplace=True)

        SUB_test.set_index('Time', inplace=True)
        SUB_test.head()

        # --- Load TEST image data from both cameras ---
        image_test = './dataset/Sensor + Image' + '/' + '{}_image_1_test.npy'.format(sub_)
        name_test = './dataset/Sensor + Image' + '/' + '{}_name_1_test.npy'.format(sub_)
        label_test = './dataset/Sensor + Image' + '/' + '{}_label_1_test.npy'.format(sub_)
        img_1_test = np.load(image_test)
        label_1_test = np.load(label_test)
        name_1_test = np.load(name_test)

        image_test = './dataset/Sensor + Image' + '/' + '{}_image_2_test.npy'.format(sub_)
        name_test = './dataset/Sensor + Image' + '/' + '{}_name_2_test.npy'.format(sub_)
        label_test = './dataset/Sensor + Image' + '/' + '{}_label_2_test.npy'.format(sub_)
        img_2_test = np.load(image_test)
        label_2_test = np.load(label_test)
        name_2_test = np.load(name_test)

        # --- Align the test data ---
        redundant_1 = list(set(name_1_test) - set(times_test))
        redundant_2 = list(set(name_2_test) - set(times_test))
        
        ind = np.arange(0, len(img_1_test))

        red_in1 = ind[np.isin(name_1_test, redundant_1)]
        name_1_test = np.delete(name_1_test, red_in1)
        img_1_test = np.delete(img_1_test, red_in1, axis=0)
        label_1_test = np.delete(label_1_test, red_in1)

        red_in2 = ind[np.isin(name_2_test, redundant_2)]
        name_2_test = np.delete(name_2_test, red_in2)
        img_2_test = np.delete(img_2_test, red_in2, axis=0)
        label_2_test = np.delete(label_2_test, red_in2)

        # --- Prepare the final aligned test data ---
        data_test = SUB_test.loc[name_1_test].values

        set_seed()
        X_csv_test, y_csv_test = data_test[:, :-1], data_test[:, -1]
        y_csv_test = np.where(y_csv_test == 20, 0, y_csv_test)
        label_1_test = np.where(label_1_test == 20, 0, label_1_test)
        label_2_test = np.where(label_2_test == 20, 0, label_2_test)

        Y_csv_test = torch.nn.functional.one_hot(torch.from_numpy(y_csv_test).long(), 12).float()
        X_csv_train_scaled, X_csv_test_scaled = scale_data(X_csv_train, X_csv_test)

        X_test_1 = img_1_test
        y_test_1 = label_1_test
        Y_test_1 = torch.nn.functional.one_hot(torch.from_numpy(y_test_1).long(), 12).float()

        X_test_2 = img_2_test
        y_test_2 = label_2_test
        Y_test_2 = torch.nn.functional.one_hot(torch.from_numpy(y_test_2).long(), 12).float()

        X_test_1 = X_test_1.reshape(X_test_1.shape[0], shape1, shape2, 1)
        X_test_2 = X_test_2.reshape(X_test_2.shape[0], shape1, shape2, 1)

        X_test_1_scaled = X_test_1 / 255.0
        X_test_2_scaled = X_test_2 / 255.0

        # --- Store all processed data for the current client ---
        X_train_csv_scaled_splits[clint_index] = X_csv_train_scaled
        X_test_csv_scaled_splits[clint_index] = X_csv_test_scaled
        Y_train_csv_splits[clint_index] = Y_csv_train
        Y_test_csv_splits[clint_index] = Y_csv_test
        X_train_1_scaled_splits[clint_index] = X_train_1_scaled
        X_test_1_scaled_splits[clint_index] = X_test_1_scaled
        Y_train_1_splits[clint_index] = Y_train_1
        Y_test_1_splits[clint_index] = Y_test_1
        X_train_2_scaled_splits[clint_index] = X_train_2_scaled # This line had a bug in the original code
        X_test_2_scaled_splits[clint_index] = X_test_2_scaled
        Y_train_2_splits[clint_index] = Y_train_2
        Y_test_2_splits[clint_index] = Y_test_2
        clint_index += 1
        
    # --- After loop, return all dictionaries ---
    return X_train_csv_scaled_splits,X_test_csv_scaled_splits, Y_train_csv_splits,Y_test_csv_splits,X_train_1_scaled_splits,X_test_1_scaled_splits,Y_train_1_splits,Y_test_1_splits,X_train_2_scaled_splits,X_test_2_scaled_splits,Y_train_2_splits,Y_test_2_splits

## Step 2: Client Selection

We're making great progress. We've handled all the data loading and preparation. Now, we'll add the functions that form the "intelligence" of our federated learning system: client selection.

Instead of blindly averaging updates from every client in every round, these methods evaluate each client's performance and contribution. This allows the server to select the most promising or reliable clients to participate in the global model update, potentially leading to faster convergence and a more robust final model.

We'll add a series of functions, each calculating a specific metric to judge the clients.

### Client Evaluation Metrics
Add all the following functions to your script. Each one calculates a different score based on a client's performance.

1. Relative Loss Reduction (RF_loss)

This measures how much a client's training loss has dropped from the beginning to the end of a local training round, relative to the client with the biggest drop. A higher score means the client is learning effectively.

2. Relative Training Accuracy (RF_ACC_Train)

This measures a client's local training accuracy relative to the client with the highest accuracy. It's a straightforward measure of performance on local data.

3. Global Validation Accuracy (RF_ACC_Global)

This is a more sophisticated metric. It rewards clients for high accuracy on a global test set but penalizes them if their global accuracy is much worse than their local training accuracy (which is a sign of overfitting).

4. Loss Outliers (P_loss)

This function flags clients that are potential negative contributors. If a client's final training loss is significantly higher than the average loss of all clients, it gets a high penalty score. Otherwise, its penalty is zero.

5. Performance Bias (P_bias)

This metric calculates the gap between a client's performance on its own validation data versus its performance on the global validation data. A large gap might indicate that the client's local data is not representative of the overall data distribution.

##### Replace above Metric Helper Functions by below updated version that use disctionaries instead of Lists

In [20]:
# --- Replace ALL old metric helpers with these dictionary-based versions ---

def calculate_relative_loss_reduction_as_list(client_losses):
    """Calculates RF_loss. Returns a DICTIONARY {cid: score}."""
    loss_reductions = {}
    for cid, losses in client_losses.items():
        if losses and len(losses) >= 2:
            loss_reductions[cid] = losses[0] - losses[-1]

    if not loss_reductions: return {cid: 0.0 for cid in client_losses.keys()}
    max_loss_reduction = max(loss_reductions.values())
    if max_loss_reduction == 0: return {cid: 0.0 for cid in client_losses.keys()}
    
    return {cid: loss_reductions.get(cid, 0.0) / max_loss_reduction for cid in client_losses.keys()}

def calculate_relative_train_accuracy(client_acc):
    """Calculates RF_ACC_Train. Returns a DICTIONARY {cid: score}."""
    if not client_acc: return {}
    max_acc = max(client_acc.values())
    if max_acc == 0: return {cid: 0.0 for cid in client_acc.keys()}
    return {cid: acc / max_acc for cid, acc in client_acc.items()}

def calculate_global_validation_accuracy(train_acc, global_acc):
    """Calculates RF_ACC_Global. Returns a DICTIONARY {cid: score}."""
    if not train_acc or not global_acc: return {}
    max_global_acc = max(global_acc.values()) if global_acc else 0
    if max_global_acc == 0: max_global_acc = 1.0

    global_train_diff = {cid: global_acc.get(cid, 0) - train_acc.get(cid, 0) for cid in train_acc.keys()}
    max_global_train_diff = max(global_train_diff.values()) if global_train_diff else 0
    if max_global_train_diff == 0: max_global_train_diff = 1.0
    
    return {cid: (global_acc.get(cid, 0) / max_global_acc) - (diff / max_global_train_diff) for cid, diff in global_train_diff.items()}

def calculate_loss_outliers(client_losses, lambda_loss=1.5):
    """Calculates P_loss. Returns a DICTIONARY {cid: score}."""
    final_losses = {cid: losses[-1] for cid, losses in client_losses.items() if losses}
    if not final_losses: return {cid: 0.0 for cid in client_losses.keys()}

    loss_values = np.array(list(final_losses.values()))
    mean_loss, std_loss = np.mean(loss_values), np.std(loss_values)
    threshold = mean_loss + lambda_loss * std_loss
    max_loss = np.max(loss_values)
    if max_loss == 0: return {cid: 0.0 for cid in client_losses.keys()}
    
    all_client_scores = {}
    for cid in client_losses.keys():
        final_loss = final_losses.get(cid, 0.0)
        score = final_loss / max_loss if final_loss > threshold else 0.0
        all_client_scores[cid] = score
    return all_client_scores

def calculate_performance_bias(val_acc, global_acc):
    """Calculates P_bias. Returns a DICTIONARY {cid: score}."""
    if not val_acc: return {}
    
    bias_dict = {}
    for cid, val in val_acc.items():
        global_val = global_acc.get(cid, 0.0)
        max_val = max(val, global_val)
        bias = 0.0 if max_val == 0 else abs(val - global_val) / max_val
        bias_dict[cid] = bias
    return bias_dict

Excellent. Now that we have the functions to score each client, we need the final step: the algorithms that use these scores to select which clients will participate in a given round.

### Client Selection Algorithms
1. Pareto Optimization

This is a powerful technique used when you have multiple, often conflicting, objectives. Instead of combining all metrics into one score, it tries to find a set of clients that represent the best possible trade-offs.

A client is considered "Pareto optimal" if no other client is better than it across all metrics. The algorithm first finds this set of optimal clients.

If there are more optimal clients than needed, it selects a random subset.

If there are fewer, it fills the remaining spots by picking the clients with the best-combined performance score.

In [21]:
def pareto_optimization(rf_loss, rf_acc_train, rf_acc_val, rf_acc_global, p_loss, p_bias, client_num, client_ids):
    """Implements Pareto optimization to select clients."""
    # Convert metric dicts to a numpy array, ensuring a consistent order via client_ids
    data_points = [
        np.array([rf_loss.get(cid, 0), rf_acc_train.get(cid, 0), rf_acc_val.get(cid, 0),
                  rf_acc_global.get(cid, 0), -p_loss.get(cid, 0), -p_bias.get(cid, 0)])
        for cid in client_ids
    ]
    data = np.array(data_points)

    def is_dominated(point, others):
        """Checks if a point is dominated by any other point in the set."""
        return any(np.all(other >= point) and np.any(other > point) for other in others)

    pareto_indices = [i for i, point in enumerate(data) if not is_dominated(point, np.delete(data, i, axis=0))]
    # Map indices back to the original string client IDs
    pareto_cids = [client_ids[i] for i in pareto_indices]

    if len(pareto_cids) >= client_num:
        return random.sample(pareto_cids, client_num)
    
    # CORRECTED: Use a dictionary comprehension with string CIDs
    pareto_scores = {cid: 0.4 * rf_loss.get(cid, 0) + 0.6 * rf_acc_global.get(cid, 0) for cid in client_ids}
    
    # Sort the original client IDs based on their scores
    sorted_cids = sorted(client_ids, key=lambda cid: pareto_scores.get(cid, 0), reverse=True)
    
    selected_clients = set(pareto_cids)
    for cid in sorted_cids:
        if len(selected_clients) >= client_num:
            break
        selected_clients.add(cid)
        
    # CORRECTED: Return a list of strings
    return list(selected_clients)

2. Weighted Sum Method (5RF)

This is a more straightforward approach. It calculates a single comprehensive score for each client by taking a weighted sum of all the metrics. Clients with the highest final scores are selected. The weights (0.2, 0.1, 0.3, etc.) determine the importance of each metric.

## Step 2: The AI's Brain (The Model Definition)
We have the data pipeline and the client selection logic. Now it's time to build the brain of the operation: the neural network model itself.

The model, ModelCSVIMG, is a multi-modal neural network. This means it's designed to accept and process multiple types of data at once. It has three distinct input branches:

One for the numerical sensor (CSV) data.

One for the images from camera 1.

One for the images from camera 2.

The features extracted from each branch are then combined (fused) and passed to a final set of layers that perform the classification. The original code contains a few versions of the architecture; we will use the final, most complex one.

Add the complete model class to your script:

In [22]:
class ModelCSVIMG(nn.Module):
    def __init__(self, num_csv_features, img_shape1, img_shape2):
        super(ModelCSVIMG, self).__init__()

        # --- Branch 1: For processing numerical CSV data ---
        self.csv_fc_1 = nn.Linear(num_csv_features, 2000)
        self.csv_bn_1 = nn.BatchNorm1d(2000)
        self.csv_fc_2 = nn.Linear(2000, 600)
        self.csv_bn_2 = nn.BatchNorm1d(600)
        self.csv_dropout = nn.Dropout(0.2)

        # --- Branch 2: For processing images from Camera 1 (CNN) ---
        self.img1_conv_1 = nn.Conv2d(in_channels=1, out_channels=18, kernel_size=3, stride=1, padding=1)
        self.img1_batch_norm = nn.BatchNorm2d(18)
        self.img1_pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Flattened features from the CNN go into a fully connected layer
        self.img1_fc1 = nn.Linear(18 * 16 * 16, 100)
        self.img1_dropout = nn.Dropout(0.2)

        # --- Branch 3: For processing images from Camera 2 (identical to Branch 2) ---
        self.img2_conv = nn.Conv2d(in_channels=1, out_channels=18, kernel_size=3, stride=1, padding=1)
        self.img2_batch_norm = nn.BatchNorm2d(18)
        self.img2_pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.img2_fc1 = nn.Linear(18 * 16 * 16, 100)
        self.img2_dropout = nn.Dropout(0.2)

        # --- Fusion and Final Classification Layers ---
        # The input size is 600 (from CSV) + 100 (from Image 1) + 100 (from Image 2) = 800
        self.fc1 = nn.Linear(800, 1200)
        self.dr1 = nn.Dropout(0.2)
        # A residual connection is used here: input to fc2 is the original 800 + output of fc1 (1200) = 2000
        self.fc2 = nn.Linear(2000, 12) # 12 output classes

    def forward(self, x_csv, x_img1, x_img2):
        # --- Process CSV data ---
        x_csv = F.relu(self.csv_bn_1(self.csv_fc_1(x_csv)))
        x_csv = F.relu(self.csv_bn_2(self.csv_fc_2(x_csv)))
        x_csv = self.csv_dropout(x_csv)

        # --- Process Image 1 data ---
        # Reshape image from (batch, height, width, channels) to (batch, channels, height, width)
        x_img1 = x_img1.permute(0, 3, 1, 2)
        x_img1 = F.relu(self.img1_conv_1(x_img1))
        x_img1 = self.img1_batch_norm(x_img1)
        x_img1 = self.img1_pool(x_img1)
        x_img1 = x_img1.contiguous().view(x_img1.size(0), -1) # Flatten
        x_img1 = F.relu(self.img1_fc1(x_img1))
        x_img1 = self.img1_dropout(x_img1)

        # --- Process Image 2 data ---
        x_img2 = x_img2.permute(0, 3, 1, 2)
        x_img2 = F.relu(self.img2_conv(x_img2))
        x_img2 = self.img2_batch_norm(x_img2)
        x_img2 = self.img2_pool(x_img2)
        x_img2 = x_img2.contiguous().view(x_img2.size(0), -1) # Flatten
        x_img2 = F.relu(self.img2_fc1(x_img2))
        x_img2 = self.img2_dropout(x_img2)

        # --- Fusion ---
        x = torch.cat((x_csv, x_img1, x_img2), dim=1)
        residual = x # Keep a copy for the residual connection
        
        # --- Final layers ---
        x = F.relu(self.fc1(x))
        x = self.dr1(x)
        # Concatenate the residual connection
        x = torch.cat((residual, x), dim=1)
        # Final output with softmax for classification
        x = F.softmax(self.fc2(x), dim=1)

        return x

## Step 3: Define Training and Testing Functions
Instead of methods inside a Client class, we'll create standalone train and test functions. This makes the code cleaner. This logic is taken directly from your train_one_epoch and validate functions.

In [23]:
def train(net, trainloader, epochs):
    """Train the model and return the list of losses."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
    net.train()
    losses = []
    for _ in range(epochs):
        epoch_loss = 0.0
        for batch in trainloader:
            data1, data2, data3, target = batch
            data1, data2, data3, target = data1.to(DEVICE).float(), data2.to(DEVICE).float(), data3.to(DEVICE).float(), target.to(DEVICE).float()
            optimizer.zero_grad()
            loss = criterion(net(data1, data2, data3), target)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        losses.append(epoch_loss / len(trainloader))
    return losses # Return the history of losses

def test(net, testloader):
    """Validate the model on the test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for batch in tqdm(testloader, "Testing"):
            data1, data2, data3, target = batch
            data1, data2, data3, target = data1.to(DEVICE).float(), data2.to(DEVICE).float(), data3.to(DEVICE).float(), target.to(DEVICE).float()
            outputs = net(data1, data2, data3)
            loss += criterion(outputs, target).item()
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == torch.max(target, 1)[1]).sum().item()
    accuracy = correct / total
    return loss / len(testloader), accuracy

## Step 4: Create the Flower Client
This is where Flower really comes in. We'll replace your custom Client class with a FlowerClient that inherits from Flower's NumPyClient. This class tells Flower how each client should handle parameters received from the server and how to perform local training and evaluation.

In [24]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, cid, net, trainloader, valloader):
        self.cid = cid
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        return [val.cpu().numpy() for _, val in self.net.state_dict().items()]

    def set_parameters(self, parameters):
        params_dict = zip(self.net.state_dict().keys(), parameters)
        state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
        self.net.load_state_dict(state_dict, strict=True)

    def fit(self, parameters, config):
        # NEW: Print when a client starts training
        print(f"✅ [Client {self.cid}] Starting training for round {config['server_round']}...")

        self.set_parameters(parameters)
        
        # Train and get the list of losses
        losses = train(self.net, self.trainloader, epochs=3)
        
        # Also get the local validation accuracy
        _, val_acc = test(self.net, self.valloader)

        # NEW: Announce completion of training
        print(f"✅ [Client {self.cid}] Finished training.")
        
        # Return parameters, dataset size, and our custom metrics dictionary
        return self.get_parameters(config={}), len(self.trainloader.dataset), {
            "train_losses": losses, 
            "val_acc": val_acc
        }

    def evaluate(self, parameters, config):
        # NEW: Print when a client starts evaluation
        print(f"[Client {self.cid}] Starting evaluation...")

        self.set_parameters(parameters)
        loss, accuracy = test(self.net, self.valloader)
        
        # NEW: Print the client's evaluation results
        print(f"✅ [Client {self.cid}] Evaluation results - Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")

        return float(loss), len(self.valloader.dataset), {"accuracy": float(accuracy)}

## Step 5: Create the ParetoStrategy
Now for the main event. We create a new strategy class that collects all the client metrics and uses them to run your Pareto selection logic.

In [25]:
class ParetoStrategy(fl.server.strategy.FedAvg):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client_fit_metrics = {}
        self.client_eval_metrics = {}

        # NEW: Define a function to create the config for clients
        def on_fit_config_fn(server_round: int):
            """Return a configuration dictionary for the clients."""
            config = {"server_round": server_round}
            return config
        
        # NEW: Assign this function to the strategy
        self.on_fit_config_fn = on_fit_config_fn

    def aggregate_fit(self, server_round, results, failures):
        for _, fit_res in results:
            if fit_res.metrics:
                self.client_fit_metrics[fit_res.client.cid] = fit_res.metrics
        return super().aggregate_fit(server_round, results, failures)

    def aggregate_evaluate(self, server_round, results, failures):
        # NEW: Announce that the server has received evaluation results
        print(f"✅ --- Server received {len(results)} evaluation results in round {server_round} ---")
        for client, eval_res in results:
            self.client_eval_metrics[client.cid] = {"accuracy": eval_res.metrics["accuracy"]}
        return super().aggregate_evaluate(server_round, results, failures)

    def configure_fit(self, server_round, parameters, client_manager):
        if server_round == 1:
            print("--- Round 1: Selecting clients randomly ---")
            return super().configure_fit(server_round, parameters, client_manager)

        all_cids = list(client_manager.clients.keys())
        train_losses = {cid: self.client_fit_metrics.get(cid, {}).get("train_losses", []) for cid in all_cids}
        local_val_acc = {cid: self.client_fit_metrics.get(cid, {}).get("val_acc", 0.0) for cid in all_cids}
        local_train_acc_for_rf = local_val_acc
        global_eval_acc = {cid: self.client_eval_metrics.get(cid, {}).get("accuracy", 0.0) for cid in all_cids}
        
        rf_loss = calculate_relative_loss_reduction_as_list(train_losses)
        rf_acc_train = calculate_relative_train_accuracy(local_train_acc_for_rf)
        rf_acc_val = calculate_relative_train_accuracy(local_val_acc)
        rf_acc_global = calculate_global_validation_accuracy(local_train_acc_for_rf, global_eval_acc)
        p_loss = calculate_loss_outliers(train_losses)
        p_bias = calculate_performance_bias(local_val_acc, global_eval_acc)
        
        num_to_select = int(self.fraction_fit * len(all_cids))
        selected_cids = pareto_optimization(
            rf_loss, rf_acc_train, rf_acc_val, rf_acc_global, p_loss, p_bias,
            num_to_select, all_cids
        )
        
        # NEW: Announce which clients were selected by the strategy
        print(f"✅ --- Pareto selection for round {server_round} chose clients: {selected_cids} ---")

        clients = [client_manager.clients[cid] for cid in selected_cids]
        config = self.on_fit_config_fn(server_round)
        return [(client, config) for client in clients]

## Step 5: The main Function - Bringing It All Together
Finally, we rewrite the main function. This is where we will:

Load all the client data just once.

Define a client_fn. Flower uses this function to create clients on demand for the simulation.

Define a server-side evaluation function (get_evaluate_fn) so the server can test the global model's performance on a held-out test set after each round.

Configure and start the Flower simulation.

In [None]:
def main():
    # --- Load Data ---
    print("✅ Loading data for all clients...")
    all_data = loadClientsData()
    X_train_csv, X_test_csv, Y_train_csv, Y_test_csv, \
    X_train_1, X_test_1, Y_train_1, Y_test_1, \
    X_train_2, X_test_2, Y_train_2, Y_test_2 = all_data
    TOTAL_CLIENTS = len(X_train_csv)
    print(f"✅ Data for {TOTAL_CLIENTS} clients loaded successfully.")

    # --- Client Factory ---
    # In your main function

    def client_fn(cid: str) -> fl.client.Client: # Return type is now fl.client.Client
        client_id = int(cid)
        net = ModelCSVIMG(num_csv_features=X_train_csv[client_id].shape[1], img_shape1=32, img_shape2=32).to(DEVICE)
        train_dataset = CustomDatasetRes(X_train_csv[client_id], X_train_1[client_id], X_train_2[client_id], Y_train_csv[client_id])
        val_dataset = CustomDatasetRes(X_test_csv[client_id], X_test_1[client_id], X_test_2[client_id], Y_test_csv[client_id])
        trainloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        valloader = DataLoader(val_dataset, batch_size=32)

        # Instantiate your NumPyClient
        numpy_client = FlowerClient(client_id, net, trainloader, valloader)
        
        # NEW: Convert it to a Flower Client and return
        return numpy_client.to_client()
        
    # --- Prepare the data for the server's evaluation function ---
    # Create a tuple containing all the test set arrays for every client
    server_test_data = (X_test_csv, X_test_1, X_test_2, Y_test_csv)
    
    # --- Server-side Evaluation (optional but good practice) ---
    # In your main function, where you define get_evaluate_fn

    def get_evaluate_fn(test_data_splits):
        """Return an evaluation function for server-side evaluation."""
        def evaluate(server_round: int, parameters: fl.common.NDArrays, config: dict):
            net = ModelCSVIMG(num_csv_features=test_data_splits[0][0].shape[1], img_shape1=32, img_shape2=32).to(DEVICE)

            server_test_cid = TOTAL_CLIENTS - 1
            params_dict = zip(net.state_dict().keys(), parameters)
            state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
            net.load_state_dict(state_dict, strict=True)

            test_dataset = CustomDatasetRes(test_data_splits[0][server_test_cid], test_data_splits[1][server_test_cid],
                                          test_data_splits[2][server_test_cid], test_data_splits[3][server_test_cid])
            testloader = DataLoader(test_dataset, batch_size=32)
            loss, accuracy = test(net, testloader)
            
            # NEW: More prominent end-of-round summary
            print("="*60)
            print(f"✅ ROUND {server_round} SUMMARY - Global Model Performance:")
            print(f"   Loss: {loss:.4f} | Accuracy: {accuracy:.4f}")
            print("="*60)
            
            return loss, {"accuracy": accuracy}
        return evaluate

    # --- Instantiate and use the ParetoStrategy ---
    # In your main function
    strategy = ParetoStrategy(
        fraction_fit=0.5,
        min_fit_clients=4,
        fraction_evaluate=0.0,      # <-- CORRECTED
        min_evaluate_clients=0,     # <-- CORRECTED
        min_available_clients=TOTAL_CLIENTS,
        evaluate_fn=get_evaluate_fn(server_test_data),
    )

    # --- Start Simulation ---
    print("✅ Starting Flower simulation with Pareto Strategy...")
    
    # 1. CAPTURE the History object returned by the simulation
    history = fl.simulation.start_simulation(
        client_fn=client_fn,
        num_clients=TOTAL_CLIENTS,
        config=fl.server.ServerConfig(num_rounds=10),
        strategy=strategy,
        client_resources={"num_cpus": 4, "num_gpus": 0.25 if torch.cuda.is_available() else 0},
    )


    # --- NEW: Process, Print, and Save the Final Results ---
    print("\n" + "="*60)
    print("              SIMULATION COMPLETE")
    print("="*60)

    # 2. EXTRACT loss and accuracy from the History object
    # The history object contains metrics from the server-side evaluation (centralized)
    try:
        # Get accuracy values from history
        rounds, accuracies = zip(*history.metrics_centralized["accuracy"])
        # Get loss values from history
        _, losses = zip(*history.losses_centralized)

        # 3. CREATE a pandas DataFrame
        summary_df = pd.DataFrame({
            "Round": rounds,
            "Loss": losses,
            "Accuracy": accuracies
        })

        # 4. PRINT the summary table
        print("\n📈 Global Model Performance Summary:")
        print(summary_df.to_string(index=False))

        # 5. SAVE the summary to a CSV file
        csv_filename = "simulation_summary.csv"
        summary_df.to_csv(csv_filename, index=False)
        print(f"\n✅ Summary successfully saved to: {csv_filename}")

    except Exception as e:
        print(f"\n⚠️ Could not generate summary. No centralized metrics found. Error: {e}")
    


if __name__ == "__main__":
    # Make sure to copy all the helper functions and classes defined above
    main()

✅ Loading data for all clients...


	Instead, use the `flwr run` CLI command to start a local simulation in your Flower app, as shown for example below:

		$ flwr new  # Create a new Flower app from a template

		$ flwr run  # Run the Flower app in Simulation Mode

	Using `start_simulation()` is deprecated.

            This is a deprecated feature. It will be removed
            entirely in future versions of Flower.
        
[92mINFO [0m:      Starting Flower simulation, config: num_rounds=10, no round_timeout


✅ Data for 8 clients loaded successfully.
✅ Starting Flower simulation with Pareto Strategy...


2025-08-22 08:37:01,608	INFO worker.py:1771 -- Started a local Ray instance.
[92mINFO [0m:      Flower VCE: Ray initialized with resources: {'accelerator_type:G': 1.0, 'node:__internal_head__': 1.0, 'node:172.30.170.62': 1.0, 'CPU': 16.0, 'memory': 11178012672.0, 'object_store_memory': 5589006336.0, 'GPU': 1.0}
[92mINFO [0m:      Optimize your simulation with Flower VCE: https://flower.ai/docs/framework/how-to-run-simulations.html
[92mINFO [0m:      Flower VCE: Resources for each Virtual Client: {'num_cpus': 4, 'num_gpus': 0.0}
[92mINFO [0m:      Flower VCE: Creating VirtualClientEngineActorPool with 4 actors
[92mINFO [0m:      [INIT]
[92mINFO [0m:      Requesting initial parameters from one random client
[36m(ClientAppActor pid=5531)[0m 
[36m(ClientAppActor pid=5531)[0m             This is a deprecated feature. It will be removed
[36m(ClientAppActor pid=5531)[0m             entirely in future versions of Flower.
[36m(ClientAppActor pid=5531)[0m         
[36m(Clien

[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffff40c52f704fc6455677250b3d01000000 Worker ID: 805b87486425c7aecfb81408c09757cfc9dc5cfddd160440a45eacee Node ID: 8d2d541bdd230be5861c1ea07ed7e6934890728bcf11694ee4d5d2a8 Worker IP address: 172.30.170.62 Worker port: 39003 Worker PID: 5531 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.


RuntimeError: Simulation crashed.