In [None]:
import numpy as np
import os
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy
from tensorflow.keras.optimizers import SGD
from tensorflow.keras import datasets, layers, models

import pandas as pd

import matplotlib.pyplot as plt

import gc

class GarbageCollectorCallback(tf.keras.callbacks.Callback):
    """Keras callback that runs ``gc.collect()`` to release Python memory.

    ``on_epoch_end`` fires during ``model.fit``. ``model.predict`` does not call
    epoch hooks, so ``on_predict_end`` is implemented too; otherwise passing
    this callback to ``predict`` would have no effect.
    """

    def on_epoch_end(self, epoch, logs=None):
        # Collect after every training epoch.
        gc.collect()

    def on_predict_end(self, logs=None):
        # Collect once a prediction run has finished.
        gc.collect()

from fedartml import InteractivePlots, SplitAsFederatedData

# Presumably intended to silence TensorFlow's native (C++) log output.
# NOTE(review): `tensorflow` is already imported earlier in this cell, so messages
# emitted during that import are not affected — consider setting this before the import.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import flwr as fl

from typing import Callable, Dict, List, Optional, Tuple, Union
from flwr.common import Metrics
import random
from keras.datasets import cifar10, mnist
from flwr.common import (
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    MetricsAggregationFn,
    NDArrays,
    Parameters,
    Scalar,
    ndarrays_to_parameters,
    parameters_to_ndarrays,
)
from flwr.common.logger import log
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy

from flwr.server.strategy.aggregate import aggregate
from flwr.server.strategy import Strategy

In [None]:
# Reproducibility: seed NumPy's global RNG (used for the device parameters below)
# and Python's `random` module, which `weighted_srs_wr` uses for client selection
# and which would otherwise be unseeded.
np.random.seed(42)
random.seed(42)

# Per-device system parameters for the 50 simulated clients.
# NOTE(review): units are inferred from magnitudes/usage and are not stated in the code — confirm.
N = -104.0                                           # noise power (presumably dBm)
U = np.random.uniform(1.0, 5.0, size=50) * 10**4     # compute cost per sample (presumably CPU cycles)
f = np.random.uniform(1.0, 2.0, size=50) * 10**9     # device compute frequency (presumably Hz)
B = np.random.uniform(1.0, 10.0, size=50)            # channel bandwidth (units not stated)
p = np.random.uniform(20.0, 40.0, size=50)           # transmit power (presumably dBm)
# Device coordinates; distance to the origin feeds the channel model `g`.
x, y = np.random.uniform(-500.0, 500.0, size=50), np.random.uniform(-500.0, 500.0, size=50)
M = 173.54                                           # upload payload size (units not stated)

def g(x1, y1):
    """Channel gain (a dB-style path-loss expression) for a device at ``(x1, y1)``.

    Distance is measured to the origin (presumably the server location). A device
    exactly at the origin returns ``0``, which sidesteps ``log10(0)``.

    NOTE(review): ``-128.1 - 37.6 * log10(d)`` resembles the common path-loss model
    with ``d`` in kilometres, but the coordinates are drawn in [-500, 500]
    (presumably metres) — confirm the intended distance unit.
    """
    distance = np.sqrt(x1 ** 2 + y1 ** 2)
    if distance == 0:
        return 0
    return -128.1 - 37.6 * np.log10(distance)

def computation_time(D, U, f):
    """Local training time: ``D`` samples times per-sample cost ``U``, divided by frequency ``f``."""
    workload = D * U
    return workload / f

def communication_time(B, M, p, N, x, y):
    """Upload time ``M / rate`` with a Shannon-style rate ``B * log2(1 + snr)``.

    ``snr`` is formed as ``g(x, y) * p / N``.

    NOTE(review): ``g`` returns a dB-scale value, and ``p``/``N`` look like dBm
    values, yet they are multiplied/divided as if linear. A physically consistent
    SNR would be ``10 ** ((p + g(x, y) - N) / 10)`` — confirm the intended model.
    When ``g`` returns 0 (device at the origin) the rate is 0 and the time is infinite.
    """
    gain = g(x, y)
    snr = gain * p / N
    rate = B * np.log2(1 + snr)
    return M / rate

In [None]:
def test_model(model, X_test, Y_test):
    """Evaluate a Keras classifier on a held-out set.

    Args:
        model: Keras model assumed to output per-class probabilities (as the
            softmax head of ``get_model`` does) — the loss uses ``from_logits=False``.
        X_test: model inputs.
        Y_test: integer class labels.

    Returns:
        ``(loss, accuracy, precision, recall, f1)``. ``loss`` is the sparse
        categorical cross-entropy; precision/recall/F1 are support-weighted
        averages over classes, with ``zero_division=0``.
    """
    cce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    # The model ends in softmax, so these are probabilities rather than logits.
    probs = model.predict(X_test, batch_size=64, verbose=3, callbacks=[GarbageCollectorCallback()])
    loss = cce(Y_test, probs).numpy()
    y_pred = tf.argmax(probs, axis=1).numpy()
    y_true = np.asarray(Y_test)
    # sklearn metrics take (y_true, y_pred) in that order. Swapping them leaves
    # accuracy unchanged but silently alters weighted precision/recall/F1.
    acc = accuracy_score(y_true, y_pred)
    pre = precision_score(y_true, y_pred, average='weighted', zero_division=0)
    rec = recall_score(y_true, y_pred, average='weighted', zero_division=0)
    f1s = f1_score(y_true, y_pred, average='weighted', zero_division=0)
    return loss, acc, pre, rec, f1s

def from_FedArtML_to_Flower_format(clients_dict):
  """Split FedArtML's per-client sample lists into parallel feature/label lists.

  Each ``clients_dict[name]`` is indexed as a 2-column table: column 0 holds a
  sample's features, column 1 its label. Clients are processed in dict order.

  Returns:
    ``(list_x_train, list_y_train)``: per client, a stacked feature array and an
    object-dtype label array.
  """
  list_x_train, list_y_train = [], []
  for client_name in clients_dict:
    samples = np.array(clients_dict[client_name], dtype=object)
    # Stack the per-sample feature arrays into one array for this client.
    features = np.array(list(samples[:, 0]))
    targets = np.array(samples[:, 1])
    list_x_train.append(features)
    list_y_train.append(targets)
  return list_x_train, list_y_train

def get_model():
    """Build a LeNet-5-style CNN for 28x28x1 inputs with a 10-way softmax output.

    The model is returned uncompiled.
    """
    feature_extractor = [
        tf.keras.layers.Conv2D(6, kernel_size=(5, 5), activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Conv2D(16, kernel_size=(5, 5), activation='relu'),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    ]
    classifier_head = [
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(120, activation='relu'),
        tf.keras.layers.Dense(84, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax'),
    ]
    return Sequential(feature_extractor + classifier_head)

from collections import Counter, OrderedDict
import math

class FlowerClient(fl.client.NumPyClient):
    """Flower NumPy client that trains a Keras model on one device's local data.

    Args:
        model: Keras model (e.g. from ``get_model``).
        x_train, y_train: this client's local samples and integer labels.
        cid: client id. It is stored as ``int`` and reported in the fit metrics
            under ``"id"`` so the server strategy can track per-client loss.
    """

    def __init__(self, model, x_train, y_train, cid) -> None:
        self.model = model
        self.x_train, self.y_train = x_train, y_train
        self.cid = int(cid)

    def get_parameters(self, config):
        """Return the current model weights as a list of NumPy arrays."""
        return self.model.get_weights()

    def fit(self, parameters, config):
        """Train one local epoch with SGD, starting from ``parameters``.

        ``config["learning_rate"]`` is sent by the server every round. The model is
        recompiled each call so that the new rate takes effect.
        """
        self.model.set_weights(parameters)
        self.model.compile(optimizer=SGD(learning_rate=config["learning_rate"]),
                           loss='sparse_categorical_crossentropy', metrics=['accuracy'])
        history = self.model.fit(self.x_train, self.y_train, epochs=1, verbose=3,
                                 batch_size=64, callbacks=[GarbageCollectorCallback()])
        loss = history.history['loss'][-1]
        acc = history.history['accuracy'][-1]
        return self.model.get_weights(), len(self.x_train), {"loss": loss, "accuracy": acc, "id": self.cid}

    def evaluate(self, parameters, config):
        """Evaluate ``parameters`` on this client's local data.

        The client holds no separate test split, so its training data is used.
        Returns ``(loss, num_examples, {"accuracy": acc})``.
        """
        self.model.set_weights(parameters)
        # Compile so that loss/metrics are defined; no optimizer is needed to evaluate.
        self.model.compile(loss='sparse_categorical_crossentropy', metrics=['accuracy'])
        loss, acc = self.model.evaluate(self.x_train, self.y_train, batch_size=64, verbose=0)
        return float(loss), len(self.x_train), {"accuracy": float(acc)}


def plot_metric_from_history(
    hist,
    save_plot_path: Optional[str],
    metric_type: str,
    metric: str,
    ax=None,
    color=None,
) -> None:
    """Plot one metric's per-round values from a Flower history object.

    Args:
        hist: object exposing ``metrics_centralized`` and ``metrics_distributed``,
            each mapping a metric name to a list of ``(round, value)`` pairs.
        save_plot_path: currently unused; kept for call-site compatibility.
        metric_type: ``"centralized"`` selects ``hist.metrics_centralized``; any
            other value selects ``hist.metrics_distributed``.
        metric: name of the metric to plot.
        ax: matplotlib Axes to draw on; defaults to the current axes.
        color: line colour; defaults to ``colors[5]`` from the notebook config.
    """
    metric_dict = (
        hist.metrics_centralized
        if metric_type == "centralized"
        else hist.metrics_distributed
    )
    rounds, values = zip(*metric_dict[metric])
    if ax is None:
        ax = plt.gca()
    if color is None:
        color = colors[5]
    ax.plot(np.asarray(rounds), np.asarray(values), color=color, linewidth=5, label='Test')
    ax.legend(fontsize=45)
    ax.set_xlabel('Communication round', fontsize=40)
    ax.set_ylabel(metric, fontsize=50)
    ax.set_title(metric, fontsize=60)
    ax.tick_params(axis='both', labelsize=30)
    # A fixed [0, 1] range suits accuracy/precision/recall/F1; other metrics may be clipped.
    ax.set_ylim(0, 1)

def retrieve_global_metrics(
    hist,
    metric_type: str,
    metric: str,
    best_metric: bool,
) -> float:
    """Return one summary value of a metric from a Flower history object.

    Args:
        hist: object exposing ``metrics_centralized`` / ``metrics_distributed``,
            each mapping a metric name to a list of ``(round, value)`` pairs.
        metric_type: ``"centralized"`` selects the centralized metrics; anything
            else selects the distributed ones.
        metric: name of the metric.
        best_metric: if truthy, return the maximum value (assumes higher is
            better); otherwise return the value from the last recorded round.

    Raises:
        ValueError: if no values were recorded for ``metric``.
    """
    metric_dict = (
        hist.metrics_centralized
        if metric_type == "centralized"
        else hist.metrics_distributed
    )
    values = [value for _, value in metric_dict[metric]]
    if not values:
        raise ValueError(f"no recorded values for metric {metric!r}")
    return max(values) if best_metric else values[-1]

In [None]:
def weighted_srs_wr(population, weights, k):
    """Draw ``k`` items from ``population`` with replacement, in proportion to ``weights``.

    The weights are normalised to probabilities before sampling, so they must not
    sum to zero. Sampling uses the global ``random`` module RNG.
    """
    # Normalise the weights into sampling probabilities.
    total = sum(weights)
    probabilities = []
    for weight in weights:
        probabilities.append(weight / total)
    # Weighted sampling with replacement.
    return random.choices(population, weights=probabilities, k=k)

In [None]:
from flwr.server.strategy.aggregate import aggregate
from flwr.server.strategy import Strategy

# Placeholder warning text; it is empty and not referenced by the code shown here.
WARNING_MIN_AVAILABLE_CLIENTS_TOO_LOW = ""

class FedAvg(Strategy):
    """Cluster-aware FedAvg variant with latency-driven client selection.

    Each fit round it draws ``num_candidate_samples`` candidate client sets. A
    candidate is built by drawing ``sample_size`` clusters with replacement via
    ``weighted_srs_wr``, weighted by ``cal_weight``, and taking the first client
    of each drawn cluster. The candidate whose slowest client has the smallest
    ``total_time`` is kept.

    Aggregation is sample-weighted FedAvg. The client learning rate is multiplied
    by ``decay`` after every aggregation. The simulated round times are summed
    into ``completion_time``, which ``evaluate`` reports as ``"Completion time"``.

    Args (beyond the standard Flower FedAvg arguments):
        total_time: per-client round time, indexed by integer client id.
        latency_reduce: per-cluster latency score, aligned with ``labels``.
        labels: list of clusters, each a list of integer client ids.
        learning_rate: initial client learning rate.
        decay: multiplicative learning-rate decay applied after each aggregation.
        num_candidate_samples: number of candidate client sets drawn per round.

    ``on_fit_config_fn`` is accepted but not used: every client receives
    ``{"learning_rate": <current rate>}``.
    """

    def __init__(
        self,
        *,
        fraction_fit: float = 1.0,
        fraction_evaluate: float = 1.0,
        min_fit_clients: int = 2,
        min_evaluate_clients: int = 2,
        min_available_clients: int = 2,
        evaluate_fn: Optional[
            Callable[
                [int, NDArrays, Dict[str, Scalar]],
                Optional[Tuple[float, Dict[str, Scalar]]],
            ]
        ] = None,
        on_fit_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
        on_evaluate_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
        accept_failures: bool = True,
        initial_parameters: Optional[Parameters] = None,
        fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
        evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
        total_time,
        latency_reduce,
        labels,
        learning_rate: float = 0.01,
        decay: float = 0.995,
        num_candidate_samples: int = 5,
    ) -> None:
        super().__init__()
        self.fraction_fit = fraction_fit
        self.fraction_evaluate = fraction_evaluate
        self.min_fit_clients = min_fit_clients
        self.min_evaluate_clients = min_evaluate_clients
        self.min_available_clients = min_available_clients
        self.evaluate_fn = evaluate_fn
        self.on_fit_config_fn = on_fit_config_fn
        self.on_evaluate_config_fn = on_evaluate_config_fn
        self.accept_failures = accept_failures
        self.initial_parameters = initial_parameters
        self.fit_metrics_aggregation_fn = fit_metrics_aggregation_fn
        self.evaluate_metrics_aggregation_fn = evaluate_metrics_aggregation_fn
        # Learning rate sent to clients; multiplied by `decay` after each aggregation.
        self.learning_rate = learning_rate
        self.decay = decay
        self.num_candidate_samples = num_candidate_samples
        # Latest training loss reported by each client (indexed by integer client id).
        # Starts at 1 so every cluster has a defined average loss before any training.
        self.loss = [1 for _ in range(len(total_time))]
        self.total_time = total_time
        self.latency_reduce = latency_reduce
        # Simulated time summed over all completed rounds.
        self.completion_time = 0
        # Simulated duration of the round most recently configured by configure_fit.
        self.round_time = 0
        self.labels = labels
        # Client proxies keyed by cid; refreshed from the client manager each fit round.
        self.clients: Dict[str, ClientProxy] = {}

    def __repr__(self) -> str:
        rep = f"FedAvg(accept_failures={self.accept_failures})"
        return rep

    def num_fit_clients(self, num_available_clients: int) -> Tuple[int, int]:
        """Return (number of clients to train per round, minimum clients required)."""
        num_clients = int(num_available_clients * self.fraction_fit)
        return max(num_clients, self.min_fit_clients), self.min_available_clients

    def num_evaluation_clients(self, num_available_clients: int) -> Tuple[int, int]:
        """Return (number of clients to evaluate per round, minimum clients required)."""
        num_clients = int(num_available_clients * self.fraction_evaluate)
        return max(num_clients, self.min_evaluate_clients), self.min_available_clients

    def initialize_parameters(
        self, client_manager: ClientManager
    ) -> Optional[Parameters]:
        """Hand out the initial parameters once, then drop the reference."""
        initial_parameters = self.initial_parameters
        self.initial_parameters = None
        return initial_parameters

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        """Run centralized evaluation and attach the accumulated completion time."""
        if self.evaluate_fn is None:
            return None
        parameters_ndarrays = parameters_to_ndarrays(parameters)
        eval_res = self.evaluate_fn(server_round, parameters_ndarrays, {})
        if eval_res is None:
            return None
        loss, metrics = eval_res
        metrics["Completion time"] = self.completion_time
        return loss, metrics

    def avgloss(self, cluster):
        """Mean of the latest reported losses over the client ids in ``cluster``."""
        return np.sum([self.loss[c] for c in cluster]) / len(cluster)

    def cal_weight(self):
        """Sampling weight per cluster.

        Each weight is 0.5 * latency_reduce plus 0.5 * the cluster's share of the
        summed average cluster losses.
        """
        cluster_loss = [self.avgloss(cluster) for cluster in self.labels]
        sumloss = np.sum(cluster_loss)
        return [0.5 * latency + 0.5 * clusterloss / sumloss
                for latency, clusterloss in zip(self.latency_reduce, cluster_loss)]

    def configure_fit(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, FitIns]]:
        """Pick the lowest-latency candidate client set and send it the current learning rate."""
        self.clients = client_manager.all()
        sample_size, _ = self.num_fit_clients(client_manager.num_available())
        # Weights depend only on state that does not change during this call.
        weights = self.cal_weight()
        candidates = []
        latencies = []
        for _ in range(self.num_candidate_samples):
            sampled_clusters = weighted_srs_wr(self.labels, weights, sample_size)
            # The first client of each drawn cluster is the one selected.
            candidate = [cluster[0] for cluster in sampled_clusters]
            candidates.append(candidate)
            # A round is as slow as its slowest client (floored at 0).
            latencies.append(max([0] + [self.total_time[cid] for cid in candidate]))
        best = int(np.argmin(latencies))
        sampled_cids = [str(cid) for cid in candidates[best]]
        clients = [self.clients[cid] for cid in sampled_cids]
        self.round_time = np.max([self.total_time[int(cid)] for cid in sampled_cids])

        fit_ins = FitIns(parameters, {"learning_rate": self.learning_rate})
        return [(client, fit_ins) for client in clients]

    def configure_evaluate(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        """Sample clients for federated evaluation (disabled when fraction_evaluate == 0)."""
        if self.fraction_evaluate == 0.0:
            return []

        config = {}
        if self.on_evaluate_config_fn is not None:
            config = self.on_evaluate_config_fn(server_round)
        evaluate_ins = EvaluateIns(parameters, config)

        sample_size, min_num_clients = self.num_evaluation_clients(
            client_manager.num_available()
        )
        # ClientManager.sample returns a list of ClientProxy objects.
        clients = client_manager.sample(
            num_clients=sample_size, min_num_clients=min_num_clients
        )
        return [(client, evaluate_ins) for client in clients]

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """FedAvg aggregation plus per-client loss tracking, LR decay and time accounting."""
        if not results:
            return None, {}
        if not self.accept_failures and failures:
            return None, {}

        weights_results = [
            (parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
            for _, fit_res in results
        ]
        # Remember each client's latest loss; it feeds the cluster weights.
        for _, fit_res in results:
            self.loss[fit_res.metrics["id"]] = fit_res.metrics["loss"]

        aggregated_ndarrays = aggregate(weights_results)
        self.learning_rate *= self.decay
        parameters_aggregated = ndarrays_to_parameters(aggregated_ndarrays)

        metrics_aggregated = {}
        if self.fit_metrics_aggregation_fn:
            fit_metrics = [(res.num_examples, res.metrics) for _, res in results]
            metrics_aggregated = self.fit_metrics_aggregation_fn(fit_metrics)
        self.completion_time += self.round_time

        return parameters_aggregated, metrics_aggregated

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Example-weighted average of client evaluation losses."""
        # Local imports: these names are not imported at the top of the notebook.
        from logging import WARNING
        from flwr.server.strategy.aggregate import weighted_loss_avg

        if not results:
            return None, {}
        if not self.accept_failures and failures:
            return None, {}

        loss_aggregated = weighted_loss_avg(
            [
                (evaluate_res.num_examples, evaluate_res.loss)
                for _, evaluate_res in results
            ]
        )

        metrics_aggregated = {}
        if self.evaluate_metrics_aggregation_fn:
            eval_metrics = [(res.num_examples, res.metrics) for _, res in results]
            metrics_aggregated = self.evaluate_metrics_aggregation_fn(eval_metrics)
        elif server_round == 1:  # Only log this warning once
            log(WARNING, "No evaluate_metrics_aggregation_fn provided")
        return loss_aggregated, metrics_aggregated

In [None]:
# Global experiment configuration.
random_state = 1       # seed passed to SplitAsFederatedData
local_nodes_glob = 50  # number of federated clients
Alpha = 10             # alpha for the "dirichlet" non-IID split
# Plot palette; colors[5] is the curve colour used by plot_metric_from_history.
colors = [
    "#00cfcc", "#e6013b", "#007f88", "#00cccd",
    "#69e0da", "darkblue", "#FFFFFF",
]

In [None]:
from tensorflow.keras.datasets import mnist, cifar10

(train_images, train_labels), (test_images, test_labels) = mnist.load_data()
# Add a channel axis and scale pixels (0-255) to [0, 1]. Using -1 avoids
# hard-coding the split sizes.
train_images = train_images.reshape((-1, 28, 28, 1)).astype('float32') / 255
test_images = test_images.reshape((-1, 28, 28, 1)).astype('float32') / 255
my_federater = SplitAsFederatedData(random_state = random_state)

# Dirichlet non-IID split of the training set across `local_nodes_glob` clients.
# Only the client data is used below; the other returned values are kept for inspection.
clients_glob_dic, list_ids_sampled_dic, miss_class_per_node, distances = my_federater.create_clients(
    image_list=train_images, label_list=train_labels,
    num_clients=local_nodes_glob, prefix_cli='client', method="dirichlet", alpha=Alpha)

clients_glob = clients_glob_dic['with_class_completion']
list_ids_sampled = list_ids_sampled_dic['with_class_completion']

list_x_train, list_y_train = from_FedArtML_to_Flower_format(clients_dict=clients_glob)

In [None]:
# Per-client round latency = local computation time + upload time.
# Iterate over the actual number of clients rather than a hard-coded 50.
tcmp = [computation_time(len(list_x_train[i]), U[i], f[i]) for i in range(len(list_x_train))]
tcom = [communication_time(B[i], M, p[i], N, x[i], y[i]) for i in range(len(list_x_train))]
total_time = [t_com + t_cmp for t_com, t_cmp in zip(tcom, tcmp)]
print(total_time)

In [None]:
from sklearn.cluster import OPTICS

def split_clusters(labels):
    """Group positions by label: ``{label: [indices carrying that label, ...]}``.

    Keys appear in order of first occurrence; indices within a group are ascending.
    """
    clusters = {}
    for index, label in enumerate(labels):
        clusters.setdefault(label, []).append(index)
    return clusters

def hellinger_distance(p, q):
    """Hellinger distance between two discrete probability vectors ``p`` and ``q``."""
    root_gap = np.sqrt(p) - np.sqrt(q)
    return np.sqrt(0.5 * np.sum(np.square(root_gap)))


def compute_hellinger_distance_matrix(distributions):
    """Symmetric matrix of pairwise Hellinger distances; the diagonal stays 0."""
    count = len(distributions)
    matrix = np.zeros((count, count))
    for row in range(count):
        for col in range(row + 1, count):
            matrix[row, col] = matrix[col, row] = hellinger_distance(distributions[row], distributions[col])
    return matrix

def to_prob_dist(data):
    """Normalise each row of ``data`` so that it sums to 1."""
    row_totals = np.sum(data, axis=1, keepdims=True)
    return data / row_totals

def kmeans_no_small_clusters(data):
    """Cluster clients by label distribution with OPTICS; noise points become singletons.

    Despite the name, no k-means is used. Each client's label histogram is turned
    into a probability vector, pairwise Hellinger distances are computed, and
    OPTICS (``min_samples=2``) clusters the precomputed distance matrix.

    Args:
        data: one label sequence per client (e.g. ``list_y_train``).

    Returns:
        dict mapping cluster id -> list of client indices. OPTICS clusters come
        first, in ascending order of size. Every OPTICS noise point (label -1) is
        then appended as its own singleton cluster with a fresh id.
    """
    # Histograms over the union of all labels, so that a client missing a class
    # gets a 0 in that column instead of a shorter, misaligned row.
    per_client = [Counter(client_labels) for client_labels in data]
    classes = sorted(set().union(*per_client))
    counts = [[client_counts.get(cls, 0) for cls in classes] for client_counts in per_client]
    distance_matrix = compute_hellinger_distance_matrix(to_prob_dist(counts))

    clustering = OPTICS(min_samples=2,
                        metric="precomputed").fit(distance_matrix)
    clusters = split_clusters(clustering.labels_)
    clusters = dict(sorted(clusters.items(), key=lambda item: len(item[1])))

    # OPTICS labels unclustered points -1; there may be none.
    noise = clusters.pop(-1, [])
    next_id = max(clusters, default=-1) + 1
    for client_idx in noise:
        clusters[next_id] = [client_idx]
        next_id += 1
    return clusters

# Cluster clients by label distribution; each value is a list of client indices.
client_clusters = kmeans_no_small_clusters(list_y_train)
labels = client_clusters.values()
def sortf(item):
    """Sort key: total (computation + communication) latency of client ``item``."""
    compute, communicate = tcmp[item], tcom[item]
    return compute + communicate

# Within each cluster, order clients from slowest to fastest total latency.
# NOTE(review): FedAvg.configure_fit selects cluster[0], i.e. the slowest client — confirm intended.
labels = [sorted(cluster, key=sortf, reverse=True) for cluster in labels]
print(labels)

In [None]:
# A cluster is as slow as its slowest member.
cluster_latency = []
for cluster in labels:
    cluster_latency.append(np.max([total_time[member] for member in cluster]))
maxlatency = np.max(cluster_latency)

# Relative speed-up versus the slowest cluster (0 for the slowest, closer to 1 for faster ones).
latency_reduce = [1 - latency / maxlatency for latency in cluster_latency]

In [None]:
def evaluate_DNN_CL(
    server_round: int,
    parameters: fl.common.NDArrays,
    config: Dict[str, fl.common.Scalar],
) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]:
    """Centralized evaluation hook: score ``parameters`` on the held-out test set.

    Returns ``(loss, {"accuracy", "precision", "recall", "f1score"})``.
    """
    net = get_model()
    net.set_weights(parameters)  # load the latest global parameters
    loss, *scores = test_model(net, test_images, test_labels)
    metrics = dict(zip(("accuracy", "precision", "recall", "f1score"), scores))
    return loss, metrics

In [None]:
import json
# Number of federated rounds passed to the simulation's ServerConfig.
comms_round = 100

def client_fn(cid: str) -> fl.client.Client:
    """Create the Flower client for simulated device ``cid`` with a freshly built model."""
    model = get_model()
    index = int(cid)
    features = np.array(list_x_train[index], dtype=float)
    targets = np.array(list_y_train[index], dtype=int)
    return FlowerClient(model, features, targets, cid)

# Train 20% of the clients per round (at least 10), require all 50 to be available,
# skip federated evaluation and evaluate centrally with evaluate_DNN_CL.
strategy = FedAvg(
    fraction_fit=0.2,
    min_fit_clients=10,
    min_available_clients=50,
    fraction_evaluate=0,
    evaluate_fn=evaluate_DNN_CL,
    total_time=total_time,
    latency_reduce=latency_reduce,
    labels=labels,
)


# Run the federated simulation; the returned history holds per-round metrics.
server_config = fl.server.ServerConfig(num_rounds=comms_round)
commun_metrics_history = fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=local_nodes_glob,
    config=server_config,
    strategy=strategy,
    client_resources={'num_cpus': 1, 'num_gpus': 0},
)

In [None]:
metrics_show = ["accuracy", "precision", "recall", "f1score"]

# One panel per metric. Name the figure `fig` (not `f`) so the per-device
# frequency array `f` used by the latency cell is not clobbered. squeeze=False
# keeps `axs` 2-D even for a single metric.
fig, axs = plt.subplots(1, len(metrics_show), figsize=(70, 15), squeeze=False)

for i, metric_name in enumerate(metrics_show):
    plt.sca(axs[0, i])
    plot_metric_from_history(commun_metrics_history, "any", "centralized", metric_name)

In [None]:
# Save the centralized accuracy curve and the accumulated simulated completion time
# as JSON lists of [round, value] pairs. Use `out_file` rather than `f`, which
# holds the per-device frequencies. `default=float` converts NumPy scalar types
# that json cannot serialize natively.
with open('outputacc.txt', 'w', encoding='utf-8') as out_file:
    json.dump(commun_metrics_history.metrics_centralized["accuracy"], out_file, default=float)

with open('outputt.txt', 'w', encoding='utf-8') as out_file:
    json.dump(commun_metrics_history.metrics_centralized["Completion time"], out_file, default=float)
