# Demo: how to run TensorFlow on a distributed cluster

In [1]:
from __future__ import division, print_function, unicode_literals

# Common imports
import numpy as np
import os

In [2]:
import tensorflow as tf

  from ._conv import register_converters as _register_converters


In [3]:
# to make this notebook's output stable across runs
def reset_graph(seed=42):
    """Replace the default graph with an empty one and reseed the random generators.

    Args:
        seed: Integer passed to both ``np.random.seed`` and
            ``tf.set_random_seed``. Defaults to 42.
    """
    # NumPy's generator does not depend on the graph, so it can be seeded first.
    np.random.seed(seed)
    # Reset before seeding TensorFlow so the seed is set on the fresh default graph.
    tf.reset_default_graph()
    tf.set_random_seed(seed)

# Cluster

In [4]:
# Addresses of every task, grouped by job; the position of an address in its
# list is the task index within that job.
ps_hosts = [
    "127.0.0.1:2221",  # /job:ps/task:0
    "127.0.0.1:2222",  # /job:ps/task:1
]
worker_hosts = [
    "127.0.0.1:2223",  # /job:worker/task:0
    "127.0.0.1:2224",  # /job:worker/task:1
    "127.0.0.1:2225",  # /job:worker/task:2
]

cluster_spec = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

## Assignment 1
If a task is assigned a port that is already in use by another process, what will happen?

In [None]:
# TODO: write code that starts a task on a port that is already in use


In [5]:
def _start_server(job_name, task_index):
    """Create the in-process server for one task declared in ``cluster_spec``."""
    return tf.train.Server(cluster_spec, job_name=job_name, task_index=task_index)


# Do NOT call join() on a server here (e.g. task_ps.join()): join() blocks the
# calling thread, so none of the following servers or cells would ever run.
# Only join once there is nothing left to do but wait for the servers to
# finish, e.g. at the end of a main script or from a multi-threaded setup.
task_ps = _start_server("ps", 0)
task_ps1 = _start_server("ps", 1)
task_worker0 = _start_server("worker", 0)
task_worker1 = _start_server("worker", 1)
task_worker2 = _start_server("worker", 2)

### Example: making the main thread wait for the servers
# if __name__ == '__main__':
#     1. start your servers
#     2. use multiple threads to create the servers and return the server objects:
#        task_ps, task_ps1, task_worker0, task_worker1, task_worker2 = asyncreateserver()
#     3. join() every server so the main thread waits for them:
#           task_ps.join()
#           task_ps1.join()
#           task_worker0.join()
#           task_worker1.join()
#           task_worker2.join()

## Assignment 2 (Optional)
main.py:
1. `task_ps, task_ps1, task_worker0, task_worker1, task_worker2 = asyncreateserver()`
 - create the servers task_ps, task_ps1, task_worker0, task_worker1, task_worker2
 - wait for input in thread2()
2. call join() on each server to wait for it to finish

In [None]:
# TODO: write the threaded server start-up code here

In [6]:
reset_graph()

# Each op is created under an explicit device scope: the variable on the
# parameter-server job, the additions on the worker job.
with tf.device("/job:ps"):
    a = tf.Variable(1.0, name="a")

with tf.device("/job:worker"):
    b = a + 2

with tf.device("/job:worker/task:1"):
    c = a + b

# Open a session against the server listening on 127.0.0.1:2221, initialize
# the variable, then evaluate and print each tensor with its own run call.
with tf.Session("grpc://127.0.0.1:2221") as sess:
    sess.run(tf.global_variables_initializer())
    for tensor in (a, b, c):
        print(sess.run(tensor))

1.0
3.0
4.0


In [7]:
reset_graph()

# Let replica_device_setter pick the device of every op created in its scope:
# variables are spread over the ps tasks, everything else goes to the worker job.
device_setter = tf.train.replica_device_setter(
    ps_tasks=2,
    ps_device="/job:ps",
    worker_device="/job:worker")

with tf.device(device_setter):
    v1 = tf.Variable(1.0, name="v1")  # pinned to /job:ps/task:0 (defaults to /cpu:0)
    v2 = tf.Variable(2.0, name="v2")  # pinned to /job:ps/task:1 (defaults to /cpu:0)
    v3 = tf.Variable(3.0, name="v3")  # pinned to /job:ps/task:0 (defaults to /cpu:0)
    s = v1 + v2            # pinned to /job:worker (defaults to task:0/cpu:0)
    with tf.device("/task:1"):
        p1 = 2 * s         # pinned to /job:worker/task:1 (defaults to /cpu:0)
        with tf.device("/cpu:0"):
            p2 = 3 * s     # pinned to /job:worker/task:1/cpu:0
    # c combines two tensors computed on task:1, but it is created outside the
    # "/task:1" scope, so only /job:worker is requested for it.
    # NOTE(review): the task is left to the placer (presumably task:0/cpu:0);
    # confirm with the device placement log printed by the session below.
    c = p1 + p2

# Log the device chosen for every op when the session runs.
config = tf.ConfigProto(log_device_placement=True)

with tf.Session("grpc://127.0.0.1:2221", config=config) as sess:
    sess.run(tf.global_variables_initializer())
    # s, p1 and p2 are each requested on one device; c is created outside the
    # "/task:1" scope but depends on tensors computed inside it.
    for tensor in (s, p1, p2, c):
        print(sess.run(tensor))

3.0
6.0
9.0
15.0


# Demo: reducing data on different workers

In [8]:
# Reduce two independent inputs on two different worker tasks.
#
# Start from an empty graph: without this the cell keeps adding to the graph
# built by the previous cell (its variables would still be initialized here
# and the names "v1"/"v2" would already be taken). reset_graph() also reseeds
# NumPy, so the random inputs fed below are reproducible.
reset_graph()

with tf.device(tf.train.replica_device_setter(
        ps_tasks=2,  # number of /job:ps tasks that variables are spread over
        ps_device="/job:ps",
        worker_device="/job:worker")):
    # Placeholders are not variables, so the device setter assigns them to the
    # worker job, not to a ps task.
    v1 = tf.placeholder(tf.float32, name="v1")  # pinned to /job:worker
    v2 = tf.placeholder(tf.float32, name="v2")  # pinned to /job:worker
    # NOTE: despite their names, sum1 and sum2 hold the *mean* of their input.
    with tf.device("/task:1"):
        with tf.device("/cpu:0"):
            sum1 = tf.reduce_mean(v1)     # pinned to /job:worker/task:1/cpu:0
    with tf.device("/task:2"):
        with tf.device("/cpu:0"):
            sum2 = tf.reduce_mean(v2)     # pinned to /job:worker/task:2/cpu:0

# Log the device chosen for every op when the session runs.
config = tf.ConfigProto()
config.log_device_placement = True

with tf.Session("grpc://127.0.0.1:2221", config=config) as sess:
    # The graph contains no variables, so there is nothing to initialize.
    print(sess.run(sum1, feed_dict={v1:np.random.randn(10)}))
    print(sess.run(sum2, feed_dict={v2:np.random.randn(10)}))

0.44806108
-0.79065824


# Queue runners and coordinators

In [14]:
# Write the test CSV: 100000 identical rows "1,2,3", with neither a header
# line nor an index column.
import pandas as pd

columns = ['A', 'B', 'C']
data = [(1, 2, 3)] * 100000
df = pd.DataFrame(data=data, columns=columns)
df.to_csv("my_test.csv", header=False, index=False)

In [17]:
# Input pipeline: read my_test.csv line by line, parse each line into an
# instance, push the instances into a shuffling queue from several threads,
# and drain that queue in mini-batches until it is exhausted.
reset_graph()

# Queue of file names to read; the name is fed in through a placeholder.
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

# my_test.csv is written without a header row, so no line may be skipped
# (skipping one would silently drop the first record).
reader = tf.TextLineReader(skip_header_lines=0)
key, value = reader.read(filename_queue)

# The record defaults fix the column types: two float features, one int target.
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

# Shuffling queue holding the parsed (features, target) pairs.
instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
    name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

# Each run of these tensors dequeues a mini-batch of at most 2 instances.
minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

# Run the same enqueue op from n_threads threads managed by a coordinator.
n_threads = 5
queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)
coord = tf.train.Coordinator()

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    # Closing the file-name queue lets the reader signal the end of the input.
    sess.run(close_filename_queue)
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    try:
        # Consume mini-batches until the instance queue is closed and empty.
        while True:
            sess.run([minibatch_instances, minibatch_targets])
    except tf.errors.OutOfRangeError:
        print("No more training instances")
    finally:
        # Always ask the enqueue threads to stop, even on an unexpected error.
        coord.request_stop()
    # Wait for the enqueue threads to finish before the session is closed.
    coord.join(enqueue_threads)

No more training instances
