In [2]:
import tensorflow as tf
import numpy as np
import multiprocessing

## log_loss

In [4]:
# Labels and predictions for a 3-element log-loss example.
# Both placeholders are float32 so that real-valued probabilities can be fed;
# with integer placeholders only whole numbers are representable. Integer
# feeds such as [1, 2, 3] are still accepted and converted to float.
# NOTE(review): log loss is only defined for predictions in [0, 1]; values
# outside that range are expected to yield NaN -- confirm against the
# tf.losses.log_loss documentation.
l = tf.placeholder(tf.float32, shape=[3])
p = tf.placeholder(tf.float32, shape=[3])
ll = tf.losses.log_loss(labels=l, predictions=p)

In [5]:
# Open a session and run the global variable initializer.
# NOTE(review): no tf.Variable is created in the cells above, so the
# initializer presumably has nothing to initialize at this point.
sess = tf.Session()
sess.run(tf.global_variables_initializer())

In [7]:
# Evaluate the loss with the same values fed as labels and as predictions.
feed = {
    l: [1, 2, 3],
    p: [1, 2, 3],
}
loss_value = sess.run(ll, feed)
print(loss_value)

nan


## numpy.random.choice

In [17]:
# Empirical check that np.random.choice honours the `p` weights:
# draw many samples and print the observed fraction of each candidate.
a = [1, 2, 3]
p = [0.1, 0.1, 0.8]
r = np.random.choice(a, p=p, size=100000)

# Occurrences of each candidate value, in the same order as `a`.
s1, s2, s3 = (np.sum(r == candidate) for candidate in a)
d = s1 + s2 + s3
print(*(count / d for count in (s1, s2, s3)))

0.10181 0.09952 0.79867


## Gradient accumulation

In [32]:
# Start from an empty default graph before building the toy model.
tf.reset_default_graph()

with tf.variable_scope('test_scope'):
    a = tf.Variable(tf.ones((2, 2)), name='a')
    # `b` is the doubled tensor, not the variable itself: the variable named
    # 'b' only exists inside the graph and is reached through this product.
    b = 2 * tf.Variable(tf.ones((2, 2)), name='b')
loss = tf.reduce_sum(a + b, name='loss')

# List the trainable variables that were registered under the scope.
scope_variables = tf.get_collection(
    tf.GraphKeys.TRAINABLE_VARIABLES,
    scope='test_scope',
)
for v in scope_variables:
    print(v.name)

test_scope/a:0
test_scope/b:0


In [19]:
# Plain SGD optimizer; only its gradient computation is used in this cell.
o = tf.train.GradientDescentOptimizer(0.1)
grads_and_vars = o.compute_gradients(loss, var_list=tf.trainable_variables())

# compute_gradients yields (gradient, variable) pairs; index them by variable.
grads_dict = dict((var, grads) for grads, var in grads_and_vars)
for var, grads in grads_dict.items():
    print(var, grads)

<tf.Variable 'b:0' shape=(2, 2) dtype=float32_ref> Tensor("gradients/mul_grad/tuple/control_dependency_1:0", shape=(2, 2), dtype=float32)
<tf.Variable 'a:0' shape=(2, 2) dtype=float32_ref> Tensor("gradients/add_grad/tuple/control_dependency:0", shape=(2, 2), dtype=float32)


In [20]:
# Gradient accumulation: sum gradients over several `update_gradients` runs
# and apply them in a single optimizer step with `apply_gradients`.

# One non-trainable buffer per variable, matching the gradient's shape and
# dtype (previously the dtype was implicitly float32).
grad_bufs = {}
for var, grads in grads_dict.items():
    grad_bufs[var] = tf.Variable(
        tf.zeros(shape=grads.shape, dtype=grads.dtype.base_dtype),
        trainable=False)

# Each run of `update_gradients` adds the current gradients into the buffers.
update_ops = []
for var in grads_dict.keys():
    assign = grad_bufs[var]
    add = grads_dict[var]
    update_ops.append(tf.assign_add(assign, add))
update_gradients = tf.group(*update_ops)

# The optimizer expects (gradient, variable) pairs; the buffers play the
# role of the gradients.
grad_bufs_and_vars = []
for var, grad_buf in grad_bufs.items():
    grad_bufs_and_vars.append((grad_buf, var))

# Apply the accumulated gradients, then clear the buffers. Without the reset
# a second accumulate/apply cycle would re-apply the gradients that were
# already consumed. The control dependency guarantees the buffers are only
# zeroed after the optimizer step has read them.
apply_step = o.apply_gradients(grad_bufs_and_vars)
with tf.control_dependencies([apply_step]):
    clear_ops = [tf.assign(grad_buf, tf.zeros_like(grad_buf))
                 for grad_buf in grad_bufs.values()]
apply_gradients = tf.group(*clear_ops)

In [21]:
# New session for the gradient-accumulation graph. The default graph was
# reset in an earlier cell, after the previous `sess` had been created; that
# earlier session is rebound here without being closed.
sess = tf.Session()

In [23]:
# Experiment 1: accumulate the gradient once, then apply it.

# Put the variables and the gradient buffers back to their initial values.
sess.run(tf.global_variables_initializer())
print(sess.run([a]))
print(sess.run([b]))
print()

# Accumulating only touches the buffers; `a` and `b` print unchanged.
sess.run(update_gradients)
print(sess.run([a]))
print(sess.run([b]))
print()

# A single optimizer step using the accumulated gradient.
sess.run(apply_gradients)
print(sess.run([a]))
print(sess.run([b]))

[array([[ 1.,  1.],
       [ 1.,  1.]], dtype=float32)]
[array([[ 2.,  2.],
       [ 2.,  2.]], dtype=float32)]

[array([[ 1.,  1.],
       [ 1.,  1.]], dtype=float32)]
[array([[ 2.,  2.],
       [ 2.,  2.]], dtype=float32)]

[array([[ 0.89999998,  0.89999998],
       [ 0.89999998,  0.89999998]], dtype=float32)]
[array([[ 1.60000002,  1.60000002],
       [ 1.60000002,  1.60000002]], dtype=float32)]


In [24]:
# Experiment 2: accumulate the gradient twice, then apply it once.

# Put the variables and the gradient buffers back to their initial values.
sess.run(tf.global_variables_initializer())
print(sess.run([a]))
print(sess.run([b]))
print()

# Two accumulation passes: the buffers now hold twice the gradient, while
# `a` and `b` still print unchanged.
sess.run(update_gradients)
sess.run(update_gradients)
print(sess.run([a]))
print(sess.run([b]))
print()

# A single optimizer step using the doubled gradient.
sess.run(apply_gradients)
print(sess.run([a]))
print(sess.run([b]))

[array([[ 1.,  1.],
       [ 1.,  1.]], dtype=float32)]
[array([[ 2.,  2.],
       [ 2.,  2.]], dtype=float32)]

[array([[ 1.,  1.],
       [ 1.,  1.]], dtype=float32)]
[array([[ 2.,  2.],
       [ 2.,  2.]], dtype=float32)]

[array([[ 0.80000001,  0.80000001],
       [ 0.80000001,  0.80000001]], dtype=float32)]
[array([[ 1.20000005,  1.20000005],
       [ 1.20000005,  1.20000005]], dtype=float32)]


## Multiprocess TensorFlow

In [3]:
from multiprocessing import Process

In [4]:
def f(sess, var):
    """Evaluate `var` with `sess.run` and print the result.

    Prints "In f!" before the evaluation and "Done!" after it, so the output
    shows how far the call got. Returns None.
    """
    print("In f!")
    value = sess.run(var)
    print(value)
    print("Done!")

# Build a one-variable graph and evaluate it in the parent process first.
tf.reset_default_graph()
w1 = tf.Variable(5)
sess = tf.Session()
sess.run(tf.global_variables_initializer())
print(sess.run(w1))

# Hand the parent's live session and variable to a child process.
# NOTE(review): the recorded output stops after "In f!" -- the child never
# printed the value or "Done!", so `sess.run` apparently did not return there.
p1 = Process(target=f, kwargs={'sess': sess, 'var': w1})
p1.start()

5
In f!


## Distributed TensorFlow

In [5]:
import tensorflow as tf
from multiprocessing import Process

## Attempt 1

In [6]:
# Task: a process, running in a node in the cluster

In [7]:
def create_ps_graph():
    """Create an integer variable initialised to 0 on device "/job:ps/task:0".

    Returns:
        The newly created `tf.Variable`.
    """
    ps_device = "/job:ps/task:0"
    with tf.device(ps_device):
        counter = tf.Variable(0)
    return counter

In [8]:
def create_worker_graph(w):
    """Build the op that adds 1 to `w`, placed on device "/job:ps/task:0".

    Args:
        w: the variable to increment.

    Returns:
        The result of `tf.assign_add(w, 1)`.
    """
    with tf.device("/job:ps/task:0"):
        return tf.assign_add(w, 1)

In [9]:
# Cluster layout shared by every process: one "worker" task and one "ps"
# task, both listening on localhost.
cluster = tf.train.ClusterSpec({
    "worker": ["localhost:2222"],
    "ps": ["localhost:2223"],
})

In [18]:
def ps():
    """Run the parameter-server task; intended as a `Process` target.

    Starts a server for the "ps" job of the module-level `cluster`, creates
    the shared variable, initialises it through a session connected to that
    server, then blocks in `server.join()`.
    """
    # Register this process as the "ps" task of the cluster.
    ps_server = tf.train.Server(cluster, job_name="ps")

    # The variable must exist in this process's graph before it is initialised.
    create_ps_graph()
    init_session = tf.Session(ps_server.target)
    init_session.run(tf.global_variables_initializer())

    print("Joining...")
    ps_server.join()

def worker():
    """Run a worker task that increments the shared variable once.

    Starts a server for the "worker" job of the module-level `cluster`,
    rebuilds the variable and the increment op in this process's graph, waits
    until no variable is reported as uninitialized, then prints the result of
    one increment. Intended as a `Process` target.
    """
    # Local import so the function stays self-contained as a process target.
    import time

    server = tf.train.Server(cluster, job_name="worker")
    w = create_ps_graph()
    add = create_worker_graph(w)
    sess = tf.Session(server.target)

    # Build the check op once: calling tf.report_uninitialized_variables()
    # inside the loop would add a new op to the graph on every poll.
    uninitialized = tf.report_uninitialized_variables()
    while len(sess.run(uninitialized)) > 0:
        print("Sleeping...")
        # Actually wait between polls instead of spinning at full speed.
        time.sleep(0.5)
    print(sess.run(add))
    
def worker2():
    """Run a worker task that only reads the shared variable.

    Starts a server for the "worker" job of the module-level `cluster`,
    rebuilds the variable in this process's graph, waits until no variable is
    reported as uninitialized, then prints the variable's current value.
    Intended as a `Process` target.
    """
    # Local import so the function stays self-contained as a process target.
    import time

    server = tf.train.Server(cluster, job_name="worker")
    # Only the variable is needed here; the increment op that used to be
    # built alongside it was never run.
    w = create_ps_graph()
    sess = tf.Session(server.target)

    # Build the check op once: calling tf.report_uninitialized_variables()
    # inside the loop would add a new op to the graph on every poll.
    uninitialized = tf.report_uninitialized_variables()
    while len(sess.run(uninitialized)) > 0:
        print("Sleeping...")
        # Actually wait between polls instead of spinning at full speed.
        time.sleep(0.5)
    print(sess.run(w))

In [19]:
# Stop a parameter-server process left over from an earlier run of the cells
# below. On a fresh kernel `ps_p` does not exist yet, so the cell must be a
# no-op instead of raising NameError.
if 'ps_p' in globals() and ps_p.is_alive():
    ps_p.terminate()
    # Give the process a moment to exit so its port is free for a restart.
    ps_p.join(timeout=5)

In [20]:
# Run ps() in a child process. ps() ends in server.join(), so the process
# keeps running until it is terminated explicitly.
ps_p = Process(target=ps)
ps_p.start()

Joining...


In [21]:
# Stop a worker process left over from an earlier run of the cells below.
# On a fresh kernel `worker_p` does not exist yet, so the cell must be a
# no-op instead of raising NameError.
if 'worker_p' in globals() and worker_p.is_alive():
    worker_p.terminate()
    # Give the process a moment to exit so its port is free for a restart.
    worker_p.join(timeout=5)

In [24]:
# Run worker() in a child process; it prints the result of one increment of
# the shared variable.
worker_p = Process(target=worker)
worker_p.start()

2


In [25]:
# Run worker2() in a child process; it prints the shared variable's current
# value without incrementing it.
# NOTE(review): worker2() starts a server for the same "worker" job as
# worker() -- presumably the earlier worker process must have exited first.
worker_p_2 = Process(target=worker2)
worker_p_2.start()

2
