# 分布式Tensorflow训练：多机多卡的实现

In [None]:
import tensorflow as tf

In [2]:
var = tf.Variable(initial_value=0.0)

第一步，我们需要为每个进程创建自己的会话。

In [3]:
sess1 = tf.Session()
sess2 = tf.Session()

sess1.run(tf.global_variables_initializer())
sess2.run(tf.global_variables_initializer())

In [4]:
print("Initial value of var in session 1:", sess1.run(var))
print("Initial value of var in session 2:", sess2.run(var))

sess1.run(var.assign_add(1.0))
print("Incremented var in session 1")

print("Value of var in session 1:", sess1.run(var))
print("Value of var in session 2:", sess2.run(var))

Initial value of var in session 1: 0.0
Initial value of var in session 2: 0.0
Incremented var in session 1
Value of var in session 1: 1.0
Value of var in session 2: 0.0


# Distributed TensorFlow

In [None]:
import tensorflow as tf
c = tf.constant("Hello, Distributed TensorFlow!")
# 创建一个本地TensorFlow集群
server = tf.train.Server.create_local_server()
# 在集群上创建一个会话
sess = tf.Session(server.target)
print(sess.run(c)）

第一步是定义集群的规模。我们从最简单的集群开始：即两台服务器（两个任务），它们都在同一台机器上，一个在 2222 端口，一个在 2223 端口。

In [5]:
tasks = ["localhost:2222", "localhost:2223"]

每个任务都与「工作」（job）相关联，该工作是相关任务的集合。我们将这两个任务与一个称为「local」的工作相关联。

In [6]:
jobs = {"local": tasks}

所有这些即定义为一个集群。

In [7]:
cluster = tf.train.ClusterSpec(jobs)

In [8]:
server1 = tf.train.Server(cluster, job_name="local", task_index=0)

server2 = tf.train.Server(cluster, job_name="local", task_index=1)

特性：任何具有相同名称的变量都将在所有服务器之间共享。

In [9]:
tf.reset_default_graph()
var = tf.Variable(initial_value=0.0, name='var')
sess1 = tf.Session(server1.target)
sess2 = tf.Session(server2.target)

In [10]:
sess1.run(tf.global_variables_initializer())
sess2.run(tf.global_variables_initializer())

print("Initial value of var in session 1:", sess1.run(var))
print("Initial value of var in session 2:", sess2.run(var))

sess1.run(var.assign_add(1.0))
print("Incremented var in session 1")

print("Value of var in session 1:", sess1.run(var))
print("Value of var in session 2:", sess2.run(var))

Initial value of var in session 1: 0.0
Initial value of var in session 2: 0.0
Incremented var in session 1
Value of var in session 1: 1.0
Value of var in session 2: 1.0


# 存放

In [11]:
def run_with_location_trace(sess, op):
    run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
    run_metadata = tf.RunMetadata()
    sess.run(op, options=run_options, run_metadata=run_metadata)
    for device in run_metadata.step_stats.dev_stats:
      print(device.device)
      for node in device.node_stats:
        print("  ", node.node_name)

In [12]:
run_with_location_trace(sess1, var)

/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   var


In [13]:
run_with_location_trace(sess1, var.assign_add(1.0))

/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   AssignAdd_1/value
   var
   AssignAdd_1


In [14]:
run_with_location_trace(sess2, var)

/job:local/replica:0/task:1/device:CPU:0
   _SOURCE
/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   var


In [15]:
with tf.device("/job:local/task:0"):
    var1 = tf.Variable(0.0, name='var1')
with tf.device("/job:local/task:1"):
    var2 = tf.Variable(0.0, name='var2')
    
# (This will initialize both variables)
sess1.run(tf.global_variables_initializer())

In [16]:
run_with_location_trace(sess1, var1)

/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   var1


In [17]:
run_with_location_trace(sess1, var2)

/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
/job:local/replica:0/task:1/device:CPU:0
   _SOURCE
   var2


In [18]:
run_with_location_trace(sess2, var2)

/job:local/replica:0/task:1/device:CPU:0
   _SOURCE
   var2


In [19]:
run_with_location_trace(sess2, var1)

/job:local/replica:0/task:1/device:CPU:0
   _SOURCE
/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   var1


# 计算图

In [20]:
cluster = tf.train.ClusterSpec({"local": ["localhost:2224", "localhost:2225"]})
server1 = tf.train.Server(cluster, job_name="local", task_index=0)
server2 = tf.train.Server(cluster, job_name="local", task_index=1)

In [21]:
graph1 = tf.Graph()
with graph1.as_default():
    var1 = tf.Variable(0.0, name='var')
sess1 = tf.Session(target=server1.target, graph=graph1)
print(graph1.get_operations())

[<tf.Operation 'var/initial_value' type=Const>, <tf.Operation 'var' type=VariableV2>, <tf.Operation 'var/Assign' type=Assign>, <tf.Operation 'var/read' type=Identity>]


In [22]:
graph2 = tf.Graph()
sess2 = tf.Session(target=server2.target, graph=graph2)
print(graph2.get_operations())

[]


In [23]:
with graph2.as_default():
    var2 = tf.Variable(0.0, name='var')

In [24]:
sess1.run(var1.assign(1.0))
sess2.run(var2)

1.0

# 实践细节

In [None]:
import tensorflow as tf

## 谁负责初始化共享变量？

In [10]:
def s1():
    server1 = tf.train.Server(cluster,
                              job_name="local",
                              task_index=0)
    var = tf.Variable(0.0, name='var')
    sess1 = tf.Session(server1.target)
    
    print("Server 1: waiting for connection...")
    sess1.run(tf.report_uninitialized_variables())
    while len(sess1.run(tf.report_uninitialized_variables())) > 0:
        print("Server 1: waiting for initialization...")
        sleep(1.0)
    print("Server 1: variables initialized!")

def s2():
    server2 = tf.train.Server(cluster,
                              job_name="local",
                              task_index=1)
    var = tf.Variable(0.0, name='var')
    sess2 = tf.Session(server2.target)
    
    for i in range(3):
        print("Server 2: waiting %d seconds before initializing..."
              % (3 - i))
        sleep(1.0)
    sess2.run(tf.global_variables_initializer())
    
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)
p1.start()
p2.start()

Server 1: waiting for connection...
Server 2: waiting 3 seconds before initializing...
Server 1: waiting for initialization...
Server 2: waiting 2 seconds before initializing...
Server 1: waiting for initialization...
Server 2: waiting 1 seconds before initializing...
Server 1: waiting for initialization...
Server 1: variables initialized!


In [11]:
p1.terminate()
p2.terminate()

# 示例

我们会创建：

- 一个存储单个变量 var 的参数服务器。
- 两个工作站任务（worker task），每个工作站将多次增加变量 var 的值。
我们将让参数服务器多输出几次 var 的值，以便查看其变化。

In [12]:
import tensorflow as tf
from multiprocessing import Process
from time import sleep

cluster = tf.train.ClusterSpec({
    "worker": [
        "localhost:3333",
        "localhost:3334",
    ],
    "ps": [
        "localhost:3335"
    ]
})

def parameter_server():
    with tf.device("/job:ps/task:0"):
        var = tf.Variable(0.0, name='var')

    server = tf.train.Server(cluster,
                             job_name="ps",
                             task_index=0)
    sess = tf.Session(target=server.target)
    
    print("Parameter server: waiting for cluster connection...")
    sess.run(tf.report_uninitialized_variables())
    print("Parameter server: cluster ready!")
    
    print("Parameter server: initializing variables...")
    sess.run(tf.global_variables_initializer())
    print("Parameter server: variables initialized")
    
    for i in range(5):
        val = sess.run(var)
        print("Parameter server: var has value %.1f" % val)
        sleep(1.0)

    print("Parameter server: blocking...")
    server.join()
    

def worker(worker_n):
    with tf.device("/job:ps/task:0"):
        var = tf.Variable(0.0, name='var')
        
    server = tf.train.Server(cluster,
                             job_name="worker",
                             task_index=worker_n)
    sess = tf.Session(target=server.target)
    
    print("Worker %d: waiting for cluster connection..." % worker_n)
    sess.run(tf.report_uninitialized_variables())
    print("Worker %d: cluster ready!" % worker_n)
    
    while sess.run(tf.report_uninitialized_variables()):
        print("Worker %d: waiting for variable initialization..." % worker_n)
        sleep(1.0)
    print("Worker %d: variables initialized" % worker_n)
    
    for i in range(5):
        print("Worker %d: incrementing var" % worker_n)
        sess.run(var.assign_add(1.0))
        sleep(1.0)
    
    print("Worker %d: blocking..." % worker_n)
    server.join()

ps_proc = Process(target=parameter_server, daemon=True)
w1_proc = Process(target=worker, args=(0, ), daemon=True)
w2_proc = Process(target=worker, args=(1, ), daemon=True)

In [13]:
ps_proc.start()

Parameter server: waiting for cluster connection...
Parameter server: cluster ready!
Parameter server: initializing variables...
Parameter server: variables initialized
Parameter server: var has value 0.0
Parameter server: var has value 2.0
Parameter server: var has value 4.0
Parameter server: var has value 5.0
Parameter server: var has value 7.0
Parameter server: blocking...


In [14]:
w1_proc.start()

Worker 0: waiting for cluster connection...
Worker 0: cluster ready!
Worker 0: waiting for variable initialization...
Worker 0: variables initialized
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: blocking...


In [15]:
w2_proc.start()

Worker 1: waiting for cluster connection...
Worker 1: cluster ready!
Worker 1: waiting for variable initialization...
Worker 1: variables initialized
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: blocking...


In [16]:
for proc in [w1_proc, w2_proc, ps_proc]:
    proc.terminate()

# 总结

我们了解了：

- 如何将多个 TensorFlow 执行引擎（运行在不同进程或不同机器上）集成为一个集群，以便共享变量。
- 如何为变量或操作指定服务器。
- 图内复制与图间复制。
- 如何等待变量被集群中的另一个任务初始化。