# Client and NumPyClient

Welcome to the fourth part of the Flower federated learning tutorial. In the previous parts of this tutorial, we introduced federated learning with PyTorch and Flower ([part 1](https://flower.dev/docs/tutorial/Flower-1-Intro-to-FL-PyTorch.html)), we learned how strategies can be used to customize the execution on both the server and the clients ([part 2](https://flower.dev/docs/tutorial/Flower-2-Strategies-in-FL-PyTorch.html)), and we built our own custom strategy from scratch ([part 3 - WIP](https://flower.dev/docs/tutorial/Flower-3-Building-a-Strategy-PyTorch.html)).

In this notebook, we revisit `NumPyClient` and introduce a new baseclass for building clients, simply named `Client`. In previous parts of this tutorial, we've based our client on `NumPyClient`, a convenience class which makes it easy to work with machine learning libraries that have good NumPy interoperability. With `Client`, we gain a lot of flexibility that we didn't have before, but we'll also have to do a few things the we didn't have to do before.

> Join the Flower community on Slack to connect, ask questions, and get help: [Join Slack](https://flower.dev/join-slack) 🌻 We'd love to hear from you in the `#introductions` channel! If anything is unclear, head over to the `#questions` channel.

Let's go deeper and see what it takes to move from `NumPyClient` to `Client`!

## Step 0: Preparation

Before we begin with the actual code, let's make sure that we have everything we need.

### Installing dependencies

First, we install the necessary packages:

In [1]:
%pip install -q flwr[simulation] torch torchvision scipy

Note: you may need to restart the kernel to use updated packages.


Now that we have all dependencies installed, we can import everything we need for this tutorial:

In [2]:
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import CIFAR10

import flwr as fl

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(
    f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}"
)

Training on cpu using PyTorch 1.13.1+cu117 and Flower 1.3.0


It is possible to switch to a runtime that has GPU acceleration enabled (on Google Colab: `Runtime > Change runtime type > Hardware acclerator: GPU > Save`). Note, however, that Google Colab is not always able to offer GPU acceleration. If you see an error related to GPU availability in one of the following sections, consider switching back to CPU-based execution by setting `DEVICE = torch.device("cpu")`. If the runtime has GPU acceleration enabled, you should see the output `Training on cuda`, otherwise it'll say `Training on cpu`.

### Data loading

Let's now load the CIFAR-10 training and test set, partition them into ten smaller datasets (each split into training and validation set), and wrap everything in their own `DataLoader`.

In [3]:
NUM_CLIENTS = 10


def load_datasets(num_clients: int):
    # Download and transform CIFAR-10 (train and test)
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )
    trainset = CIFAR10("./dataset", train=True, download=True, transform=transform)
    testset = CIFAR10("./dataset", train=False, download=True, transform=transform)

    # Split training set into `num_clients` partitions to simulate different local datasets
    partition_size = len(trainset) // num_clients
    lengths = [partition_size] * num_clients
    datasets = random_split(trainset, lengths, torch.Generator().manual_seed(42))

    # Split each partition into train/val and create DataLoader
    trainloaders = []
    valloaders = []
    for ds in datasets:
        len_val = len(ds) // 10  # 10 % validation set
        len_train = len(ds) - len_val
        lengths = [len_train, len_val]
        ds_train, ds_val = random_split(ds, lengths, torch.Generator().manual_seed(42))
        trainloaders.append(DataLoader(ds_train, batch_size=32, shuffle=True))
        valloaders.append(DataLoader(ds_val, batch_size=32))
    testloader = DataLoader(testset, batch_size=32)
    return trainloaders, valloaders, testloader


trainloaders, valloaders, testloader = load_datasets(NUM_CLIENTS)

Files already downloaded and verified
Files already downloaded and verified


### Model training/evaluation

Let's continue with the usual model definition (including `set_parameters` and `get_parameters`), training and test functions:

In [4]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]


def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def train(net, trainloader, epochs: int):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for images, labels in trainloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(net(images), labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

## Step 1: Revisiting NumPyClient

So far, we've implemented our client by subclassing `flwr.client.NumPyClient`. The three methods we implemented are `get_parameters`, `fit`, and `evaluate`. Finally, we wrap the creation of instances of this class in a function called `client_fn`:

In [5]:
class FlowerNumPyClient(fl.client.NumPyClient):
    def __init__(self, cid, net, trainloader, valloader):
        self.cid = cid
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        print(f"[Client {self.cid}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        print(f"[Client {self.cid}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.cid}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def numpyclient_fn(cid) -> FlowerNumPyClient:
    net = Net().to(DEVICE)
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]
    return FlowerNumPyClient(cid, net, trainloader, valloader)

We've seen this before, there's nothing new so far. The only *tiny* difference compared to the previous notebook is naming, we've changed `FlowerClient` to `FlowerNumPyClient` and `client_fn` to `numpyclient_fn`. Let's run it to see the output we get:

In [6]:
# Specify client resources if you need GPU (defaults to 1 CPU and 0 GPU)
client_resources = None
if DEVICE.type == "cuda":
    client_resources = {"num_gpus": 1}

fl.simulation.start_simulation(
    client_fn=numpyclient_fn,
    num_clients=2,
    config=fl.server.ServerConfig(num_rounds=3),
    client_resources=client_resources,
)

INFO flwr 2023-02-27 16:11:41,685 | app.py:145 | Starting Flower simulation, config: ServerConfig(num_rounds=3, round_timeout=None)
2023-02-27 16:11:55,039	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m
E0227 16:12:01.195913400   22567 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510721.195837100","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
INFO flwr 2023-02-27 16:12:01,273 | app.py:179 | Flower VCE: Ray initialized with resources: {'memory': 736889243.0, 'node:10.246.68.42': 1.0, 'CPU': 8.0, 'object_store_memory': 368444620.0}
INFO flwr 2023-02-27 16:12:01,277 | server.py:86 | Initializing global parameters
INFO flwr 2023-02-27 16:12:01,278 | server.py:270 | Requesting initial parameters from one random client
INF

[2m[36m(launch_and_get_parameters pid=23406)[0m [Client 1] get_parameters


[2m[33m(raylet)[0m E0227 16:12:17.561719300   23757 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510737.561693800","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
[2m[33m(raylet)[0m E0227 16:12:17.562522000   23756 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510737.562491800","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
[2m[33m(raylet)[0m E0227 16:12:18.785518900   23752 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510738.785455700","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/i

[2m[36m(launch_and_fit pid=23404)[0m [Client 1] fit, config: {}


[2m[33m(raylet)[0m E0227 16:12:29.122210200   23938 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510749.122175500","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}


[2m[36m(launch_and_fit pid=23399)[0m [Client 0] fit, config: {}


[2m[33m(raylet)[0m E0227 16:12:49.521757200   24134 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510769.521723700","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
[2m[33m(raylet)[0m [2023-02-27 16:12:55,737 E 23291 23291] (raylet) node_manager.cc:3097: 10 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 07122474f75d006d69ec83b1864a9d954e1ed6b89a51da0f3d576937, IP: 10.246.68.42) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.246.68.42`
[2m[33m(raylet)[0m 
[2m[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisi

[2m[36m(launch_and_fit pid=24186)[0m [Client 0] fit, config: {}
[2m[36m(launch_and_fit pid=24186)[0m Epoch 1: train loss 0.06435199826955795, accuracy 0.236
[2m[36m(launch_and_fit pid=24186)[0m [Client 1] fit, config: {}


DEBUG flwr 2023-02-27 16:13:11,233 | server.py:229 | fit_round 1 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:13:11,257 | server.py:165 | evaluate_round 1: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=24186)[0m Epoch 1: train loss 0.06405399739742279, accuracy 0.24266666666666667


[2m[33m(raylet)[0m E0227 16:13:21.442507600   24290 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510801.442469400","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}


[2m[36m(launch_and_evaluate pid=24181)[0m [Client 0] evaluate, config: {}


DEBUG flwr 2023-02-27 16:13:23,049 | server.py:179 | evaluate_round 1 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:13:23,051 | server.py:215 | fit_round 2: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_evaluate pid=24181)[0m [Client 1] evaluate, config: {}


[2m[33m(raylet)[0m E0227 16:13:32.609665600   24342 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510812.609637400","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}


[2m[36m(launch_and_fit pid=24290)[0m [Client 0] fit, config: {}
[2m[36m(launch_and_fit pid=24290)[0m Epoch 1: train loss 0.056421708315610886, accuracy 0.3382222222222222
[2m[36m(launch_and_fit pid=24290)[0m [Client 1] fit, config: {}


DEBUG flwr 2023-02-27 16:13:41,992 | server.py:229 | fit_round 2 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:13:42,004 | server.py:165 | evaluate_round 2: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=24290)[0m Epoch 1: train loss 0.05615558847784996, accuracy 0.3451111111111111


[2m[33m(raylet)[0m E0227 16:13:51.514430500   24419 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510831.514408600","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}


[2m[36m(launch_and_evaluate pid=24342)[0m [Client 1] evaluate, config: {}


DEBUG flwr 2023-02-27 16:13:53,344 | server.py:179 | evaluate_round 2 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:13:53,345 | server.py:215 | fit_round 3: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_evaluate pid=24342)[0m [Client 0] evaluate, config: {}


[2m[33m(raylet)[0m [2023-02-27 16:13:55,738 E 23291 23291] (raylet) node_manager.cc:3097: 3 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 07122474f75d006d69ec83b1864a9d954e1ed6b89a51da0f3d576937, IP: 10.246.68.42) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.246.68.42`
[2m[33m(raylet)[0m 
[2m[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[2m[36m(launch_and_fit pid=24342)[0m [Client 1] fit, config: {}
[2m[36m(launch_and_fit pid=24342)[0m Epoch 1: train loss 0.05158519744873047, accuracy 0.3988888888888889
[2m[36m(launch_and_fit pid=24342)[0m [Client 0] fit, config: {}


DEBUG flwr 2023-02-27 16:14:01,998 | server.py:229 | fit_round 3 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:14:02,005 | server.py:165 | evaluate_round 3: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=24342)[0m Epoch 1: train loss 0.05219053477048874, accuracy 0.39444444444444443
[2m[36m(launch_and_evaluate pid=24342)[0m [Client 0] evaluate, config: {}


DEBUG flwr 2023-02-27 16:14:06,179 | server.py:179 | evaluate_round 3 received 2 results and 0 failures
INFO flwr 2023-02-27 16:14:06,180 | server.py:144 | FL finished in 117.42788120000023
INFO flwr 2023-02-27 16:14:06,182 | app.py:202 | app_fit: losses_distributed [(1, 0.06062890684604645), (2, 0.05516076898574829), (3, 0.05279857933521271)]
INFO flwr 2023-02-27 16:14:06,183 | app.py:203 | app_fit: metrics_distributed {}
INFO flwr 2023-02-27 16:14:06,185 | app.py:204 | app_fit: losses_centralized []
INFO flwr 2023-02-27 16:14:06,186 | app.py:205 | app_fit: metrics_centralized {}


[2m[36m(launch_and_evaluate pid=24342)[0m [Client 1] evaluate, config: {}


History (loss, distributed):
	round 1: 0.06062890684604645
	round 2: 0.05516076898574829
	round 3: 0.05279857933521271

This works as expected, two clients are training for three rounds of federated learning.

Let's dive a little bit deeper and discuss how Flower executes this simulation. Whenever a client is selected to do some work, `start_simulation` calls the function `numpyclient_fn` to create an instance of our `FlowerNumPyClient` (along with loading the model and the data).

But here's the perhaps surprising part: Flower doesn't actually use the `FlowerNumPyClient` object directly. Instead, it wraps the object to makes it look like a subclass of `flwr.client.Client`, not `flwr.client.NumPyClient`. In fact, the Flower core framework doesn't know how to handle `NumPyClient`'s, it only knows how to handle `Client`'s. `NumPyClient` is just a convenience abstraction built on top of `Client`. 

Instead of building on top of `NumPyClient`, we can directly build on top of `Client`.

## Step 2: Moving from `NumPyClient` to `Client`

Let's try to do the same thing using `Client` instead of `NumPyClient`.

In [7]:
from flwr.common import (
    Code,
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    GetParametersIns,
    GetParametersRes,
    Status,
    ndarrays_to_parameters,
    parameters_to_ndarrays,
)


class FlowerClient(fl.client.Client):
    def __init__(self, cid, net, trainloader, valloader):
        self.cid = cid
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, ins: GetParametersIns) -> GetParametersRes:
        print(f"[Client {self.cid}] get_parameters")

        # Get parameters as a list of NumPy ndarray's
        ndarrays: List[np.ndarray] = get_parameters(self.net)

        # Serialize ndarray's into a Parameters object
        parameters = ndarrays_to_parameters(ndarrays)

        # Build and return response
        status = Status(code=Code.OK, message="Success")
        return GetParametersRes(
            status=status,
            parameters=parameters,
        )

    def fit(self, ins: FitIns) -> FitRes:
        print(f"[Client {self.cid}] fit, config: {ins.config}")

        # Deserialize parameters to NumPy ndarray's
        parameters_original = ins.parameters
        ndarrays_original = parameters_to_ndarrays(parameters_original)

        # Update local model, train, get updated parameters
        set_parameters(self.net, ndarrays_original)
        train(self.net, self.trainloader, epochs=1)
        ndarrays_updated = get_parameters(self.net)

        # Serialize ndarray's into a Parameters object
        parameters_updated = ndarrays_to_parameters(ndarrays_updated)

        # Build and return response
        status = Status(code=Code.OK, message="Success")
        return FitRes(
            status=status,
            parameters=parameters_updated,
            num_examples=len(self.trainloader),
            metrics={},
        )

    def evaluate(self, ins: EvaluateIns) -> EvaluateRes:
        print(f"[Client {self.cid}] evaluate, config: {ins.config}")

        # Deserialize parameters to NumPy ndarray's
        parameters_original = ins.parameters
        ndarrays_original = parameters_to_ndarrays(parameters_original)

        set_parameters(self.net, ndarrays_original)
        loss, accuracy = test(self.net, self.valloader)
        # return float(loss), len(self.valloader), {"accuracy": float(accuracy)}

        # Build and return response
        status = Status(code=Code.OK, message="Success")
        return EvaluateRes(
            status=status,
            loss=float(loss),
            num_examples=len(self.valloader),
            metrics={"accuracy": float(accuracy)},
        )


def client_fn(cid) -> FlowerClient:
    net = Net().to(DEVICE)
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]
    return FlowerClient(cid, net, trainloader, valloader)

Before we discuss the code in more detail, let's try to run it! Gotta make sure our new `Client`-based client works, right?

In [8]:
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=2,
    config=fl.server.ServerConfig(num_rounds=3),
    client_resources=client_resources,
)

INFO flwr 2023-02-27 16:14:06,443 | app.py:145 | Starting Flower simulation, config: ServerConfig(num_rounds=3, round_timeout=None)
2023-02-27 16:14:20,176	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m
INFO flwr 2023-02-27 16:14:24,734 | app.py:179 | Flower VCE: Ray initialized with resources: {'node:10.246.68.42': 1.0, 'object_store_memory': 1077184512.0, 'CPU': 8.0, 'memory': 2154369024.0}
INFO flwr 2023-02-27 16:14:24,739 | server.py:86 | Initializing global parameters
INFO flwr 2023-02-27 16:14:24,741 | server.py:270 | Requesting initial parameters from one random client
INFO flwr 2023-02-27 16:14:29,362 | server.py:274 | Received initial parameters from one random client
INFO flwr 2023-02-27 16:14:29,363 | server.py:88 | Evaluating initial parameters
INFO flwr 2023-02-27 16:14:29,364 | server.py:101 | FL starting
DEBUG flwr 2023-02-27 16:14:29,365 | server.py:215 | fit_round 1: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_get_parameters pid=24720)[0m [Client 0] get_parameters
[2m[36m(launch_and_fit pid=24720)[0m [Client 0] fit, config: {}
[2m[36m(launch_and_fit pid=24719)[0m [Client 1] fit, config: {}
[2m[36m(launch_and_fit pid=24720)[0m Epoch 1: train loss 0.06488857418298721, accuracy 0.23977777777777778


DEBUG flwr 2023-02-27 16:14:39,145 | server.py:229 | fit_round 1 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:14:39,153 | server.py:165 | evaluate_round 1: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=24719)[0m Epoch 1: train loss 0.06471448391675949, accuracy 0.23933333333333334


DEBUG flwr 2023-02-27 16:14:43,426 | server.py:179 | evaluate_round 1 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:14:43,428 | server.py:215 | fit_round 2: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_evaluate pid=24720)[0m [Client 1] evaluate, config: {}
[2m[36m(launch_and_evaluate pid=24719)[0m [Client 0] evaluate, config: {}
[2m[36m(launch_and_fit pid=24720)[0m [Client 0] fit, config: {}
[2m[36m(launch_and_fit pid=24719)[0m [Client 1] fit, config: {}


DEBUG flwr 2023-02-27 16:14:50,619 | server.py:229 | fit_round 2 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:14:50,625 | server.py:165 | evaluate_round 2: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=24720)[0m Epoch 1: train loss 0.05635707080364227, accuracy 0.3468888888888889
[2m[36m(launch_and_fit pid=24719)[0m Epoch 1: train loss 0.056402165442705154, accuracy 0.34955555555555556


[2m[33m(raylet)[0m E0227 16:14:57.964381400   25097 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510897.964359200","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
[2m[33m(raylet)[0m E0227 16:14:57.981632700   25091 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510897.981600000","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
[2m[33m(raylet)[0m E0227 16:14:58.090452900   25096 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677510898.090420100","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/i

[2m[36m(launch_and_evaluate pid=24718)[0m [Client 0] evaluate, config: {}


DEBUG flwr 2023-02-27 16:15:09,181 | server.py:179 | evaluate_round 2 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:15:09,182 | server.py:215 | fit_round 3: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_evaluate pid=24716)[0m [Client 1] evaluate, config: {}
[2m[36m(launch_and_fit pid=24716)[0m [Client 1] fit, config: {}
[2m[36m(launch_and_fit pid=24718)[0m [Client 0] fit, config: {}


DEBUG flwr 2023-02-27 16:15:16,695 | server.py:229 | fit_round 3 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:15:16,701 | server.py:165 | evaluate_round 3: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=24716)[0m Epoch 1: train loss 0.05312047526240349, accuracy 0.37022222222222223
[2m[36m(launch_and_fit pid=24718)[0m Epoch 1: train loss 0.052913542836904526, accuracy 0.3808888888888889


[2m[33m(raylet)[0m [2023-02-27 16:15:20,119 E 24621 24621] (raylet) node_manager.cc:3097: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d5cc8c20c62c25abd0feebc37abf6baa3033ead4735724f1c626064d, IP: 10.246.68.42) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.246.68.42`
[2m[33m(raylet)[0m 
[2m[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[2m[36m(launch_and_evaluate pid=24718)[0m [Client 0] evaluate, config: {}
[2m[36m(launch_and_evaluate pid=24716)[0m [Client 1] evaluate, config: {}


DEBUG flwr 2023-02-27 16:15:21,288 | server.py:179 | evaluate_round 3 received 2 results and 0 failures
INFO flwr 2023-02-27 16:15:21,290 | server.py:144 | FL finished in 51.92557250000027
INFO flwr 2023-02-27 16:15:21,291 | app.py:202 | app_fit: losses_distributed [(1, 0.061871119976043706), (2, 0.055656445860862734), (3, 0.05376592624187469)]
INFO flwr 2023-02-27 16:15:21,292 | app.py:203 | app_fit: metrics_distributed {}
INFO flwr 2023-02-27 16:15:21,295 | app.py:204 | app_fit: losses_centralized []
INFO flwr 2023-02-27 16:15:21,297 | app.py:205 | app_fit: metrics_centralized {}


History (loss, distributed):
	round 1: 0.061871119976043706
	round 2: 0.055656445860862734
	round 3: 0.05376592624187469

That's it, we're now using `Client`. It probably looks similar to what we've done with `NumPyClient`. So what's the difference?

First of all, it's more code. But why? The difference comes from the fact that `Client` expects us to take care of parameter serialization and deserialization. For Flower to be able to send parameters over the network, it eventually needs to turn these parameters into `bytes`. Turning parameters (e.g., NumPy `ndarray`'s) into raw bytes is called serialization. Turning raw bytes into something more useful (like NumPy `ndarray`'s) is called deserialization. Flower needs to do both: it needs to serialize parameters on the server-side and send them to the client, the client needs to deserialize them to use them for local training, and then serialize the updated parameters again to send them back to the server, which (finally!) deserializes them again in order to aggregate them with the updates received from other clients.

The only *real* difference between Client and NumPyClient is that NumPyClient takes care of serialization and deserialization for you. It can do so because it expects you to return parameters as NumPy ndarray's, and it knows how to handle these. This makes working with machine learning libraries that have good NumPy support (most of them) a breeze.

In terms of API, there's one major difference: all methods in Client take exactly one argument (e.g., `FitIns` in `Client.fit`) and return exactly one value (e.g., `FitRes` in `Client.fit`). The methods in `NumPyClient` on the other hand have multiple arguments (e.g., `parameters` and `config` in `NumPyClient.fit`) and multiple return values (e.g., `parameters`, `num_example`, and `metrics` in `NumPyClient.fit`) if there are multiple things to handle. These `*Ins` and `*Res` objects in `Client` wrap all the individual values you're used to from `NumPyClient`.

## Step 3: Custom serialization

Here we will explore how to implement custom serialization with a simple example.

But first what is serialization? Serialization is just the process of converting an object into raw bytes, and equally as important,
deserialization is the process of converting raw bytes back into an object. This is very useful for network communication.
Indeed, without serialization, you could not just a Python object through the internet.

Federated Learning relies heavily on internet communication for training by sending Python objects back and forth between the clients and 
the server. This means that serialization is an essential part of Federated Learning.

In the following section, we will write a basic example where instead of sending a serialized version of our `ndarray`s containing our parameters,
we will first convert the `ndarray` into sparse matrices, before sending them. This technique can be used to save bandwidth, as in certain cases
where the weights of a model are sparse (containing many 0 entries), converting them to a sparse matrix can greatly improve their bytesize.

### Our custom serialization/deserialization functions

This is where the real serialization/deserialization will happen, especially in `ndarray_to_sparse_bytes` for serialization and 
`sparse_bytes_to_ndarray` for deserialization.

Note that we imported the `scipy.sparse` library in order to convert our arrays.

In [9]:
from io import BytesIO
from typing import cast

import numpy as np

from flwr.common.typing import NDArray, NDArrays, Parameters


def ndarrays_to_sparse_parameters(ndarrays: NDArrays) -> Parameters:
    """Convert NumPy ndarrays to parameters object."""
    tensors = [ndarray_to_sparse_bytes(ndarray) for ndarray in ndarrays]
    return Parameters(tensors=tensors, tensor_type="numpy.ndarray")


def sparse_parameters_to_ndarrays(parameters: Parameters) -> NDArrays:
    """Convert parameters object to NumPy ndarrays."""
    return [sparse_bytes_to_ndarray(tensor) for tensor in parameters.tensors]


def ndarray_to_sparse_bytes(ndarray: NDArray) -> bytes:
    """Serialize NumPy ndarray to bytes."""
    bytes_io = BytesIO()

    if len(ndarray.shape) > 1:
        # We convert our ndarray into a sparse matrix
        ndarray = torch.tensor(ndarray).to_sparse_csr()

        # And send it by utilizng the sparse matrix attributes
        # WARNING: NEVER set allow_pickle to true.
        # Reason: loading pickled data can execute arbitrary code
        # Source: https://numpy.org/doc/stable/reference/generated/numpy.save.html
        np.savez(bytes_io,  # type: ignore
             crow_indices=ndarray.crow_indices(),
             col_indices=ndarray.col_indices(),
             values=ndarray.values(),
             allow_pickle=False)
    else:
        # WARNING: NEVER set allow_pickle to true.
        # Reason: loading pickled data can execute arbitrary code
        # Source: https://numpy.org/doc/stable/reference/generated/numpy.save.html
        np.save(bytes_io, ndarray, allow_pickle=False)
    return bytes_io.getvalue()


def sparse_bytes_to_ndarray(tensor: bytes) -> NDArray:
    """Deserialize NumPy ndarray from bytes."""
    bytes_io = BytesIO(tensor)
    # WARNING: NEVER set allow_pickle to true.
    # Reason: loading pickled data can execute arbitrary code
    # Source: https://numpy.org/doc/stable/reference/generated/numpy.load.html
    loader = np.load(bytes_io, allow_pickle=False)  # type: ignore

    if 'crow_indices' in loader:
        # We convert our sparse matrix back to a ndarray, using the attributes we sent
        ndarray_deserialized = torch.sparse_csr_tensor(
            crow_indices=loader['crow_indices'],
            col_indices=loader['col_indices'],
            values=loader['values'],
        ).to_dense().numpy()
    else:
        ndarray_deserialized = loader
    return cast(NDArray, ndarray_deserialized)

### Client-side

To be able to able to serialize our `ndarray`s into sparse parameters, we will just have to call our custom functions in our `flwr.client.Client`.

Indeed, in `get_parameters` we need to serialize the parameters we got from our network using our custom `ndarrays_to_sparse_parameters` defined above.

In `fit`, we first need to deserialize the parameters coming from the server using our custom `sparse_parameters_to_ndarrays` and then we need to 
serialize our local results with `ndarrays_to_sparse_parameters`.

In `evaluate`, we will only need to deserialize the global parameters with our custom function.

In [10]:
from flwr.common import (
    Code,
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    GetParametersIns,
    GetParametersRes,
    Status,
)


class FlowerClient(fl.client.Client):
    def __init__(self, cid, net, trainloader, valloader):
        self.cid = cid
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, ins: GetParametersIns) -> GetParametersRes:
        print(f"[Client {self.cid}] get_parameters")

        # Get parameters as a list of NumPy ndarray's
        ndarrays: List[np.ndarray] = get_parameters(self.net)

        # Serialize ndarray's into a Parameters object using our custom function
        parameters = ndarrays_to_sparse_parameters(ndarrays)

        # Build and return response
        status = Status(code=Code.OK, message="Success")
        return GetParametersRes(
            status=status,
            parameters=parameters,
        )

    def fit(self, ins: FitIns) -> FitRes:
        print(f"[Client {self.cid}] fit, config: {ins.config}")

        # Deserialize parameters to NumPy ndarray's using our custom function
        parameters_original = ins.parameters
        ndarrays_original = sparse_parameters_to_ndarrays(parameters_original)

        # Update local model, train, get updated parameters
        set_parameters(self.net, ndarrays_original)
        train(self.net, self.trainloader, epochs=1)
        ndarrays_updated = get_parameters(self.net)

        # Serialize ndarray's into a Parameters object using our custom function
        parameters_updated = ndarrays_to_sparse_parameters(ndarrays_updated)

        # Build and return response
        status = Status(code=Code.OK, message="Success")
        return FitRes(
            status=status,
            parameters=parameters_updated,
            num_examples=len(self.trainloader),
            metrics={},
        )

    def evaluate(self, ins: EvaluateIns) -> EvaluateRes:
        print(f"[Client {self.cid}] evaluate, config: {ins.config}")

        # Deserialize parameters to NumPy ndarray's using our custom function
        parameters_original = ins.parameters
        ndarrays_original = sparse_parameters_to_ndarrays(parameters_original)

        set_parameters(self.net, ndarrays_original)
        loss, accuracy = test(self.net, self.valloader)

        # Build and return response
        status = Status(code=Code.OK, message="Success")
        return EvaluateRes(
            status=status,
            loss=float(loss),
            num_examples=len(self.valloader),
            metrics={"accuracy": float(accuracy)},
        )


def client_fn(cid) -> FlowerClient:
    net = Net().to(DEVICE)
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]
    return FlowerClient(cid, net, trainloader, valloader)

### Server-side

For this example, we will just use `FedAvg` as a strategy. 
To change the serialization and deserialization here, we only need to reimplement the `evaluate` and `aggregate_fit` functions of `FedAvg`.
The other functions of the strategy will be inherited from the super class `FedAvg`.

As you can see only one line as change in `evaluate`:

```python
parameters_ndarrays = sparse_parameters_to_ndarrays(parameters)
```

And for `aggregate_fit`, we will first deserialize every result we received:

```python
weights_results = [
    (sparse_parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
    for _, fit_res in results
]
```

And then serialize the aggregated result:

```python
parameters_aggregated = ndarrays_to_sparse_parameters(aggregate(weights_results))
```

In [11]:
from logging import WARNING
from typing import Callable, Dict, List, Optional, Tuple, Union

from flwr.common import FitRes, MetricsAggregationFn, NDArrays, Parameters, Scalar
from flwr.common.logger import log
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy import FedAvg
from flwr.server.strategy.aggregate import aggregate

WARNING_MIN_AVAILABLE_CLIENTS_TOO_LOW = """
Setting `min_available_clients` lower than `min_fit_clients` or
`min_evaluate_clients` can cause the server to fail when there are too few clients
connected to the server. `min_available_clients` must be set to a value larger
than or equal to the values of `min_fit_clients` and `min_evaluate_clients`.
"""


class FedSparse(FedAvg):
    def __init__(
        self,
        *,
        fraction_fit: float = 1.0,
        fraction_evaluate: float = 1.0,
        min_fit_clients: int = 2,
        min_evaluate_clients: int = 2,
        min_available_clients: int = 2,
        evaluate_fn: Optional[
            Callable[
                [int, NDArrays, Dict[str, Scalar]],
                Optional[Tuple[float, Dict[str, Scalar]]],
            ]
        ] = None,
        on_fit_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
        on_evaluate_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
        accept_failures: bool = True,
        initial_parameters: Optional[Parameters] = None,
        fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
        evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
    ) -> None:
        """Custom FedAvg strategy with sparse matrices.

        Parameters
        ----------
        fraction_fit : float, optional
            Fraction of clients used during training. Defaults to 0.1.
        fraction_evaluate : float, optional
            Fraction of clients used during validation. Defaults to 0.1.
        min_fit_clients : int, optional
            Minimum number of clients used during training. Defaults to 2.
        min_evaluate_clients : int, optional
            Minimum number of clients used during validation. Defaults to 2.
        min_available_clients : int, optional
            Minimum number of total clients in the system. Defaults to 2.
        evaluate_fn : Optional[Callable[[int, NDArrays, Dict[str, Scalar]], Optional[Tuple[float, Dict[str, Scalar]]]]]
            Optional function used for validation. Defaults to None.
        on_fit_config_fn : Callable[[int], Dict[str, Scalar]], optional
            Function used to configure training. Defaults to None.
        on_evaluate_config_fn : Callable[[int], Dict[str, Scalar]], optional
            Function used to configure validation. Defaults to None.
        accept_failures : bool, optional
            Whether or not accept rounds containing failures. Defaults to True.
        initial_parameters : Parameters, optional
            Initial global model parameters.
        """

        if (
            min_fit_clients > min_available_clients
            or min_evaluate_clients > min_available_clients
        ):
            log(WARNING, WARNING_MIN_AVAILABLE_CLIENTS_TOO_LOW)

        super().__init__(
            fraction_fit=fraction_fit,
            fraction_evaluate=fraction_evaluate,
            min_fit_clients=min_fit_clients,
            min_evaluate_clients=min_evaluate_clients,
            min_available_clients=min_available_clients,
            evaluate_fn=evaluate_fn,
            on_fit_config_fn=on_fit_config_fn,
            on_evaluate_config_fn=on_evaluate_config_fn,
            accept_failures=accept_failures,
            initial_parameters=initial_parameters,
            fit_metrics_aggregation_fn=fit_metrics_aggregation_fn,
            evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn,
        )

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        """Evaluate model parameters using an evaluation function."""
        if self.evaluate_fn is None:
            # No evaluation function provided
            return None

        # We deserialize using our custom method
        parameters_ndarrays = sparse_parameters_to_ndarrays(parameters)

        eval_res = self.evaluate_fn(server_round, parameters_ndarrays, {})
        if eval_res is None:
            return None
        loss, metrics = eval_res
        return loss, metrics

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Aggregate fit results using weighted average."""
        if not results:
            return None, {}
        # Do not aggregate if there are failures and failures are not accepted
        if not self.accept_failures and failures:
            return None, {}

        # We deserialize each of the results with our custom method
        weights_results = [
            (sparse_parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
            for _, fit_res in results
        ]

        # We serialize the aggregated result using our cutom method
        parameters_aggregated = ndarrays_to_sparse_parameters(
            aggregate(weights_results)
        )

        # Aggregate custom metrics if aggregation fn was provided
        metrics_aggregated = {}
        if self.fit_metrics_aggregation_fn:
            fit_metrics = [(res.num_examples, res.metrics) for _, res in results]
            metrics_aggregated = self.fit_metrics_aggregation_fn(fit_metrics)
        elif server_round == 1:  # Only log this warning once
            log(WARNING, "No fit_metrics_aggregation_fn provided")

        return parameters_aggregated, metrics_aggregated

We can now run our custom serialization example!

In [12]:
strategy = FedSparse()

fl.simulation.start_simulation(
    strategy=strategy,
    client_fn=client_fn,
    num_clients=2,
    config=fl.server.ServerConfig(num_rounds=3),
    client_resources=client_resources,
)

INFO flwr 2023-02-27 16:15:25,575 | app.py:145 | Starting Flower simulation, config: ServerConfig(num_rounds=3, round_timeout=None)
2023-02-27 16:15:46,361	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m
INFO flwr 2023-02-27 16:15:51,047 | app.py:179 | Flower VCE: Ray initialized with resources: {'memory': 2731558503.0, 'node:10.246.68.42': 1.0, 'object_store_memory': 1365779251.0, 'CPU': 8.0}
INFO flwr 2023-02-27 16:15:51,049 | server.py:86 | Initializing global parameters
INFO flwr 2023-02-27 16:15:51,051 | server.py:270 | Requesting initial parameters from one random client
INFO flwr 2023-02-27 16:15:56,709 | server.py:274 | Received initial parameters from one random client
INFO flwr 2023-02-27 16:15:56,711 | server.py:88 | Evaluating initial parameters
INFO flwr 2023-02-27 16:15:56,712 | server.py:101 | FL starting
DEBUG flwr 2023-02-27 16:15:56,713 | server.py:215 | fit_round 1: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_get_parameters pid=25586)[0m [Client 1] get_parameters
[2m[36m(launch_and_fit pid=25586)[0m [Client 1] fit, config: {}
[2m[36m(launch_and_fit pid=25584)[0m [Client 0] fit, config: {}
[2m[36m(launch_and_fit pid=25586)[0m Epoch 1: train loss 0.06492266803979874, accuracy 0.21155555555555555


DEBUG flwr 2023-02-27 16:16:06,706 | server.py:229 | fit_round 1 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:16:06,719 | server.py:165 | evaluate_round 1: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=25584)[0m Epoch 1: train loss 0.06481149792671204, accuracy 0.21866666666666668


DEBUG flwr 2023-02-27 16:16:11,921 | server.py:179 | evaluate_round 1 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:16:11,923 | server.py:215 | fit_round 2: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_evaluate pid=25586)[0m [Client 0] evaluate, config: {}
[2m[36m(launch_and_evaluate pid=25584)[0m [Client 1] evaluate, config: {}
[2m[36m(launch_and_fit pid=25584)[0m [Client 0] fit, config: {}
[2m[36m(launch_and_fit pid=25586)[0m [Client 1] fit, config: {}


DEBUG flwr 2023-02-27 16:16:21,236 | server.py:229 | fit_round 2 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:16:21,249 | server.py:165 | evaluate_round 2: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=25586)[0m Epoch 1: train loss 0.05685392767190933, accuracy 0.328
[2m[36m(launch_and_fit pid=25584)[0m Epoch 1: train loss 0.05816032737493515, accuracy 0.31066666666666665
[2m[36m(launch_and_evaluate pid=25584)[0m [Client 1] evaluate, config: {}
[2m[36m(launch_and_evaluate pid=25586)[0m [Client 0] evaluate, config: {}


DEBUG flwr 2023-02-27 16:16:26,089 | server.py:179 | evaluate_round 2 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:16:26,101 | server.py:215 | fit_round 3: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=25584)[0m [Client 0] fit, config: {}
[2m[36m(launch_and_fit pid=25586)[0m [Client 1] fit, config: {}


DEBUG flwr 2023-02-27 16:16:32,806 | server.py:229 | fit_round 3 received 2 results and 0 failures
DEBUG flwr 2023-02-27 16:16:32,813 | server.py:165 | evaluate_round 3: strategy sampled 2 clients (out of 2)


[2m[36m(launch_and_fit pid=25584)[0m Epoch 1: train loss 0.053219303488731384, accuracy 0.3748888888888889
[2m[36m(launch_and_fit pid=25586)[0m Epoch 1: train loss 0.05329502001404762, accuracy 0.37422222222222223


[2m[33m(raylet)[0m E0227 16:16:40.024104000   25980 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677511000.024078300","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
[2m[33m(raylet)[0m E0227 16:16:40.172487000   25982 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677511000.172454400","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
[2m[33m(raylet)[0m E0227 16:16:40.355538500   25983 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677511000.355478000","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/i

[2m[36m(launch_and_evaluate pid=25586)[0m [Client 1] evaluate, config: {}


[2m[33m(raylet)[0m E0227 16:16:45.970950300   26121 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677511005.970914100","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
DEBUG flwr 2023-02-27 16:16:46,820 | server.py:179 | evaluate_round 3 received 2 results and 0 failures
INFO flwr 2023-02-27 16:16:46,821 | server.py:144 | FL finished in 50.10799959999986
INFO flwr 2023-02-27 16:16:46,822 | app.py:202 | app_fit: losses_distributed [(1, 0.06226513278484344), (2, 0.057180313706398006), (3, 0.05326614594459533)]
INFO flwr 2023-02-27 16:16:46,830 | app.py:203 | app_fit: metrics_distributed {}
INFO flwr 2023-02-27 16:16:46,835 | app.py:204 | app_fit: losses_centralized []
INFO flwr 2023-02-27 16:16:46,837 | app.py:205 | app_fit: metrics_centralized {}


[2m[36m(launch_and_evaluate pid=25586)[0m [Client 0] evaluate, config: {}


History (loss, distributed):
	round 1: 0.06226513278484344
	round 2: 0.057180313706398006
	round 3: 0.05326614594459533

[2m[33m(raylet)[0m E0227 16:16:48.454009900   26158 socket_utils_common_posix.cc:223] check for SO_REUSEPORT: {"created":"@1677511008.453989700","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":202,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
[2m[33m(raylet)[0m [2023-02-27 16:42:46,349 E 25485 25485] (raylet) node_manager.cc:3097: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 8e63785e45b3e494c6438d5d24d43a1972b0a57bc1f0f109a0d5ce9b, IP: 10.246.68.42) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.246.68.42`
[2m[33m(raylet)[0m 
[2m[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisio

## Recap

In this part of the tutorial, we've seen how we can build clients by subclassing either `NumPyClient` or `Client`. `NumPyClient` is a convenience abstraction that makes it easier to work with machine learning libraries that have good NumPy interoperability. `Client` is a more flexible abstraction that allows us to do things that are not possible in `NumPyClient`. In order to do so, it requires us to handle parameter serialization and deserialization ourselves.

## Next steps

Before you continue, make sure to join the Flower community on Slack: [Join Slack](https://flower.dev/join-slack/)

There's a dedicated `#questions` channel if you need help, but we'd also love to hear who you are in `#introductions`!

This is the final part of the Flower tutorial (for now!), congratulations! You're now well equipped to understand the rest of the documentation. There are many topics we didn't cover in the tutorial, we recommend the following resources:

- [Read Flower Docs](https://flower.dev/docs/)
- [Check out Flower Code Examples](https://github.com/adap/flower/tree/main/examples)
- [Use Flower Baselines for your research](https://flower.dev/docs/using-baselines.html)
- [Watch Flower Summit 2022 videos](https://flower.dev/conf/flower-summit-2022/)
