In [1]:
import tensorflow as tf
from multiprocessing import Process
from time import sleep
import random


cluster = tf.train.ClusterSpec({
    "worker": [
        "localhost:3333",
        "localhost:3334",
        "localhost:3335"
    ],
    "ps": [
        "localhost:3336"
    ]
})

def parameter_server(tasks_arr):
    with tf.device("/job:ps/task:0"):
         tasks = tf.Variable(tasks_arr, name = 'tasks')

    server = tf.train.Server(cluster,
                             job_name="ps",
                             task_index=0)
    sess = tf.Session(target=server.target)

    
    print("Master server: oczekiwanie na połączenie...")
    sess.run(tf.report_uninitialized_variables())
    print("Master server: klaster gotowy!")
    
    print("Master server: inicjalizowanie zmiennych...")
    sess.run(tf.global_variables_initializer())
    print("Master server: zmienne zainicjowane")

    while True:
        arr = sess.run(tasks)
        for v in arr:
            print("Wartość " + v.decode())
        print("------------------------")
        sleep(5.0)
        if not any(("True" in item.decode() or "False" in item.decode()) for item in arr): 
            break
    print("Master server: blokowanie...")
    server.join()
    

def worker(worker_n, tasks_arr):
    with tf.device("/job:ps/task:0"):
         tasks = tf.Variable(tasks_arr, name = 'tasks')

    server = tf.train.Server(cluster,
                             job_name="worker",
                             task_index=worker_n)
    sess = tf.Session(target=server.target)

    print("Worker %d: oczekiwanie na połączenie..." % worker_n)
    sess.run(tf.report_uninitialized_variables())
    print("Worker %d: klaster gotowy!" % worker_n)
    
    while sess.run(tf.report_uninitialized_variables()):
        print("Worker %d: oczekiwanie na zainicjowanie zmiennych..." % worker_n)
        sleep(1.0)
    print("Worker %d: zmienne zainicjowane" % worker_n)
    
    while exists_any_task(sess.run(tasks)): 
        for i in range(len(sess.run(tasks))):
            val = sess.run(tasks)[i].decode()
            if("False" in val):
                print("Worker %d: obliczanie" % worker_n + " " + val)
                arrToSave = sess.run(tasks)
                splited = val.split(":")
                arrToSave[i] = splited[0] + ":True"
                sess.run(tasks.assign(arrToSave))

                lev_val = LD(splited[0], "TensorFlow")
                arrToSave = sess.run(tasks)
                arrToSave[i] = splited[0] + ":" + str(lev_val)
                sess.run(tasks.assign(arrToSave))

    print("Worker %d: blokowanie..." % worker_n)
    server.join()

def exists_any_task(arr):
    for item in arr:             
        if "False" in item.decode():
            return True
    return False
                       
def LD(s, t):
    if s == "":
        return len(t)
    if t == "":
        return len(s)
    if s[-1] == t[-1]:
        cost = 0
    else:
        cost = 1
       
    res = min([LD(s[:-1], t)+1,
               LD(s, t[:-1])+1, 
               LD(s[:-1], t[:-1]) + cost])

    return res

tasks_arr = ["asdbabdab:False", 
              "sadaeferfb:False",
              "dswdefergee:False", 
              "ddefrgggrh:False", 
              "dsadeweffew:False",
              "sadqweqerfb:False",
              "loremipsumg:False",
              "dawdwedfer:False",
              "TensorFlow:False",
              "dythhreffew:False",
              "sanyntyqerfb:False",
              "lgrtsumyytg:False",
              "dawdhytwer:False"]
ps_proc = Process(target=parameter_server, args=(tasks_arr, ), daemon=True)
w1_proc = Process(target=worker, args=(0, tasks_arr, ), daemon=True)
w2_proc = Process(target=worker, args=(1, tasks_arr, ), daemon=True)
w3_proc = Process(target=worker, args=(2, tasks_arr, ), daemon=True)


  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
ps_proc.start()

Instructions for updating:
Colocations handled automatically by placer.
Master server: oczekiwanie na połączenie...
Master server: klaster gotowy!
Master server: inicjalizowanie zmiennych...
Master server: zmienne zainicjowane
Wartość asdbabdab:False
Wartość sadaeferfb:False
Wartość dswdefergee:False
Wartość ddefrgggrh:False
Wartość dsadeweffew:False
Wartość sadqweqerfb:False
Wartość loremipsumg:False
Wartość dawdwedfer:False
Wartość TensorFlow:False
Wartość dythhreffew:False
Wartość sanyntyqerfb:False
Wartość lgrtsumyytg:False
Wartość dawdhytwer:False
------------------------
Wartość asdbabdab:True
Wartość sadaeferfb:True
Wartość dswdefergee:True
Wartość ddefrgggrh:False
Wartość dsadeweffew:False
Wartość sadqweqerfb:False
Wartość loremipsumg:False
Wartość dawdwedfer:False
Wartość TensorFlow:False
Wartość dythhreffew:False
Wartość sanyntyqerfb:False
Wartość lgrtsumyytg:False
Wartość dawdhytwer:False
------------------------
Wartość asdbabdab:10
Wartość sadaeferfb:True
Wartość dswdeferg

In [3]:
w1_proc.start()

Instructions for updating:
Colocations handled automatically by placer.
Worker 0: oczekiwanie na połączenie...
Worker 0: klaster gotowy!
Worker 0: oczekiwanie na zainicjowanie zmiennych...




Worker 0: zmienne zainicjowane
Worker 0: obliczanie sadaeferfb:False
Worker 0: obliczanie dsadeweffew:False
Worker 0: obliczanie dawdwedfer:False
Worker 0: obliczanie dythhreffew:False
Worker 0: obliczanie dawdhytwer:False
Worker 0: blokowanie...


In [4]:
w2_proc.start()

Instructions for updating:
Colocations handled automatically by placer.
Worker 1: oczekiwanie na połączenie...
Worker 1: klaster gotowy!
Worker 1: oczekiwanie na zainicjowanie zmiennych...




Worker 1: zmienne zainicjowane
Worker 1: obliczanie dswdefergee:False
Worker 1: obliczanie loremipsumg:False
Worker 1: obliczanie sanyntyqerfb:False
Worker 1: blokowanie...


In [5]:
w3_proc.start()

Instructions for updating:
Colocations handled automatically by placer.
Worker 2: oczekiwanie na połączenie...
Worker 2: klaster gotowy!
Worker 2: oczekiwanie na zainicjowanie zmiennych...




Worker 2: zmienne zainicjowane
Worker 2: obliczanie asdbabdab:False
Worker 2: obliczanie ddefrgggrh:False
Worker 2: obliczanie sadqweqerfb:False
Worker 2: obliczanie TensorFlow:False
Worker 2: obliczanie lgrtsumyytg:False
Worker 2: blokowanie...


In [6]:
for proc in [w1_proc, w2_proc, w3_proc, ps_proc]:
    proc.terminate()