# Distributed TensorFlow


![TensorFlowing](./files/tensors_flowing.gif)


In [None]:
import tensorflow as tf

In [None]:
%load_ext version_information
%version_information numpy, scipy, matplotlib, pandas, tensorflow, sklearn, skflow

## Overview of Components

### Cluster

To define a distributed computation in TensorFlow we need to specify two kinds of jobs:

- worker jobs
- parameter server (ps) jobs

Each **job** is defined by one or more **tasks**. Each task is usually identified by a simple numerical index, i.e. `0, 1, 2, 3, ...`.



In [None]:
CLUSTER_SPEC= """
{
    'ps' : ['tensorflow0.pipeline.io:8888', 'tensorflow1.pipeline.io:8888'],
    'worker' : [ 'tensorflow2.pipeline.io:8888','tensorflow3.pipeline.io:8888'],
}
"""

In [None]:
import ast

cluster_def = ast.literal_eval(CLUSTER_SPEC)

In [None]:
cluster_def

In [None]:
spec = tf.train.ClusterSpec(cluster_def)

In [None]:
spec.jobs

In [None]:
for job in spec.jobs:
    print(job, spec.job_tasks(job))    

In [None]:
workers = ['/job:worker/task:{}'.format(i) for i in range(len(cluster_def['worker']))]
param_servers = ['/job:ps/task:{}'.format(i) for i in range(len(cluster_def['ps']))]

In [None]:
workers

In [None]:
param_servers

### Pinning of Variables
Each Variable is assigned to a specific device.

In [None]:
l = tf.Variable("local_cpu")
l.device

We can enforce the assigned device using the `tf.device` context.

In [None]:
for ps in param_servers:
    with tf.device(ps):
        v = tf.Variable("my_var")
v.device

## Tensorflow Server

The server handles the actual communication. On each node of the cluster we spawn a simple gRPC server.

In [None]:
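def launch_worker(job_name, task_id, cluster_def):
    # Start the gRPC server for one task of the given job.
    server = tf.train.Server(
        cluster_def,
        job_name=job_name,
        task_index=task_id
    )
    # Block and keep serving requests from other tasks and clients.
    server.join()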

### Connecting to a Server

To connect to _any_ server, pass its address as the `target` of the session, in the form `grpc://ip:port`, when creating a [Session](https://www.tensorflow.org/versions/r0.8/api_docs/python/client.html#Session) object.

Note that the server is generic and can assume the role of either parameter server or worker. The cluster configuration decides the role.
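
As a small illustration, the session target for any task can be derived from the cluster definition itself. The helper name `session_target` is hypothetical and not part of TensorFlow; this is only a sketch that prefixes the task's `host:port` entry with `grpc://`.

```python
def session_target(cluster_def, job_name, task_index):
    """Return the gRPC target string for one task of the cluster."""
    address = cluster_def[job_name][task_index]  # e.g. 'host:port'
    return "grpc://" + address


# Same layout as the cluster definition used earlier in this notebook.
cluster_def = {
    'ps': ['tensorflow0.pipeline.io:8888', 'tensorflow1.pipeline.io:8888'],
    'worker': ['tensorflow2.pipeline.io:8888', 'tensorflow3.pipeline.io:8888'],
}

target = session_target(cluster_def, 'worker', 1)
print(target)  # grpc://tensorflow3.pipeline.io:8888
```

The resulting string is what would be passed as the first argument of `tf.Session`.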

![ps workers](./ps_workers.png)

The best practice is to create a single image that launches the TensorFlow worker.

Environment variables then specify the exact role for the worker at run time.
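
A minimal sketch of how such a launcher might read its role is shown below. The variable names `JOB_NAME` and `TASK_INDEX`, the helper `role_from_env`, and the defaults are assumptions made for this example; the actual names depend on how the image is deployed.

```python
import os


def role_from_env(environ=os.environ):
    """Read the job name and task index for this process.

    JOB_NAME and TASK_INDEX are hypothetical variable names chosen for
    this sketch; the defaults ('worker', task 0) are assumptions too.
    """
    job_name = environ.get("JOB_NAME", "worker")
    if job_name not in ("ps", "worker"):
        raise ValueError("unknown job name: {!r}".format(job_name))
    task_index = int(environ.get("TASK_INDEX", "0"))
    return job_name, task_index


# Example with an explicit mapping instead of the real environment.
print(role_from_env({"JOB_NAME": "ps", "TASK_INDEX": "1"}))  # ('ps', 1)
```

The returned pair could then be handed to `launch_worker(job_name, task_index, cluster_def)` from the previous section.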

### gRPC

[gRPC](http://www.grpc.io) is a Remote Procedure Call protocol based on [Protocol Buffers](https://developers.google.com/protocol-buffers/).


Each object in TensorFlow that has to be sent over the wire has a Protocol Buffers definition.

1. The Client determines which variables need to be serialized for the gRPC call.
1. The Client makes the gRPC remote call to the Server and sends the values.
1. If the Server accepts the call, the serialized tensors are deserialized.
1. The Server runs the requested operation on the graph, along with all its dependencies.
1. The Server serializes the result and sends it back to the Client on the same connection.
1. The Client receives the result and deserializes it.

![gRPC Communication](./grpc_communication.png)
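
The round trip in the steps above can be imitated locally without any network. The sketch below is a toy: it uses Python's `struct` module as a stand-in for Protocol Buffers, a plain function call as a stand-in for the remote call, and a sum as the "operation". None of the function names belong to gRPC or TensorFlow.

```python
import struct


def serialize(values):
    """Pack a list of ints as a length prefix followed by the values."""
    return struct.pack("<I{}i".format(len(values)), len(values), *values)


def deserialize(payload):
    """Inverse of serialize: read the length prefix, then the values."""
    (count,) = struct.unpack_from("<I", payload, 0)
    return list(struct.unpack_from("<{}i".format(count), payload, 4))


def server_handle(request_bytes):
    """Stand-in for the server: decode, run the operation, encode."""
    values = deserialize(request_bytes)   # step 3: deserialize the inputs
    result = sum(values)                  # step 4: run the requested operation
    return serialize([result])            # step 5: serialize the result


def client_call(values):
    """Stand-in for the client side of the call."""
    request_bytes = serialize(values)              # steps 1-2: serialize and send
    response_bytes = server_handle(request_bytes)  # a network call in reality
    return deserialize(response_bytes)[0]          # step 6: deserialize the result


print(client_call([1, 2, 3, 4]))  # 10
```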

Example of the Protocol Buffers declaration for the [Variable](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/framework/variable.proto):


```protobuf
syntax = "proto3";

package tensorflow;

// Protocol buffer representing a Variable.
message VariableDef {
  // Name of the variable tensor.
  string variable_name = 1;

  // Name of the initializer op.
  string initializer_name = 2;

  // Name of the snapshot tensor.
  string snapshot_name = 3;

}
```

Each variable can then be serialized using the `to_proto` method:

In [None]:

v.to_proto()

## Simple reduce sum Example

In [None]:
batch_size = 1000

graph = tf.Graph()
with graph.as_default():

    # The input placeholder lives on the first parameter server.
    with tf.device('/job:ps/task:0'):
        input_array = tf.placeholder(tf.int32, shape=[None])

    # Divide the input across the cluster: one portion per worker.
    all_reduce = []
    splitted = tf.split(0, len(workers), input_array)
    for portion, worker in zip(splitted, workers):
        with tf.device(worker):
            print(worker)
            local_reduce = tf.reduce_sum(portion)
            # tf.Print returns its first argument, so pass the partial sum through.
            local_reduce = tf.Print(local_reduce, [local_reduce], message="partial sum is")
            all_reduce.append(local_reduce)

    # Combine the partial sums into the final result.
    final_result = tf.reduce_sum(tf.pack(all_reduce))

In [None]:
sess_config = tf.ConfigProto(
    allow_soft_placement=True,
    log_device_placement=True)

We can now run the graph:

In [None]:
import numpy as np
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)

with tf.Session("grpc://tensorflow3.pipeline.io:8888", graph=graph, config=sess_config) as session:
    result = session.run(final_result, feed_dict={ input_array: np.ones([batch_size], dtype=np.int32) }, options=run_options)
    print(result)
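
To sanity-check the value the cluster should return, the same split, partial sum, and combine pattern can be reproduced locally in plain Python. This is only a sketch of the arithmetic, not of TensorFlow's execution; the helper names are hypothetical, and the even-split requirement is written out explicitly as an assumption of this example.

```python
def split_evenly(values, num_parts):
    """Split a list into num_parts equal chunks (length must divide evenly)."""
    if len(values) % num_parts != 0:
        raise ValueError("input length must be divisible by num_parts")
    size = len(values) // num_parts
    return [values[i * size:(i + 1) * size] for i in range(num_parts)]


def distributed_sum(values, num_workers):
    """Sum each chunk separately, then sum the partial results."""
    partial_sums = [sum(chunk) for chunk in split_evenly(values, num_workers)]
    return sum(partial_sums), partial_sums


# 1000 ones split across two workers, as in the graph above.
total, partials = distributed_sum([1] * 1000, num_workers=2)
print(partials, total)  # [500, 500] 1000
```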

We can also inspect the device assigned to any remote tensor:

In [None]:
final_result.device

In [None]:
with tf.Session("grpc://tensorflow3.pipeline.io:8888", graph=graph, config=sess_config) as session:
    result = session.run(local_reduce, feed_dict={ input_array: np.ones([batch_size], dtype=np.int32) }, options=run_options)
    print(result)