In [1]:
import collections

import numpy as np
import os
import tensorflow as tf
from tensorflow import keras
import tensorflow_federated as tff
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import MinMaxScaler, StandardScaler, OrdinalEncoder

from keras.models import Sequential 
from keras.layers import Dense
from keras.initializers import GlorotUniform
from keras.initializers import HeUniform
from keras.layers import Dense, Dropout, BatchNormalization, Input

from scipy.stats import chi2_contingency

# Fraction of the dataset held out for testing.
TEST_SIZE = 0.2
# Number of simulated clients the training rows are spread across.
NUM_CLIENTS = 5
# Number of clients (taken from the sorted client ids) used for training.
ACTIVE_CLIENTS = 5
# Number of rows per batch in each client's dataset.
BATCH_SIZE = 512
# Dropout rate for the model's Dropout layers, which are currently disabled in create_keras_model.
DROPOUT = 0.1
# Show where the tensorflow_federated package is installed.
path = os.path.dirname(tff.__file__)
print(path)

2024-09-19 15:33:54.695534: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-19 15:33:54.695583: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-19 15:33:54.695623: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-19 15:33:54.706606: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


/home/ella/Documents/FL/venv-federated/lib/python3.9/site-packages/tensorflow_federated


In [2]:
# Ask TensorFlow which physical GPUs it can see
gpus = tf.config.list_physical_devices('GPU')
print("GPUs disponibili: ", gpus)

# Report whether at least one GPU was detected
print("TensorFlow sta usando la GPU" if gpus else "TensorFlow non sta usando la GPU")

GPUs disponibili:  [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
TensorFlow sta usando la GPU


2024-09-19 15:33:58.676338: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:33:58.682730: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:33:58.683078: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysf

In [3]:
# Load the dataset, replacing every NaN with 0
df = pd.read_csv('datasets/data.csv').fillna(0)

# Hold out TEST_SIZE of the rows for testing (fixed seed, so the split is reproducible)
train_df, test_df = train_test_split(df, random_state=42, test_size=TEST_SIZE)

# Batches a single client's dataset before it is handed to the federated training loop.
def preprocess(dataset, batch_size=None):
  """Split a client's dataset into batches.

  Args:
    dataset: Object exposing a `batch(size)` method (here a `tf.data.Dataset`
      of rows keyed by 'x' and 'y').
    batch_size: Number of rows per batch. When None (the default) the
      module-level BATCH_SIZE is used, which is the original behaviour.

  Returns:
    Whatever `dataset.batch(...)` returns, i.e. the batched dataset.
  """
  if batch_size is None:
    batch_size = BATCH_SIZE
  return dataset.batch(batch_size)


In [9]:
# Builds a TFF ClientData from the training DataFrame by randomly assigning
# every row to one of the simulated clients.
def create_clients(dataset):
    """Partition `dataset` across NUM_CLIENTS simulated clients.

    Every row gets a client id drawn at random from range(NUM_CLIENTS) using a
    generator seeded with 42, so the partition is the same on every run. The
    last column of `dataset` is used as the label ('y') and all preceding
    columns as the features ('x').

    The caller's DataFrame is left untouched: the helper column holding the
    client assignment is added to a new frame, not to `dataset` itself.

    Args:
        dataset: pandas DataFrame whose last column is the label.

    Returns:
        The object built by
        `tff.simulation.datasets.ClientData.from_clients_and_tf_fn`, with one
        client for each id that received at least one row.
    """
    # Draw a client id for every row.
    generator = np.random.default_rng(42)
    clients = generator.choice(list(range(NUM_CLIENTS)), len(dataset))
    # assign() returns a new frame, so the input DataFrame is not mutated.
    assigned = dataset.assign(client_nums=clients)

    # One dictionary per client holding its labels and features.
    client_train_dataset = collections.OrderedDict()
    for key, current_client in assigned.groupby('client_nums'):
        # 'client_nums' is the last column, so the label is the second-to-last
        # one and the features are everything before it.
        client_train_dataset[key] = collections.OrderedDict((
            ('y', current_client.iloc[:, -2]),
            ('x', current_client.iloc[:, :-2]),
        ))

    # Turns the dictionary of a given client into a tf.data.Dataset of rows.
    def serializable_dataset_fn(client_id):
        client_data = client_train_dataset[client_id]
        return tf.data.Dataset.from_tensor_slices(client_data)

    tff_train_data = tff.simulation.datasets.ClientData.from_clients_and_tf_fn(
        client_ids=list(client_train_dataset.keys()),
        serializable_dataset_fn=serializable_dataset_fn
    )

    return tff_train_data

In [10]:
from tensorflow.python.client import device_lib

def get_available_gpus():
    """Return the names of all local devices whose device type is 'GPU'."""
    gpu_names = []
    for device in device_lib.list_local_devices():
        if device.device_type == 'GPU':
            gpu_names.append(device.name)
    return gpu_names

def print_gpu_memory():
    """Print the current and peak memory usage of every local GPU.

    `tf.config.experimental.get_memory_info` reports the memory currently in
    use ('current') and the highest usage seen so far ('peak'), both in bytes.
    It does not report free or total device memory, so the labels describe
    usage. Values are printed in MB.
    """
    for gpu in get_available_gpus():
        # Device names end with the GPU index, e.g. '...:GPU:0' -> '0'.
        gpu_name = gpu.split(':')[-1]
        gpu_mem = tf.config.experimental.get_memory_info(gpu)
        print(f"GPU {gpu_name}:")
        print(f"  Memoria in uso: {gpu_mem['current'] / (1024 ** 2):.2f} MB")
        print(f"  Picco di memoria: {gpu_mem['peak'] / (1024 ** 2):.2f} MB")

print_gpu_memory()

GPU 0:
  Memoria libera: 2.34 MB
  Memoria totale: 2.34 MB


2024-09-19 15:35:05.214139: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:35:05.214557: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:35:05.214861: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysf

In [11]:
# Build the ClientData, then one batched tf.data.Dataset per active client
client_data_df = create_clients(train_df)
client_ids = sorted(client_data_df.client_ids)[:ACTIVE_CLIENTS]
federated_train_data = list(
    map(lambda cid: preprocess(client_data_df.create_tf_dataset_for_client(cid)), client_ids)
)

La struttura del train è la seguente:
    * federated_train_data ha una entry per client
    * Ogni client ha un certo numero di batch
    * Ogni batch è un dizionario con due elementi: 'x' contiene le feature, 'y' le label

In [12]:
# Take the first batch of the third client and convert its tensors to NumPy arrays
sample_batch = tf.nest.map_structure(
    lambda tensor: tensor.numpy(),
    next(iter(federated_train_data[2])),
)
sample_batch

OrderedDict([('y',
              array([0., 0., 1., 1., 0., 1., 0., 0., 0., 0., 0., 0., 0., 1., 1., 0., 0.,
                     0., 1., 0., 0., 0., 1., 0., 0., 0., 1., 0., 0., 1., 0., 1., 0., 1.,
                     1., 1., 1., 1., 0., 0., 0., 1., 1., 0., 1., 0., 1., 1., 0., 1., 0.,
                     1., 0., 1., 1., 1., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 1., 1.,
                     0., 0., 1., 0., 0., 1., 1., 0., 1., 0., 1., 0., 0., 1., 1., 1., 0.,
                     1., 1., 0., 0., 1., 1., 1., 0., 0., 1., 0., 0., 1., 0., 0., 0., 0.,
                     0., 1., 1., 0., 0., 1., 1., 0., 0., 0., 1., 0., 0., 1., 0., 1., 1.,
                     0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 0., 0., 0., 0., 1.,
                     1., 0., 1., 1., 1., 0., 1., 1., 1., 0., 0., 1., 0., 1., 1., 0., 0.,
                     1., 0., 0., 1., 1., 0., 0., 1., 1., 0., 1., 0., 1., 0., 1., 0., 1.,
                     0., 1., 1., 0., 1., 0., 1., 0., 0., 1., 1., 0., 1., 0., 0., 0., 0.,
  

In [13]:
# Consistency checks: training DataFrame vs. ClientData

print('Numero di clients: '+str(len(client_data_df.client_ids)))
print(federated_train_data)
total = 0
# federated_train_data was built from client_ids (sorted, truncated to
# ACTIVE_CLIENTS), so pair each id with its dataset instead of using the id as
# a list index: that only works when ids are exactly 0..N-1 and every client
# is active.
for client_id, client_dataset in zip(client_ids, federated_train_data):
    num_elem = 0
    for batch in client_dataset:
        # The first dimension of the feature tensor is the number of rows in the batch.
        num_elem += int(batch['x'].shape[0])
    total += num_elem
    print('Numero di batch per client {}: {}\nNumero elementi per client: {}'.format(client_id, str(len(client_dataset)), str(num_elem)))
print('TOT TRAIN CD: {} \nTOT TRAIN DF: {}'.format(total, train_df.shape))

Numero di clients: 5
[<_BatchDataset element_spec=OrderedDict([('y', TensorSpec(shape=(None,), dtype=tf.float64, name=None)), ('x', TensorSpec(shape=(None, 42), dtype=tf.float64, name=None))])>, <_BatchDataset element_spec=OrderedDict([('y', TensorSpec(shape=(None,), dtype=tf.float64, name=None)), ('x', TensorSpec(shape=(None, 42), dtype=tf.float64, name=None))])>, <_BatchDataset element_spec=OrderedDict([('y', TensorSpec(shape=(None,), dtype=tf.float64, name=None)), ('x', TensorSpec(shape=(None, 42), dtype=tf.float64, name=None))])>, <_BatchDataset element_spec=OrderedDict([('y', TensorSpec(shape=(None,), dtype=tf.float64, name=None)), ('x', TensorSpec(shape=(None, 42), dtype=tf.float64, name=None))])>, <_BatchDataset element_spec=OrderedDict([('y', TensorSpec(shape=(None,), dtype=tf.float64, name=None)), ('x', TensorSpec(shape=(None, 42), dtype=tf.float64, name=None))])>]
Numero di batch per client 0: 3
Numero elementi per client: 1451
Numero di batch per client 1: 3
Numero elementi 

In [19]:
# Creazione del modello con le API di Keras
import tensorflow_addons as tfa
def create_keras_model(input_dim=42):
  """Build the fully connected binary classifier used for federated training.

  Architecture: Dense layers of 64, 1024, 256 and 128 units (ReLU, He-uniform
  initialisation) followed by a single sigmoid unit (Glorot-uniform
  initialisation). The 1024/256/128 layers carry an L2 kernel penalty of 30e-6.
  The model summary is printed as a side effect.

  Args:
    input_dim: Number of input features. Defaults to 42, the value previously
      hard-coded here.

  Returns:
    The uncompiled Keras Sequential model.
  """
  model = Sequential()

  model.add(Dense(64, kernel_initializer=HeUniform(), activation='relu', input_dim=input_dim))
  # Dropout(DROPOUT) layers between the hidden layers are currently disabled.
  for units in (1024, 256, 128):
    model.add(Dense(units,
                    kernel_initializer=HeUniform(),
                    activation='relu',
                    kernel_regularizer=tf.keras.regularizers.l2(30e-6)))
  model.add(Dense(1, kernel_initializer=GlorotUniform(), activation='sigmoid'))
  # summary() prints the table itself and returns None; wrapping it in print()
  # only added a stray "None" line to the output.
  model.summary()
  return model
keras_model = create_keras_model()
# Wrap the Keras model as a TFF functional model.
# The network ends in a single sigmoid unit, so accuracy must be measured with
# BinaryAccuracy (which thresholds the predicted probability).
# SparseCategoricalAccuracy would take the argmax over that single output,
# which is always 0, and so would only measure the share of label-0 rows.
tff_model = tff.learning.models.functional_model_from_keras(
    keras_model,
    loss_fn=tf.keras.losses.BinaryCrossentropy(),
    input_spec=federated_train_data[0].element_spec,
    metrics_constructor=collections.OrderedDict(accuracy=tf.keras.metrics.BinaryAccuracy))

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_5 (Dense)             (None, 64)                2752      
                                                                 
 dense_6 (Dense)             (None, 1024)              66560     
                                                                 
 dense_7 (Dense)             (None, 256)               262400    
                                                                 
 dense_8 (Dense)             (None, 128)               32896     
                                                                 
 dense_9 (Dense)             (None, 1)                 129       
                                                                 
Total params: 364737 (1.39 MB)
Trainable params: 364737 (1.39 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
None


2024-09-19 15:36:18.391291: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:36:18.391571: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:36:18.391765: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysf

In [20]:
# A parameter could be added to client_update giving the number of epochs to train locally before the weights are sent to the server
@tf.function
def client_update(model, dataset, initial_weights, client_optimizer):
  """Run one pass of local training over a client's dataset.

  Args:
    model: Functional model exposing `predict_on_batch(model_weights, x, training)`
      and `loss(output, label)`.
    dataset: Iterable of batches; each batch is indexed with 'x' (inputs) and
      'y' (labels).
    initial_weights: Weights received from the server; only `.trainable` is read.
    client_optimizer: Optimizer exposing `initialize(specs)` and
      `next(state, weights=..., gradients=...)`.

  Returns:
    A `tff.learning.models.ModelWeights` holding the trainable weights after
    the pass, with an empty tuple as non-trainable weights.
  """

  # Start from the server's trainable weights; the optimizer state is built from their tensor specs.
  client_weights = initial_weights.trainable
  optimizer_state = client_optimizer.initialize(tf.nest.map_structure(tf.TensorSpec.from_tensor, client_weights))
  # Disabled outer loop that would repeat the pass below 10 times: for _ in range(10):
  for batch in dataset:
    x = batch['x']
    y = batch['y']
    with tf.GradientTape() as tape:
      # The weights are plain tensors here, so the tape is told explicitly to track them.
      tape.watch(client_weights)
      # Compute a forward pass on the batch of data (non-trainable weights are passed as an empty tuple)
      outputs = model.predict_on_batch(model_weights=(client_weights, ()), x=x, training=True)
      loss = model.loss(output=outputs, label=y)
    # Compute the corresponding gradient
    grads = tape.gradient(loss, client_weights)

    # Apply the gradient using a client optimizer.
    optimizer_state, client_weights = client_optimizer.next(optimizer_state, weights=client_weights, gradients=grads)
  return tff.learning.models.ModelWeights(client_weights, non_trainable=())

@tf.function
def server_update(model, mean_client_weights):
  """Return the averaged client weights unchanged as the new server weights.

  Args:
    model: Accepted for signature compatibility; not used.
    mean_client_weights: Mean of the weights produced by the clients.

  Returns:
    `mean_client_weights`, as received.
  """
  # The model plays no role in plain weight averaging.
  _ = model
  return mean_client_weights

In [21]:
# La logica computazionale di tensorflow federated dev'essere separata rispetto alla logica
# computazionale di tensorflow, qui vengono definite le funzioni di inizializzazione del server
# e di aggiornamento dei client e del server

# Server initialisation: start from the model's initial weights
@tff.tensorflow.computation
def server_init():
  """Return the initial weights of `tff_model` wrapped in a ModelWeights."""
  trainable, non_trainable = tff_model.initial_weights
  return tff.learning.models.ModelWeights(trainable, non_trainable)

# Capture the TFF types of the model weights and of a client dataset:
# the weights type is the result type of server_init, and the dataset type is a
# sequence whose elements follow the model's input spec.
model_weights_type = server_init.type_signature.result
tf_dataset_type = tff.SequenceType(tff.types.tensorflow_to_type(tff_model.input_spec))

# Client update: takes the client's dataset and the weights sent by the server,
# and returns the weights after local training
@tff.tensorflow.computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  """Train `tff_model` locally with an Adam optimizer (learning rate 0.0001)."""
  return client_update(
      tff_model,
      tf_dataset,
      server_weights,
      tff.learning.optimizers.build_adam(learning_rate=0.0001),
  )

# Server update: receives the weights averaged over the clients and returns
# the new server weights
@tff.tensorflow.computation(model_weights_type)
def server_update_fn(mean_client_weights):
  """Delegate to `server_update`, passing `tff_model` as the model."""
  return server_update(model=tff_model, mean_client_weights=mean_client_weights)

In [22]:
# Turn the weights and dataset types into federated types, which carry a
# placement in addition to the data type: weights live on the server, datasets
# on the clients.
federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)

# Federated computation that initialises the server;
# it returns the model's initial weights placed on the server
@tff.federated_computation
def initialize_fn():
  """Evaluate `server_init` at the server placement and return the result."""
  initial_server_weights = tff.federated_eval(server_init, tff.SERVER)
  return initial_server_weights

# Federated computation for one training round, in three steps:
# 1. Broadcast the server weights to the clients
# 2. Run the client update on every client
# 3. Average the client weights and let the server update from the mean
@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  """Run one federated round and return the new server weights."""
  broadcast_weights = tff.federated_broadcast(server_weights)
  updated_client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, broadcast_weights))
  averaged_weights = tff.federated_mean(updated_client_weights)
  return tff.federated_map(server_update_fn, averaged_weights)

In [23]:
def evaluate(model_weights):
  """Evaluate the given federated weights on the held-out test set.

  A fresh Keras model is built, loaded with `model_weights` and evaluated on
  `test_df`, whose last column is the label and whose other columns are the
  features.

  Args:
    model_weights: Object exposing `assign_weights_to(keras_model)`, such as
      the server state produced by the federated process.

  Returns:
    The value returned by `keras_model.evaluate` (loss and accuracy), so the
    caller can use the numbers instead of only reading the progress output.
  """
  keras_model = create_keras_model()
  # Use the same loss as training (BinaryCrossentropy) so the reported loss is
  # comparable with the training objective.
  keras_model.compile(loss='binary_crossentropy', metrics=['accuracy'])
  model_weights.assign_weights_to(keras_model)
  test_x = np.array(test_df.iloc[:, :-1])
  test_y = np.array(test_df.iloc[:, -1])
  return keras_model.evaluate(x=test_x, y=test_y)

In [24]:
# Build the iterative process from the custom initialize and next computations
federated_algorithm = tff.templates.IterativeProcess(initialize_fn=initialize_fn, next_fn=next_fn)

# Initialise the server state
server_state = federated_algorithm.initialize()

2024-09-19 15:36:31.822684: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:36:31.822932: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-19 15:36:31.823200: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-09-19 15:36:31.826441: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:36:31.826790: I tensorflow/compiler/xla/stream_executor/

In [25]:
# Run the federated training rounds, then evaluate the final server weights.
NUM_ROUNDS = 5
# Let the loop variable carry the 1-based round number instead of keeping a
# separate counter in sync by hand.
for round_num in range(1, NUM_ROUNDS + 1):
    print('pre server state '+str(round_num))
    server_state = federated_algorithm.next(server_state, federated_train_data)
    print('post server state '+str(round_num))
evaluate(server_state)

pre server state 1


2024-09-19 15:36:36.402468: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:36:36.402627: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-19 15:36:36.402730: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-09-19 15:36:36.403029: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-19 15:36:36.403197: I tensorflow/compiler/xla/stream_executor/

post server state 1
pre server state 2
post server state 2
pre server state 3
post server state 3
pre server state 4
post server state 4
pre server state 5
post server state 5
Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_10 (Dense)            (None, 64)                2752      
                                                                 
 dense_11 (Dense)            (None, 1024)              66560     
                                                                 
 dense_12 (Dense)            (None, 256)               262400    
                                                                 
 dense_13 (Dense)            (None, 128)               32896     
                                                                 
 dense_14 (Dense)            (None, 1)                 129       
                                                                 
Total para

2024-09-19 15:36:40.371499: I tensorflow/tsl/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory


