In [1]:
import tensorflow as tf

c = tf.constant("Hello distributed tensorflow !")
server = tf.train.Server.create_local_server()

In [2]:
with tf.Session(server.target) as sess:
    print(sess.run(c))

b'Hello distributed tensorflow !'


In [3]:
server.target

'grpc://localhost:53414'

In [4]:
# simple placement

with tf.device("/cpu:0"):
    a = tf.Variable(3.0)
    b = tf.Variable(4.0)

c = a * b

# cpu:0 aggregates all cpu on a multi-CPU system
# currently there is no way to pin nodes on specific CPUs
# or to use a subset of all cpu

In [5]:
# Loggin placements

config = tf.ConfigProto()
config.log_device_placement = True
sess = tf.Session(config=config)

Device mapping:
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: GeForce GTX 1050, pci bus id: 0000:01:00.0, compute capability: 6.1



In [6]:
init = tf.global_variables_initializer()

In [7]:
i = tf.global_variables()
print(i)

[<tf.Variable 'Variable:0' shape=() dtype=float32_ref>, <tf.Variable 'Variable_1:0' shape=() dtype=float32_ref>]


In [8]:
with tf.Session() as sess:
    a.initializer.run()
    b.initializer.run()
    result = c.eval()

print(result)

12.0


In [9]:
# with using our config variable

with tf.Session(config=config) as sess:
    with tf.device("/cpu:0"):
        init.run()
        result = c.eval()
        print(result)

Device mapping:
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: GeForce GTX 1050, pci bus id: 0000:01:00.0, compute capability: 6.1

12.0


In [10]:
import subprocess

n = str(subprocess.check_output(["nvidia-smi", "-L"]))
print(n)

b'GPU 0: GeForce GTX 1050 (UUID: GPU-b0ade927-3fa3-0bfa-0d06-c1a5174b532d)\r\n'


In [11]:
# Dynamic placment

def variables_on_cpu(op):
    if op.type == "Variable":
        return "/cpu:0"
    else:
        return "/gpu:0"

with tf.device(variables_on_cpu):
    a = tf.Variable(3.0)
    b = tf.constant(4.0)
    # b = tf.Variable(4.0)
    
c = a * b

with tf.Session(config=config) as sess:
    init.run()
    try:
        result = c.eval()
        print(result)
    except Exeption as e:
        print(e)
    #throwing error for some reason

Device mapping:
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: GeForce GTX 1050, pci bus id: 0000:01:00.0, compute capability: 6.1



NameError: name 'Exeption' is not defined

In [12]:
# testing for integer kernel

tf.reset_default_graph()
with tf.device("/gpu:0"):
    i = tf.Variable(3)
    #i = tf.Variable(3.0) # this works

try:
    tf.Session().run(i.initializer)
except Exception as e:
    print(e)

Cannot assign a device for operation Variable: Could not satisfy explicit device specification '/device:GPU:0' because no supported kernel for GPU devices is available.
Colocation Debug Info:
Colocation group had the following types and supported devices: 
Root Member(assigned_device_name_index_=-1 requested_device_name_='/device:GPU:0' assigned_device_name_='' resource_device_name_='/device:GPU:0' supported_device_types_=[CPU] possible_devices_=[]
VariableV2: CPU 
Assign: CPU 
Identity: GPU CPU 

Colocation members, user-requested devices, and framework assigned devices, if any:
  Variable (VariableV2) /device:GPU:0
  Variable/Assign (Assign) /device:GPU:0
  Variable/read (Identity) /device:GPU:0

Op: VariableV2
Node attrs: shape=[], shared_name="", dtype=DT_INT32, container=""
Registered kernels:
  device='CPU'
  device='GPU'; dtype in [DT_HALF]
  device='GPU'; dtype in [DT_FLOAT]
  device='GPU'; dtype in [DT_DOUBLE]
  device='GPU'; dtype in [DT_INT64]

	 [[node Variable (defined at 

In [13]:
# using soft placement to put this onto a cpu

with tf.device("/gpu:0"):
    t = tf.Variable(3)

config = tf.ConfigProto()
config.allow_soft_placement = True
sess = tf.Session(config=config)
sess.run(i.initializer) # this needs to fall back to /cpu:0

# no error

In [14]:
a = tf.constant(1.0)
b = a + 2.0

with tf.control_dependencies([a,b]):
    x = tf.constant(3.0)
    y = tf.constant(4.0)

z = x + y

# obviously since z depends on x and y, 
# evaluating z also implies waiting for a ans b to be evaluated
# even though it is not explicitly in control_dependecies() block
# since b depends on a we could simplify the preceding code by just creating
# control dependency [b] instead of [a, b]

### Cluster

In [15]:
cluster_spec = tf.train.ClusterSpec({
    "ps" : [
        "127.0.0.1:2221", # /job:ps/task:0
        "127.0.0.1:2222", # /jon:ps/task:1
    ],
    "worker" : [
        "127.0.0.1:2223", # /job:worker/task:0
        "127.0.0.1:2224", # /jon:worker/task:1
        "127.0.0.1:2225"  # /job:worker/task:2
    ]
})

In [16]:
task_ps0 = tf.train.Server(cluster_spec, job_name="ps", task_index=0)
task_ps1 = tf.train.Server(cluster_spec, job_name="ps", task_index=1)
task_worker0 = tf.train.Server(cluster_spec,job_name="worker",task_index=0)
task_worker1 = tf.train.Server(cluster_spec,job_name="worker",task_index=1)
task_worker2 = tf.train.Server(cluster_spec, job_name="worker", task_index=2)

### pinning operation across devices and server

In [17]:
import numpy as np
tf.reset_default_graph()
tf.set_random_seed(42)
np.random.seed(42)

with tf.device("/job:ps"):
    a = tf.Variable(1.0,name="a")

with tf.device("/job:worker"):
    b = a + 2

with tf.device("/job:worker/task:1"):
    c = a + b

In [18]:
with tf.Session("grpc://127.0.0.1:2221") as sess:
    sess.run(a.initializer)
    print(c.eval())

4.0


In [19]:
def reset_graph():
    tf.reset_default_graph()
    tf.set_random_seed(42)
    np.random.seed(42)

reset_graph()

In [20]:
with tf.device(tf.train.replica_device_setter(
    ps_tasks=2,
    ps_device="/job:ps",
    worker_device="/job:worker")):
    v1 = tf.Variable(1.0,name="v1")
    v2 = tf.Variable(2.0,name="v2")
    v3 = tf.Variable(3.0,name="v3")
    s = v1 +v2
    with tf.device("/task:1"):
        p1 = 2 * s
        with tf.device("/cpu:0"):
            p2 = 3 * s

config = tf.ConfigProto()
config.log_device_placement = True

with tf.Session("grpc://127.0.0.1:2221",config=config) as sess:
    v1.initializer.run()

### The old way Readers forqueue

In [21]:
reset_graph()

default1 = tf.constant([5.])
default2 = tf.constant([6])
default3 = tf.constant([7])

dec = tf.decode_csv(tf.constant("1.,,44"),
                    record_defaults=[default1, default2, default3])

with tf.Session() as sess:
    print(sess.run(dec))

[1.0, 6, 44]


In [22]:
reset_graph()

test_csv = open("my_test.csv","w")
test_csv.write("x1, x2, target\n")
test_csv.write("1.,, 0\n")
test_csv.write("4.,5.,1\n")
test_csv.write("7.,8.,0\n")
test_csv.close()

In [29]:
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)

x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
    name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    try:
        while True:
            sess.run(enqueue_instance)
    except tf.errors.OutOfRangeError as ex:
        print("No more files to read")
    sess.run(close_instance_queue)
    try:
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError as ex:
        print("No more training instances")

No more files to read
[array([[ 4.,  5.],
       [ 1., -1.]], dtype=float32), array([1, 0])]
[array([[7., 8.]], dtype=float32), array([0])]
No more training instances


### Queue runners and coordinators

In [31]:
reset_graph()
reset_graph()

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)

x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
    name="instance_q", shared_name="shared_instance_q")

enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

n_threads = 5
queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)
coord = tf.train.Coordinator()

with tf.Session() as sess:
    sess.run(enqueue_filename,feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    try:
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError as ex:
        print("no more training instances")

W0606 07:34:16.354704  3424 deprecation.py:323] From <ipython-input-31-a5064012dc11>:24: QueueRunner.__init__ (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version.
Instructions for updating:
To construct input pipelines, use the `tf.data` module.


[array([[4., 5.],
       [7., 8.]], dtype=float32), array([1, 0])]
[array([[ 1., -1.]], dtype=float32), array([0])]
no more training instances


In [32]:
reset_graph()

def read_and_push_instance(filename_queue, instance_queue):
    reader = tf.TextLineReader(skip_header_lines=1)
    key, value=reader.read(filename_queue)
    x1, x2 , target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
    features = tf.stack([x1, x2])
    enqueue_instance = instance_queue.enqueue([features, target])
    return enqueue_instance

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes = [tf.float32, tf.int32], shapes=[[2], []],
    name = "instance_q", shared_name="shared_instance_q"
)

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)


read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]
queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    coord= tf.train.Coordinator()
    enqueue_threads = queue_runner.create_threads(sess, coord=coord,start=True)
    try:
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError as ex:
        print("No more training instances")

[array([[ 4.,  5.],
       [ 1., -1.]], dtype=float32), array([1, 0])]
[array([[7., 8.]], dtype=float32), array([0])]
No more training instances


### Setting a timeout

In [36]:
# A good example to understand enqueue and dequeue

reset_graph()

q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])
v = tf.placeholder(tf.float32)
enqueue = q.enqueue([v])
dequeue = q.dequeue()
output = dequeue + 1

config = tf.ConfigProto()
config.operation_timeout_in_ms = 1000

with tf.Session(config=config) as sess:
    sess.run(enqueue, feed_dict={v:1.0})
    sess.run(enqueue, feed_dict={v:2.0})
    sess.run(enqueue, feed_dict={v:3.0})
    print(sess.run(output))
    print(sess.run(output, feed_dict={dequeue: 5}))
    print(sess.run(output))
    print(sess.run(output))
    try:
        print(sess.run(output))
    except tf.errors.DeadlineExceededError as ex:
        print("Timed out while dequeueing")

2.0
6.0
3.0
4.0
Timed out while dequeueing


### Data API

In [38]:
# fobr efficient data readeing

reset_graph()

In [41]:
dataset  = tf.data.Dataset.from_tensor_slices(np.arange(10))
dataset = dataset.repeat(3).batch(7)

# the second line creates a new dataset based on the first one repeating its elements three times creating a batch of seven elements

In [42]:
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

W0606 07:59:21.767431  3424 deprecation.py:323] From <ipython-input-42-bc454eaa355d>:1: DatasetV1.make_one_shot_iterator (from tensorflow.python.data.ops.dataset_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `for ... in dataset:` to iterate over a dataset. If using `tf.estimator`, return the `Dataset` object directly from your input function. As a last resort, you can use `tf.compat.v1.data.make_one_shot_iterator(dataset)`.


In [44]:
# repeatedly evaluating the next_element

with tf.Session() as sess:
    try:
        while True:
            print(next_element.eval())
    except tf.errors.OutOfRangeError:
        print("Done")

[0 1 2 3 4 5 6]
[7 8 9 0 1 2 3]
[4 5 6 7 8 9 0]
[1 2 3 4 5 6 7]
[8 9]
Done


In [45]:
# usisgn session run the graph is evaluated only once no matter how many times we run the graph

with tf.Session() as sess:
    try:
        while True:
            print(sess.run([next_element, next_element]))
    except tf.errors.OutOfRangeError:
        print("Done")
# same array is printed twice even when we call nextelement two times

[array([0, 1, 2, 3, 4, 5, 6]), array([0, 1, 2, 3, 4, 5, 6])]
[array([7, 8, 9, 0, 1, 2, 3]), array([7, 8, 9, 0, 1, 2, 3])]
[array([4, 5, 6, 7, 8, 9, 0]), array([4, 5, 6, 7, 8, 9, 0])]
[array([1, 2, 3, 4, 5, 6, 7]), array([1, 2, 3, 4, 5, 6, 7])]
[array([8, 9]), array([8, 9])]
Done


In [46]:
# interleave method demonstation

reset_graph()

In [49]:
dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))
dataset = dataset.repeat(3).batch(7)
dataset = dataset.interleave(
    lambda v: tf.data.Dataset.from_tensor_slices(v),
    cycle_length=3,
    block_length=2
)

iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

In [50]:
with tf.Session() as sess:
    try:
        while True:
            print(next_element.eval(), end=",")
    except tf.errors.OutOfRangeError:
        print("Done")
        
'''
Because cycle_length=3, the new dataset starts by pulling 3 elements from the previous dataset: 
that's [0,1,2,3,4,5,6], [7,8,9,0,1,2,3] and [4,5,6,7,8,9,0]. 
Then it calls the lambda function we gave it to create one dataset for each of the elements. Since we use Dataset.from_tensor_slices(), each dataset is going to return its elements one by one. Next, it pulls two items (since block_length=2) from each of these three datasets, and it iterates until all three datasets are out of items: 0,1 (from 1st), 7,8 (from 2nd), 4,5 (from 3rd), 2,3 (from 1st), 9,0 (from 2nd), and so on until 8,9 (from 3rd), 6 (from 1st), 3 (from 2nd), 0 (from 3rd). Next it tries to pull the next 3 elements from the original dataset, but there are just two left: [1,2,3,4,5,6,7] and [8,9]. Again, it creates datasets from these elements, and it pulls two items from each until both datasets are out of items: 1,2 (from 1st), 8,9 (from 2nd), 3,4 (from 1st), 5,6 (from 1st), 7 (from 1st). Notice that there's no interleaving at the end since the arrays do not have the same length.
'''

0,1,7,8,4,5,2,3,9,0,6,7,4,5,1,2,8,9,6,3,0,1,2,8,9,3,4,5,6,7,Done
