## Dipendencies

In [1]:
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple

import flwr as fl
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import CIFAR10

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}")

Training on cpu using PyTorch 1.13.0 and Flower 1.1.0


## Case: unbalanced data
Idea: set 10 clients with different unbalanced dataset. Each client concur to the final model, so there is not a selection



In [35]:
NUM_CLIENTS = 10

def load_datasets(num_clients: int, unbalanced: bool, spec: bool):
    # Download and transform CIFAR-10 (train and test)
    transform = transforms.Compose(
      [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )
    trainset = CIFAR10("./dataset", train=True, download=True, transform=transform)
    testset = CIFAR10("./dataset", train=False, download=True, transform=transform)

    if unbalanced==True and spec==False:
        lengths = [int(len(trainset) *0.05), int(len(trainset) *0.02),int(len(trainset) *0.1), int(len(trainset) *0.1),int(len(trainset) *0.03), int(len(trainset) *0.04),int(len(trainset) *0.1), int(len(trainset) *0.01 ),int(len(trainset) *0.05), int(len(trainset) *0.5)]
        datasets = random_split(trainset, lengths, torch.Generator().manual_seed(42))
    elif spec==True and unbalanced==False:



    else:
        # Split training set into `num_clients` partitions to simulate different local datasets
        partition_size = len(trainset) // num_clients
        lengths = [partition_size] * num_clients
        datasets = random_split(trainset, lengths, torch.Generator().manual_seed(42))

    # Split each partition into train/val and create DataLoader
    trainloaders = []
    valloaders = []
    for ds in datasets:
        len_val = len(ds) // 10  # 10 % validation set
        len_train = len(ds) - len_val
        lengths = [len_train, len_val]
        ds_train, ds_val = random_split(ds, lengths, torch.Generator().manual_seed(42))
        trainloaders.append(DataLoader(ds_train, batch_size=32, shuffle=True))
        valloaders.append(DataLoader(ds_val, batch_size=32))
    testloader = DataLoader(testset, batch_size=32)
    return trainloaders, valloaders, testloader

trainloaders, valloaders, testloader = load_datasets(NUM_CLIENTS, True, False)

Files already downloaded and verified
Files already downloaded and verified


In [102]:
trainset = CIFAR10("./dataset", train=True, download=True, transform=transform)


trainloaders = []
myiter = iter(trainset)
for x in myiter:
    for y in range(0,9):
        if x[1]==y:


  print(x[1])

6
6
9
9
4
1
1
2
7
8
3
4
7
7
2
9
9
9
3
2
6
4
3
6
6
2
6
3
5
4
0
0
9
1
3
4
0
3
7
3
3
5
2
2
7
1
1
1
2
2
0
9
5
7
9
2
2
5
2
4
3
1
1
8
2
1
1
4
9
7
8
5
9
6
7
3
1
9
0
3
1
3
5
4
5
7
7
4
7
9
4
2
3
8
0
1
6
1
1
4
1
8
3
9
6
6
1
8
5
2
9
9
8
1
7
7
0
0
6
9
1
2
2
9
2
6
6
1
9
5
0
4
7
6
7
1
8
1
1
2
8
1
3
3
6
2
4
9
9
5
4
3
6
7
4
6
8
5
5
4
3
1
8
4
7
6
0
9
5
1
3
8
2
7
5
3
4
1
5
7
0
4
7
5
5
1
0
9
6
9
0
8
7
8
8
2
5
2
3
5
0
6
1
9
3
6
9
1
3
9
6
6
7
1
0
9
5
8
5
2
9
0
8
8
0
6
9
1
1
6
3
7
6
6
0
6
6
1
7
1
5
8
3
6
6
8
6
8
4
6
6
1
3
8
3
4
1
7
1
3
8
5
1
1
4
0
9
3
7
4
9
9
2
4
9
9
1
0
5
9
0
8
2
1
2
0
5
6
3
2
7
8
8
6
0
7
9
4
5
6
4
2
1
1
2
1
5
9
9
0
8
4
1
1
6
3
3
9
0
7
9
7
7
9
1
5
1
6
6
8
7
1
3
0
3
3
2
4
5
7
5
9
0
3
4
0
4
4
6
0
0
6
6
0
8
1
6
2
9
2
5
9
6
7
4
1
8
7
3
6
9
3
0
4
0
5
1
0
3
4
8
5
4
7
2
3
9
7
6
7
1
4
7
0
1
7
3
1
8
4
4
2
0
2
2
0
0
9
0
9
6
8
2
7
7
4
0
3
0
8
9
4
2
7
2
5
2
5
1
9
4
8
5
1
7
4
4
0
6
9
0
7
8
8
9
9
3
3
4
0
4
5
6
6
0
1
0
8
0
4
8
8
1
5
2
6
8
1
0
0
7
7
5
9
6
2
8
3
4
7
3
9
0
1
2
4
8
1
8
6
4
4
5
7
1
3
9
8
0
1


KeyboardInterrupt: 

#### Prepare the function for the workers

In [37]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        #Applies a 2D max pooling over an input signal composed of several input planes.
        # [In una matrice n x n prende il valore massimo - riduce dimensioni e varianza - 2,2  in which case, the first int is used for the height dimension, and the second int for the width dimension]
        self.pool = nn.MaxPool2d(2, 2)
        # 2d convulation - (in_channels: int, out_channels: int, kernel_size: Union[int, tuple[int, int]])
        self.conv2 = nn.Conv2d(6, 16, 5)
        #trasformazioni lineari [ fc2 esempio: input features=120 , output_feature=84]
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def train(net, trainloader, epochs: int, verbose=False):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for images, labels in trainloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(net(images), labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        if verbose:
            print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)



## Prepare the workers


In [38]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, net, trainloader, valloader):
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        return get_parameters(self.net)

    def fit(self, parameters, config):
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def client_fn(cid: str) -> FlowerClient:
    """Create a Flower client representing a single organization."""

    # Load model
    net = Net().to(DEVICE)

    # Load data (CIFAR-10)
    # Note: each client gets a different trainloader/valloader, so each client
    # will train and evaluate on their own unique data
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]

    # Create a  single Flower client representing a single organization
    return FlowerClient(net, trainloader, valloader)


## Train data and metrics

In [43]:

from flwr.common import Metrics


def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    # Multiply accuracy of each client by number of examples used
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]

    # Aggregate and return custom metric (weighted average)
    return {"accuracy": sum(accuracies) / sum(examples)}

# The `evaluate` function will be by Flower called after every round
def evaluate(
    server_round: int, parameters: fl.common.NDArrays, config: Dict[str, fl.common.Scalar]
) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]:
    net = Net()
    valloader = valloaders[0]
    set_parameters(net, parameters)  # Update model with the latest parameters
    loss, accuracy = test(net, valloader)
    print(f"Server-side evaluation loss {loss} / accuracy {accuracy}")
    return loss, {"accuracy": accuracy}

# Create FedAvg strategy
strategy = fl.server.strategy.FedAvg(
        fraction_fit=1.0,
        fraction_evaluate=0.5,
        min_fit_clients=10,
        min_evaluate_clients=5,
        min_available_clients=10,
        evaluate_metrics_aggregation_fn=weighted_average,  # <-- pass the metric aggregation function,
        evaluate_fn=evaluate  # Pass the evaluation function
)




# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=5),
    strategy=strategy,
)

INFO flower 2022-12-12 16:05:43,008 | app.py:140 | Starting Flower simulation, config: ServerConfig(num_rounds=5, round_timeout=None)
2022-12-12 16:05:46,767	INFO worker.py:1518 -- Started a local Ray instance.
INFO flower 2022-12-12 16:05:48,355 | app.py:174 | Flower VCE: Ray initialized with resources: {'object_store_memory': 2147483648.0, 'CPU': 8.0, 'memory': 16996905780.0, 'node:127.0.0.1': 1.0}
INFO flower 2022-12-12 16:05:48,357 | server.py:86 | Initializing global parameters
INFO flower 2022-12-12 16:05:48,357 | server.py:270 | Requesting initial parameters from one random client
INFO flower 2022-12-12 16:05:50,746 | server.py:274 | Received initial parameters from one random client
INFO flower 2022-12-12 16:05:50,746 | server.py:88 | Evaluating initial parameters
INFO flower 2022-12-12 16:05:50,836 | server.py:91 | initial parameters (loss, other metrics): 0.07377857208251953, {'accuracy': 0.084}
INFO flower 2022-12-12 16:05:50,836 | server.py:101 | FL starting
DEBUG flower 20

Server-side evaluation loss 0.07377857208251953 / accuracy 0.084


DEBUG flower 2022-12-12 16:06:11,353 | server.py:229 | fit_round 1 received 10 results and 0 failures
INFO flower 2022-12-12 16:06:11,453 | server.py:116 | fit progress: (1, 0.061182955741882324, {'accuracy': 0.324}, 20.616273630999785)
DEBUG flower 2022-12-12 16:06:11,454 | server.py:165 | evaluate_round 1: strategy sampled 5 clients (out of 10)


Server-side evaluation loss 0.061182955741882324 / accuracy 0.324


DEBUG flower 2022-12-12 16:06:18,712 | server.py:179 | evaluate_round 1 received 5 results and 0 failures
DEBUG flower 2022-12-12 16:06:18,712 | server.py:215 | fit_round 2: strategy sampled 10 clients (out of 10)
[2m[36m(raylet)[0m Spilled 2499 MiB, 30 objects, write throughput 1230 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
DEBUG flower 2022-12-12 16:06:37,124 | server.py:229 | fit_round 2 received 10 results and 0 failures
INFO flower 2022-12-12 16:06:37,219 | server.py:116 | fit progress: (2, 0.04846420478820801, {'accuracy': 0.424}, 46.38273490000029)
DEBUG flower 2022-12-12 16:06:37,220 | server.py:165 | evaluate_round 2: strategy sampled 5 clients (out of 10)


Server-side evaluation loss 0.04846420478820801 / accuracy 0.424


[2m[36m(raylet)[0m Spilled 4265 MiB, 53 objects, write throughput 838 MiB/s.
DEBUG flower 2022-12-12 16:06:42,734 | server.py:179 | evaluate_round 2 received 5 results and 0 failures
DEBUG flower 2022-12-12 16:06:42,735 | server.py:215 | fit_round 3: strategy sampled 10 clients (out of 10)
DEBUG flower 2022-12-12 16:07:01,259 | server.py:229 | fit_round 3 received 10 results and 0 failures
INFO flower 2022-12-12 16:07:01,360 | server.py:116 | fit progress: (3, 0.04391970252990723, {'accuracy': 0.476}, 70.52352211499965)
DEBUG flower 2022-12-12 16:07:01,361 | server.py:165 | evaluate_round 3: strategy sampled 5 clients (out of 10)


Server-side evaluation loss 0.04391970252990723 / accuracy 0.476


DEBUG flower 2022-12-12 16:07:06,474 | server.py:179 | evaluate_round 3 received 5 results and 0 failures
DEBUG flower 2022-12-12 16:07:06,475 | server.py:215 | fit_round 4: strategy sampled 10 clients (out of 10)
DEBUG flower 2022-12-12 16:07:24,148 | server.py:229 | fit_round 4 received 10 results and 0 failures
INFO flower 2022-12-12 16:07:24,236 | server.py:116 | fit progress: (4, 0.04088971900939942, {'accuracy': 0.528}, 93.39893978300006)
DEBUG flower 2022-12-12 16:07:24,236 | server.py:165 | evaluate_round 4: strategy sampled 5 clients (out of 10)


Server-side evaluation loss 0.04088971900939942 / accuracy 0.528


DEBUG flower 2022-12-12 16:07:29,357 | server.py:179 | evaluate_round 4 received 5 results and 0 failures
DEBUG flower 2022-12-12 16:07:29,358 | server.py:215 | fit_round 5: strategy sampled 10 clients (out of 10)
[2m[36m(raylet)[0m Spilled 8382 MiB, 102 objects, write throughput 756 MiB/s.
DEBUG flower 2022-12-12 16:07:46,074 | server.py:229 | fit_round 5 received 10 results and 0 failures
INFO flower 2022-12-12 16:07:46,162 | server.py:116 | fit progress: (5, 0.03993689203262329, {'accuracy': 0.552}, 115.32533063000028)
DEBUG flower 2022-12-12 16:07:46,163 | server.py:165 | evaluate_round 5: strategy sampled 5 clients (out of 10)


Server-side evaluation loss 0.03993689203262329 / accuracy 0.552


DEBUG flower 2022-12-12 16:07:51,028 | server.py:179 | evaluate_round 5 received 5 results and 0 failures
INFO flower 2022-12-12 16:07:51,029 | server.py:144 | FL finished in 120.19214101499983
INFO flower 2022-12-12 16:07:51,030 | app.py:192 | app_fit: losses_distributed [(1, 0.06192469973855175), (2, 0.049669974972127065), (3, 0.047532707501971534), (4, 0.044130217824524975), (5, 0.043913874550710746)]
INFO flower 2022-12-12 16:07:51,030 | app.py:193 | app_fit: metrics_distributed {'accuracy': [(1, 0.32585915492957745), (2, 0.45565909090909096), (3, 0.4703703703703704), (4, 0.5201016260162602), (5, 0.5168987341772152)]}
INFO flower 2022-12-12 16:07:51,031 | app.py:194 | app_fit: losses_centralized [(0, 0.07377857208251953), (1, 0.061182955741882324), (2, 0.04846420478820801), (3, 0.04391970252990723), (4, 0.04088971900939942), (5, 0.03993689203262329)]
INFO flower 2022-12-12 16:07:51,031 | app.py:195 | app_fit: metrics_centralized {'accuracy': [(0, 0.084), (1, 0.324), (2, 0.424), (3,

History (loss, distributed):
	round 1: 0.06192469973855175
	round 2: 0.049669974972127065
	round 3: 0.047532707501971534
	round 4: 0.044130217824524975
	round 5: 0.043913874550710746
History (loss, centralized):
	round 0: 0.07377857208251953
	round 1: 0.061182955741882324
	round 2: 0.04846420478820801
	round 3: 0.04391970252990723
	round 4: 0.04088971900939942
	round 5: 0.03993689203262329
History (metrics, distributed):
{'accuracy': [(1, 0.32585915492957745), (2, 0.45565909090909096), (3, 0.4703703703703704), (4, 0.5201016260162602), (5, 0.5168987341772152)]}History (metrics, centralized):
{'accuracy': [(0, 0.084), (1, 0.324), (2, 0.424), (3, 0.476), (4, 0.528), (5, 0.552)]}

## What happens if the clients are super-specialized?