# Basic distributed classifier with TensorFlow

The code in this notebook is copyright 2018 <a href='https://comind.org/'>coMind</a>. Licensed under the Apache License, Version 2.0; you may not use this code except in compliance with the License. You may obtain a copy of the <a href='http://www.apache.org/licenses/LICENSE-2.0'>License</a>.

Join the <a href='https://comindorg.slack.com/join/shared_invite/enQtNDMxMzc0NDA5OTEwLWIyZTg5MTg1MTM4NjhiNDM4YTU1OTI1NTgwY2NkNzZjYWY1NmI0ZjIyNWJiMTNkZmRhZDg2Nzc3YTYyNGQzM2I'>conversation</a> at Slack.

This is a series of three tutorials; you are in the second one:
* [Basic Classifier](https://github.com/coMindOrg/federated-averaging-tutorials/blob/master/Basic%20Classifier.ipynb)
* [Basic Distributed Classifier](https://github.com/coMindOrg/federated-averaging-tutorials/blob/master/Basic%20Distributed%20Classifier.ipynb)
* [Basic Federated Classifier](https://github.com/coMindOrg/federated-averaging-tutorials/blob/master/Basic%20Federated%20Classifier.ipynb)

In this tutorial we will see which modifications the code from the <b>"Basic Classifier"</b> tutorial needs in order to be trained in a distributed way. Normally the work would be distributed among several different devices. Here, as we will see later, everything runs on the same computer, so instead of the IP addresses of other devices we always use localhost:port. This simulates several devices, when what we are really doing is launching separate processes that listen on different ports of the same machine.

For those who are not familiar with TensorFlow we recommend taking a look first at the <b>"Basic Classifier"</b> tutorial.

We start by importing the libraries that we will need and defining some variables.

In [1]:
# TensorFlow and tf.keras
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
tf.disable_eager_execution()
from tensorflow import keras

# Helper libraries
import os
import numpy as np
from time import time
import matplotlib.pyplot as plt

BATCH_SIZE = 32
EPOCHS = 5

Instructions for updating:
non-resource variables are not supported in the long term


Our goal with this script is to have 3 "devices" running in parallel: a __parameter server__, which is responsible for storing the global variables, and 2 __workers__, which are, in short, the ones that perform the training operations.

Flags are simply a way to pass in, from outside the script, the first variables that we are going to initialize. These are:

- __job_name__: a string that is "ps" or "worker"
- __task_index__: an integer. Since there will be several workers and you can also have several parameter servers, we have to give them a number to distinguish them.
- __ps_hosts__: string with the IP addresses of the parameter servers
- __worker_hosts__: string with the IP addresses of the workers.

In [2]:
flags = tf.app.flags
flags.DEFINE_integer("task_index", None,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the master worker task that performs the variable "
                     "initialization ")
flags.DEFINE_string("ps_hosts", "localhost:2222",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("job_name", None, "job name: worker or ps")
flags.DEFINE_string('f', '', 'kernel')


FLAGS = flags.FLAGS
FLAGS.job_name = 'worker'
FLAGS.task_index = 1
if FLAGS.job_name is None or FLAGS.job_name == "":
    raise ValueError("Must specify an explicit `job_name`")
if FLAGS.task_index is None or FLAGS.task_index == "":
    raise ValueError("Must specify an explicit `task_index`")

In principle, we are going to use this code with a parameter server and two workers, so we will launch this script 3 times: once with __job_name = ps, task_index = 0__; once with __job_name = worker, task_index = 0__; and finally with __job_name = worker, task_index = 1__.
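
As a minimal sketch, the three launches could be assembled as shown below. It assumes the notebook has been exported to a script (the name `basic_distributed_classifier.py` is hypothetical) and that the hard-coded `FLAGS.job_name` and `FLAGS.task_index` assignments have been removed so that the command-line flags take effect. The helper only builds the argument lists; it does not start any process.

```python
import sys

# Hypothetical script name: assumes the notebook was exported to a .py file.
SCRIPT = "basic_distributed_classifier.py"

# (job_name, task_index) for one parameter server and two workers.
ROLES = [("ps", 0), ("worker", 0), ("worker", 1)]


def build_commands(script=SCRIPT, roles=ROLES):
    """Return one argument list per process in the cluster."""
    return [
        [sys.executable, script,
         "--job_name={}".format(job),
         "--task_index={}".format(index)]
        for job, index in roles
    ]


commands = build_commands()
for cmd in commands:
    # Print everything except the interpreter path.
    print(" ".join(cmd[1:]))
```

Each list could then be handed to something like `subprocess.Popen`, or the equivalent commands typed into three separate terminals.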

Since we are working on a single device, we disable the GPU for the 2 scripts with task_index = 0, because if all 3 of them tried to access it at once they would fill its memory.

Then we create the lists of parameter servers and workers by splitting the strings of IP addresses, and we count the number of workers we have.

In [3]:
if FLAGS.task_index == 0:
    print('--- GPU Disabled ---')
    os.environ['CUDA_VISIBLE_DEVICES'] = ''

#Construct the cluster and start the server
ps_spec = FLAGS.ps_hosts.split(",")
worker_spec = FLAGS.worker_hosts.split(",")
print(ps_spec)
print(worker_spec)
# Get the number of workers.
num_workers = len(worker_spec)
print('{} workers defined'.format(num_workers))

['localhost:2222']
['localhost:2223', 'localhost:2224']
2 workers defined


We define the cluster, with the TensorFlow class <a href="https://www.tensorflow.org/api_docs/python/tf/train/ClusterSpec"><b>tf.train.ClusterSpec</b></a>. As an initialization parameter, it receives a dictionary with the ps and worker keys, which index the lists of IP addresses created in the previous cell.

In [4]:
cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec})

With the cluster we have a map of the network, with all the IPs. Now let's initialize the current server, using job_name and task_index to tell it which of the cluster's devices is the one running the current script.

In [5]:
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
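
To make the role of the two arguments concrete, here is a plain-Python illustration of the lookup they imply. It is not how TensorFlow implements it, only a sketch of the idea: the job name selects a list in the cluster dictionary and the task index selects one address in that list. The helper `address_of` is hypothetical.

```python
# Same host lists as the default flag values used in this notebook.
cluster_dict = {
    "ps": ["localhost:2222"],
    "worker": ["localhost:2223", "localhost:2224"],
}


def address_of(cluster, job_name, task_index):
    """Look up the host:port that a (job_name, task_index) pair refers to."""
    return cluster[job_name][task_index]


ps_address = address_of(cluster_dict, "ps", 0)
worker_address = address_of(cluster_dict, "worker", 1)
print(ps_address)      # localhost:2222
print(worker_address)  # localhost:2224
```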

If this script is running the parameter server, it will stay listening, waiting for the rest to write or read from it. The parameter server script will block here.

In [6]:
if FLAGS.job_name == "ps":
    print('--- Parameter Server Ready ---')
    server.join()

Everything from here on will only be executed by the workers.

We load the dataset as in the previous tutorial.

In [7]:
fashion_mnist = keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
print('Data loaded')

class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

Data loaded


We divide the dataset between the workers.

In [8]:
train_images = np.split(train_images, num_workers)[FLAGS.task_index]
train_labels = np.split(train_labels, num_workers)[FLAGS.task_index]
print('Local dataset size: {}'.format(train_images.shape[0]))

Local dataset size: 30000
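
The sharding step can be tried on a toy array. This sketch uses made-up data (6 "images" of 2 "pixels") rather than Fashion-MNIST, and it also shows `np.array_split`, which is not used in this notebook, as one possible alternative when the number of samples is not divisible by the number of workers.

```python
import numpy as np

num_workers = 2
toy_images = np.arange(12).reshape(6, 2)   # 6 "images" with 2 "pixels" each

# np.split cuts the first axis into num_workers equal, contiguous blocks.
shards = np.split(toy_images, num_workers)
shard_sizes = [s.shape[0] for s in shards]
print(shard_sizes)  # [3, 3]

# np.split raises ValueError when the sample count is not divisible;
# np.array_split tolerates that and returns shards of unequal size.
uneven = np.array_split(np.arange(7), num_workers)
uneven_sizes = [len(s) for s in uneven]
print(uneven_sizes)  # [4, 3]
```

Because the blocks are contiguous, each worker receives a slice of the data in its original order.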


We normalize the data.

In [9]:
train_images = train_images / 255.0
test_images = test_images / 255.0

The distributed classifier works in the following way. Each worker trains on a batch locally and calculates the gradient that it would need to apply to its weights to reduce the cost. It then writes that gradient to the parameter server. Once all the workers (or, if there are too many, the minimum number that we have specified) have written their gradients to the parameter server, one of them, acting as chief, reads all the gradients, averages them and updates the shared model stored in the ps. Finally, the chief signals the rest of the workers so that they can pull the latest model and keep training.
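
The scheme above can be sketched without TensorFlow. The toy below is an illustration under simplifying assumptions, not the notebook's implementation: the model is a single weight `w` fitted to `y = 3 * x`, the update is plain gradient descent rather than Adam, and the two "workers" are just two lists of data processed in a loop.

```python
def local_gradient(w, xs, ys):
    """Gradient of the mean squared error of y = w * x on one worker's batch."""
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def sync_step(w, worker_batches, learning_rate):
    """One synchronous update: collect every gradient, average, apply once."""
    grads = [local_gradient(w, xs, ys) for xs, ys in worker_batches]
    mean_grad = sum(grads) / len(grads)
    return w - learning_rate * mean_grad


# Two workers, each holding its own shard of data generated by y = 3 * x.
batches = [([1.0, 2.0], [3.0, 6.0]), ([3.0, 4.0], [9.0, 12.0])]

w = 0.0  # shared weight, standing in for the model stored on the ps
for _ in range(200):
    w = sync_step(w, batches, learning_rate=0.01)
print(round(w, 4))  # 3.0
```

The key point is that the shared weight changes once per round, using the mean of all workers' gradients, rather than once per worker.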


One of the workers therefore has to be the chief, and we have to let it know.

In [10]:
is_chief = (FLAGS.task_index == 0)
print(is_chief)

False


As in the previous tutorial, we also define here the directory where we want to save the checkpoints, the summaries, etc.

In [11]:
checkpoint_dir='logs_dir/{}'.format(time())
print('Checkpoint directory: ' + checkpoint_dir)

Checkpoint directory: logs_dir/1582487205.9155247


We define the name of the current device according to the criteria set by TensorFlow.

In [12]:
worker_device = "/job:worker/task:%d" % FLAGS.task_index
print('Worker device: ' + worker_device + ' - is_chief: {}'.format(is_chief))

Worker device: /job:worker/task:1 - is_chief: False


In the next part we will make the definition of the graph and the training in a way that is practically the same as in the __"Basic Classifier"__ tutorial.

The first difference is the __*with*__ statement that wraps all the rest of the code. __tf.device__ sets the device on which the operations created inside the block are placed, and __replica_device_setter__ returns the function that makes that choice: variables are assigned to the parameter server, and all other operations to the current worker.
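
The placement rule can be pictured with a small plain-Python model. This is a simplified sketch, not TensorFlow's code: it assumes variables are handed out to parameter server tasks in round-robin order and that everything else stays on the worker. The names `make_device_setter` and `choose_device` are hypothetical.

```python
def make_device_setter(worker_device, num_ps_tasks):
    """Return a function that picks a device string for each new operation."""
    next_ps = [0]  # mutable counter for round-robin assignment

    def choose_device(is_variable):
        if is_variable:
            # Variables live on a parameter server task.
            device = "/job:ps/task:%d" % next_ps[0]
            next_ps[0] = (next_ps[0] + 1) % num_ps_tasks
            return device
        # Every other operation runs on the current worker.
        return worker_device

    return choose_device


choose = make_device_setter("/job:worker/task:1", num_ps_tasks=1)
ops = [("relu/kernel", True), ("relu/MatMul", False),
       ("softmax/kernel", True), ("loss/Mean", False)]
placements = {name: choose(is_var) for name, is_var in ops}
for name, device in placements.items():
    print(name, "->", device)
```

With a single parameter server, as in this notebook, every variable ends up on `/job:ps/task:0` and the computation stays on the worker that runs the script.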

The rest of the lines are all the same as the previous tutorial. The first change comes at the moment of defining the training and the optimizer.

In [None]:
with tf.device(
      tf.train.replica_device_setter(
          worker_device=worker_device,
          cluster=cluster)):
    
    global_step = tf.train.get_or_create_global_step()

    with tf.name_scope('dataset'), tf.device('/cpu:0'):
        images_placeholder = tf.placeholder(train_images.dtype, [None, train_images.shape[1], train_images.shape[2]], 
                                            name='images_placeholder')
        labels_placeholder = tf.placeholder(train_labels.dtype, [None], name='labels_placeholder')
        batch_size = tf.placeholder(tf.int64, name='batch_size')
        shuffle_size = tf.placeholder(tf.int64, name='shuffle_size')

        dataset = tf.data.Dataset.from_tensor_slices((images_placeholder, labels_placeholder))
        dataset = dataset.shuffle(shuffle_size, reshuffle_each_iteration=True)
        dataset = dataset.repeat(EPOCHS)
        dataset = dataset.batch(batch_size)
        iterator = tf.data.Iterator.from_structure(dataset.output_types, dataset.output_shapes)
        dataset_init_op = iterator.make_initializer(dataset, name='dataset_init')
        X, y = iterator.get_next()

    flatten_layer = tf.layers.flatten(X, name='flatten')

    dense_layer = tf.layers.dense(flatten_layer, 128, activation=tf.nn.relu, name='relu')

    predictions = tf.layers.dense(dense_layer, 10, activation=tf.nn.softmax, name='softmax')

    summary_averages = tf.train.ExponentialMovingAverage(0.9)

    with tf.name_scope('loss'):
        loss = tf.reduce_mean(keras.losses.sparse_categorical_crossentropy(y, predictions))
        loss_averages_op = summary_averages.apply([loss])
        tf.summary.scalar('cross_entropy', summary_averages.average(loss))

    with tf.name_scope('accuracy'):
        with tf.name_scope('correct_prediction'):
            correct_prediction = tf.equal(tf.argmax(predictions, 1), tf.cast(y, tf.int64))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name='accuracy_metric')
        accuracy_averages_op = summary_averages.apply([accuracy])
        tf.summary.scalar('accuracy', summary_averages.average(accuracy))

    with tf.name_scope('train'):
        
        # SyncReplicasOptimizer is a wrapper for the optimizer.
        # It synchronizes the optimizers of all workers: it waits for a certain number of them
        # (replicas_to_aggregate) to have written their gradients to the parameter server
        # and then averages those gradients.
        optimizer = tf.train.SyncReplicasOptimizer(tf.train.AdamOptimizer(np.sqrt(num_workers) * 0.001), 
                                                   replicas_to_aggregate=num_workers)
        with tf.control_dependencies([loss_averages_op, accuracy_averages_op]):
            train_op = optimizer.minimize(loss, global_step=global_step)
        # This hook is responsible for creating and initializing variables and operations for SyncReplicasOptimizer.
        sync_replicas_hook = optimizer.make_session_run_hook(is_chief)

    print('Graph definition finished')

    sess_config = tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=False,
        device_filters=["/job:ps",
        "/job:worker/task:%d" % FLAGS.task_index])

    n_batches = int(train_images.shape[0] / (BATCH_SIZE * num_workers))
    last_step = int(n_batches * EPOCHS)

    print('Training {} batches...'.format(last_step))

    class _LoggerHook(tf.train.SessionRunHook):
        def begin(self):
            self._total_loss = 0
            self._total_acc = 0

        def before_run(self, run_context):
            return tf.train.SessionRunArgs([loss, accuracy, global_step])

        def after_run(self, run_context, run_values):
            loss_value, acc_value, step_value = run_values.results
            self._total_loss += loss_value
            self._total_acc += acc_value
            if (step_value + 1) % n_batches == 0 and not step_value == 0:
                print("Epoch {}/{} - loss: {:.4f} - acc: {:.4f}".format(
                    int(step_value / n_batches) + 1, EPOCHS, self._total_loss / n_batches, self._total_acc / n_batches))
                self._total_loss = 0
                self._total_acc = 0

        def end(self, session):
            print("Epoch {}/{} - loss: {:.4f} - acc: {:.4f}".format(
                int(session.run(global_step) / n_batches) + 1, EPOCHS, self._total_loss / n_batches, self._total_acc / n_batches))

    class _InitHook(tf.train.SessionRunHook):
        def after_create_session(self, session, coord):
            session.run(dataset_init_op, feed_dict={
                images_placeholder: train_images, labels_placeholder: train_labels, 
                batch_size: BATCH_SIZE, shuffle_size: train_images.shape[0]})
    # The last difference comes in the definition of the monitored session.
    # With the first argument we let it know with which server we will be working during the session.
    # Then we tell it whether the current script is the chief or not.
    # stop_grace_period_secs gives the session's threads up to 10 seconds to stop once the session is closed.
    with tf.name_scope('monitored_session'):
        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=is_chief,
                checkpoint_dir=checkpoint_dir,
                hooks=[_LoggerHook(), _InitHook(), sync_replicas_hook],
                config=sess_config,
                stop_grace_period_secs=10,
                save_checkpoint_steps=n_batches) as mon_sess:
            while not mon_sess.should_stop():
                mon_sess.run(train_op)

Instructions for updating:
Use `tf.compat.v1.data.get_output_types(dataset)`.
Instructions for updating:
Use `tf.compat.v1.data.get_output_shapes(dataset)`.
Instructions for updating:
Use `tf.compat.v1.data.get_output_types(iterator)`.
Instructions for updating:
Use `tf.compat.v1.data.get_output_shapes(iterator)`.
Instructions for updating:
Use `tf.compat.v1.data.get_output_classes(iterator)`.
Instructions for updating:
Use keras.layers.Flatten instead.
Instructions for updating:
Please use `layer.__call__` method instead.
Instructions for updating:
Use keras.layers.Dense instead.
Instructions for updating:
The `SyncReplicaOptimizer` class is deprecated. For synchrononous training, please use [Distribution Strategies](https://github.com/tensorflow/tensorflow/tree/master/tensorflow/contrib/distribute).
INFO:tensorflow:SyncReplicasV2: replicas_to_aggregate=2; total_num_replicas=2
Instructions for updating:
To construct input pipelines, use the `tf.data` module.
Graph definition finished
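
For reference, the step bookkeeping and the learning rate used in the training cell can be reproduced with plain arithmetic. The sketch below plugs in the values from this notebook (2 workers and the local dataset size of 30000 printed earlier); it does not use TensorFlow.

```python
import math

BATCH_SIZE = 32
EPOCHS = 5
num_workers = 2
local_dataset_size = 30000   # value printed by the sharding cell

# Same expressions as in the training cell.
n_batches = int(local_dataset_size / (BATCH_SIZE * num_workers))
last_step = int(n_batches * EPOCHS)
learning_rate = math.sqrt(num_workers) * 0.001

print(n_batches)                 # 468
print(last_step)                 # 2340
print(round(learning_rate, 6))   # 0.001414
```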


To evaluate the model we will do the same as before: load the checkpoints, since our model does not exist once the session is closed.

In this case the difference is that we are going to restart the graph completely and reload it, so in addition to loading the operations we need (accuracy and prediction), we have to load the placeholders and the initialization operation of the iterator. This is due to the fact that the original graph had tensors placed in the parameter server which we don't need for the inference operations.

Finally we draw the result.

In [None]:
if is_chief:
    print('--- Begin Evaluation ---')
    tf.reset_default_graph()
    with tf.Session() as sess:
        ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
        saver = tf.train.import_meta_graph(ckpt.model_checkpoint_path + '.meta', clear_devices=True)
        saver.restore(sess, ckpt.model_checkpoint_path)
        print('Model restored')
        graph = tf.get_default_graph()
        images_placeholder = graph.get_tensor_by_name('dataset/images_placeholder:0')
        labels_placeholder = graph.get_tensor_by_name('dataset/labels_placeholder:0')
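        batch_size = graph.get_tensor_by_name('dataset/batch_size:0')
        # shuffle_size must also come from the reloaded graph, because it is fed below.
        shuffle_size = graph.get_tensor_by_name('dataset/shuffle_size:0')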
        accuracy = graph.get_tensor_by_name('accuracy/accuracy_metric:0')
        predictions = graph.get_tensor_by_name('softmax/BiasAdd:0')
        dataset_init_op = graph.get_operation_by_name('dataset/dataset_init')
        sess.run(dataset_init_op, feed_dict={
            images_placeholder: test_images, labels_placeholder: test_labels, 
            batch_size: test_images.shape[0], shuffle_size: 1})
        print('Test accuracy: {:.4f}'.format(sess.run(accuracy)))
        predicted = sess.run(predictions)
        
    plt.figure(figsize=(10, 10))
    for i in range(25):
        plt.subplot(5, 5, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(test_images[i], cmap=plt.cm.binary)
        predicted_label = np.argmax(predicted[i])
        true_label = test_labels[i]
        if predicted_label == true_label:
            color = 'green'
        else:
            color = 'red'
        plt.xlabel("{} ({})".format(class_names[predicted_label],
                                    class_names[true_label]),
                                    color=color)

    plt.show()