# Multiprocessing Federated MNIST Keras Tutorial

## !!WARNING!! 
Multiprocessing is not stable in Jupyter Notebook: some processes can remain in memory after the main process has finished.
We recommend restarting the kernel after every experiment.

To run the federation in multiprocessing mode you need to:
1. Define the `build_model` function, which returns a Keras model, in a **separate cell**.
2. Save that notebook cell as a file by putting `%%writefile <filename>` at the top of the cell.
3. Import `build_model` from that file in the next cell:
    `from <filename> import build_model`

In [1]:
% % writefile model_keras.py

import tensorflow as tf
import tensorflow.keras as ke

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, Flatten, Dense


def build_model(input_shape,
                num_classes,
                conv_kernel_size=(4, 4),
                conv_strides=(2, 2),
                conv1_channels_out=16,
                conv2_channels_out=32,
                final_dense_inputsize=100,
                **kwargs):
    """
    Define and compile a small two-convolution CNN classifier.

    Args:
        input_shape (tuple): Shape of a single input sample, passed to the
            first Conv2D layer (presumably (28, 28, 1) for the MNIST data in
            this notebook -- confirm against the data loader).
        num_classes (int): The number of classes of the dataset; width of the
            softmax output layer.
        conv_kernel_size (tuple): Kernel size shared by both Conv2D layers.
        conv_strides (tuple): Strides shared by both Conv2D layers.
        conv1_channels_out (int): Number of filters in the first Conv2D layer.
        conv2_channels_out (int): Number of filters in the second Conv2D layer.
        final_dense_inputsize (int): Number of units in the hidden Dense layer.
        **kwargs: Accepted for caller compatibility and ignored.

    Returns:
        tensorflow.keras.Sequential: The model compiled with categorical
        cross-entropy loss, the Adam optimizer and an accuracy metric.

    """
    model = Sequential()

    model.add(Conv2D(conv1_channels_out,
                     kernel_size=conv_kernel_size,
                     strides=conv_strides,
                     activation='relu',
                     input_shape=input_shape))

    model.add(Conv2D(conv2_channels_out,
                     kernel_size=conv_kernel_size,
                     strides=conv_strides,
                     activation='relu'))

    model.add(Flatten())

    model.add(Dense(final_dense_inputsize, activation='relu'))

    model.add(Dense(num_classes, activation='softmax'))

    model.compile(loss=ke.losses.categorical_crossentropy,
                  optimizer=ke.optimizers.Adam(),
                  metrics=['accuracy'])

    # Initialize the optimizer variables that already exist. The TF1-style
    # session is only created when there is something to initialize, and the
    # context manager closes it instead of leaking it after the function returns.
    opt_vars = model.optimizer.variables()
    if opt_vars:
        config = tf.compat.v1.ConfigProto()
        config.gpu_options.allow_growth = True
        # NOTE(review): 112 intra-op threads looks tuned for one specific host
        # -- confirm before reusing on other machines.
        config.intra_op_parallelism_threads = 112
        config.inter_op_parallelism_threads = 1
        with tf.compat.v1.Session(config=config) as sess:
            for v in opt_vars:
                v.initializer.run(session=sess)

    return model

Writing model_keras.py


In [2]:
!pip install openfl tensorflow





In [4]:
# Workaround to run the multiprocessing scenario in Jupyter Notebook:
# don't define build_model directly in this cell; import it from the module written above.
from model_keras import build_model

import time

import numpy as np
from tensorflow.python.keras.utils.data_utils import get_file

import openfl.native as fx
from openfl.federated import FederatedModel, FederatedDataSet
from openfl.interface.cli import setup_logging


def one_hot(labels, classes):
    """
    Convert integer class labels into one-hot encoded rows.

    Args:
        labels (list): Integer class indices to encode.
        classes (int): Total number of categorical classes (width of each row).

    Returns:
        np.array: Float matrix whose i-th row holds a single 1.0 at column
        labels[i] and zeros elsewhere.
    """
    identity_matrix = np.identity(classes)
    # Fancy-indexing the identity matrix selects one basis row per label.
    encoded = identity_matrix[labels]
    return encoded


setup_logging()

t = time.time()

fx.init('keras_cnn_mnist')

# Experiment settings.
is_multi = True  # passed to fx.run_experiment to enable multiprocessing mode
batch_size = 32
rounds_to_train = 2
collaborators_amount = 5
classes = 10
img_rows, img_cols = 28, 28

# Download the MNIST archive; file_hash lets get_file verify the download.
origin_folder = 'https://storage.googleapis.com/tensorflow/tf-keras-datasets/'
path = get_file('mnist.npz',
                origin=origin_folder + 'mnist.npz',
                file_hash='731c5ac602752760c8e48fbffcf8c3b850d9dc2a2aedcf2cc48468fc17b673d1')

with np.load(path) as f:
    # The archive's test split serves as the validation set.
    X_train = f['x_train']
    y_train = f['y_train']
    X_valid = f['x_test']
    y_valid = f['y_test']

# Reshape to (N, 28, 28, 1) with a trailing channel axis, cast to float32
# and scale pixel values by 1/255.
X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 1).astype('float32') / 255
X_valid = X_valid.reshape(X_valid.shape[0], img_rows, img_cols, 1).astype('float32') / 255

y_train = one_hot(y_train, classes)
y_valid = one_hot(y_valid, classes)

# Use the configured batch size rather than a duplicated literal.
fl_data = FederatedDataSet(X_train, y_train, X_valid, y_valid,
                           batch_size=batch_size, num_classes=classes)
fl_model = FederatedModel(build_model=build_model, data_loader=fl_data)
collaborator_models = fl_model.setup(num_collaborators=collaborators_amount)
collaborators = {str(i): c for i, c in enumerate(collaborator_models)}

print(f'Original training data size: {len(X_train)}')
# The validation size must come from the validation split, not the training labels.
print(f'Original validation data size: {len(X_valid)}\n')

final_fl_model = fx.run_experiment(collaborators, {
    'aggregator.settings.rounds_to_train': rounds_to_train,
}, is_multi=is_multi)
# The federated model is built with Keras, so name the saved file accordingly.
final_fl_model.save_native('final_keras_model.h5')

print(f'FINISH in {time.time() - t}')


Creating Workspace Directories
Creating Workspace Templates
Successfully installed packages from /home/dmitry/.local/workspace/requirements.txt.

New workspace directory structure:
workspace
├── torch_model.py
├── requirements.txt
├── final_pytorch_model
├── plan
│   ├── defaults
│   │   ├── tasks_keras.yaml
│   │   ├── data_loader.yaml
│   │   ├── tasks_torch.yaml
│   │   ├── tasks_tensorflow.yaml
│   │   ├── network.yaml
│   │   ├── assigner.yaml
│   │   ├── defaults
│   │   ├── compression_pipeline.yaml
│   │   ├── tasks_fast_estimator.yaml
│   │   ├── aggregator.yaml
│   │   ├── collaborator.yaml
│   │   └── task_runner.yaml
│   ├── cols.yaml
│   ├── data.yaml
│   └── plan.yaml
├── save
│   ├── torch_cnn_mnist_init.pbuf
│   ├── torch_cnn_mnist_best.pbuf
│   ├── keras_cnn_mnist_init.pbuf
│   ├── keras_cnn_mnist_last.pbuf
│   ├── keras_cnn_mnist_best.pbuf
│   └── torch_cnn_mnist_last.pbuf
├── .workspace
├── agg_to_col_two_signed_cert.zip
├── data
│   ├── MNIST
│   │   ├── raw
│   │  

FINISH in 21.47354507446289


In [None]:
## !!NOTICE!! Don't forget to restart the kernel after execution
<img align="left" src="reload_kernel.png">