In [0]:
# Start a TensorFlow server as a single-process "cluster".
# (Mirrors the snippet one would type into an interactive `python` shell.)
import tensorflow as tf
c = tf.constant("Hello, distributed TensorFlow!")
server = tf.train.Server.create_local_server()
# Use the session as a context manager so it is closed once the run finishes.
with tf.Session(server.target) as sess:  # Create a session on the server.
    greeting = sess.run(c)
greeting  # string tensors come back as bytes: b'Hello, distributed TensorFlow!'

b'Hello, distributed TensorFlow!'

In [0]:
import tensorflow as tf
from multiprocessing import Process
from time import sleep

# Local three-task cluster: two workers (ports 2223, 2224) and one
# parameter server (port 2225), all on this machine.
cluster = tf.train.ClusterSpec(dict(
    worker=["localhost:2223", "localhost:2224"],
    ps=["localhost:2225"],
))

def parameter_server():
    """Run the "ps" task of ``cluster`` and watch the shared variable grow.

    Creates ``var`` on ``/job:ps/task:0``, starts the ps server, initializes
    the variables, then polls ``var`` about once per second (at most 10 times),
    stopping early once it has reached 10.0.
    """
    with tf.device("/job:ps/task:0"):
        var = tf.Variable(0.0, name='var')

    # Build these ops once up front instead of adding new nodes to the graph
    # every time they are needed.
    uninitialized = tf.report_uninitialized_variables()
    init_op = tf.global_variables_initializer()

    server = tf.train.Server(cluster,
                             job_name="ps",
                             task_index=0)
    # Context manager closes the session; the server itself lives until the
    # process is terminated (server.join() would block forever).
    with tf.Session(target=server.target) as sess:
        print("Parameter server: waiting for cluster connection...")
        # Used as a connection barrier: this run is expected to block until
        # the other tasks are reachable (hence the log messages around it).
        sess.run(uninitialized)
        print("Parameter server: cluster ready!")

        print("Parameter server: initializing variables...")
        sess.run(init_op)
        print("Parameter server: variables initialized")

        for _ in range(10):
            val = sess.run(var)
            print("Parameter server: var has value %.1f" % val)
            sleep(1.0)
            # >= rather than == so an overshoot still ends the polling.
            if val >= 10.0:
                break

    print("Parameter server: ended...")

def worker(worker_n):
    """Run worker task ``worker_n`` of ``cluster`` and bump the shared variable.

    Waits until the parameter server has initialized ``var`` (which lives on
    ``/job:ps/task:0``), then adds 1.0 to it five times, about once per second.

    Args:
        worker_n: index of this task within the "worker" job of ``cluster``.
    """
    with tf.device("/job:ps/task:0"):
        var = tf.Variable(0.0, name='var')

    # Build the ops once; creating them inside the loops would add new nodes
    # to the graph on every iteration.
    uninitialized = tf.report_uninitialized_variables()
    increment = var.assign_add(1.0)

    server = tf.train.Server(cluster,
                             job_name="worker",
                             task_index=worker_n)
    with tf.Session(target=server.target) as sess:
        print("Worker %d: waiting for cluster connection..." % worker_n)
        # First run doubles as a barrier until the cluster is reachable.
        sess.run(uninitialized)
        print("Worker %d: cluster ready!" % worker_n)

        # The fetched value is a 1-D array of variable names. Test its size
        # explicitly: NumPy's truth value is ambiguous for more than one
        # element and deprecated for empty arrays.
        while sess.run(uninitialized).size > 0:
            print("Worker %d: waiting for variable initialization..." % worker_n)
            sleep(1.0)
        print("Worker %d: variables initialized" % worker_n)

        for _ in range(5):
            print("Worker %d: incrementing var" % worker_n)
            sess.run(increment)
            sleep(1.0)

    print("Worker %d: ended..." % worker_n)
    
# One parameter-server process plus two worker processes (tasks 0 and 1).
# daemon=True: the children are killed if the parent process exits.
ps_proc = Process(target=parameter_server, daemon=True)
w1_proc, w2_proc = (Process(target=worker, args=(task, ), daemon=True)
                    for task in (0, 1))

for proc in (ps_proc, w1_proc, w2_proc):
    proc.start()

# Wait for the parameter server only. Without a join the parent would reach
# terminate() straight away and the daemon children would die with it.
ps_proc.join()

# A tf.train.Server has no clean shutdown (server.join() blocks forever),
# so killing its process is the only way to stop it.
for proc in (w1_proc, w2_proc, ps_proc):
    proc.terminate()

print('All done.')

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Colocations handled automatically by placer.
Worker 0: waiting for cluster connection...
Worker 1: waiting for cluster connection...
Parameter server: waiting for cluster connection...
Worker 1: cluster ready!
Parameter server: cluster ready!
Parameter server: initializing variables...
Parameter server: variables initialized
Parameter server: var has value 0.0




Worker 1: variables initialized
Worker 1: incrementing var
Worker 0: cluster ready!




Worker 0: variables initialized
Worker 0: incrementing var
Parameter server: var has value 2.0
Worker 1: incrementing var
Worker 0: incrementing var
Parameter server: var has value 4.0
Worker 1: incrementing var
Worker 0: incrementing var
Parameter server: var has value 6.0
Worker 1: incrementing var
Worker 0: incrementing var
Parameter server: var has value 8.0
Worker 1: incrementing var
Worker 0: incrementing var
Parameter server: var has value 10.0
Worker 1: ended...
Worker 0: ended...
Parameter server: ended...
All done.


In [0]:
import tensorflow as tf
import numpy as np
from multiprocessing import Process, Queue
from time import sleep

# Same local layout as the previous cell: workers on 2223/2224, ps on 2225.
cluster = tf.train.ClusterSpec({
    "worker": [
        "localhost:2223",
        "localhost:2224",
    ],
    "ps": ["localhost:2225"],
})

class Obj:
  """Holds a single scalar ``tf.Variable`` named 'var' (initial value 0.0).

  The variable is created on whatever device scope is active at construction
  time; the callers in this notebook construct it inside ``tf.device(...)``.
  """

  def __init__(self):
    initial_value = 0.0
    self.aVar = tf.Variable(initial_value, name='var')

  def get_var(self):
    """Return the wrapped ``tf.Variable``."""
    wrapped = self.aVar
    return wrapped

class Server(object):
  """Parameter-server role: owns the shared variable and watches it grow."""

  def parameter_server(self):
    """Run the "ps" task of ``cluster`` until the shared variable reaches 10.

    The variable is created through ``Obj`` under ``/job:ps/task:0``. After
    initializing it, the variable is polled about once per second, at most
    10 times, stopping early once it has reached 10.0.
    """
    with tf.device("/job:ps/task:0"):
      aObj = Obj()
    var = aObj.get_var()

    # Build these ops once up front so the graph does not grow each time
    # they are needed.
    uninitialized = tf.report_uninitialized_variables()
    init_op = tf.global_variables_initializer()

    server = tf.train.Server(cluster, job_name="ps", task_index=0)
    with tf.Session(target=server.target) as sess:
      print("Parameter server: waiting for cluster connection...")
      # Connection barrier: expected to block until the cluster is reachable.
      sess.run(uninitialized)
      print("Parameter server: cluster ready!")

      print("Parameter server: initializing variables...")
      sess.run(init_op)
      print("Parameter server: variables initialized")

      for _ in range(10):
        val = sess.run(var)
        print("Parameter server: var has value %.1f" % val)
        sleep(1.0)
        # >= rather than == so an overshoot still ends the polling.
        if val >= 10.0:
          break

    # server.join() is not called: it would block forever.
    print("Parameter server: ended...")
      
class Worker(object):
  """Worker role: repeatedly increments the shared variable on the ps task."""

  def worker(self, worker_n):
    """Run worker task ``worker_n`` of ``cluster`` and bump the shared variable.

    Waits until the parameter server has initialized the variable (created
    through ``Obj`` under ``/job:ps/task:0``), then adds 1.0 to it five times,
    about once per second.

    Args:
      worker_n: index of this task within the "worker" job of ``cluster``.
    """
    with tf.device("/job:ps/task:0"):
      aObj = Obj()
    var = aObj.get_var()

    # Build the ops once; creating them inside the loops would add new nodes
    # to the graph on every iteration.
    uninitialized = tf.report_uninitialized_variables()
    increment = var.assign_add(1.0)

    server = tf.train.Server(cluster, job_name="worker", task_index=worker_n)
    with tf.Session(target=server.target) as sess:
      print("Worker %d: waiting for cluster connection..." % worker_n)
      sess.run(uninitialized)
      print("Worker %d: cluster ready!" % worker_n)

      # The fetched value is a 1-D array of names; check its size instead of
      # relying on NumPy array truthiness (ambiguous for >1 element,
      # deprecated for empty arrays).
      while sess.run(uninitialized).size > 0:
        print("Worker %d: waiting for variable initialization..." % worker_n)
        sleep(1.0)
      print("Worker %d: variables initialized" % worker_n)

      for _ in range(5):
        print("Worker %d: incrementing var" % worker_n)
        sess.run(increment)
        sleep(1.0)

    print("Worker %d: ended..." % worker_n)
         
# One Server object and two Worker objects; their bound methods are the
# process targets.
s, w1, w2 = Server(), Worker(), Worker()

ps_proc = Process(target=s.parameter_server, daemon=True)
w1_proc, w2_proc = [Process(target=obj.worker, args=(n, ), daemon=True)
                    for n, obj in enumerate((w1, w2))]

for proc in (ps_proc, w1_proc, w2_proc):
    proc.start()

# Block on the parameter server only; otherwise the parent would move on to
# terminate() at once and the daemon children would go down with it.
ps_proc.join()

# tf.train.Server cannot be shut down cleanly, so kill each process.
for proc in (w1_proc, w2_proc, ps_proc):
    proc.terminate()

print('done')

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Instructions for updating:
Colocations handled automatically by placer.

Parameter server: waiting for cluster connection...
Worker 0: waiting for cluster connection...
Worker 1: waiting for cluster connection...
Parameter server: cluster ready!
Worker 1: cluster ready!
Parameter server: initializing variables...
Worker 0: cluster ready!
Worker 1: waiting for variable initialization...
Parameter server: variables initialized
Parameter server: var has value 0.0




Worker 0: variables initialized
Worker 0: incrementing var
Parameter server: var has value 1.0
Worker 0: incrementing var




Worker 1: variables initialized
Worker 1: incrementing var
Parameter server: var has value 3.0
Worker 0: incrementing var
Worker 1: incrementing var
Parameter server: var has value 5.0
Worker 0: incrementing var
Worker 1: incrementing var
Parameter server: var has value 7.0
Worker 0: incrementing var
Worker 1: incrementing var
Parameter server: var has value 9.0
Worker 1: incrementing var
Worker 0: ended...
Parameter server: var has value 10.0
Worker 1: ended...
Parameter server: ended...
done


In [0]:
import tensorflow as tf
from multiprocessing import Process
from time import sleep

# Two local workers (2223, 2224) and one local parameter server (2225).
cluster = tf.train.ClusterSpec(
    {"worker": ["localhost:2223", "localhost:2224"], "ps": ["localhost:2225"]}
)

def parameter_server():
    """Run the "ps" task: host ``var`` and the shared queue, then drain it.

    Polls ``var`` about once per second (at most 10 times, stopping early once
    it has reached 10.0). It then pauses and prints every value the workers
    pushed into the FIFO queue shared under ``shared_name="shared_queue"``.
    """
    with tf.device("/job:ps/task:0"):
        var = tf.Variable(0.0, name='var')
        q = tf.FIFOQueue(10, tf.float32, shared_name="shared_queue")

    # Build every op once; creating q.size()/q.dequeue() inside the loops
    # would add new nodes to the graph on each call.
    uninitialized = tf.report_uninitialized_variables()
    init_op = tf.global_variables_initializer()
    queue_size = q.size()
    dequeue_one = q.dequeue()

    server = tf.train.Server(cluster,
                             job_name="ps",
                             task_index=0)
    with tf.Session(target=server.target) as sess:
        print("Parameter server: waiting for cluster connection...")
        sess.run(uninitialized)
        print("Parameter server: cluster ready!")

        print("Parameter server: initializing variables...")
        sess.run(init_op)
        print("Parameter server: variables initialized")

        for _ in range(10):
            # Read once so the printed value and the tested value agree.
            val = sess.run(var)
            print("Parameter server: var has value %.1f" % val)
            sleep(1.0)
            if val >= 10.0:
                break

        # Pause, presumably so the workers' final enqueues land before the
        # queue is drained.
        sleep(3.0)
        queued = sess.run(queue_size)
        print("ps q.size(): ", queued)

        for _ in range(queued):
            print("ps: r", sess.run(dequeue_one))

    print("Parameter server: ended...")

def worker(worker_n):
    """Run worker task ``worker_n``: increment ``var`` five times and log each
    new value to the shared queue.

    Args:
        worker_n: index of this task within the "worker" job of ``cluster``.
    """
    with tf.device("/job:ps/task:0"):
        # Same shared_name as the queue created by parameter_server().
        q = tf.FIFOQueue(10, tf.float32, shared_name="shared_queue")
    # Variables created under replica_device_setter are assigned to the ps job.
    with tf.device(tf.train.replica_device_setter(
                        worker_device='/job:worker/task:' + str(worker_n),
                        cluster=cluster)):
        var = tf.Variable(0.0, name='var')

    # Build every op once; the original created new assign_add/enqueue/const
    # nodes on each loop iteration.
    uninitialized = tf.report_uninitialized_variables()
    # One op that adds 1.0 and enqueues the value produced by the add. This
    # avoids a separate read of `var` (during which the other worker could
    # increment it again).
    increment_and_log = q.enqueue(var.assign_add(1.0))

    server = tf.train.Server(cluster,
                             job_name="worker",
                             task_index=worker_n)
    with tf.Session(target=server.target) as sess:
        print("Worker %d: waiting for cluster connection..." % worker_n)
        sess.run(uninitialized)
        print("Worker %d: cluster ready!" % worker_n)

        # Check the size of the returned name array rather than its NumPy
        # truth value (ambiguous for >1 element, deprecated when empty).
        while sess.run(uninitialized).size > 0:
            print("Worker %d: waiting for variable initialization..." % worker_n)
            sleep(1.0)
        print("Worker %d: variables initialized" % worker_n)

        for _ in range(5):
            print("Worker %d: incrementing var" % worker_n, sess.run(var))
            sess.run(increment_and_log)
            sleep(1.0)

    print("Worker %d: ended..." % worker_n)
    
# Parameter server plus worker tasks 0 and 1, each in its own daemon process.
procs = None
ps_proc = Process(target=parameter_server, daemon=True)
w1_proc = Process(target=worker, args=(0, ), daemon=True)
w2_proc = Process(target=worker, args=(1, ), daemon=True)
del procs

for proc in [ps_proc, w1_proc, w2_proc]:
    proc.start()

# Only the parameter server is joined: without it the parent would hit
# terminate() immediately, taking the daemon children down too.
ps_proc.join()

# No graceful way to stop a tf.train.Server: terminate its process.
for proc in [w1_proc, w2_proc, ps_proc]:
    proc.terminate()

print('All done.')

Instructions for updating:
Instructions for updating:
Colocations handled automatically by placer.

Instructions for updating:
Colocations handled automatically by placer.
Parameter server: waiting for cluster connection...
Worker 0: waiting for cluster connection...
Worker 1: waiting for cluster connection...
Worker 1: cluster ready!
Worker 1: waiting for variable initialization...
Parameter server: cluster ready!
Parameter server: initializing variables...
Parameter server: variables initialized
Parameter server: var has value 0.0
Worker 0: cluster ready!




Worker 0: variables initialized
Worker 0: incrementing var 0.0




Worker 1: variables initialized
Worker 1: incrementing var 1.0
Parameter server: var has value 2.0
Worker 0: incrementing var 2.0
Worker 1: incrementing var 3.0
Parameter server: var has value 4.0
Worker 0: incrementing var 4.0
Worker 1: incrementing var 5.0
Parameter server: var has value 6.0
Worker 0: incrementing var 6.0
Worker 1: incrementing var 7.0
Parameter server: var has value 8.0
Worker 0: incrementing var 8.0
Worker 1: incrementing var 9.0
Worker 0: ended...
Worker 1: ended...
ps q.size():  10
ps: r 1.0
ps: r 2.0
ps: r 3.0
ps: r 4.0
ps: r 5.0
ps: r 6.0
ps: r 7.0
ps: r 8.0
ps: r 9.0
ps: r 10.0
Parameter server: ended...
All done.


In [0]:
import tensorflow as tf

num_gpus = 1  # number of GPU towers to build; e.g. 2 on a two-GPU machine

# Build in a fresh graph so re-running this cell does not keep adding
# variables to the notebook's shared default graph.
graph = tf.Graph()
with graph.as_default():
    # place the initial data on the cpu (same as before)
    with tf.device('/cpu:0'):
        input_data = tf.Variable([[1., 2., 3.],
                                  [4., 5., 6.],
                                  [7., 8., 9.],
                                  [10., 11., 12.]])
        b = tf.Variable([[1.], [1.], [2.]])

    # split the data into chunks for each gpu (new)
    # An integer split count requires the 4 rows to divide evenly by num_gpus.
    inputs = tf.split(input_data, num_gpus)
    outputs = []

    # loop over available gpus and pass input data (new)
    # copies of the same graph but receiving different data
    for i in range(num_gpus):
        with tf.device('/gpu:' + str(i)):
            outputs.append(tf.matmul(inputs[i], b))

    # merge the results of the devices (new)
    with tf.device('/cpu:0'):
        output = tf.concat(outputs, axis=0)

    # Must be created inside `graph` so it initializes these variables.
    init_op = tf.global_variables_initializer()

# allow_soft_placement lets ops pinned to '/gpu:i' fall back to an available
# device (e.g. the CPU) when that GPU is absent, instead of raising a
# placement error.
config = tf.ConfigProto(allow_soft_placement=True)

# create a session and run (same as before)
with tf.Session(graph=graph, config=config) as sess:
    sess.run(init_op)
    print(sess.run(output))

Instructions for updating:
Colocations handled automatically by placer.
[[ 9.]
 [21.]
 [33.]
 [45.]]


In [0]:
import sys
import tensorflow as tf

# Example layout for a real three-machine cluster (one ps, two workers):
# cluster = tf.train.ClusterSpec({'ps': ['192.168.1.1:1111'],
#                                 'worker': ['192.168.1.2:1111',
#                                            '192.168.1.3:1111']
#                                })

# specify the cluster's architecture (all tasks on this machine)
# The two worker addresses must be separate list items: without a comma,
# Python concatenates adjacent string literals into one bogus address.
cluster = tf.train.ClusterSpec({'ps': ['localhost:3330'],
                                'worker': ['localhost:3331',
                                           'localhost:3332']
                               })

# parse command-line to specify machine, e.g. `python train.py worker 0`.
# Inside Jupyter, sys.argv holds the kernel's own arguments (such as '-f'),
# so validate before using them.
if (len(sys.argv) < 3 or sys.argv[1] not in cluster.jobs
        or not sys.argv[2].isdigit()):
    raise SystemExit("usage: %s {%s} TASK_INDEX"
                     % (sys.argv[0], "|".join(cluster.jobs)))
job_type = sys.argv[1]  # job type: "worker" or "ps"
task_idx = sys.argv[2]  # index job in the worker or ps list
                        # as defined in the ClusterSpec (kept as a string
                        # because it is concatenated into a device name)

# create TensorFlow Server. This is how the machines communicate.
# task_index must be an int; command-line arguments are strings.
server = tf.train.Server(cluster, job_name=job_type, task_index=int(task_idx))

# parameter server is updated by remote clients. 
# will not proceed beyond this if statement.
if job_type == 'ps':
    server.join()
else:
    # workers only
    with tf.device(tf.train.replica_device_setter(
                        worker_device='/job:worker/task:'+task_idx,
                        cluster=cluster)):
        # build your model here as if you only were using a single machine
        
    with tf.Session(server.target):
        # train your model here