## Federated learning using Keras and openfl

**Status:** PUBLIC Distribution <br>
**File Name:** 02_Federated_learning_MNIST.ipynb

**Author:** Jaume Manero  <br> 
**Date created:** 2023/08/23<br>
**Last modified:** 2023/08/23<br>
**Description:** A simple federated learning program

see package requirements at the end of notebook <br>
Based on https://github.com/securefederatedai/openfl

In [2]:
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import backend as K
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, Flatten, Dense
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import mnist

import openfl.native as fx
from openfl.federated import FederatedModel,FederatedDataSet

2023-08-26 19:49:56.402275: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-08-26 19:49:56.402291: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [3]:
def test_intel_tensorflow():
    """
    Check if Intel version of TensorFlow is installed
    """
    import tensorflow as tf

    print("We are using Tensorflow version {}".format(tf.__version__))

    major_version = int(tf.__version__.split(".")[0])
    if major_version >= 2:
        from tensorflow.python.util import _pywrap_util_port
        print("Intel-optimizations (DNNL) enabled:",
              _pywrap_util_port.IsMklEnabled())
    else:
        print("Intel-optimizations (DNNL) enabled:")

test_intel_tensorflow()

We are using Tensorflow version 2.8.4
Intel-optimizations (DNNL) enabled: False


After importing the required packages, the next step is setting up our openfl workspace. To do this, simply run the `fx.init()` command as follows:

In [4]:
#Setup default workspace, logging, etc.
fx.init('keras_cnn_mnist')

Creating Workspace Directories
Creating Workspace Templates


Successfully installed packages from /home/manero/.local/workspace/requirements.txt.

New workspace directory structure:
workspace
├── plan
│   ├── cols.yaml
│   ├── defaults
│   ├── plan.yaml
│   └── data.yaml
├── logs
│   └── tensorboard
│       ├── events.out.tfevents.1693066901.manero-ThinkPad-P52s
│       ├── events.out.tfevents.1693070529.manero-ThinkPad-P52s
│       ├── events.out.tfevents.1693066958.manero-ThinkPad-P52s
│       ├── events.out.tfevents.1693067951.manero-ThinkPad-P52s
│       ├── events.out.tfevents.1693066680.manero-ThinkPad-P52s
│       ├── events.out.tfevents.1693070397.manero-ThinkPad-P52s
│       ├── events.out.tfevents.1693071816.manero-ThinkPad-P52s
│       ├── events.out.tfevents.1693070678.manero-ThinkPad-P52s
│       └── events.out.tfevents.1693069175.manero-ThinkPad-P52s
├── agg_to_col_two_signed_cert.zip
├── requirements.txt
├── cert
├── agg_to_col_one_signed_cert.zip
├── src
│   ├── keras_cnn.py
│   ├── mnist_utils.py
│   ├── __init__.py
│   └── tfmn

Now we are ready to define our dataset and model to perform federated learning on. The dataset should be composed of a numpy array <br>
We start with a simple fully connected model that is trained on the MNIST dataset. 

In [16]:
#Import and process training, validation, and test images/labels

# Set the ratio of validation imgs, can't be 0.0

VALID_PERCENT = 0.3

(X_train, y_train), (X_test, y_test) = mnist.load_data()
split_on = int((1 - VALID_PERCENT) * len(X_train))
print(split_on, len(X_train))

train_images = X_train[0:split_on,:,:]
train_labels = to_categorical(y_train)[0:split_on,:]

valid_images = X_train[split_on:,:,:]
valid_labels = to_categorical(y_train)[split_on:,:]

test_images = X_test
test_labels = to_categorical(y_test)

def preprocess(images):
    #Normalize
    images = (images / 255) - 0.5
    #Flatten
    images = images.reshape((-1, 784))
    return images

# Preprocess the images.
train_images = preprocess(train_images)
valid_images = preprocess(valid_images)
test_images  = preprocess(test_images)

feature_shape = train_images.shape[1]
classes = 10

fl_data = FederatedDataSet(train_images,train_labels,valid_images,valid_labels,batch_size=32,num_classes=classes)

def build_model(feature_shape,classes):
    #Defines the MNIST model
    model = Sequential()
    model.add(Dense(64, input_shape=feature_shape, activation='relu'))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(classes, activation='softmax'))
    
    model.compile(optimizer='adam',loss='categorical_crossentropy',metrics=['accuracy'])
    return model

42000 60000


In [17]:
#Create a federated model using the build model function and dataset
fl_model = FederatedModel(build_model,data_loader=fl_data)

The `FederatedModel` object is a wrapper around your Keras, Tensorflow or PyTorch model that makes it compatible with openfl. It provides built in federated training and validation functions that we will see used below. Using it's `setup` function, collaborator models and datasets can be automatically defined for the experiment. 

In [18]:
collaborator_models = fl_model.setup(num_collaborators=4)
collaborators = {'one':collaborator_models[0],
                 'two':collaborator_models[1],
                'three':collaborator_models[2],
                'four' :collaborator_models[3],}

In [19]:
#Original MNIST dataset
print(f'Original training data size: {len(train_images)}')
print(f'Original validation data size: {len(valid_images)}\n')

#Collaborator one's data
print(f'Collaborator one\'s training data size: {len(collaborator_models[0].data_loader.X_train)}')
print(f'Collaborator one\'s validation data size: {len(collaborator_models[0].data_loader.X_valid)}\n')

#Collaborator two's data
print(f'Collaborator two\'s training data size: {len(collaborator_models[1].data_loader.X_train)}')
print(f'Collaborator two\'s validation data size: {len(collaborator_models[1].data_loader.X_valid)}\n')

#Collaborator three's data
print(f'Collaborator three\'s training data size: {len(collaborator_models[2].data_loader.X_train)}')
print(f'Collaborator three\'s validation data size: {len(collaborator_models[2].data_loader.X_valid)}')

#Collaborator four data
print(f'Collaborator four\'s training data size: {len(collaborator_models[3].data_loader.X_train)}')
print(f'Collaborator four\'s validation data size: {len(collaborator_models[3].data_loader.X_valid)}')

Original training data size: 42000
Original validation data size: 18000

Collaborator one's training data size: 10500
Collaborator one's validation data size: 4500

Collaborator two's training data size: 10500
Collaborator two's validation data size: 4500

Collaborator three's training data size: 10500
Collaborator three's validation data size: 4500
Collaborator four's training data size: 10500
Collaborator four's validation data size: 4500


We can see the current plan values by running the `fx.get_plan()` function

In [22]:
#Get the current values of the plan. Each of these can be overridden
print(fx.get_plan())

{
    "aggregator.settings.best_state_path": "save/keras_cnn_mnist_best.pbuf",
    "aggregator.settings.db_store_rounds": 2,
    "aggregator.settings.init_state_path": "save/keras_cnn_mnist_init.pbuf",
    "aggregator.settings.last_state_path": "save/keras_cnn_mnist_last.pbuf",
    "aggregator.settings.rounds_to_train": 10,
    "aggregator.settings.write_logs": true,
    "aggregator.template": "openfl.component.Aggregator",
    "assigner.settings.task_groups.0.name": "train_and_validate",
    "assigner.settings.task_groups.0.percentage": 1.0,
    "assigner.settings.task_groups.0.tasks.0": "aggregated_model_validation",
    "assigner.settings.task_groups.0.tasks.1": "train",
    "assigner.settings.task_groups.0.tasks.2": "locally_tuned_model_validation",
    "assigner.template": "openfl.component.RandomGroupedAssigner",
    "collaborator.settings.db_store_rounds": 1,
    "collaborator.settings.delta_updates": false,
    "collaborator.settings.opt_treatment": "RESET",
    "collaborator.t

Now we are ready to run our experiment. If we want to pass in custom plan settings, we can easily do that with the `override_config` parameter

In [29]:
#Run experiment, return trained FederatedModel
final_fl_model = fx.run_experiment(collaborators,override_config={'aggregator.settings.rounds_to_train':1,
                                              'tasks.aggregated_model_validation.kwargs.batch_size':16})



In [None]:
#Save final model and load into keras
final_fl_model.save_native('final_model')
model = tf.keras.models.load_model('./final_model')

In [None]:
#Test the final model on our test set
score= model.evaluate(test_images,test_labels)
print('Test loss:', score[0])
print('Test accuracy:', score[1])