# Distributed TensorFlow Framework
---

Overall there are two main strategies for distributing deep learning:
* **Model Parallelism**
* **Data Parallelism**

**Model Parallelism** partitions the model across several machines by assigning different nodes of the graph to different machines. It is used when the model is too big to fit into the memory of a single machine. The benefit is that model parallelism lets us train extremely large models. The downside is that each partition has to wait for the others during updating, because it depends on their outputs and gradients.
<div align="center">
<img src="https://raw.githubusercontent.com/bujingyi/distributed-tensorflow-framework/master/model_parallelism.png" >
 </div>
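
To make that dependency concrete, here is a plain-Python simulation (no TensorFlow involved) of a model split into two stages that live on two machines. The stage functions and device strings are hypothetical. Stage 1 cannot start until stage 0 has produced its output, so one machine is always idle while the other computes. The backward pass has the same dependency in the reverse direction.

```python
# Hypothetical two-stage model; each stage is "pinned" to a different machine,
# mimicking what tf.device placement does in a real graph.
def stage_0(x):
    # first half of the network, running on machine 0
    return [2.0 * v for v in x]


def stage_1(h):
    # second half of the network, running on machine 1
    return sum(h)


# hypothetical placement: which machine owns which part of the model
PLACEMENT = [('/job:worker/task:0', stage_0), ('/job:worker/task:1', stage_1)]


def forward(x, trace):
    """Run the partitioned model; every stage waits for the previous one."""
    out = x
    for device, stage in PLACEMENT:
        trace.append(device)  # record which machine is busy at this step
        out = stage(out)
    return out


trace = []
print(forward([1.0, 2.0, 3.0], trace))  # 12.0
print(trace)  # machine 0 first, then machine 1
```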

**Data Parallelism** trains multiple replicas of the model on different machines, each replica on a different part of the data. This approach leads to the parallel architecture called "Parameter Server (ps) - Worker". Parameter servers store the model parameters (the variables) and collect the updates sent by multiple workers; the workers read data batches, compute gradients, and send them to the parameter servers.
<div align="center">
<img src="https://raw.githubusercontent.com/bujingyi/distributed-tensorflow-framework/master/data_parallelism.png" >
 </div>
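
The ps-worker data flow can be sketched in plain Python as a single-process simulation. `ParameterServer` and `worker_gradient` are hypothetical names, the model is a one-parameter linear regression `y = w * x`, and the workers take turns instead of truly running in parallel. The point is only the division of labor: the ps owns `w`, each worker pulls it, computes a gradient on its own data shard, and pushes the gradient back.

```python
class ParameterServer:
    """Holds the shared parameter; workers pull it and push gradients."""

    def __init__(self, w, lr):
        self.w = w
        self.lr = lr

    def pull(self):
        return self.w

    def push(self, grad):
        # apply a plain SGD update with the gradient sent by a worker
        self.w -= self.lr * grad


def worker_gradient(w, shard):
    """Gradient of the mean squared error of y = w * x on one data shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


# each worker reads its own shard of the data (true relation: y = 3x)
shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (4.0, 12.0)]]
ps = ParameterServer(w=0.0, lr=0.01)

for step in range(200):
    for shard in shards:                    # each worker in turn...
        w = ps.pull()                       # ...reads the current parameter,
        ps.push(worker_gradient(w, shard))  # ...computes and sends a gradient

print(round(ps.w, 3))  # 3.0
```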

Data Parallelism has two ways of training:

* **Synchronous**: all workers read the same parameter values from ps and compute their updates in lockstep; the updates are aggregated before being applied.
* **Asynchronous**: workers read from ps (probably different values for the graph parameters), compute and send updates asynchronously.
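
The difference between the two modes is which parameter values each gradient is computed from. The plain-Python sketch below uses hypothetical names, a one-parameter model `y = w * x`, and two workers holding one example each; a power-of-two learning rate keeps the arithmetic exact. Starting from `w = 0`, the synchronous round averages both gradients and applies a single update. In the asynchronous round, worker 1's gradient was computed from the value it read before worker 0's update landed, so it is a *stale* gradient applied to newer parameters.

```python
def grad(w, shard):
    # gradient of the mean squared error of y = w * x on one shard
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


shards = [[(1.0, 3.0)], [(2.0, 6.0)]]  # true relation: y = 3x
lr = 0.0625                            # 1/16, exact in binary floating point


def sync_step(w):
    # synchronous: all workers read the SAME w; their gradients are
    # averaged and applied as one update
    grads = [grad(w, s) for s in shards]
    return w - lr * sum(grads) / len(grads)


def async_step(w):
    # asynchronous: both workers read w, but worker 1's gradient is applied
    # after worker 0 has already changed w, so it is based on a stale read
    w_read = w
    w = w - lr * grad(w_read, shards[0])  # worker 0's update lands first
    w = w - lr * grad(w_read, shards[1])  # worker 1's stale gradient
    return w


print(sync_step(0.0))   # 0.9375
print(async_step(0.0))  # 1.875
```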

TensorFlow provides an easy way to implement **Asynchronous Data Parallelism**. This notebook shows how to do it.

## Asynchronous Data Parallelism with TensorFlow

Only three steps to distribute TensorFlow:
1. Define your compute cluster via `tf.train.ClusterSpec` and `tf.train.Server`
2. Assign your model to ps and workers via `tf.device` and `tf.train.replica_device_setter`
3. Launch a distributed TensorFlow session via `tf.train.MonitoredTrainingSession`

Let's go through them.

### 1. Define your compute cluster via tf.train.ClusterSpec and tf.train.Server

```python
import tensorflow as tf
```

In order to share variables between machines, every member of the cluster needs to know that the cluster exists and how it is organized.

With Distributed TensorFlow, each process runs a special execution engine, a TensorFlow server. Servers are linked together as part of a cluster, and each server in the cluster is also known as a task. For example, if we have three computers with IP addresses 10.189.253.1, 10.189.253.2 and 10.189.253.3, we could define a cluster composed of one ps and two workers as follows:

```python
ps_hosts = ['10.189.253.1:2222']
worker_hosts = ['10.189.253.2:2222', '10.189.253.3:2222']
```

Each task is associated with a job, which is a collection of related tasks. Here we assign one task to a job called 'ps' and two tasks to a job called 'worker'.

```python
roles = {
    'ps': ps_hosts,
    'worker': worker_hosts
}
```

Now you can define the cluster, which maps each job name to the addresses of its tasks (servers), via `tf.train.ClusterSpec`:

```python
# create a cluster specification to describe the cluster
cluster = tf.train.ClusterSpec(roles)
```
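
A cluster specification identifies every task by its job name and its index within that job, and devices in that task are then addressed with strings such as `/job:worker/task:1`. The plain-Python sketch below (no TensorFlow; `list_tasks` is a hypothetical helper) spells out that mapping for the three-machine cluster above. On the real object, `cluster.task_address('worker', 1)` should return the same address that the sketch pairs with `/job:worker/task:1`.

```python
roles = {
    'ps': ['10.189.253.1:2222'],
    'worker': ['10.189.253.2:2222', '10.189.253.3:2222'],
}


def list_tasks(roles):
    """Return (device_name, address) for every task, in job/index order."""
    tasks = []
    for job_name in sorted(roles):
        for task_index, address in enumerate(roles[job_name]):
            device = '/job:{}/task:{}'.format(job_name, task_index)
            tasks.append((device, address))
    return tasks


for device, address in list_tasks(roles):
    print(device, '->', address)
# /job:ps/task:0 -> 10.189.253.1:2222
# /job:worker/task:0 -> 10.189.253.2:2222
# /job:worker/task:1 -> 10.189.253.3:2222
```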

With the cluster specified, the next step is to tell TensorFlow to create a server based on the cluster specification (here we use the ps as an example):

```python
server = tf.train.Server(
        server_or_cluster_def=cluster,
        job_name='ps',
        task_index=0
)
```

Quoted from TensorFlow's official docs:
>A tf.train.ClusterSpec represents the set of processes that participate in a distributed TensorFlow computation. Every tf.train.Server is constructed in a particular cluster.

>A `tf.train.Server` instance encapsulates a set of devices and a `tf.Session` target that can participate in distributed training. A server belongs to a cluster (specified by a `tf.train.ClusterSpec`), and corresponds to a particular task in a named job. The server can communicate with any other server in the same cluster.

### 2. Assign your model to ps and workers via tf.device and tf.train.replica_device_setter

After defining the cluster and creating the servers, assign the graph nodes, including both operations and variables, to specific devices using `tf.device`:

```python
with tf.device(
        tf.train.replica_device_setter(
            worker_device='/job:worker/task:{}'.format(0),  # 0 is the task_index
            cluster=cluster
        )
):
    # create the computation graph
    input_placeholder = ...  # placeholders
    layer = ...  # neural network layers
    prediction = ...  # predictions
    loss = ...  # loss
    train_op = ...  # train operation
```

In Data Parallelism, variables should be assigned to the `ps` and operations to the `workers`. Fortunately we don't have to place them manually: `tf.train.replica_device_setter` takes care of it automatically.
>`tf.train.replica_device_setter` returns a device function to use when building a Graph for replicas. Device Functions are used in `with tf.device(device_function):` statement to automatically assign devices to `Operation` objects as they are constructed, Device constraints are added from the inner-most context first, working outwards. The merging behavior adds constraints to fields that are yet unset by a more inner context. Currently the fields are (job, task, cpu/gpu).
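
A simplified plain-Python model of the placement rule helps show what the device function decides for each op. It assumes the default behavior of spreading variables over the ps tasks round-robin and sending every other op to `worker_device`; `place_ops` and the op list are hypothetical, and the real function recognizes more variable-like op types than the two listed here.

```python
# op types treated as variables in this sketch (a subset of what
# tf.train.replica_device_setter places on ps by default)
PS_OP_TYPES = {'Variable', 'VariableV2'}


def place_ops(ops, num_ps, worker_device):
    """Assign each (name, op_type) to a device, round-robin over ps tasks."""
    placement = {}
    next_ps = 0
    for name, op_type in ops:
        if op_type in PS_OP_TYPES:
            placement[name] = '/job:ps/task:{}'.format(next_ps)
            next_ps = (next_ps + 1) % num_ps  # move on to the next ps task
        else:
            placement[name] = worker_device   # compute stays on the worker
    return placement


ops = [('w1', 'VariableV2'), ('b1', 'VariableV2'), ('matmul', 'MatMul'),
       ('w2', 'VariableV2'), ('loss', 'Mean')]
for name, device in place_ops(ops, 2, '/job:worker/task:0').items():
    print(name, device)
```

With two ps tasks, `w1` and `w2` land on ps task 0, `b1` on ps task 1, and `matmul` and `loss` on the worker.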

Data Parallelism requires replicating the model on different servers. In general there are two ways of doing this:
>* **In-graph replication**. In this approach, the client builds a single `tf.Graph` that contains one set of parameters (in `tf.Variable` nodes pinned to `/job:ps`); and multiple copies of the compute-intensive part of the model, each pinned to a different task in `/job:worker`.
>* **Between-graph replication**. In this approach, there is a separate client for each `/job:worker` task, typically in the same process as the `worker` task. Each client builds a similar graph containing the parameters (pinned to `/job:ps` as before using `tf.train.replica_device_setter` to map them deterministically to the same tasks); and a single copy of the compute-intensive part of the model, pinned to the local task in `/job:worker`.

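In short, with **in-graph replication** a single client builds one graph that contains the shared parameters plus a copy of the compute-intensive part for every worker; with **between-graph replication** each worker builds its own graph containing only the shared parameters and the variables and operations relevant to that individual worker. Between-graph replication is the more commonly used of the two, and it is how the code above replicates the model: every worker runs the same code with its own task index, and `tf.train.replica_device_setter` places the shared variables on the ps.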
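
The structural difference can be written down as plain data. In this hypothetical sketch a "graph" is just a dict from node name to device. In-graph replication yields one graph for the whole cluster, while between-graph replication yields one graph per worker; those graphs share parameters because the variable has the same name and is placed deterministically on the same ps task in each of them.

```python
def in_graph(num_workers):
    """In-graph: one client builds ONE graph holding the shared parameters
    and a copy of the compute part for every worker."""
    graph = {'params': '/job:ps/task:0'}
    for i in range(num_workers):
        graph['compute_{}'.format(i)] = '/job:worker/task:{}'.format(i)
    return [graph]


def between_graph(num_workers):
    """Between-graph: every worker builds its OWN graph holding the shared
    parameters and a single local copy of the compute part."""
    return [{'params': '/job:ps/task:0',
             'compute': '/job:worker/task:{}'.format(i)}
            for i in range(num_workers)]


print(in_graph(2))       # one graph with compute_0 and compute_1
print(between_graph(2))  # two graphs, same 'params' device in both
```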

### 3. Launch a distributed TensorFlow session via tf.train.MonitoredTrainingSession

Finally, we can start a TensorFlow session. In distributed settings, `tf.train.MonitoredTrainingSession` is used instead of `tf.Session`. Its signature is:
```
tf.train.MonitoredTrainingSession(
    master='',
    is_chief=True,
    checkpoint_dir=None,
    scaffold=None,
    hooks=None,
    chief_only_hooks=None,
    save_checkpoint_secs=USE_DEFAULT,
    save_summaries_steps=USE_DEFAULT,
    save_summaries_secs=USE_DEFAULT,
    config=None,
    stop_grace_period_secs=120,
    log_step_count_steps=100,
    max_wait_secs=7200,
    save_checkpoint_steps=USE_DEFAULT
)
```
Its arguments show that it takes care of graph initialization, checkpoint saving, TensorBoard summary export, and session configuration. Besides these, `tf.train.MonitoredTrainingSession` runs the `hooks` passed to it around every `sess.run()` call, and a hook can ask the session to stop. For example, passing a `tf.train.StopAtStepHook` defines the last step of training: once the global step reaches it, `sess.should_stop()` returns `True` and the training loop ends.
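
A stripped-down, plain-Python model of how a stop-at-step hook interacts with the `should_stop()` loop is shown below. `StopAtStep` and `ToyMonitoredSession` are hypothetical stand-ins, not TensorFlow classes; in particular, the real `tf.train.SessionRunHook.after_run` takes `(run_context, run_values)` rather than the arguments used here.

```python
class StopAtStep:
    """Hypothetical stand-in for tf.train.StopAtStepHook."""

    def __init__(self, last_step):
        self.last_step = last_step

    def after_run(self, global_step, session):
        # ask the session to stop once the last step has been reached
        if global_step >= self.last_step:
            session.request_stop()


class ToyMonitoredSession:
    """Hypothetical stand-in for the run loop of MonitoredTrainingSession."""

    def __init__(self, hooks):
        self.hooks = hooks
        self.global_step = 0
        self._stop = False

    def request_stop(self):
        self._stop = True

    def should_stop(self):
        return self._stop

    def run(self, train_fn):
        train_fn()
        self.global_step += 1  # a real train op increments the global step
        for hook in self.hooks:
            hook.after_run(self.global_step, self)


sess = ToyMonitoredSession(hooks=[StopAtStep(last_step=5)])
calls = []
while not sess.should_stop():
    sess.run(lambda: calls.append(sess.global_step))
print(sess.global_step, len(calls))  # 5 5
```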

One argument is `is_chief`. In distributed TensorFlow's `ps-worker` architecture, one of the `workers` is appointed as the `chief worker`.
> For a chief, this utility sets proper session initializer/restorer. It also creates hooks related to checkpoint and summary saving. For workers, this utility sets proper session creator which waits for the chief to initialize/restore. 

The code looks as follows:

```python
# task_index identifies this worker; server is this worker's tf.train.Server
task_index = 0

# The StopAtStepHook handles stopping after running given steps.
hooks = [tf.train.StopAtStepHook(last_step=100000)]
config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
with tf.train.MonitoredTrainingSession(
    config=config,
    master=server.target,
    is_chief=(task_index == 0),  # appoint worker 0 as chief worker
    checkpoint_dir='tem/train_logs',
    hooks=hooks
) as sess:
    while not sess.should_stop():
        sess.run(train_op)  # run one training step
```
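
The chief/non-chief split quoted above can be pictured with a small threading sketch in plain Python (not TensorFlow; `run_task`, `params` and the event are hypothetical). Only task 0 initializes the shared state; the other workers block until it has done so, which is roughly what the session creator does for non-chief workers.

```python
import threading

initialized = threading.Event()
params = {}
log = []
log_lock = threading.Lock()


def run_task(task_index):
    is_chief = (task_index == 0)
    if is_chief:
        params['w'] = 0.0   # only the chief initializes (or restores) variables
        initialized.set()
    else:
        initialized.wait()  # non-chief workers wait for the chief
    with log_lock:
        log.append((task_index, params['w']))


# start the non-chief workers first to show that they really wait
threads = [threading.Thread(target=run_task, args=(i,)) for i in (2, 1, 0)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(log))  # [(0, 0.0), (1, 0.0), (2, 0.0)]
```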

### Now let's put everything together: distributed TensorFlow framework

```python
import argparse

import tensorflow as tf


# setup hosts
ps_hosts = ['10.189.253.1:2222']
worker_hosts = ['10.189.253.2:2222', '10.189.253.3:2222']

# create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({
    'ps': ps_hosts,
    'worker': worker_hosts
})


def worker(job_name, task_index):
    # create the server for this task
    server = tf.train.Server(
        server_or_cluster_def=cluster,
        job_name=job_name,
        task_index=task_index
    )

    if job_name == 'ps':
        print('Start Parameter Server: ', task_index)
        server.join()  # block forever, serving variables to the workers
    elif job_name == 'worker':
        # Variables go to ps; all other ops go to the local worker by default.
        with tf.device(
                tf.train.replica_device_setter(
                    worker_device='/job:worker/task:{}'.format(task_index),
                    cluster=cluster
                )
        ):
            # count the number of updates; StopAtStepHook reads this variable
            global_step = tf.train.get_or_create_global_step()

            # TODO: build the graph
            input_placeholder = ...  # placeholders
            layer = ...  # neural network layers
            prediction = ...  # predictions
            loss = ...  # loss
            # pass global_step so that every update increments it, e.g.
            # tf.train.AdamOptimizer().minimize(loss, global_step=global_step)
            train_op = ...  # train operation

            # create TensorBoard summaries
            tf.summary.scalar('loss', loss)

            # merge all summaries into a single operation which we can execute in a session
            summary_op = tf.summary.merge_all()

        # the StopAtStepHook handles stopping after running given steps
        hooks = [tf.train.StopAtStepHook(last_step=100000)]

        # session configuration
        config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
        config.gpu_options.allow_growth = True

        # start session; worker 0 is the chief
        with tf.train.MonitoredTrainingSession(
            config=config,
            master=server.target,
            is_chief=(task_index == 0),
            checkpoint_dir='tem/train_logs',
            hooks=hooks
        ) as sess:
            # tf.train.MonitoredTrainingSession takes care of session starting and stopping
            while not sess.should_stop():
                # TODO: read input data and train the model
                sess.run(train_op)


if __name__ == '__main__':
    # each machine runs this same script with its own role, e.g.
    # python script.py --job_name worker --task_index 1
    parser = argparse.ArgumentParser()
    parser.add_argument('--job_name', choices=['ps', 'worker'], required=True)
    parser.add_argument('--task_index', type=int, default=0)
    args = parser.parse_args()
    worker(args.job_name, args.task_index)
```

### Note

TensorFlow does not distribute your code to the servers. When you run a distributed TensorFlow program, the same script has to be copied to every machine, both the 'workers' and the 'ps', and launched on each of them. Command-line arguments or environment variables then decide whether a given process executes the 'ps' code block or the 'worker' code block.
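
One common environment-variable convention is `TF_CONFIG`, a JSON string holding the cluster and the current task; later TensorFlow APIs such as `tf.estimator` read it automatically, whereas the 1.x framework above does not, so this sketch parses it by hand. `role_from_env` is a hypothetical helper, and the dict passed in stands for `os.environ` on the second worker machine.

```python
import json
import os


def role_from_env(environ=os.environ):
    """Read (cluster, job_name, task_index) from a TF_CONFIG-style variable."""
    config = json.loads(environ['TF_CONFIG'])
    task = config['task']
    return config['cluster'], task['type'], int(task['index'])


# what the second worker machine might export before launching the script
example_env = {'TF_CONFIG': json.dumps({
    'cluster': {'ps': ['10.189.253.1:2222'],
                'worker': ['10.189.253.2:2222', '10.189.253.3:2222']},
    'task': {'type': 'worker', 'index': 1},
})}

cluster_def, job_name, task_index = role_from_env(example_env)
print(job_name, task_index)  # worker 1
```

The returned `job_name` and `task_index` can then be passed to `worker(job_name, task_index)` in place of the command-line arguments.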

#### References

[Distributed TensorFlow](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/docs_src/deploy/distributed.md)  
[How to Write Distributed TensorFlow Code — with an Example on Clusterone](https://clusterone.com/blog/2017/09/13/distributed-tensorflow-clusterone/)  
[Distributed TensorFlow: A Gentle Introduction](https://github.com/mrahtz/distributed_tensorflow_gentle_introduction)