<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#队列在多线程中的使用" data-toc-modified-id="队列在多线程中的使用-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>队列在多线程中的使用</a></span><ul class="toc-item"><li><span><a href="#tf.Coordinator" data-toc-modified-id="tf.Coordinator-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>tf.Coordinator</a></span></li><li><span><a href="#tf.QueueRunner" data-toc-modified-id="tf.QueueRunner-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>tf.QueueRunner</a></span></li></ul></li></ul></div>

# 队列在多线程中的使用

多线程可以同时向一个队列中写入元素，或者同时读取一个队列的元素。

TensorFlow 提供了 `tf.Coordinator` 和 `tf.QueueRunner` 两个类来完成多线程协同的功能。

## tf.Coordinator

`tf.Coordinator` 主要用于协同多个线程一起停止，并提供了 `should_stop`、`request_stop` 和 `join` 三个函数。`tf.Coordinator` 使用流程如下:

1. 申明一个`tf.Coordinator`类, 并将这个类传入每一个创建的线程中;
2. 启动线程时，需要一直查询`tf.Coordinator`类中提供的`tf.should_stop`函数, 当这个函数返回值为 `True` 时, 则当前线程也要退出;
3. 每一个启动的线程都可以通过调用`request_stop`函数来通知其他线程退出;当某一个线程调用`request_stop`函数之后, `should_stop`函数的返回值将被设置为`True`,这样其他的线程就可以同时终止了。

In [1]:
import tensorflow as tf
import numpy as np
import threading
import time

建立回调函数, 这个函数每隔1s判断是否终止线程;如果不终止线程, 在继续随机获取一个值, 判断其是否大于0.1; 如果大于0.1, 则打印当前线程id,否则, 调用 `request_stop()`

In [9]:
def MyLoop(coord, worker_id):
    # 判断线程是否需要终止
    # 如果线程未终止, 则继续执行
    while not coord.should_stop():
        # 随机停止所有线程
        if np.random.rand() < 0.1:
            # print("Stoping from id: %d\n" % worker_id)
            # 调用coord.request_stop()函数来通知其他线程停止
            coord.request_stop()
        else:
            # 打印当前线程id
            print("Working on id: %d\n" % worker_id)
        #暂停1s
        time.sleep(1)
    print("Stping from id: %d\n" % worker_id)

In [10]:
# 申明一个 tf.train.Coordinator类来协同多个线程
coord = tf.train.Coordinator()

# 申明创建5个线程
threads = [threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)]

# 启动所有线程
for t in threads:
    t.start()
    
# 等待所有线程退出
coord.join(threads)

Working on id: 0

Working on id: 1

Working on id: 2

Working on id: 3

Working on id: 4

Stping from id: 1

Stping from id: 2
Stping from id: 3


Stping from id: 4

Stping from id: 0



## tf.QueueRunner

`tf.QueueRunner` 主要用于以指定的操作(入队或出队操作)启动多个线程来操作同一个队列，启动的这些线程可以通过 `tf.Coordinator` 类来统一管理。

使用`tf.QueueRunner`的流程主要如下:

1. 定义`QueueRunner`:以指定的操作创建多个线程;
2. 将定义过的`QueueRunner`加入指定的集合, 默认的集合为`tf.GraphKeys.QUEUE_RUNNERS`;
3. 在sess中, 明确调用`tf.train.start_queue_runners`来启动所有的线程, 否则会因为没有线程运行入栈操作, 当调用出队操作时, 程序会一直等待入队操作;
4. 其他的: 按需要进行操作;

In [11]:
import tensorflow as tf

In [None]:
# 声明一个FIFO的队列，队列最多100个元素, 类型为实数
queue = tf.FIFOQueue(100, tf.float32)

# 定义队列的入队操作
enqueue_op = queue.enqueue([tf.random_normal([1])])

# 定义出队操作
out_tensor = queue.dequeue()

# 使用tf.train.QueueRunner来创建多个线程运行队列的入队操作
# tf.train.QueueRunner的第一个参数给出了被操作的队列, [enqueue_op] * 5
# 表示了需要启动5个线程, 每个线程中运行的是enqueue_op操作
qr = tf.train.QueueRunner(queue=queue, [enqueue_op] * 5)

# 把 qr 加入默认的 tf.GraphKeys.QUEUE_RUNNERS 集合
tf.train.add_queue_runner(qr)

with tf.Session() as sess:
    # 使用tf.train.Coordinator来协同启动线程
    coord = tf.train.Coordinator()
    
    # 启动多个线程
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # 获取队列中的取值
    for _ in range(3):
        print(sess.run(out_tensor)[0])
        
    # 使用tf.train.Coordinator来停止所有的线程
    coord.request_stop()
    coord.join(threads)
    