# 1. Distributed TensorFlow overview

* Konan HelloWorld / Distributed deep learning study [1, 3]
* 김무성

# Contents
* 1 Lab environment
* 2 Hello distributed TensorFlow!
* 3 A brief tour of the official tutorial
    - Create a cluster
    - Specifying distributed devices in your model
    - Replicated training
    - Putting it all together: example trainer program

#### References

##### TensorFlow overview
* [1] Large Scale Deep Learning with TensorFlow - http://www.slideshare.net/JenAman/large-scale-deep-learning-with-tensorflow

##### Distributed TensorFlow overview
* [2] Intro to the Distributed Version of TensorFlow - http://www.slideshare.net/altoros/intro-to-the-distributed-version-of-tensorflow
* [3] Distributed TensorFlow (official website tutorial) - https://www.tensorflow.org/versions/r0.8/how_tos/distributed/index.html#distributed-tensorflow

##### For upcoming lab sessions (Docker)
* [4] TensorFlow official Docker Hub - https://hub.docker.com/u/tensorflow/
* [5] Tensorflow in Docker - http://www.slideshare.net/EricAhn/tensorflow-in-docker

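## 1. Lab environment

Run the TensorFlow Docker image with the Jupyter port (8888) and the TensorBoard port (6006) published, mounting a local directory at /notebooks/work/ so that files saved there persist outside the container:

docker run -it --name run_dist_tf -p 8888:8888 -p 6006:6006 -v [your_dir_path]:/notebooks/work/ gcr.io/tensorflow/tensorflow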

## 2. Hello distributed TensorFlow!

In [1]:
import tensorflow as tf

In [2]:
c = tf.constant("Hello, distributed TensorFlow!")

In [3]:
# Start a TensorFlow server as a single-process "cluster".
server = tf.train.Server.create_local_server()

In [4]:
sess = tf.Session(server.target)  # Create a session on the server.

In [5]:
sess.run(c)

'Hello, distributed TensorFlow!'

## 3. A brief tour of the official tutorial
* Create a cluster
* Specifying distributed devices in your model
* Replicated training
* Putting it all together: example trainer program

### Create a cluster

* tf.train.ClusterSpec
* tf.train.Server

#### tf.train.ClusterSpec

<font color="red">Create a tf.train.ClusterSpec</font> that describes all of the tasks in the cluster. This should be the same for each task.

In [None]:
# Example 1
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})

In [None]:
# Example 2
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
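A `tf.train.ClusterSpec` is essentially a mapping from job names to lists of `host:port` addresses, where a task's index is its position in its job's list. The plain-Python sketch below models that mapping with an ordinary dict so the indexing can be checked without TensorFlow; the helpers `num_tasks` and `task_address` are hypothetical and not part of the TensorFlow API.

```python
from typing import Dict, List

# Plain-Python model of a cluster spec (illustration only, not the TF API):
# each job name maps to a list of "host:port" strings, and a task's index
# is its position in that list.
ClusterDict = Dict[str, List[str]]


def num_tasks(cluster: ClusterDict, job: str) -> int:
    """Number of tasks in `job`."""
    return len(cluster[job])


def task_address(cluster: ClusterDict, job: str, index: int) -> str:
    """Return the host:port that task `index` of `job` listens on."""
    tasks = cluster[job]
    if not 0 <= index < len(tasks):
        raise IndexError(f"job {job!r} has no task {index}")
    return tasks[index]


# Example 1: one "local" job with two tasks.
local = {"local": ["localhost:2222", "localhost:2223"]}

# Example 2: three workers and two parameter servers.
cluster = {
    "worker": ["worker0.example.com:2222",
               "worker1.example.com:2222",
               "worker2.example.com:2222"],
    "ps": ["ps0.example.com:2222", "ps1.example.com:2222"],
}

print(task_address(local, "local", 1))   # localhost:2223
print(num_tasks(cluster, "worker"))      # 3
print(task_address(cluster, "ps", 0))    # ps0.example.com:2222
```

Because every task builds the same spec, `("local", 1)` means `localhost:2223` in every process, which is what lets the tasks find one another.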

#### tf.train.Server

<font color="red">Create a tf.train.Server</font>, passing the tf.train.ClusterSpec to the constructor, and identifying the local task with a job name and task index.

For example, to launch a cluster with two servers running on localhost:2222 and localhost:2223, run the following snippets in two different processes on the local machine:

In [None]:
# In task 0:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)

In [None]:
# In task 1:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)

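### Specifying distributed devices in your model

To place operations on a particular process, use the same tf.device() function that selects a CPU or GPU, with a device string that names a job and a task. In the example below, the variables live on the two ps tasks, the compute-intensive part of the model runs on worker task 7, and TensorFlow inserts the data transfers between jobs (parameters from ps to worker for the forward pass, gradients from worker to ps when they are applied).

In [None]:
# Pin the first layer's parameters to parameter-server task 0.
with tf.device("/job:ps/task:0"):
  weights_1 = tf.Variable(...)
  biases_1 = tf.Variable(...)

# Pin the second layer's parameters to parameter-server task 1.
with tf.device("/job:ps/task:1"):
  weights_2 = tf.Variable(...)
  biases_2 = tf.Variable(...)

# Build the model's computation on worker task 7.
with tf.device("/job:worker/task:7"):
  input, labels = ...
  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
  # ...
  train_op = ...

# Connect to worker 7's server and run the training op repeatedly.
with tf.Session("grpc://worker7.example.com:2222") as sess:
  for _ in range(10000):
    sess.run(train_op)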
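The device strings above name a job and a task index, and the session target `grpc://worker7.example.com:2222` is `grpc://` followed by the address of worker task 7. The sketch below assumes a hypothetical cluster with eight workers and two parameter servers, parses only the `/job:<name>/task:<index>` prefix of a device string (real device strings can carry further components such as `/cpu:0`, which this toy parser ignores), and resolves it to an address. `parse_job_task` and `grpc_target` are illustrative helpers, not TensorFlow functions.

```python
import re
from typing import Dict, List, Tuple

# Hypothetical cluster: 8 workers (worker0..worker7) and 2 parameter servers.
CLUSTER: Dict[str, List[str]] = {
    "worker": [f"worker{i}.example.com:2222" for i in range(8)],
    "ps": ["ps0.example.com:2222", "ps1.example.com:2222"],
}

# Matches the leading "/job:<name>/task:<index>" portion of a device string.
_JOB_TASK = re.compile(r"/job:(?P<job>[\w-]+)/task:(?P<task>\d+)")


def parse_job_task(device: str) -> Tuple[str, int]:
    """Extract (job, task_index) from a device string like '/job:ps/task:1'."""
    m = _JOB_TASK.match(device)
    if m is None:
        raise ValueError(f"no /job:.../task:... prefix in {device!r}")
    return m.group("job"), int(m.group("task"))


def grpc_target(cluster: Dict[str, List[str]], device: str) -> str:
    """Session target for the task that a device string points to."""
    job, task = parse_job_task(device)
    return "grpc://" + cluster[job][task]


print(parse_job_task("/job:ps/task:1"))            # ('ps', 1)
print(grpc_target(CLUSTER, "/job:worker/task:7"))  # grpc://worker7.example.com:2222
```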

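### Replicated training

The most common setup, data parallelism, runs several tasks in a worker job that train the same model on different mini-batches while updating shared parameters hosted by one or more tasks in a ps job. It can be organized in several ways:

* In-graph replication: a single client builds one graph containing one set of parameters (tf.Variable nodes pinned to /job:ps) and multiple copies of the compute-intensive part of the model, each pinned to a different task in /job:worker.
* Between-graph replication: there is a separate client for each /job:worker task, typically in the same process as that task. Each client builds a similar graph containing the parameters (pinned to /job:ps, e.g. with tf.train.replica_device_setter()) and a single copy of the compute-intensive part of the model, pinned to its local worker task.
* Asynchronous training: each replica of the graph runs an independent training loop without coordination. It works with both forms of replication above.
* Synchronous training: all replicas read the same parameter values, compute gradients in parallel, and then apply them together. It works with in-graph replication (e.g. gradient averaging) and with between-graph replication (e.g. tf.train.SyncReplicasOptimizer).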
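To make the difference between asynchronous and synchronous training concrete, here is a toy, single-process simulation in plain Python; it does not use TensorFlow and is not how TensorFlow implements either mode. It assumes two hypothetical workers, each holding one data point `t_k` and minimizing `0.5 * (w - t_k)**2`. In the synchronous round both gradients are averaged and applied once; in the asynchronous round both workers read `w` before either writes, so the second update is computed from a stale value.

```python
from typing import Callable, Tuple

# Toy setup (illustrative assumption): worker k minimizes 0.5 * (w - t_k)**2,
# whose gradient is (w - t_k). The shared optimum is the mean target, 3.0.
TARGETS = [2.0, 4.0]
LR = 0.1


def grad(w: float, t: float) -> float:
    return w - t


def sync_round(w: float) -> Tuple[float, int]:
    """All replicas read the same w; gradients are averaged and applied once."""
    grads = [grad(w, t) for t in TARGETS]
    return w - LR * sum(grads) / len(grads), 1      # global_step += 1


def async_round(w: float) -> Tuple[float, int]:
    """Each replica applies its own update without coordination.

    Both reads happen before either write, so the second update is
    computed from a stale value of w (a simple model of staleness).
    """
    snapshot = w                                    # both workers read w
    for t in TARGETS:
        w = w - LR * grad(snapshot, t)              # independent updates
    return w, len(TARGETS)                          # one step per worker


def train(step_fn: Callable[[float], Tuple[float, int]],
          rounds: int, w: float = 0.0) -> Tuple[float, int]:
    global_step = 0
    for _ in range(rounds):
        w, inc = step_fn(w)
        global_step += inc
    return w, global_step


w_sync, steps_sync = train(sync_round, rounds=50)
w_async, steps_async = train(async_round, rounds=50)
print(f"sync : w={w_sync:.4f} global_step={steps_sync}")
print(f"async: w={w_async:.4f} global_step={steps_async}")
```

On this well-behaved loss both modes approach 3.0; the asynchronous run performs twice as many (possibly stale) updates per round. With real models and larger learning rates, stale updates can slow or destabilize training, which is the usual motivation for synchronous training.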

### Putting it all together: example trainer program

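#### trainer.py

The following code is the skeleton of a distributed trainer program that implements between-graph replication with asynchronous training. It contains the code for both the parameter-server and the worker tasks; the model itself (loss = ...) is left out.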

In [None]:
import tensorflow as tf

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS


def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    # Parameter servers only host variables; join() blocks until the server shuts down.
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Build model...
      loss = ...
      global_step = tf.Variable(0)

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

      saver = tf.train.Saver()
      summary_op = tf.merge_all_summaries()
      init_op = tf.initialize_all_variables()

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:
      # Loop until the supervisor shuts down or 1000000 steps have completed.
      step = 0
      while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        _, step = sess.run([train_op, global_step])

    # Ask for all the services to stop.
    sv.stop()

if __name__ == "__main__":
  tf.app.run()
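In the worker branch, `tf.train.replica_device_setter` places variable ops on parameter-server tasks and every other op on `worker_device`; by default it assigns the variables to the ps tasks in round-robin order. The sketch below mimics only that placement rule in plain Python so it can be inspected without TensorFlow. `place_ops` and the op names are hypothetical, and the real function handles more cases than this simplified model.

```python
from typing import Dict, List, Tuple


def place_ops(ops: List[Tuple[str, bool]], num_ps: int,
              worker_device: str) -> Dict[str, str]:
    """Assign a device string to each (name, is_variable) op.

    Simplified model: variables go to /job:ps/task:0, /job:ps/task:1, ...
    in round-robin order; every other op goes to the local worker device.
    """
    placement: Dict[str, str] = {}
    next_ps = 0
    for name, is_variable in ops:
        if is_variable:
            placement[name] = f"/job:ps/task:{next_ps}"
            next_ps = (next_ps + 1) % num_ps
        else:
            placement[name] = worker_device
    return placement


# Hypothetical graph built by worker 0 in a cluster with two ps tasks.
ops = [("weights_1", True), ("biases_1", True), ("matmul_1", False),
       ("weights_2", True), ("biases_2", True), ("logits", False),
       ("global_step", True)]

for name, device in place_ops(ops, num_ps=2,
                              worker_device="/job:worker/task:0").items():
    print(f"{name:12s} -> {device}")
```

Unlike the hand-written placement in the earlier tf.device example, round-robin assignment can put a layer's weights and biases on different ps tasks; it simply spreads the variables evenly.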

To start the trainer with two parameter servers and two workers, use the following command line (assuming the script is called trainer.py):

In [None]:
# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=1
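All four commands pass the same `--ps_hosts` and `--worker_hosts` values and differ only in `--job_name` and `--task_index`, because every task must build the same cluster description. A small generator such as the hypothetical `trainer_commands` below (plain Python, assuming the script name `trainer.py` and the flag names defined above) derives one command per task from the two host lists, which avoids copy-paste mismatches between machines.

```python
from typing import List, Tuple


def trainer_commands(ps_hosts: List[str], worker_hosts: List[str],
                     script: str = "trainer.py") -> List[Tuple[str, str]]:
    """Return (machine, command) pairs, one per task in the cluster."""
    # Every task shares the same cluster description.
    common = (f"python {script} --ps_hosts={','.join(ps_hosts)} "
              f"--worker_hosts={','.join(worker_hosts)}")
    commands = []
    for job, hosts in (("ps", ps_hosts), ("worker", worker_hosts)):
        for index, host in enumerate(hosts):
            machine = host.rsplit(":", 1)[0]        # strip the port
            commands.append(
                (machine, f"{common} --job_name={job} --task_index={index}"))
    return commands


ps = ["ps0.example.com:2222", "ps1.example.com:2222"]
workers = ["worker0.example.com:2222", "worker1.example.com:2222"]
for machine, cmd in trainer_commands(ps, workers):
    print(f"# On {machine}:\n{cmd}")
```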
