# Distribuindo o TensorFlow por dispositivos e servidores

### Bibliotecas básicas

In [1]:
import numpy as np
import pandas as pd
from functools import partial
import matplotlib.pyplot as plt
%matplotlib inline

import warnings
# Suppress every Python warning for the rest of the notebook
# (presumably to hide deprecation notices from the tf.compat.v1 API - confirm).
warnings.filterwarnings('ignore')

import tensorflow as tf
# Record the TensorFlow version the notebook was executed with.
print(tf.__version__)

2.4.0


In [3]:
# Switch to graph mode: the cells below build graphs explicitly and execute them
# through tf.compat.v1.Session, placeholders and queues.
tf.compat.v1.disable_eager_execution()

### Servidor Local

In [None]:
# Start from an empty default graph so re-running this cell does not pile up ops.
tf.compat.v1.reset_default_graph()

# Single-process server; its `target` is the address a session connects to.
server = tf.compat.v1.train.Server.create_local_server()
c = tf.constant("Hello distributed TensorFlow!")

# Evaluate the constant through the server instead of an in-process session.
with tf.compat.v1.Session(server.target) as sess:
    greeting = sess.run(c)
    print(greeting)

b'Hello distributed TensorFlow!'


#### Posicionamento Simples

In [None]:
# Explicitly request the first CPU for the variable and the constant.
with tf.compat.v1.device('/cpu:0'):
  a = tf.Variable(3.0)
  b = tf.constant(4.0)
# Built outside the device block: no device is requested for the multiplication.
c = a * b

# Ask the session to log which device each op ends up on.
config = tf.compat.v1.ConfigProto()
config.log_device_placement = True

# Use the session as a context manager so its resources are released when the
# cell finishes; the original left a new open session behind on every re-run.
with tf.compat.v1.Session(config=config) as sess:
  a.initializer.run(session=sess)
  result = sess.run(c)

# Last expression of the cell, so the notebook still displays the product.
result

Device mapping:
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0



12.0

#### Posicionamento dinâmico

In [None]:
def variables_on_cpu(op):
  """Device function: keep variables on the CPU, send every other op to the GPU.

  Intended to be passed to `tf.compat.v1.device(...)`, which calls it once per
  newly created operation.

  Args:
    op: the operation being placed; only its `type` attribute is read.

  Returns:
    '/cpu:0' for variable-creating ops, '/gpu:0' otherwise.
  """
  # Variables are not created by an op of type 'Variable' in current TensorFlow:
  # reference variables use 'VariableV2' and resource variables use
  # 'VarHandleOp'. Checking only 'Variable' sent every variable to the GPU.
  variable_op_types = ('Variable', 'VariableV2', 'VarHandleOp')
  if op.type in variable_op_types:
    return '/cpu:0'
  return '/gpu:0'

# Passing a function instead of a device string lets the device be chosen per
# operation: the function is called for each op created inside the block.
with tf.compat.v1.device(variables_on_cpu):
  a = tf.Variable(3.0)
  b = tf.constant(4.0)
# Built outside the block, so the device function is not applied to the product.
c = a * b

# Ask the session to log which device each op ends up on.
config = tf.compat.v1.ConfigProto()
config.log_device_placement = True

# Use the session as a context manager so its resources are released when the
# cell finishes; the original left a new open session behind on every re-run.
with tf.compat.v1.Session(config=config) as sess:
  a.initializer.run(session=sess)
  result = sess.run(c)

# Last expression of the cell, so the notebook still displays the product.
result

Device mapping:
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0



12.0

#### Posicionamento suave

In [None]:
# Request the GPU for an integer variable.
with tf.compat.v1.device('/gpu:0'):
  a = tf.Variable(3)

config = tf.compat.v1.ConfigProto()
config.log_device_placement = True
# Soft placement lets TensorFlow fall back to another device when the requested
# one cannot run the op, instead of raising an error.
config.allow_soft_placement = True

# Use the session as a context manager so its resources are released when the
# cell finishes; the original left a new open session behind on every re-run.
with tf.compat.v1.Session(config=config) as sess:
  sess.run(a.initializer)

Device mapping:
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0



#### Dependência de controle

In [None]:
a = tf.constant(1.0)
b = a + 2.0

# Ops created inside this block receive control dependencies on `a` and `b`:
# they may only run after both have been evaluated, even though x and y do not
# take them as inputs.
with tf.compat.v1.control_dependencies([a, b]):
  x = tf.constant(3.0)
  y = tf.constant(4.0)
# z is created outside the block, so it gets no control dependency of its own;
# it still consumes x and y as regular inputs.
z = x + y

### Vários dispositivos em vários servidores

In [None]:
# Addresses of every task in the cluster, grouped by job. All tasks run on
# localhost, one port per task.
ps_hosts = ['127.0.0.1:2221', '127.0.0.1:2222']
worker_hosts = ['127.0.0.1:2223', '127.0.0.1:2224', '127.0.0.1:2225']
cluster_spec = tf.compat.v1.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts})


def _start_task(job_name, task_index):
    """Start the in-process server for one task of `cluster_spec`."""
    return tf.compat.v1.train.Server(cluster_spec, job_name=job_name, task_index=task_index)


# Keep a reference to every server: later cells connect to them over gRPC.
task_ps0 = _start_task('ps', 0)
task_ps1 = _start_task('ps', 1)

task_worker0 = _start_task('worker', 0)
task_worker1 = _start_task('worker', 1)
task_worker2 = _start_task('worker', 2)

#### Fixar operações em dispositivos e servidores

In [None]:
# Start from an empty default graph so ops from earlier cells are not carried over.
tf.compat.v1.reset_default_graph()

# Pin the variable to the 'ps' job (no task index given - TODO confirm which
# task TensorFlow picks by default).
with tf.compat.v1.device('/job:ps'):
    a = tf.Variable(1.0, name='a')

# Pin this addition to the 'worker' job, again without a task index.
with tf.compat.v1.device('/job:worker'):
    b = a + 2

# Pin this addition to a specific worker task.
with tf.compat.v1.device('/job:worker/task:1'):
    c = a + b

# Connect to the server listening on 127.0.0.1:2221 (the first 'ps' address in
# the cluster spec); it acts as the entry point for running the graph.
with tf.compat.v1.Session('grpc://127.0.0.1:2221') as sess:
    sess.run(a.initializer)
    # a = 1.0, b = a + 2 = 3.0, so c = a + b = 4.0
    print(c.eval())

4.0


#### Particionando as variáveis em múltiplos servidores

In [None]:
# Start from an empty default graph so ops from earlier cells are not carried over.
tf.compat.v1.reset_default_graph()

# replica_device_setter returns a device function: variables are assigned to
# the 'ps' job (spread over ps_tasks=2 tasks) and other ops to the 'worker' job.
# NOTE(review): exact per-variable task assignment follows the setter's default
# strategy - verify against the device-placement log.
with tf.compat.v1.device(tf.compat.v1.train.replica_device_setter(ps_tasks=2, ps_device='/job:ps', worker_device='/job:worker')):
    v1 = tf.Variable(1.0, name='v1')
    v2 = tf.Variable(2.0, name='v2')
    v3 = tf.Variable(3.0, name='v3')
    s = v1 + v2
    # Nested device blocks refine the placement requested by the outer ones.
    with tf.compat.v1.device('/task:1'):
        p1 = 2 * s
        with tf.compat.v1.device('/cpu:0'):
            p2 = 3 * s

# Log where each op is placed so the resulting assignment can be inspected.
config = tf.compat.v1.ConfigProto()
config.log_device_placement = True

with tf.compat.v1.Session('grpc://127.0.0.1:2221', config=config) as sess:
    # Only v1 is initialized here; nothing else in the graph is evaluated.
    v1.initializer.run()

### Carregando dados diretamente do Grafo

In [None]:
# Start from an empty default graph so ops from earlier cells are not carried over.
tf.compat.v1.reset_default_graph()

# Write the small CSV fixture read by the input pipelines below: one header
# line and three records, the first of which has an empty x2 field.
# The `with` block guarantees the file is closed even if a write fails.
with open('my_test.csv', 'w') as test_csv:
    test_csv.writelines([
        'x1, x2 , target\n',
        '1., , 0\n',
        '4., 5., 1\n',
        '7., 8., 0\n',
    ])

# --- Graph: file names -> CSV lines -> parsed instances -> shuffled mini-batches ---

# Queue of file names still to be read, fed through a placeholder.
filename = tf.compat.v1.placeholder(tf.string)
filename_queue = tf.compat.v1.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

# Read one line at a time from the queued files, skipping each header line.
reader = tf.compat.v1.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)

# Parse the line into two float features and an integer target; -1 stands in
# for any missing field.
x1, x2, target = tf.compat.v1.decode_csv(
    value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.compat.v1.stack([x1, x2])

# Shuffling queue of (features, target) pairs from which mini-batches are drawn.
instance_queue = tf.compat.v1.RandomShuffleQueue(
    capacity=10,
    min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32],
    shapes=[[2], []],
    name='instance_q',
    shared_name='shared_instance_q')
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

# Up to two instances per mini-batch (fewer once the queue is closed and nearly empty).
minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

# --- Execution ---
with tf.compat.v1.Session() as sess:
    # Queue the single input file, then close the queue so the reader knows
    # when it has run out of files.
    sess.run(enqueue_filename, feed_dict={filename: 'my_test.csv'})
    sess.run(close_filename_queue)

    # Push instances until the reader has no more lines to read.
    more_records = True
    while more_records:
        try:
            sess.run(enqueue_instance)
        except tf.compat.v1.errors.OutOfRangeError:
            more_records = False
            print('No more files to read')
    sess.run(close_instance_queue)

    # Drain the instance queue mini-batch by mini-batch.
    more_batches = True
    while more_batches:
        try:
            print(sess.run([minibatch_instances, minibatch_targets]))
        except tf.compat.v1.errors.OutOfRangeError:
            more_batches = False
            print('No more training instances')

### Queue runners e coordinators

In [None]:
# Start from an empty default graph so ops from earlier cells are not carried over.
tf.compat.v1.reset_default_graph()

# Queue of file names still to be read, fed through a placeholder.
# (Fixed: the dtype was written `compat.v1.string`, which raises NameError.)
filename_queue = tf.compat.v1.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.compat.v1.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

# Read one line at a time from the queued files, skipping each header line.
reader = tf.compat.v1.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)

# Parse the line into two float features and an integer target; -1 stands in
# for any missing field.
# (Fixed: `tf.comapt` was a typo for `tf.compat` and raised AttributeError.)
x1, x2, target = tf.compat.v1.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.compat.v1.stack([x1, x2])

# Shuffling queue of (features, target) pairs from which mini-batches are drawn.
instance_queue = tf.compat.v1.RandomShuffleQueue(capacity=10, min_after_dequeue=2, dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
                                                 name='instance_q', shared_name='shared_instance_q')
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

# The queue runner starts one thread per listed op; listing the same enqueue op
# n_threads times makes n_threads threads run it concurrently.
n_threads = 5
queue_runner = tf.compat.v1.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)
coord = tf.compat.v1.train.Coordinator()

with tf.compat.v1.Session() as sess:
    # Queue the single input file, then close the queue so the reader knows
    # when it has run out of files.
    sess.run(enqueue_filename, feed_dict={filename: 'my_test.csv'})
    sess.run(close_filename_queue)
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    try:
        # Drain mini-batches while the background threads fill the queue.
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError:
        print('No more training instances')
    finally:
        # Stop and wait for the enqueue threads before the session closes, so
        # no thread is left running against a closed session.
        coord.request_stop()
        coord.join(enqueue_threads)

#### Com leitores multithreads

In [None]:
# Start from an empty default graph so ops from earlier cells are not carried over.
tf.compat.v1.reset_default_graph()

def read_and_push_instance(filename_queue, instance_queue):
    """Build an independent reader and return its enqueue op.

    Every call creates a new TextLineReader, so several returned ops can read
    from `filename_queue` side by side.

    Args:
        filename_queue: queue holding the names of the CSV files to read.
        instance_queue: queue receiving one (features, target) pair per line.

    Returns:
        The op that reads one CSV line, parses it and enqueues the instance.
    """
    line_reader = tf.compat.v1.TextLineReader(skip_header_lines=1)
    _, csv_line = line_reader.read(filename_queue)
    # Two float features followed by an integer target; -1 replaces missing fields.
    columns = tf.compat.v1.decode_csv(csv_line, record_defaults=[[-1.], [-1.], [-1]])
    feature_vector = tf.compat.v1.stack(columns[:2])
    label = columns[2]
    return instance_queue.enqueue([feature_vector, label])

# Queue of file names still to be read, fed through a placeholder.
filename = tf.compat.v1.placeholder(tf.string)
filename_queue = tf.compat.v1.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

# Shuffling queue of (features, target) pairs from which mini-batches are drawn.
instance_queue = tf.compat.v1.RandomShuffleQueue(
    capacity=10,
    min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32],
    shapes=[[2], []],
    name='instance_q',
    shared_name='shared_instance_q')

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

# One reader (and one enqueue op) per thread, instead of a single shared reader.
n_readers = 5
read_and_enqueue_ops = [
    read_and_push_instance(filename_queue, instance_queue)
    for _ in range(n_readers)
]
queue_runner = tf.compat.v1.train.QueueRunner(instance_queue, read_and_enqueue_ops)

with tf.compat.v1.Session() as sess:
    # Queue the single input file, then close the queue so the readers know
    # when they have run out of files.
    sess.run(enqueue_filename, feed_dict={filename: 'my_test.csv'})
    sess.run(close_filename_queue)

    coord = tf.compat.v1.train.Coordinator()
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)

    # Drain mini-batches while the background threads fill the queue.
    more_batches = True
    while more_batches:
        try:
            print(sess.run([minibatch_instances, minibatch_targets]))
        except tf.errors.OutOfRangeError:
            more_batches = False
            print('No more training instances')

### Definindo um tempo limite

In [4]:
# Start from an empty default graph so ops from earlier cells are not carried over.
tf.compat.v1.reset_default_graph()

# A small queue of scalars plus an op that adds one to whatever is dequeued.
v = tf.compat.v1.placeholder(tf.float32)
q = tf.compat.v1.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])
enqueue = q.enqueue([v])
dequeue = q.dequeue()
output = dequeue + 1

# Abort any blocking operation after one second instead of waiting forever.
config = tf.compat.v1.ConfigProto(operation_timeout_in_ms=1000)

with tf.compat.v1.Session(config=config) as sess:
    # Fill the queue with three items.
    for item in (1.0, 2.0, 3.0):
        sess.run(enqueue, feed_dict={v: item})

    print(sess.run(output))
    # Feeding `dequeue` directly bypasses the queue, so no item is consumed here.
    print(sess.run(output, feed_dict={dequeue: 5}))
    # Consume the two remaining items.
    for attempt in range(2):
        print(sess.run(output))

    # The queue is now empty: this dequeue blocks until the timeout fires.
    try:
        print(sess.run(output))
    except tf.errors.DeadlineExceededError:
        print('Timed out while dequeuing')

2.0
6.0
3.0
4.0
Timed out while dequeuing
