In [1]:
!pip install -q flwr[simulation] flwr-datasets[vision] torch torchvision matplotlib

In [2]:
from collections import OrderedDict
from typing import List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from datasets.utils.logging import disable_progress_bar
from torch.utils.data import DataLoader

import flwr
from flwr.client import Client, ClientApp, NumPyClient
from flwr.common import Metrics, Context
from flwr.server import ServerApp, ServerConfig, ServerAppComponents
from flwr.server.strategy import FedAvg
from flwr.simulation import run_simulation
from flwr_datasets import FederatedDataset

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(f"Training on {DEVICE}")
print(f"Flower {flwr.__version__} / PyTorch {torch.__version__}")
disable_progress_bar()

  from .autonotebook import tqdm as notebook_tqdm
2024-08-05 20:00:18,445	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


Training on cpu
Flower 1.10.0 / PyTorch 2.3.1+cpu


In [3]:
#Cargamos los datos

In [6]:
NUM_CLIENTS = 10
BATCH_SIZE = 32


def load_datasets(partition_id: int):
    fds = FederatedDataset(dataset="cifar10", partitioners={"train": NUM_CLIENTS})
    partition = fds.load_partition(partition_id)
    # Divide data on each node: 80% train, 20% test
    partition_train_test = partition.train_test_split(test_size=0.2, seed=42)
    pytorch_transforms = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    def apply_transforms(batch):
        # Instead of passing transforms to CIFAR10(..., transform=transform)
        # we will use this function to dataset.with_transform(apply_transforms)
        # The transforms object is exactly the same
        batch["img"] = [pytorch_transforms(img) for img in batch["img"]]
        return batch

    # Create train/val for each partition and wrap it into DataLoader
    partition_train_test = partition_train_test.with_transform(apply_transforms)
    trainloader = DataLoader(
        partition_train_test["train"], batch_size=BATCH_SIZE, shuffle=True
    )
    valloader = DataLoader(partition_train_test["test"], batch_size=BATCH_SIZE)
    testset = fds.load_split("test").with_transform(apply_transforms)
    testloader = DataLoader(testset, batch_size=BATCH_SIZE)
    return trainloader, valloader, testloader

Estamos usando la librería datasets y las utilidades de pytorch para poder crear un dataset que podamos usar de manera disrtibuida. En este caso se usa el dataset CIFAR 10. Más información del dataset,  https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html 
Variables:
* NUM_CLIENTS es el número de nodos de la red federada. En este caso, 10. 
* BATCH_SIZE es el tamaño promedio de los "trozos de datos" que emplearemos para el entrenamiento. 
* Como resultado teneos una parte del dataset para entrenamiento del modelo, una para pruebas y otra para la validacion del modelo. 

Vamos ahora a graficar algunas imagenes del dataset. Para esto nos apoyamos de la librería matplotlib

In [None]:
trainloader, _, _ = load_datasets(partition_id=0)
batch = next(iter(trainloader))
images, labels = batch["img"], batch["label"]

# Reshape and convert images to a NumPy array
# matplotlib requires images with the shape (height, width, 3)
images = images.permute(0, 2, 3, 1).numpy()

# Denormalize
images = images / 2 + 0.5

# Create a figure and a grid of subplots
fig, axs = plt.subplots(4, 8, figsize=(12, 6))

# Loop over the images and plot them
for i, ax in enumerate(axs.flat):
    ax.imshow(images[i])
    ax.set_title(trainloader.dataset.features["label"].int2str([labels[i]])[0])
    ax.axis("off")

# Show the plot
fig.tight_layout()
plt.show()

Ahora, vamos a crear un modelo para el entrenamiento. Para esto, prinicplamente usaremos la librería torch. Esta librería permite el entrenamiento de algoritmos de IA, expresando el modelo en "tensores". Son valores que se van actualizando en el tiempo, y que pueden ser distribuidos en el hardware que los ejecuta, normalmente una GPU.  El modelo se define como una clase que instanciaremos varias veces (una por nodo aunque sea la misma). El modelo que veremos a continuación es muy sencillo y contiene dos capas convolucionales, dos capas totalmente conectadas (o fully connected fc) y capas de pooling que reducen normalmente el tamaño del mapa de características.  La función Relu es una función de activación. Lo que devuelve esta función es un mapa de características expresado en tensores.  Más información en los componentes y tipos de capas se puede encontrar aquí: https://pytorch.org/tutorials/recipes/recipes/defining_a_neural_network.html   

In [7]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

Ahora, vamos a crear las funciones típicas de entrenamiento de IA, que son la de entrenamiento sobre los datos de training, y la de evvaluación sobre los datos de prueba

In [8]:
def train(net, trainloader, epochs: int, verbose=False):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for batch in trainloader:
            images, labels = batch["img"].to(DEVICE), batch["label"].to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        if verbose:
            print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for batch in testloader:
            images, labels = batch["img"].to(DEVICE), batch["label"].to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

# 1. Entrenamiento centralizado. 

Ya tenemos el modelo, los datos y las funciones de entrenamitno.. Pues, vamos a entrenar. El código de abajo realiza este proceso para uno de los nodos "partition_id"  En este caso, el número 0  que simularía el nodo 1.

In [9]:
trainloader, valloader, testloader = load_datasets(partition_id=0)
net = Net().to(DEVICE)

for epoch in range(5):
    train(net, trainloader, 1)
    loss, accuracy = test(net, valloader)
    print(f"Epoch {epoch+1}: validation loss {loss}, accuracy {accuracy}")

loss, accuracy = test(net, testloader)
print(f"Final test set performance:\n\tloss {loss}\n\taccuracy {accuracy}")

Epoch 1: validation loss 0.058892894864082335, accuracy 0.344
Epoch 2: validation loss 0.05364039552211761, accuracy 0.403
Epoch 3: validation loss 0.05173851764202118, accuracy 0.419
Epoch 4: validation loss 0.04914828205108643, accuracy 0.463
Epoch 5: validation loss 0.049026955485343936, accuracy 0.453
Final test set performance:
	loss 0.049915652310848234
	accuracy 0.4194


Como podemos observar, si todo va bien, deberíammos haber tenido 5 épocas de entramiento. Una época en IA es una barrida de entrenamiento de un modelo sobre todos los datos de un dataset. Como vemos, la función **train** toma como parametros de entrada la red neuronal (modelo que describimos arriba)  y los datos para entrenamiento. La función **test** evalua que tan bien lo está haciendo mi modelo...  La función se imprime para cada época y para el final de las iteraciones.  

Si has llegado hasta aquí, en cierto modo has conseguido realizar una operción de entrenamiento de una red neuronal... Enhorabuena.... Aunque, podemos ir más allá. a por ello...

# 2. Entrenamiento Federado
Ahora, vamos a simular un entorno diferente. Imaginaros que realmente tenemos 10 nodos diferentes, que cada uno tiene parte de nuestro dataset, y que queremos se entrene de manera distribuida. ¿Como lo hacemos?  Con Federated Learning. Existen varias distribuciones de FL. La idea original viene de google y está descrito en este artículo (https://arxiv.org/pdf/1602.05629). Gracias a FL, google mejora la experiencia de sus herramientas de AI, sin "conocer" nuestros datos.   Esa implementación está realizada en Golang, el lenguaie de programación de google. No queremos entrar en otro lenguaje más, así que optaremos por la implementación usando Flower, que se ha impuesto como la librería más robusta para aplicaciones federadas. Más información de Flower:   https://flower.ai/ 

## Actualizando los parametros de los modelos. 
Si ahora tenemos 10 nodos, por lógica, tenemos 10 versiones del modelo de la red neuronal. Cada una depende de los datos en lso que se entrena. Imaginaros la situación: 10 hospitales tienen datos clinicos de pacientes. Pero estos datos son propios del hospital ( y de los pacientes) y los hospitales no se arriesgan a exponer sus datos y que haya un problema por exponer la ifnormación de sus pacientes. Para esto, el Federated Learning, permite que cada hospital actualice los datos del modelo, entrenandolo con sus datos, y actualizando los pesos. Lo único que se intercambia entre los nodos son los pesos del modelo. Así los datos no abandonan el lugar de origen (por ejemplo el hospital), los hospitales reducen el riesgo de exponer información sensible, peeeeero, la sociedad podría beneficiarse de estos modelos de IA que pueden salvar vidas.    Por ello, vamos a definir un par de funciones que actualizan los pesos de lso nodos:

In [10]:
def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

Estas 2 funciones obtienen los valores de los modelos de IA en valores numéricos estandar (numpy) o valores de tesnores cuando se actualizan los pesos.

## Definir un cliente para cada nodo
Flower trabaja tomando un conjunto de nodos como cliente y otro como servidor (imaginad el servidor es el nodo que agrupa todos los valores y los agrega para tener un entrenamiento distribuido con un modelo resultante que se pueda ejecutar en cualquier sitio). En nuestro caso, todo se ejecuta en una misma máquina, con lo cual efectuaremos una simulación. Tened en cuenta que los algoritmos de IA consumen muchos recursos, y tener varias instancias ejecutandose al mismo tiempo puede rapidamente colapsar nuestors equipos. Para facilitar esta tarea, la herramienta de simulación de Flower permite que se haga una planificación temporal de los nodos, y que no se ejecuten todos al tiempo, conservando los recursos de nuestra máquina. A continacón vamos a definir como se vería la apliccación de un cliente. 

In [11]:
class FlowerClient(NumPyClient):
    def __init__(self, net, trainloader, valloader):
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        return get_parameters(self.net)

    def fit(self, parameters, config):
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}

Como se observa, el cliente, tiene cuatro funciones principales:
* La de inicialización (**__init__**), que permite o "simula" la recogida de los datos locales (imaginaros los datos de cada hospital)
* **get_parameters**: función definida anteriormente, que permite recoger (del servidor) la versión más actual de los pesos de mi modelo de IA. 
* **fit**: Esta función coge los valores más actualizados de los pesos, los asigna al modelo (local) y se pone a entyrenar con los datos de ese nodo. Deolverá por un lado los valores de los pesos del modelo más actualizados, y por otro lado la longitud del datasset con el cual ha entrenado Esto es útil para que la agregación no quede desbalanceada. No es lo mismo entrenar un modelo con 100 muestras que con  1000000000. 
*  **evaluate**: esta función simpelemnten evalua las métricas para el entrenamiento que acaba de realizar.  

Ahora lo que haremos es crear una función cliente que llame tantas veces a la clase de FlowerCLient como sea necesario (por ejemplo, 10 veces para cada nodo en una época):


In [12]:
def client_fn(context: Context) -> Client:
    """Create a Flower client representing a single organization."""

    # Load model
    net = Net().to(DEVICE)

    # Load data (CIFAR-10)
    # Note: each client gets a different trainloader/valloader, so each client
    # will train and evaluate on their own unique data partition
    # Read the node_config to fetch data partition associated to this node
    partition_id = context.node_config["partition-id"]
    trainloader, valloader, _ = load_datasets(partition_id=partition_id)

    # Create a single Flower client representing a single organization
    # FlowerClient is a subclass of NumPyClient, so we need to call .to_client()
    # to convert it to a subclass of `flwr.client.Client`
    return FlowerClient(net, trainloader, valloader).to_client()


# Create the ClientApp
client = ClientApp(client_fn=client_fn)

## Definir el Servidor
Del mismo modo que hemos definido un nodo cliente, necesitamos un nodo servidor, que sea el que envíe los pesos del modelo, los reciba actualizados y haga la función  de merging o agregación de los mismos.  * ¿Como se realiza?: Aquí debemos tener un concepto de Flower llamado la estrategia, que define los parametros del entrenamiento distribuido: 

In [13]:
# Create FedAvg strategy
strategy = FedAvg(
    fraction_fit=1.0,  # Sample 100% of available clients for training
    fraction_evaluate=0.5,  # Sample 50% of available clients for evaluation
    min_fit_clients=10,  # Never sample less than 10 clients for training
    min_evaluate_clients=5,  # Never sample less than 5 clients for evaluation
    min_available_clients=10,  # Wait until all 10 clients are available
)

Como se observa, la estrategia define varios parametros de simulación distribuida:
* **fraction_fit**: Muestrear a todos los nodos de la federación
* **fraction_evaluate** : Realizar la evaluación del modelo (métricas) con el 50 por ciento de los nodos.
*  **min_fit_clients** : El número mínimo de clientes necesarios para hacer el entrenamiento. 
...


Del mismo modo que creamos una aplicación para los clientes, es necesario crear una aplicación para el servidor. Esto permite reutilizar código y eventualmente, usar múltiples estrategias en el mismo servidor. 

In [14]:
def server_fn(context: Context) -> ServerAppComponents:
    """Construct components that set the ServerApp behaviour.

    You can use the settings in `context.run_config` to parameterize the
    construction of all elements (e.g the strategy or the number of rounds)
    wrapped in the returned ServerAppComponents object.
    """

    # Configure the server for 5 rounds of training
    config = ServerConfig(num_rounds=5)

    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

## Entrenamiento distribuido. 
En la simulación, ( y en entornos de producción) queremos controlar la forma en que  se ejecuta el entrenamiento en un nodo. Esto se puede definir por parametros  de la siguiente manera.

In [15]:
# Specify the resources each of your clients need
# By default, each client will be allocated 1x CPU and 0x GPUs
backend_config = {"client_resources": {"num_cpus": 1, "num_gpus": 0.0}}

# When running on GPU, assign an entire GPU for each client
if DEVICE.type == "cuda":
    backend_config = {"client_resources": {"num_cpus": 1, "num_gpus": 1.0}}
    # Refer to our Flower framework documentation for more details about Flower simulations
    # and how to set up the `backend_config`

Donde definimos los recursos y la disposición de la ejecución. En nuestro caso, siempre será en cpu. Finalmente realizamos la ejecución

In [16]:
# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_CLIENTS,
    backend_config=backend_config,
)

[92mINFO [0m:      Starting Flower ServerApp, config: num_rounds=5, no round_timeout
[92mINFO [0m:      
[92mINFO [0m:      [INIT]
[92mINFO [0m:      Requesting initial parameters from one random client
