# Chapter 12 – Distributed TensorFlow

11장에서는 훈련 속도를 개선하기 위한 여러 기술을 논의했다:
- 가중치 초기화
- Batch Normalization
- sophisticated optimizer

하지만, 단일 machine에 단일 CPU에서 큰 nn을 학습하는 데는 수일, 수주가 걸릴 수 있다

이번 장에서 우리는 다중 device(CPUs and GPUS)에 걸쳐 분산 컴퓨팅 환경에서 TensorFlow를 병렬로 구동하는 방법을 배울 것이다. [Fig 12-1]
![](https://i.imgur.com/KAcAtXa.png)

1. 먼저, [Fig 12-1]과 같이 단일 머신위 다중 devices에서의 분산 계산을 수행하고나서,
2. 다중 머신위 다중 devices에서 분산 계산을 수행할 것이다.

TensorFlow의 분산 컴퓨팅은 경쟁하는 다른 nn F/W과 비교해 조명을 받는 것 중 하나다:
- 계산 graph를 device와 server 별로 쪼게고, 복제하기 위한 완벽한 제어를 할 수 있다.
- 유연하게 tensor ops를 병렬처리하고 동기화하여, 모든 종류의 병렬화 접근방법을 선택할 수 있다.

nn의 훈련과 실행을 병렬화하는 가장 인기있는 접근방법 중 몇가지를 통해:
- 몇주가 걸릴 수 있는 훈련을 단 몇시간에 끝낼 수 있으며,
- 시간 절약 뿐아니라, 다양한 모델을 실험할 수 있고
- 좀더 신선한 data로 훈련시킨 최신의 모델을 유지할 수 있다.
- fine-tuning시 더 많은 hyperparamer를 탐색하고, 더 많은 앙상블을 사용할 수 있다.

그러나 뛰기 전에 먼저 걷자. 단일 머신에 다중 CPU를 달고 간단한 graph로 시작해보자.

## 12.0 Setup
먼저 python2, 3에서 notebook이 잘 작동하지는 확인하기 위해 몇 공통 모듈을 import해보고 MatplotLib로 그린 그림을 notebook에 그리고, 저장할 수 있도록하자:

In [1]:
# To support both python 2 and python 3
from __future__ import division, print_function, unicode_literals

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)

# To plot pretty figures
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "distributed"

def save_fig(fig_id, tight_layout=True):
    path = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID, fig_id + ".png")
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format='png', dpi=300)

## 12.1 단일 머신 위 다중 devices
단일 머신에 단지 GPU 하나 추가해서 놀라운 성능향상을 맛볼 수 있다. 사실상 많은 경우 이것만으로도 충분하다. 예를 들면, 다중 머신 위 16GPU보다 단일 머신 8GPU가 더 빠를 수 있다.(network 전송이 발목을 잡는다.)

이번 절에서는 이러한 환경에서 TensorFlow의 환경을 설정하고, 가용 device들에 ops를 분산 배치하여 이를 병렬로 처리하는 방법을 배울 것이다.

### 12.1.1 설치
1) 다중 GPU 카드에서 TensorFlow를 구동하기 위해서는: 
- GPU들의 NVidia Comput Capability가 3 이상이어야 하며,
- 이러한 카드들은 Nvidia의 Titan, Titan X, K20, K40카드가 있으며,
- [여기](https://developer.nvidia.com/cuda-gpus)에서 다른 카드들의 Capability를 확인할 수 있다.

>GPU가 없다면 클라우드 서비스 업체로부터 호스팅 서비스를 받을 수 있다:
- 아마존 AWS를 이용하는 방법은 http://goo.gl/kbge5b 참고
- 구글에서는 [Cloud Machine Learning](https://cloud.google.com/ml)을 이용할 수 있으며,
- GPU 구매는 정기적으로 업데이트되는 [Time Dettets의 blog](https://goo.gl/pCtSAn)을 참고하라.

2) Nvidia's Deep Learning SDK 관련 라이브러리 설치:
TensorFlow는 GPU 카드를 제어하고 고속 연산을 위해 CUDA와 cuDNN을 필요로 한다.
- CUDA(Compute Unifed Device Architecture) : tensorflow 1.3에서는 v8.0
- cuDNN(CUDA Deep Neural Network) library: v5.1이 필요

![](https://i.imgur.com/lGPiLLI.png)

nvidia-smi 명령어로 CUDA가 적절히 설치되었는지 확인할 수 있다.

In [2]:
!nvidia-smi

Tue Sep  5 14:59:22 2017       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 375.26                 Driver Version: 375.26                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  TITAN X (Pascal)    Off  | 0000:05:00.0      On |                  N/A |
| 23%   39C    P8    11W / 250W |    149MiB / 12181MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  TITAN X (Pascal)    Off  | 0000:06:00.0     Off |                  N/A |
| 23%   40C    P8    10W / 250W |      1MiB / 12189MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  TITAN X (Pascal)    Off  | 0000:09:00.0     Off |                  N/

- 가용 GPU list와 각 카드에서 구동되는 process들을 보여준다.

3) CUDA와 cuDNN의 경로를 환경변수로 설정한다.
- MacOSX에서는 다음 코드를 .bash_profile에 추가한다.
```bash
export CUDA_HOME=/usr/local/cuda
export PATH=$PATH:$CUDA_HOME/bin
export CUDA_LIBS=$CUDA_HOME/lib64:$CUDA_HOME/extras/CUPTI/lib64
export DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:$CUDA_LIBS
```
- Linux에서는 DYLD_LIBRARY_PATH을 LD_LIBRARY_PATH로 바꾼다.
```bash
export CUDA_HOME=/usr/local/cuda
export PATH=$PATH:$CUDA_HOME/bin
export CUDA_LIBS=$CUDA_HOME/lib64:$CUDA_HOME/extras/CUPTI/lib64
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CUDA_LIBS
```

4) 마지막으로 TensorFlow GPU 버전을 설치한다.
- TensorFlow 홈페이지를 참고하자.

이제 모든 설치가 끝났으니 python shell에서 TensorFlow를 인식하고 CUDA와 cuDNN을 적절히 이용하는 지 확인해보자.

#### local
python shell에서 실행할 때 device를 모두 확인해볼 수 있다.
```python
import tensorflow as tf
sess = tf.Session()
```
출력 log를 보면:
```bash
...
2017-09-05 15:42:47.962202: I tensorflow/core/common_runtime/gpu/gpu_device.cc:977] Creating TensorFlow device (/gpu:0) -> (device: 0, name: TITAN X (Pascal), pci bus id: 0000:0a:00.0)
2017-09-05 15:42:47.962213: I tensorflow/core/common_runtime/gpu/gpu_device.cc:977] Creating TensorFlow device (/gpu:1) -> (device: 1, name: TITAN X (Pascal), pci bus id: 0000:09:00.0)
2017-09-05 15:42:47.962221: I tensorflow/core/common_runtime/gpu/gpu_device.cc:977] Creating TensorFlow device (/gpu:2) -> (device: 2, name: TITAN X (Pascal), pci bus id: 0000:06:00.0)
2017-09-05 15:42:47.962228: I tensorflow/core/common_runtime/gpu/gpu_device.cc:977] Creating TensorFlow device (/gpu:3) -> (device: 3, name: TITAN X (Pascal), pci bus id: 0000:05:00.0)
```
- TensorFlow가 CUDA와 cdDNN 라이브러리를 검출하고, 
- GPU 카드를 검출하기 위해 CUDA 라이브러리를 이용한다.

In [2]:
import tensorflow as tf

In [3]:
c = tf.constant("Hello distributed TensorFlow!")
server = tf.train.Server.create_local_server()

In [4]:
with tf.Session(server.target) as sess:
    print(sess.run(c))

b'Hello distributed TensorFlow!'


### 12.1.2 GPU 램 관리하기
디폴트로 TensorFlow는 처음 graph를 구동할 때, 자동으로 가용 GPU의 모든 RAM을 차지하기 때문에, 첫 프로그램이 종료하기 전까지 두번째 프로그램을 구동할 수 없다.

이때, 두번째 프로그램을 구동하면 다음과 같은 error가 발생한다:
```bash
E [...]/cuda_driver.cc:965] failed to allocate 3.66G (3928915968 bytes) from
device: CUDA_ERROR_OUT_OF_MEMORY
```

이에 대한 해결방안은:

1) 사용가능한 GPU를 지시:
- 단순히 CUDA_VISIBLE_DEVICES 환경 변수를 설정한다.
- 예를 들면 program을 아래와 같이 시작한다.
```bash
$ CUDA_VISIBLE_DEVICES=0,1 python3 program_1.py
# and in another terminal:
$ CUDA_VISIBLE_DEVICES=3,2 python3 program_2.py
```
    - 프로그램 1은 GPU 카드 0과 1만 인식하고(프로그램 내부에서는 0, 1로 번호매김)
    - 프로그램 2는 GPU 카드 2와 3만 인식한다.(프로그램 내부에서는 0, 1로 번호매김)

![](https://i.imgur.com/K4jb76J.png)

2) 메모리의 부분 사용을 지시:
- ConfigProto 객체를 생성하여, 각 GPU 메모리의 40%만 사용하도록할 수 있다.
```bash
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.4
session = tf.Session(config=config)
```

![](https://i.imgur.com/VkfGB2a.png)

In [7]:
!CUDA_VISIBLE_DEVICES=0,1 python ../program1.py
!CUDA_VISIBLE_DEVICES=2,3 python ../program2.py

2017-09-05 16:34:21.623756: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 16:34:21.623786: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 16:34:21.623795: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 16:34:21.623801: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 16:34:21.623808: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't

In [9]:
!cat ../program3.py

import tensorflow as tf
import time

c = tf.constant("Hello distributed TensorFlow 3!")
server = tf.train.Server.create_local_server()

time.sleep(5)

config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.4

with tf.Session(server.target, config=config) as sess:
	print(sess.run(c))


In [11]:
!python ../program3.py

2017-09-05 16:46:32.745415: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 16:46:32.745443: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 16:46:32.745451: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 16:46:32.745457: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 16:46:32.745463: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't

>만약 RAM 40%를 사용하는 3개의 프로그램이 가동되면:
- 3 * .4 > 1 이므로, 3번째 프로그램은 error가 발생한다.

>프로그램 구동 중, nvidia-smi 명령어를 실행하면, 각 카드의 40%의 메모리를 사용하는 것을 볼 수 있다.

3) 다른 옵션은 TensorFlow에게 필요할 때만 RAM을 제어하도록 할 수 있다.
- config.gpu_options.allow_growth=True로 설정.
- 그러나 실제로 Tensorflow는 메모리 파편화를 피하기 위해 한번 차지한 RAM을 release하지 않는다.
- 따라서 별로 권장하는 옵션은 아니다.

### 12.1.3 devices에 ops을 할당
TensorFlow 백서에는 친숙한 dynamic placer 알고리즘을 제공한다:
- 다음 사항들을 고려하여:
    - graph의 이전 수행에서 측정된 연산시간 등, 
    - input size 추정치, 
    - 각 ops의 출력 tensor,
    - 각 devices의 가용 메모리 량,
    - devices 들간의 data 입출력에 따른 전송 지연
    - 사용자로 부터의 제약사항 등
- 모든 가용 devices에 자동적으로 ops를 분산 배치한다.
- 불행히도, 이런 현한적인 알고리즘은 open source로 제공된 tensorflow에서는 공개되지 않았다.
- 추론하기를 small set에 대한 배치는 dynamic placer보다 사용자가 지정하는 것이 더 효율적이라 생각된다.

google의 TensorFlow 팀은 이 알고리즘을 개선하고 있으며, 언제가는 오픈할 것으로 예상한다.

#### Simple placement
graph를 구동할 때마다, TensorFlow는 아직 device들에 배치되지 않은 node들을 평가할 필요가 있다면, 단순 배치를 사용한다.

단순 배치자는 다음 규칙을 따른다:
- node가 graph의 이전 반복에서 어떤 device에 배치되었다면, 해당 device에 남겨둔다.
- 그렇지 않고, 사용자가 특정 device에 그 node를 배치해다면, 그렇게 한다.
- 그렇지 않다면, default로 GPU #0 또는 CPU(GPU가 없다면)에 배치한다.

따라서 특정 ops를 적절한 device에 배치하는 것은 전적으로 사용자에 달려있다.
- 아무것도 하지 않으면, 전체 graph는 default device에만 배치된다.

특정 device에 node들을 지정하려면 device() 함수를 이용하여 device block을 생성해야 한다:
- 다음 코드는 변수 a와 상수 b를 CPU에 배치하고,
- 곱 node C는 아무 device에도 배치하지 않았다.
- 따라서 c는 default device에 배칠될 것이다.
```python
with tf.device("/cpu:0"):
    a = tf.Variable(3.0)
    b = tf.constant(4.0)
c = a * b
```

>"/cpu:0" device는 다중 CPU 시스템에 있는 모든 CPU들을 aggregate한다.
- 아직까지는 특정 CPU에 배치하거나, CPUs의 subet에 배치할 방법은 없다.

In [12]:
with tf.device("/cpu:0"):
    a,b = tf.Variable(3.0), tf.Variable(4.0)
c = a*b
c

<tf.Tensor 'mul:0' shape=() dtype=float32>

#### Logging placements
사용자가 지정한 배치에 따라 단순 배치자가 동작하는 지 확인해보자:
- 이를 위해 log_device_placement 옵션을 True로 설정한다.
- 이는 배치자가 node를 배치할 때 log를 남기도록 한다.

다음과 같은 logPlace.py을 생성하고 이를 실행해보자.

In [13]:
!cat ../logPlace.py

import tensorflow as tf

with tf.device("/cpu:0"):
	a = tf.Variable(3.0)
	b = tf.constant(4.0)

c = a * b

config = tf.ConfigProto()
config.log_device_placement = True
init = tf.global_variables_initializer()

sess = tf.Session(config=config)

init.run(session=sess)

res = sess.run(c)
print (res)


In [15]:
!python ../logPlace.py

2017-09-05 18:31:14.448522: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 18:31:14.448551: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 18:31:14.448559: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 18:31:14.448567: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-09-05 18:31:14.448573: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't

- 교재와 log가 좀 다르지만, 변수에 대한 device 매핑정보에서 'cpu'로 할당된 것을 확인할 수 있다.

우리가 session을 생성할 때, TensorFlow는 GPU 카드를 발견했다는 정보를 log로 알려준다.
- graph를 처음 구동할 때(a를 초기화할 때) 단순 배치자가 구동되고
    - 배치자는 각 node를 할당한 device에 배치한다.
    - 그러나, mul로 name된 node는 default device인 '/gpu:0'에 배치되었다.
- graph를 다음 2번 째로 구동할 때(c를 계산할 때)
    - c 계산을 위한 모든 node들이 이미 배치었기 때문에
    - 단순 배치자는 더이상 사용되지 않는다.

>"/job:localhost/replica:0/task:0"와 같은 prefix는 제거할 수도 있다.

#### 동적할당 함수
device block을 생성할 때, device 명 대신 함수를 지정할 수 있다.
- TF는 이 device block 내에 배치할 각 ops에 대해 이 함수를 호출하고,
- 이 함수는 ops에 부착할 device 명을 리턴한다.

예로 다음 코드는 모든 변수 노드를 "/cpu:0"에 배치하고, 그외 다른 노드는 "/gpu:0"에 배치한다:

In [16]:
def variables_on_cpu(op):
    if op.type == "Variable":
        return "/cpu:0"
    else:
        return "/gpu:0"
with tf.device(variables_on_cpu):
    a = tf.Variable(3.0)
    b = tf.constant(4.0)
    c = a * b

- 위 예로부터 좀 더 복잡한 방식(각 변수를 round-robin 방식)으로 배치할 수 있다.

#### Operations and kernels


# Cluster: Multi-Machine

In [5]:
cluster_spec = tf.train.ClusterSpec({
    "ps": [
        "127.0.0.1:2221",  # /job:ps/task:0
        "127.0.0.1:2222",  # /job:ps/task:1
    ],
    "worker": [
        "127.0.0.1:2223",  # /job:worker/task:0
        "127.0.0.1:2224",  # /job:worker/task:1
        "127.0.0.1:2225",  # /job:worker/task:2
    ]})

In [6]:
task_ps0 = tf.train.Server(cluster_spec, job_name="ps", task_index=0)
task_ps1 = tf.train.Server(cluster_spec, job_name="ps", task_index=1)
task_worker0 = tf.train.Server(cluster_spec, job_name="worker", task_index=0)
task_worker1 = tf.train.Server(cluster_spec, job_name="worker", task_index=1)
task_worker2 = tf.train.Server(cluster_spec, job_name="worker", task_index=2)

# Pinning operations across devices and servers

In [7]:
reset_graph()

with tf.device("/job:ps"):
    a = tf.Variable(1.0, name="a")

with tf.device("/job:worker"):
    b = a + 2

with tf.device("/job:worker/task:1"):
    c = a + b

In [8]:
with tf.Session("grpc://127.0.0.1:2221") as sess:
    sess.run(a.initializer)
    print(c.eval())

4.0


In [9]:
reset_graph()

with tf.device(tf.train.replica_device_setter(
        ps_tasks=2,
        ps_device="/job:ps",
        worker_device="/job:worker")):
    v1 = tf.Variable(1.0, name="v1")  # pinned to /job:ps/task:0 (defaults to /cpu:0)
    v2 = tf.Variable(2.0, name="v2")  # pinned to /job:ps/task:1 (defaults to /cpu:0)
    v3 = tf.Variable(3.0, name="v3")  # pinned to /job:ps/task:0 (defaults to /cpu:0)
    s = v1 + v2            # pinned to /job:worker (defaults to task:0/cpu:0)
    with tf.device("/task:1"):
        p1 = 2 * s         # pinned to /job:worker/task:1 (defaults to /cpu:0)
        with tf.device("/cpu:0"):
            p2 = 3 * s     # pinned to /job:worker/task:1/cpu:0

config = tf.ConfigProto()
config.log_device_placement = True

with tf.Session("grpc://127.0.0.1:2221", config=config) as sess:
    v1.initializer.run()

# Readers

In [10]:
reset_graph()

test_csv = open("my_test.csv", "w")
test_csv.write("x1, x2 , target\n")
test_csv.write("1.,    , 0\n")
test_csv.write("4., 5. , 1\n")
test_csv.write("7., 8. , 0\n")
test_csv.close()

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)

x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
    name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    try:
        while True:
            sess.run(enqueue_instance)
    except tf.errors.OutOfRangeError as ex:
        print("No more files to read")
    sess.run(close_instance_queue)
    try:
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError as ex:
        print("No more training instances")

No more files to read
[array([[  4.00000000e+00,   5.00000000e+00],
       [  1.00000000e+00,   8.62997533e-19]], dtype=float32), array([1, 0], dtype=int32)]
[array([[ 7.,  8.]], dtype=float32), array([0], dtype=int32)]
No more training instances


In [11]:
#coord = tf.train.Coordinator()
#threads = tf.train.start_queue_runners(coord=coord)
#filename_queue = tf.train.string_input_producer(["test.csv"])
#coord.request_stop()
#coord.join(threads)

# Queue runners and coordinators

In [12]:
reset_graph()

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)

x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
    name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

n_threads = 5
queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)
coord = tf.train.Coordinator()

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    try:
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError as ex:
        print("No more training instances")

[array([[ 7.,  8.],
       [ 4.,  5.]], dtype=float32), array([0, 1], dtype=int32)]
[array([[  1.00000000e+00,   8.62997533e-19]], dtype=float32), array([0], dtype=int32)]
No more training instances


In [13]:
reset_graph()

def read_and_push_instance(filename_queue, instance_queue):
    reader = tf.TextLineReader(skip_header_lines=1)
    key, value = reader.read(filename_queue)
    x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
    features = tf.stack([x1, x2])
    enqueue_instance = instance_queue.enqueue([features, target])
    return enqueue_instance

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
    name="instance_q", shared_name="shared_instance_q")

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]
queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    coord = tf.train.Coordinator()
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    try:
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError as ex:
        print("No more training instances")



[array([[  4.00000000e+00,   5.00000000e+00],
       [  1.00000000e+00,   8.62997533e-19]], dtype=float32), array([1, 0], dtype=int32)]
[array([[ 7.,  8.]], dtype=float32), array([0], dtype=int32)]
No more training instances


# Setting a timeout

In [14]:
reset_graph()

q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])
v = tf.placeholder(tf.float32)
enqueue = q.enqueue([v])
dequeue = q.dequeue()
output = dequeue + 1

config = tf.ConfigProto()
config.operation_timeout_in_ms = 1000

with tf.Session(config=config) as sess:
    sess.run(enqueue, feed_dict={v: 1.0})
    sess.run(enqueue, feed_dict={v: 2.0})
    sess.run(enqueue, feed_dict={v: 3.0})
    print(sess.run(output))
    print(sess.run(output, feed_dict={dequeue: 5}))
    print(sess.run(output))
    print(sess.run(output))
    try:
        print(sess.run(output))
    except tf.errors.DeadlineExceededError as ex:
        print("Timed out while dequeuing")


2.0
6.0
3.0
4.0
Timed out while dequeuing


# Exercise solutions

**Coming soon**