# Centralized Learning

In [17]:
from collections import OrderedDict
from typing import List, Tuple
import time
import flwr as fl
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from flwr.server.strategy import FedAvg
from torchvision.models._api import Weights
from torchvision.transforms import transforms
from flwr.common import Metrics, parameters_to_ndarrays, ndarrays_to_parameters, FitRes
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import CIFAR10

from tinysmpc import VirtualMachine, PrivateScalar
from tinysmpc.fixed_point import fixed_point, float_point

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}")

Training on cpu using PyTorch 1.13.1+cpu and Flower 1.1.0


In [2]:
CLASSES = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
NUM_CLIENTS = 5

In [3]:

BATCH_SIZE = 32


def load_datasets():
    # Download and transform CIFAR-10 (train and test)
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )
    trainset = CIFAR10("./dataset", train=True, download=False, transform=transform)
    testset = CIFAR10("./dataset", train=False, download=False, transform=transform)

    # Split training set into 10 partitions to simulate the individual dataset
    partition_size = len(trainset) // NUM_CLIENTS
    print(f"Partition Size {partition_size}")
    lengths = [partition_size] * NUM_CLIENTS
    datasets = random_split(trainset, lengths, torch.Generator().manual_seed(42))

    # Split each partition into train/val and create DataLoader
    trainloaders = []
    valloaders = []
    for ds in datasets:
        len_val = len(ds) // 10  # 10 % validation set
        len_train = len(ds) - len_val
        lengths = [len_train, len_val]
        ds_train, ds_val = random_split(ds, lengths, torch.Generator().manual_seed(42))
        trainloaders.append(DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True))
        valloaders.append(DataLoader(ds_val, batch_size=BATCH_SIZE))
    testloader = DataLoader(testset, batch_size=BATCH_SIZE)
    return trainloaders, valloaders, testloader


trainloaders, valloaders, testloader = load_datasets()

Partition Size 10000


In [None]:
images, labels = next(iter(trainloaders[0]))

# Reshape and convert images to a NumPy array
# matplotlib requires images with the shape (height, width, 3)
images = images.permute(0, 2, 3, 1).numpy()
# Denormalize
images = images / 2 + 0.5

# Create a figure and a grid of subplots
fig, axs = plt.subplots(4, 8, figsize=(12, 6))

# Loop over the images and plot them
for i, ax in enumerate(axs.flat):
    ax.imshow(images[i])
    ax.set_title(CLASSES[labels[i]])
    ax.axis('off')

# Show the plot
fig.tight_layout()
plt.show()

In [4]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [5]:
def train(net, trainloader, epochs: int, verbose=False):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    training_start_time = time.time()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for images, labels in trainloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(net(images), labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        if verbose:
            print(
                f"Epoch {epoch + 1}: train loss:  {epoch_loss}, accuracy: {epoch_acc}, time taken: {time.time() - training_start_time}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

In [6]:
trainloader = trainloaders[0]
valloader = valloaders[0]
net = Net().to(DEVICE)

train(net, trainloader, 5, True)

loss, accuracy = test(net, testloader)
print(f"Final test set performance:\n\tloss {loss}\n\taccuracy {accuracy}")

Epoch 1: train loss:  0.06070292741060257, accuracy: 0.289, time taken: 6.787944793701172
Epoch 2: train loss:  0.05182642862200737, accuracy: 0.3993333333333333, time taken: 15.387065649032593
Epoch 3: train loss:  0.0475374199450016, accuracy: 0.451, time taken: 22.307281970977783
Epoch 4: train loss:  0.0448681116104126, accuracy: 0.47988888888888886, time taken: 28.740000009536743
Epoch 5: train loss:  0.04237360879778862, accuracy: 0.5161111111111111, time taken: 34.9176459312439
Final test set performance:
	loss 0.04520877415537834
	accuracy 0.477


In [None]:
print(net.parameters)

In [None]:
net.state_dict()

In [None]:
parameters = [val.cpu().numpy() for _, val in net.state_dict().items()]
len(parameters)

In [None]:
net.state_dict()['fc3.weight']

In [None]:
numpy_weights = net.state_dict()['fc3.weight'].cpu().detach().numpy()
print(numpy_weights.shape)

In [None]:
print(numpy_weights[:3].shape)

In [None]:
from flwr.common import ndarray_to_bytes

tensors = [ndarray_to_bytes(ndarray) for ndarray in net.state_dict()['fc3.weight']]
tensors

# Federated Learning

In [7]:
def get_parameters(net) -> List[np.ndarray]:
    parameters = [val.cpu().numpy() for _, val in net.state_dict().items()]
    return parameters


def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)

## Local Client

In [8]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, net, trainloader, valloader):
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        return get_parameters(self.net)

    def fit(self, parameters, config):
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}

In [9]:
def client_fn(cid: str) -> FlowerClient:
    """Create a Flower client representing a single organization."""

    # Load model
    net = Net().to(DEVICE)

    # Load data (CIFAR-10)
    # Note: each client gets a different trainloader/valloader, so each client
    # will train and evaluate on their own unique data
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]

    # Create a  single Flower client representing a single organization
    return FlowerClient(net, trainloader, valloader)

In [10]:
from typing import Optional, Dict


def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    # Multiply accuracy of each client by number of examples used
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]

    # Aggregate and return custom metric (weighted average)
    return {"accuracy": sum(accuracies) / sum(examples)}


# The `evaluate` function will be by Flower called after every round
def evaluate(server_round: int, parameters: fl.common.NDArrays, config: Dict[str, fl.common.Scalar]) -> Optional[
    Tuple[float, Dict[str, fl.common.Scalar]]]:
    net = Net()
    valloader = valloaders[0]
    set_parameters(net, parameters)  # Update model with the latest parameters
    loss, accuracy = test(net, valloader)
    print(f"Server-side evaluation loss {loss} / accuracy {accuracy}")
    return loss, {"accuracy": accuracy}

In [14]:
Q = 2657003489534545107915232808830590043
fixedPoint = np.vectorize(fixed_point)
floatPoint = np.vectorize(float_point)


class FedAvgSmc(FedAvg):
    def aggregate_fit(
            self,
            rnd: int,
            results: List[Tuple[fl.server.client_proxy.ClientProxy, FitRes]],
            failures: List[BaseException],
    ) -> Optional[Weights]:
        if not results:
            return None

        clients = []
        fit_results = []

        for client, fit_result in results:
            clients.append(client)
            fit_results.append(fit_result)

        fit_res_ndarray_parameters = [parameters_to_ndarrays(result.parameters) for result in
                                      fit_results]  # list of clients, each with a list of layers(each layer representing weights)

        fl_nodes = [VirtualMachine(f"Client: {client.cid}") for client in clients]

        num_layers = len(fit_results[0].parameters.tensors)

        layers_weights = {}

        layers_weights_smc = {}

        for layer in range(num_layers):  #loop through number of layers
            layers_weights[f"layer_{layer}"] = []
            for weights in fit_res_ndarray_parameters:
                layers_weights[f"layer_{layer}"].append(fixedPoint(weights[layer]))

        for layer in range(num_layers):
            fl_node_values = [PrivateScalar(tensor, node) for tensor, node in
                              zip(layers_weights[f'layer_{layer}'], fl_nodes)]

            fl_exchanged_shares = []
            fl_exchanged_shares_list = []
            layers_weights_smc[f"layer_{layer}"] = []

            for value in fl_node_values:
                fl_exchanged_shares.append(value.share_tensor(fl_nodes, Q))

            for client_shares in fl_exchanged_shares:
                fl_exchanged_shares_list = [share.value for share in client_shares.shares]

            layers_weights_smc[f'layer_{layer}'] = floatPoint(fl_exchanged_shares_list)

        for i, client in enumerate(clients):
            client_weights = []
            for layer in layers_weights_smc.values():
                client_weights.append(np.array(layer[i]))
            fit_results[i].parameters = ndarrays_to_parameters(client_weights)

        results = tuple(zip(clients, fit_results))

        return super().aggregate_fit(rnd, results, failures)


In [19]:
# Create FedAvg strategy
strategy = FedAvgSmc(
    fraction_fit=1.0,  # Sample 100% of available clients for training
    fraction_evaluate=0.5,  # Sample 50% of available clients for evaluation
    min_fit_clients=5,  # Never sample less than 5 clients for training
    min_evaluate_clients=3,  # Never sample less than 3 clients for evaluation
    min_available_clients=5,  # Wait until 5 clients are available
    evaluate_metrics_aggregation_fn=weighted_average,  # pass the metric aggregation function
    evaluate_fn=evaluate,  # Pass the evaluation function
)

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=2),
    strategy=strategy,
)

INFO flower 2023-01-25 16:40:19,568 | app.py:140 | Starting Flower simulation, config: ServerConfig(num_rounds=2, round_timeout=None)
2023-01-25 16:40:25,424	INFO worker.py:1518 -- Started a local Ray instance.
INFO flower 2023-01-25 16:40:29,127 | app.py:174 | Flower VCE: Ray initialized with resources: {'object_store_memory': 1709134233.0, 'memory': 3418268468.0, 'node:127.0.0.1': 1.0, 'CPU': 4.0}
INFO flower 2023-01-25 16:40:29,128 | server.py:86 | Initializing global parameters
INFO flower 2023-01-25 16:40:29,129 | server.py:270 | Requesting initial parameters from one random client
INFO flower 2023-01-25 16:40:33,573 | server.py:274 | Received initial parameters from one random client
INFO flower 2023-01-25 16:40:33,574 | server.py:88 | Evaluating initial parameters
INFO flower 2023-01-25 16:40:34,003 | server.py:91 | initial parameters (loss, other metrics): 0.07369909429550171, {'accuracy': 0.098}
INFO flower 2023-01-25 16:40:34,004 | server.py:101 | FL starting
DEBUG flower 202

Server-side evaluation loss 0.07369909429550171 / accuracy 0.098


DEBUG flower 2023-01-25 16:41:03,629 | server.py:229 | fit_round 1 received 5 results and 0 failures
INFO flower 2023-01-25 16:41:06,399 | server.py:116 | fit progress: (1, 0.059464130878448485, {'accuracy': 0.349}, 32.39383960000009)
DEBUG flower 2023-01-25 16:41:06,400 | server.py:165 | evaluate_round 1: strategy sampled 3 clients (out of 5)


Server-side evaluation loss 0.059464130878448485 / accuracy 0.349


DEBUG flower 2023-01-25 16:41:10,526 | server.py:179 | evaluate_round 1 received 3 results and 0 failures
DEBUG flower 2023-01-25 16:41:10,527 | server.py:215 | fit_round 2: strategy sampled 5 clients (out of 5)
DEBUG flower 2023-01-25 16:41:37,884 | server.py:229 | fit_round 2 received 5 results and 0 failures
INFO flower 2023-01-25 16:41:40,884 | server.py:116 | fit progress: (2, 0.050110796689987185, {'accuracy': 0.453}, 66.87948580000011)
DEBUG flower 2023-01-25 16:41:40,885 | server.py:165 | evaluate_round 2: strategy sampled 3 clients (out of 5)


Server-side evaluation loss 0.050110796689987185 / accuracy 0.453


DEBUG flower 2023-01-25 16:41:47,166 | server.py:179 | evaluate_round 2 received 3 results and 0 failures
INFO flower 2023-01-25 16:41:47,167 | server.py:144 | FL finished in 73.16257710000014
INFO flower 2023-01-25 16:41:47,168 | app.py:192 | app_fit: losses_distributed [(1, 0.059504249890645344), (2, 0.05002546882629394)]
INFO flower 2023-01-25 16:41:47,169 | app.py:193 | app_fit: metrics_distributed {'accuracy': [(1, 0.35433333333333333), (2, 0.42966666666666664)]}
INFO flower 2023-01-25 16:41:47,170 | app.py:194 | app_fit: losses_centralized [(0, 0.07369909429550171), (1, 0.059464130878448485), (2, 0.050110796689987185)]
INFO flower 2023-01-25 16:41:47,171 | app.py:195 | app_fit: metrics_centralized {'accuracy': [(0, 0.098), (1, 0.349), (2, 0.453)]}


History (loss, distributed):
	round 1: 0.059504249890645344
	round 2: 0.05002546882629394
History (loss, centralized):
	round 0: 0.07369909429550171
	round 1: 0.059464130878448485
	round 2: 0.050110796689987185
History (metrics, distributed):
{'accuracy': [(1, 0.35433333333333333), (2, 0.42966666666666664)]}History (metrics, centralized):
{'accuracy': [(0, 0.098), (1, 0.349), (2, 0.453)]}

In [None]:
# Create FedAvg strategy
strategy = FedAvgSmc(
    fraction_fit=1.0,  # Sample 100% of available clients for training
    fraction_evaluate=0.5,  # Sample 50% of available clients for evaluation
    min_fit_clients=5,  # Never sample less than 5 clients for training
    min_evaluate_clients=3,  # Never sample less than 3 clients for evaluation
    min_available_clients=5,  # Wait until 5 clients are available
    evaluate_metrics_aggregation_fn=weighted_average,  # pass the metric aggregation function
    evaluate_fn=evaluate,  # Pass the evaluation function
)

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=10),
    strategy=strategy,
)