In [1]:
import numpy as np
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.datasets import cifar10, mnist

import pandas as pd

import matplotlib.pyplot as plt

import gc
class GarbageCollectorCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        gc.collect()

from fedartml import SplitAsFederatedData

import flwr as fl
from typing import List, Tuple, Dict, Optional
from flwr.common import Metrics
from logging import WARNING
from typing import Callable, Dict, List, Optional, Tuple, Union

from flwr.common import (
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    MetricsAggregationFn,
    NDArrays,
    Parameters,
    Scalar,
    ndarrays_to_parameters,
    parameters_to_ndarrays,
)
from flwr.common.logger import log
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy

from flwr.server.strategy.aggregate import aggregate
from flwr.server.strategy import Strategy
import flwr as fl
from flwr.server.client_manager import ClientManager
import threading
from abc import ABC, abstractmethod
from logging import INFO
from flwr.common.logger import log
from typing import Dict, List, Optional
import random
from flwr.server.client_proxy import ClientProxy
from flwr.server.criterion import Criterion

In [2]:
np.random.seed(42)
N = -104.0
U = np.random.uniform(1.0, 5.0, size=50) * 10**4
f = np.random.uniform(1.0, 2.0, size=50) * 10**9
B = np.random.uniform(1.0, 10.0, size=50)
p = np.random.uniform(20.0, 40.0, size=50)
x, y = np.random.uniform(-5000.0, 5000.0, size=50), np.random.uniform(-5000.0, 5000.0, size=50)
M = 185.82
#cifar: 1410
#emnist: 185.82
#mnist: 173.54
def g(x1, y1):
    d = np.sqrt((x1 - 0) ** 2 + (y1 - 0) ** 2)
    if(d == 0):
       return 0
    return -128.1 - 37.6 * np.log10(d)

def computation_time(D, U, f):
    return (D * U) / f

def communication_time(B, M, p, N, x, y):
    gi = g(x, y)
    r = B * np.log2(1 + gi * p / N)
    return M / r

def computation_energy(D, U, f):
    return 10**(-28)*D*U*f**2

def communication_energy(tcom, p):
    return tcom*p

In [3]:
def test_model(model, X_test, Y_test):
    model.compile(optimizer=SGD(), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    loss, acc = model.evaluate(X_test, Y_test, verbose=3, callbacks=[GarbageCollectorCallback()])
    return loss, acc

def from_FedArtML_to_Flower_format(clients_dict):
  list_x_train = []
  list_y_train = []
  client_names = list(clients_dict.keys())
  for client in client_names:
    each_client_train=np.array(clients_dict[client],dtype=object)
    feat=[]
    x_tra=np.array(each_client_train[:, 0])
    for row in x_tra:
      feat.append(row)
    feat=np.array(feat)
    y_tra=np.array(each_client_train[:, 1])
    list_x_train.append(feat)
    list_y_train.append(y_tra)

  return list_x_train, list_y_train

class FedProxLoss(tf.keras.losses.Loss):
    def __init__(self, base_loss, global_weights, mu=0.1):
        super(FedProxLoss, self).__init__()
        self.base_loss = base_loss
        self.global_weights = global_weights
        self.mu = mu
        self.local_weights = None  # Local weights initialized as None

    def set_local_weights(self, local_weights):
        # Update local weights before each epoch or batch
        self.local_weights = local_weights

    def call(self, y_true, y_pred):
        # Standard loss (cross-entropy)
        base_loss_value = self.base_loss(y_true, y_pred)

        # Proximal term: penalty for deviation from global model
        prox_term = 0
        for lw, gw in zip(self.local_weights, self.global_weights):
            prox_term += tf.reduce_sum(tf.square(lw - gw))

        return base_loss_value + 0.5 * self.mu * prox_term
    
class UpdateLocalWeightsCallback(tf.keras.callbacks.Callback):
    def __init__(self, loss_fn):
        self.loss_fn = loss_fn

    def on_batch_begin(self, batch, logs=None):
        # Update the local weights in the FedProxLoss before each batch
        self.loss_fn.set_local_weights(self.model.trainable_weights)

def get_model():
    model = Sequential([
    tf.keras.layers.Conv2D(6, kernel_size=5, strides=1, activation='relu', input_shape=(28, 28, 1), padding='same'),  # C1
    tf.keras.layers.AveragePooling2D(),
    tf.keras.layers.Conv2D(16, kernel_size=5, strides=1, activation='relu', padding='valid'),  # C2
    tf.keras.layers.AveragePooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(120, activation='relu'),
    tf.keras.layers.Dense(84, activation='relu'),
    tf.keras.layers.Dense(47, activation='softmax')  # Thay đổi số lớp đầu ra thành 47 cho EMNIST Letters
])
    return model

class FlowerClient(fl.client.NumPyClient):
    def __init__(self, model, x_train, y_train, cid) -> None:
        self.model = model
        self.x_train, self.y_train = x_train, y_train
        self.cid = cid
        
    def get_parameters(self, config):
        return self.model.get_weights()

    def fit(self, parameters, config):
        self.model.set_weights(parameters)
        loss_fn = FedProxLoss(tf.keras.losses.SparseCategoricalCrossentropy(), self.model.trainable_weights, 0.1)
        update_weights_callback = UpdateLocalWeightsCallback(loss_fn)
        self.model.compile(optimizer=SGD(learning_rate = config["learning_rate"]), 
                           loss=loss_fn, 
                           metrics=['accuracy'])
        self.model.fit(self.x_train, self.y_train, epochs=1,verbose=3, batch_size = 64,callbacks=[update_weights_callback])
        return self.model.get_weights(), len(self.x_train) , {"id": self.cid}

    def evaluate(self, parameters, config):
        return loss, len(self.x_test), {"accuracy": acc}


In [4]:
WARNING_MIN_AVAILABLE_CLIENTS_TOO_LOW = """"""

class FedAvg(Strategy):
    def __init__(
        self,
        *,
        fraction_fit: float = 1.0,
        fraction_evaluate: float = 1.0,
        min_fit_clients: int = 2,
        min_evaluate_clients: int = 2,
        min_available_clients: int = 2,
        evaluate_fn: Optional[
            Callable[
                [int, NDArrays, Dict[str, Scalar]],
                Optional[Tuple[float, Dict[str, Scalar]]],
            ]
        ] = None,
        on_fit_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
        on_evaluate_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
        accept_failures: bool = True,
        initial_parameters: Optional[Parameters] = None,
        fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
        evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
        tcom,
        tcmp,
        alpha
    ) -> None:
        super().__init__()
        self.fraction_fit = fraction_fit
        self.fraction_evaluate = fraction_evaluate
        self.min_fit_clients = min_fit_clients
        self.min_evaluate_clients = min_evaluate_clients
        self.min_available_clients = min_available_clients
        self.evaluate_fn = evaluate_fn
        self.on_fit_config_fn = on_fit_config_fn
        self.on_evaluate_config_fn = on_evaluate_config_fn
        self.accept_failures = accept_failures
        self.initial_parameters = initial_parameters
        self.fit_metrics_aggregation_fn = fit_metrics_aggregation_fn
        self.evaluate_metrics_aggregation_fn = evaluate_metrics_aggregation_fn
        self.learning_rate = 0.01
        self.decay = 0.995
        self.total_time = [tcom[i] + tcmp[i] for i in range(50)]
        self.training_time = 0
        self.round_time = 0
        self.energy = 0
        self.alpha=alpha
        self.result = {"accuracy": [], "Completion time": [0.0], "energy": [0.0]}

    def __repr__(self) -> str:
        rep = f"FedAvg(accept_failures={self.accept_failures})"
        return rep

    def num_fit_clients(self, num_available_clients: int) -> Tuple[int, int]:
        num_clients = int(num_available_clients * self.fraction_fit)
        return max(num_clients, self.min_fit_clients), self.min_available_clients

    def num_evaluation_clients(self, num_available_clients: int) -> Tuple[int, int]:
        num_clients = int(num_available_clients * self.fraction_evaluate)
        return max(num_clients, self.min_evaluate_clients), self.min_available_clients

    def initialize_parameters(
        self, client_manager: ClientManager
    ) -> Optional[Parameters]:
        initial_parameters = self.initial_parameters
        self.initial_parameters = None
        return initial_parameters

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        if self.evaluate_fn is None:
            return None
        parameters_ndarrays = parameters_to_ndarrays(parameters)
        eval_res = self.evaluate_fn(server_round, parameters_ndarrays, {})

        loss, metrics = eval_res
        self.result["accuracy"].append(metrics["accuracy"])
        if server_round == comms_round:
            df = pd.DataFrame(self.result)
            df.to_csv(f"result/prox{self.alpha}.csv", index=False)
        return loss, metrics

    def configure_fit(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, FitIns]]:
        config = [{"learning_rate": self.learning_rate} for _ in range(10)]
        sample_size, min_num_clients = self.num_fit_clients(
            client_manager.num_available()
        )
        clients = client_manager.sample(
            num_clients=sample_size, min_num_clients=min_num_clients
        )
        
        self.result["energy"].append(self.energy+self.result["energy"][-1])
        self.energy = 0
        fit_ins = [FitIns(parameters, con) for con in config]
        return [(client, fit) for client,fit in zip(clients, fit_ins)]

    def configure_evaluate(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        if self.fraction_evaluate == 0.0:
            return []

        config = {}
        if self.on_evaluate_config_fn is not None:
            config = self.on_evaluate_config_fn(server_round)
        evaluate_ins = EvaluateIns(parameters, config)

        sample_size, min_num_clients = self.num_evaluation_clients(
            client_manager.num_available()
        )
        clients,cid = client_manager.sample(
            num_clients=sample_size, min_num_clients=min_num_clients
        )
        return [(client, evaluate_ins) for client in clients]

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:

        weights_results = [
                (parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
                for _, fit_res in results
        ]
        aggregated_ndarrays = aggregate(weights_results)
        parameters_aggregated = ndarrays_to_parameters(aggregated_ndarrays)

        metrics_aggregated = {}
        cid = [fit_res.metrics["id"]  for _, fit_res in results]
        self.round_time = np.max([self.total_time[int(id)] for id in cid])
        self.training_time += self.round_time
        self.result["Completion time"].append(self.training_time)
        return parameters_aggregated, metrics_aggregated

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        if not results:
            return None, {}
        if not self.accept_failures and failures:
            return None, {}

        loss_aggregated = weighted_loss_avg(
            [
                (evaluate_res.num_examples, evaluate_res.loss)
                for _, evaluate_res in results
            ]
        )

        metrics_aggregated = {}
        if self.evaluate_metrics_aggregation_fn:
            eval_metrics = [(res.num_examples, res.metrics) for _, res in results]
            metrics_aggregated = self.evaluate_metrics_aggregation_fn(eval_metrics)
        elif server_round == 1:  # Only log this warning once
            log(WARNING, "No evaluate_metrics_aggregation_fn provided")
        return loss_aggregated, metrics_aggregated

In [5]:
def evaluate_DNN_CL(
    server_round: int,
    parameters: fl.common.NDArrays,
    config: Dict[str, fl.common.Scalar],
) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]:
    net = get_model()
    net.set_weights(parameters) # Update model with the latest parameters
    loss, accuracy = test_model(net, test_images, test_labels)
    return loss, {"accuracy": accuracy}

In [6]:
class SimpleClientManager(ClientManager):
    def __init__(self) -> None:
        self.clients: Dict[str, ClientProxy] = {}
        self._cv = threading.Condition()
        self.seed = 0 # cài đặt seed để fix client tham gia mỗi round

    def __len__(self) -> int:
        return len(self.clients)

    def num_available(self) -> int:
        return len(self)

    def wait_for(self, num_clients: int, timeout: int = 86400) -> bool:
        with self._cv:
            return self._cv.wait_for(
                lambda: len(self.clients) >= num_clients, timeout=timeout
            )

    def register(self, client: ClientProxy) -> bool:
        if client.cid in self.clients:
            return False

        self.clients[client.cid] = client
        with self._cv:
            self._cv.notify_all()

        return True

    def unregister(self, client: ClientProxy) -> None:
        if client.cid in self.clients:
            del self.clients[client.cid]

            with self._cv:
                self._cv.notify_all()

    def all(self) -> Dict[str, ClientProxy]:
        return self.clients

    def sample(
        self,
        num_clients: int,
        min_num_clients: Optional[int] = None,
        criterion: Optional[Criterion] = None,
    ) -> List[ClientProxy]:
    
        if min_num_clients is None:
            min_num_clients = num_clients
        self.wait_for(min_num_clients)
        available_cids = list(self.clients)

        if num_clients == 1:
            sampled_cids = random.sample(available_cids, num_clients)
            return [self.clients[cid] for cid in sampled_cids]
        
        sampled_cids = random.sample(available_cids, num_clients)
        return [self.clients[cid] for cid in sampled_cids]

In [7]:
colors = ["#00cfcc","#e6013b","#007f88","#00cccd","#69e0da","darkblue","#FFFFFF"]
local_nodes_glob = 50
random_state = 1
comms_round =500

In [8]:
from torchvision.datasets import EMNIST
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

def convert_to_numpy(loader):
    images, labels = [], []
    for batch in loader:
        x, y = batch
        images.append(x.numpy())
        labels.append(y.numpy())
    return np.concatenate(images), np.concatenate(labels)

def load_emnist():
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5), (0.5))
    ])
    trainset = EMNIST("data", split="balanced", train=True, download=True, transform=transform)
    testset = EMNIST("data", split="balanced", train=False, download=True, transform=transform)
    train_loader = DataLoader(trainset, batch_size=64, shuffle=True)
    test_loader = DataLoader(testset, batch_size=64, shuffle=False)
    x_train, y_train = convert_to_numpy(train_loader)
    x_test, y_test = convert_to_numpy(test_loader)

    x_train = x_train.transpose(0, 2, 3, 1)
    x_test = x_test.transpose(0, 2, 3, 1)
    return (x_train, y_train), (x_test, y_test)

In [9]:
from sklearn.model_selection import train_test_split

# (train_images, train_labels), (test_images, test_labels) = cifar10.load_data()

# train_images = train_images / 255
# test_images = test_images / 255
# train_labels, test_labels = np.concatenate(train_labels), np.concatenate(test_labels)

(train_images, train_labels), (test_images, test_labels) = load_emnist()
train_images = train_images.reshape((60000, 28, 28, 1)).astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1)).astype('float32') / 255


def split_data(alpha):

    my_federater = SplitAsFederatedData(random_state = random_state)

    clients_glob_dic1, list_ids_sampled_dic1, miss_class_per_node1, distances1 = my_federater.create_clients(image_list = train_images, label_list = train_labels,
                                                                num_clients = 50, prefix_cli='client', method = "dirichlet", alpha = alpha)
    clients_glob1 = clients_glob_dic1['with_class_completion']
    list_ids_sampled1 = list_ids_sampled_dic1['with_class_completion']

    list_x_train1, list_y_train1 = from_FedArtML_to_Flower_format(clients_dict=clients_glob1)

    return list_x_train1, list_y_train1

In [10]:
model = get_model()
for i in range(1,11):
    alpha = i*10
    list_x_train, list_y_train = split_data(i)  
    tcmp = [computation_time(len(list_x_train[i])*3, U[i], f[i]) for i in range(50)]
    tcom = [communication_time(B[i], M, p[i], N, x[i], y[i]) for i in range(50)]
    ecmp = [computation_energy(len(list_x_train[i])*3, U[i], f[i]) for i in range(50)]
    ecom = [communication_energy(tcom[i], p[i]) for i in range(50)]

    def client_fn(cid: str) -> fl.client.Client:
        x_train_cid = np.array(list_x_train[int(cid)],dtype=float)
        y_train_cid = np.array(list_y_train[int(cid)],dtype=int)
        return FlowerClient(model, x_train_cid, y_train_cid, int(cid))
    
    strategy=FedAvg(
            fraction_fit=0.2,
            fraction_evaluate=0,
            min_fit_clients=10,
            min_available_clients = 50,
            evaluate_fn=evaluate_DNN_CL,
            tcom = tcom,
            tcmp = tcmp,
            alpha = alpha
    )

    clientmanager = SimpleClientManager()

    history = fl.simulation.start_simulation(
        client_fn=client_fn,
        num_clients=local_nodes_glob,
        config=fl.server.ServerConfig(num_rounds=comms_round),
        strategy=strategy,
        client_manager = clientmanager,
        client_resources = {'num_cpus': 1, 'num_gpus': 0},
    )

[92mINFO [0m:      aggregate_fit: received 10 results and 0 failures
[92mINFO [0m:      fit progress: (94, 1.717635154724121, {'accuracy': 0.4011000096797943}, 647.7884869830013)
[92mINFO [0m:      configure_evaluate: no clients selected, skipping evaluation
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 95]
[92mINFO [0m:      configure_fit: strategy sampled 10 clients (out of 50)
[36m(ClientAppActor pid=3511815)[0m Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.[32m [repeated 10x across cluster][0m
[36m(ClientAppActor pid=3511815)[0m Cause: Unknown node type <gast.gast.Import object at 0x7fb6bf779580>[32m [repeated 10x across cluster][0m
[36m(ClientAppActor pid=3511815)[0m         [32m [repeated 20x across cluster][0m
[36m(ClientAppActor pid=3511815)[0m             This is a deprecated feature. It will be removed[32m [repeated 10x across cluster][0