Write a code to implement federated learning in Python using the Flower framework to train a model across multiple clients while ensuring data privacy.  Create dummy dataset

In [3]:
!pip install flwr

Collecting flwr
  Downloading flwr-1.10.0-py3-none-any.whl.metadata (15 kB)
Collecting iterators<0.0.3,>=0.0.2 (from flwr)
  Downloading iterators-0.0.2-py3-none-any.whl.metadata (2.5 kB)
Collecting pathspec<0.13.0,>=0.12.1 (from flwr)
  Downloading pathspec-0.12.1-py3-none-any.whl.metadata (21 kB)
Collecting protobuf<5.0.0,>=4.25.2 (from flwr)
  Downloading protobuf-4.25.4-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting pycryptodome<4.0.0,>=3.18.0 (from flwr)
  Downloading pycryptodome-3.20.0-cp35-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting tomli-w<2.0.0,>=1.0.0 (from flwr)
  Downloading tomli_w-1.0.0-py3-none-any.whl.metadata (4.9 kB)
Collecting typer<0.10.0,>=0.9.0 (from typer[all]<0.10.0,>=0.9.0->flwr)
  Downloading typer-0.9.4-py3-none-any.whl.metadata (14 kB)
Collecting colorama<0.5.0,>=0.4.3 (from typer[all]<0.10.0,>=0.9.0->flwr)
  Downloading colorama-0.4.6-py2.py3-none-any.whl.metadata (17 kB)
Downloading flwr-1.10.0-py3-

In [4]:

import flwr as fl
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
import warnings
from threading import Thread, Event
import time

warnings.filterwarnings("ignore")

# Generate dummy dataset
def generate_dummy_data(num_samples, num_features):
    X = np.random.randn(num_samples, num_features)
    y = np.random.randint(0, 2, num_samples)
    return X, y

# Define Flower client
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, X, y):
        self.X = X
        self.y = y
        self.model = LogisticRegression()
        # Initialize the model to ensure consistent shapes
        self.model.fit(self.X, self.y)

    def get_parameters(self, config):
        return [self.model.coef_.astype(np.float32), self.model.intercept_.astype(np.float32)]

    def set_parameters(self, parameters):
        self.model.coef_ = parameters[0].astype(np.float64)
        self.model.intercept_ = parameters[1].astype(np.float64)

    def fit(self, parameters, config):
        self.set_parameters(parameters)
        self.model.fit(self.X, self.y)
        return self.get_parameters(config), len(self.X), {}

    def evaluate(self, parameters, config):
        self.set_parameters(parameters)
        y_pred = self.model.predict_proba(self.X)
        loss = log_loss(self.y, y_pred)
        accuracy = self.model.score(self.X, self.y)
        return loss, len(self.X), {"accuracy": accuracy}

# Define client function
def client_fn(cid: str) -> fl.client.Client:
    X, y = generate_dummy_data(100, 4)
    return FlowerClient(X, y)

# Define strategy
strategy = fl.server.strategy.FedAvg(
    fraction_fit=1.0,
    fraction_evaluate=1.0,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=3,
    initial_parameters=fl.common.ndarrays_to_parameters(
        [np.zeros((1, 4), dtype=np.float32), np.zeros(1, dtype=np.float32)]
    ),
)

# Start Flower server
def start_server(stop_event):
    fl.server.start_server(
        server_address="[::]:8084",
        config=fl.server.ServerConfig(num_rounds=3),
        strategy=strategy,
        stop_event=stop_event
    )

# Client function
def start_client(stop_event):
    fl.client.start_numpy_client(server_address="[::]:8084", client=client_fn, root_certificates=None, stop_event=stop_event)

# Function to run the server in a separate thread
def run_server(stop_event):
    server_thread = Thread(target=start_server, args=(stop_event,))
    server_thread.start()
    return server_thread

# Function to run multiple clients
def run_clients(num_clients=3, stop_event=None):
    client_threads = []
    for _ in range(num_clients):
        client_thread = Thread(target=start_client, args=(stop_event,))
        client_thread.start()
        client_threads.append(client_thread)
    return client_threads

# Main function to run the federated learning process
def run_federated_learning():
    stop_event = Event()

    print("Starting server...")
    server_thread = run_server(stop_event)

    # Wait for the server to start
    time.sleep(5)

    print("Starting clients...")
    client_threads = run_clients(3, stop_event)

    # Wait for all clients to finish
    for thread in client_threads:
        thread.join()

    # Stop the server
    stop_event.set()
    server_thread.join()

    print("Federated learning process completed.")

# Run the federated learning process
if __name__ == "__main__":
    run_federated_learning()

Exception in thread Thread-10 (start_server):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-4-fa427e760366>", line 64, in start_server
TypeError: start_server() got an unexpected keyword argument 'stop_event'


Starting server...


Exception in thread Thread-11 (start_client):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-4-fa427e760366>", line 73, in start_client
TypeError: start_numpy_client() got an unexpected keyword argument 'stop_event'
Exception in thread Thread-12 (start_client):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
Exception in thread Thread-13 (start_client)    self._target(*self._args, **self._kwargs)
  File "<ipython-input-4-fa427e760366>", line 73, in start_client
:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
TypeError: start_numpy_client() got an unexpected keyword argument 'stop_event'
 

Starting clients...
Federated learning process completed.


Write a code to configure a federated learning strategy and implement a server that coordinates the federated learning process.

In [5]:
import flwr as fl
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
import warnings

warnings.filterwarnings("ignore")

# Define the Flower client
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, X, y):
        self.X = X
        self.y = y
        self.model = LogisticRegression()

    def get_parameters(self, config):
        return [self.model.coef_, self.model.intercept_]

    def set_parameters(self, parameters):
        self.model.coef_ = parameters[0]
        self.model.intercept_ = parameters[1]

    def fit(self, parameters, config):
        self.set_parameters(parameters)
        self.model.fit(self.X, self.y)
        return self.get_parameters(config), len(self.X), {}

    def evaluate(self, parameters, config):
        self.set_parameters(parameters)
        loss = log_loss(self.y, self.model.predict_proba(self.X))
        accuracy = self.model.score(self.X, self.y)
        return loss, len(self.X), {"accuracy": accuracy}

# Define the client function
def client_fn(cid: str) -> fl.client.Client:
    X, y = generate_dummy_data(100, 4)
    return FlowerClient(X, y)

# Define the strategy
strategy = fl.server.strategy.FedAvg(
    fraction_fit=1.0,
    fraction_evaluate=1.0,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=3,
)

# Define the server configuration
server_config = fl.server.ServerConfig(num_rounds=3)

# Start the Flower server
def start_server():
    fl.server.start_server(
        server_address="[::]:8084",
        config=server_config,
        strategy=strategy,
    )

# Run the server in a separate thread
def run_server():
    server_thread = Thread(target=start_server)
    server_thread.start()
    return server_thread

# Main function to run the federated learning process
def run_federated_learning():
    print("Starting server...")
    server_thread = run_server()

    # Wait for the server to start
    time.sleep(5)

    print("Federated learning process completed.")

# Run the federated learning process
run_federated_learning()

[92mINFO [0m:      Starting Flower server, config: num_rounds=3, no round_timeout
INFO:flwr:Starting Flower server, config: num_rounds=3, no round_timeout


Starting server...


[92mINFO [0m:      Flower ECE: gRPC server running (3 rounds), SSL is disabled
INFO:flwr:Flower ECE: gRPC server running (3 rounds), SSL is disabled
[92mINFO [0m:      [INIT]
INFO:flwr:[INIT]
[92mINFO [0m:      Requesting initial parameters from one random client
INFO:flwr:Requesting initial parameters from one random client


Federated learning process completed.


Write a code to Implement a federated averaging strategy for model aggregation

In [6]:
import flwr as fl
import numpy as np

# Define the federated averaging strategy
class CustomFedAvg(fl.server.strategy.FedAvg):
    def aggregate_fit(self, server_round, results, failures):
        aggregated_weights = super().aggregate_fit(server_round, results, failures)

        # You can further customize the aggregation process here
        # For example, you can use a weighted average based on the client dataset sizes
        # aggregated_weights = self.weighted_average(results)

        return aggregated_weights

    # def weighted_average(self, results):
    #     total_size = sum(result.num_examples for result, _ in results)
    #     aggregated_weights = [
    #         np.average([result.parameters[i] for result, _ in results], axis=0, weights=[result.num_examples / total_size for result, _ in results])
    #         for i in range(len(results[0][0].parameters))
    #     ]
    #     return aggregated_weights

# Define the server configuration
server_config = fl.server.ServerConfig(num_rounds=3)

# Create the custom strategy instance
custom_strategy = CustomFedAvg(
    fraction_fit=1.0,
    fraction_evaluate=1.0,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=3,
)

# Start the Flower server with the custom strategy
def start_server():
    fl.server.start_server(
        server_address="[::]:8084",
        config=server_config,
        strategy=custom_strategy,
    )

# Run the server in a separate thread
def run_server():
    server_thread = Thread(target=start_server)
    server_thread.start()
    return server_thread

# Main function to run the federated learning process
def run_federated_learning():
    print("Starting server...")
    server_thread = run_server()

    # Wait for the server to start
    time.sleep(5)

    print("Federated learning process completed.")

# Run the federated learning process
run_federated_learning()

[92mINFO [0m:      Starting Flower server, config: num_rounds=3, no round_timeout
INFO:flwr:Starting Flower server, config: num_rounds=3, no round_timeout
[92mINFO [0m:      Flower ECE: gRPC server running (3 rounds), SSL is disabled
INFO:flwr:Flower ECE: gRPC server running (3 rounds), SSL is disabled
[92mINFO [0m:      [INIT]
INFO:flwr:[INIT]
[92mINFO [0m:      Requesting initial parameters from one random client
INFO:flwr:Requesting initial parameters from one random client


Starting server...
Federated learning process completed.
