<a href="https://colab.research.google.com/github/gibsonx/CE888/blob/master/Federated_Minst.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [30]:
#Modified by Gibson Xue on July 25, 2021
!pip uninstall --yes tensorflow tensorboard tb-nightly
!pip install --quiet tensorflow==2.5.0
!pip install --quiet tensorflow-federated==0.19.0
!pip install --quiet nest-asyncio
!pip install --quiet tensorboard  # or tensorboard, but not both
import tensorflow as tf

import nest_asyncio
nest_asyncio.apply()
print(tf.__version__)

Uninstalling tensorflow-2.5.0:
  Successfully uninstalled tensorflow-2.5.0
Uninstalling tensorboard-2.5.0:
  Successfully uninstalled tensorboard-2.5.0
2.5.0


In [37]:
import collections
import functools
import os
import time
from tensorflow.keras.datasets import mnist
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
import tensorflow_datasets as tfds
import pandas as pd
np.random.seed(0)

from tensorflow import reshape, nest, config
from tensorflow.keras import losses, metrics, optimizers
import tensorflow_federated as tff
from matplotlib import pyplot as plt
from pathlib import Path

# Test the TFF is working:
tff.federated_computation(lambda: 'Hello, World!')()

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten, Dense


def create_keras_model():
    model = Sequential([
        Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        MaxPool2D(pool_size=(2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPool2D(pool_size=(2, 2)),
        Flatten(),
        Dense(512, activation='relu'),
        Dense(10, activation='softmax')
    ])
    return model

In [38]:
this_dir = Path.cwd()
model_dir = this_dir / "saved_models" / experiment_name / method
output_dir = this_dir / "results" / experiment_name / method

if not model_dir.exists():
    model_dir.mkdir(parents=True)

if not output_dir.exists():
    output_dir.mkdir(parents=True)

In [39]:
experiment_name = "mnist"
method = "tff_training"
client_lr = 1e-2
server_lr = 1e-2
split = 4
NUM_ROUNDS = 5
NUM_EPOCHS = 5
BATCH_SIZE = 20
PREFETCH_BUFFER = 10
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.astype(np.float32)[:300]
y_train = y_train.astype(np.int32)[:300]
x_test = x_test.astype(np.float32).reshape(10000, 28, 28, 1)[:100]
y_test = y_test.astype(np.int32).reshape(10000, 1)[:100]


In [40]:
total_image_count = len(x_train)
image_per_set = int(np.floor(total_image_count/split))

client_train_dataset = collections.OrderedDict()
for i in range(1, split+1):
    client_name = "client_" + str(i)
    start = image_per_set * (i-1)
    end = image_per_set * i

    print(f"Adding data from {start} to {end} for client : {client_name}")
    data = collections.OrderedDict((('label', y_train[start:end]), ('pixels', x_train[start:end])))
    client_train_dataset[client_name] = data

train_dataset = tff.simulation.datasets.TestClientData(client_train_dataset)

sample_dataset = train_dataset.create_tf_dataset_for_client(train_dataset.client_ids[0])
sample_element = next(iter(sample_dataset))

SHUFFLE_BUFFER = image_per_set


def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""

    return collections.OrderedDict(
        x=reshape(element['pixels'], [-1, 28, 28, 1]),
        y=reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

preprocessed_sample_dataset = preprocess(sample_dataset)
sample_batch = nest.map_structure(lambda x: x.numpy(), next(iter(preprocessed_sample_dataset)))

def make_federated_data(client_data, client_ids):
    return [preprocess(client_data.create_tf_dataset_for_client(x)) for x in client_ids]

federated_train_data = make_federated_data(train_dataset, train_dataset.client_ids)
print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))
federated_train_data

def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.

  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_sample_dataset.element_spec,
      loss=losses.SparseCategoricalCrossentropy(),
      metrics=[metrics.SparseCategoricalAccuracy()])


iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: optimizers.Adam(learning_rate=client_lr),
    server_optimizer_fn=lambda: optimizers.SGD(learning_rate=server_lr))

print(str(iterative_process.initialize.type_signature))

Adding data from 0 to 75 for client : client_1
Adding data from 75 to 150 for client : client_2
Adding data from 150 to 225 for client : client_3
Adding data from 225 to 300 for client : client_4
Number of client datasets: 4
First dataset: <PrefetchDataset shapes: OrderedDict([(x, (None, 28, 28, 1)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>
( -> <model=<trainable=<float32[3,3,1,32],float32[32],float32[3,3,32,64],float32[64],float32[1600,512],float32[512],float32[512,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER)


In [41]:
state = iterative_process.initialize()

tff_train_acc = []
tff_val_acc = []
tff_train_loss = []
tff_val_loss = []

eval_model = None

In [42]:
# state.model.assign_weights_to(eval_model)
state = iterative_process.initialize()
# eval_model = create_keras_model()
# eval_model.compile(optimizer=optimizers.Adam(learning_rate=client_lr),
#                     loss=losses.SparseCategoricalCrossentropy(),
#                     metrics=[metrics.SparseCategoricalAccuracy()])

eval_model = None
for round_num in range(1, NUM_ROUNDS+1):
    state, tff_metrics = iterative_process.next(state, federated_train_data)
    eval_model = create_keras_model()
    eval_model.compile(optimizer=optimizers.Adam(learning_rate=client_lr),
                       loss=losses.SparseCategoricalCrossentropy(),
                       metrics=[metrics.SparseCategoricalAccuracy()])

    state.model.assign_weights_to(eval_model)

    ev_result = eval_model.evaluate(x_test, y_test, verbose=0)
    print('round {:2d}, metrics={}'.format(round_num, tff_metrics))
    print(f"Eval loss : {ev_result[0]} and Eval accuracy : {ev_result[1]}")


if eval_model:
    eval_model.save(model_dir / (experiment_name + ".h5"))
else:
    print("training didn't started")
    exit()                    

round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.37333333), ('loss', 4.3569903)])), ('stat', OrderedDict([('num_examples', 1500)]))])
Eval loss : 19.59590721130371 and Eval accuracy : 0.23000000417232513
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.23133333), ('loss', 4.6788416)])), ('stat', OrderedDict([('num_examples', 1500)]))])
Eval loss : 18.011337280273438 and Eval accuracy : 0.3100000023841858
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.36066666), ('loss', 3.9689727)])), ('stat', OrderedDict([('num_examples', 1500)]))])
Eval loss : 9.354846954345703 and Eval accuracy : 0.4199999868869