**TensorFlow多线程输入数据处理框架**  

---
当数据量很大时，入队操作从硬盘中读取数据，放入内存中，主线程需要等待入队操作完成，才能进行训练。会话中可以运行多个线程，实现异步读取。

In [1]:
import tensorflow as tf
import os

# 调整警告等级
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

## 队列
----

### 先进先出队列

In [None]:
with tf.Graph().as_default():
    # 模拟一下同步先处理数据，然后才能取数据训练
    # 1.首先定义一个队列
    queue1 = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

    # 初始化，放入一些数据
    init = queue1.enqueue_many([[0.1, 0.2, 0.3], ])

    out_queue = queue1.dequeue()
    data = out_queue + 1
    en_queue = queue1.enqueue(data)

    with tf.Session() as sess:
        # 初始化队列
        sess.run(init)

        for i in range(3):
            sess.run(en_queue)

        # print(queue1.size().eval())

        for i in range(queue1.size().eval()):
            print(sess.run(out_queue))

### 随机队列
RandomShuffleQueue会 将队列中的元素打乱，每次出队列操作得到的是当前队列所有元素中随机选择的一个。

In [4]:
with tf.Graph().as_default():
    queue = tf.RandomShuffleQueue(capacity=5, min_after_dequeue=0, dtypes=tf.float32)

    # 放入一些数据
    init = queue.enqueue_many([[0.1, 0.2, 0.3, 0.4, 0.5], ])

    out_queue = queue.dequeue()

    with tf.Session() as sess:
        for i in range(5):
            sess.run(init)
            print('第{}次实验'.format(i+1))

            for m in range(queue.size().eval()):
                x = sess.run(out_queue)
                print('  第{}次弹出队列：{:.1f}'.format(m+1, x))

第1次实验
  第1次弹出队列：0.4
  第2次弹出队列：0.5
  第3次弹出队列：0.3
  第4次弹出队列：0.2
  第5次弹出队列：0.1
第2次实验
  第1次弹出队列：0.2
  第2次弹出队列：0.4
  第3次弹出队列：0.1
  第4次弹出队列：0.3
  第5次弹出队列：0.5
第3次实验
  第1次弹出队列：0.5
  第2次弹出队列：0.1
  第3次弹出队列：0.4
  第4次弹出队列：0.3
  第5次弹出队列：0.2
第4次实验
  第1次弹出队列：0.5
  第2次弹出队列：0.2
  第3次弹出队列：0.3
  第4次弹出队列：0.4
  第5次弹出队列：0.1
第5次实验
  第1次弹出队列：0.1
  第2次弹出队列：0.3
  第3次弹出队列：0.2
  第4次弹出队列：0.4
  第5次弹出队列：0.5


## 线程（队列管理器）  
---
Tensorflow的计算主要在使用CPU/GPU和内存，而数据读取涉及磁盘操作，速度远低于前者操作。因此通常会使用多个线程读取数据，然后使用一个线程消费数据。QueueRunner就是来管理这些读写队列的线程的。

- 创建一个QueueRunner  
    *tf.train.QueueRunner(queue=..., enqueue_ops=...,)*  
    - queque：A queue  
    - enqueue_ops：添加线程的队列操作列表，[]* 2，指定两个线程  
    

In [10]:
'''
增加计数的进程会不停的后台运行，执行入队的进程会先执行10次（因为队列长度只有10），
然后主线程开始消费数据，当一部分数据消费被后，入队的进程又会开始执行。
最终主线程消费完20个数据后停止，但其他线程继续运行，程序不会结束。
主线程结束，意味着session关闭，即资源释放，子线程依然在进行操作，因此程序会报错
'''
with tf.Graph().as_default():
    queue = tf.FIFOQueue(10, tf.float32)

    var = tf.Variable(0.0)

    # 实现自增
    data = tf.assign_add(var, tf.constant(1.0))

    en_q = queue.enqueue([data])

    # 定义队列管理器op，指定多少个子线程，以及子线程的任务
    qr = tf.train.QueueRunner(queue=queue, enqueue_ops=[en_q] * 2)

    # 初始化变量op
    init_op = tf.global_variables_initializer()

    with tf.Session() as sess:
        # 初始化变量
        sess.run(init_op)
        
        # 开启子线程
        threads = qr.create_threads(sess, start=True)
        
        # 主线程，消费数据
        for i in range(20):
            print(sess.run(queue.dequeue()))

4.0
9.0
13.0
14.0
15.0
17.0
18.0
18.0
19.0
21.0
22.0
22.0
23.0
25.0
25.0
26.0
28.0
29.0
29.0
31.0
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
	 [[{{node fifo_queue_enqueue}} = QueueEnqueueV2[Tcomponents=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](fifo_queue, AssignAdd)]]
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
	 [[{{node fifo_queue_enqueue}} = QueueEnqueueV2[Tcomponents=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](fifo_queue, AssignAdd)]]


Exception in thread QueueRunnerThread-fifo_queue-fifo_queue_enqueue:
Traceback (most recent call last):
  File "c:\program files\python36\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "c:\program files\python36\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "c:\program files\python36\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 257, in _run
    enqueue_callable()
  File "c:\program files\python36\lib\site-packages\tensorflow\python\client\session.py", line 1257, in _single_operation_run
    self._call_tf_sessionrun(None, {}, [], target_list, None)
  File "c:\program files\python36\lib\site-packages\tensorflow\python\client\session.py", line 1407, in _call_tf_sessionrun
    run_metadata)
tensorflow.python.framework.errors_impl.CancelledError: Enqueue operation was cancelled
	 [[{{node fifo_queue_enqueue}} = QueueEnqueueV2[Tcomponents=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0

## 线程协调器
---
Coordinator是个用来保存线程组运行状态的协调器对象，它和TensorFlow的Queue没有必然关系，是可以单独和Python线程使用的。  
实现一个简单的机制来协调一个线程的终止  
- *tf.train.Coordinator*  
    - request_stop()
    - should_stop()
    - join(thread=None, stop_grace_period_sec=120),等待线程终止  
      
    - return：线程协调员实例

In [11]:
with tf.Graph().as_default():
    queue = tf.FIFOQueue(10, tf.float32)

    var = tf.Variable(0.0)

    # 实现自增
    data = tf.assign_add(var, tf.constant(1.0))

    en_q = queue.enqueue([data])

    # 定义队列管理器op，指定多少个子线程，以及子线程的任务
    qr = tf.train.QueueRunner(queue=queue, enqueue_ops=[en_q] * 2)

    # 初始化变量op
    init_op = tf.global_variables_initializer()

    with tf.Session() as sess:
        # 初始化变量
        sess.run(init_op)
        
        # 开启线程协调器
        coord = tf.train.Coordinator()
        
        # 开启子线程
        threads = qr.create_threads(sess=sess, coord=coord, start=True)
        
        # 主线程，消费数据
        for i in range(20):
            print(sess.run(queue.dequeue()))
        
        # 回收
        coord.request_stop()
        coord.join(threads)

5.0
14.0
15.0
15.0
17.0
18.0
18.0
20.0
21.0
22.0
23.0
23.0
24.0
25.0
27.0
27.0
28.0
30.0
31.0
31.0
