In [None]:
import numpy as np
import pickle
import tracemalloc
import random
import os
import psutil
import shutil
os.environ['NUMEXPR_MAX_THREADS'] = '16'
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
import numexpr as ne
import time
import matplotlib.pyplot as plt
import gc
import sys
import ctypes
import tensorflow as tf
from client import Client
from edgeserver import Edgeserver
from server import Server 
from datasets_partitioning.mnist_cifar10 import get_dataset
from datasets_partitioning.mnist_cifar10 import k_niid_equal_size_split
from datasets_partitioning.mnist_cifar10 import k_niid_equal_size_split_1
from datasets_partitioning.mnist_cifar10 import Gaussian_noise
from datasets_partitioning.mnist_cifar10 import get_classes
from datasets_partitioning.mnist_cifar10 import random_edges
from datasets_partitioning.mnist_cifar10 import iid_edges
from datasets_partitioning.mnist_cifar10 import niid_edges
from datasets_partitioning.mnist_cifar10 import iid_equal_size_split
from datasets_partitioning.mnist_cifar10 import iid_nequal_size_split
from datasets_partitioning.mnist_cifar10 import niid_labeldis_split
from tensorflow.keras.models import load_model
from model.initialize_model import create
from tensorflow.keras.utils import plot_model,to_categorical
from plots import client_plot


# =============================================================================================================
#                                 🔷               Partitioning                🔷
# =============================================================================================================
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        print(e)

dataset="mnist"
if dataset=='cifar10' or dataset=="mnist":
    num_labels=10
model="cnn1"   #or cnn2, cnn3
batch_size=32
communication_round=3              
epochs=10                          #  number of local update 
num_edge_aggregation=4             #  number of edge aggregation 
num_edges=3  
num_clients=30 
folder="IID-mnist"
fraction_clients=1              # fraction of participated clients
lr=0.01
val_ratio=0.1     
#beta=0.5         
number_labels=10
image_shape=(28,28,1)
loss="categorical_crossentropy"      #optimizer is "Adam"
metrics=["accuracy"]
verbose=0    
seed=4   
np.random.seed(seed)
random.seed(seed)
optimizer=tf.keras.optimizers.SGD(learning_rate=lr)

#     ********** Get dataset **********
tracemalloc.start()
process=psutil.Process()
start_rss=process.memory_info().rss

X_train ,Y_train,X_test,Y_test=get_dataset(dataset,model) 
#X_train ,Y_train,X_test,Y_test=X_train[:21000] ,Y_train[:21000],X_test[:9000],Y_test[:9000]

#     ********** partitioning and assigning ********** 
print('1 : clients_iid (equal size)\n'
      '2 : clients_iid (nonequal size)\n'
      '3 : each client owns data samples of a fixed number of labels\n'
      '4 : each client(and edge) owns data samples of a different feature distribution\n'
      '5 : each client owns a proportion of the samples of each label\n')
flag1=int(input('select a number:')) 
print("\nUsing a locally saved model?\n"
        "1 : YES\n"
        "0 : NO\n")
replace=int(input('select a number:'))

#     ***********clients_iid*****************
if flag1 in (1,2):                                      
    print('\n** randomly are assigned clients to edgesevers **')
    clients=[]
    edges=[]
    
    if flag1==1:
        train_partitions=iid_equal_size_split(X_train,Y_train,num_clients)
        test_partitions=iid_equal_size_split(X_test,Y_test,num_clients)
    else:
        train_partitions=iid_nequal_size_split(X_train,Y_train,num_clients,beta)
        test_partitions=iid_nequal_size_split(X_test,Y_test,num_clients,beta)
        
    for i in range(num_clients):
        clients.append(Client(i,train_partitions[i],test_partitions[i],dataset,model,loss,metrics,
                                                         lr,batch_size,image_shape)) 
    assigned_clients_list=random_edges(num_edges,num_clients) 
    for edgeid in range(num_edges):
        edges.append(Edgeserver(edgeid,assigned_clients_list[edgeid],dataset,model,loss,metrics,lr,image_shape))
        for client_name in assigned_clients_list[edgeid]:               
            index=int(client_name.split('_')[1])-1               
            edges[edgeid].client_registering(clients[index])
    clients_per_edge=int(num_clients/num_edges)
    server=Server(dataset,model,loss,metrics,lr,image_shape)   

    print(tracemalloc.get_traced_memory()) 
    del X_train,Y_train,X_test,Y_test,train_partitions,test_partitions,assigned_clients_list
    gc.collect()
    print(tracemalloc.get_traced_memory()) 
    
#     **************** partitioning and assigning from up to down ******************
#     ********** each edge owns data samples of a fixed number of labels ********** 
elif flag1==3:                                       
    clients_per_edge=int(num_clients/num_edges)
    k1=int(input('\nk1 : number of labels for each edge  ?  '))
    k2=int(input('k2 : number of labels for clients per edge  ?  '))
    print(f'\n** assign each edge {clients_per_edge} clients with {k1} classes'
          f'\n** assign each client samples of {k2}  classes of {k1} edge classes')
    
    label_list=list(range(num_labels))
    X_train,Y_train,X_test,Y_test,party_labels_list=k_niid_equal_size_split(X_train,Y_train,X_test,
                                                                        Y_test,num_edges,label_list,k1,flag1)  
    clients=[]
    edges=[]
    index=0  
    for edgeid in range(num_edges):           
        train_partitions,test_partitions=k_niid_equal_size_split(X_train[edgeid],Y_train[edgeid],X_test[edgeid],
                                                Y_test[edgeid],clients_per_edge,party_labels_list[edgeid],k2)
        assigned_clients=[]
        for i in range(clients_per_edge):
            clients.append(Client(index,train_partitions[i],test_partitions[i],dataset,model,loss,metrics,
                                                         lr,batch_size,image_shape))   
            assigned_clients.append(index)
            index+=1
        assigned_clients=list(map(lambda x :f'client_{x+1}',assigned_clients))
        edges.append(Edgeserver(edgeid,assigned_clients,dataset,model,loss,metrics,lr,image_shape))
        for client_name in assigned_clients:                 # client's name : 'client_k'
            idx=int(client_name.split('_')[1])-1                # k-1
            edges[edgeid].client_registering(clients[idx])
        for i in range(clients_per_edge):
            print(f'{edges[edgeid].cnames[i]}')
        print(f'be assigned to {edges[edgeid].name}')
    server=Server(dataset,model,loss,metrics,lr,image_shape)   

    print(tracemalloc.get_traced_memory()) 
    del X_train,X_test,Y_train,Y_test,test_partitions,train_partitions
    gc.collect()  
    print(tracemalloc.get_traced_memory()) 

#     ********** each edge owns data samples of a different feature distribution ********** 
#     ***** each edge owns data samples of 10 labels but each client owns data samples of one or 10 labels ***** 
elif flag1==4:                                   
    original_std=float(input('\noriginal standard deviation for gaussian noise  ?  '))
    k=int(input('k : number of labels for clients of each edge  ?  '))  
    
    X_train,Y_train=iid_equal_size_split(X_train,Y_train,num_edges,flag1) 
    X_test,Y_test=iid_equal_size_split(X_test,Y_test,num_edges,flag1)
    #basic_std=0.1      
    edges=[]
    clients=[]
    clients_per_edge=int(num_clients/num_edges)
    labels_list=list(range(num_labels)) 
    mean=0      
    index=0 
    for edgeid in range(num_edges):
        train_noisy_edge=Gaussian_noise(X_train[edgeid],original_std,edgeid,num_edges,mean)
        test_noisy_edge=Gaussian_noise(X_test[edgeid],original_std,edgeid,num_edges,mean)
        train_party_partitions,test_party_partitions=k_niid_equal_size_split(train_noisy_edge,Y_train,test_noisy_edge, 
                                                                             Y_test,clients_per_edge,labels_list,k)
        assigned_clients=[]
        for i in range(clients_per_edge):
            clients.append(Client(index,train_party_partitions[i],test_party_partitions[i],dataset,model,loss,metrics,
                                                         lr,batch_size,image_shape))  
            assigned_clients.append(index)
            index+=1
        assigned_clients=list(map(lambda x :f'client_{x+1}',assigned_clients))
        edges.append(Edgeserver(edgeid,assigned_clients,dataset,model,loss,metrics,lr,image_shape))
        for client_name in assigned_clients:                  
            idx=int(client_name.split('_')[1])-1                
            edges[edgeid].client_registering(clients[idx])
        for i in range(clients_per_edge):
            print(f'{edges[edgeid].cnames[i]}')
        print(f'be assigned to {edges[edgeid].name}')
    server=Server(dataset,model,loss,metrics,lr,image_shape)   
    
    print(tracemalloc.get_traced_memory()) 
    del X_train,Y_train,X_test,Y_test,train_partitions,test_partitions,train_noisy_edge,test_noisy_edge,train_party_partitions,test_party_partitions
    gc.collect()
    print(tracemalloc.get_traced_memory())
    
#     ************** each client owns a proportion of the samples of each label **************
elif flag1==5:                       
    train_partitions=niid_labeldis_split(X_train,Y_train,num_clients,'train',beta)
    test_partitions=niid_labeldis_split(X_test,Y_test,num_clients,'test',beta)
    clients=[]
    edges=[]
    clients_per_edge=int(num_clients/num_edges)
    index=0  
    for edgeid in range(num_edges):                           
        assigned_clients=[]
        for _ in range(clients_per_edge):
            client_classes=get_classes(train_partitions[index])
            clients.append(Client(index,train_partitions[index],test_partitions[index],dataset,model,loss,metrics,
                                                         lr,batch_size,image_shape))  
            assigned_clients.append(index)
            index+=1
        assigned_clients=list(map(lambda x :f'client_{x+1}',assigned_clients))
        edges.append(Edgeserver(edgeid,assigned_clients,dataset,model,loss,metrics,lr,image_shape))
        for client_name in assigned_clients:                 
            idx=int(client_name.split('_')[1])-1               
            edges[edgeid].client_registering(clients[idx])
        for i in range(clients_per_edge):
            print(f'{edges[edgeid].cnames[i]}')
        print(f'be assigned to {edges[edgeid].name}')
    server=Server(dataset,model,loss,metrics,lr,image_shape)   
    
    print(tracemalloc.get_traced_memory()) 
    del X_train,Y_train,X_test,Y_test,train_partitions,test_partitions
    gc.collect()
    print(tracemalloc.get_traced_memory()) 

In [None]:
####     ********** Select type of server aggregation ********** 
path=fr'.\results\edges_models\{folder}\\'                     
for file_name in os.listdir(path):
    file=path+file_name
    if os.path.isfile(file):
        os.remove(file)
        
path=fr'.\results\edges_models\{folder}\\'                       
for file_name in os.listdir(path):
    file=path+file_name
    shutil.rmtree(file)
    
path=fr'.\results\global_models\{folder}\\'                    
for file_name in os.listdir(path):
    file=path+file_name
    if os.path.isfile(file):
        os.remove(file)
        
path=fr'.\results\fig\{folder}\\'                        
for file_name in os.listdir(path):
    file=path+file_name
    if os.path.isfile(file):
        os.remove(file)
print('method_1(m1) : uses number of samples of all clients \nmethod_2(m2) :uses number of samples of the the participated clients...')
type_of_server_agg=input('\nselect a method for server aggregation:')

# assigning edges to server 
for edge in edges:                                   
    server.edgeserver_registering(edge)                   

In [None]:
# =============================================================================
#                       🔷     GLOBAL TRAINING PHASE         🔷
# =============================================================================

server.model.save(fr".\results\global_models\{folder}\itr_0.h5")
for comm_r in range(communication_round):    
  
    print(f'===================================={comm_r+1} c_round...start================================================')
    for edge in edges:
        server.send_to_edgeserver(edge) 
                  
    #buffer is cleared              
    server.refresh_server() 

    # my assumption: all edges participate in training phase in each communication round             
    for num_agg in range(num_edge_aggregation):
        print(f'--------------------------------------{num_agg+1} agg...start---------------------------------------') 
        for edge in edges:
            print(f'************{edge.name}******************start')

            # buffer & participated_sample are cleared
            edge.refresh_edgeserver()
                              
            #fraction of clients of each edge participate ...
            selected_clients_num=max(int(clients_per_edge*fraction_clients),1)
            selected_clients_name=np.random.choice(edge.cnames,selected_clients_num,replace=False)
            for client_name in selected_clients_name:                 
                index=int(client_name.split('_')[1])-1               
                edge.client_registering(clients[index])               # parti.._sample
            
            for client_name in selected_clients_name: 
                index=int(client_name.split('_')[1])-1
                edge.send_to_client(clients[index])    
                for i in range(8):
                    print(np.testing.assert_allclose(edge.model.get_weights()[i],clients[index].model.get_weights()[i]))
                
                print(f"\n--------------------------------> {client_name} be selected:")
                if comm_r!=0 or num_agg!=0:
                    clients[index].local_model_train(epochs,batch_size,verbose,folder,comm_r,num_agg)   
                else:
                    if replace==1:
                        clients[index].model=load_model(fr".\results\clients_models\{folder}\{clients[index].name}.h5")
                    else:
                        clients[index].local_model_train(epochs,batch_size,verbose,folder,comm_r,num_agg)  
                        clients[index].model.save(fr".\results\clients_models\{folder}\{clients[index].name}.h5")
                clients[index].send_to_edgeserver(edge)               # buffer
            
                for i in range(8):
                    print(np.testing.assert_allclose(edge.buffer[client_name][i],clients[index].model.get_weights()[i]))
                print(edge.participated_sample[client_name]==clients[index].train_num)
                
            edge.aggregate(comm_r,num_agg,folder)

            print(f'************{edge.name}******************end')
    #************end for/// iteration in edges
        print(f'--------------------------------------{num_agg+1} agg...end---------------------------------------')
    #*********** end for///edge aggregation        
                  
    # begin server aggregation
    if type_of_server_agg=='m1':
        for edge in edges:                            
            edge.send_to_server(server)     # server' buffer
            for i in range(8):
                print(np.testing.assert_allclose(edge.model.get_weights()[i],server.buffer[edge.name][i]))
                    
        server.aggregate_method1(comm_r,folder)
    else:        
        num_samples_in_agg_for_edges=[]
        for edge in edges:                            
            edge.send_to_server(server)     # server' buffer
            num_samples_in_agg_for_edges.append(sum(edge.num_samples_in_agg))    
        server.aggregate_method2(num_samples_in_agg_for_edges,comm_r,folder)
        for edge in edges():
            edge.num_samples_in_agg.clear()

    for client in clients:
        acc=client.test_s(server)
        client.acc.append(acc)
    print(f'===================================={comm_r+1} c_round...end================================================')

print(process.memory_info().rss-start_rss)
print(tracemalloc.get_traced_memory())
tracemalloc.stop()

In [None]:
#=================================================================================================
#   🔷   SEND GLOBAL MODEL TO CLIENTS AND ...         🔷
# =================================================================================================
for edge in edges:                                       
    server.send_to_edgeserver(edge)  
for edge in edges:
    for client_name in edge.cnames:
        index=int(client_name.split('_')[1])-1
        edge.send_to_client(clients[index]) 

In [None]:
#=============================================================================
#         🔷         TRANSFER LEARNING       🔷
# =============================================================================
if flag1==3:
    for client in clients: 
        for layer in client.model.layers[:-2]:                     
            layer.trainable=False
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.00001)
        client.m_compile(loss=loss,optimizer=optimizer,metrics=metrics)
        client.local_model_train(epochs=epochs,batch_size=batch_size,verbose=0)  #fit // epoch , bs متفاوت یا قبلی؟  
        acc=client.test()
        client.acc.append(acc)

In [None]:
# ==================================================================================
#     🔷     ACCURACY REPORT  (with personalized ,without personalized)        🔷
# ==================================================================================
for client in clients:
    print(client.name,":",client.acc,"------",client.comm_agg)

In [None]:
# =============================================================================
#                        🔷    clients PLOTS         🔷
# =============================================================================
c_model=create(dataset,model,loss,metrics,lr,image_shape)

In [None]:
for edge in edges:
    for client_name in edge.cnames:
        index=int(client_name.split('_')[1])-1
        file=fr'.\results\global_models\{folder}\itr_0.h5'
        c_model.load_weights(file)
        clients[index].predict(c_model,0)            # 0 -->  level 0 : sever model
        for comm_r in range(communication_round):
            
            for num_agg in range(num_edge_aggregation):
                file=fr'.\results\edges_models\{folder}\comm_{comm_r+1}_agg_{num_agg+1}_{client_name}.h5'
                if os.path.isfile(file):
                    c_model.load_weights(file)
                    clients[index].predict(c_model,2)      #2 --> level 2 : client model
                else:       
                    clients[index].all_acc.append(clients[index].all_acc[-1])
                    
                file=fr'.\results\edges_models\{folder}\itr_{comm_r+1}\agg_{num_agg+1}_{edge.name}.h5'
                c_model.load_weights(file)
                clients[index].predict(c_model,1)            # 1 --> level 1 : edge model 
                
            file=fr'.\results\global_models\{folder}\itr_{comm_r+1}.h5'
            c_model.load_weights(file)
            clients[index].predict(c_model,0)

In [None]:
for client in clients:
    client_plot(client,folder)      
    
for client in clients:
    print(client.name ,"--", "local :",client.all_acc[1] , "/" , "fed :" ,client.all_acc[-1])