# Chapter 12: Distributing TensorFlow Across Devices and Servers

Training a large DNN for a complex task on a single CPU can take days or even weeks, so this chapter discusses how to distribute TensorFlow across multiple devices on the same machine, and then across multiple devices on multiple machines. TensorFlow has built-in support for distributed computing, which makes it well suited to this task.

## Multiple Devices on a Single Machine

You can speed up training a neural network by adding multiple GPUs to your machine. In some cases, training on 8 GPUs in a single machine is faster than training on 16 GPUs spread across multiple machines, because network communication can slow down training.

### Installation

Below is code for installing NVIDIA's _Compute Unified Device Architecture_ library (CUDA) in Google Colab. TensorFlow uses CUDA to train DNNs on the GPU. The second cell checks which version of the CUDA compiler is installed.

In [0]:
!wget https://developer.nvidia.com/compute/cuda/9.2/Prod/local_installers/cuda-repo-ubuntu1604-9-2-local_9.2.88-1_amd64 -O cuda-repo-ubuntu1604-9-2-local_9.2.88-1_amd64.deb
!dpkg -i cuda-repo-ubuntu1604-9-2-local_9.2.88-1_amd64.deb
!apt-key add /var/cuda-repo-9-2-local/7fa2af80.pub
!apt-get update
!apt-get install -y cuda

In [2]:
!nvcc --version

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2019 NVIDIA Corporation
Built on Wed_Apr_24_19:10:27_PDT_2019
Cuda compilation tools, release 10.1, V10.1.168


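The following code installs the GPU-enabled version of TensorFlow. Note that the code in this chapter uses the TensorFlow 1.x API (`tf.Session`, `tf.ConfigProto`, queue-based input pipelines), which is not available under these names in TensorFlow 2, so it requires a 1.x release.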

In [0]:
!pip3 install --upgrade tensorflow-gpu

### Managing the GPU RAM

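By default, TensorFlow grabs all the available RAM on all visible GPUs the first time you run a graph, so a second TensorFlow program cannot use those GPUs while the first one is running. One option is to run each process on different GPU cards by setting the `CUDA_VISIBLE_DEVICES` environment variable, so that each process only sees the GPUs listed: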

```bash
CUDA_VISIBLE_DEVICES=0,1 python3 program1.py
CUDA_VISIBLE_DEVICES=2,3 python3 program2.py
```
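If you prefer to restrict the GPUs from inside a Python script rather than from the shell, a common approach is to set the same environment variable through `os.environ`. This is a minimal sketch, assuming the variable is set before TensorFlow (or any other CUDA library) initializes CUDA, since CUDA reads it at initialization:

```python
import os

# Restrict this process to GPUs 0 and 1. This must happen before TensorFlow
# initializes CUDA, so it goes at the top of the script, before the import.
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1'

# import tensorflow as tf  # import TensorFlow only after setting the variable
```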

Another option is to tell TensorFlow to grab only a fraction of each GPU's memory:

In [0]:
# Example code telling TensorFlow to grab only 40% of each GPU's memory
# so that multiple TensorFlow programs can run.

import tensorflow as tf

config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.4
session = tf.Session(config=config)
session.close()

In [0]:
# Alternatively, you can have TensorFlow grab memory only when it needs it.

config = tf.ConfigProto()
config.gpu_options.allow_growth = True
session = tf.Session(config=config)
session.close()

### Placing Operations on Devices

The [TensorFlow whitepaper](http://download.tensorflow.org/paper/whitepaper2015.pdf) presents a _dynamic placer_ algorithm that automatically distributes operations across all available devices. This algorithm is internal to Google and was not released in the open source version of TensorFlow because, in practice, a small set of placement rules specified by the user can perform just as well as or better than dynamic placement.

Until the dynamic placer is made public, the open source version of TensorFlow relies on the _simple placer_.

#### Simple Placer

Whenever you run a graph, the simple placer assigns each node that TensorFlow needs to evaluate to a device using the following rules:

- If a node has already been placed in a previous run of the graph, it is left on that device.

- If the user _pinned_ a node to a device (described below) then the placer places it on that device.

- Otherwise, it defaults to GPU #0, or to the CPU if there is no GPU.
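To make the order of these rules concrete, here is a toy model of the simple placer in plain Python. It is not TensorFlow's implementation; the function `simple_place` and its dictionary arguments are hypothetical, chosen only to illustrate how the three rules are applied:

```python
def simple_place(node, placed, pinned, gpus):
  """Toy model of the simple placer's rules (not TensorFlow's code).

  node:   name of the node to place
  placed: dict node -> device, filled in by previous runs of the graph
  pinned: dict node -> device requested by the user with tf.device()
  gpus:   list of available GPU device names (may be empty)
  """
  # Rule 1: a node placed in a previous run stays on that device.
  if node in placed:
    return placed[node]
  # Rule 2: a pinned node goes to the device the user asked for.
  if node in pinned:
    device = pinned[node]
  # Rule 3: otherwise use GPU #0, or the CPU if there is no GPU.
  elif gpus:
    device = gpus[0]
  else:
    device = '/cpu:0'
  placed[node] = device  # remembered for later runs of the graph
  return device


placed = {}
pinned = {'a': '/cpu:0', 'b': '/cpu:0'}
for name in ['a', 'b', 'mul']:
  print(name, simple_place(name, placed, pinned, gpus=['/gpu:0']))
```

With `a` and `b` pinned to the CPU and one GPU available, the unpinned multiplication lands on `/gpu:0`, which is consistent with the placement log shown later in this section.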

Below is an example of using TensorFlow to _pin_ nodes to a device. In this case, the code pins the variable `a` and the constant `b` to the CPU, while `c` is not pinned, so it is placed on the default device.

In [9]:
with tf.device('/cpu:0'):
  a = tf.Variable(3.0, name='a')
  b = tf.constant(4.0, name='b')
c = a * b



#### Logging Placements

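Below is code for logging which device each node is placed on. The code in the book does not work in a notebook due to [this TensorFlow issue](https://github.com/tensorflow/tensorflow/issues/3047): the placement log is printed by TensorFlow's C++ runtime rather than through Python, so the notebook does not display it. Below is an example workaround from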

In [7]:
!pip install wurlitzer

Collecting wurlitzer
  Downloading https://files.pythonhosted.org/packages/60/17/de2820542c755f4630a58d295daad86bfa981fbf48b48e5f9e1f2ed806cc/wurlitzer-1.0.2-py2.py3-none-any.whl
Installing collected packages: wurlitzer
Successfully installed wurlitzer-1.0.2


In [10]:
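from wurlitzer import pipes

tf.logging.set_verbosity(tf.logging.INFO)
# log_device_placement=True makes the runtime print each node's device.
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))

# pipes() captures the C-level output produced inside the block.
with pipes() as (out, err):
  print(sess.run(a.initializer))

print(out.read())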

None
a: (VariableV2): /job:localhost/replica:0/task:0/device:CPU:0
a/Assign: (Assign): /job:localhost/replica:0/task:0/device:CPU:0
a/read: (Identity): /job:localhost/replica:0/task:0/device:CPU:0
mul: (Mul): /job:localhost/replica:0/task:0/device:GPU:0
a/initial_value: (Const): /job:localhost/replica:0/task:0/device:CPU:0
b: (Const): /job:localhost/replica:0/task:0/device:CPU:0



In [11]:
sess.run(c)

12.0

In [0]:
sess.close()

#### Dynamic Placement Function

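Instead of a device name, you can pass a function to `tf.device()`. TensorFlow calls the function for each operation created inside the block and places the operation on the device the function returns. You can use this to implement more complex placement rules, such as pinning variables across GPUs in a round-robin fashion.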
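As a sketch of the round-robin idea, the hypothetical helper below builds a device function that assigns variable operations to a list of devices in turn and sends every other operation to a default device. It relies only on what `tf.device()` does with a function (call it with each operation and use the returned device name); the op types it checks are the ones TensorFlow 1.x uses for variables:

```python
import itertools
from types import SimpleNamespace

# Op types TensorFlow 1.x uses for variables (reference and resource variables).
VARIABLE_OP_TYPES = {'Variable', 'VariableV2', 'VarHandleOp'}


def make_round_robin_placer(devices, default_device='/gpu:0'):
  """Hypothetical helper: returns a device function for tf.device().

  Variable ops are assigned to `devices` in turn; all other ops go to
  `default_device`.
  """
  next_device = itertools.cycle(devices)

  def placer(op):
    # tf.device() calls this once for each op created inside the block.
    if op.type in VARIABLE_OP_TYPES:
      return next(next_device)
    return default_device

  return placer


# Usage in a TensorFlow 1.x graph (assumes two GPUs):
#   with tf.device(make_round_robin_placer(['/gpu:0', '/gpu:1'])):
#     w1 = tf.Variable(1.0)  # variable op -> /gpu:0
#     w2 = tf.Variable(2.0)  # variable op -> /gpu:1

# Quick check with stand-ins that only have a `type` attribute, like real ops.
placer = make_round_robin_placer(['/gpu:0', '/gpu:1'])
devices = [placer(SimpleNamespace(type='VariableV2')) for _ in range(3)]
print(devices)  # ['/gpu:0', '/gpu:1', '/gpu:0']
```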

In [0]:
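def variables_on_cpu(op):
  # tf.Variable creates a 'VariableV2' op in TensorFlow 1.x (see the placement
  # log above); 'Variable' is the older op type.
  if op.type in ('Variable', 'VariableV2'):
    return '/cpu:0'
  return '/gpu:0'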

tf.reset_default_graph()

with tf.device(variables_on_cpu):
  a = tf.Variable(3.0)
  b = tf.constant(4.0)
  c = a * b
  
sess = tf.Session()
sess.run(a.initializer)
sess.run(c)

12.0

In [0]:
sess.close()

#### Operations and Kernels

For a TensorFlow operation to run on a device, it needs an implementation, or _kernel_, for that device. Many operations have kernels for both CPUs and GPUs, but not all of them. For example, integer variables do not have a GPU kernel, as the following code illustrates:

In [0]:
tf.reset_default_graph()

with tf.device('/gpu:0'):
  i = tf.Variable(3)

try:
  sess = tf.Session()
  sess.run(i.initializer)
except Exception as ex:
  print(type(ex).__name__)

InvalidArgumentError


In [0]:
sess.close()

#### Soft placement

To avoid the exception above, you can enable _soft placement_, which makes TensorFlow fall back to the CPU when an operation has no kernel for the device it was pinned to:

In [0]:
with tf.device('/gpu:0'):
  i = tf.Variable(3)
  
config = tf.ConfigProto()
config.allow_soft_placement = True
sess = tf.Session(config=config)
sess.run(i.initializer)

In [0]:
sess.close()

### Parallel Execution

When TensorFlow evaluates a graph, it first counts how many dependencies each node has, then evaluates all the nodes with no dependencies, i.e. the source nodes. Each time a node is evaluated, the dependency counter of every node that depends on it is decremented, and once a node's counter reaches zero, that node is evaluated too. Once all the nodes TensorFlow needs have been evaluated, it outputs the results.
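The dependency-counting procedure can be sketched in plain Python. This is a simplified, single-threaded model rather than TensorFlow's executor; `evaluation_order` is a hypothetical name, and the graph is assumed to be given as a dictionary mapping each node to the nodes it depends on:

```python
from collections import deque


def evaluation_order(deps):
  """Toy model of dependency-counter scheduling (not TensorFlow's executor).

  deps maps every node to the list of nodes it depends on; every node,
  including source nodes, must appear as a key.
  """
  # Count each node's unmet dependencies and record each node's consumers.
  pending = {node: len(inputs) for node, inputs in deps.items()}
  consumers = {node: [] for node in deps}
  for node, inputs in deps.items():
    for inp in inputs:
      consumers[inp].append(node)

  # Source nodes have no dependencies, so they are ready right away.
  ready = deque(node for node, count in pending.items() if count == 0)
  order = []
  while ready:
    node = ready.popleft()
    order.append(node)  # "evaluate" the node
    for consumer in consumers[node]:
      pending[consumer] -= 1
      if pending[consumer] == 0:  # all of its inputs are now available
        ready.append(consumer)
  return order


# c = a * b and d = c + 1: a and b are source nodes.
order = evaluation_order({'a': [], 'b': [], 'c': ['a', 'b'], 'd': ['c']})
print(order)  # ['a', 'b', 'c', 'd']
```

Nodes that sit in the ready queue at the same time, like `a` and `b` here, are the ones TensorFlow is free to evaluate in parallel.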

Nodes placed on the CPU are dispatched to a queue in a thread pool called the _inter-op thread pool_. If the CPU has multiple cores, these operations are executed in parallel. If an operation has a multithreaded kernel, the kernel splits its work into multiple pieces, which are placed in another queue and executed in parallel by a second thread pool called the _intra-op thread pool_.

On the GPU, operations in the queue are evaluated sequentially. However, many operations have multithreaded GPU kernels, implemented by CUDA, cuDNN, and the other GPU libraries TensorFlow depends on, so each operation can still use many GPU threads in parallel.

### Control Dependencies

Sometimes we do not want to evaluate a node as soon as its dependency counter reaches zero. For example, a node may use a lot of memory while its value is only needed much later in the graph, so it is better to evaluate it at the last moment. Or a set of nodes may depend on a lot of data located outside the device; running them all at once could saturate the communication bandwidth, so it may make more sense to evaluate them sequentially instead of in parallel.

Below is an example of adding _control dependencies_ in a TensorFlow graph, i.e. nodes which need to wait on the evaluation of other nodes even if they do not directly depend on them.

In [0]:
tf.reset_default_graph()

a = tf.constant(1.0)
b = a + 2.0

with tf.control_dependencies([a, b]):
  x = tf.constant(3.0)
  y = tf.constant(4.0)
  
z = x + y

Here, `x` and `y`, and therefore `z`, are evaluated only after `a` and `b` have been evaluated, even though their values do not depend on `a` or `b`. Since `b` depends on `a`, listing only `b` as a control dependency would be enough, but sometimes it is better to be explicit.

## Multiple Devices Across Multiple Servers

To run a graph across multiple servers, you need to define a _cluster_, i.e. a group of TensorFlow servers called _tasks_, typically spread across several machines. Each task belongs to a _job_, i.e. a named group of tasks that perform a common role.

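The following code defines a _cluster specification_ with two jobs, `ps` and `worker`, containing two and three tasks respectively. The `ps` tasks are _parameter servers_, which keep track of the model parameters, whereas the `worker` tasks perform computations. In this notebook, every task runs on the local machine, each on a different port.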

In [0]:
cluster_spec = tf.train.ClusterSpec({
    'ps': [
        '127.0.0.1:2221',
        '127.0.0.1:2222',
    ],
    'worker': [
        '127.0.0.1:2223',
        '127.0.0.1:2224',
        '127.0.0.1:2225',
    ],
})
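When a cluster grows, it can be convenient to generate the spec and look up task addresses programmatically. The two helpers below are hypothetical (they are not part of TensorFlow); they assume every task runs on the same host with consecutive ports, parameter servers first, which reproduces the spec above. The resulting dictionary is the kind of mapping `tf.train.ClusterSpec` takes, and the `grpc://` strings have the same form as the session targets used in the rest of this section:

```python
def make_local_cluster(n_ps, n_workers, host='127.0.0.1', base_port=2221):
  """Hypothetical helper: one port per task, parameter servers first."""
  ports = iter(range(base_port, base_port + n_ps + n_workers))
  return {
      'ps': ['{}:{}'.format(host, next(ports)) for _ in range(n_ps)],
      'worker': ['{}:{}'.format(host, next(ports)) for _ in range(n_workers)],
  }


def grpc_target(cluster, job, task_index):
  """Hypothetical helper: the session target string for one task."""
  return 'grpc://' + cluster[job][task_index]


cluster = make_local_cluster(n_ps=2, n_workers=3)
print(cluster['worker'])                  # ['127.0.0.1:2223', '127.0.0.1:2224', '127.0.0.1:2225']
print(grpc_target(cluster, 'worker', 0))  # grpc://127.0.0.1:2223
```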

The following code starts one TensorFlow `Server` per task by passing each one the cluster spec, its job name, and its task index.

In [0]:
ps0 = tf.train.Server(cluster_spec, job_name='ps', task_index=0)
ps1 = tf.train.Server(cluster_spec, job_name='ps', task_index=1)
worker0 = tf.train.Server(cluster_spec, job_name='worker', task_index=0)
worker1 = tf.train.Server(cluster_spec, job_name='worker', task_index=1)
worker2 = tf.train.Server(cluster_spec, job_name='worker', task_index=2)

Typically you run one task per machine, but you can run multiple tasks on the same machine (as this notebook does), as long as you ensure that they don't all try to grab all the RAM on every GPU.

If you want the process to do nothing other than run the TensorFlow server, you can block the main thread by using the `join()` method:


In [0]:
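# This blocks the main thread until the server shuts down. In a standalone
# script that runs a single task, it would typically be the last line. Running
# it here would block the notebook kernel, so it is commented out.
# worker0.join()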

### Opening a Session

Once all of the tasks are up and running, you can open a session on any of the servers from a client on any machine using the following code:

In [16]:
a = tf.constant(1.0)
b = a + 2
c = b * 3

with tf.Session('grpc://127.0.0.1:2223') as sess:
  print(c.eval())

9.0


The code creates a simple graph, then opens a session on the server at `127.0.0.1:2223` (task 0 of the `worker` job) and evaluates `c`. That server's master places the operations on the appropriate devices. Since we did not pin any operation to a particular device, the master places them all on its own default device.

### The Master and Worker Services

The client uses the _gRPC_ protocol to communicate with the server. gRPC uses HTTP/2 to open a lasting connection that allows bidirectional communication, and it exchanges data in the form of _protocol buffers_.

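Every TensorFlow server provides two services: a _master service_ and a _worker service_. The master service allows clients to open sessions and use them to run graphs, and it coordinates the computations across tasks, while the worker service actually executes computations on the task's devices. This architecture is flexible: one server can handle multiple sessions from one or more clients, and one client can connect to multiple servers by opening multiple sessions.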

### Pinning Operations Across Tasks

Below is an example of using a device block to pin an operation to a particular task and to a particular device on that task.

In [17]:
tf.reset_default_graph()

# This block pins `a` to task 0 of the `ps` job's CPU.
with tf.device('/job:ps/task:0/cpu:0'):
  a = tf.constant(1.0)
  
# This block pins `b` to task 1 of the `worker` job's GPU.
with tf.device('/job:worker/task:1/gpu:0'):
  b = a + 2

c = a + b

with tf.Session('grpc://127.0.0.1:2225') as sess:
  print(c.eval())

4.0


### Sharding Variables Across Multiple Parameter Servers

It is common to have a _parameter server_ job that stores the parameters while training a complex model. Some models, like DNNs, can have thousands or even millions of parameters, and serving them all from a single parameter server can saturate its network card. To avoid this, it is common to shard the parameters across multiple parameter servers.

Since manually pinning every variable to a different task can be tedious, TensorFlow provides a `replica_device_setter()` which distributes variables across servers. Below is an example:

In [0]:
tf.reset_default_graph()

with tf.device(tf.train.replica_device_setter(ps_tasks=2)):
  v1 = tf.Variable(1.0) # pinned to /job:ps/task:0
  v2 = tf.Variable(2.0) # pinned to /job:ps/task:1
  v3 = tf.Variable(3.0) # pinned to /job:ps/task:0
  v4 = tf.Variable(4.0) # pinned to /job:ps/task:1
  v5 = tf.Variable(5.0) # pinned to /job:ps/task:0
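The pinning shown in the comments follows a round-robin pattern over the `ps` tasks, which is the setter's default strategy. The device each successive variable receives can be modeled as below; `round_robin_ps_devices` is a hypothetical helper for illustration, not a TensorFlow function:

```python
def round_robin_ps_devices(n_variables, ps_tasks, ps_device='/job:ps'):
  """Toy model of replica_device_setter's default round-robin strategy.

  Returns the device that each of the first n_variables variables created
  in the block would be pinned to (hypothetical helper, not TensorFlow code).
  """
  return ['{}/task:{}'.format(ps_device, i % ps_tasks)
          for i in range(n_variables)]


for name, device in zip(['v1', 'v2', 'v3', 'v4', 'v5'],
                        round_robin_ps_devices(5, ps_tasks=2)):
  print(name, device)
```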

Alternatively, you can pass the cluster spec (e.g. `replica_device_setter(cluster=cluster_spec)`), and TensorFlow will compute the number of tasks in the `ps` job automatically.

Operations other than variables are pinned to `/job:worker` by default, which resolves to the first device managed by the first worker task. You can pin them to other devices using nested device blocks. Below is an example of a graph pinned to multiple tasks and multiple devices:

In [19]:
tf.reset_default_graph()

with tf.device(tf.train.replica_device_setter(ps_tasks=2, ps_device='/job:ps',
                                              worker_device='/job:worker')):
  v1 = tf.Variable(1.0) # pinned to /job:ps/task:0
  v2 = tf.Variable(2.0) # pinned to /job:ps/task:1
  v3 = tf.Variable(3.0) # pinned to /job:ps/task:0
  
  s = v1 + v2 # pinned to /job:worker/task:0/cpu:0
  
  with tf.device('/gpu:0'):
    p1 = 2 * s # pinned to /job:worker/task:0/gpu:0
    
    with tf.device('/task:1'):
      p2 = 3 * s # pinned to /job:worker/task:1/gpu:0 (nested blocks merge)
      
with tf.Session('grpc://127.0.0.1:2221') as sess:
  v1.initializer.run()
  v2.initializer.run()
  print(s.eval())
  print(p1.eval())
  print(p2.eval())

3.0
6.0
9.0


### Sharing State Across Sessions Using Resource Containers

When using a plain _local session_, variable values are stored in the session object itself, so they are lost when the session ends. Moreover, multiple local sessions cannot share any state, even if they run the same graph.

When you use _distributed sessions_, variable state is managed by _resource containers_ located on the cluster itself, not by the sessions, so it persists across sessions. The code below illustrates this:

In [0]:
tf.reset_default_graph()

x = tf.Variable(0.0, name='x')
increment_x = tf.assign(x, x + 1)

In [21]:
with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(x.initializer)
  sess.run(increment_x)
  print(x.eval())

1.0


In [22]:
with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(increment_x)
  print(x.eval())

2.0


While this feature can be convenient, you have to be careful not to reuse variable names by accident: two graphs that each create a variable named `x` will share the same value on the cluster. One way to avoid this is to use a variable scope with a unique name for each computation.

In [0]:
tf.reset_default_graph()

with tf.variable_scope('problem_1'):
  x1 = tf.Variable(0.0, name='x')
  increment_x1 = tf.assign(x1, x1 + 1)
  
with tf.variable_scope('problem_2'):
  x2 = tf.Variable(0.0, name='x')
  increment_x2 = tf.assign(x2, x2 + 1)

In [24]:
with tf.Session('grpc://127.0.0.1:2222') as sess:
  x1.initializer.run()
  print(increment_x1.eval())
  
with tf.Session('grpc://127.0.0.1:2222') as sess:
  x2.initializer.run()
  print(increment_x2.eval())

1.0
1.0


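You can even use resource containers to share variables across different graphs. Note that a variable scope only prefixes variable names: the variables above still live in the server's default container. To be able to reset one problem's variables on their own, create them inside a named container, e.g. with `tf.container('problem_1')`. The following code resets a named container; if you omit the list of container names, all containers on the server are reset: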

In [0]:
tf.Session.reset('grpc://127.0.0.1:2222', ['problem_1'])

### Asynchronous Communication Using TensorFlow Queues

Queues are a way to share data across multiple sessions. One common use case is passing mini-batches of training data between sessions: one graph loads the training data and pushes it to the queue, while another pulls the data from the queue and trains a neural network.
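This is the classic producer/consumer pattern. As a plain-Python analogy (not TensorFlow), the standard library's `queue.Queue` behaves similarly: the producer blocks when the bounded queue is full, and the consumer blocks until an item is available. The `None` sentinel marking the end of the data is an assumption of this sketch; TensorFlow queues signal the end by being closed instead:

```python
import queue
import threading

# Plain-Python analogy, not TensorFlow: a bounded FIFO queue shared by a
# producer thread that loads instances and a consumer thread that uses them.
batches = queue.Queue(maxsize=10)  # put() blocks while 10 items are waiting
END_OF_DATA = None                 # sentinel marking the end of the data


def producer(n_batches):
  for i in range(n_batches):
    batches.put([2.0 * i, 2.0 * i + 1.0])  # a fake instance with 2 floats
  batches.put(END_OF_DATA)


def consumer(results):
  while True:
    batch = batches.get()  # blocks until an item is available
    if batch is END_OF_DATA:
      break
    results.append(sum(batch))  # stand-in for a training step


results = []
threads = [threading.Thread(target=producer, args=(25,)),
           threading.Thread(target=consumer, args=(results,))]
for t in threads:
  t.start()
for t in threads:
  t.join()
print(len(results))  # 25
```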

TensorFlow provides various kinds of queues, the simplest of which is the _first-in first-out_ (FIFO) queue. Below is an example of a FIFO queue that can hold up to 10 tensors, each containing 2 floats:

In [0]:
q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[[2]], name='q',
                 shared_name='shared_q')

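TensorFlow uses the `shared_name` parameter to share the queue across sessions: any session that creates a queue with the same `shared_name` on the same server refers to the same queue.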

#### Enqueuing data

To push data into the queue, create an enqueue operation with the `enqueue` method and run it. The following code pushes three training instances to the queue:

In [0]:
training_instance = tf.placeholder(tf.float32, shape=[2])
enqueue = q.enqueue([training_instance])

with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(enqueue, feed_dict={training_instance: [1.0, 2.0]})
  sess.run(enqueue, feed_dict={training_instance: [3.0, 4.0]})
  sess.run(enqueue, feed_dict={training_instance: [5.0, 6.0]})

You can enqueue multiple tensors at once using the `enqueue_many` method, which splits its input along the first dimension:

In [0]:
training_instances = tf.placeholder(tf.float32, shape=(None, 2))
enqueue_many = q.enqueue_many([training_instances])

with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(enqueue_many, feed_dict={
      training_instances: [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
  })

#### Dequeuing data

You use the `dequeue` method to pull tensors from the queue:

In [31]:
dequeue = q.dequeue()

with tf.Session('grpc://127.0.0.1:2222') as sess:
  print(sess.run(dequeue))
  print(sess.run(dequeue))
  print(sess.run(dequeue))

[1. 2.]
[3. 4.]
[5. 6.]


To dequeue multiple items at once, use the `dequeue_many` method and specify how many items to dequeue each time. If the queue contains fewer items than requested, the operation blocks until enough items are available.

In [32]:
batch_size = 2
dequeue_mini_batch = q.dequeue_many(batch_size)

with tf.Session('grpc://127.0.0.1:2222') as sess:
  print(sess.run(dequeue_mini_batch))

[[1. 2.]
 [3. 4.]]


In [0]:
tf.Session.reset('grpc://127.0.0.1:2222')

#### Queues of tuples

Queues can also hold tuples of tensors of different shapes instead of a single tensor. The following queue stores two tensors: a scalar `int32` tensor and a `float32` tensor with shape `[3,2]`:

In [0]:
q = tf.FIFOQueue(capacity=10, dtypes=[tf.int32, tf.float32], shapes=[[], [3,2]],
                 name='q', shared_name='shared_q')

a = tf.placeholder(tf.int32, shape=())
b = tf.placeholder(tf.float32, shape=(3, 2))
enqueue = q.enqueue((a, b))

with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(enqueue, feed_dict={a: 10, b: [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]})
  sess.run(enqueue, feed_dict={a: 11, b: [[2.0, 4.0], [6.0, 8.0], [0.0, 2.0]]})
  sess.run(enqueue, feed_dict={a: 12, b: [[3.0, 6.0], [9.0, 2.0], [5.0, 8.0]]})

The `dequeue` method now returns one tensor per component of the tuple:

In [36]:
dequeue_a, dequeue_b = q.dequeue()

with tf.Session('grpc://127.0.0.1:2222') as sess:
  a, b = sess.run([dequeue_a, dequeue_b])
  print(a)
  print(b)

10
[[1. 2.]
 [3. 4.]
 [5. 6.]]


`dequeue_many` also returns one tensor per component, each with an extra leading dimension for the batch:

In [37]:
batch_size = 2
dequeue_as, dequeue_bs = q.dequeue_many(batch_size)

with tf.Session('grpc://127.0.0.1:2222') as sess:
  a, b = sess.run([dequeue_as, dequeue_bs])
  print(a)
  print(b)

[11 12]
[[[2. 4.]
  [6. 8.]
  [0. 2.]]

 [[3. 6.]
  [9. 2.]
  [5. 8.]]]


#### Closing a queue

You can close a queue to signal to other sessions that no more data will be enqueued:

In [0]:
close_q = q.close()

with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(close_q)

In [0]:
tf.Session.reset('grpc://127.0.0.1:2222')

Attempting to push to a closed queue raises an exception, but any pending enqueue requests are honored unless you call `q.close(cancel_pending_enqueues=True)`. Subsequent dequeue operations keep working as long as there are enough items left in the queue; otherwise they fail with an `OutOfRangeError`.

If you want to retrieve the last items even when fewer than `batch_size` remain, use the `dequeue_up_to` method instead: on a closed queue, it returns the remaining items rather than failing.
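These rules can be summarized with a toy model. The class below is hypothetical and only mimics the closed-queue semantics described above; blocking is not modeled, so where TensorFlow would block on an open queue, it raises an exception instead:

```python
from collections import deque


class ToyClosableQueue:
  """Toy model of closed-queue semantics (not TensorFlow's implementation).

  Blocking is not modeled: where TensorFlow would block on an open queue,
  this class raises LookupError instead.
  """

  def __init__(self):
    self.items = deque()
    self.closed = False

  def enqueue(self, item):
    if self.closed:
      raise RuntimeError('cannot enqueue: the queue is closed')
    self.items.append(item)

  def close(self):
    self.closed = True

  def dequeue_many(self, n):
    if len(self.items) < n:
      # TensorFlow: blocks if the queue is open, OutOfRangeError if closed.
      raise LookupError('fewer than {} items left'.format(n))
    return [self.items.popleft() for _ in range(n)]

  def dequeue_up_to(self, n):
    if len(self.items) < n and not self.closed:
      # TensorFlow: blocks, because more items may still arrive.
      raise LookupError('would block: the queue is still open')
    if not self.items:
      # TensorFlow: OutOfRangeError once a closed queue is empty.
      raise LookupError('the queue is closed and empty')
    # On a closed queue, return whatever is left, even if fewer than n.
    return [self.items.popleft() for _ in range(min(n, len(self.items)))]


q = ToyClosableQueue()
for x in [1.0, 2.0, 3.0]:
  q.enqueue(x)
q.close()
first = q.dequeue_many(2)  # [1.0, 2.0]
rest = q.dequeue_up_to(2)  # [3.0]: fewer than 2, but the queue is closed
print(first, rest)
```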

#### RandomShuffleQueue

Another type of queue that TensorFlow supports is the `RandomShuffleQueue` which returns items in the queue in random order. Below is an example:

In [0]:
q = tf.RandomShuffleQueue(capacity=50, min_after_dequeue=10,
                          dtypes=[tf.float32], shapes=[()], name='q',
                          shared_name='shared_q')

In [0]:
training_instances = tf.placeholder(tf.float32, shape=(None,))
enqueue = q.enqueue_many([training_instances])

with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(enqueue, feed_dict={
      training_instances: [float(i) for i in range(25)]
  })

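The `min_after_dequeue` parameter specifies the minimum number of items that must remain in the queue after a dequeue operation, which ensures there are always enough items to shuffle. In the example below, the queue holds 25 items, so three batches of 5 can be dequeued before only 10 items remain; a fourth `dequeue_many(5)` would block until more items are enqueued.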
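A toy model of the `min_after_dequeue` rule looks like this, assuming items are picked uniformly at random. The helper `shuffle_dequeue_many` is hypothetical and is not TensorFlow's implementation:

```python
import random


def shuffle_dequeue_many(items, n, min_after_dequeue, rng):
  """Toy model of RandomShuffleQueue.dequeue_many (not TensorFlow's code).

  Removes n items chosen uniformly at random from the list `items`. Where
  TensorFlow would block because fewer than min_after_dequeue items would
  remain, this sketch raises LookupError instead.
  """
  if len(items) - n < min_after_dequeue:
    raise LookupError('would block: too few items left to shuffle')
  batch = []
  for _ in range(n):
    i = rng.randrange(len(items))
    items[i], items[-1] = items[-1], items[i]  # move the chosen item to the end
    batch.append(items.pop())                  # and remove it in O(1)
  return batch


rng = random.Random(42)
queue_items = [float(i) for i in range(25)]
batches = [shuffle_dequeue_many(queue_items, 5, 10, rng) for _ in range(3)]
print(len(queue_items))  # 10: exactly min_after_dequeue items remain
```

With 25 items, a batch size of 5, and `min_after_dequeue=10`, three batches succeed and exactly 10 items remain, matching the TensorFlow example.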

In [42]:
batch_size = 5
dequeue = q.dequeue_many(batch_size)

with tf.Session('grpc://127.0.0.1:2222') as sess:
  print(sess.run(dequeue))
  print(sess.run(dequeue))
  print(sess.run(dequeue))

[ 9. 12.  7. 24.  3.]
[18. 19. 14.  8. 11.]
[22.  2. 17. 10.  5.]


In [0]:
tf.Session.reset('grpc://127.0.0.1:2222')

#### PaddingFIFOQueue

A `PaddingFIFOQueue` is a queue that accepts tensors of varying dimensions, as long as they all have the same rank. When you dequeue them individually, the tensors come back exactly as they were enqueued. When you use `dequeue_many` or `dequeue_up_to`, each tensor is padded with zeros so that every dimension matches the largest size of that dimension in the mini-batch (below, tensors of shapes `(3, 2)`, `(1, 1)`, and `(2, 4)` are all padded to `(3, 4)`).

In [0]:
q = tf.PaddingFIFOQueue(capacity=50, dtypes=[tf.float32],
                        shapes=[(None, None)], name='q',
                        shared_name='shared_q')
v = tf.placeholder(tf.float32, shape=(None, None))
enqueue = q.enqueue([v])

with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(enqueue, feed_dict={v: [[1., 2.], [3., 4.], [5., 6.]]})
  sess.run(enqueue, feed_dict={v: [[1.]]})
  sess.run(enqueue, feed_dict={v: [[7., 8., 9., 5.], [6., 7., 8., 9.]]})

In [46]:
batch_size = 3
dequeue = q.dequeue_many(batch_size)
with tf.Session('grpc://127.0.0.1:2222') as sess:
  print(sess.run(dequeue))

[[[1. 2. 0. 0.]
  [3. 4. 0. 0.]
  [5. 6. 0. 0.]]

 [[1. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[7. 8. 9. 5.]
  [6. 7. 8. 9.]
  [0. 0. 0. 0.]]]


In [0]:
tf.Session.reset('grpc://127.0.0.1:2222')

This type of queue is useful for variable-length inputs, such as sequences of words.
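The padding rule can be reproduced with NumPy. The hypothetical `pad_batch` function below zero-pads each array up to the largest size of every dimension and stacks the results; applied to the three tensors enqueued above, it produces a batch of the same shape and contents:

```python
import numpy as np


def pad_batch(tensors):
  """Hypothetical helper: zero-pads same-rank arrays to a common shape and
  stacks them, mimicking what PaddingFIFOQueue.dequeue_many returns."""
  arrays = [np.asarray(t, dtype=np.float32) for t in tensors]
  # Each dimension is padded up to its largest size in the batch.
  target = np.max([a.shape for a in arrays], axis=0)
  padded = [np.pad(a, [(0, t - s) for s, t in zip(a.shape, target)],
                   mode='constant')
            for a in arrays]
  return np.stack(padded)


batch = pad_batch([
    [[1., 2.], [3., 4.], [5., 6.]],
    [[1.]],
    [[7., 8., 9., 5.], [6., 7., 8., 9.]],
])
print(batch.shape)  # (3, 3, 4)
```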

### Loading Data Directly from the Graph

So far, we have fed training data to the TensorFlow cluster only through placeholders, which involves three steps:

1. Load the data from the filesystem into the client.

2. Send the data from the client to the master task.

3. Send the data from the master task to the other tasks that need it for computation.

This process can be very inefficient for large datasets or when the training graph is distributed across many tasks.

#### Preload the data into a variable

If the dataset fits in memory, one option is to load the training data into a variable and use that variable in your graph. This is called _preloading_ the training set. This way, the data only needs to be uploaded from the client to the cluster once, though it may still need to be transferred between tasks.

In [0]:
tf.reset_default_graph()

data = \
  [[0., 1., 2., 3., 4.],
   [5., 6., 7., 8., 9.],
   [10., 11., 12., 13., 14.],
   [15., 16., 17., 18., 19.]]

training_set_init = tf.placeholder(tf.float32, shape=(4, 5))
training_set = tf.Variable(training_set_init, trainable=False, collections=[],
                           name='training_set')

with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(training_set.initializer, feed_dict={training_set_init: data})

In [0]:
tf.Session.reset('grpc://127.0.0.1:2222')

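Setting `trainable=False` ensures that optimizers don't try to modify this variable, and `collections=[]` keeps it out of the `GraphKeys.GLOBAL_VARIABLES` collection, which a `Saver` uses to decide what to save in checkpoints. This is also why the variable is initialized explicitly, by running its initializer with the data fed through the placeholder.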


#### Reading the training data directly from the graph

Below is an example of using TensorFlow to read data from a CSV file:

In [0]:
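# Writing the CSV file.
# Note: the x2 field of the last row contains a space rather than being empty,
# so decode_csv parses it as 0. instead of using the -1. default (see the
# outputs below).

csv_data = \
  'x1, x2, target\n' \
  '1., 2., 0\n' \
  '4., 5., 1\n' \
  '7., , 0'
csv_filename = 'my_test.csv'

# The with block closes the file automatically.
with open(csv_filename, 'w') as f:
  f.write(csv_data)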

In [6]:
# Creating a TextLineReader object, a stateful object which reads data
# from a file.

tf.reset_default_graph()

reader = tf.TextLineReader(skip_header_lines=1)

Instructions for updating:
Queue-based input pipelines have been replaced by `tf.data`. Use `tf.data.TextLineDataset`.


In [0]:
# Creating a queue to keep track of which files we want to read from.
# Including a placeholder for the filename, an enqueue operation, and
# a close operation.

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

In [0]:
# Creating the read operation using the Reader object. The key uniquely
# identifies each record (filename:line_number) and the value is a string
# containing the content of the line.

key, value = reader.read(filename_queue)

In [0]:
# Parsing the string to create the training set features

x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

In [0]:
# Finally push the training instance to a RandomShuffleQueue that will be
# part of the training graph.

instance_queue = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2,
                                       dtypes=[tf.float32, tf.int32],
                                       shapes=[[2], []], name='instance_q',
                                       shared_name='shared_instance_q')
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

In [0]:
# An example of a Session that adds training instances to the instance queue

with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(enqueue_filename, feed_dict={filename: csv_filename})
  sess.run(close_filename_queue)
  try:
    while True:
      sess.run(enqueue_instance)
  except tf.errors.OutOfRangeError:
    pass
  sess.run(close_instance_queue)

In [12]:
# An example of reading training data from the instance queue.

mini_batch_instances, mini_batch_targets = instance_queue.dequeue_up_to(2)

with tf.Session('grpc://127.0.0.1:2222') as sess:
  try:
    while True:
      print(sess.run([mini_batch_instances, mini_batch_targets]))
  except tf.errors.OutOfRangeError:
    pass

[array([[4., 5.],
       [1., 2.]], dtype=float32), array([1, 0], dtype=int32)]
[array([[7., 0.]], dtype=float32), array([0], dtype=int32)]


In [0]:
tf.Session.reset('grpc://127.0.0.1:2222')

In addition to CSV files, TensorFlow can also read fixed-length binary records (`tf.FixedLengthRecordReader`) and TensorFlow's own TFRecord format (`tf.TFRecordReader`), which is based on protocol buffers.

One limitation of this architecture is that it uses only one thread to read records and push them to the instance queue.

#### Multithreaded readers using a Coordinator and a QueueRunner

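Below is an example that uses TensorFlow's `QueueRunner` and `Coordinator` classes to read data with multiple threads. The `QueueRunner` runs the given enqueue operations repeatedly, each in its own thread, and the `Coordinator` lets those threads stop together.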

In [0]:
tf.reset_default_graph()

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

reader = tf.TextLineReader(skip_header_lines=1)

key, value = reader.read(filename_queue)

x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

instance_queue = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2,
                                       dtypes=[tf.float32, tf.int32],
                                       shapes=[[2], []], name='instance_q',
                                       shared_name='shared_instance_q')
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

mini_batch_instances, mini_batch_targets = instance_queue.dequeue_up_to(2)

# New code here!
n_threads = 5
queue_runner = tf.train.QueueRunner(instance_queue,
                                    [enqueue_instance] * n_threads)
coord = tf.train.Coordinator()

In [29]:
with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(enqueue_filename, {filename: csv_filename})
  sess.run(close_filename_queue)
  enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
  try:
    while True:
      print(sess.run([mini_batch_instances, mini_batch_targets]))
  except tf.errors.OutOfRangeError:
    pass
  # Ask the enqueue threads to stop and wait for them to finish.
  coord.request_stop()
  coord.join(enqueue_threads)

[array([[4., 5.],
       [7., 0.]], dtype=float32), array([1, 0], dtype=int32)]
[array([[1., 2.]], dtype=float32), array([0], dtype=int32)]


In [0]:
tf.Session.reset('grpc://127.0.0.1:2222')

Another way to further parallelize this process is to have several readers read from the filename queue simultaneously, which helps when the training set is sharded across multiple CSV files. The following code defines a function that creates a reader, which reads records from a filename queue and pushes them to an instance queue. Calling it several times creates several readers, each with its own enqueue operation; giving all of these operations to the `QueueRunner` runs each one in its own thread, so multiple files can be read in parallel.

In [0]:
# Defining a function to read from a file queue and push to an instance queue.

def read_and_push_instance(file_queue, instance_queue, skip_header_lines=1):
  reader = tf.TextLineReader(skip_header_lines=skip_header_lines)
  key, value = reader.read(file_queue)
  x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
  features = tf.stack([x1, x2])
  enqueue_instance = instance_queue.enqueue([features, target])
  return enqueue_instance

In [0]:
# Defining the TensorFlow graph to read from the file.

tf.reset_default_graph()

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

instance_queue = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2,
                                       dtypes=[tf.float32, tf.int32],
                                       shapes=[[2], []], name='instance_q',
                                       shared_name='shared_instance_q')

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue)
                        for _ in range(5)]

queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)
coord = tf.train.Coordinator()

In [35]:
with tf.Session('grpc://127.0.0.1:2222') as sess:
  sess.run(enqueue_filename, {filename: csv_filename})
  sess.run(close_filename_queue)
  enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
  try:
    while True:
      print(sess.run([minibatch_instances, minibatch_targets]))
  except tf.errors.OutOfRangeError:
    pass
  # Ask the enqueue threads to stop and wait for them to finish.
  coord.request_stop()
  coord.join(enqueue_threads)

[array([[1., 2.],
       [7., 0.]], dtype=float32), array([0, 0], dtype=int32)]
[array([[4., 5.]], dtype=float32), array([1], dtype=int32)]


In [0]:
tf.Session.reset('grpc://127.0.0.1:2222')