# Server

## Setting variables

In [1]:
rounds = 10
local_epoch = 1
users = 2 # number of clients
target_test_acc = 0.99
lr = 0.1

C = 1
E = 5
B = 10 # 'all' for a single minibatch

In [2]:
import os

import socket
import struct
import pickle
import sys

from threading import Thread
from threading import Lock

import copy

import logging
import math
import random
import re
import time
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from datetime import timedelta
from keras import backend as K
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelBinarizer
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Flatten
from tensorflow.keras.optimizers import SGD
from tqdm import tqdm
tf.get_logger().setLevel(logging.ERROR)

## Device

In [3]:
device = ("gpu" if tf.test.gpu_device_name() else "cpu")
print(device)

cpu


## Model

A CNN with two 5x5 convolution layers (the first with 32 channels, the second with 64, each followed with 2x2 max pooling), a fully connected layer with 512 units and ReLu activation, and a final softmax output layer (1,663,370 total parameters)

In [4]:
class CNN:
    @staticmethod
    def build(input_shape):
        model = Sequential()
        model.add(Conv2D(filters=32, kernel_size=(5,5), padding='same', activation='relu', input_shape=input_shape))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Conv2D(filters=64, padding='same', kernel_size=(5,5), activation='relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Flatten())
        model.add(Dense(512, activation='relu'))
        model.add(Dense(10, activation='softmax'))
        return model

initialize global model

In [5]:
model = CNN()
  
global_model = model.build((28,28,1))
initial_weights = global_model.get_weights()

# client_models = [model.build((28,28,1)) for _ in range(K)]

# for i in range(len(client_models)):
#   client_models[i].compile(loss=loss, 
#                       optimizer=optimizer, 
#                       metrics=metrics)
#   client_models[i].set_weights(global_model.get_weights())

global_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 28, 28, 32)        832       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 14, 14, 32)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 14, 14, 64)        51264     
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 7, 7, 64)         0         
 2D)                                                             
                                                                 
 flatten (Flatten)           (None, 3136)              0         
                                                                 
 dense (Dense)               (None, 512)               1

## variables

In [6]:
clientsoclist = [0]*users

start_time = 0
weight_count = 0

global_weights = initial_weights

datasetsize = [0]*users
weights_list = [0]*users

lock = Lock()

## Comunication overhead

In [7]:
total_sendsize_list = []
total_receivesize_list = []

client_sendsize_list = [[] for i in range(users)]
client_receivesize_list = [[] for i in range(users)]

train_sendsize_list = [] 
train_receivesize_list = []

## Socket initialization
### Set host address and port number

### Required socket functions

In [8]:
def send_msg(sock, msg):
    # prefix each message with a 4-byte length in network byte order
    msg = pickle.dumps(msg)
    l_send = len(msg)
    msg = struct.pack('>I', l_send) + msg
    sock.sendall(msg)
    return l_send

def recv_msg(sock):
    # read message length and unpack it into an integer
    raw_msglen = recvall(sock, 4)
    if not raw_msglen:
        return None
    msglen = struct.unpack('>I', raw_msglen)[0]
    # read the message data
    msg =  recvall(sock, msglen)
    msg = pickle.loads(msg)
    return msg, msglen

def recvall(sock, n):
    # helper function to receive n bytes or return None if EOF is hit
    data = b''
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data += packet
    return data

In [9]:
import copy

def average_weights(w, datasize):
    """
    Returns the average of the weights.
    """
    avg_weights = list()
    for j in range(len(global_weights)):
        weights = [w[k][j] for k in range(len(w))]
        layer_mean = tf.math.reduce_mean(weights, axis=0)
        avg_weights.append(layer_mean)
        
    return avg_weights

## Train

In [10]:
def train(userid, train_dataset_size, num_users, client_conn):
    global weights_list
    global global_weights
    global weight_count
    global val_acc
    
    for r in range(rounds):
        with lock:
            if weight_count == num_users:
                for i, conn in enumerate(clientsoclist):
                    datasize = send_msg(conn, global_weights)
                    total_sendsize_list.append(datasize)
                    client_sendsize_list[i].append(datasize)
                    train_sendsize_list.append(datasize)
                    weight_count = 0

        client_weights, datasize = recv_msg(client_conn)
        total_receivesize_list.append(datasize)
        client_receivesize_list[userid].append(datasize)
        train_receivesize_list.append(datasize)

        weights_list[userid] = client_weights
        print("User" + str(userid) + "'s Round " + str(r + 1) +  " is done")
        with lock:
            weight_count += 1
            if weight_count == num_users:
                #average
                global_weights = average_weights(weights_list, datasetsize)
#                 global_model.set_weights(global_weights)

## Receive users before training

In [11]:
def receive(userid, num_users, conn): #thread for receive clients
    global weight_count
    global datasetsize


    msg = {
        'rounds': rounds,
        'client_id': userid,
        'local_epoch': local_epoch
    }

    datasize = send_msg(conn, msg)    #send epoch
    total_sendsize_list.append(datasize)
    client_sendsize_list[userid].append(datasize)

    train_dataset_size, datasize = recv_msg(conn)    # get total_batch of train dataset
    total_receivesize_list.append(datasize)
    client_receivesize_list[userid].append(datasize)
    
    
    with lock:
        datasetsize[userid] = train_dataset_size
        weight_count += 1
    
    train(userid, train_dataset_size, num_users, conn)

## Thread define

In [12]:
def run_thread(func, num_user):
    global clientsoclist
    global start_time
    
    thrs = []
    for i in range(num_user):
        conn, addr = s.accept()
        print('Conntected with', addr)
        # append client socket on list
        clientsoclist[i] = conn
        args = (i, num_user, conn)
        thread = Thread(target=func, args=args)
        thrs.append(thread)
        thread.start()
    print("timmer start!")
    start_time = time.time()    # store start time
    for thread in thrs:
        thread.join()
    end_time = time.time()  # store end time
    print("TrainingTime: {} sec".format(end_time - start_time))

In [13]:
host_name = socket.gethostbyname(socket.gethostname())
port_number = 12345
print(host_name)

172.16.2.211


In [14]:
s = socket.socket()
s.bind((host_name, port_number))
s.listen(5)

### Open the server socket

In [15]:
run_thread(receive, users)

Conntected with ('172.16.2.211', 63390)
Conntected with ('172.16.2.211', 63391)
timmer start!
User0's Round 1 is doneUser1's Round 1 is done

User0's Round 2 is done
User1's Round 2 is done
User0's Round 3 is done
User1's Round 3 is done
User0's Round 4 is done
User1's Round 4 is done
User0's Round 5 is doneUser1's Round 5 is done

User0's Round 6 is done
User1's Round 6 is done
User0's Round 7 is done
User1's Round 7 is done
User0's Round 8 is done
User1's Round 8 is done
User0's Round 9 is doneUser1's Round 9 is done

User1's Round 10 is done
User0's Round 10 is done
TrainingTime: 161.2054409980774 sec


In [16]:
end_time = time.time()  # store end time
print("TrainingTime: {} sec".format(end_time - start_time))

TrainingTime: 161.21686220169067 sec


## Print all of communication overhead

In [17]:
# print('val_acc list')
# for acc in val_acc:
#     print(acc)

print('\n')
total_size = 0
for size in total_sendsize_list:
#     print(size)
    total_size += size
print("total_sendsize size: {} bytes".format(total_size))
print('\n')

total_size = 0
for size in total_receivesize_list:
#     print(size)
    total_size += size
print("total receive sizes: {} bytes".format(total_size) )
print('\n')

for i in range(users):
    print('client_sendsize_list(user{}): '.format(i))
    total_size = 0
    for size in client_sendsize_list[i]:
#         print(size)
        total_size += size
    print("total client_sendsizes(user{}): {} bytes".format(i, total_size))
    print('\n')

    total_size = 0
    for size in client_receivesize_list[i]:
#         print(size)
        total_size += size
    print("total client_receive sizes(user{}): {} bytes".format(i, total_size))
    print('\n')

total_size = 0
for size in train_sendsize_list:
#     print(size)
    total_size += size
print("total train_sendsizes: {} bytes".format(total_size))
print('\n')

total_size = 0
for size in train_receivesize_list:
#     print(size)
    total_size += size
print("total train_receivesizes: {} bytes".format(total_size))
print('\n')




total_sendsize size: 133079930 bytes


total receive sizes: 133077990 bytes


client_sendsize_list(user0): 
total client_sendsizes(user0): 66539965 bytes


total client_receive sizes(user0): 66538995 bytes


client_sendsize_list(user1): 
total client_sendsizes(user1): 66539965 bytes


total client_receive sizes(user1): 66538995 bytes


total train_sendsizes: 133079816 bytes


total train_receivesizes: 133077980 bytes




In [18]:
mnist = tf.keras.datasets.mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

In [19]:
X_train = X_train.astype("float32")/255
X_test = X_test.astype("float32")/255
X_train = np.expand_dims(X_train, -1)
X_test = np.expand_dims(X_test, -1)
y_train = keras.utils.to_categorical(y_train, 10)
y_test = keras.utils.to_categorical(y_test, 10)

print("x_train shape:", X_train.shape)
print(X_train.shape[0], "train samples")
print(X_test.shape[0], "test samples")

x_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples


In [20]:
train_batched = tf.data.Dataset.from_tensor_slices((X_train, y_train)).batch(len(y_train)) # for testing on train set
test_batched = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(len(y_test))

## Accuracy of train and each of classes

In [21]:
result_per_lr = {}
train_losses = []
train_accs = []
test_losses = []
test_accs = []

loss='categorical_crossentropy'
metrics = ['accuracy']
cce = tf.keras.losses.CategoricalCrossentropy()

start = time.time()
# optimizer = tf.keras.optimizers.legacy.SGD(learning_rate=lr)
# global_model.set_weights(initial_weights)
print('\nlearning rate: {}'.format(lr))
# for r in range(rounds):
# global_weights = global_model.get_weights()
global_model.set_weights(global_weights)

# test global model on full training set
for (X,y) in train_batched:
  preds = global_model.predict(X)
  train_loss = cce(y, preds)
  train_acc = accuracy_score(tf.argmax(preds, axis=1), tf.argmax(y, axis=1))
  train_losses.append(train_loss.numpy())
  train_accs.append(train_acc)

# test global model on testing set
for(X, y) in test_batched:
  preds = global_model.predict(X)
  test_loss = cce(y, preds)
  test_acc = accuracy_score(tf.argmax(preds, axis=1), tf.argmax(y, axis=1))
  test_losses.append(test_loss.numpy())
  test_accs.append(test_acc)

elapsed = (time.time() - start)

print('comm_round: {} | test_acc: {:.3%} | test_loss: {} | train_acc: {:.3%} | train_loss: {} | elapsed: {}'.format(rounds, test_acc, test_loss, train_acc, train_loss, timedelta(seconds=elapsed)))

result_per_lr[lr] = {
    'train_accs' : train_accs,
    'test_accs' : test_accs,
    'train_losses' : train_losses,
    'test_losses' : test_losses
                      }
with open(dir+'result_per_lr_{}_{}_{}_{}.pickle'.format(B,C,E, lr), 'wb') as handle:
    pickle.dump(result_per_lr, handle, protocol=pickle.HIGHEST_PROTOCOL)



learning rate: 0.1
comm_round: 10 | test_acc: 46.180% | test_loss: 5.320533752441406 | train_acc: 46.368% | train_loss: 5.233935356140137 | elapsed: 0:00:12.976362


TypeError: unsupported operand type(s) for +: 'builtin_function_or_method' and 'str'

In [None]:
end_time = time.time()  # store end time
print("WorkingTime: {} sec".format(end_time - start_time))