<a href="https://colab.research.google.com/github/AnastasiaBrinati/Progetto-ML-23-24/blob/main/task1_federato.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q flwr["simulation"] tensorflow
!pip install -q flwr_datasets["vision"]
!pip install matplotlib

from typing import Dict, List, Tuple
import tensorflow as tf
import flwr as fl
from flwr.common import Metrics
from flwr.simulation.ray_transport.utils import enable_tf_gpu_growth
from datasets import Dataset
from flwr_datasets import FederatedDataset
from keras.initializers import RandomNormal, he_normal, glorot_normal, he_uniform, glorot_uniform

VERBOSE = 0
NUM_CLIENTS = 100



In [2]:
def get_model():
    """Constructs a model suitable for features."""
    # Define input layers
    Support_Calls_input = tf.keras.layers.Input(shape=(1,), name='Support Calls')
    Payment_Delay_input = tf.keras.layers.Input(shape=(1,), name='Payment Delay')
    Usage_Frequency_input = tf.keras.layers.Input(shape=(1,), name='Usage Frequency')
    Total_Spend_input = tf.keras.layers.Input(shape=(1,), name='Total Spend')
    Last_Interaction_input = tf.keras.layers.Input(shape=(1,), name='Last Interaction')
    Age_18_25_input = tf.keras.layers.Input(shape=(1,), name='Age_binned_(18.0, 25.0]')
    Age_25_35_input = tf.keras.layers.Input(shape=(1,), name='Age_binned_(25.0, 35.0]')
    Age_35_50_input = tf.keras.layers.Input(shape=(1,), name='Age_binned_(35.0, 50.0]')
    Age_50_60_input = tf.keras.layers.Input(shape=(1,), name='Age_binned_(50.0, 60.0]')
    Age_60_70_input = tf.keras.layers.Input(shape=(1,), name='Age_binned_(60.0, 70.0]')
    Age_nan_input = tf.keras.layers.Input(shape=(1,), name='Age_binned_nan')
    Contract_Length_Annual_input = tf.keras.layers.Input(shape=(1,), name='Contract Length_Annual')
    Contract_Length_Monthly_input = tf.keras.layers.Input(shape=(1,), name='Contract Length_Monthly')
    Contract_Length_Quarterly_input = tf.keras.layers.Input(shape=(1,), name='Contract Length_Quarterly')
    Contract_Length_nan_input = tf.keras.layers.Input(shape=(1,), name='Contract Length_nan')
    Gender_Female_input = tf.keras.layers.Input(shape=(1,), name='Gender_Female')
    Gender_Male_input = tf.keras.layers.Input(shape=(1,), name='Gender_Male')
    Gender_nan_input = tf.keras.layers.Input(shape=(1,), name='Gender_nan')
    Subscription_Type_Basic_input = tf.keras.layers.Input(shape=(1,), name='Subscription Type_Basic')
    Subscription_Type_Premium_input = tf.keras.layers.Input(shape=(1,), name='Subscription Type_Premium')
    Subscription_Type_Standard_input = tf.keras.layers.Input(shape=(1,), name='Subscription Type_Standard')
    Subscription_Type_nan_input = tf.keras.layers.Input(shape=(1,), name='Subscription Type_nan')

    # Concatenate the input tensors
    concatenated_inputs = tf.keras.layers.concatenate([
        Support_Calls_input, Payment_Delay_input, Usage_Frequency_input, Total_Spend_input, Last_Interaction_input,
        Age_18_25_input, Age_25_35_input, Age_35_50_input, Age_50_60_input, Age_60_70_input, Age_nan_input,
        Contract_Length_Annual_input, Contract_Length_Monthly_input, Contract_Length_Quarterly_input, Contract_Length_nan_input,
        Gender_Female_input, Gender_Male_input, Gender_nan_input,
        Subscription_Type_Basic_input, Subscription_Type_Premium_input, Subscription_Type_Standard_input, Subscription_Type_nan_input
    ])

    # Define the rest of the model
    x = tf.keras.layers.Flatten()(concatenated_inputs)
    x = tf.keras.layers.Dense(96, activation="tanh", kernel_initializer=he_normal)(x)
    x = tf.keras.layers.Dense(128, activation="relu", kernel_initializer=he_normal)(x)
    x = tf.keras.layers.Dense(80, activation="tanh", kernel_initializer=he_normal)(x)
    x = tf.keras.layers.Dense(32, activation="relu", kernel_initializer=he_normal)(x)
    output = tf.keras.layers.Dense(1, activation="sigmoid", kernel_initializer=glorot_normal)(x)

    # Construct the model
    model = tf.keras.models.Model(inputs=[
        Support_Calls_input, Payment_Delay_input, Usage_Frequency_input, Total_Spend_input, Last_Interaction_input,
        Age_18_25_input, Age_25_35_input, Age_35_50_input, Age_50_60_input, Age_60_70_input, Age_nan_input,
        Contract_Length_Annual_input, Contract_Length_Monthly_input, Contract_Length_Quarterly_input, Contract_Length_nan_input,
        Gender_Female_input, Gender_Male_input, Gender_nan_input,
        Subscription_Type_Basic_input, Subscription_Type_Premium_input, Subscription_Type_Standard_input, Subscription_Type_nan_input
    ], outputs=output)

    # Compile the model
    model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
    return model


  and should_run_async(code)


In [3]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, trainset, valset) -> None:
        # Create model
        self.model = get_model()
        self.trainset = trainset
        self.valset = valset

    def get_parameters(self, config):
        return self.model.get_weights()

    def fit(self, parameters, config):
        self.model.set_weights(parameters)
        self.model.fit(self.trainset, epochs=1, verbose=VERBOSE)
        return self.model.get_weights(), len(self.trainset), {}

    def evaluate(self, parameters, config):
        self.model.set_weights(parameters)
        loss, acc = self.model.evaluate(self.valset, verbose=VERBOSE)
        return loss, len(self.valset), {"accuracy": acc}

In [4]:
# Download MNIST dataset and partition it
mnist_fds = FederatedDataset(dataset="giulioappetito/churn_dataset_giulioappetito", partitioners={"train": NUM_CLIENTS})
# Get the whole test set for centralised evaluation
centralized_testset = mnist_fds.load_full("test").to_tf_dataset(
  columns=[
            'Support Calls',
            'Payment Delay',
            'Usage Frequency',
            'Total Spend',
            'Last Interaction',
            'Age_binned_(18.0, 25.0]',
            'Age_binned_(25.0, 35.0]',
            'Age_binned_(35.0, 50.0]',
            'Age_binned_(50.0, 60.0]',
            'Age_binned_(60.0, 70.0]',
            'Age_binned_nan',
            'Contract Length_Annual',
            'Contract Length_Monthly',
            'Contract Length_Quarterly',
            'Contract Length_nan',
            'Gender_Female',
            'Gender_Male',
            'Gender_nan',
            'Subscription Type_Basic',
            'Subscription Type_Premium',
            'Subscription Type_Standard',
            'Subscription Type_nan'
        ],
        label_cols="Churn", batch_size=64
    )

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [5]:
# @title
def get_client_fn(dataset: FederatedDataset):
    """Return a function to construct a client.

    The VirtualClientEngine will execute this function whenever a client is sampled by
    the strategy to participate.
    """

    def client_fn(cid: str) -> fl.client.Client:
        """Construct a FlowerClient with its own dataset partition."""

        # Extract partition for client with id = cid
        client_dataset = dataset.load_partition(int(cid), "train")

        # Now let's split it into train (90%) and validation (10%)
        client_dataset_splits = client_dataset.train_test_split(test_size=0.1)
        trainset = client_dataset_splits["train"].to_tf_dataset(
            columns=[
                'Support Calls',
                'Payment Delay',
                'Usage Frequency',
                'Total Spend',
                'Last Interaction',
                'Age_binned_(18.0, 25.0]',
                'Age_binned_(25.0, 35.0]',
                'Age_binned_(35.0, 50.0]',
                'Age_binned_(50.0, 60.0]',
                'Age_binned_(60.0, 70.0]',
                'Age_binned_nan',
                'Contract Length_Annual',
                'Contract Length_Monthly',
                'Contract Length_Quarterly',
                'Contract Length_nan',
                'Gender_Female',
                'Gender_Male',
                'Gender_nan',
                'Subscription Type_Basic',
                'Subscription Type_Premium',
                'Subscription Type_Standard',
                'Subscription Type_nan'
            ],
            label_cols="Churn", batch_size=32
        )
        valset = client_dataset_splits["test"].to_tf_dataset(
            columns=[
                'Support Calls',
                'Payment Delay',
                'Usage Frequency',
                'Total Spend',
                'Last Interaction',
                'Age_binned_(18.0, 25.0]',
                'Age_binned_(25.0, 35.0]',
                'Age_binned_(35.0, 50.0]',
                'Age_binned_(50.0, 60.0]',
                'Age_binned_(60.0, 70.0]',
                'Age_binned_nan',
                'Contract Length_Annual',
                'Contract Length_Monthly',
                'Contract Length_Quarterly',
                'Contract Length_nan',
                'Gender_Female',
                'Gender_Male',
                'Gender_nan',
                'Subscription Type_Basic',
                'Subscription Type_Premium',
                'Subscription Type_Standard',
                'Subscription Type_nan'
            ],
            label_cols="Churn", batch_size=64
        )

        # Create and return client
        return FlowerClient(trainset, valset).to_client()

    return client_fn


def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    """Aggregation function for (federated) evaluation metrics, i.e. those returned by
    the client's evaluate() method."""
    # Multiply accuracy of each client by number of examples used
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]

    # Aggregate and return custom metric (weighted average)
    return {"accuracy": sum(accuracies) / sum(examples)}


def get_evaluate_fn(testset: Dataset):
    """Return an evaluation function for server-side (i.e. centralised) evaluation."""

    # The `evaluate` function will be called after every round by the strategy
    def evaluate(
        server_round: int,
        parameters: fl.common.NDArrays,
        config: Dict[str, fl.common.Scalar],
    ):
        model = get_model()  # Construct the model
        model.set_weights(parameters)  # Update model with the latest parameters
        loss, accuracy = model.evaluate(testset, verbose=VERBOSE)
        return loss, {"accuracy": accuracy}

    return evaluate


In [9]:
# Create FedAvg strategy
strategy_FedOptim = fl.server.strategy.FedProx(
    proximal_mu = 1.0,
    fraction_fit=0.1,  # Sample 10% of available clients for training
    fraction_evaluate=0.05,  # Sample 5% of available clients for evaluation
    min_fit_clients=10, # Never sample less than 10 clients for training
    min_evaluate_clients=5,  # Never sample less than 5 clients for evaluation
    min_available_clients=int(
        NUM_CLIENTS * 0.75
    ),  # Wait until at least 75 clients are available
    evaluate_metrics_aggregation_fn=weighted_average,  # aggregates federated metrics
    evaluate_fn=get_evaluate_fn(centralized_testset),  # global evaluation function
)

strategy_FedProx = fl.server.strategy.FedProx(
    proximal_mu = 1.0,
    fraction_fit=0.1,  # Sample 10% of available clients for training
    fraction_evaluate=0.05,  # Sample 5% of available clients for evaluation
    min_fit_clients=10,  # Never sample less than 10 clients for training
    min_evaluate_clients=5,  # Never sample less than 5 clients for evaluation
    min_available_clients=int(
        NUM_CLIENTS * 0.75
    ),  # Wait until at least 75 clients are available
    evaluate_metrics_aggregation_fn=weighted_average,  # aggregates federated metrics
    evaluate_fn=get_evaluate_fn(centralized_testset),  # global evaluation function
)

strategy_FedAvg = fl.server.strategy.FedAvg(
    fraction_fit=0.1,  # Sample 10% of available clients for training
    fraction_evaluate=0.05,  # Sample 5% of available clients for evaluation
    min_fit_clients=10,  # Never sample less than 10 clients for training
    min_evaluate_clients=5,  # Never sample less than 5 clients for evaluation
    min_available_clients=int(
        NUM_CLIENTS * 0.75
    ),  # Wait until at least 75 clients are available
    evaluate_metrics_aggregation_fn=weighted_average,  # aggregates federated metrics
    evaluate_fn=get_evaluate_fn(centralized_testset),  # global evaluation function
)

# Configura le risorse del client
client_resources = {"num_cpus": 1, "num_gpus": 0.0}

# Avvia la simulazione una volta
history = fl.simulation.start_simulation(
    client_fn=get_client_fn(mnist_fds),
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=27),
    strategy=strategy_FedAvg,
    client_resources=client_resources,
    actor_kwargs={"on_actor_init_fn": enable_tf_gpu_growth}  # Enable GPU growth upon actor init.
)

INFO flwr 2024-02-16 14:48:56,793 | app.py:178 | Starting Flower simulation, config: ServerConfig(num_rounds=27, round_timeout=None)
INFO:flwr:Starting Flower simulation, config: ServerConfig(num_rounds=27, round_timeout=None)
2024-02-16 14:49:01,863	INFO worker.py:1621 -- Started a local Ray instance.
INFO flwr 2024-02-16 14:49:05,836 | app.py:213 | Flower VCE: Ray initialized with resources: {'CPU': 2.0, 'object_store_memory': 3920937369.0, 'memory': 7841874740.0, 'node:172.28.0.12': 1.0, 'node:__internal_head__': 1.0}
INFO:flwr:Flower VCE: Ray initialized with resources: {'CPU': 2.0, 'object_store_memory': 3920937369.0, 'memory': 7841874740.0, 'node:172.28.0.12': 1.0, 'node:__internal_head__': 1.0}
INFO flwr 2024-02-16 14:49:05,847 | app.py:219 | Optimize your simulation with Flower VCE: https://flower.dev/docs/framework/how-to-run-simulations.html
INFO:flwr:Optimize your simulation with Flower VCE: https://flower.dev/docs/framework/how-to-run-simulations.html
INFO flwr 2024-02-16 1

In [11]:
val_accuracy_per_epoch = [0.9342, 0.9340, 0.9340, 0.9343, 0.9339, 0.9340, 0.9340, 0.9341, 0.9343, 0.9339, 0.9308, 0.9343, 0.9339, 0.9345, 0.9343, 0.9341, 0.9333, 0.9344, 0.9342, 0.9340, 0.9344, 0.9343, 0.9336, 0.9342, 0.9337, 0.9335, 0.9336]

  and should_run_async(code)


In [22]:
import matplotlib.pyplot as plt

print(f"{history.metrics_centralized = }")

global_accuracy_centralised = history.metrics_centralized["accuracy"]
round = [data[0] for data in global_accuracy_centralised]
acc = [100.0 * data[1] for data in global_accuracy_centralised]
plt.plot(round, acc)
plt.grid()
plt.ylabel("Accuracy (%)")
plt.xlabel("Round")
plt.title("MNIST - IID - 100 clients with 10 clients per round")
print(acc)

  and should_run_async(code)


SyntaxError: Generator expression must be parenthesized (<ipython-input-22-fffbdaf8839f>, line 9)