# **Communication-Efficient Learning of Deep Networks from Decentralized Data**

McMahan, B., Moore, E., Ramage, D., Hampson, S., & y Arcas, B. A. (2017, April). Communication-efficient learning of deep networks from decentralized data. In Artificial Intelligence and Statistics (pp. 1273-1282). PMLR.

Ref: *https://www.tensorflow.org/federated/tutorials/federated_learning_for_image_classification*

The implementation environment is as follows.

* ML Framework

    * Python 3.6.9
    * TensorFlow 2.4.1
    * TensorFlow Federated 0.18.0

* Hardware

    * GPU: NVIDIA Tesla P100 (16 GB)
    * RAM: 25 GB

* Dataset & Model

    * MNIST (60,000 training examples)
    * Base 2D-CNN (introduced in the paper)

* Parameter

    * C = 0.1
    * B = 10
    * E = 1

where C, B, and E are defined as follows:

* C: the fraction of clients that participate in each round, i.e., in each global weight update

* E: the number of training passes (local epochs) each client makes over its local dataset on each round; a client with n_k examples therefore performs E × n_k / B mini-batch updates per round

* B: the local minibatch size used for the client updates
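With these settings, the per-round workload follows from the paper's definitions: max(C × K, 1) clients per round and E × n_k / B local updates per client. The sketch below assumes K = 100 clients holding n_k = 600 examples each (the split used in this notebook); the helper `fedavg_round_budget` is hypothetical.

```python
import math

def fedavg_round_budget(num_clients, examples_per_client, C, B, E):
    """Hypothetical helper: per-round FedAvg workload for equally sized clients."""
    # Clients sampled per round: max(C * K, 1).
    clients_per_round = max(int(round(C * num_clients)), 1)
    # Local mini-batch updates per client per round: E * n_k / B.
    local_steps = E * math.ceil(examples_per_client / B)
    # Examples processed per round across all participating clients.
    examples_per_round = clients_per_round * E * examples_per_client
    return clients_per_round, local_steps, examples_per_round

budget = fedavg_round_budget(num_clients=100, examples_per_client=600, C=0.1, B=10, E=1)
print(budget)  # (10, 60, 6000)
```

The 60 local steps correspond to the 60 mini-batches per client counted later in this notebook, and the 6,000 examples per round match the `num_examples` value in the training metrics.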

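Therefore, this notebook is expected to produce results similar to the red solid line in the upper-left (IID) and upper-right (non-IID) panels of Figure 2 in the paper. Note that the paper plots test-set accuracy and uses SGD on the clients, whereas this notebook logs training metrics over the participating clients and uses Adam on both the clients and the server.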

The results have been uploaded to TensorBoard.dev:

*https://tensorboard.dev/experiment/BmDncAGATNmukm7VdNxR9w/*


To present convincing experimental results, the choice of clients matters. The results are expected to be strongly biased depending on whether the data is partitioned with `tf.data.Dataset.shard()` or `tf.data.Dataset.take()`, and on whether each round's clients are selected with `client_dataset_list[i::num_clients_per_round]` or `client_dataset_list[i*num_clients_per_round:(i+1)*num_clients_per_round]`. We did not investigate this, as it is well beyond the scope of this notebook; it is left for future work.
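The two kinds of per-round client selection can be contrasted with plain index arithmetic. The training cells in this notebook use a strided slice, which cycles through 10 fixed, disjoint groups of clients; the paper instead samples a random fraction C of the clients at each round. The NumPy sketch below works on client indices only, and the helper names `strided_selection` and `random_selection` are hypothetical.

```python
import numpy as np

NUM_CLIENTS = 100
CLIENTS_PER_ROUND = 10  # C = 0.1

def strided_selection(round_num, num_clients=NUM_CLIENTS, stride=10):
    """Deterministic scheme: clients r % s, r % s + s, r % s + 2s, ..."""
    return list(range(num_clients))[round_num % stride::stride]

def random_selection(rng, num_clients=NUM_CLIENTS, m=CLIENTS_PER_ROUND):
    """Paper-style scheme (sketch): m distinct clients drawn uniformly at random."""
    return sorted(rng.choice(num_clients, size=m, replace=False).tolist())

rng = np.random.default_rng(0)
for r in range(3):
    print(r, strided_selection(r), random_selection(rng))
```

With the strided slice every client participates exactly once every 10 rounds, so the sequence of client groups is periodic; random sampling removes that periodicity at the cost of more run-to-run variance.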

## **Default Setting**

In [None]:
!pip install --quiet --upgrade tensorflow_federated
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

In [2]:
import tensorflow as tf
import tensorflow_federated as tff

print(f" tf.__version__: {tf.__version__}")
print(f"tff.__version__: {tff.__version__}")

import collections
import sklearn

import numpy as np

np.random.seed(0)

%load_ext tensorboard

 tf.__version__: 2.4.1
tff.__version__: 0.18.0


In [142]:
!nvidia-smi

Wed Feb 10 06:20:15 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.39       Driver Version: 418.67       CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   40C    P0    33W / 250W |   8767MiB / 16280MiB |      3%      Default |
|                               |                      |                 ERR! |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

## **Load MNIST Datasets**

In [3]:
## Load MNIST datasets.
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

## Print the shapes.
print(f"train_images.shape: {train_images.shape}")
print(f"train_labels.shape: {train_labels.shape}")
print(f"test_images.shape: {test_images.shape}")
print(f"test_labels.shape: {test_labels.shape}")

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
train_images.shape: (60000, 28, 28)
train_labels.shape: (60000,)
test_images.shape: (10000, 28, 28)
test_labels.shape: (10000,)


In [134]:
SHUFFLE_BUFFER = 100_000
BATCH_SIZE = 10
PREFETCH_BUFFER = 100

def preprocess_mnist_cnn(dataset, iid):
    assert iid in [True, False]

    def batch_rescaling_fn(pixels, label):
        return (
            tf.image.convert_image_dtype(pixels, tf.dtypes.float32), 
            tf.cast(label, tf.int32))

    def batch_format_fn(pixels, label):
        """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
        return collections.OrderedDict(
            x = tf.reshape(pixels, [-1, 28, 28, 1]),
            y = tf.reshape(label, [-1, 1]))

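    if iid:
        ## Fix the shuffle order (seed, no reshuffling between iterations) so that
        ## each client's shard keeps the same examples in every round.
        dataset = dataset.shuffle(SHUFFLE_BUFFER, seed = 0, reshuffle_each_iteration = False)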

    return dataset.batch(BATCH_SIZE).map(batch_rescaling_fn).map(
        batch_format_fn).prefetch(PREFETCH_BUFFER)

### **IID MNIST**

In [135]:
## IID, where the data is shuffled, and then partitioned 
## into 100 clients each receiving 600 examples.
iid_tr_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
iid_tr_ds = preprocess_mnist_cnn(iid_tr_ds, iid = True)

## Shard it.
num_clients = 100
federated_iid_tr_ds = [iid_tr_ds.shard(num_clients, i) for i in range(num_clients)]

print(f"Number of client datasets: {len(federated_iid_tr_ds)}")
print(f"First dataset: {federated_iid_tr_ds[0]}")

Number of client datasets: 100
First dataset: <ShardDataset shapes: OrderedDict([(x, (None, 28, 28, 1)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>
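For reference, the IID partition described in the cell's comment (shuffle, then split into 100 clients of 600 examples) can be mirrored with NumPy index arithmetic. This is a sketch over index arrays only, not the `tf.data` pipeline itself; the fixed seed is an assumption chosen so that every client keeps the same examples across rounds.

```python
import numpy as np

NUM_EXAMPLES = 60_000
NUM_CLIENTS = 100
BATCH_SIZE = 10

rng = np.random.default_rng(0)            # fixed seed: the partition never changes
perm = rng.permutation(NUM_EXAMPLES)      # shuffled example indices
batches = perm.reshape(-1, BATCH_SIZE)    # 6,000 batches of 10 indices
# Mirror Dataset.batch(...).shard(NUM_CLIENTS, i): client i takes every 100th batch.
clients = [batches[i::NUM_CLIENTS] for i in range(NUM_CLIENTS)]

print(clients[0].shape)  # (60, 10)
```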


In [136]:
## Each client has 60 mini-batches (each mini-batch has 10 examples).
len(list(federated_iid_tr_ds[0].as_numpy_iterator()))

60

### **Non-IID MNIST**

In [78]:
np.reshape(np.arange(200), (100, 2))[0]

array([0, 1])

In [126]:
## Non-IID, where we first sort the data by digit label,
## divide it into 200 shards of size 300, and assign each of 100 clients 2 shards.
idx = np.argsort(train_labels)

non_iid_tr_ds = tf.data.Dataset.from_tensor_slices((train_images[idx], train_labels[idx]))
non_iid_tr_ds = preprocess_mnist_cnn(non_iid_tr_ds, iid = False)

## Shard it.
num_shards = 200 
num_shard_size = 300 // BATCH_SIZE ## 30 batches of 10 examples = 300 examples per shard

splits = [non_iid_tr_ds.skip(num_shard_size * i).take(num_shard_size) for i in range(num_shards)]
federated_non_iid_tr_ds = [splits[i].concatenate(splits[i + 100]) for i in range(num_clients)]

## Optional: reshuffle within each client so that its batches mix both digits (not applied here).
# federated_non_iid_tr_ds = [ds.unbatch().shuffle(SHUFFLE_BUFFER).batch(BATCH_SIZE) for ds in federated_non_iid_tr_ds]

print(f"Number of client datasets: {len(federated_non_iid_tr_ds)}")
print(f"First dataset: {federated_non_iid_tr_ds[0]}")

Number of client datasets: 100
First dataset: <ConcatenateDataset shapes: OrderedDict([(x, (None, 28, 28, 1)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>
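The cell above pairs shard i with shard i + 100 deterministically. The paper describes sorting by label, cutting 200 shards of 300 examples, and giving each of the 100 clients 2 shards; the minimal NumPy sketch below draws that pairing at random (the random pairing is an assumption, not the notebook's method). It uses synthetic labels with exactly 6,000 examples per digit so that it runs without MNIST; with the real label counts, which are not multiples of 300, a few shards straddle two digits.

```python
import numpy as np

NUM_CLIENTS = 100
NUM_SHARDS = 200
SHARD_SIZE = 300

# Synthetic stand-in for train_labels: 6,000 examples of each digit, shuffled.
labels = np.repeat(np.arange(10), 6_000)
np.random.default_rng(0).shuffle(labels)

# Sort by label and cut the sorted indices into 200 shards of 300 examples.
order = np.argsort(labels, kind="stable")
shards = order.reshape(NUM_SHARDS, SHARD_SIZE)

# Give each client 2 shards chosen by a random permutation (assumed pairing).
rng = np.random.default_rng(0)
shard_ids = rng.permutation(NUM_SHARDS).reshape(NUM_CLIENTS, 2)
clients = [np.concatenate(shards[pair]) for pair in shard_ids]

# Each client holds 600 examples drawn from at most 2 distinct digits.
print(len(clients[0]), np.unique(labels[clients[0]]))
```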


In [127]:
for element in federated_non_iid_tr_ds[0].as_numpy_iterator():
    print(np.reshape(element["y"], (-1,)))

[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4]
[4 4 4 4 4

In [98]:
## Each client has 60 mini-batches (30 from each of its 2 shards, 10 examples per mini-batch).
len(list(federated_non_iid_tr_ds[0].as_numpy_iterator()))

60

## **Build Model based on tf.keras**

In [9]:
def create_keras_model():
    """Make MNIST-CNN model with 1,663,370 params."""
    return tf.keras.models.Sequential([
        tf.keras.layers.Input(shape = [28, 28, 1], dtype = tf.dtypes.float32),
        tf.keras.layers.Conv2D(32, 5, padding = "same", activation = "relu"),
        tf.keras.layers.MaxPool2D(2, padding = "same"),
        tf.keras.layers.Conv2D(64, 5, padding = "same", activation = "relu"),
        tf.keras.layers.MaxPool2D(2, padding = "same"),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(512, activation = "relu"),
        tf.keras.layers.Dense(10),
        tf.keras.layers.Activation(tf.nn.softmax)], name = "MNIST_CNN")

In [10]:
tmp = create_keras_model()
tmp.summary()

Model: "MNIST_CNN"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_2 (Conv2D)            (None, 28, 28, 32)        832       
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 14, 14, 32)        0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 14, 14, 64)        51264     
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 7, 7, 64)          0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 3136)              0         
_________________________________________________________________
dense_2 (Dense)              (None, 512)               1606144   
_________________________________________________________________
dense_3 (Dense)              (None, 10)                51
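The 1,663,370 figure in the `create_keras_model` docstring can be checked by hand, including the final dense layer whose row is cut off in the summary above: a k×k convolution from c_in to c_out channels has (k·k·c_in + 1)·c_out parameters, and a dense layer has (n_in + 1)·n_out. The helper names below are illustrative only.

```python
def conv2d_params(k, c_in, c_out):
    # k*k*c_in weights per filter, plus one bias per output channel
    return (k * k * c_in + 1) * c_out

def dense_params(n_in, n_out):
    # full weight matrix plus one bias per output unit
    return (n_in + 1) * n_out

layers = {
    "conv2d (5x5, 1->32)": conv2d_params(5, 1, 32),       # 832
    "conv2d (5x5, 32->64)": conv2d_params(5, 32, 64),     # 51,264
    "dense (3136->512)": dense_params(7 * 7 * 64, 512),   # 1,606,144
    "dense (512->10)": dense_params(512, 10),             # 5,130
}
total = sum(layers.values())
print(f"{total:,}")  # 1,663,370
```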

In [11]:
del tmp

In [12]:
def model_fn():
    # We must create a new model here, and not capture it from an external
    # scope. TFF will call this within different graph contexts.
    keras_model = create_keras_model()
    return tff.learning.from_keras_model(
        keras_model,
        input_spec = federated_iid_tr_ds[0].element_spec,
        loss = tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics = [tf.keras.metrics.SparseCategoricalAccuracy()])

## **Training the Model on Federated Data**

In [137]:
## The client_optimizer is only used to compute local model updates on each client. 
## The server_optimizer applies the averaged update to the global model at the server.
tmp_iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn = lambda: tf.keras.optimizers.Adam(learning_rate = 2e-4),
    server_optimizer_fn = lambda: tf.keras.optimizers.Adam(learning_rate = 1e-3))
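Conceptually, each round the server averages the client updates, weighting each client by its number of local examples, and passes the averaged update to the server optimizer. The NumPy sketch below shows that weighted averaging for a single weight tensor and applies it with a plain SGD server step; with a server learning rate of 1.0 this reduces to the paper's weighted average of client models. The notebook uses Adam on the server instead, which this sketch does not model, and `fedavg_server_update` is a hypothetical name.

```python
import numpy as np

def fedavg_server_update(global_w, client_ws, client_num_examples, server_lr=1.0):
    """Weighted FedAvg step for one weight tensor (sketch, plain SGD on the server)."""
    n = np.asarray(client_num_examples, dtype=np.float64)
    # Client update = local model minus the global model it started from.
    deltas = np.stack([w - global_w for w in client_ws])
    # Average the updates, weighting each client by its number of examples.
    avg_delta = np.tensordot(n / n.sum(), deltas, axes=1)
    # Server step; with server_lr=1.0 this equals the weighted model average.
    return global_w + server_lr * avg_delta

global_w = np.zeros(3)
client_ws = [np.array([1.0, 0.0, 2.0]), np.array([3.0, 4.0, 0.0])]
new_w = fedavg_server_update(global_w, client_ws, client_num_examples=[600, 200])
print(new_w)  # expected values: 1.5, 1.0, 1.5
```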

In [138]:
str(tmp_iterative_process.initialize.type_signature)

'( -> <model=<trainable=<float32[5,5,1,32],float32[32],float32[5,5,32,64],float32[64],float32[3136,512],float32[512],float32[512,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64,float32[5,5,1,32],float32[32],float32[5,5,32,64],float32[64],float32[3136,512],float32[512],float32[512,10],float32[10],float32[5,5,1,32],float32[32],float32[5,5,32,64],float32[64],float32[3136,512],float32[512],float32[512,10],float32[10]>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER)'
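The signature shows that the server state holds the model's 8 trainable tensors plus the server optimizer's state: an int64 step counter followed by two more copies of the 8 tensor shapes, which for Keras Adam are the first- and second-moment estimates. The arithmetic below, using shapes copied from the signature, estimates how many float32 values the server keeps; it ignores the aggregation and broadcast states, which are empty here.

```python
import numpy as np

# Trainable tensor shapes, copied from the type signature above.
trainable_shapes = [
    (5, 5, 1, 32), (32,), (5, 5, 32, 64), (64,),
    (3136, 512), (512,), (512, 10), (10,),
]
model_floats = sum(int(np.prod(s)) for s in trainable_shapes)
# optimizer_state = <int64 step, 8 first-moment tensors, 8 second-moment tensors>
num_optimizer_entries = 1 + 2 * len(trainable_shapes)
# Model weights plus Adam's two moment estimates, all float32 (4 bytes each).
server_floats = 3 * model_floats
print(num_optimizer_entries, f"{server_floats:,}", f"~{server_floats * 4 / 2**20:.1f} MiB")
```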

In [139]:
del tmp_iterative_process

### **IID**

In [133]:
# !rm -rf logs/fit/iid

In [140]:
## New model and optimizers.
iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn = lambda: tf.keras.optimizers.Adam(learning_rate = 2e-4),
    server_optimizer_fn = lambda: tf.keras.optimizers.Adam(learning_rate = 1e-3))

logdir = "logs/fit/iid"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

In [141]:
%%time
NUM_ROUNDS = 1_000
NUM_FRACTION = 10 ## 1/C: each round uses every 10th of the 100 clients, i.e. C = 0.1

with summary_writer.as_default():
    for round_num in range(NUM_ROUNDS):
        state, metrics = iterative_process.next(
            state, federated_iid_tr_ds[round_num % NUM_FRACTION::NUM_FRACTION])

        ## Record it.
        for name, value in metrics["train"].items():
            tf.summary.scalar(name, value, step = round_num)

        ## Print the metrics every 50 rounds.
        if not (round_num % 50):
            print(f"round {round_num:2d}, metrics = {metrics}")

round  0, metrics = OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.47783333), ('loss', 1.7420615)])), ('stat', OrderedDict([('num_examples', 6000)]))])
round 50, metrics = OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.97283334), ('loss', 0.08015102)])), ('stat', OrderedDict([('num_examples', 6000)]))])
round 100, metrics = OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.989), ('loss', 0.040546186)])), ('stat', OrderedDict([('num_examples', 6000)]))])
round 150, metrics = OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.99233335), ('loss

### **Non-IID**

In [130]:
# !rm -rf logs/fit/non-iid

In [131]:
## New model and optimizers.
iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn = lambda: tf.keras.optimizers.Adam(learning_rate = 2e-4),
    server_optimizer_fn = lambda: tf.keras.optimizers.Adam(learning_rate = 1e-3))

logdir = "logs/fit/non-iid"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

In [132]:
%%time
NUM_ROUNDS = 1_000
NUM_FRACTION = 10 ## 1/C: each round uses every 10th of the 100 clients, i.e. C = 0.1

with summary_writer.as_default():
    for round_num in range(NUM_ROUNDS):
        state, metrics = iterative_process.next(
            state, federated_non_iid_tr_ds[round_num % NUM_FRACTION::NUM_FRACTION])

        ## Record it.
        for name, value in metrics["train"].items():
            tf.summary.scalar(name, value, step = round_num)

        ## Print the metrics every 50 rounds.
        if not (round_num % 50):
            print(f"round {round_num:2d}, metrics = {metrics}")



round  0, metrics = OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.716), ('loss', 2.1189778)])), ('stat', OrderedDict([('num_examples', 6000)]))])
round 50, metrics = OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.9033333), ('loss', 0.42257154)])), ('stat', OrderedDict([('num_examples', 6000)]))])
round 100, metrics = OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.95033336), ('loss', 0.21872847)])), ('stat', OrderedDict([('num_examples', 6000)]))])
round 150, metrics = OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.96466666), ('loss',

## **Displaying Model Metrics in TensorBoard**

In [None]:
# %tensorboard --logdir logs/fit

In [144]:
!tensorboard dev upload --logdir ./logs \
    --name "Simple experiment with MNIST" \
    --description "Training results from the paper 'Communication-Efficient Learning of Deep Networks from Decentralized Data'" \
    --one_shot

