In [1]:
import collections

import numpy as np
import tensorflow as tf
from tensorflow import keras
import tensorflow_federated as tff
import pandas as pd
from sklearn.model_selection import train_test_split

TEST_SIZE = 0.2
NUM_CLIENTS = 10
ACTIVE_CLIENTS = 5
BATCH_SIZE = 512

2024-07-31 15:46:15.350634: I tensorflow/core/util/port.cc:111] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-07-31 15:46:15.352891: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-31 15:46:15.380317: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-31 15:46:15.380378: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-31 15:46:15.380396: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to regi

In [2]:
# Load the dataset and split it into training and test partitions
df = pd.read_csv('emnist-letters.csv')
train_df, test_df = train_test_split(
    df,
    test_size=TEST_SIZE,
    random_state=42,
)

# Funzione per il preprocessing dei dati del singolo client con i pixel disposti in una matrice
# 28x28 e normalizzati in un range [0,1], il dataset viene restituito diviso in batch
def preprocess(dataset):
  """Batches a client dataset and formats each batch as (images, labels).

  Args:
    dataset: a `tf.data.Dataset` whose elements are dicts with a 'pixels'
      entry (flat pixel vector) and a 'label' entry.

  Returns:
    A `tf.data.Dataset` of `(images, labels)` batches of at most BATCH_SIZE
    examples, where `images` is float32 with shape [batch, 28, 28, 1] and
    `labels` has shape [batch, 1].
  """
  def batch_format_fn(element):
      # Cast to float and rescale so the images really are in [0, 1] as the
      # pipeline is documented to produce; previously the raw values were
      # passed through untouched.
      # NOTE(review): assumes 8-bit intensities (0-255) in the CSV - confirm.
      pixels = tf.cast(element['pixels'], tf.float32) / 255.0
      images = tf.reshape(pixels, [-1, 28, 28, 1])
      labels = tf.reshape(element['label'], [-1, 1])
      return (images, labels)

  return dataset.batch(BATCH_SIZE).map(batch_format_fn)

# Funzione per la creazione di un dataset ClientData a partire dal dataset di training a cui viene
# aggiunta una colonna client_nums che assegna ad ogni riga un client randomico
def create_clients(dataset):
    """Randomly partitions a training DataFrame among NUM_CLIENTS simulated clients.

    Args:
        dataset: pandas DataFrame whose first column is the label and whose
            remaining columns are the pixel values. The frame is NOT modified.

    Returns:
        A `tff.simulation.datasets.ClientData` with one client per distinct
        client number that received at least one row; each client dataset
        yields dicts with 'label' and 'pixels' entries.
    """
    # Draw a client number for every row (fixed seed for reproducibility)
    client_nums = list(range(NUM_CLIENTS))
    generator = np.random.default_rng(42)
    clients = generator.choice(client_nums, len(dataset))

    # Work on label + pixel columns only. A 'client_nums' column left over from
    # an earlier run is discarded so it can never be mistaken for a pixel.
    features = dataset.drop(columns='client_nums', errors='ignore')

    # One dict per client. Grouping by the assignment array avoids writing a
    # helper column into the caller's DataFrame.
    client_train_dataset = collections.OrderedDict()
    for client_id, client_rows in features.groupby(clients):
        client_train_dataset[client_id] = collections.OrderedDict((
            ('label', client_rows.iloc[:, 0]),
            ('pixels', client_rows.iloc[:, 1:]),
        ))

    # Expose the per-client dicts as a ClientData
    def serializable_dataset_fn(client_id):
        client_data = client_train_dataset[client_id]
        return tf.data.Dataset.from_tensor_slices(client_data)

    tff_train_data = tff.simulation.datasets.ClientData.from_clients_and_tf_fn(
        client_ids=list(client_train_dataset.keys()),
        serializable_dataset_fn=serializable_dataset_fn
    )

    return tff_train_data
    
# Build the list of per-client training datasets for the active clients.
# The ClientData gets its own name so that `train_df` keeps referring to the
# training DataFrame and this cell can be re-run without failing.
train_client_data = create_clients(train_df)
client_ids = sorted(train_client_data.client_ids)[:ACTIVE_CLIENTS]
federated_train_data = [
    preprocess(train_client_data.create_tf_dataset_for_client(client_id))
    for client_id in client_ids
]

In [3]:
# Funzione per la creazione di un dataset ClientData a partire dal dataset di test
def create_clients(dataset):
    """Wraps a test DataFrame as a single-client `ClientData`.

    NOTE: this definition shadows the training-time `create_clients` defined
    earlier in the notebook; from this cell on the name refers to this version.

    Args:
        dataset: pandas DataFrame whose first column is the label and whose
            remaining columns are the pixel values.

    Returns:
        A `tff.simulation.datasets.ClientData` with the single client id 0
        whose dataset yields dicts with 'label' and 'pixels' entries.
    """
    # The test frame has no trailing helper column, so every column after the
    # label is a pixel; slicing with `1:-1` would silently drop the last pixel.
    # A stray 'client_nums' column is removed first, should one be present.
    features = dataset.drop(columns='client_nums', errors='ignore')
    data = collections.OrderedDict((
        ('label', features.iloc[:, 0]),
        ('pixels', features.iloc[:, 1:]),
    ))

    # The same data is returned regardless of the requested client id
    def serializable_dataset_fn(client_ids):
        return tf.data.Dataset.from_tensor_slices(data)

    tff_test_data = tff.simulation.datasets.ClientData.from_clients_and_tf_fn(
        client_ids=[0],
        serializable_dataset_fn=serializable_dataset_fn,
    )

    return tff_test_data

# Build the centralized test dataset: wrap the test frame as ClientData, merge
# all clients into one tf.data.Dataset and apply the shared preprocessing
central_test_df = preprocess(
    create_clients(test_df).create_tf_dataset_from_all_clients()
)

In [5]:
# Creazione del modello con le API di Keras
def create_keras_model():
  """Builds a fresh, uncompiled Keras Sequential CNN.

  The network takes inputs of shape (28, 28, 1) and ends in a 26-unit softmax
  layer. Every call creates new layers (and therefore new weights).
  """
  conv_stack = [
      keras.layers.Conv2D(filters=32, kernel_size=(5, 5), activation='tanh',
                          input_shape=(28, 28, 1),
                          kernel_initializer="glorot_normal"),
      keras.layers.Dropout(rate=0.3),
      keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2),
      keras.layers.Conv2D(filters=48, kernel_size=(5, 5), activation='tanh'),
      keras.layers.Dropout(rate=0.4),
      keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2),
      keras.layers.Conv2D(filters=64, kernel_size=(5, 5), padding='same',
                          activation='tanh'),
  ]
  classifier_head = [
      keras.layers.Flatten(),
      keras.layers.Dense(120, activation='tanh',
                         kernel_regularizer=keras.regularizers.l2(0.01)),
      keras.layers.Dense(84, activation='tanh',
                         kernel_regularizer=keras.regularizers.l2(0.01)),
      # NOTE(review): 26 output units with a sparse categorical loss imply
      # labels in [0, 25] - confirm the CSV labels are 0-based.
      keras.layers.Dense(26, activation='softmax'),
  ]
  return keras.models.Sequential(conv_stack + classifier_head)
keras_model = create_keras_model()

# Wrap the Keras model as a TFF functional model; the input spec is taken from
# the first client's preprocessed dataset.
# NOTE(review): the recorded run of this notebook fails inside
# `predict_on_batch` with an unfed 'conv2d_3_input' placeholder. The Dropout
# layers (which behave differently for training=True/False) are a possible
# cause with `functional_model_from_keras` - TODO confirm against the TFF docs.
tff_model = tff.learning.models.functional_model_from_keras(
    keras_model,
    loss_fn=tf.keras.losses.SparseCategoricalCrossentropy(),
    input_spec=federated_train_data[0].element_spec,
    metrics_constructor=collections.OrderedDict(
        accuracy=tf.keras.metrics.SparseCategoricalAccuracy
    ),
)

2024-07-31 15:46:50.588758: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:382] MLIR V1 optimization pass is not enabled


In [6]:
#Al client update si può aggiungere un parametro che indica il numero di epoche in cui ripetere l'addestramento prima di inviare i pesi al server
@tf.function
def client_update(model, dataset, initial_weights, client_optimizer):
  """Performs one pass of local training on a client's dataset.

  Training starts from the trainable weights in `initial_weights` and applies
  one optimizer step per batch of `dataset`.

  Args:
    model: object exposing `predict_on_batch(model_weights=..., x=...,
      training=...)` and `loss(output=..., label=...)`.
    dataset: iterable of `(x, y)` batches.
    initial_weights: weights structure with a `.trainable` attribute (the
      current server weights).
    client_optimizer: optimizer exposing `initialize(specs)` and
      `next(state, weights=..., gradients=...)`.

  Returns:
    A `tff.learning.models.ModelWeights` holding the locally trained trainable
    weights; the non-trainable part is always returned as an empty tuple.
  """
  # Initialize the client model with the current server weights and the optimizer
  # state.
  client_weights = initial_weights.trainable
  optimizer_state = client_optimizer.initialize(
      tf.nest.map_structure(tf.TensorSpec.from_tensor, client_weights)
  )

  # Use the client_optimizer to update the local model.
  for batch in dataset:
    x, y = batch
    with tf.GradientTape() as tape:
      # The weights are plain tensors here, so they must be watched explicitly.
      tape.watch(client_weights)
      # Compute a forward pass on the batch of data
      # (non-trainable weights are passed as an empty tuple).
      # NOTE(review): the recorded run of this notebook fails inside this call
      # with an unfed 'conv2d_3_input' placeholder - cause not visible here.
      outputs = model.predict_on_batch(
          model_weights=(client_weights, ()), x=x, training=True
      )
      loss = model.loss(output=outputs, label=y)

    # Compute the corresponding gradient
    grads = tape.gradient(loss, client_weights)

    # Apply the gradient using a client optimizer.
    optimizer_state, client_weights = client_optimizer.next(
        optimizer_state, weights=client_weights, gradients=grads
    )

  return tff.learning.models.ModelWeights(client_weights, non_trainable=())

@tf.function
def server_update(model, mean_client_weights):
  """Returns the averaged client weights unchanged as the new server weights.

  `model` is accepted for interface symmetry but is not used.
  """
  new_server_weights = mean_client_weights
  return new_server_weights

In [7]:
# La logica computazionale di tensorflow federated dev'essere separata rispetto alla logica
# computazionale di tensorflow, qui vengono definite le funzioni di inizializzazione del server
# e di aggiornamento dei client e del server

# Inizializzazione del server con i pesi iniziali del modello
@tff.tensorflow.computation
def server_init():
  """Returns the initial weights of `tff_model` packed as `ModelWeights`."""
  initial_weights = tff_model.initial_weights
  return tff.learning.models.ModelWeights(*initial_weights)

# Record the TFF types of the model weights (the result type of `server_init`)
# and of a client dataset (a sequence of elements matching the model input spec)
model_weights_type = server_init.type_signature.result
tf_dataset_type = tff.SequenceType(
    tff.types.tensorflow_to_type(tff_model.input_spec)
)

# Funzione di aggiornamento del client, viene passato il dataset del client e i pesi
# aggiornati dal server, restituisce i pesi aggiornati del client
@tff.tensorflow.computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  """Trains `tff_model` on one client's dataset starting from the server weights.

  A new Adam optimizer (learning rate 0.01) is built on every invocation and
  handed to `client_update`, whose result is returned.
  """
  adam = tff.learning.optimizers.build_adam(learning_rate=0.01)
  updated_weights = client_update(tff_model, tf_dataset, server_weights, adam)
  return updated_weights

# Funzione di aggiornamento del server, riceve i pesi mediati dai client e restituisce
# i pesi aggiornati del server
@tff.tensorflow.computation(model_weights_type)
def server_update_fn(mean_client_weights):
  """Applies `server_update` to the averaged client weights for `tff_model`."""
  new_server_weights = server_update(tff_model, mean_client_weights)
  return new_server_weights

In [8]:
# Federated versions of the types above: besides the data type they also carry
# the placement (client datasets live on CLIENTS, model weights on the SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)

# Definizione della computazione federata per l'inizializzazione del server
# la funzione ritorna i pesi iniziali del modello
@tff.federated_computation
def initialize_fn():
  """Evaluates `server_init` at the SERVER placement and returns its result."""
  initial_server_weights = tff.federated_eval(server_init, tff.SERVER)
  return initial_server_weights

# Definizione della computazione federata per un round di training. Si divide in 3 parti:
# 1. Broadcast dei pesi del server ai client
# 2. Chiamata della funzione di aggiornamento del client
# 3. Il server aggiorna i pesi facendo la media dei pesi dei client
@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  """Runs one federated training round and returns the new server weights.

  Steps: broadcast the server weights to the clients, run `client_update_fn`
  on every client, average the resulting client weights and pass the average
  through `server_update_fn`.
  """
  broadcast_weights = tff.federated_broadcast(server_weights)
  per_client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, broadcast_weights)
  )
  averaged_weights = tff.federated_mean(per_client_weights)
  return tff.federated_map(server_update_fn, averaged_weights)

In [9]:
def evaluate(model_weights):
  """Evaluates the given weights on the centralized, preprocessed test set.

  Args:
    model_weights: weights object exposing `assign_weights_to(keras_model)`.

  Returns:
    Whatever `keras_model.evaluate` returns for the compiled loss and
    sparse categorical accuracy metric.
  """
  keras_model = create_keras_model()
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
  )
  model_weights.assign_weights_to(keras_model)
  # Evaluate on the batched (images, labels) dataset produced by `preprocess`;
  # `test_df` is the raw DataFrame (label column included, flat pixel rows)
  # and is not a valid input for this model.
  return keras_model.evaluate(central_test_df)

In [10]:
# Build the iterative process from the custom initialize and next computations
federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn,
)

# Initialize the server state
server_state = federated_algorithm.initialize()

# Federated training: one `next` call per round, evaluating after every round
NUM_ROUNDS = 15
for round_index in range(NUM_ROUNDS):
  server_state = federated_algorithm.next(server_state, federated_train_data)
  evaluate(server_state)

2024-07-31 15:49:50.750099: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-07-31 15:49:50.750339: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-07-31 15:49:53.076886: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-07-31 15:49:53.077111: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-07-31 15:49:53.182531: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-07-31 15:49:53.182875: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-07-31 15:49:53.196195: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-07-31 15:49:53.196494: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session


_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "Failed to run computation: {{function_node __forward_predict_on_batch_1174}} {{function_node __forward_predict_on_batch_1174}} {{function_node __forward_predict_on_batch_1100}} {{function_node __forward_predict_on_batch_1100}} You must feed a value for placeholder tensor 'conv2d_3_input' with dtype float and shape [?,28,28,1]
	 [[{{node conv2d_3_input}}]]
	 [[StatefulPartitionedCall]]
	 [[StatefulPartitionedCall]]
	 [[StatefulPartitionedCall/ReduceDataset]] while evaluating local [_jvu22] in block locals [_jvu1,_jvu2,_jvu3,_jvu14,_jvu15,_jvu16,_jvu17,_jvu18,_jvu19,_jvu20,_jvu21,_jvu22,_jvu23,_jvu24]"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2024-07-31T15:49:55.456998826+00:00", grpc_status:13, grpc_message:"Failed to run computation: {{function_node __forward_predict_on_batch_1174}} {{function_node __forward_predict_on_batch_1174}} {{function_node __forward_predict_on_batch_1100}} {{function_node __forward_predict_on_batch_1100}} You must feed a value for placeholder tensor \'conv2d_3_input\' with dtype float and shape [?,28,28,1]\n\t [[{{node conv2d_3_input}}]]\n\t [[StatefulPartitionedCall]]\n\t [[StatefulPartitionedCall]]\n\t [[StatefulPartitionedCall/ReduceDataset]] while evaluating local [_jvu22] in block locals [_jvu1,_jvu2,_jvu3,_jvu14,_jvu15,_jvu16,_jvu17,_jvu18,_jvu19,_jvu20,_jvu21,_jvu22,_jvu23,_jvu24]"}"
>