In [4]:
from collections import OrderedDict
from typing import List, Tuple, Dict, Optional, Callable, Union
from PIL import Image, ImageEnhance, ImageFilter
import matplotlib.pyplot as plt

import sys,os,os.path

import logging

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets
import torchvision.transforms as transforms
from datasets.utils.logging import disable_progress_bar
from torch.utils.data import DataLoader, TensorDataset, Subset, random_split
from sklearn.cluster import KMeans

from collections import OrderedDict

import flwr as fl
from flwr.client import Client, ClientApp, NumPyClient
from flwr.common import Metrics, Context
from flwr.server import ServerApp, ServerConfig, ServerAppComponents, ClientManager
from flwr.server.strategy import Strategy, FedAvg
from flwr.simulation import run_simulation
from flwr_datasets import FederatedDataset
from flwr.common import ndarrays_to_parameters, NDArrays, Scalar, Context
from flwr.common import FitRes, Parameters, parameters_to_ndarrays
from flwr.server.client_proxy import ClientProxy
from flwr.common.logger import set_logger_propagation

from enum import Enum

device = "cuda" if torch.cuda.is_available() else "cpu"
DEVICE = torch.device(device)  # Try "cuda" to train on GPU
print(f"Training on {DEVICE}")
print(f"Flower {fl.__version__} / PyTorch {torch.__version__}")
disable_progress_bar()

  from .autonotebook import tqdm as notebook_tqdm
2025-01-09 14:03:56,354	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


Training on cpu
Flower 1.14.0 / PyTorch 2.5.1+cpu


In [52]:
# Wrap the above ugly code of flower client into a class

# Helper Functions

class TaskType(Enum):

    CLASSFICATION = 0
    REGRESSION = 1


def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

def train(net, trainloader, epochs: int, verbose=False, device = "cpu", task_type = TaskType.CLASSFICATION):
    """Train the network on the training set."""
    if task_type == TaskType.CLASSFICATION:    
        criterion = torch.nn.CrossEntropyLoss()
    elif task_type == TaskType.REGRESSION:
        criterion = nn.MSELoss(reduction='sum')
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    net.to(device)
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for images, labels in trainloader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = net(images)
            if task_type == TaskType.REGRESSION:
                outputs = outputs.squeeze()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss.item()
            total += labels.size(0)
            if task_type == TaskType.CLASSFICATION:
                correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        if verbose:
            if task_type == TaskType.CLASSFICATION:
                epoch_acc = correct / total
                print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")
            elif task_type == TaskType.REGRESSION:
                print(f"Epoch {epoch+1}: train loss {epoch_loss}")

def test(net, testloader, device = "cpu"):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    net.to(device)
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

def test_regression(net, testloader, device="cpu"):
    """Evaluate the regression model on the entire test set."""
    criterion = nn.MSELoss(reduction="sum")
    sum_of_squares, total_samples = 0.0, 0
    net.eval()
    net.to(device)
    with torch.no_grad():
        for x, y in testloader:
            x, y = x.to(device), y.to(device)
            target = y.view(-1)
            outputs = net(x).view(-1) # NOTE: You maight want to modify this part
            sum_of_squares += criterion(outputs, target).item()
            total_samples += len(y)

    if total_samples > 0:
        avg_mse = sum_of_squares / total_samples
    else:
        avg_mse = 0.0
    avg_loss = avg_mse

    # Note that to make sure the consistence,
    # we return mse twice to match {loss, accurancy as the test function}
    return avg_loss, avg_mse ** 0.5

# Custom Client Class
class FLClient(NumPyClient):
    """A Flower client that holds its own model and training data."""

    def __init__(
        self,
        net: nn.Module,
        trainloader: DataLoader,
        valloader: DataLoader,
        device: torch.device,
        client_id: int,
        epochs: int = 1,
        task_type: TaskType = TaskType.CLASSFICATION
    ):
        super().__init__()
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader
        self.device = device
        self.client_id = client_id
        self.epochs = epochs
        self.task_type = task_type

    def get_parameters(self, config: Dict[str, Scalar]) -> List[np.ndarray]:
        """Return the current local model parameters."""
        return get_parameters(self.net)

    def fit(
        self, parameters: List[np.ndarray], config: Dict[str, Scalar]
    ) -> Tuple[List[np.ndarray], int, Dict[str, Scalar]]:
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, device=self.device, epochs=self.epochs, verbose=False, task_type = self.task_type)
        new_params = get_parameters(self.net)
        # Return partition-id in the metrics
        # The simplest way to store the model
        return new_params, len(self.trainloader.dataset), {"partition-id": self.client_id}

    def evaluate(
        self, parameters: List[np.ndarray], config: Dict[str, Scalar]
    ) -> Tuple[float, int, Dict[str, Scalar]]:
        set_parameters(self.net, parameters)
        if self.task_type == TaskType.CLASSFICATION:
            loss, accuracy = test(self.net, self.valloader, self.device)
            print(f"[Client {self.client_id}] Evaluate -> Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")
            return float(loss), len(self.valloader.dataset), {"accuracy": float(accuracy)}
        elif self.task_type == TaskType.REGRESSION:
            loss, mse = test_regression(self.net, self.valloader, self.device)
            print(f"[Client {self.client_id}] Evaluate -> Loss: {mse:.4f}")
            return float(loss), len(self.valloader.dataset), {"MSE": float(mse)}

# Custom Client for House Pricing Dataset

class HousePricingClient(fl.client.NumPyClient):
    def __init__(
        self,
        net: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader,
        test_loader: DataLoader,
        device: torch.device,
        client_id: int,
        epochs: int = 1.
    ):
        super().__init__()
        self.net = net
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader
        self.device = device
        self.client_id = client_id
        self.epochs = epochs

    def get_parameters(self, config: Dict[str, Scalar]) -> List[np.ndarray]:
        return get_parameters(self.net)

    '''def set_parameters(self, parameters):
        state_dict = dict(zip(self.model.state_dict().keys(), parameters))
        self.model.load_state_dict({k: torch.tensor(v) for k, v in state_dict.items()})'''

    def fit(
        self, parameters: List[np.ndarray], config: Dict[str, Scalar]
    ) -> Tuple[List[np.ndarray], int, Dict[str, Scalar]]:
        set_parameters(self.net, parameters)
        train(self.net, self.train_loader, device=self.device, epochs=self.epochs, verbose=False)
        new_params = get_parameters(self.net)
        # Return partition-id in the metrics
        # The simplest way to store the model
        return new_params, len(self.train_loader.dataset), {"partition-id": self.client_id}
        

    def evaluate(
        self, parameters: List[np.ndarray], config: Dict[str, Scalar]
    ) -> Tuple[float, int, Dict[str, Scalar]]:
        set_parameters(self.net, parameters)
        loss, rmse = test_regression(self.net, self.val_loader, self.device)
        print(f"[Client {self.client_id}] Evaluate -> Loss: {loss:.4f}, RMSE: {rmse:.4f}")
        return float(loss), len(self.val_loader.dataset), {"RMSE": float(rmse)}


class DefaultStrategy(FedAvg):

    # A custom strategy to store all the parameters.
    # https://github.com/adap/flower/issues/487
    # https://flower.ai/docs/framework/how-to-save-and-load-model-checkpoints.html

    def __init__(self, model: type, total_round: int, only_last: bool = True, save_dir: str = "models", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.save_dir = save_dir
        os.makedirs(self.save_dir, exist_ok=True)
        self.model = model
        self.total_round = total_round
        self.only_last = only_last

    def aggregate_fit(
        self,
        server_round: int,
        results: list[tuple[ClientProxy, FitRes]],
        failures: list[Union[tuple[ClientProxy, FitRes], BaseException]],
    ) -> tuple[Optional[Parameters], dict[str, Scalar]]:
        """
        Aggregate model weights using weighted average.
        Also save each client's model and the global server model.
        """

        if self.only_last and server_round < self.total_round:
            return super().aggregate_fit(server_round, results, failures)

        # Call aggregate_fit from base class (FedAvg) to aggregate parameters and metrics
        aggregated_parameters, aggregated_metrics = super().aggregate_fit(
            server_round, results, failures
        )

        # For each client which returned FitRes, save the client model
        for (_, fit_res) in results:
            id_ = fit_res.metrics["partition-id"]

            client_parameters: Optional[Parameters] = fit_res.parameters
            if client_parameters is not None:
                net = self.model()
                print(f"[Round {server_round}] Saving model for client {id_}...")

                # Convert `Parameters` to `list[np.ndarray]`
                client_ndarrays : list[np.ndarray] = parameters_to_ndarrays(
                    client_parameters
                )

                # Convert `list[np.ndarray]` to PyTorch `state_dict`
                params_dict = zip(net.state_dict().keys(), client_ndarrays)
                state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
                net.load_state_dict(state_dict, strict=True)

                # Save the model to disk
                torch.save(net.state_dict(), f"{self.save_dir}/client-{server_round}-{id_}.pth")

        # If `aggregated_parameters` is not None, update the global net and save it
        if aggregated_parameters is not None:
            net = self.model()
            print(f"Saving round {server_round} aggregated_parameters...")

            # Convert `Parameters` to `list[np.ndarray]`
            aggregated_ndarrays: list[np.ndarray] = parameters_to_ndarrays(
                aggregated_parameters
            )

            # Convert `list[np.ndarray]` to PyTorch `state_dict`
            params_dict = zip(net.state_dict().keys(), aggregated_ndarrays)
            state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
            net.load_state_dict(state_dict, strict=True)

            # Save the model to disk
            torch.save(net.state_dict(), f"{self.save_dir}/server-{server_round}.pth")

        return aggregated_parameters, aggregated_metrics

In [29]:
# Use the following class to run the experiment

# You need to provide the following information:
# 1. The Network class (dont instantiate it)
#       (assume we use the same network for all clients and server)
# 2. The list of data loaders for each client,
#       where loaders is a list of loader tuples (train, val, test)
#       i.e. loaders = [ (train_loader_0, val_loader_0, test_loader_0), ... ]
#       NOTE: In fit and evaluate, we ONLY use the train_loader and val_loader,
#             But we ask you to pyt them together for simplicity for any future test use.
#       NOTE: we assume the number of clients == number of data loaders
# 3. Number of clients

# See next block for an example of how to use this class

# Set up logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
set_logger_propagation(logger, True)

class FLExperiment:
    """
    A federated learning experiment interface class.

    NOTE: For each client, we now expect a tuple of three DataLoaders:
    (train_loader, val_loader, test_loader).
    """

    def __init__(
        self,
        model_cls: type,
        client_loaders: List[Tuple[DataLoader, DataLoader, DataLoader]],
        num_clients: int,
        device: torch.device = torch.device("cpu"),
        local_epochs: int = 1,
        num_rounds: int = 5,
        task_type: TaskType = TaskType.REGRESSION,
        # strategy: Optional[Strategy] = None, # Is not supported yet. and may not be needed
    ):
        """
        Args:
            model_cls (type): A PyTorch nn.Module class (not an instance).
                We'll instantiate `model_cls()` for each client and server.
            client_loaders (List[(DataLoader, DataLoader, DataLoader)]):
                A list of (train_loader, val_loader, test_loader) for each client.
            num_clients (int): Number of clients to simulate.
            device (torch.device): CPU or GPU device.
            local_epochs (int): Local epochs on each client per round.
            num_rounds (int): How many global training rounds.
            strategy (Optional[Strategy]): Use a custom Flower strategy or fallback to default FedAvg.
        """
        logger.info("Initializing FLExperiment")
        if len(client_loaders) != num_clients:
            raise ValueError(
                f"Number of client loader tuples ({len(client_loaders)}) does not match "
                f"the number of clients ({num_clients})."
            )

        self.model_cls = model_cls
        self.client_loaders = client_loaders
        self.num_clients = num_clients
        self.local_epochs = local_epochs
        self.num_rounds = num_rounds
        self.device = device
        self.task_type = task_type

        # Store final trained models
        self._client_models: List[Optional[nn.Module]] = [None] * self.num_clients
        self._server_model: Optional[nn.Module] = None

        # Create one model per client (instantiate model_cls)
        self.client_nets = [self.model_cls().to(self.device) for _ in range(self.num_clients)]

        self.strategy = self._create_default_strategy(save_only_last=True)
        logger.info("FLExperiment initialized successfully")
        # # Use user-provided strategy or create a default one
        # if strategy is None:
        #     self.strategy = self._create_default_strategy()
        # else:
        #     self.strategy = strategy

    def _create_default_strategy(self, save_only_last: bool) -> Strategy:
        """Create a default FedAvg strategy with a minimal server_evaluate."""

        logger.debug("Creating default strategy")

        def server_evaluate(
            server_round: int,
            parameters: NDArrays,
            config: Dict[str, Scalar]
        ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
            # Minimal server eval (no real evaluation)
            net = self.model_cls().to(self.device)
            set_parameters(net, parameters)
            print(f"[Server] Round {server_round} - no global evaluation implemented.")
            return None
        
        def weighted_average(metrics: List[Tuple[int, Dict[str, Scalar]]]) -> Dict[str, Scalar]:
            accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
            examples = [num_examples for num_examples, _ in metrics]
            if sum(examples) == 0:
                return {"accuracy": 0.0}
            return {"accuracy": sum(accuracies) / sum(examples)}

        def weighted_average_regression(metrics: List[Tuple[int, Dict[str, Scalar]]]) -> Dict[str, Scalar]:
            total_sum_of_squares = 0.0
            total_samples = 0
            for (num_examples, m) in metrics:
                #print(f'Test m contents: {m}')
                if "RMSE" in m:
                    total_sum_of_squares += m["RMSE"]
                    total_samples += num_examples
            if total_samples == 0:
                return {"rmse": 0.0}
            rmse = (total_sum_of_squares / total_samples) ** 0.5
            return {"rmse": rmse}
        
        if self.task_type == TaskType.CLASSFICATION:
            aggregation_fn = weighted_average
        else:
            aggregation_fn = weighted_average_regression

        default_strategy = DefaultStrategy(
            model = self.model_cls,
            total_round = self.num_rounds,
            only_last = True,
            fraction_fit=1.0,
            fraction_evaluate=1.0,
            min_fit_clients=self.num_clients,
            min_evaluate_clients=self.num_clients,
            min_available_clients=self.num_clients,
            evaluate_fn=server_evaluate,
            evaluate_metrics_aggregation_fn=aggregation_fn,
        )
        return default_strategy

    def _client_fn(self, context: Context) -> Client:
        """Construct one Flower client using the partition_id to pick (train, val, test)."""
        partition_id = context.node_config["partition-id"]
        trainloader, valloader, testloader = self.client_loaders[partition_id]
        net = self.client_nets[partition_id]
        logger.info(f"Creating client {partition_id}")

        client = HousePricingClient(
            net=net,
            train_loader=trainloader,
            val_loader=valloader,
            test_loader = testloader,
            device=self.device,
            client_id=partition_id,
            epochs=self.local_epochs
        )
        return client.to_client()

    def _server_fn(self, context: Context) -> ServerAppComponents:
        """Server-side: configure strategy and server config."""
        config = ServerConfig(num_rounds=self.num_rounds)
        return ServerAppComponents(strategy=self.strategy, config=config)

    def run(self, save_only_last: bool = True) -> None:
        """Run the federated learning simulation and store final client/server models.
        
        Args:
            save_only_last (bool): Save only the last round of models.
                Default True. If False, all models will be saved.
        """
        print("[FLExperiment] Starting federated training...")
        self.strategy = self._create_default_strategy(save_only_last=save_only_last)
        client_app = ClientApp(client_fn=self._client_fn)
        server_app = ServerApp(server_fn=self._server_fn)

        # Resource allocation
        if self.device.type == "cuda":
            backend_config = {"client_resources": {"num_cpus": 1, "num_gpus": 1.0}}
        else:
            backend_config = {"client_resources": {"num_cpus": 1, "num_gpus": 0.0}}

        # Run the simulation
        run_simulation(
            client_app=client_app,
            server_app=server_app,
            num_supernodes=self.num_clients,
            backend_config=backend_config,
        )
        print("[FLExperiment] Federated training finished.")

    def get_clients(self, round_num: int = 0) -> List[nn.Module]:
        """Return final trained models for all clients (if they have been saved).
        
        Args:
            round_num (int): Round number to fetch models from. Default 0 (last round).
        
        Returns:
            List[nn.Module]: List of final trained models for all clients.
                The index of the list corresponds to the client ID 
                and the index of dataloader. 
        """
        assert round_num <= self.num_rounds, f"Round {round_num} not available, only {self.num_rounds} rounds."
        if round_num <= 0:
            round_num = self.num_rounds
        try:
            return [
                torch.load(f"models/client-{round_num}-{cid}.pth", map_location=self.device, weights_only=True)
                for cid in range(self.num_clients)
            ]
        except FileNotFoundError:
            raise RuntimeError("Client models are not available. Have you called run() or set only_last=True?")
    
    def get_client_dataloader_tuples(self, round_num: int = 0) -> List[Tuple[nn.Module, Tuple[DataLoader, DataLoader, DataLoader]]]:
        """Return the dataloaders for all clients.
         
        Args:
            round_num (int): Round number to fetch models from. Default 0 (last round).
        
        Returns:
            List[Tuple[nn.Module, Tuple[DataLoader, DataLoader, DataLoader]]]:
                List of (client_model, (train_loader, val_loader, test_loader))
        """
        assert round_num <= self.num_rounds, f"Round {round_num} not available, only {self.num_rounds} rounds."
        if round_num <= 0:
            round_num = self.num_rounds
        try:
            clients = self.get_clients(round_num)
            return list(zip(clients, self.client_loaders)) 
        except FileNotFoundError:
            raise RuntimeError("Client dataloaders are not available. Have you called run() or set only_last=True?")

    def get_server(self, round_num: int = 0) -> nn.Module:
        """Return the final server model (if stored).

        Args:
            round_num (int): Round number to fetch models from. Default 0 (last round).
        
        Returns:
            nn.Module: The final server model.
        """
        assert round_num <= self.num_rounds, f"Round {round_num} not available, only {self.num_rounds} rounds."
        if round_num <= 0:
            round_num = self.num_rounds
        try:
            return torch.load(f"models/server-{round_num}.pth", map_location=self.device, weights_only=True)
        except FileNotFoundError:
            raise RuntimeError("Server model is not available. Have you called run() or set only_last=True?")

In [32]:
# Updated Neural Network for Tabular Data

class Net(nn.Module):
    def __init__(self, input_size = 8):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class EZNet(nn.Module):
    def __init__(self, in_features=8):
        super().__init__()
        self.fc = nn.Linear(in_features, 1)

    def forward(self, x):
        # x shape: [batch_size, in_features]
        return self.fc(x)

In [58]:
# Data Loaders for House Pricing Dataset

def load_data(client_id, seed=42):
    data_path = f"house_pricing_datasets_0_rouge/client_{client_id}.csv"
    df = pd.read_csv(data_path)

    # Encode categorical features
    for col in df.select_dtypes(include=['object']).columns:
        df[col] = pd.Categorical(df[col]).codes

    # Separate features and target
    features = df.drop(columns=["House Price"]).values
    target = df["House Price"].values.reshape(-1, 1)

    # Convert to PyTorch tensors
    features_tensor = torch.tensor(features, dtype=torch.float32)
    target_tensor = torch.tensor(target, dtype=torch.float32)

    # Create a full dataset
    full_dataset = TensorDataset(features_tensor, target_tensor)

    # Determine lengths for splits
    total_len = len(full_dataset)
    train_len = int(0.7 * total_len)
    val_len = int(0.2 * total_len)
    test_len = total_len - train_len - val_len

    # Use random_split for reproducible splits
    generator = torch.Generator().manual_seed(seed + client_id)  # Client-specific seed
    train_dataset, val_dataset, test_dataset = random_split(
        full_dataset,
        lengths=[train_len, val_len, test_len],
        generator=generator
    )

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    '''# Split into train, validation, and test sets
    train_size = int(0.7 * len(features))
    val_size = int(0.2 * len(features))
    test_size = len(features) - train_size - val_size
    indices = np.random.permutation(len(features))
    train_idx, val_idx, test_idx = indices[:train_size], indices[train_size:train_size + val_size], indices[train_size + val_size:]

    X_train, y_train = features[train_idx], target[train_idx]
    X_val, y_val = features[val_idx], target[val_idx]
    X_test, y_test = features[test_idx], target[test_idx]

    # Convert to PyTorch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
    X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.float32)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

    # Create data loaders
    train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=32, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val_tensor, y_val_tensor), batch_size=32, shuffle=False)
    test_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=32, shuffle=False)'''

    print(f"Client {client_id} stats: mean={df['House Price'].mean()}, std={df['House Price'].std()}")

    return train_loader, val_loader, test_loader

In [59]:
# Define loaders for all clients
NUM_CLIENTS = 5
loaders = [
    load_data(client_id) for client_id in range(1, NUM_CLIENTS + 1)
]

Client 1 stats: mean=2655.6000458962008, std=996.1192688513912
Client 2 stats: mean=2574.611936598437, std=1004.0983782459657
Client 3 stats: mean=2679.56855169879, std=1063.9333406747944
Client 4 stats: mean=2576.9200760471967, std=1064.3892717656709
Client 5 stats: mean=3010.5561693498457, std=986.2380537998968


In [62]:
fl_exp = FLExperiment(
    model_cls=EZNet,
    client_loaders=loaders,
    num_clients=5,
    num_rounds=20,
    local_epochs=40,
    task_type = TaskType.REGRESSION
)


fl_exp.run(False)

2025-01-09 17:28:03,364 - INFO - Initializing FLExperiment
2025-01-09 17:28:03,375 - DEBUG - Creating default strategy
2025-01-09 17:28:03,389 - INFO - FLExperiment initialized successfully
2025-01-09 17:28:03,392 - DEBUG - Creating default strategy
[92mINFO [0m:      Starting Flower ServerApp, config: num_rounds=20, no round_timeout
[92mINFO [0m:      
[92mINFO [0m:      [INIT]
[92mINFO [0m:      Requesting initial parameters from one random client


[FLExperiment] Starting federated training...


[92mINFO [0m:      Received initial parameters from one random client
[92mINFO [0m:      Starting evaluation of initial global parameters
[92mINFO [0m:      Evaluation returned no results (`None`)
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 1]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)


[Server] Round 0 - no global evaluation implemented.


[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 1 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 2]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)


[36m(ClientAppActor pid=43892)[0m [Client 3] Evaluate -> Loss: 8128071.0857, RMSE: 2850.9772


[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 2 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 3]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 3 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 4]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 4 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 5]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 5 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 6]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)


[36m(ClientAppActor pid=43892)[0m [Client 3] Evaluate -> Loss: 8128071.0857, RMSE: 2850.9772[32m [repeated 20x across cluster][0m


[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 6 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 7]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 7 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 8]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 8 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 9]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 9 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 10]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)


[36m(ClientAppActor pid=42384)[0m [Client 4] Evaluate -> Loss: 11612237.8667, RMSE: 3407.6734[32m [repeated 20x across cluster][0m


[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 10 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 11]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 11 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 12]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 12 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 13]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 13 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 14]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)


[36m(ClientAppActor pid=43892)[0m [Client 0] Evaluate -> Loss: 5877642.0000, RMSE: 2424.3849[32m [repeated 20x across cluster][0m


[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 14 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 15]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 15 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 16]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 16 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 17]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 17 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 18]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)


[36m(ClientAppActor pid=43892)[0m [Client 2] Evaluate -> Loss: 5562179.6923, RMSE: 2358.4274[32m [repeated 20x across cluster][0m


[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 18 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 19]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Server] Round 19 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 20]
[92mINFO [0m:      configure_fit: strategy sampled 5 clients (out of 5)
[92mINFO [0m:      aggregate_fit: received 5 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 5 clients (out of 5)


[Round 20] Saving model for client 1...
[Round 20] Saving model for client 3...
[Round 20] Saving model for client 2...
[Round 20] Saving model for client 0...
[Round 20] Saving model for client 4...
Saving round 20 aggregated_parameters...
[Server] Round 20 - no global evaluation implemented.


[92mINFO [0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [SUMMARY]
[92mINFO [0m:      Run finished 20 round(s) in 54.47s
[92mINFO [0m:      	History (loss, distributed):
[92mINFO [0m:      		round 1: 7335468.571428572
[92mINFO [0m:      		round 2: 7335468.571428572
[92mINFO [0m:      		round 3: 7335468.571428572
[92mINFO [0m:      		round 4: 7335468.571428572
[92mINFO [0m:      		round 5: 7335468.571428572
[92mINFO [0m:      		round 6: 7335468.571428572
[92mINFO [0m:      		round 7: 7335468.571428572
[92mINFO [0m:      		round 8: 7335468.571428572
[92mINFO [0m:      		round 9: 7335468.571428572
[92mINFO [0m:      		round 10: 7335468.571428572
[92mINFO [0m:      		round 11: 7335468.571428572
[92mINFO [0m:      		round 12: 7335468.571428572
[92mINFO [0m:      		round 13: 7335468.571428572
[92mINFO [0m:      		round 14: 7335468.571428572
[92mINFO [0m:      		round 15: 7335468.571428572
[92mINFO

[36m(ClientAppActor pid=42880)[0m [Client 1] Evaluate -> Loss: 6277592.5000, RMSE: 2505.5124[32m [repeated 19x across cluster][0m
[FLExperiment] Federated training finished.


In [None]:
# Test the client model 0 on the corresponding test set

clients = fl_exp.get_clients()
model0_1 = Net().to(DEVICE)
model0_1.load_state_dict(clients[0])
model0_1.eval()

_, _, test_loader_1 = loaders[0]
loss, rmse = test_regression(model0_1, test_loader_1)
print(f"Client 0 rmse: {rmse}")

# Or Alternatively
client_dls = fl_exp.get_client_dataloader_tuples()
model0_2 = Net().to(DEVICE)
model0_2.load_state_dict(client_dls[0][0])
model0_2.eval()

_, _, test_loader_2 = client_dls[0][1]
loss, rmse = test_regression(model0_2, test_loader_2)
print(f"Client 0 rmse: {rmse}")

Client 0 rmse: 2721.624882308361
Client 0 rmse: 2721.624882308361


In [49]:
clients = fl_exp.get_clients()
for i, client in enumerate(clients):
    model = Net().to(DEVICE)
    model.load_state_dict(client)
    model.eval()

    _, _, test_loader = loaders[i]
    loss, rmse = test_regression(model, test_loader)
    print(f'Client_{i} loss: {loss}, RMSE: {rmse}')

Client_0 loss: 10418487.384615384, RMSE: 3227.7681739268983
Client_1 loss: 8274908.235294118, RMSE: 2876.614022647828
Client_2 loss: 5841198.285714285, RMSE: 2416.857109080776
Client_3 loss: 5873791.555555556, RMSE: 2423.590632832937
Client_4 loss: 6741237.777777778, RMSE: 2596.389373298577


In [63]:
clients = fl_exp.get_clients()
for i, client in enumerate(clients):
    model = EZNet().to(DEVICE)
    model.load_state_dict(client)
    model.eval()

    _, _, test_loader = loaders[i]
    loss, rmse = test_regression(model, test_loader)
    print(f'Client_{i} loss: {loss}, RMSE: {rmse}')

Client_0 loss: 6709510.153846154, RMSE: 2590.2722161668944
Client_1 loss: 5882296.94117647, RMSE: 2425.3447056401014
Client_2 loss: 6273306.285714285, RMSE: 2504.656919762522
Client_3 loss: 8848950.222222222, RMSE: 2974.7185114262866
Client_4 loss: 7861413.333333333, RMSE: 2803.821202097832


In [61]:
eznet = EZNet()
train(eznet, loaders[0][0], epochs=100, verbose=True, task_type=TaskType.REGRESSION)
print(test_regression(eznet, loaders[0][1]))

Epoch 1: train loss 302928038.6976744
Epoch 2: train loss 305517008.372093
Epoch 3: train loss 304177497.3023256
Epoch 4: train loss 302526589.0232558
Epoch 5: train loss 294715213.39534885
Epoch 6: train loss 298931036.2790698
Epoch 7: train loss 288432360.18604654
Epoch 8: train loss 292556859.53488374
Epoch 9: train loss 293066195.3488372
Epoch 10: train loss 287413200.372093
Epoch 11: train loss 288174312.18604654
Epoch 12: train loss 286486980.46511626
Epoch 13: train loss 285489866.4186047
Epoch 14: train loss 275262684.2790698
Epoch 15: train loss 277892608.0
Epoch 16: train loss 281704296.18604654
Epoch 17: train loss 277394807.0697674
Epoch 18: train loss 269158060.6511628
Epoch 19: train loss 265689790.5116279
Epoch 20: train loss 269790848.0
Epoch 21: train loss 266794287.62790698
Epoch 22: train loss 264259595.90697673
Epoch 23: train loss 262091609.30232558
Epoch 24: train loss 260540026.04651162
Epoch 25: train loss 254112077.39534885
Epoch 26: train loss 251303566.883720

In [39]:
def generate_random_dataset(n_samples=10000, n_features=5):
    X = torch.randn(n_samples, n_features)
    # Y = torch.zeros(n_samples) 
    Y = 1.5 * X.sum(dim=1)
    print(X[0], Y[0])
    dataset = TensorDataset(X, Y)
    return dataset


def create_random_client_loaders(
    num_clients=4,
    n_samples_per_client=1000,
    n_features=8,
    batch_size=16,
    train_ratio=0.7,
    val_ratio=0.2,
    seed=42
):
    torch.manual_seed(seed)

    client_loaders = []
    for client_id in range(num_clients):
        dataset = generate_random_dataset(n_samples_per_client, n_features)
        total_len = len(dataset)
        train_len = int(train_ratio * total_len)
        val_len = int(val_ratio * total_len)
        test_len = total_len - train_len - val_len

        train_ds, val_ds, test_ds = random_split(
            dataset,
            lengths=[train_len, val_len, test_len],
            generator=torch.Generator().manual_seed(seed + client_id)
        )

        train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
        val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False)
        test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False)
        client_loaders.append((train_loader, val_loader, test_loader))

    return client_loaders

loaders_regression = create_random_client_loaders(num_clients=4)

eznet = EZNet()
train(eznet, loaders_regression[0][0], epochs=20, verbose=True, task_type=TaskType.REGRESSION)
print(test_regression(eznet, loaders_regression[0][1]))

tensor([ 1.9269,  1.4873,  0.9007, -2.1055,  0.6784, -1.2345, -0.0431, -1.6047]) tensor(0.0083)
tensor([ 0.2779,  0.7342, -0.3736, -0.3952, -1.2449, -0.4260, -0.9261,  0.3349]) tensor(-3.0282)
tensor([-0.9757, -1.3057,  1.8161,  1.0204, -0.5522,  0.5234,  0.4719, -0.2912]) tensor(1.0606)
tensor([-0.6554, -0.1144, -1.0761, -0.3204,  0.0177, -0.8664, -0.1684, -1.2356]) tensor(-6.6284)
Epoch 1: train loss 16.926950923374722
Epoch 2: train loss 16.10869864327567
Epoch 3: train loss 15.310064054216657
Epoch 4: train loss 14.559073726109096
Epoch 5: train loss 13.840103334699359
Epoch 6: train loss 13.141866607666016
Epoch 7: train loss 12.47857789175851
Epoch 8: train loss 11.84029914855957
Epoch 9: train loss 11.2280201285226
Epoch 10: train loss 10.639702268327985
Epoch 11: train loss 10.07326446533203
Epoch 12: train loss 9.534879259381976
Epoch 13: train loss 9.01893990107945
Epoch 14: train loss 8.519811940874373
Epoch 15: train loss 8.043475701468331
Epoch 16: train loss 7.58717914036