### Import necessary libraries and modules

In [1]:
# %%bash

# kaggle datasets download -d nageshsingh/dna-sequence-dataset
# mkdir data data/DNA
# unzip dna-sequence-dataset.zip -d data/DNA
# rm dna-sequence-dataset.zip

In [2]:
import os
import pickle
import time
from collections import OrderedDict
from typing import (
    List, Tuple, Dict, Optional, Callable, Union
)
import tenseal as ts

import numpy as np
import torchvision
import torch
from torch import nn
import torch.nn.functional as F
import flwr as fl
from flwr.common import (
    Metrics, EvaluateIns, EvaluateRes, FitIns, FitRes, MetricsAggregationFn, 
    Scalar, logger, ndarrays_to_parameters_custom, parameters_to_ndarrays_custom,
    Parameters, NDArrays
)
from flwr.server.client_proxy import ClientProxy
from flwr.server.client_manager import ClientManager
from flwr.server.strategy.aggregate import weighted_loss_avg
from logging import WARNING

from utils import *

os.environ['TOKENIZERS_PARALLELISM'] = 'false'

### Creation of FHE Keys

In [3]:
def combo_keys(client_path="secret.pkl", server_path="server_key.pkl"):
    """
    To create the public/private keys combination
    args:
        client_path: path to save the secret key (str)
        server_path: path to save the server public key (str)
    """
    context_client = security.context()
    security.write_query(client_path, {"contexte": context_client.serialize(save_secret_key=True)})
    security.write_query(server_path, {"contexte": context_client.serialize()})

    _, context_client = security.read_query(client_path)
    _, context_server = security.read_query(server_path)

    context_client = ts.context_from(context_client)
    context_server = ts.context_from(context_server)
    print("Is the client context private?", ("Yes" if context_client.is_private() else "No"))
    print("Is the server context private?", ("Yes" if context_server.is_private() else "No"))


secret_path = "secret.pkl"
public_path = "server_key.pkl"
if os.path.exists(secret_path):
    print("it exists")
    _, context_client = security.read_query(secret_path)

else:
    combo_keys(client_path=secret_path, server_path=public_path)

it exists


### Model Architecture Creation

In [4]:
class Net(nn.Module):
    """
    A better CNN model with improvements such as dropout and batch normalization.

    Args:
        num_classes: An integer indicating the number of classes in the dataset.
    """
    def __init__(self, num_classes=10) -> None:
        super(Net, self).__init__()
        self.fc1 = nn.Linear(384, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        self.fc4 = nn.Linear(128, num_classes)        

        # Dropout layers
        self.dropout = nn.Dropout(p=0.5)

        # Weight initialization
        self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                nn.init.zeros_(m.bias)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass of the neural network
        """
        x = F.relu(self.fc1(x))
        x = self.dropout(x)

        x = F.relu(self.fc2(x))
        x = self.dropout(x)

        x = F.relu(self.fc3(x))
        x = self.dropout(x)

        x = self.fc4(x)
        return x

### Define the FlowerClient class for federated learning

In [5]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, cid, net, trainloader, valloader, device, batch_size, save_results, matrix_path, roc_path,
                 yaml_path, he, classes, context_client):
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader
        self.cid = cid
        self.device = device
        self.batch_size = batch_size
        self.save_results = save_results
        self.matrix_path = matrix_path
        self.roc_path = roc_path
        self.yaml_path = yaml_path
        self.he = he
        self.classes = classes
        self.context_client = context_client

    def get_parameters(self, config):
        print(f"[Client {self.cid}] get_parameters")
        return get_parameters2(self.net, self.context_client)

    def fit(self, parameters, config):
        server_round = config['server_round']
        local_epochs = config['local_epochs']
        lr = float(config["learning_rate"])

        print(f'[Client {self.cid}, round {server_round}] fit, config: {config}')

        set_parameters(self.net, parameters, self.context_client)

        criterion = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(self.net.parameters(), lr=lr, momentum=0.9)

        results = engine.train(self.net, self.trainloader, self.valloader, optimizer=optimizer, loss_fn=criterion,
                               epochs=local_epochs, device=self.device)

        if self.save_results:
            save_graphs(self.save_results, local_epochs, results, f"_Client {self.cid}")

        return get_parameters2(self.net, self.context_client), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.cid}] evaluate, config: {config}")
        set_parameters(self.net, parameters, self.context_client)

        loss, accuracy, y_pred, y_true, y_proba = engine.test(self.net, self.valloader,
                                                              loss_fn=torch.nn.CrossEntropyLoss(), device=self.device)

        if self.save_results:
            os.makedirs(self.save_results, exist_ok=True)
            if self.matrix_path:
                save_matrix(y_true, y_pred, self.save_results + self.matrix_path, self.classes)
            if self.roc_path:
                save_roc(y_true, y_proba, self.save_results + self.roc_path, len(self.classes))

        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}

### Define the client_common function to set up the Flower client

In [6]:
def client_common(cid, model_save, path_yaml, path_roc, results_save, path_matrix,
                  batch_size, trainloaders, valloaders, DEVICE, CLASSES,
                  he=False, secret_path="", server_path=""):
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]

    context_client = None
    net = Net(num_classes=len(CLASSES)).to(DEVICE)

    if he:
        print("Run with homomorphic encryption")
        if os.path.exists(secret_path):
            with open(secret_path, 'rb') as f:
                query = pickle.load(f)
            context_client = ts.context_from(query["contexte"])
        else:
            context_client = security.context()
            with open(secret_path, 'wb') as f:
                encode = pickle.dumps({"contexte": context_client.serialize(save_secret_key=True)})
                f.write(encode)
        secret_key = context_client.secret_key()
    else:
        print("Run WITHOUT homomorphic encryption")

    if os.path.exists(model_save):
        print(" To get the checkpoint")
        checkpoint = torch.load(model_save, map_location=DEVICE)['model_state_dict']
        if he:
            print("to decrypt model")
            server_query, server_context = security.read_query(server_path)
            server_context = ts.context_from(server_context)
            for name in checkpoint:
                print(name)
                checkpoint[name] = torch.tensor(
                    security.deserialized_layer(name, server_query[name], server_context).decrypt(secret_key)
                )
        net.load_state_dict(checkpoint)

    return FlowerClient(cid, net, trainloader, valloader, device=DEVICE, batch_size=batch_size,
                        matrix_path=path_matrix, roc_path=path_roc, save_results=results_save, yaml_path=path_yaml,
                        he=he, context_client=context_client, classes=CLASSES)

### Define utility functions for federated learning

In [7]:
def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]
    return {"accuracy": sum(accuracies) / sum(examples)}

def evaluate2(server_round: int, parameters: NDArrays,
              config: Dict[str, Scalar]) -> Optional[Tuple[float, Dict[str, Scalar]]]:
    set_parameters(central, parameters)
    loss, accuracy, y_pred, y_true, y_proba = engine.test(central, testloader, loss_fn=torch.nn.CrossEntropyLoss(),
                                                          device=DEVICE)
    print(f"Server-side evaluation loss {loss} / accuracy {accuracy}")
    return loss, {"accuracy": accuracy}

def get_on_fit_config_fn(epoch=2, lr=0.001, batch_size=32) -> Callable[[int], Dict[str, str]]:
    def fit_config(server_round: int) -> Dict[str, str]:
        config = {
            "learning_rate": str(lr),
            "batch_size": str(batch_size),
            "server_round": server_round,
            "local_epochs": epoch
        }
        return config
    return fit_config

def aggreg_fit_checkpoint(server_round, aggregated_parameters, central_model, path_checkpoint,
                          context_client=None, server_path=""):
    if aggregated_parameters is not None:
        print(f"Saving round {server_round} aggregated_parameters...")
        aggregated_ndarrays: List[np.ndarray] = parameters_to_ndarrays_custom(aggregated_parameters, context_client)
        if context_client:
            def serialized(key, matrix):
                if key == 'fcevaluate23.weight':
                    return matrix.serialize()
                else:
                    return matrix
            server_response = {
                **{key: serialized(key, aggregated_ndarrays[i]) for i, key in enumerate(central_model.state_dict().keys())},
                "contexte": server_context.serialize()
            }
            security.write_query(server_path, server_response)
        else:
            params_dict = zip(central_model.state_dict().keys(), aggregated_ndarrays)
            state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
            central_model.load_state_dict(state_dict, strict=True)
            if path_checkpoint:
                torch.save({
                    'model_state_dict': central_model.state_dict(),
                }, path_checkpoint)

### Define the FedCustom strategy class

In [8]:
# A Strategy from scratch with the same sampling of the clients as it is in FedAvg
# and then change the configuration dictionary
class FedCustom(fl.server.strategy.Strategy):
    def __init__(
            self,
            fraction_fit: float = 1.0,
            fraction_evaluate: float = 1.0,
            min_fit_clients: int = 2,
            min_evaluate_clients: int = 2,
            min_available_clients: int = 2,
            evaluate_fn: Optional[
                    Callable[[int, NDArrays, Dict[str, Scalar]], Optional[Tuple[float, Dict[str, Scalar]]]]
                ] = None,
            on_fit_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
            on_evaluate_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
            accept_failures: bool = True,
            initial_parameters: Optional[Parameters] = None,
            fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
            evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
            context_client=None
    ) -> None:
        super().__init__()
        self.fraction_fit = fraction_fit
        self.fraction_evaluate = fraction_evaluate
        self.min_fit_clients = min_fit_clients
        self.min_evaluate_clients = min_evaluate_clients
        self.min_available_clients = min_available_clients
        self.evaluate_fn = evaluate_fn
        self.on_fit_config_fn = on_fit_config_fn
        self.on_evaluate_config_fn = on_evaluate_config_fn,
        self.accept_failures = accept_failures
        self.initial_parameters = initial_parameters
        self.fit_metrics_aggregation_fn = fit_metrics_aggregation_fn
        self.evaluate_metrics_aggregation_fn = evaluate_metrics_aggregation_fn
        self.context_client = context_client

    def __repr__(self) -> str:
        # Same function as FedAvg(Strategy)
        return f"FedCustom (accept_failures={self.accept_failures})"

    def initialize_parameters(
        self, client_manager: ClientManager
    ) -> Optional[Parameters]:
        """Initialize global model parameters."""
        # Same function as FedAvg(Strategy)
        initial_parameters = self.initial_parameters
        self.initial_parameters = None  # Don't keep initial parameters in memory
        return initial_parameters

    def num_fit_clients(self, num_available_clients: int) -> Tuple[int, int]:
        """Return sample size and required number of clients."""
        # Same function as FedAvg(Strategy)
        num_clients = int(num_available_clients * self.fraction_fit)
        return max(num_clients, self.min_fit_clients), self.min_available_clients

    def configure_fit(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, FitIns]]:
        """Configure the next round of training."""
        # Sample clients
        sample_size, min_num_clients = self.num_fit_clients(
            client_manager.num_available()
        )

        clients = client_manager.sample(
            num_clients=sample_size, min_num_clients=min_num_clients
        )
        # Create custom configs
        n_clients = len(clients)
        half_clients = n_clients // 2
        # Custom fit config function provided
        standard_lr = lr
        higher_lr = 0.003
        config = {"server_round": server_round, "local_epochs": 1}
        if self.on_fit_config_fn is not None:
            # Custom fit config function provided
            config = self.on_fit_config_fn(server_round)

        # fit_ins = FitIns(parameters, config)
        # Return client/config pairs
        fit_configurations = []
        for idx, client in enumerate(clients):
            config["learning_rate"] = standard_lr if idx < half_clients else higher_lr
            """
            Each pair of (ClientProxy, FitRes) constitutes 
            a successful update from one of the previously selected clients.
            """
            fit_configurations.append(
                (
                    client,
                    FitIns(
                        parameters,
                        config
                    )
                )
            )
        # Successful updates from the previously selected and configured clients
        return fit_configurations

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Aggregate fit results using weighted average. (each round)"""
        # Same function as FedAvg(Strategy)
        if not results:
            return None, {}

        # Do not aggregate if there are failures and failures are not accepted
        if not self.accept_failures and failures:
            return None, {}

        # Convert results parameters --> array matrix
        weights_results = [
            (parameters_to_ndarrays_custom(fit_res.parameters, self.context_client), fit_res.num_examples)
            for _, fit_res in results
        ]

        # Aggregate parameters using weighted average between the clients and convert back to parameters object (bytes)
        parameters_aggregated = ndarrays_to_parameters_custom(aggregate_custom(weights_results))

        metrics_aggregated = {}
        # Aggregate custom metrics if aggregation fn was provided
        if self.fit_metrics_aggregation_fn:
            fit_metrics = [(res.num_examples, res.metrics) for _, res in results]
            metrics_aggregated = self.fit_metrics_aggregation_fn(fit_metrics)

        elif server_round == 1:  # Only log this warning once
            logger.log(WARNING, "No fit_metrics_aggregation_fn provided")

        # Same function as SaveModelStrategy(fl.server.strategy.FedAvg)
        """Aggregate model weights using weighted average and store checkpoint"""
        aggreg_fit_checkpoint(server_round, parameters_aggregated, central, model_save,
                              self.context_client, path_crypted)
        return parameters_aggregated, metrics_aggregated

    def num_evaluation_clients(self, num_available_clients: int) -> Tuple[int, int]:
        """Use a fraction of available clients for evaluation."""
        # Same function as FedAvg(Strategy)
        num_clients = int(num_available_clients * self.fraction_evaluate)
        return max(num_clients, self.min_evaluate_clients), self.min_available_clients

    def configure_evaluate(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        """Configure the next round of evaluation."""
        # Same function as FedAvg(Strategy)
        # Do not configure federated evaluation if fraction eval is 0.
        if self.fraction_evaluate == 0.0:
            return []

        # Parameters and config
        config = {}  # {"server_round": server_round, "local_epochs": 1}

        evaluate_ins = EvaluateIns(parameters, config)

        # Sample clients
        sample_size, min_num_clients = self.num_evaluation_clients(
            client_manager.num_available()
        )

        clients = client_manager.sample(
            num_clients=sample_size, min_num_clients=min_num_clients
        )

        # Return client/config pairs
        # Each pair of (ClientProxy, FitRes) constitutes a successful update from one of the previously selected clients
        return [(client, evaluate_ins) for client in clients]

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Aggregate evaluation losses using weighted average."""
        # Same function as FedAvg(Strategy)
        if not results:
            return None, {}

        # Do not aggregate if there are failures and failures are not accepted
        if not self.accept_failures and failures:
            return None, {}

        # Aggregate loss
        loss_aggregated = weighted_loss_avg(
            [
                (evaluate_res.num_examples, evaluate_res.loss)
                for _, evaluate_res in results
            ]
        )

        metrics_aggregated = {}
        # Aggregate custom metrics if aggregation fn was provided
        if self.evaluate_metrics_aggregation_fn:
            eval_metrics = [(res.num_examples, res.metrics) for _, res in results]
            metrics_aggregated = self.evaluate_metrics_aggregation_fn(eval_metrics)

        # Only log this warning once
        elif server_round == 1:
            logger.log(WARNING, "No evaluate_metrics_aggregation_fn provided")

        return loss_aggregated, metrics_aggregated

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        """Evaluate global model parameters using an evaluation function."""
        # Same function as FedAvg(Strategy)
        if self.evaluate_fn is None:
            # Let's assume we won't perform the global model evaluation on the server side.
            return None

        # if we have a global model evaluation on the server side :
        parameters_ndarrays = parameters_to_ndarrays_custom(parameters, self.context_client)
        eval_res = self.evaluate_fn(server_round, parameters_ndarrays, {})

        # if you haven't results
        if eval_res is None:
            return None

        loss, metrics = eval_res
        return loss, metrics

### Set up the federated learning strategy

In [9]:
# Set up your variables directly
he = True
data_path = 'data/'
dataset = 'DNA'
yaml_path = './results/FL/results.yml'
seed = 42
num_workers = 0
max_epochs = 5
batch_size = 32
length = None
split = 10
device = 'cpu'
number_clients = 10
save_results = 'results/FL/'
matrix_path = 'confusion_matrix.png'
roc_path = 'roc.png'
model_save = 'DNA_fl.pt'
min_fit_clients = 10
min_avail_clients = 10
min_eval_clients = 10
rounds = 2
frac_fit = 1.0
frac_eval = 0.5
lr = 1e-4
path_public_key = 'server_key.pkl'

In [10]:
print("get public key : ", path_public_key)
_, server_context = security.read_query(path_public_key)
server_context = ts.context_from(server_context)
DEVICE = torch.device(choice_device(device))
CLASSES = classes_string(dataset)
central = Net(num_classes=len(CLASSES)).to(DEVICE)

get public key :  server_key.pkl


In [11]:
strategy = FedCustom(
    fraction_fit=frac_fit,
    fraction_evaluate=frac_eval,
    min_fit_clients=min_fit_clients,
    min_evaluate_clients=min_eval_clients if min_eval_clients else number_clients // 2,
    min_available_clients=min_avail_clients,
    evaluate_metrics_aggregation_fn=weighted_average,
    initial_parameters=ndarrays_to_parameters_custom(get_parameters2(central)),
    evaluate_fn=None if he else evaluate2,
    on_fit_config_fn=get_on_fit_config_fn(epoch=max_epochs, batch_size=batch_size),
    context_client=server_context
)

In [12]:
trainloaders, valloaders, testloader = data_setup.load_datasets(num_clients=number_clients,
                                                                batch_size=batch_size,
                                                                resize=None,
                                                                seed=seed,
                                                                num_workers=num_workers,
                                                                splitter=10,
                                                                dataset=dataset,  # Use the specified dataset
                                                                data_path=data_path,
                                                                data_path_val=None)  # Use the same path for validation data

def client_fn(cid: str) -> FlowerClient:
    return client_common(cid,
                         model_save, path_yaml, path_roc, results_save, path_matrix,
                         batch_size, trainloaders, valloaders, DEVICE, CLASSES, he, secret_path, server_path)

DNA
The training set is created for the classes: ('0', '1', '2', '3', '4', '5', '6')


### Define the client_fn function and set up the simulation

In [13]:
import warnings
warnings.simplefilter("ignore")

print("flwr", fl.__version__)
print("numpy", np.__version__)
print("torch", torch.__version__)
print("torchvision", torchvision.__version__)
print(f"Training on {DEVICE}")

client_resources = None

if DEVICE.type == "cuda":
    client_resources = {"num_gpus": 1}

model_save = model_save
path_yaml = yaml_path
path_roc = roc_path
results_save = save_results
path_matrix = matrix_path
batch_size = batch_size
he = he
secret_path = 'secret.pkl'
server_path = 'secret.pkl'
path_crypted = 'server.pkl'

print("Start simulation")
start_simulation = time.time()
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=number_clients,
    config=fl.server.ServerConfig(num_rounds=rounds),
    strategy=strategy,
    client_resources=client_resources
)
print(f"Simulation Time = {time.time() - start_simulation} seconds")

INFO flwr 2024-07-13 18:50:04,647 | app.py:145 | Starting Flower simulation, config: ServerConfig(num_rounds=2, round_timeout=None)


flwr 1.5.0
numpy 1.26.4
torch 2.3.1+cpu
torchvision 0.18.1+cpu
Training on cpu
Start simulation


2024-07-13 18:50:07,764	INFO worker.py:1788 -- Started a local Ray instance.
INFO flwr 2024-07-13 18:50:09,277 | app.py:179 | Flower VCE: Ray initialized with resources: {'memory': 2904573543.0, 'node:__internal_head__': 1.0, 'node:172.16.5.4': 1.0, 'object_store_memory': 1452286771.0, 'CPU': 2.0}
INFO flwr 2024-07-13 18:50:09,279 | server.py:89 | Initializing global parameters
INFO flwr 2024-07-13 18:50:09,280 | server.py:272 | Using initial parameters provided by strategy
INFO flwr 2024-07-13 18:50:09,282 | server.py:91 | Evaluating initial parameters
INFO flwr 2024-07-13 18:50:09,283 | server.py:104 | FL starting
DEBUG flwr 2024-07-13 18:50:09,284 | server.py:222 | fit_round 1: strategy sampled 10 clients (out of 10)


[36m(launch_and_fit pid=32401)[0m Run with homomorphic encryption


  0%|[34m          [0m| 0/5 [00:00<?, ?it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 33.91it/s]


[36m(launch_and_fit pid=32401)[0m [Client 3, round 1] fit, config: {'learning_rate': 0.003, 'batch_size': '32', 'server_round': 1, 'local_epochs': 5}
[36m(launch_and_fit pid=32401)[0m Updated model
[36m(launch_and_fit pid=32401)[0m 	Train Epoch: 1 	Train_loss: 2.5850 | Train_acc: 19.6644 % | Validation_loss: 1.8358 | Validation_acc: 42.7083 %
[36m(launch_and_fit pid=32401)[0m 	Train Epoch: 2 	Train_loss: 2.1106 | Train_acc: 22.0255 % | Validation_loss: 1.7954 | Validation_acc: 47.3958 %
[36m(launch_and_fit pid=32401)[0m 	Train Epoch: 3 	Train_loss: 1.9249 | Train_acc: 20.6597 % | Validation_loss: 1.8173 | Validation_acc: 30.7292 %
[36m(launch_and_fit pid=32401)[0m 	Train Epoch: 4 	Train_loss: 1.9555 | Train_acc: 22.5347 % | Validation_loss: 1.8188 | Validation_acc: 30.7292 %


 80%|[34m████████  [0m| 4/5 [00:00<00:00, 31.69it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00, 14.50it/s]


[36m(launch_and_fit pid=32401)[0m save graph in  results/FL/


100%|[34m██████████[0m| 5/5 [00:00<00:00, 13.50it/s]


[36m(launch_and_fit pid=32401)[0m fc1.weight 4.5299530029296875e-06
[36m(launch_and_fit pid=32401)[0m fc1.bias 1.430511474609375e-06
[36m(launch_and_fit pid=32401)[0m fc2.weight 9.5367431640625e-07
[36m(launch_and_fit pid=32401)[0m fc2.bias 1.1920928955078125e-06
[36m(launch_and_fit pid=32400)[0m fc1.weight 4.5299530029296875e-06
[36m(launch_and_fit pid=32400)[0m fc1.bias 1.430511474609375e-06
[36m(launch_and_fit pid=32400)[0m fc2.weight 7.152557373046875e-07
[36m(launch_and_fit pid=32400)[0m fc2.bias 7.152557373046875e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: a92f4f1acde0e93f0c481f64bf268d43ac0c6e9601000000 Worker ID: 5c8a9c18b9fec4d95eb00550ede896eea77ad82e693cab8dc7fa4bbf Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Worker port: 37273 Worker PID: 32401 Worker exit type: SYSTEM_ERR

  0%|[34m          [0m| 0/5 [00:00<?, ?it/s][32m [repeated 2x across cluster][0m
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.15it/s]
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.33it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 10.77it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.50it/s]


[36m(launch_and_fit pid=32675)[0m 	Train Epoch: 1 	Train_loss: 2.4810 | Train_acc: 24.1551 % | Validation_loss: 1.9560 | Validation_acc: 26.0417 %
[36m(launch_and_fit pid=32675)[0m 	Train Epoch: 2 	Train_loss: 2.0112 | Train_acc: 24.0162 % | Validation_loss: 1.9028 | Validation_acc: 26.0417 %


100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.72it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  8.03it/s]


[36m(launch_and_fit pid=32675)[0m save graph in  results/FL/
[36m(launch_and_fit pid=32723)[0m fc1.weight 5.245208740234375e-06
[36m(launch_and_fit pid=32723)[0m fc1.bias 1.9073486328125e-06
[36m(launch_and_fit pid=32723)[0m fc2.weight 1.6689300537109375e-06
[36m(launch_and_fit pid=32723)[0m fc2.bias 1.1920928955078125e-06
[36m(launch_and_fit pid=32675)[0m fc1.weight 5.245208740234375e-06
[36m(launch_and_fit pid=32675)[0m fc1.bias 1.1920928955078125e-06
[36m(launch_and_fit pid=32675)[0m fc2.weight 1.1920928955078125e-06
[36m(launch_and_fit pid=32675)[0m fc2.bias 7.152557373046875e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 3bef8b7312bb4eea265943724db7351dd71c29c501000000 Worker ID: 8b5cd1df4d7fd85c3bf212b336ed51b1ad2f7e5031bfe4cf23912ad5 Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 W

  0%|[34m          [0m| 0/5 [00:00<?, ?it/s][32m [repeated 2x across cluster][0m


[36m(launch_and_fit pid=33097)[0m 	Train Epoch: 1 	Train_loss: 2.4638 | Train_acc: 19.6644 % | Validation_loss: 1.9441 | Validation_acc: 30.7292 %


 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.12it/s]
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.08it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 10.91it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.22it/s]


[36m(launch_and_fit pid=33142)[0m save graph in  results/FL/


100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.63it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  6.94it/s]


[36m(launch_and_fit pid=33097)[0m fc1.weight 4.291534423828125e-06
[36m(launch_and_fit pid=33097)[0m fc1.bias 1.6689300537109375e-06
[36m(launch_and_fit pid=33097)[0m fc2.weight 1.1920928955078125e-06
[36m(launch_and_fit pid=33097)[0m fc2.bias 1.1920928955078125e-06
[36m(launch_and_fit pid=33142)[0m fc1.weight 5.245208740234375e-06
[36m(launch_and_fit pid=33142)[0m fc1.bias 1.430511474609375e-06
[36m(launch_and_fit pid=33142)[0m fc2.weight 9.5367431640625e-07
[36m(launch_and_fit pid=33142)[0m fc2.bias 7.152557373046875e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 88407a376942323196c8b89163e7d51daaf910ea01000000 Worker ID: 79555a4db2f94c321aa7caad754dd92538579c60656eda99310050f0 Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Worker port: 38549 Worker PID: 33097 Worker exit type: SYSTEM_ERR

  0%|[34m          [0m| 0/5 [00:00<?, ?it/s][32m [repeated 2x across cluster][0m


[36m(launch_and_fit pid=33503)[0m 	Train Epoch: 1 	Train_loss: 2.4289 | Train_acc: 19.4676 % | Validation_loss: 1.8035 | Validation_acc: 22.9167 %


 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.27it/s]
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.27it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.15it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.45it/s]


[36m(launch_and_fit pid=33503)[0m save graph in  results/FL/


100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.75it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.92it/s]


[36m(launch_and_fit pid=33503)[0m fc1.weight 4.76837158203125e-06
[36m(launch_and_fit pid=33503)[0m fc1.bias 1.9073486328125e-06
[36m(launch_and_fit pid=33503)[0m fc2.weight 1.6689300537109375e-06
[36m(launch_and_fit pid=33503)[0m fc2.bias 9.5367431640625e-07
[36m(launch_and_fit pid=33535)[0m fc1.weight 4.76837158203125e-06
[36m(launch_and_fit pid=33535)[0m fc1.bias 1.430511474609375e-06
[36m(launch_and_fit pid=33535)[0m fc2.weight 9.5367431640625e-07
[36m(launch_and_fit pid=33535)[0m fc2.bias 7.152557373046875e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 9c007ce2bf9e404d8bac99130974395c8e1e8f1d01000000 Worker ID: fd1d0dfe28a72762cda52fc3efc5ce1330ddcc592e4dee2b3619d467 Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Worker port: 43163 Worker PID: 33535 Worker exit type: SYSTEM_ERROR Worke

  0%|[34m          [0m| 0/5 [00:00<?, ?it/s][32m [repeated 2x across cluster][0m


[36m(launch_and_fit pid=33899)[0m 	Train Epoch: 1 	Train_loss: 2.3987 | Train_acc: 19.2940 % | Validation_loss: 1.9716 | Validation_acc: 17.1875 %
[36m(launch_and_fit pid=33899)[0m 	Train Epoch: 2 	Train_loss: 2.0555 | Train_acc: 19.3519 % | Validation_loss: 1.9162 | Validation_acc: 17.1875 %
[36m(launch_and_fit pid=33899)[0m 	Train Epoch: 3 	Train_loss: 2.0071 | Train_acc: 23.2755 % | Validation_loss: 1.8826 | Validation_acc: 18.7500 %


 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.03it/s]
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.03it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 10.89it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.01it/s]


[36m(launch_and_fit pid=33899)[0m save graph in  results/FL/


100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.63it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.69it/s]


[36m(launch_and_fit pid=33899)[0m fc1.weight 4.76837158203125e-06
[36m(launch_and_fit pid=33899)[0m fc1.bias 9.5367431640625e-07
[36m(launch_and_fit pid=33899)[0m fc2.weight 7.152557373046875e-07
[36m(launch_and_fit pid=33899)[0m fc2.bias 7.152557373046875e-07
[36m(launch_and_fit pid=33956)[0m fc1.weight 4.291534423828125e-06
[36m(launch_and_fit pid=33956)[0m fc1.bias 1.430511474609375e-06
[36m(launch_and_fit pid=33956)[0m fc2.weight 9.5367431640625e-07
[36m(launch_and_fit pid=33956)[0m fc2.bias 7.152557373046875e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 6c852a68c4c1cd194eb3ba4c3577a8aa009af25701000000 Worker ID: 9689d7e39c748fa6594bbcc7ba7da6d74ea6d97f9fceec74ae814f0c Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Worker port: 37141 Worker PID: 33956 Worker exit type: SYSTEM_ERROR Wor

  0%|[34m          [0m| 0/5 [00:00<?, ?it/s][32m [repeated 2x across cluster][0m


[36m(launch_and_fit pid=34323)[0m 	Train Epoch: 1 	Train_loss: 2.3625 | Train_acc: 19.2940 % | Validation_loss: 2.0211 | Validation_acc: 30.7292 %


 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.13it/s]
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.11it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.25it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 10.92it/s]


[36m(launch_and_fit pid=34323)[0m save graph in  results/FL/


100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.79it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.47it/s]


[36m(launch_and_fit pid=34323)[0m fc1.weight 5.245208740234375e-06
[36m(launch_and_fit pid=34323)[0m fc1.bias 1.430511474609375e-06
[36m(launch_and_fit pid=34323)[0m fc2.weight 7.152557373046875e-07
[36m(launch_and_fit pid=34323)[0m fc2.bias 9.5367431640625e-07
[36m(launch_and_fit pid=34366)[0m fc1.weight 4.5299530029296875e-06
[36m(launch_and_fit pid=34366)[0m fc1.bias 1.430511474609375e-06
[36m(launch_and_fit pid=34366)[0m fc2.weight 9.5367431640625e-07
[36m(launch_and_fit pid=34366)[0m fc2.bias 4.76837158203125e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: d2f85a08e4f386e4b1ce7e6593c364ad9503ce0101000000 Worker ID: 08c84fc6790ff6994ba858547000a4c077042b0c4cdf27d8d3f1dd0e Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Worker port: 38365 Worker PID: 34366 Worker exit type: SYSTEM_ERROR Wo

  0%|[34m          [0m| 0/5 [00:00<?, ?it/s][32m [repeated 2x across cluster][0m


[36m(launch_and_fit pid=34762)[0m 	Train Epoch: 1 	Train_loss: 2.5403 | Train_acc: 19.3519 % | Validation_loss: 1.8006 | Validation_acc: 24.4792 %


 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.32it/s]
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.19it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.70it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.10it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.61it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  8.00it/s]


[36m(launch_and_fit pid=34715)[0m save graph in  results/FL/
[36m(launch_and_fit pid=34715)[0m fc1.weight 4.5299530029296875e-06
[36m(launch_and_fit pid=34715)[0m fc1.bias 1.6689300537109375e-06
[36m(launch_and_fit pid=34715)[0m fc2.weight 1.1920928955078125e-06
[36m(launch_and_fit pid=34715)[0m fc2.bias 1.1920928955078125e-06
[36m(launch_and_fit pid=34762)[0m fc1.weight 4.76837158203125e-06
[36m(launch_and_fit pid=34762)[0m fc1.bias 9.5367431640625e-07
[36m(launch_and_fit pid=34762)[0m fc2.weight 7.152557373046875e-07
[36m(launch_and_fit pid=34762)[0m fc2.bias 7.152557373046875e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 55b91ed9e2eecf17df30994b7e7826dbc6b05cb901000000 Worker ID: c0ef3db70cb387ccab829fd6c266b0b4f1372efaa263e2cc1dc4cc35 Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Wo

  0%|[34m          [0m| 0/5 [00:00<?, ?it/s][32m [repeated 2x across cluster][0m


[36m(launch_and_fit pid=35105)[0m 	Train Epoch: 1 	Train_loss: 2.4522 | Train_acc: 18.4144 % | Validation_loss: 1.9414 | Validation_acc: 24.4792 %
[36m(launch_and_fit pid=35105)[0m 	Train Epoch: 2 	Train_loss: 2.0147 | Train_acc: 24.6412 % | Validation_loss: 1.8830 | Validation_acc: 26.0417 %


 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.26it/s]
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.29it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.20it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 11.41it/s]


[36m(launch_and_fit pid=35105)[0m save graph in  results/FL/


100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.85it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.89it/s]


[36m(launch_and_fit pid=35105)[0m fc1.weight 4.76837158203125e-06
[36m(launch_and_fit pid=35105)[0m fc1.bias 1.430511474609375e-06
[36m(launch_and_fit pid=35105)[0m fc2.weight 7.152557373046875e-07
[36m(launch_and_fit pid=35105)[0m fc2.bias 9.5367431640625e-07
[36m(launch_and_fit pid=35148)[0m fc1.weight 4.5299530029296875e-06
[36m(launch_and_fit pid=35148)[0m fc1.bias 1.1920928955078125e-06
[36m(launch_and_fit pid=35148)[0m fc2.weight 9.5367431640625e-07
[36m(launch_and_fit pid=35148)[0m fc2.bias 7.152557373046875e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: c7d2e4a3e80cc9d7ca88a1d59031a6cefe6e1dd501000000 Worker ID: cde7d96e3fc2db62b07467bec4e608464354d89e50d434c17116f73a Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Worker port: 43145 Worker PID: 35148 Worker exit type: SYSTEM_ERROR W

  0%|[34m          [0m| 0/5 [00:00<?, ?it/s][32m [repeated 2x across cluster][0m
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.17it/s]
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.12it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 10.31it/s]
 60%|[34m██████    [0m| 3/5 [00:00<00:00,  7.76it/s]


[36m(launch_and_fit pid=35480)[0m 	Train Epoch: 1 	Train_loss: 2.5013 | Train_acc: 17.3611 % | Validation_loss: 1.7410 | Validation_acc: 32.2917 %
[36m(launch_and_fit pid=35480)[0m 	Train Epoch: 2 	Train_loss: 2.0126 | Train_acc: 21.5394 % | Validation_loss: 1.7621 | Validation_acc: 32.2917 %


100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.03it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  6.95it/s]


[36m(launch_and_fit pid=35480)[0m save graph in  results/FL/
[36m(launch_and_fit pid=35480)[0m fc1.weight 5.245208740234375e-06
[36m(launch_and_fit pid=35480)[0m fc1.bias 1.430511474609375e-06
[36m(launch_and_fit pid=35480)[0m fc2.weight 9.5367431640625e-07
[36m(launch_and_fit pid=35480)[0m fc2.bias 1.430511474609375e-06
[36m(launch_and_fit pid=35520)[0m fc1.weight 4.76837158203125e-06
[36m(launch_and_fit pid=35520)[0m fc1.bias 1.1920928955078125e-06
[36m(launch_and_fit pid=35520)[0m fc2.weight 9.5367431640625e-07
[36m(launch_and_fit pid=35520)[0m fc2.bias 7.152557373046875e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 5bf4e19e1be99c941635e9212f37f424faf561de01000000 Worker ID: a20a3ef34759a0565e3a7b9bc181f9ed042a579b876516e87ffb6caf Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Worker 

  0%|[34m          [0m| 0/5 [00:00<?, ?it/s][32m [repeated 2x across cluster][0m


[36m(launch_and_fit pid=35848)[0m [Client 1, round 1] fit, config: {'learning_rate': 0.003, 'batch_size': '32', 'server_round': 1, 'local_epochs': 5}
[36m(launch_and_fit pid=35848)[0m Updated model


 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.23it/s]
 20%|[34m██        [0m| 1/5 [00:00<00:01,  3.23it/s]


[36m(launch_and_fit pid=35848)[0m 	Train Epoch: 1 	Train_loss: 2.5104 | Train_acc: 15.6597 % | Validation_loss: 1.7874 | Validation_acc: 24.4792 %


 80%|[34m████████  [0m| 4/5 [00:00<00:00, 10.91it/s]
 80%|[34m████████  [0m| 4/5 [00:00<00:00, 10.89it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.65it/s]
100%|[34m██████████[0m| 5/5 [00:00<00:00,  7.66it/s]


[36m(launch_and_fit pid=35848)[0m save graph in  results/FL/
[36m(launch_and_fit pid=35848)[0m fc1.weight 4.0531158447265625e-06
[36m(launch_and_fit pid=35848)[0m fc1.bias 1.1920928955078125e-06
[36m(launch_and_fit pid=35848)[0m fc2.weight 9.5367431640625e-07
[36m(launch_and_fit pid=35848)[0m fc2.bias 7.152557373046875e-07
[36m(launch_and_fit pid=35891)[0m fc1.weight 5.0067901611328125e-06
[36m(launch_and_fit pid=35891)[0m fc1.bias 1.6689300537109375e-06
[36m(launch_and_fit pid=35891)[0m fc2.weight 1.430511474609375e-06
[36m(launch_and_fit pid=35891)[0m fc2.bias 9.5367431640625e-07
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 832dfb995c2f864a411d6e7d7a0074c3d36e945e01000000 Worker ID: 545bed0f5be9df94fae7a6888b659d0c4d317ace3ece3e7c191846b2 Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Wor

ERROR flwr 2024-07-13 19:01:17,339 | ray_client_proxy.py:87 | The worker died unexpectedly while executing this task. Check python-core-worker-*.log files for more information.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 2c23213c61a86553534875621b1d7457490e5ee201000000 Worker ID: 52e51028c6a8c31eea5539d993c36f0297d889f54813051caa49bfe2 Node ID: e287c513c118cce8a943a6b5059545ce2ae2b16fcc6f91ed932fa4bb Worker IP address: 172.16.5.4 Worker port: 39653 Worker PID: 35891 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.


ERROR flwr 2024-07-13 19:01:32,368 | ray_client_proxy.py:87 | The worker died unexpectedly while executing this task. Check python-core-worker-*.log files for more information.
ERROR flwr 2024-07-13 19:02:04,787 | ray_client_proxy.py:87 | [36mray::launch_and_fit()[39m (pid=36261, ip=172.16.5.4)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: No module named 'utils.security'
traceback: Traceback (most recent call last):
  File "/workspaces/QFML-QF-2024/src/utils/__init__.py", line 1, in <module>
  File "/workspaces/QFML-QF-2024/src/utils/common.py", line 12, in <module>
ModuleNotFoundError: No module named 'utils.security'
ERROR flwr 2024-07-13 19:02:04,789 | ray_client_proxy.py:87 | [36mray::launch_and_fit()[39m (pid=36213, ip=172.16.5.4)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: No module named 'utils.security'
traceback: Traceback (

Simulation Time = 721.1223657131195 seconds


[36m(launch_and_evaluate pid=36213)[0m No module named 'utils'
[36m(launch_and_evaluate pid=36213)[0m Traceback (most recent call last):
[36m(launch_and_evaluate pid=36213)[0m   File "/opt/conda/envs/fed/lib/python3.10/site-packages/ray/_private/serialization.py", line 423, in deserialize_objects
[36m(launch_and_evaluate pid=36213)[0m     obj = self._deserialize_object(data, metadata, object_ref)
[36m(launch_and_evaluate pid=36213)[0m   File "/opt/conda/envs/fed/lib/python3.10/site-packages/ray/_private/serialization.py", line 280, in _deserialize_object
[36m(launch_and_evaluate pid=36213)[0m     return self._deserialize_msgpack_data(data, metadata_fields)
[36m(launch_and_evaluate pid=36213)[0m   File "/opt/conda/envs/fed/lib/python3.10/site-packages/ray/_private/serialization.py", line 235, in _deserialize_msgpack_data
[36m(launch_and_evaluate pid=36213)[0m     python_objects = self._deserialize_pickle5_data(pickle5_data)
[36m(launch_and_evaluate pid=36213)[0m   File