# Federated Learning:

Example code using the MNIST dataset.

__Federated Learning is best suited to parameterized learning, which includes all variants of neural networks__. In contrast, traditional Machine Learning models such as Decision Trees and KNN, which do not learn a fixed set of parameters that can be averaged (KNN, for instance, just stores the training data), might not fully exploit the potential of Federated Learning.


__Refer:__

https://towardsdatascience.com/federated-learning-a-step-by-step-implementation-in-tensorflow-aac568283399

In [26]:
import numpy as np
import random
import cv2
import os
from imutils import paths
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer
from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score

import tensorflow as tf
from tensorflow.keras.layers import AveragePooling2D, Conv2D, MaxPooling2D, ReLU
from tensorflow.keras import models, layers, datasets
from tensorflow.keras.layers import Dense, Flatten, Reshape, Input, InputLayer
from tensorflow.keras.models import Sequential, Model

from tensorflow.keras.optimizers import SGD
from tensorflow.keras import backend as K


In [2]:
from tutorial_master.fl_mnist_implementation_tutorial_utils import *

### Reading and preprocessing MNIST data set:

The JPEG version of the MNIST data set can be downloaded from [Kaggle](https://www.kaggle.com/scolianni/mnistasjpg). It consists of 42000 digit images, with each class kept in
a separate folder. The data is loaded into memory with the code below. 10% of this data is held out as validation data for the trained global model.

In [3]:
def load(paths, verbose = -1):
    '''
    Function to load MNIST JPEG digit images stored in their respective folders.
    All digits belonging to class '9' exist in a directory named '9'.
    
    Returns a tuple (data, labels): a list of flattened, normalized images as
    numpy arrays and a list of the corresponding labels.
    '''
    data = list()
    labels = list()
    
    # loop over the input images
    for (i, imgpath) in enumerate(paths):
        # Read in image as greyscale-
        im_gray = cv2.imread(imgpath, cv2.IMREAD_GRAYSCALE)
        
        # Flatten image due to MLP neural network-
        image = np.array(im_gray).flatten()
        
        # Extract label using path to directory-
        label = imgpath.split(os.path.sep)[-2]
        
        # Normalize image to be in the range [0, 1]-
        data.append(image/255)
        
        # Add label to list-
        labels.append(label)
        
        # show an update every 'verbose' images
        if verbose > 0 and i > 0 and (i + 1) % verbose == 0:
            print("[INFO] processed {}/{}".format(i + 1, len(paths)))
            
    # return a tuple of the data and labels
    return data, labels
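
To illustrate how 'load()' derives a label from the folder name, here is a minimal sketch. The file name `img_1.jpg` and the relative folder layout are made up for illustration and are not taken from the actual dataset.

```python
import os

# Hypothetical path: an image stored in the folder for digit '9'
example_path = os.path.join("trainingSet", "9", "img_1.jpg")

# The second-to-last path component is the class folder, i.e. the label
example_label = example_path.split(os.path.sep)[-2]
print(example_label)
```

Because the label is taken from the directory name, it is a string at this point; the one-hot encoding happens later.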


In [4]:
# Specify the absolute path for MNIST training dataset folder-

# Folder containing training images-
# img_path = 'D:\\Other_Files\\Research_Works\\MNIST_dataset\\trainingSample\\trainingSample\\'
# NOTE: There are 60 images each per folder. Hence, total number of training images = 60 x 10 = 600

img_path = "D:\\Other_Files\\Research_Works\\MNIST_dataset\\trainingSet\\trainingSet\\"

# Obtain directory using path object-
image_paths = list(paths.list_images(img_path))

# Load images and labels-
image_list, label_list = load(image_paths, verbose = 10000)

[INFO] processed 10000/42000
[INFO] processed 20000/42000
[INFO] processed 30000/42000
[INFO] processed 40000/42000


In [5]:
# Sanity check-
len(image_list), len(label_list)

(42000, 42000)

In [6]:
# One-hot encode the labels-
lb = LabelBinarizer()
label_list = lb.fit_transform(label_list)

# Split dataset into training and testing sets-
X_train, X_test, y_train, y_test = train_test_split(
    image_list, label_list, 
    test_size = 0.1)
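
For more than two classes, `LabelBinarizer` turns each label into a row with a single 1 in the column of its class, with columns ordered by sorted class name. The NumPy sketch below reproduces that idea on a few made-up labels; it is an illustration of the encoding, not the scikit-learn implementation.

```python
import numpy as np

# A few made-up string labels, as returned by load()
raw_labels = np.array(['3', '0', '9', '3'])

# Sorted unique classes and, for each sample, the index of its class
classes, class_index = np.unique(raw_labels, return_inverse=True)

# Pick the matching row of the identity matrix for every sample
one_hot = np.eye(len(classes), dtype=int)[class_index]
print(classes)
print(one_hot)
```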

In [7]:
type(X_train), type(y_train), type(X_test), type(y_test)

(list, numpy.ndarray, list, numpy.ndarray)

In [8]:
y_train.shape, y_test.shape

((37800, 10), (4200, 10))

In [9]:
len(X_train), len(X_test)

(37800, 4200)

In [10]:
# Convert from np array to list-
y_train = list(y_train)
y_test = list(y_test)

In [11]:
len(y_train)

37800

In [12]:
y_train[0]

array([0, 0, 0, 0, 0, 1, 0, 0, 0, 0])

In [None]:
'''
print(f"X_train.shape = {X_train.shape} & y_train.shape = {y_train.shape}")
print(f"X_test.shape = {X_test.shape} & y_test.shape = {y_test.shape}")
'''

### Federated Members (clients) as Data Shards:

In a real-world Federated Learning application, each federated member usually holds its own data in isolation. The aim of FL is to deploy models to the data, not vice versa. The shard creation step shown here is only needed for experiments.

For this particular code example, the training set is shared/divided into 10 shards, one for each FL client. 'create_clients()' aims to achieve this sharding.

In [13]:
def create_clients(image_list, label_list, num_clients = 10, initial = 'client'):
    '''
    return: a dictionary with key = clients' names and value = data shards
    
    input args: 
    
    image_list: a list of numpy arrays of training images
    label_list: a list of binarized labels for each image
    num_clients: number of federated members (clients)
    initial: the clients' name prefix, e.g. client_1
    '''

    # Create a list of client names-
    client_names = ['{}_{}'.format(initial, i + 1) for i in range(num_clients)]

    # Randomly shuffle image data-
    data = list(zip(image_list, label_list))
    random.shuffle(data)

    # Split/Shard image data and place each at each client-
    size = len(data) // num_clients
    shards = [data[i: i + size] for i in range(0, size*num_clients, size)]

    # Make sure that: number of clients must equal number of shards-
    assert(len(shards) == len(client_names))

    return {client_names[i] : shards[i] for i in range(len(client_names))}
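
One consequence of the integer division in 'create_clients()' is that samples which do not fit into equally sized shards are dropped. The sketch below mirrors the slicing logic on toy data; `shard_evenly` is a hypothetical helper written for this illustration only.

```python
import random

def shard_evenly(items, num_shards, seed=0):
    """Hypothetical helper mirroring the slicing logic of create_clients()."""
    items = list(items)
    random.Random(seed).shuffle(items)
    size = len(items) // num_shards
    # Only the first size * num_shards items are used; the rest are left out
    return [items[i: i + size] for i in range(0, size * num_shards, size)]

toy_shards = shard_evenly(range(23), num_shards=5)
shard_sizes = [len(s) for s in toy_shards]
num_dropped = 23 - sum(shard_sizes)
print(shard_sizes, num_dropped)
```

With 37800 training samples and 10 clients the division is exact, so nothing is dropped in this notebook.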


In [14]:
# Create clients-
clients = create_clients(X_train, y_train, num_clients = 10, initial = 'client')

In [15]:
for client_name in clients.keys():
    print(f"client name: {client_name} has {len(clients[client_name])} training data")

client name: client_1 has 3780 training data
client name: client_2 has 3780 training data
client name: client_3 has 3780 training data
client name: client_4 has 3780 training data
client name: client_5 has 3780 training data
client name: client_6 has 3780 training data
client name: client_7 has 3780 training data
client name: client_8 has 3780 training data
client name: client_9 has 3780 training data
client name: client_10 has 3780 training data


### Processing and batching clients’ and test data:

Convert each client's data shard into a TF dataset and then batch it-

In [17]:
def batch_data(data_shard, batch_size = 32):
    '''
    Creates a tf.data.Dataset object from a client's 'data_shard'
    
    Args:
    data_shard: a list of (data, label) tuples constituting a client's data shard
    batch_size: batch size
    
    Returns:
    a shuffled and batched tf.data.Dataset object
    '''
    
    # Split a data shard as tuple separate data and labels lists-
    data, label = zip(*data_shard)
    
    dataset = tf.data.Dataset.from_tensor_slices((list(data), list(label)))
    
    return dataset.shuffle(len(label)).batch(batch_size = batch_size)


In [18]:
# Process and batch training for each client-
clients_batched = {}

for client_name in clients.keys():
    clients_batched[client_name] = batch_data(clients[client_name])

In [21]:
# Process and batch testing set as well-
test_batched = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(batch_size = len(y_test))

### Design LeNet-300-100 Dense Neural Network:

Remember that FL is best suited to parameterized learning, i.e. all types of neural networks. Machine learning techniques such as KNN and the like, which merely store the training data while learning, might not benefit from FL.

In [34]:
def create_lenet300():
    """
    Function to define the architecture of a neural network model
    following 300 100 architecture for MNIST dataset.
    
    Output: Returns designed and compiled neural network model
    """
    
    model = Sequential()
    
    model.add(InputLayer(input_shape = (784, )))
    
    # model.add(Flatten())
    
    model.add(
        Dense(units = 300, activation = 'relu', kernel_initializer=tf.initializers.GlorotUniform())
    )
    
    # model.add(l.Dropout(0.2))
    
    model.add(
        Dense(units = 100, activation = 'relu', kernel_initializer=tf.initializers.GlorotUniform()),
    )
    
    # model.add(l.Dropout(0.1))
    
    model.add(
        Dense(units = 10, activation = 'softmax'),
    )
    
    '''
    'comms_round' is the number of global epochs or, aggregations which will be executed/performed during training.
    
    Rather than decaying the learning rate with respect to the number of local epochs as you might be familiar with,
    here, the learning rate is decayed with respect to the number of global aggregation.
    
    This is a hyper parameter which can be found with experiments.
    
    Refer to the research paper for more details and theory-
    "Federated Learning with Non-IID Data" by Yue Zhao et al.
    '''
    comms_round = 100
    
    lr = 0.01
    
    # Compile model-
    model.compile(
        loss = tf.keras.losses.categorical_crossentropy,
        
        # optimizer='adam',
        # optimizer = tf.keras.optimizers.Adam(lr = 0.0012),
        optimizer = tf.keras.optimizers.SGD(
            lr = lr, decay = lr / comms_round,
            momentum = 0.9
        ),
        
        metrics = ['accuracy'])
    
    return model
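
To get a feel for the size of the decay, the sketch below evaluates the learning rate after one local epoch. It assumes the time-based `decay` of the legacy Keras optimizers, where the learning rate is divided by `1 + decay * iterations` and `iterations` counts optimizer steps; `decayed_lr` is a hypothetical helper, not a Keras function. Under that assumption the counter restarts whenever a freshly compiled model is created.

```python
def decayed_lr(initial_lr, decay, iterations):
    # Time-based decay (assumed legacy Keras behaviour, see the text above)
    return initial_lr / (1.0 + decay * iterations)

lr = 0.01
comms_round = 100
decay = lr / comms_round          # 1e-4, as in create_lenet300()

# 3780 samples per client in batches of 32 gives 119 steps per local epoch
steps_per_local_epoch = 119
lr_after_one_epoch = decayed_lr(lr, decay, steps_per_local_epoch)
print(f"{lr_after_one_epoch:.6f}")
```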


### Model Aggregation (Federated Averaging):

The pre-processing steps so far can be considered part of a typical deep learning pipeline, with the exception of the data sharding/partitioning and client creation.

Now the focus shifts to the _vanilla version of Federated Learning_. The data being used is horizontally partitioned, so we simply do component-wise parameter averaging, weighted by the proportion of data points contributed by each participating client.

The federated averaging equation used here is based on the research paper "Communication-Efficient Learning of Deep Networks from Decentralized Data" by H. Brendan McMahan et al. and is as follows:

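$f\left(w\right) = \sum_{k=1}^{K}\frac{n_k}{n} F_k\left(w\right)$ where $F_k\left(w\right) = \frac{1}{n_k}\sum_{i \in P_k} f_i\left(w\right)$

Here $K$ is the number of clients, $P_k$ is the set of data points held by client $k$, $n_k = |P_k|$ and $n = \sum_{k} n_k$. On the right hand side, $F_k$ is the local objective of client $k$: the average of the loss values $f_i$ recorded across every data point it was trained with, from which the client estimates its weight parameters. On the left, each client's contribution is scaled by $n_k / n$ and all of them are summed; in the code below the same scaling and component-wise summation is applied to the clients' weight parameters.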
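
As a small numeric illustration of this weighted, component-wise averaging, the NumPy sketch below combines two made-up clients. The weights and sample counts are invented for the example and have nothing to do with the MNIST model.

```python
import numpy as np

# Two hypothetical clients, each holding the weights of a model with two "layers"
client_weights = [
    [np.array([1.0, 2.0]), np.array([[0.0]])],
    [np.array([3.0, 6.0]), np.array([[4.0]])],
]
client_sizes = [100, 300]   # n_k for each client
n = sum(client_sizes)

# For every layer, sum the clients' weights scaled by n_k / n
averaged = [
    sum((n_k / n) * layers[i] for n_k, layers in zip(client_sizes, client_weights))
    for i in range(len(client_weights[0]))
]
print(averaged)
```

The client with three times as much data pulls the average three times as strongly towards its own weights.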

To visualize the difference between horizontally vs. vertically partitioned data:
<img src="D:\\Other_Files\\Research_Works\\Horizontally-and-vertically-partitioned-data.png">
[image source](https://www.researchgate.net/figure/Horizontally-and-vertically-partitioned-data-Horizontal-partitions-have-different_fig1_283708993)

The 'weight_scalling_factor()' function calculates the proportion of a client’s local training data relative to the overall training data held by all of the clients.

+ This approach is _not_ applicable in a real-world application. There, the training data is disjoint across clients and, consequently, no single client can correctly estimate the size of the combined set.

+ In such a case, each client is expected to report the number of data points it trained with when updating the server with new parameters after each local training step.
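
A minimal sketch of that server-side bookkeeping, assuming each client sends its sample count along with its update. The client names and counts below are made up.

```python
# Sample counts reported by each client alongside its update (made-up numbers)
reported_counts = {"client_1": 1200, "client_2": 300, "client_3": 500}

# The server only needs the reported counts, never the data itself
total = sum(reported_counts.values())
scaling_factors = {name: count / total for name, count in reported_counts.items()}
print(scaling_factors)
```

The factors sum to 1, so scaling and summing the clients' weights yields a weighted average.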

In [43]:
# This procedure is implemented as 3 functions:

def weight_scalling_factor(clients_trn_data, client_name):
    client_names = list(clients_trn_data.keys())
    
    # Get batch_size-
    batch_size = list(clients_trn_data[client_name])[0][0].shape[0]
    
    # Approximate the total number of training data points across all of the clients as
    # (number of batches) x (batch size); this overcounts when a client's last batch is partial-
    global_count = sum([tf.data.experimental.cardinality(clients_trn_data[client_name]).numpy() for client_name in client_names]) * batch_size
    
    # Approximate the number of data points held by this client in the same way-
    local_count = tf.data.experimental.cardinality(clients_trn_data[client_name]).numpy() * batch_size
    
    # Calculate the scaling factor as a fraction-
    return local_count / global_count
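
Counting data points as "number of batches times batch size" is exact only when every batch is full. The pure-Python sketch below shows the effect; `approx_count` is a hypothetical helper that imitates the computation above without TensorFlow, and the unequal shard sizes are made up.

```python
import math

def approx_count(num_samples, batch_size=32):
    """Number of batches times batch size, as used by weight_scalling_factor()."""
    return math.ceil(num_samples / batch_size) * batch_size

# Equal shards, as in this notebook: the overcount cancels out in the ratio
equal = [3780] * 10
approx_equal = [approx_count(n) for n in equal]
factor_equal = approx_equal[0] / sum(approx_equal)

# Unequal shards: the approximate factor drifts away from the exact one
unequal = [100, 33]
approx_unequal = [approx_count(n) for n in unequal]
factor_approx = approx_unequal[0] / sum(approx_unequal)
factor_exact = unequal[0] / sum(unequal)
print(approx_equal[0], factor_equal, factor_approx, factor_exact)
```

For shards of different sizes, counting the samples directly (or having the clients report them) avoids this discrepancy.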
 

In [46]:
# 'scale_model_weights()' function scales each of the local model’s weights based on the value of their scaling factor
# which was calculated in the equation shown above.
def scale_model_weights(weight, scalar):
    '''
    Python function for scaling a model's weights
    '''
    weight_final = []
    steps = len(weight)
    
    for i in range(steps):
        weight_final.append(scalar * weight[i])
    
    return weight_final


# 'sum_scaled_weights()' sums all of the clients’ scaled weights together.
def sum_scaled_weights(scaled_weight_list):
    '''
    Return the sum of the listed scaled weights. This is equivalent to the weighted average of the weights
    '''
    avg_grad = list()
    
    # Sum the scaled weights across all clients, layer by layer
    # (the variable names mention 'grad', but the values are model weights)
    for grad_list_tuple in zip(*scaled_weight_list):
        layer_mean = tf.math.reduce_sum(grad_list_tuple, axis=0)
        avg_grad.append(layer_mean)
        
    return avg_grad


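def test_model(X_test, Y_test,  model, comm_round):
    # NOTE: the model's output layer already applies softmax, so 'from_logits = True' makes
    # the loss apply softmax a second time. The losses recorded below were produced with this
    # setting; use 'from_logits = False' to obtain the standard cross-entropy value.
    cce = tf.keras.losses.CategoricalCrossentropy(from_logits = True)
    
    # The model outputs class probabilities (named 'logits' in this code)-
    logits = model.predict(X_test)
    
    loss = cce(Y_test, logits)
    
    # accuracy_score expects (y_true, y_pred); accuracy is symmetric, so the value is unaffected-
    acc = accuracy_score(tf.argmax(Y_test, axis=1), tf.argmax(logits, axis=1))
    
    # Remember that: 'comms_round' is the number of global epochs or, aggregations which will be executed/performed during training.
    # NOTE: 'acc' is a fraction in [0, 1], so a printed value of 0.96 means 96% accuracy.
    print(f"comm_round = {comm_round}, global val_accuracy = {acc:.2f}% and global val_loss = {loss:.4f}")
     
    return acc, loss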
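
Since the network ends in a softmax layer while the loss in 'test_model()' is created with 'from_logits = True', softmax is effectively applied twice. The NumPy sketch below, which does not use Keras, shows what that does to the loss of a perfect, fully confident prediction on a 10-class problem; the `softmax` helper is defined here for illustration.

```python
import numpy as np

def softmax(z):
    e = np.exp(z - np.max(z))
    return e / e.sum()

# One-hot target for class 5 and a perfect, fully confident prediction
y_true = np.zeros(10)
y_true[5] = 1.0
perfect_probs = y_true.copy()

# Cross-entropy computed directly on the probabilities
ce_on_probs = -np.log(np.clip(perfect_probs[5], 1e-7, 1.0))

# Cross-entropy when the probabilities are treated as logits (softmax applied again)
ce_double_softmax = -np.log(softmax(perfect_probs)[5])
print(ce_on_probs, round(float(ce_double_softmax), 4))
```

With the double softmax the loss cannot fall below roughly 1.46 for 10 classes, so loss values reported this way are not comparable to ordinary cross-entropy values.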


## Federated Model Training:

The training logic has two main loops, the outer loop is for the global iteration, while the inner loop is for iterating through client’s local training. There’s also an implicit third loop which accounts for the local epochs and will be taken care of by the 'epochs' argument in the 'model.fit()' method.

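There are 10 clients, each running 1 local epoch per round, over 20 global communication rounds ('comms_round = 20' below; the value 100 inside 'create_lenet300()' is only used to set the learning-rate decay).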
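
The structure of these loops can be tried out without TensorFlow. The sketch below runs federated averaging on a toy linear regression (y = 2x + 1) with NumPy; it swaps the neural network for a two-parameter model, and `make_client`, `local_train` and all numbers are assumptions made for this illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

def make_client(n):
    # Toy data for y = 2x + 1 with a little noise
    x = rng.uniform(-1.0, 1.0, size=n)
    y = 2.0 * x + 1.0 + rng.normal(0.0, 0.01, size=n)
    return x, y

def local_train(w, x, y, lr=0.1, steps=20):
    # Plain gradient descent on the mean squared error, starting from the global weights
    w = w.copy()
    for _ in range(steps):
        err = w[0] * x + w[1] - y
        grad = 2.0 * np.array([np.mean(err * x), np.mean(err)])
        w -= lr * grad
    return w

toy_clients = [make_client(n) for n in (50, 100, 150)]
n_total = sum(len(x) for x, _ in toy_clients)

global_w = np.zeros(2)
for comm_round in range(30):                     # outer loop: communication rounds
    scaled = []
    for x, y in toy_clients:                     # inner loop: clients
        local_w = local_train(global_w, x, y)    # local training from global weights
        scaled.append((len(x) / n_total) * local_w)
    global_w = np.sum(scaled, axis=0)            # aggregate: sum of scaled weights

print(global_w)
```

The global weights end up close to the slope 2 and intercept 1 used to generate the toy data.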

In [49]:
# 'comms_round' is the number of global epochs or, aggregations which will be executed/performed during training-
comms_round = 20

In [50]:
# Initialize the global model-
global_model = create_lenet300()


# Begin global training loop-
for comm_round in range(comms_round):
    
    # Get the global model's weights - this will act as the initial weights for all local models
    global_weights = global_model.get_weights()
    
    # Initial list to collect local model weights after scaling-
    scaled_local_weight_list = list()

    # Randomize client data - using keys
    # Shuffle the clients dictionary order to ensure randomness
    client_names = list(clients_batched.keys())
    random.shuffle(client_names)
    
    # Iterate through client training.
    # Loop through each client and create a new local corresponding model-
    for client in client_names:
        # smlp_local = SimpleMLP()
        # local_model = smlp_local.build(784, 10)
        local_model = create_lenet300()
        '''
        local_model.compile(loss=loss, 
                      optimizer=optimizer, 
                      metrics=metrics)
        '''
        
        # Set local model weights to the weights of the global model-
        local_model.set_weights(global_weights)
        
        # Train local model with client's local data
        # local model (client) is then trained for one epoch
        local_model.fit(clients_batched[client], epochs = 1, verbose = 0)
        
        # After training, the new weights are scaled and appended to the 'scaled_local_weight_list'
        # Scale the model weights and add to list-
        scaling_factor = weight_scalling_factor(clients_batched, client)
        scaled_weights = scale_model_weights(local_model.get_weights(), scaling_factor)
        scaled_local_weight_list.append(scaled_weights)
        
        # Clear the Keras session to free memory after each client's local training-
        K.clear_session()
    
    
    # Sum up all of the scaled local trained weights (by components) and update the global model to this new aggregate.
    # To compute the average for all of the local model, we simply take the sum of the scaled weights-
    average_weights = sum_scaled_weights(scaled_local_weight_list)
    
    # Update global model 
    global_model.set_weights(average_weights)

    # Test global model and print out metrics after each communication round-
    for(X_test, Y_test) in test_batched:
        global_acc, global_loss = test_model(X_test, Y_test, global_model, comm_round)


comm_round = 0, global val_accuracy = 0.88% and global val_loss = 1.6677
comm_round = 1, global val_accuracy = 0.90% and global val_loss = 1.6172
comm_round = 2, global val_accuracy = 0.91% and global val_loss = 1.5951
comm_round = 3, global val_accuracy = 0.93% and global val_loss = 1.5781
comm_round = 4, global val_accuracy = 0.94% and global val_loss = 1.5657
comm_round = 5, global val_accuracy = 0.94% and global val_loss = 1.5560
comm_round = 6, global val_accuracy = 0.95% and global val_loss = 1.5483
comm_round = 7, global val_accuracy = 0.95% and global val_loss = 1.5434
comm_round = 8, global val_accuracy = 0.95% and global val_loss = 1.5399
comm_round = 9, global val_accuracy = 0.96% and global val_loss = 1.5336
comm_round = 10, global val_accuracy = 0.96% and global val_loss = 1.5304
comm_round = 11, global val_accuracy = 0.96% and global val_loss = 1.5273
comm_round = 12, global val_accuracy = 0.96% and global val_loss = 1.5244
comm_round = 13, global val_accuracy = 0.96% and

## SGD Vs Federated Averaging:

The FL model reaches a respectable 96.5% validation accuracy after 20 communication rounds. But how does this compare to a standard SGD model trained on the same data set?

To find out, we train a single 3-layer MLP model (rather than 10 as we did in FL) on the combined training data. Remember that the combined data was our training data prior to partitioning.

To ensure a level playing field, every hyper-parameter used for the FL training is retained except the batch size. Rather than using 32, our SGD model's batch size is 320. With this setting, the SGD model sees the same number of training samples per epoch as the global model did per communication round in FL.
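
The arithmetic behind the batch size of 320 can be checked in a few lines, using only the numbers stated in this notebook (10 clients, 3780 samples each, FL batch size 32):

```python
import math

num_clients = 10
samples_per_client = 3780
fl_batch_size = 32
sgd_batch_size = num_clients * fl_batch_size   # 320

# Optimizer steps taken by each client during one local epoch
fl_steps_per_client = math.ceil(samples_per_client / fl_batch_size)

# Optimizer steps taken by the single SGD model during one epoch on all data
sgd_steps_per_epoch = math.ceil(num_clients * samples_per_client / sgd_batch_size)
print(fl_steps_per_client, sgd_steps_per_epoch)
```

Both settings take the same number of steps per epoch or round, and each SGD step sees as many samples as the 10 clients' steps combined.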

In [54]:
SGD_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(len(y_train)).batch(320)

model_SGD = create_lenet300()

# Train the model on training data using SGD-
history = model_SGD.fit(SGD_dataset, epochs = 20, verbose = 0)

# Test the SGD global model on validation data
# ('test_model' labels its output 'comm_round'; the value 1 passed here is just a placeholder)-
for(X_test, Y_test) in test_batched:
    SGD_acc, SGD_loss = test_model(X_test, Y_test, model_SGD, 1)


comm_round = 1, global val_accuracy = 0.96% and global val_loss = 1.5139


### Independent and identically distributed random variables:

In probability theory and statistics, a collection of random variables is independent and identically distributed if:

- each random variable has the same probability distribution as the others (and)

- all are mutually independent

This property is usually abbreviated as i.i.d. or iid or IID. Herein, i.i.d. is used, because it is the most prevalent.

[source](https://en.wikipedia.org/wiki/Independent_and_identically_distributed_random_variables)
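
Stated as a formula (a standard formulation, where $F$ denotes the common cumulative distribution function, a symbol introduced only for this illustration), random variables $X_1, \dots, X_n$ are i.i.d. when their joint distribution function factorizes into identical marginals:

$$P\left(X_1 \le x_1, \dots, X_n \le x_n\right) = \prod_{i=1}^{n} F\left(x_i\right)$$

The product form expresses independence, and the use of the same $F$ for every factor expresses identical distribution.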

### Note:

If the FL neural network outperforms SGD-based training, keep in mind that this kind of result is unlikely in a real-world scenario. __Real-world federated data held by clients is mostly non-IID (not independent and identically distributed)__.

For example, we could have replicated this scenario by constructing the client shards defined above such that each shard consists of images from a single class, e.g. client_1 having only images of digit 1, client_2 having only images of digit 2 and so on. This arrangement would have led to a significant reduction in the performance of the FL model.


In [55]:
def non_iid_x(image_list, label_list, x = 1, num_intraclass_clients = 10):
    '''
    Python function to shard any classification data in a non-IID manner.
    The classes are split into groups of 'x' classes, and each group is divided
    among 'num_intraclass_clients' clients.
    
    args: 
        image_list: python list of images or data points
        label_list: python list of labels; these must be the raw string labels
            (before one-hot encoding), since they are joined into the client names
        x: non-IID severity, 1 means each client will only have one class of data
        num_intraclass_clients: number of sub-clients to be created from each non-IID class group,
        e.g. for x=1, we could create 10 further clients by splitting each class into 10
        
    return - dictionary 
        keys - clients' names, 
        value - client's non-IID data shard (as tuple list of images and labels)
    '''
    
    non_iid_x_clients = dict()
    
    # create unique label list and shuffle
    unique_labels = np.unique(np.array(label_list))
    random.shuffle(unique_labels)
    
    # create sub label lists based on x
    sub_lab_list = [unique_labels[i:i + x] for i in range(0, len(unique_labels), x)]
        
    for item in sub_lab_list:
        class_data = [(image, label) for (image, label) in zip(image_list, label_list) if label in item]
        
        # decouple tuple list into separate image and label lists
        images, labels = zip(*class_data)
        
        # create formatted client initials
        initial = ''
        for lab in item:
            initial = initial + lab + '_'
        
        # create num_intraclass_clients clients from the class group
        intraclass_clients = create_clients(list(images), list(labels), num_intraclass_clients, initial)
        
        # append intraclass clients to main clients' dict
        non_iid_x_clients.update(intraclass_clients)
    
    return non_iid_x_clients
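
A quick way to see whether a shard is IID or non-IID in the label-skew sense is to count the labels it contains. The sketch below does this on toy shards of (image, label) pairs; `label_histogram` is a hypothetical helper and the shards are made up, with `None` standing in for the images.

```python
from collections import Counter

def label_histogram(shard):
    """Hypothetical helper: count the labels in a shard of (image, label) pairs."""
    return Counter(label for _, label in shard)

# Toy shards: one with all ten digits, one holding only the digit '1'
iid_shard = [(None, str(d)) for d in range(10)] * 3
non_iid_shard = [(None, '1')] * 30

iid_classes = len(label_histogram(iid_shard))
non_iid_classes = len(label_histogram(non_iid_shard))
print(iid_classes, non_iid_classes)
```

A shard produced with x = 1 should contain a single class, whereas the randomly shuffled shards used earlier contain all ten.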

### Research papers:


+ Federated Learning with Non-IID Data, Yue Zhao et al., arXiv:1806.00582v1

+ Communication-Efficient Learning of Deep Networks from Decentralized Data, H. Brendan McMahan et al., arXiv:1602.05629v3