In [None]:
#! pip3 install torch torchvision torchaudio

# Setting up the experiment

In [1]:
from pathlib import Path

import torch
import partition_scripts

from neural_nets import Net, centralized_training, VGG7, get_parameters

In [2]:
# Module-level cache for partitioned datasets, keyed "<dataset>_<distribution>".
# Every slot starts out as None; a slot is assigned the result of a
# partition_scripts call right before the experiment that reads it.
DATA_STORE = dict.fromkeys(
    (
        "CIFAR10_IID",
        "CIFAR10_NonIID",
        "CIFAR100_IID",
        "CIFAR100_NonIID",
        "FedFaces_IID",
        "FedFaces_NonIID",
    )
)

# Centralized training

In [None]:
# Experiment names understood by run_centralized; the cells below index into
# this list to pick which baseline to train.
experiments = [
    "CIFAR10",
    "CIFAR100",
    "CelebA",
    "FedFaces",
]

# Epoch count forwarded to every centralized_training call.
epochs = 400

def run_centralized(experiment):
    match experiment:
        case "CIFAR10":
            DATA_STORE["CIFAR10"] = partition_scripts.partition_CIFAR_IID(2)
            dataloaders, valloaders, testloaders = DATA_STORE["CIFAR10"]
            net = VGG7(classes=10)
            centralized_training(trainloader=dataloaders[0], valloader=valloaders[0], testloader=testloaders, net=net, epochs=epochs, classes=10, DEVICE="cpu")
        case "CIFAR100":
            DATA_STORE["CIFAR100"] = partition_scripts.partition_CIFAR_IID(2, "CIFAR100")
            dataloaders, valloaders, testloaders = DATA_STORE["CIFAR100"]
            net = VGG7(classes=100)
            centralized_training(trainloader=dataloaders[0], valloader=valloaders[0], testloader=testloaders, epochs=epochs, classes=100)
        case "CelebA":
            DATA_STORE["CelebA"] = partition_scripts.partition_CelebA_IID(2)
            dataloaders, valloaders, testloaders = DATA_STORE["CelebA"]
            net = VGG7(classes=2, shape=(64, 64))
            centralized_training(trainloader=dataloaders[0], valloader=valloaders[0], testloader=testloaders, epochs=epochs, net=net)
        case "FedFaces":
            DATA_STORE["FedFaces"] = partition_scripts.partition_FedFaces_IID(2)
            dataloaders, valloaders, testloaders = DATA_STORE["FedFaces"]
            net = VGG7(classes=4, shape=(64,64))
            centralized_training(trainloader=dataloaders[0], valloader=valloaders[0], net=net, testloader=testloaders, epochs=epochs, classes=3)
        case _:
            pass



In [None]:
# Centralized baseline for experiments[0], i.e. "CIFAR10".
run_centralized(experiments[0])

In [None]:
# Centralized baseline for experiments[1], i.e. "CIFAR100".
run_centralized(experiments[1])

In [None]:

# Centralized baseline for experiments[2], i.e. "CelebA".
run_centralized(experiments[2])

In [None]:
# Centralized baseline for the last entry of experiments, i.e. "FedFaces".
run_centralized(experiments[-1])

# Setting up a FLWR environment

In [3]:
import datetime
import flwr as fl
import numpy as np

from logging import INFO
from flwr.common.logger import log
from Clients import FlowerClient, weighted_average, fit_config

In [4]:
# Point Flower's logger at a per-run file whose name carries the current
# POSIX timestamp, so successive runs do not overwrite each other's logs.
today = datetime.datetime.today()
# The log file is opened inside ./logs; the standard logging file handler does
# not create missing directories, so make sure the folder exists first.
Path("./logs").mkdir(parents=True, exist_ok=True)
fl.common.logger.configure(identifier="FL Paper Experiment", filename=f"./logs/log_FLWR_{today.timestamp()}.txt")

# Use the first CUDA device when PyTorch reports one, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda:0")
else:
    DEVICE = torch.device("cpu")

print(f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}")

# Size of the simulated federation and the planned number of server rounds.
NUM_CLIENTS = 20
TRAINING_ROUNDS = 50

# Specify client resources if you need GPU (defaults to 1 CPU and 0 GPU)
def get_resources(device=None):
    """Return the per-client resource request for the simulation.

    Args:
        device: Object exposing a ``type`` attribute (such as a
            ``torch.device``). When omitted, the module-level ``DEVICE`` is
            used, which keeps the original zero-argument call working.

    Returns:
        dict: ``{"num_cpus": 4, "num_gpus": 0}`` when ``device.type`` is
        ``"cpu"``, otherwise ``{"num_cpus": 2, "num_gpus": 1}``.
    """
    if device is None:
        # Fall back to the notebook-wide device chosen earlier.
        device = DEVICE

    if device.type == "cpu":
        return {"num_cpus": 4, "num_gpus": 0}
    return {"num_cpus": 2, "num_gpus": 1}

# Resolve the per-client resource request once; FedExperiment.simulate_FL
# reads this module-level name when it starts a simulation.
client_resources = get_resources()
# Echo the selected device.
print(DEVICE)

Training on cpu using PyTorch 2.2.1+cpu and Flower 1.7.0
cpu


In [5]:
# Template 10-class VGG7 whose weights are used as `initial_parameters` for
# the FedAvg, FedAdam and FedYogi strategies defined below.
sample_net = VGG7(classes=10)
# NOTE(review): `params` is not referenced in the cells visible here -- the
# strategies call get_parameters(sample_net) again themselves.
params = get_parameters(sample_net)

Then, setting up the strategies

In [6]:
# Keyword arguments that every strategy below is constructed with: the same
# fit/evaluate sampling fractions, the same minimum client counts, the same
# per-round fit config callback and the same metric aggregation function.
_shared_strategy_kwargs = dict(
    fraction_fit=0.5,
    fraction_evaluate=0.5,
    min_fit_clients=1,
    min_evaluate_clients=1,
    min_available_clients=1,
    on_fit_config_fn=fit_config,
    evaluate_metrics_aggregation_fn=weighted_average,
)


def _fresh_initial_parameters():
    """Serialize sample_net's current weights into a new Flower Parameters object."""
    return fl.common.ndarrays_to_parameters(get_parameters(sample_net))


# FedAvg, FedAdam and FedYogi start from sample_net's weights; FedProx and
# FedAvgM are given no initial parameters.
fedAvg = fl.server.strategy.FedAvg(
    **_shared_strategy_kwargs,
    initial_parameters=_fresh_initial_parameters(),
)

fedProx = fl.server.strategy.FedProx(
    **_shared_strategy_kwargs,
    proximal_mu=0.5,
)

fedAvgM = fl.server.strategy.FedAvgM(**_shared_strategy_kwargs)

fedAdam = fl.server.strategy.FedAdam(
    **_shared_strategy_kwargs,
    initial_parameters=_fresh_initial_parameters(),
)

fedYogi = fl.server.strategy.FedYogi(
    **_shared_strategy_kwargs,
    initial_parameters=_fresh_initial_parameters(),
)


#### CIFAR10 setup: Client FNS ####

In [7]:
# A couple of client_fns for using with Flower, one for each dataset experiment
def client_fn_CIFAR10_IID(cid: str) -> FlowerClient:
    """Build the Flower client for one partition of the IID CIFAR-10 split.

    Reads the loaders cached in ``DATA_STORE["CIFAR10_IID"]``, so that slot
    must hold a (trainloaders, valloaders, third item) triple when this runs.

    Args:
        cid: Client id as a string; ``int(cid)`` selects the client's
            train and validation loaders.

    Returns:
        The result of ``FlowerClient(...).to_client()``.
    """
    # Fresh 10-class model, moved to the notebook-wide device.
    model = VGG7(classes=10).to(DEVICE)

    # The third element of the cached triple is not needed by a client.
    train_parts, val_parts, _ = DATA_STORE["CIFAR10_IID"]

    # Each client id maps to its own loaders, so clients train and evaluate
    # on different partitions.
    idx = int(cid)
    client = FlowerClient(model, train_parts[idx], val_parts[idx], cid)
    return client.to_client()

def client_fn_CIFAR10_nonIID(cid: str) -> FlowerClient:
    """Build the Flower client for one partition of the non-IID CIFAR-10 split.

    Reads the loaders cached in ``DATA_STORE["CIFAR10_NonIID"]``, so that slot
    must hold a (trainloaders, valloaders, third item) triple when this runs.

    Args:
        cid: Client id as a string; ``int(cid)`` selects the client's
            train and validation loaders.

    Returns:
        The result of ``FlowerClient(...).to_client()``.
    """
    # Fresh 10-class model, moved to the notebook-wide device.
    model = VGG7(classes=10).to(DEVICE)

    # The third element of the cached triple is not needed by a client.
    train_parts, val_parts, _ = DATA_STORE["CIFAR10_NonIID"]

    # Each client id maps to its own loaders, so clients train and evaluate
    # on different partitions.
    idx = int(cid)
    client = FlowerClient(model, train_parts[idx], val_parts[idx], cid)
    return client.to_client()

### FedExperiment Class ###
OK, so now I'll encapsulate this code so it can be reused with different strategies and datasets.

In [8]:
class FedExperiment:
    """A named pairing of a client factory and a strategy that can be simulated."""

    def __init__(self, client_fn, strategy, name="New experiment"):
        """Store the client factory, the strategy and a display name.

        Args:
            client_fn: Callable handed to the simulation to create clients.
            strategy: Strategy object handed to the simulation.
            name: Label used in the start/end log banners.
        """
        self.name = name
        self.strategy = strategy
        self.client_fn = client_fn

    def simulate_FL(self, rounds=1):
        """Run the simulation for ``rounds`` server rounds and return its result.

        The client count and per-client resources come from the module-level
        ``NUM_CLIENTS`` and ``client_resources``. A banner is logged at INFO
        level before and after the run.

        Args:
            rounds: Value passed as ``num_rounds`` to the server config.

        Returns:
            Whatever ``fl.simulation.start_simulation`` returns.
        """
        rule = 10 * "========"

        # Banner layout: blank line, rule, "<name> has started", rule.
        log(INFO, "\n".join(["", rule, self.name + " has started", rule]))

        server_config = fl.server.ServerConfig(num_rounds=rounds)
        result = fl.simulation.start_simulation(
            client_fn=self.client_fn,
            num_clients=NUM_CLIENTS,
            config=server_config,
            strategy=self.strategy,
            client_resources=client_resources,
        )

        log(INFO, "\n".join(["", rule, self.name + " has ended", rule]))
        return result

In [9]:
# Build the IID CIFAR-10 partitions (NUM_CLIENTS is passed as the first
# argument -- presumably one partition per simulated client) and cache them
# where client_fn_CIFAR10_IID looks them up.
DATA_STORE["CIFAR10_IID"]= partition_scripts.partition_CIFAR_IID(NUM_CLIENTS, "CIFAR10")
exp_CIFAR10_IID = FedExperiment(client_fn=client_fn_CIFAR10_IID, strategy=fedAvg, name="CIFAR 10 - FedAvg - IID Distribution")
# NOTE(review): this runs 3 rounds rather than TRAINING_ROUNDS -- presumably a
# quick smoke test; confirm before using the numbers.
metrics = exp_CIFAR10_IID.simulate_FL(rounds=3)
print(metrics)
# Reset the slot to None once the run is over (presumably to release the loaders).
DATA_STORE["CIFAR10_IID"]= None

INFO flwr 2024-03-11 15:55:00,861 | 527930960.py:9 | 
CIFAR 10 - FedAvg - IID Distribution has started
INFO flwr 2024-03-11 15:55:00,862 | app.py:178 | Starting Flower simulation, config: ServerConfig(num_rounds=3, round_timeout=None)


Shape nonIID: (32, 32, 3)
[0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.04999999899999996]
0.9999999990000003


2024-03-11 15:55:04,229	INFO worker.py:1621 -- Started a local Ray instance.
INFO flwr 2024-03-11 15:55:06,327 | app.py:213 | Flower VCE: Ray initialized with resources: {'node:__internal_head__': 1.0, 'memory': 3484164096.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 1742082048.0, 'CPU': 12.0}
INFO flwr 2024-03-11 15:55:06,328 | app.py:219 | Optimize your simulation with Flower VCE: https://flower.dev/docs/framework/how-to-run-simulations.html
INFO flwr 2024-03-11 15:55:06,328 | app.py:242 | Flower VCE: Resources for each Virtual Client: {'num_cpus': 4, 'num_gpus': 0}
INFO flwr 2024-03-11 15:55:06,328 | app.py:288 | Flower VCE: Creating VirtualClientEngineActorPool with 3 actors
INFO flwr 2024-03-11 15:55:06,342 | server.py:89 | Initializing global parameters
INFO flwr 2024-03-11 15:55:06,343 | server.py:272 | Using initial parameters provided by strategy
INFO flwr 2024-03-11 15:55:06,344 | server.py:91 | Evaluating initial parameters
INFO flwr 2024-03-11 15:55:06,345 | server.py:1

[2m[36m(DefaultActor pid=11728)[0m [Client 11, round 1] fit, config: {'server_round': 1, 'local_epochs': 3}
[2m[36m(DefaultActor pid=5848)[0m [Client 3, round 1] fit, config: {'server_round': 1, 'local_epochs': 3}[32m [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)[0m
[2m[36m(DefaultActor pid=11728)[0m [Client 13, round 1] fit, config: {'server_round': 1, 'local_epochs': 3}
[2m[36m(DefaultActor pid=12328)[0m [Client 5, round 1] fit, config: {'server_round': 1, 'local_epochs': 3}
[2m[36m(DefaultActor pid=5848)[0m [Client 15, round 1] fit, config: {'server_round': 1, 'local_epochs': 3}
[2m[36m(DefaultActor pid=11728)[0m [Client 16, round 1] fit, config: {'server_round': 1, 'local_epochs': 3}
[2m[36m(DefaultActor pid=5848)[0m [Client 7, round 1] fit, config: {'server_round': 1, 'local_epochs':

DEBUG flwr 2024-03-11 16:00:04,795 | server.py:236 | fit_round 1 received 10 results and 0 failures
DEBUG flwr 2024-03-11 16:00:05,785 | server.py:173 | evaluate_round 1: strategy sampled 10 clients (out of 20)


[2m[36m(DefaultActor pid=11728)[0m [Client 13] evaluate, config: {}
[2m[36m(DefaultActor pid=12328)[0m [Client 0] evaluate, config: {}[32m [repeated 6x across cluster][0m


DEBUG flwr 2024-03-11 16:00:14,278 | server.py:187 | evaluate_round 1 received 10 results and 0 failures
DEBUG flwr 2024-03-11 16:00:14,278 | server.py:222 | fit_round 2: strategy sampled 10 clients (out of 20)


[2m[36m(DefaultActor pid=12328)[0m [Client 5, round 2] fit, config: {'server_round': 2, 'local_epochs': 3}
[2m[36m(DefaultActor pid=12328)[0m [Client 18] evaluate, config: {}[32m [repeated 3x across cluster][0m
