# **VAE FED** Protein homo

## Required libraries and configuration

In [1]:
!pip install -q flwr[simulation]

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m157.2/157.2 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.9/56.9 MB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m201.4/201.4 kB[0m [31m17.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m60.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m97.9/97.9 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m128.2/128.2 kB[0m [31m15.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.7/8.7 MB[0m [31m89.4 MB/s[0m et


Import required libraries

In [2]:
import os

import re
import time
import random
import sys

import time

from typing import Dict, Optional, Tuple, List
from collections import OrderedDict

import flwr as fl
from flwr.common import Metrics

import numpy as np
import pandas as pd

import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_hub as hub
from tensorflow.keras import models, layers, losses, metrics, optimizers

from imblearn.datasets import fetch_datasets
from imblearn.over_sampling import SMOTE, RandomOverSampler

In [3]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn import metrics as skmetrics
from sklearn.metrics import confusion_matrix

In [4]:
from VAEOversampler import VAEOversampler

Define some parameters for the simulation, such as the number of clients in the federated scenario, the number of federated rounds, the number of epochs of each client before communicating, and the batch size for training phase

In [5]:
# Some parameters
NUM_CLIENTS = 10 # Number of clients in the federated scenario
NUM_ROUNDS = 10 #5 Number of learning rounds in the federated computation
NUM_EPOCHS = 5 #3 Number of epochs that the local dataset is seen each round
BATCH_SIZE = 20 # Batch size for training phase

# Define the seed for random numbers
seed = 10
np.random.seed(seed)
tf.random.set_seed(seed)
tf.keras.utils.set_random_seed(seed)
random.seed(seed)

## Loading and preparing the input data

The Sentiment140 dataset is not available in TFF, so it needs to be loaded from other source such as the tfds (tensorflow datasets) library. Then, it is adapted to the TFF format, so it can be used to train a model using TFF.

Note: We download the full Sentiment140 dataset, but in order to be able to execute the experiments in reasonable time, we are going to use a portion of it. In this notebook we are selecting just 1% of the data for training and 10% for testing purposes.

In [6]:
dset_name = 'protein_homo'
dset = fetch_datasets()[dset_name]

In [7]:
X = pd.DataFrame(StandardScaler().fit_transform(dset.data))
y = dset.target

y[y == -1] = 0
X['label'] = y

dtrain, dtest = train_test_split(X, test_size=0.2, random_state=42, stratify=y)

In [8]:
dtest = dtest.iloc[:-1,:]

In [9]:
dtrain.shape

(116600, 75)

In [10]:
dtrain['label'].value_counts()

0    115563
1      1037
Name: label, dtype: int64

Select the text and polarity columns from the original data, and transform it to a dataframe so it can be later used in TFF.

At this point, we do not select the 'user' column because we will create random IID partitions, so the 'user' column is not neccesary. If the user want to try it with non-IID partition, he/she may also keep the 'user' column and use it as client identifier instead of the following random user ID.

In [11]:
# Create a random list of ids. Each instance is given a random id, indicating the client where will be distributed
ids_train = [i for i in range(NUM_CLIENTS) for _ in range(len(dtrain)//NUM_CLIENTS)]
random.Random(seed).shuffle(ids_train)

# Add the id assignment to the dataframe
dtrain['user'] = ids_train
unique_ids_train = list(set(ids_train))

In [12]:
# Do the same with the test data
ids_test = [i for i in range(NUM_CLIENTS) for _ in range(len(dtest)//NUM_CLIENTS)]
random.Random(seed+1).shuffle(ids_test)
dtest['user'] = ids_test
unique_ids_test = list(set(ids_test))

For the sake of simplicity, in this notebook we will be dealing with a binary problem. For that purpose, we remove the neutral tweets, so the classifier's aim is to differentiate between positive and negative tweets.

Create the DataLoaders for each client. It creates an array of datasets, one for each client.

In [13]:
# Create DataLoaders for each client
train_data = []
test_data = []
for i in range(NUM_CLIENTS):
    train_data.append(dtrain.loc[dtrain['user']==unique_ids_train[i]])
    test_data.append(dtest.loc[dtest['user']==unique_ids_test[i]])

## Create a Deep Learning model

In [14]:
def create_keras_model_vae():
    m = VAEOversampler(epochs=50, intermediate_dim=512, batch_size=BATCH_SIZE, # weights='vae_fl.h5',
                       original_dim=dtrain.shape[1]-2, random_state=42, verbose=False)

    # Build the model
    m.build()

    return m

In [15]:
# Class-independent methods to get and set the parameters of a model
def get_parameters(model):
    return model.get_weights()

def set_parameters(model, parameters):
    model.set_weights(parameters)

In [16]:
class FlowerClientVAE(fl.client.NumPyClient):
    def __init__(self, model, client_train_data, client_test_data):
        # Init the client setting the x (text) and the y (polarity) for both train and testing
        self.model = model
        #x_train, y_train = client_train_data.iloc[:, :-1], client_train_data.iloc[:, -1]
        #x_test, y_test = client_test_data.iloc[:, :-1], client_test_data.iloc[:, -1]
        client_train_data = client_train_data.drop('user', axis=1)
        client_test_data = client_test_data.drop('user', axis=1)
        # print(f"Client_train_data: {client_train_data.info()}")
        x_train, y_train = client_train_data.drop('label',axis=1), client_train_data['label'].to_numpy()
        x_test, y_test = client_test_data.drop('label',axis=1), client_test_data['label'].to_numpy()
        # print(f"X_train: {x_train}")
        self.x_train, self.y_train = x_train, y_train
        self.x_test, self.y_test = x_test, y_test
        # self.train_dataset = tf.data.Dataset.from_tensor_slices((dict(x_train), y_train))
        # self.train_dataset = self.train_dataset.batch(BATCH_SIZE)

    def get_properties(self, config):
        """Get properties of client."""
        raise Exception("Not implemented")

    def get_parameters(self, config):
        """Get parameters of the local model."""
        return get_parameters(self.model)

    def fit(self, parameters, config):
        """Train parameters on the locally held training set."""
        # Update local model parameters
        set_parameters(self.model, parameters)

        # Get hyperparameters for this round
        batch_size: int = BATCH_SIZE
        epochs: int = NUM_EPOCHS

        # Train the model using hyperparameters from config
        #self.model.fit(self.x_train, self.y_train, validation_data=[self.x_test, self.y_test])
        history = self.model.fit(self.x_train, self.y_train)

        # Return updated model parameters and results
        parameters_prime = get_parameters(model=self.model)
        num_examples_train = len(self.x_train)

        results = {"loss": history.history["loss"][0]}
        #results = {"loss": 0.5}

        return parameters_prime, num_examples_train, results

    def evaluate(self, parameters, config):
        """Evaluate parameters on the locally held test set."""

        # Update local model with global parameters
        set_parameters(self.model, parameters)

        # Get config values
        steps: int = 10

        # Evaluate global model parameters on the local test data and return results
        #loss, accuracy = self.model.evaluate(self.x_test)
        loss = self.model.evaluate(self.x_test, self.y_test, 1)
        num_examples_test = len(self.x_test)
        print(time.time())
        self.model.save_weights('vae_fl%f.h5' % time.time())

        return loss, num_examples_test, {"accuracy": 0.95} # tpr*tnr}

In [17]:
def client_fn_vae(cid: str) -> FlowerClientVAE:
    # Load model
    model = create_keras_model_vae()

    # Note: each client gets a different train/test data
    client_train_data = train_data[int(cid)]
    client_test_data = test_data[int(cid)]

    # Create a  single Flower client representing a single organization
    return FlowerClientVAE(model, client_train_data, client_test_data)

In [18]:
# Define the configuration during training/fitting the model. It is used later when defining the FedAvg strategy.
def fit_config(server_round: int):
    config = {
        "batch_size": BATCH_SIZE,
        "local_epochs": NUM_EPOCHS
    }
    return config

# Define the configuration during evaluation. It is used later when defining the FedAvg strategy.
def evaluate_config(server_round: int):
    return {"val_steps": 3}

In [19]:
# Define strategy
strategy = fl.server.strategy.FedAvg(
    min_fit_clients=NUM_CLIENTS,
    min_evaluate_clients=NUM_CLIENTS,
    min_available_clients=NUM_CLIENTS,
    on_fit_config_fn=fit_config,
    on_evaluate_config_fn=evaluate_config,
    #evaluate_metrics_aggregation_fn=weighted_average,
    #fraction_evaluate=0.1,  # Sample 10% of available clients for evaluation
)

# Start simulation
fl_sim = fl.simulation.start_simulation(
    client_fn=client_fn_vae,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=NUM_ROUNDS),
    strategy=strategy,
)

2023-07-22 17:57:12,801	INFO worker.py:1621 -- Started a local Ray instance.


[2m[36m(launch_and_evaluate pid=1183)[0m    1/2915 [..............................] - ETA: 18:19 - loss: 65.1324
  33/2915 [..............................] - ETA: 9s - loss: 89.0934
  13/2915 [..............................] - ETA: 12s - loss: 72.2494  
  62/2915 [..............................] - ETA: 9s - loss: 82.6981
  40/2915 [..............................] - ETA: 11s - loss: 76.2550
  88/2915 [..............................] - ETA: 10s - loss: 81.0645
  68/2915 [..............................] - ETA: 11s - loss: 79.1350
 116/2915 [>.............................] - ETA: 10s - loss: 80.4740
  96/2915 [..............................] - ETA: 10s - loss: 106.2223
 143/2915 [>.............................] - ETA: 10s - loss: 81.4237
 124/2915 [>.............................] - ETA: 10s - loss: 100.0721
 170/2915 [>.............................] - ETA: 10s - loss: 103.0176
 148/2915 [>.............................] - ETA: 10s - loss: 94.8308
 221/2915 [=>...........................

[2m[36m(raylet)[0m Spilled 2475 MiB, 31 objects, write throughput 121 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.


[2m[36m(launch_and_evaluate pid=1182)[0m    1/2915 [..............................] - ETA: 9:09 - loss: 124.3201
[2m[36m(launch_and_evaluate pid=1182)[0m 1690049063.9191284
[2m[36m(launch_and_evaluate pid=1182)[0m   25/2915 [..............................] - ETA: 6s - loss: 92.0100     53/2915 [..............................] - ETA: 5s - loss: 84.9963
  55/2915 [..............................] - ETA: 5s - loss: 85.1342
 103/2915 [>.............................] - ETA: 5s - loss: 84.0250
 151/2915 [>.............................] - ETA: 5s - loss: 81.7317
 205/2915 [=>............................] - ETA: 5s - loss: 82.4039
 251/2915 [=>............................] - ETA: 5s - loss: 81.8827
 303/2915 [==>...........................] - ETA: 5s - loss: 92.2573
 329/2915 [==>...........................] - ETA: 4s - loss: 86.6444
 402/2915 [=

In this case we use a model composed by a pre-trained model from tfhub, as well as dense layers. The pre-trained model is not updated in the example; however, the ``trainable`` parameter can be set to ``True``, so such layers are also fine-tuned in the collaborative training.

Note that any network architecture supported by keras can be used. Besides, Flower also supports PyTorch models, but in contrast to Use Case 1, we use a TFF model here to provide a variety of options.

In [20]:
def create_keras_model():
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Dense(256, activation='relu', input_shape=[74]))
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dense(1))

    # Compile the model
    model.compile(
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
        optimizer=tf.optimizers.Adam(),
        metrics=[tf.metrics.AUC(from_logits=True)]
    )

    return model

## Training in the federated scenario

First, we create a FlowerClient class, that includes the information of each simulated client. The class has three methods:
 * `get_parameters`: Get the parameters of the model to send them to the server
 * `fit`: Reveives the model parameters from the server, trains it with local data, and return the updated model parameters to the server
 * `evaluate`: Receives the model from the server and evaluates it with local data

In [21]:
# RUS

def RUS(X_res, y_res, frac=1, minority_class_id=1, random_state=42):
    X_res = pd.DataFrame(X_res)
    X_res['Class'] = y_res

    X_neg = X_res[y_res != minority_class_id].sample(frac=frac, random_state=random_state)
    X_pos = X_res[y_res == minority_class_id].sample(frac=1, random_state=random_state)

    X_rus = pd.concat([X_neg, X_pos], ignore_index=True)

    X_eq = X_rus.drop('Class', axis=1)
    y_eq = X_rus['Class']

    return X_eq, y_eq

In [22]:
# Class-independent methods to get and set the parameters of a model
def get_parameters(model):
        return model.get_weights()

def set_parameters(model, parameters):
    model.set_weights(parameters)

In [23]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, model, client_train_data, client_test_data):
        # Init the client setting the x (text) and the y (polarity) for both train and testing
        self.model = model

        client_train_data = client_train_data.drop('user', axis=1)
        client_test_data = client_test_data.drop('user', axis=1)

        x_train, y_train = client_train_data.drop('label',axis=1), client_train_data['label'].to_numpy()
        x_test, y_test = client_test_data.drop('label',axis=1), client_test_data['label'].to_numpy()

        # VAE
        vae_sampler = VAEOversampler(epochs=50, intermediate_dim=512, batch_size=BATCH_SIZE, weights='vae_fl1690049356.889048.h5',
                          original_dim=dtrain.shape[1]-2, random_state=42, verbose=False)

        # Build the model
        vae_sampler.build()

        Xres, yres = vae_sampler.resample(x_train, y_train, sampling_strategy=.7)
        # RUS
        X_eq, y_eq = RUS(Xres, yres, frac=.02)

        self.x_train, self.y_train = X_eq, y_eq
        self.x_test, self.y_test = x_test, y_test

    def get_properties(self, config):
        """Get properties of client."""
        raise Exception("Not implemented")

    def get_parameters(self, config):
        """Get parameters of the local model."""
        return get_parameters(self.model)

    def fit(self, parameters, config):
        """Train parameters on the locally held training set."""
        # Update local model parameters
        set_parameters(self.model, parameters)

        # Get hyperparameters for this round
        batch_size: int = BATCH_SIZE
        epochs: int = NUM_EPOCHS

        # Train the model using hyperparameters from config
        history = self.model.fit(
            self.x_train,
            self.y_train,
            batch_size,
            epochs,
            verbose=0
        )

        # Return updated model parameters and results
        parameters_prime = get_parameters(model=self.model)
        num_examples_train = len(self.x_train)
        results = {
            "loss": history.history["loss"][0],
            #"accuracy": history.history["accuracy"][0],

        }
        return parameters_prime, num_examples_train, results

    def evaluate(self, parameters, config):
        """Evaluate parameters on the locally held test set."""

        # Update local model with global parameters
        set_parameters(self.model, parameters)

        # Get config values
        steps: int = 10

        # Evaluate global model parameters on the local test data and return results
        loss, accuracy = self.model.evaluate(self.x_test, self.y_test, 1)
        num_examples_test = len(self.x_test)

        y_pred = self.model.predict(self.x_test)
        y_pred = [1 if pred >= 0.5 else 0 for pred in y_pred]

        cm = confusion_matrix(self.y_test, y_pred)
        tn = cm[0][0]
        fn = cm[1][0]
        tp = cm[1][1]
        fp = cm[0][1]
        tnr = tn / (tn + fp)
        tpr = tp / (tp + fn)

        cr = skmetrics.classification_report(self.y_test, y_pred)
        print(cr)

        return loss, num_examples_test, {"TPR*TNR": tpr*tnr}

To simulate the federated scenario in a single machine, the client_fn method allows to create FlowerClients on demand, given the client id.

Note that each client is passed both training and testing local data, so the evaluation over test data is done during the simulation itself.

In [24]:
def client_fn(cid: str) -> FlowerClient:
    # Load model
    model = create_keras_model()

    # Note: each client gets a different train/test data
    client_train_data = train_data[int(cid)]
    client_test_data = test_data[int(cid)]

    # Create a  single Flower client representing a single organization
    return FlowerClient(model, client_train_data, client_test_data)

In order to show averaged evaluations metrics beyond loss, we should define a method to do that; in this case, the accuracy is weighted averaged.

In [25]:
def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    # Multiply accuracy of each client by number of examples used
    accuracies = [num_examples * m["TPR*TNR"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]

    # Aggregate and return custom metric (weighted average)
    return {"TPR*TNR": sum(accuracies) / sum(examples)}

Define a training strategy with the weighted FedAvg algorithm.

Then, start the simulation indicating the method to create clients, the number of clients in the simulation, the number of rounds, and the strategy (i.e., the FedAvg strategy to combine local updates). The simulation covers both the federated model training as well as evaluating the model with each local test data.

In [26]:
# Define the configuration during training/fitting the model. It is used later when defining the FedAvg strategy.
def fit_config(server_round: int):
    config = {
        "batch_size": BATCH_SIZE,
        "local_epochs": NUM_EPOCHS
    }
    return config

# Define the configuration during evaluation. It is used later when defining the FedAvg strategy.
def evaluate_config(server_round: int):
    return {"val_steps": 5}

In [27]:
# Define strategy
strategy = fl.server.strategy.FedAvg(
    min_fit_clients=NUM_CLIENTS,
    min_evaluate_clients=NUM_CLIENTS,
    min_available_clients=NUM_CLIENTS,
    on_fit_config_fn=fit_config,
    on_evaluate_config_fn=evaluate_config,
    evaluate_metrics_aggregation_fn=weighted_average,
)

# Start simulation
fl_sim = fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=NUM_ROUNDS),
    strategy=strategy,
)

2023-07-22 18:11:21,827	INFO worker.py:1621 -- Started a local Ray instance.


[2m[36m(launch_and_get_parameters pid=32668)[0m   1/251 [..............................] - ETA: 27s 45/251 [====>.........................] - ETA: 0s 
 33/251 [==>...........................] - ETA: 0s 




[1;30;43mSe han truncado las últimas 5000 líneas del flujo de salida.[0m
[2m[36m(launch_and_evaluate pid=32668)[0m               precision    recall  f1-score   support
[2m[36m(launch_and_evaluate pid=32668)[0m [32m [repeated 5x across cluster][0m
[2m[36m(launch_and_evaluate pid=32668)[0m     accuracy                           0.98      2915
[2m[36m(launch_and_evaluate pid=32668)[0m    macro avg       0.60      0.99      0.66      2915
[2m[36m(launch_and_evaluate pid=32668)[0m weighted avg       1.00      0.98      0.99      2915
  1/251 [..............................] - ETA: 18s[32m [repeated 2x across cluster][0m
  24/2915 [..............................] - ETA: 6s - loss: 0.0305 - auc_49: 0.0000e+00       
 158/2915 [>.............................] - ETA: 6s - loss: 0.1598 - auc_49: 0.9667
 197/2915 [=>............................] - ETA: 6s - loss: 0.1282 - auc_49: 0.9734
 283/2915 [=>............................] - ETA: 6s - loss: 0.1081 - auc_49: 0.9816[32m

## Evaluation with test data

The evaluation has been done during the simulation. Following, we show the averaged results over test data.
The result of the simulation includes the results on all rounds, so we retrieve those of the last round.

In [28]:
print('Test data, \t Loss={:.4f}, \t Accuracy={:.4f}'.format(fl_sim.losses_distributed[-1][1], fl_sim.metrics_distributed['TPR*TNR'][-1][1]))

Test data, 	 Loss=0.1359, 	 Accuracy=0.9109


In [29]:
fl_sim

History (loss, distributed):
	round 1: 0.11254079341888427
	round 2: 0.10613971948623657
	round 3: 0.09504460990428924
	round 4: 0.095462878793478
	round 5: 0.09386326149106025
	round 6: 0.09285343661904336
	round 7: 0.1003880113363266
	round 8: 0.10474087446928024
	round 9: 0.09510290995240211
	round 10: 0.1358947165310383
History (metrics, distributed, evaluate):
{'TPR*TNR': [(1, 0.8906867677852958), (2, 0.897759990654787), (3, 0.897447464110196), (4, 0.896386644856713), (5, 0.8937760032647873), (6, 0.8991156054234126), (7, 0.913236984958036), (8, 0.9086049117590195), (9, 0.8977010883072362), (10, 0.910859819866159)]}

**Overfitting??**