<a href="https://colab.research.google.com/github/artsasse/fedkan/blob/main/Flower_MNIST_MLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Using a Federated MLP to classify MNIST

This notebook is based mainly on the Flower Tutorial "Use a federated learning strategy", found in https://flower.ai/docs/framework/tutorial-series-use-a-federated-learning-strategy-pytorch.html .

## Dependencies

In [1]:
pip install -q flwr[simulation] flwr-datasets[vision] torch torchvision

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.1/65.1 MB[0m [31m8.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.8/40.8 MB[0m [31m19.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m51.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m542.1/542.1 kB[0m [31m27.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.0/172.0 kB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.3/8.3 MB[0m [31m95.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m294.6/294.6 kB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m52.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

import flwr
from flwr.client import Client, ClientApp, NumPyClient
from flwr.server import ServerApp, ServerConfig, ServerAppComponents
from flwr.server.strategy import FedAvg, FedAdagrad
from flwr.simulation import run_simulation
from flwr_datasets import FederatedDataset
from flwr.common import ndarrays_to_parameters, NDArrays, Scalar, Context

# Preciso alterar o runtime para usar GPU (SASSE)
# DEVICE = torch.device("cuda")  # Try "cuda" to train on GPU
DEVICE = torch.device("cpu")  # Run training on CPU
print(f"Training on {DEVICE}")
print(f"Flower {flwr.__version__} / PyTorch {torch.__version__}")

Training on cpu
Flower 1.10.0 / PyTorch 2.4.0+cu121


## Data loading

In [3]:
NUM_PARTITIONS = 10
# SASSE - confirmar o batch size
BATCH_SIZE = 32


def load_datasets(partition_id: int, num_partitions: int):

    pytorch_transforms = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5,), (0.5,)),
         transforms.Lambda(lambda x: torch.flatten(x))  # Flatten the image into a 1D tensor
         ]
    )

    def apply_transforms(batch):
        # Instead of passing transforms to MNIST(..., transform=transform)
        # we will use this function to dataset.with_transform(apply_transforms)
        # The transforms object is exactly the same
        batch["image"] = [pytorch_transforms(img) for img in batch["image"]]
        return batch

    fds = FederatedDataset(dataset="mnist", partitioners={"train": num_partitions})
    partition = fds.load_partition(partition_id).with_transform(apply_transforms)
    trainloader = DataLoader(partition, batch_size=BATCH_SIZE, shuffle=True)

    testset = fds.load_split("test").with_transform(apply_transforms)
    testloader = DataLoader(testset, batch_size=BATCH_SIZE)

    return trainloader, testloader

  and should_run_async(code)


## Model training/evaluation (PyTorch)

In [61]:
# class Net(nn.Module):

#     def __init__(self) -> None:
#         super(Net, self).__init__()
#         self.layer1 = nn.Linear(28 * 28, 200)  # 28 x 28 pixels
#         self.layer2 = nn.Linear(200, 200)  # 2 hidden layers with 200 neurons each
#         self.layer3 = nn.Linear(200, 10)  # 10 classes
#         self.relu = nn.ReLU()
#         self.softmax = nn.Softmax(dim=1)

#     def forward(self, x: torch.Tensor) -> torch.Tensor:
#         x = self.relu(self.layer1(x))
#         x = self.relu(self.layer2(x))
#         x = self.softmax(self.layer3(x))
#         return x

class Net(KAN):
    def __init__(self) -> None:
        super().__init__([28 * 28, 24, 24, 10])


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]


def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def train(net, trainloader, epochs: int):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    # Será que Adam e SGD influenciam a KAN? (SASSE)
    # Estão usando o default para learning rate (lr) e momentum
    # Um dos requisitos para garantir a convergencia é lr decrescente
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for batch in trainloader:
            images, labels = batch["image"], batch["label"]
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(net(images), labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for batch in testloader:
            images, labels = batch["image"], batch["label"]
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

KAN Model:

In [28]:
pip install git+https://github.com/Blealtan/efficient-kan.git

Collecting git+https://github.com/Blealtan/efficient-kan.git
  Cloning https://github.com/Blealtan/efficient-kan.git to /tmp/pip-req-build-moejhply
  Running command git clone --filter=blob:none --quiet https://github.com/Blealtan/efficient-kan.git /tmp/pip-req-build-moejhply
  Resolved https://github.com/Blealtan/efficient-kan.git to commit 7b6ce1c87f18c8bc90c208f6b494042344216b11
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting pytest>=8.2.0 (from efficient-kan==0.1.0)
  Downloading pytest-8.3.2-py3-none-any.whl.metadata (7.5 kB)
Downloading pytest-8.3.2-py3-none-any.whl (341 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m341.8/341.8 kB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: efficient-kan
  Building wheel for efficient-kan (pyproject.toml) ... [?25l[?25hdone
  Created wheel for effic

In [29]:
from efficient_kan import KAN

## Flower Architecture

### Flower client

In [62]:
class FlowerClient(NumPyClient):
    def __init__(self, pid, net, trainloader, valloader=None):
        self.pid = pid  # partition ID of a client
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        print(f"[Client {self.pid}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        # Read values from config
        server_round = config["server_round"]
        local_epochs = config["local_epochs"]

        # Use values provided by the config
        print(f"[Client {self.pid}, round {server_round}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=local_epochs)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.pid}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def client_fn(context: Context) -> Client:
    net = Net().to(DEVICE)
    partition_id = context.node_config["partition-id"]
    num_partitions = context.node_config["num-partitions"]
    trainloader, _ = load_datasets(partition_id, num_partitions)
    return FlowerClient(partition_id, net, trainloader).to_client()


# Create the ClientApp
client = ClientApp(client_fn=client_fn)

### Server-side parameter **initialization**

In [63]:
# Create an instance of the model and get the parameters
initial_model = Net()
params = get_parameters(initial_model)

#### Get number of parameters

In [64]:
total_params = sum(p.numel() for p in initial_model.parameters())
print(f"Total number of parameters: {total_params}")

Total number of parameters: 196320


### Server-side parameter **evaluation**

In [65]:
# The `evaluate` function will be called by Flower after every round
def evaluate(
    server_round: int,
    parameters: NDArrays,
    config: Dict[str, Scalar],
) -> Optional[Tuple[float, Dict[str, Scalar]]]:
    net = Net().to(DEVICE)
    _, testloader = load_datasets(0, NUM_PARTITIONS)
    set_parameters(net, parameters)  # Update model with the latest parameters
    loss, accuracy = test(net, testloader)
    print(f"Server-side evaluation loss {loss} / accuracy {accuracy}")
    return loss, {"accuracy": accuracy}

### Training **Configuration** (e.g. epochs)




In [66]:
def fit_config(server_round: int):
    """Return training configuration dict for each round.

    Perform two rounds of training with one local epoch, increase to two local
    epochs afterwards.
    """
    config = {
        "server_round": server_round,  # The current round of federated learning
        "local_epochs": 1 if server_round < 2 else 2,
    }
    return config

### Flower **Server**

In [67]:
def server_fn(context: Context) -> ServerAppComponents:
    # Create FedAvg strategy
    strategy = FedAvg(
        fraction_fit=0.3,
        fraction_evaluate=0,
        min_fit_clients=3,
        min_evaluate_clients=0,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(params),
        evaluate_fn=evaluate,
        on_fit_config_fn=fit_config,  # Pass the fit_config function
    )
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

## Simulation

### Run Simulation

In [68]:
NUM_PARTITIONS = 10

In [69]:
backend_config = {"client_resources": None}
# if DEVICE.type == "cuda":
#     backend_config = {"client_resources": {"num_gpus": 1}}

# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

[92mINFO [0m:      Starting Flower ServerApp, config: num_rounds=3, no round_timeout
[92mINFO [0m:      
[92mINFO [0m:      [INIT]
[92mINFO [0m:      Using initial global parameters provided by strategy
[92mINFO [0m:      Evaluating initial global parameters
  self.pid = _posixsubprocess.fork_exec(
  self.pid = _posixsubprocess.fork_exec(
[36m(pid=15691)[0m 2024-08-27 00:23:02.751760: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[36m(pid=15691)[0m 2024-08-27 00:23:02.800286: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(pid=15691)[0m 2024-08-27 00:23:02.813777: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for pl

Server-side evaluation loss 0.07202646002769471 / accuracy 0.1615


[36m(ClientAppActor pid=15691)[0m see the appropriate new directories, set the environment variable
[36m(ClientAppActor pid=15691)[0m `JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.
[36m(ClientAppActor pid=15691)[0m The use of platformdirs will be the default in `jupyter_core` v6
[36m(ClientAppActor pid=15691)[0m   from jupyter_core.paths import jupyter_data_dir, jupyter_runtime_dir, secure_write


[36m(ClientAppActor pid=15691)[0m [Client 0, round 1] fit, config: {'server_round': 1, 'local_epochs': 1}
[36m(ClientAppActor pid=15691)[0m Epoch 1: train loss 0.03513731434941292, accuracy 0.6765
[36m(ClientAppActor pid=15691)[0m [Client 3, round 1] fit, config: {'server_round': 1, 'local_epochs': 1}
[36m(ClientAppActor pid=15691)[0m Epoch 1: train loss 0.03392567113041878, accuracy 0.6898333333333333
[36m(ClientAppActor pid=15691)[0m [Client 6, round 1] fit, config: {'server_round': 1, 'local_epochs': 1}
[36m(ClientAppActor pid=15691)[0m Epoch 1: train loss 0.03519357740879059, accuracy 0.6795


[92mINFO [0m:      aggregate_fit: received 3 results and 0 failures
[92mINFO [0m:      fit progress: (1, 0.014736710050702095, {'accuracy': 0.8693}, 46.07981303700035)
[92mINFO [0m:      configure_evaluate: no clients selected, skipping evaluation
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 2]
[92mINFO [0m:      configure_fit: strategy sampled 3 clients (out of 10)


Server-side evaluation loss 0.014736710050702095 / accuracy 0.8693
[36m(ClientAppActor pid=15691)[0m [Client 0, round 2] fit, config: {'server_round': 2, 'local_epochs': 2}
[36m(ClientAppActor pid=15691)[0m Epoch 1: train loss 0.013785460032522678, accuracy 0.873
[36m(ClientAppActor pid=15691)[0m Epoch 2: train loss 0.010981257073581219, accuracy 0.897
[36m(ClientAppActor pid=15691)[0m [Client 5, round 2] fit, config: {'server_round': 2, 'local_epochs': 2}
[36m(ClientAppActor pid=15691)[0m Epoch 1: train loss 0.013669862411916256, accuracy 0.8751666666666666
[36m(ClientAppActor pid=15691)[0m Epoch 2: train loss 0.010655165649950504, accuracy 0.9015
[36m(ClientAppActor pid=15691)[0m [Client 9, round 2] fit, config: {'server_round': 2, 'local_epochs': 2}
[36m(ClientAppActor pid=15691)[0m Epoch 1: train loss 0.013523644767701626, accuracy 0.8791666666666667
[36m(ClientAppActor pid=15691)[0m Epoch 2: train loss 0.010517115704715252, accuracy 0.9055


[92mINFO [0m:      aggregate_fit: received 3 results and 0 failures
[92mINFO [0m:      fit progress: (2, 0.010014868995547295, {'accuracy': 0.9044}, 103.55692098600002)
[92mINFO [0m:      configure_evaluate: no clients selected, skipping evaluation
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 3]
[92mINFO [0m:      configure_fit: strategy sampled 3 clients (out of 10)


Server-side evaluation loss 0.010014868995547295 / accuracy 0.9044
[36m(ClientAppActor pid=15691)[0m [Client 0, round 3] fit, config: {'server_round': 3, 'local_epochs': 2}
[36m(ClientAppActor pid=15691)[0m Epoch 1: train loss 0.010251138359308243, accuracy 0.9038333333333334
[36m(ClientAppActor pid=15691)[0m Epoch 2: train loss 0.008859232999384403, accuracy 0.9173333333333333
[36m(ClientAppActor pid=15691)[0m [Client 2, round 3] fit, config: {'server_round': 3, 'local_epochs': 2}
[36m(ClientAppActor pid=15691)[0m Epoch 1: train loss 0.01109837181866169, accuracy 0.8965
[36m(ClientAppActor pid=15691)[0m Epoch 2: train loss 0.00918219331651926, accuracy 0.914
[36m(ClientAppActor pid=15691)[0m [Client 7, round 3] fit, config: {'server_round': 3, 'local_epochs': 2}
[36m(ClientAppActor pid=15691)[0m Epoch 1: train loss 0.010787480510771275, accuracy 0.8946666666666667
[36m(ClientAppActor pid=15691)[0m Epoch 2: train loss 0.009149247780442238, accuracy 0.9113333333333333


[92mINFO [0m:      aggregate_fit: received 3 results and 0 failures
[92mINFO [0m:      fit progress: (3, 0.008601170311495663, {'accuracy': 0.9194}, 160.7633316260003)
[92mINFO [0m:      configure_evaluate: no clients selected, skipping evaluation
[92mINFO [0m:      
[92mINFO [0m:      [SUMMARY]
[92mINFO [0m:      Run finished 3 round(s) in 160.77s
[92mINFO [0m:      	History (loss, centralized):
[92mINFO [0m:      		round 0: 0.07202646002769471
[92mINFO [0m:      		round 1: 0.014736710050702095
[92mINFO [0m:      		round 2: 0.010014868995547295
[92mINFO [0m:      		round 3: 0.008601170311495663
[92mINFO [0m:      	History (metrics, centralized):
[92mINFO [0m:      	{'accuracy': [(0, 0.1615), (1, 0.8693), (2, 0.9044), (3, 0.9194)]}
[92mINFO [0m:      


Server-side evaluation loss 0.008601170311495663 / accuracy 0.9194
