# Federated MXNet Landmarks Tutorial


In [1]:
# ! pip install dill

In [2]:
# Install dependencies if not already installed
# ! pip install mxnet-cu110==2.0.0a0 # for cuda 11.2 works!

## Connect to the Federation

In [3]:
# Create a federation
from openfl.interface.interactive_api.federation import Federation

# Use the same identifier that was used in the signed certificate
client_id = 'api'
cert_dir = 'cert'
director_node_fqdn = 'localhost'
# 1) Run with API layer - Director mTLS
# To enable mTLS, the user must provide the CA root chain and a signed key pair to the federation interface
# cert_chain = f'{cert_dir}/root_ca.crt'
# api_certificate = f'{cert_dir}/{client_id}.crt'
# api_private_key = f'{cert_dir}/{client_id}.key'

# federation = Federation(client_id=client_id, director_node_fqdn=director_node_fqdn, director_port='50051',
#                        cert_chain=cert_chain, api_cert=api_certificate, api_private_key=api_private_key)

# --------------------------------------------------------------------------------------------------------------------

# 2) Run with TLS disabled (trusted environment only)
# Federation can also determine the local FQDN automatically
federation = Federation(client_id=client_id, director_node_fqdn=director_node_fqdn, director_port='50051', tls=False)


In [4]:
# Fetch the shard registry from the federation object; the bare last expression displays it
shard_registry = federation.get_shard_registry()
shard_registry

{'env_one': {'shard_info': node_info {
    name: "env_one"
  }
  shard_description: "Landmark dataset, shard number 1 out of 2"
  n_samples: 2820
  sample_shape: "96"
  sample_shape: "96"
  target_shape: "1",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2021-11-09 10:39:35',
  'current_time': '2021-11-09 10:39:55',
  'valid_duration': seconds: 120}}

In [5]:
# First, request a dummy_shard_desc that holds information about the federated dataset
# (size=10 is presumably the number of dummy items to generate — confirm against the OpenFL API)
dummy_shard_desc = federation.get_dummy_shard_descriptor(size=10)
# Unpack the first (sample, target) pair and report both shapes
sample, target = dummy_shard_desc[0]
f"Sample shape: {sample.shape}, target shape: {target.shape}"

'Sample shape: (96, 96), target shape: (1,)'

## Describing FL experiment

In [6]:
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment

2021-11-09 10:39:56.557957: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'libcudart.so.10.1'; dlerror: libcudart.so.10.1: cannot open shared object file: No such file or directory
2021-11-09 10:39:56.557988: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [7]:
import os
import numpy as np
import pandas as pd
from mxnet.gluon import data as gdata, loss as gloss
from mxnet import nd, autograd, gluon, init
from mxnet import ndarray as F

### Register model

In [8]:
%%writefile model_creating.py
import mxnet as mx
from mxnet.gluon import nn
from mxnet import init, nd

def create_net():
    """Build the convolutional network, initialize it on CPU and return it.

    The network is three Conv2D -> BatchNorm -> MaxPool2D stages (64, 128 and
    256 channels) followed by Flatten, Dense(64) with ReLU, a light Dropout and
    a final Dense(30) layer. Weights use Xavier initialization, and a single
    forward pass on a ones tensor of shape (1, 1, 96, 96) is run before the
    network is returned.

    Returns:
        The initialized ``nn.Sequential`` network.
    """
    cpu_ctx = mx.cpu(0)

    def conv_stage(n_channels):
        # 3x3 convolution with padding 1 and ReLU, then batch norm, then default max pooling
        return [
            nn.Conv2D(channels=n_channels, kernel_size=3, padding=1, activation='relu'),
            nn.BatchNorm(),
            nn.MaxPool2D(),
        ]

    layers = []
    for n_channels in (64, 128, 256):
        layers.extend(conv_stage(n_channels))
    layers.extend([
        nn.Flatten(),
        nn.Dense(64),
        nn.Activation('relu'),
        nn.Dropout(rate=0.005),
        nn.Dense(30),
    ])

    net = nn.Sequential()
    net.add(*layers)

    net.initialize(force_reinit=True, ctx=cpu_ctx, init=init.Xavier())
    # First forward pass for weight initialization
    net(nd.ones((1, 1, 96, 96), ctx=cpu_ctx))
    print('Model has been returned!')
    return net

Overwriting model_creating.py


In [9]:
%run model_creating.py

In [10]:
! ls

aggregation_function_obj.pkl	model_creating.py      requirements.txt
facial-keypoints-detection	model_obj.pkl	       save
facial-keypoints-detection.zip	MXNet_landmarks.ipynb  tasks_obj.pkl
landmark_shard_descriptor.py	plan		       Untitled.ipynb
loader_obj.pkl			__pycache__


In [11]:
# ! rm -r __pycache__

In [12]:
# Dotted path of the plugin class handed to ModelInterface as the framework adapter
framework_adapter = 'openfl.plugins.frameworks_adapters.mxnet_adapter.FrameworkAdapterPlugin'

# create_net() initializes the weights and runs the first forward pass itself
model = create_net()

trainer = gluon.Trainer(model.collect_params(), 'adam', {'learning_rate': 0.001})
MI = ModelInterface(
    model=model,
    optimizer=trainer,
    framework_plugin=framework_adapter,
    model_create_file='model_creating.py',
)

Model has been returned!


[10:40:04] ../src/storage/storage.cc:199: Using Pooled (Naive) StorageManager for CPU


In [13]:
# dir(trainer)

In [16]:
# ! rm  -r plan save *.pkl

### Register dataset

In [15]:
import numpy as np
class LandmarkFedDataset(DataInterface):
    """Expose the local landmark shard to FL tasks as MXNet Gluon data loaders.

    Keyword arguments given to the constructor are forwarded to
    ``DataInterface``; ``train_bs`` and ``valid_bs`` are read back from
    ``self.kwargs`` to choose the loader batch sizes.
    """

    # Fallback batch size when ``train_bs`` / ``valid_bs`` is absent or falsy.
    DEFAULT_BATCH_SIZE = 32

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @property
    def shard_descriptor(self):
        """Return the shard descriptor currently attached to this dataset."""
        return self._shard_descriptor

    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor will be set by Envoy.
        """
        self._shard_descriptor = shard_descriptor

    def __getitem__(self, index):
        """Return the item at ``index`` straight from the shard descriptor."""
        return self.shard_descriptor[index]

    def __len__(self):
        """Return the length reported by the shard descriptor."""
        return len(self.shard_descriptor)

    def _batch_size(self, key):
        """Return the batch size stored under ``key``, or the default.

        ``.get`` is used so that a missing key falls back to
        ``DEFAULT_BATCH_SIZE`` instead of raising ``KeyError``.
        """
        return self.kwargs.get(key) or self.DEFAULT_BATCH_SIZE

    def get_train_loader(self):
        """
        Output of this method will be provided to tasks with optimizer in contract
        """
        train_data, train_label = self.shard_descriptor.get_train_data()
        train_data, train_label = nd.array(train_data), nd.array(train_label)
        train_set = gdata.ArrayDataset(train_data, train_label)
        return gdata.DataLoader(train_set, self._batch_size('train_bs'), shuffle=True)

    def get_valid_loader(self):
        """
        Output of this method will be provided to tasks without optimizer in contract
        """
        # NOTE(review): this reads get_train_data(), while get_valid_data_size()
        # reports get_test_size(). If the shard descriptor exposes a held-out
        # split, it should probably be used here instead — confirm against
        # landmark_shard_descriptor.py.
        test_data, test_label = self.shard_descriptor.get_train_data()
        test_data, test_label = nd.array(test_data), nd.array(test_label)
        valid_set = gdata.ArrayDataset(test_data, test_label)
        return gdata.DataLoader(valid_set, self._batch_size('valid_bs'), shuffle=False)

    def get_train_data_size(self):
        """
        Information for aggregation
        """
        return self.shard_descriptor.get_train_size()

    def get_valid_data_size(self):
        """
        Information for aggregation
        """
        return self.shard_descriptor.get_test_size()

### Create Landmark federated dataset

In [16]:
from landmark_shard_descriptor import LandmarkShardDescriptor
# Batch sizes are defined once and forwarded by name so the two places cannot drift apart
train_bs, valid_bs = 64, 64
fed_dataset = LandmarkFedDataset(train_bs=train_bs, valid_bs=valid_bs)
# fed_dataset.shard_descriptor = LandmarkShardDescriptor()

In [17]:
# with open('fed_dataset_obj.pkl', 'wb') as file:
#     cloudpickle.dump(fed_dataset, file)

## Define and register FL tasks

In [18]:
import time

loss_fn = gloss.L2Loss()
TI = TaskInterface()


@TI.register_fl_task(model='model', data_loader='train_dataset',
                     device='device', optimizer='optimizer')
def train(model, train_dataset, optimizer, device, loss_fn=loss_fn):
    """Run one pass over ``train_dataset`` and report the mean per-batch loss.

    Args:
        model: Network called on each batch.
        train_dataset: Iterable of ``(X, y)`` batches that also supports ``len()``.
        optimizer: Object whose ``step(batch_size)`` applies the parameter update.
        device: The string ``'cpu'`` (mapped to ``mx.cpu(0)``) or a value accepted
            by ``as_in_context``.
        loss_fn: Loss applied to ``(pred, y)``; defaults to the module-level L2 loss.

    Returns:
        dict with the key ``'train_mse'`` holding the summed batch losses
        divided by ``len(train_dataset)``.
    """
    if device == 'cpu':
        device = mx.cpu(0)
    # NOTE(review): any other device value is passed to as_in_context unchanged —
    # confirm what the framework supplies for GPU runs.
    print('train on:', device)

    train_sum_l = 0
    for X, y in train_dataset:
        # Insert an axis at position 1 before feeding the batch to the model
        X, y = X.expand_dims(axis=1).as_in_context(device), y.as_in_context(device)
        with autograd.record():
            pred = model(X)
            l = loss_fn(pred, y).mean()
        l.backward()
        # The loss is already averaged over the batch, so the gradient must not be
        # rescaled by 1/batch_size a second time; step(1) also removes the
        # dependency on a notebook-global batch size.
        optimizer.step(1)
        train_sum_l += l.asscalar()
    train_loss = train_sum_l / len(train_dataset)
    return {'train_mse': train_loss,}


@TI.register_fl_task(model='model', data_loader='val_dataset', device='device')
def validate(model, val_dataset, device):
    """Evaluate ``model`` on ``val_dataset`` and report the mean per-batch loss.

    Args:
        model: Network called on each batch.
        val_dataset: Iterable of ``(X, y)`` batches that also supports ``len()``.
        device: The string ``'cpu'`` (mapped to ``mx.cpu(0)``) or a value accepted
            by ``as_in_context``.

    Returns:
        dict with the key ``'val_mse'`` holding the summed batch losses divided
        by ``len(val_dataset)``.
    """
    ctx = mx.cpu(0) if device == 'cpu' else device

    batch_losses = []
    for features, labels in val_dataset:
        features = features.expand_dims(axis=1).as_in_context(ctx)
        labels = labels.as_in_context(ctx)
        # Uses the module-level loss_fn; each batch contributes its mean loss
        batch_losses.append(loss_fn(model(features), labels).mean().asscalar())

    return {'val_mse': sum(batch_losses) / len(val_dataset),}

In [None]:
# train(model, fed_dataset.get_train_loader(), trainer, 'cpu')

Sequential(
  (0): Conv2D(1 -> 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), Activation(relu))
  (1): BatchNorm(axis=1, eps=1e-05, momentum=0.9, fix_gamma=False, use_global_stats=False, in_channels=64)
  (2): MaxPool2D(size=(2, 2), stride=(2, 2), padding=(0, 0), ceil_mode=False, global_pool=False, pool_type=max, layout=NCHW)
  (3): Conv2D(64 -> 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), Activation(relu))
  (4): BatchNorm(axis=1, eps=1e-05, momentum=0.9, fix_gamma=False, use_global_stats=False, in_channels=128)
  (5): MaxPool2D(size=(2, 2), stride=(2, 2), padding=(0, 0), ceil_mode=False, global_pool=False, pool_type=max, layout=NCHW)
  (6): Conv2D(128 -> 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), Activation(relu))
  (7): BatchNorm(axis=1, eps=1e-05, momentum=0.9, fix_gamma=False, use_global_stats=False, in_channels=256)
  (8): MaxPool2D(size=(2, 2), stride=(2, 2), padding=(0, 0), ceil_mode=False, global_pool=False, pool_type=max, layout=NCHW)
  (9): Fl

In [20]:
# train(model, fed_dataset.get_train_loader(), trainer, ctx)

In [21]:
# validate(model, fed_dataset.get_valid_loader(), ctx)

## Time to start a federated learning experiment

In [19]:
# Create an experiment in the federation
experiment_name = 'landmark_experiment'
# Dotted path of the serializer plugin class passed to FLExperiment
serializer_plugin = 'openfl.plugins.interface_serializer.common_serializer.CommonSerializer'
fl_experiment = FLExperiment(federation=federation,
                             experiment_name=experiment_name,
                             serializer_plugin=serializer_plugin)

In [20]:
# create an experiment in federation
# experiment_name = 'landmark_experiment'
# fl_experiment = FLExperiment(federation=federation, experiment_name=experiment_name,
#                              serializer_plugin='openfl.plugins.interface_serializer.dill_serializer.DillSerializer')

In [21]:
# The following command zips the workspace and Python requirements to be transferred to collaborator nodes
fl_experiment.start(model_provider=MI, 
                   task_keeper=TI,
                   data_loader=fed_dataset,
                   rounds_to_train=10,
                   opt_treatment='CONTINUE_GLOBAL')

Model has been returned!


In [23]:
# fl_experiment.stream_metrics()

In [24]:
! ls

facial-keypoints-detection	landmark_shard_descriptor.py  requirements.txt
facial-keypoints-detection.zip	MXNet_landmarks.ipynb	      Untitled.ipynb


In [25]:
# ! rm -r plan save temp_workspace *.pkl