In [2]:
# REFERENCES

# [1] https://www.tensorflow.org/federated/api_docs/python/tff/federated_computation
# [2] TensorFlow Federated Tutorial Session, Google Tech Talk, Youtube, https://www.youtube.com/watch?v=JBNas6Yd30A

# REQUIREMENTS

# Might need to first install CMake
# $ pip install CMake

# tensorflow-federated: A Python open-source framework for machine learning and other computations on decentralized data.
# $ pip install --upgrade tensorflow-federated

# nest-asyncio: A Python module that patches asyncio to allow nested use of asyncio.run and loop.run_until_complete.
# $ pip install --upgrade nest-asyncio

In [3]:
# !pip install --quiet --upgrade tensorflow-federated==0.20.0
#!pip install --quiet --upgrade nest-asyncio

# Local networking setup. Can't run on Google Colab due to security reasons.
# nest_asyncio allows the nested use of asyncio.run and loop.run_until_complete.
import nest_asyncio
nest_asyncio.apply()


In [4]:
# TensorBoard provides the visualization and tooling needed for machine learning experimentation
# %reload_ext tensorboard

In [5]:
import collections

import numpy as np
from matplotlib import pyplot as plt
# print("here")
import tensorflow as tf
import tensorflow_federated as tff

# Seed NumPy's global random number generator.
np.random.seed(0)

# A Federated Computation (FC) is a computation where the data stays on the clients' machines [2].
# tff.federated_computation wraps a Python function as a federated computation [1], e.g.:
#   tff.federated_computation(lambda: 1+1)()
#   print(tff.federated_computation(lambda: 'Hello, World!')())


# A round of federated training is made of three steps:
#   1. Federated Broadcast: publicly broadcast the global model to the clients.
#   2. Federated Map: privately train local parameters on the client side, using private data and the global model.
#   3. Federated Mean: aggregate the clients' locally trained parameters.

# Print the installed TensorFlow and TensorFlow Federated versions.
print(tf.__version__)
print(tff.__version__)

2023-04-12 18:35:49.295132: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-04-12 18:35:49.344879: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-04-12 18:35:49.345763: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


2.12.0
0.53.0


In [6]:
# tff.backends.native.set_local_python_execution_context()
# Smoke test: wrap a trivial Python function as a federated computation, invoke it and print the result.
print(tff.federated_computation(lambda: 'Hello, World!')())

2023-04-12 18:35:52.792400: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2023-04-12 18:35:52.792989: I tensorflow/core/grappler/clusters/single_machine.cc:358] Starting new session
E0412 18:35:52.924944400     424 socket_utils_common_posix.cc:221] check for SO_REUSEPORT: {"created":"@1681338952.924914900","description":"Protocol not available","errno":92,"file":"external/com_github_grpc_grpc/src/core/lib/iomgr/socket_utils_common_posix.cc","file_line":199,"os_error":"Protocol not available","syscall":"getsockopt(SO_REUSEPORT)"}
E0412 18:35:52.925516700     424 socket_utils_common_posix.cc:327] setsockopt(TCP_USER_TIMEOUT) Protocol not available


b'Hello, World!'


In [7]:
# Federated data is split among the clients and used privately in the client-side Federated Map step.
# To make demos easy, TensorFlow Federated ships a federated version of the EMNIST digits,
# keyed by the original writer of the digits.
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

# Show the type of the client-data container returned for the training split.
print(type(emnist_train))

<class 'tensorflow_federated.python.simulation.datasets.client_data.PreprocessClientData'>


In [8]:
# Client IDs can be accessed from the federated dataset; print how many clients it contains.
client_ids = emnist_train.client_ids
print(len(client_ids))
# print(client_ids)

3383


In [10]:
# Example of a single client's dataset: the client at index 533 of the client ID list.
# create_tf_dataset_for_client builds the dataset for the given client ID
# (presumably a tf.data.Dataset, judging by the method name and the element_spec attribute read below).
client_dataset = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[533])

# View the dataset's element spec.
# Each element pairs a scalar int32 'label' with a float32 'pixels' array of 28 rows and 28 columns.
client_dataset.element_spec

OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)),
             ('pixels',
              TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

In [13]:
# Experiment configuration.
NUM_CLIENTS = 10      # how many client IDs are sampled for the simulation
NUM_EPOCHS = 5        # how many times each client dataset is repeated
BATCH_SIZE = 20       # examples per batch
SHUFFLE_BUFFER = 100  # size of the shuffle buffer
PREFETCH_BUFFER = 10  # number of batches prefetched to speed up processing


def preprocess(dataset):
  """Repeat, shuffle, batch, reformat and prefetch one client's dataset.

  Args:
    dataset: Dataset whose elements expose 'pixels' and 'label' entries.

  Returns:
    The prefetched dataset of batches, each an `OrderedDict` with keys
    'x' (pixels reshaped to [-1, 784]) and 'y' (labels reshaped to [-1, 1]).
  """

  def batch_format_fn(element):
    """Flatten each 28x28 image in the batch to 784 values; make labels a column."""
    flat_pixels = tf.reshape(element['pixels'], [-1, 784])
    column_labels = tf.reshape(element['label'], [-1, 1])
    return collections.OrderedDict(x=flat_pixels, y=column_labels)

  # Each stage is named so the order of the pipeline is easy to follow.
  repeated = dataset.repeat(NUM_EPOCHS)
  shuffled = repeated.shuffle(SHUFFLE_BUFFER, seed=1)
  batched = shuffled.batch(BATCH_SIZE)
  formatted = batched.map(batch_format_fn)
  return formatted.prefetch(PREFETCH_BUFFER)

In [15]:
# Preprocess the example client's dataset; its element_spec is used later as the model's input spec.
preprocessed_example_dataset = preprocess(client_dataset)

# Take the first batch and convert every tensor in it to a NumPy array.
sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

# Show the container types of one batch and of the preprocessed dataset.
print(type(sample_batch), type(preprocessed_example_dataset))

<class 'collections.OrderedDict'> <class 'tensorflow.python.data.ops.prefetch_op._PrefetchDataset'>


In [16]:
def make_federated_data(client_data, client_ids):
  """Build one preprocessed dataset per client.

  Args:
    client_data: Object exposing `create_tf_dataset_for_client(client_id)`.
    client_ids: Iterable of client IDs to include.

  Returns:
    A list with one `preprocess`-ed dataset per ID, in the order of `client_ids`.
  """
  federated_data = []
  for client_id in client_ids:
    raw_dataset = client_data.create_tf_dataset_for_client(client_id)
    federated_data.append(preprocess(raw_dataset))
  return federated_data

In [17]:
# Use the first NUM_CLIENTS client IDs as the simulated participants.
sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]
# Build one preprocessed dataset per sampled client.
federated_train_data = make_federated_data(emnist_train, sample_clients)

print(f'Number of client datasets: {len(federated_train_data)}')
# federated_train_data is a list of datasets, not of OrderedDicts. Printing the list
# dumps the repr of every dataset; they all come from preprocess(), so a single
# element spec describes them all.
print(f'Element spec of each client dataset: {federated_train_data[0].element_spec}')

Number of client datasets: 10
Set of OrderedDicts: [<PrefetchDataset element_spec=OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int32, name=None))])>, <PrefetchDataset element_spec=OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int32, name=None))])>, <PrefetchDataset element_spec=OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int32, name=None))])>, <PrefetchDataset element_spec=OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int32, name=None))])>, <PrefetchDataset element_spec=OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int32, name=None))])>, <PrefetchDataset element_spec=OrderedDict([('x', TensorSpec(shape=(N

In [24]:
def create_keras_model():
  """Build the classifier: 784 inputs, one 10-unit dense layer, then softmax.

  Returns:
    A `tf.keras.models.Sequential` model whose dense kernel is initialized
    to zeros.
  """
  layers = [
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ]
  return tf.keras.models.Sequential(layers)

In [25]:
# NOTE(review): `training_process` is only assigned in a later cell (In [28]). When the notebook
# is run top to bottom on a fresh kernel, this line executes before that assignment.
# The same statement appears again in cell In [29].
print(training_process.initialize.type_signature.formatted_representation())

( -> <
  global_model_weights=<
    trainable=<
      float32[784,10],
      float32[10]
    >,
    non_trainable=<>
  >,
  distributor=<>,
  client_work=<>,
  aggregator=<
    value_sum_process=<>,
    weight_sum_process=<>
  >,
  finalizer=<
    int64,
    float32[784,10],
    float32[10]
  >
>@SERVER)


In [26]:
# NOTE(review): depends on `training_process`, which is only assigned in a later cell (In [28]);
# the same initialization is repeated in cell In [29].
train_state = training_process.initialize()

In [27]:
# tff.learning.Model interface [https://www.tensorflow.org/federated/api_docs/python/tff/learning/Model]
# 
def model_fn():
  """Create a fresh Keras model wrapped in the tff.learning.Model interface.

  Returns:
    The result of `tff.learning.from_keras_model` for a newly built Keras
    model, using the element spec of `preprocessed_example_dataset` as input
    spec, sparse categorical cross-entropy as loss and sparse categorical
    accuracy as metric.
  """
  # The model is built on every call and never captured from an outer scope:
  # TFF will call this function within different graph contexts.
  return tff.learning.from_keras_model(
      create_keras_model(),
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

In [28]:
# The Federated Averaging algorithm below uses two optimizers: a client optimizer and a server optimizer.
# The client optimizer is only used to compute local model updates on each client.
# The server optimizer applies the averaged update to the global model at the server.
# We can experiment with different learning rates; here the clients are given a lower
# learning rate (0.02) than the server (1.0).
training_process = tff.learning.algorithms.build_weighted_fed_avg(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

# TFF has constructed 2 federated computations (`initialize` and `next`, both called in later cells)
# and packaged them into a tff.templates.IterativeProcess [https://www.tensorflow.org/federated/api_docs/python/tff/templates/IterativeProcess]

In [29]:
# Show the type signature of `initialize`: it takes no arguments and returns a state placed at the server.
print(training_process.initialize.type_signature.formatted_representation())


# Build the initial state that the first training round starts from.
train_state = training_process.initialize()

( -> <
  global_model_weights=<
    trainable=<
      float32[784,10],
      float32[10]
    >,
    non_trainable=<>
  >,
  distributor=<>,
  client_work=<>,
  aggregator=<
    value_sum_process=<>,
    weight_sum_process=<>
  >,
  finalizer=<
    int64,
    float32[784,10],
    float32[10]
  >
>@SERVER)


In [30]:
# ".next()" is used to run a single round of a Federated Computation.
result = training_process.next(train_state, federated_train_data)
# Keep the updated state for the following round and the metrics reported for this round.
train_state = result.state
train_metrics = result.metrics

# In this simulation, a round of federated computation takes the total system
# from an initial state [server state, federated data]
# to a new state [server state, training metrics].
print('round  1, metrics={}'.format(train_metrics))

round  1, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.12345679), ('loss', 3.1193738), ('num_examples', 4860), ('num_batches', 248)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', OrderedDict([('update_non_finite', 0)]))])


In [31]:
# Every round below reuses the same fixed list of client datasets. Picking a random set of
# clients each round would be closer to real-world interactions, but is a challenge to implement here.

# NUM_ROUNDS acts as an exclusive upper bound: round 1 ran in the previous cell, and this loop
# runs rounds 2 through NUM_ROUNDS - 1, i.e. 9 more calls to .next() (10 rounds in total).
NUM_ROUNDS = 11
# In the printed metrics, training accuracy rises and loss falls from round to round.
for round_num in range(2, NUM_ROUNDS):
  # Feed the state produced by the previous round into the next one.
  result = training_process.next(train_state, federated_train_data)
  train_state = result.state
  train_metrics = result.metrics
  print('round {:2d}, metrics={}'.format(round_num, train_metrics))

round  2, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.13518518), ('loss', 2.9834733), ('num_examples', 4860), ('num_batches', 248)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', OrderedDict([('update_non_finite', 0)]))])
round  3, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.14382716), ('loss', 2.861665), ('num_examples', 4860), ('num_batches', 248)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', OrderedDict([('update_non_finite', 0)]))])
round  4, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.17407407), ('loss', 2.7957022), ('num_examples', 4860), ('num_batches', 248)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('fin