# Threading and Queues

队列(queue)是TensorFlow中实现异步计算的重要机制。

一个队列也是计算图中的一个节点。和variable一样，队列是有状态的(stateful)节点：其他的节点可以修改它的值。具体地，可以对队列节点进行入队和出队操作。

先看一个简单的例子。创建一个FIFOQueue，初始元素值为0。然后我们构建一个计算图，它从队列中取出一个元素，加1，再入队。

tensorflow官网这个gif把整个过程描述的很清楚：


 ![](http://odw1x7kgr.bkt.clouddn.com/IncremeterFifoQueue.gif)

看一下FIFOQueue的构造方法:

FIFOQueue supports multiple concurrent producers and consumers.

FIFOQueue保存了一个列表的元素，capacity个元素。每个元素都是固定长度的tuple of tensors，它们的类型是dtypes参数给出，shapes由shapes参数给出。如果不给出shapes参数，不同的元素可以有不同的shape哦，但是这个时候就不能用dequeue_many方法了。

__init__(capacity, dtypes, shapes=None, names=None, shared_name=None,
        name='fifo_queue')

* capacity: 整数。队列的最大容量。
* dtypes: 一个DType列表。注意dtypes的长度必须等于每个队列元素的tensor个数。

Enqueue、EnqueueMany、Dequeue是特殊的节点，他们接收一个指向队列的**指针**，可以更改队列元素。

注意：队列方法（比如q.enqueu()）必须和队列在同一个device。

## Queue usage overview

队列，tf.FIFOQueue、tf.RandomShuffleQueue在=TF中很重要，主要用于异步计算tensor。


比如，一个典型的输入框架需要用到RandomShuffleQueue：
* 多个线程准备训练集样本，负责把样本push到队列
* 一个训练线程执行训练op，它从队列中每次取mini-batch个样本

`Session`是多线程的，所以多个线程可以使用同一个session，并行执行。然而，Python下编写多线程程序不容易：所有的线程必须能够全部stop，还要做到捕获异常，最后需要关掉队列。

为此，tensorflow提供了两个类来协助多线程操作：tf.train.Coordinator和tf.train.QueueRunner。**这两个类要一起使用**。

Coordinator类帮助多个线程stop，如果stop时发生异常，也负责报告异常。

QueueRunner类用于创建多个线程来进行同一个队列的入队操作。

### Coordinator

Coordinator帮助多个线程stop。

它的主要几个方法：
* tf.train.Coordinator.should_stop: 如果线程应该stop返回True
* tf.train.Coordinator.request_stop: 请求线程应该stop
* tf.train.Coordinator.join: 等待直到指定的线程已经stopped

你首先创建一个Coordinator对象，然后创建多个线程使用这个coordinator。

任意一个线程都可以决定计算应该stop。它只需要调用`request_stop()`放，其他线程会stop只要should_stop()返回True。

In [None]:
# 线程主体： 循环，直到coordinator indicat 某个线程发起了stop请求
def MyLoop(coord):
    while not coord.should_stop():
        do something
        if some condition:
            coord.request_stop()
            
# 主线程：创建一个Corodinator
coord = tf.train.Coordinator()

# 创建10个线程，执行MyLoop()
threads = [threading.Thread(target=MyLoop, args=(coord, )) for i in range(10)]

# 启动线程，等待他们stop
for t in threads:
    t.start()

coord.join(threads)

不同的线程可以做不同的事情，并不是必须执行相同的代码，不论线程的任务相不相同，都可以用Coordinator管理。

如果stop时发生异常，Coordinator也会捕捉并报告。

### QueueRunner

QueueRunner类用于创建多个线程重复地执行入队列操作。这些线程可以用coordinator来stop。另外，queue runner还执行了一个colser线程，如果coordinator报告有异常，closer线程会自动地关闭队列。


数据读取的框架是少不了queue runner的。

首先创建一个是用队列的图用于输入样本。然后往图中添加处理样本的节点(op)和出队列的节点(op)。添加训练节点。

In [None]:
example = ...ops to create one example...
# Create a queue, and an op that enquees examples one at a time in the queue
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)
# Create a training graph that starts by dequeuing a batch of examples
inputs = queue.dequeue_many(batch_size)
train_op = ...use 'inputs' to build the training part of the graph...

In [None]:
# Create a queue runner that will run 4 threads in parallel to enqueue
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4) # 如此简单

# 启动计算图
sess = tf.Session()
# 创建一个Coordinator, 启动queue runner threads
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# 执行循环，是用coordinator来控制线程的停止
for step in xrange(1000000):
    if coord.should_stop(): 
        break
    sess.run(train_op)
# 结束后，要求线程stop
coord.request_stop()
# 等待
coord.join(enqueue_threads)

 关于should_stop()
 
 Check if stop was requested. Return True if a stop was requested.
 

### 处理异常

queue runner启动的线程不仅仅要做入队操作，它们还肩负捕获、处理队列异常的任务，包括tf.errors.OutOfRangeError异常，这个异常说明队列已经关闭了。

A training program that uses a coordinator must similarly catch and report exceptions in its main loop.

In [None]:
try:
    for step in xrange(1000000):
        if coord.should_stop():
            break
        sess.run(train_op)
except Exception, e:
    # Report exceptions to the coordinator.
    coord.request_stop(e)
finally:
    # Terminate as usual. It is safe to call `coord.request_stop()` twice.
    coord.request_stop()
    coord.join(threads)