# tensorflow 队列

### 两种队列

> FIFOQueue

> RandomShuffleQueue

### 多线程协同功能

> tf.Coordinator 主要用于协同多个线程一起停止， 并提供should_stop,request_stop,join三个函数

> tf.QueueRunner 主要用于启动多个线程来操作同一个队列，启动的线程可以通过tf.Coordinator类统一管理

### 输入文件队列

> tf.train.match_filenames_once 获取符合一个正则表达式的所有文件

> tf.train.string_input_producer 会使用初始化时提供的文件列表创建一个输入队列。此函数生成的输入队列，可以同时被多个文件读取线程操作； 输入队列会将队列中的文件均匀的分给不同的线程。 shuffle为True时，文件加入队列之前会被打乱顺序； num_epochs 限制加载出事文件列表的最大轮数。

### 组合batch训练数据

> tf.train.batch， tf.train.shuffle_batch 将单个的样例组织成batch的形式输出。 队列的入队操作是生成单个样例的方法，而每次出队列得到的是一个batch的样例

> tf.train.shuffle_batch和tf.train.shuffle_batch_join都和完成多线程并行化的方式来进行数据预处理。

> tf.train.shuffle_batch，不同的线程会读取同一个文件，但如果一个文件中的样例比较相似，那么神经网络的训练效果可能会受到影响。所以在使用tf.train.shuffle_batch时，需要将同一个TFRecord文件中的样例随机打乱。

> tf.train.shuffle_batch_join，不同的线程会读取不同文件。如果读取的线程数比总文件数还大，那么多多个线程可能会读取同一个文件中相近部分的数据。而多个线程读取多个文件可能过的硬盘寻址，从而使得读取效率降低。

In [8]:
import tensorflow as tf
import numpy as np
import threading
import time

# 创建队列并操作里面的元素

In [9]:

# 创建一个先进先出队列，指定队列中最多可以保存的两个元素，并指定类型为整数
q = tf.FIFOQueue(2, 'int32')

# 使用enqueue_many初始化队列中的元素
init = q.enqueue_many(([0, 10], ))

# 使用dequeue将队列中的第一个元素出队列
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])

with tf.Session() as sess:
    # 运行初始化队列
    init.run()
    for _ in range(5):
        v, v1 = sess.run([x, q_inc])
        
        # 打印出队列取值
        print v
        
        print y


0
Tensor("add_3:0", dtype=int32)
10
Tensor("add_3:0", dtype=int32)
1
Tensor("add_3:0", dtype=int32)
11
Tensor("add_3:0", dtype=int32)
2
Tensor("add_3:0", dtype=int32)


# 多线程协调

In [13]:
# 线程中运行的程序，每隔1秒钟判断是否需要停止并打印自己的ID

def MyLoop(coord, worker_id):
    # 使用tf.Coordinator类提供的协同工具判断当前线程是否需要停止
    while not coord.should_stop():
        # 随机停止所有的线程
        if np.random.rand() < 0.1:
            print "Stoping from id: %d\n" % worker_id
            # 调用coord.request_stop通知其它线程停止
            coord.request_stop()
        else:
            print "working on id: %d\n" % worker_id
        time.sleep(1)

# 声明一个tf.train.Coordinator类来协同多个线程
coord = tf.train.Coordinator()
# 声明5个线程
threads = [threading.Thread(target=MyLoop, args=(coord, i, )) for i in xrange(5)]
# 启动所有线程
for t in threads:
    t.start()
# 等待所有线程推出
coord.join(threads)

Stoping from id: 0
working on id: 1




# 多个线程操作一个(写)队列

In [18]:
# 声明一个先进先出队列，队列中最多100个元素，类型为实数
queue = tf.FIFOQueue(100, "float")
# 定义队列的入队操作
enqueue_op = queue.enqueue([tf.random_normal([1])])

# tf.train.QueueRunner创建多个线程运行队列的入队操作
# [enqueue_op] * 5 表示需要启动5个线程，每个线程中运行的是enqueue_op操作
qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)

# 将定义过的QueueRunner加入到TensorFlow计算图上指定的集合
# tf.train.add_queue_runner没有指定的集合，则加入默认集合tf.GraphKeys.QUEUE_RUNNERS
tf.train.add_queue_runner(qr)

# 定义出队列操作
out_tensor = queue.dequeue()

with tf.Session() as sess:
    # 使用tf.train.Coordinator来协同启动线程
    coord = tf.train.Coordinator()
    # 使用tf.train.QueueRunner时，需要明确调用tf.train.start_queue_runners来启动所有的线程
    # tf.train.start_queue_runners会默认启动tf.GraphKeys.QUEUE_RUNNERS集合中所有的QueueRunner
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # 获取队列中的取值
    for _ in range(3):
        print sess.run(out_tensor)[0]
    
    # 使用tf.train.Coordinator来停止所有的线程
    coord.request_stop()
    coord.join(threads)


-1.70156
-0.751789
-0.992687


# 输入文件队列

In [23]:
# 模拟海量数据情况下将数据写入不同的文件

# 创建TFRecord文件
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 定西总共写入多少个文件
num_shards = 2
# 定义每个文件中有多少个数据
instances_per_shard = 2

for i in range(num_shards):
    # 将数据氛围多个文件时，命令方式0000n-of-0000m后缀区分。
    # m表示数据总共被存在来多少个文件中，n表示当前文件的编号
    filename = ('/Users/xxx/tmp/data.tfrecords-%.5d-of-%.5d' % (i, num_shards))
    print "filename: ", filename
    writer = tf.python_io.TFRecordWriter(filename)
    
    # 将数据封装成Example结构并写入TFRecord文件 
    for j in range(instances_per_shard):
        example = tf.train.Example(features=tf.train.Features(feature={
            'i': _int64_feature(i),
            'j': _int64_feature(j)
        }))
        writer.write(example.SerializeToString())
        
    writer.close()

filename:  /Users/xxx/tmp/data.tfrecords-00000-of-00002
filename:  /Users/xxx/tmp/data.tfrecords-00001-of-00002


In [26]:
# 文件队列操作

# 使用tf.train.match_filenames_once获取文件列表
files = tf.train.match_filenames_once("/Users/xxx/tmp/data.tfrecords-*")

# 使用tf.train.string_input_producer创建输入队列
filename_queue = tf.train.string_input_producer(files, shuffle=False)

# 读取并解析一个样本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(serialized_example,
                                  features={
                                   'i': tf.FixedLenFeature([], tf.int64),
                                    'j': tf.FixedLenFeature([], tf.int64)
                                  })

with tf.Session() as sess:
    # 虽然本段程序没有声明任何变量，但使用tf.train.match_filenames_once函数时需要初始化一些变量
    tf.initialize_all_variables().run()
    
    # 打印文件列表
    print sess.run(files)
    
    # 声明tf.train.Coordinator类协同不同的线程，并启动线程
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # 多次执行获取数据的操作
    for i in range(10):
        print sess.run([features['i'], features['j']])
        
    coord.request_stop()
    coord.join(threads)


['/Users/xxx/tmp/data.tfrecords-00000-of-00002'
 '/Users/xxx/tmp/data.tfrecords-00001-of-00002']
[0, 0]
[0, 1]
[1, 0]
[1, 1]
[0, 0]
[0, 1]
[1, 0]
[1, 1]
[0, 0]
[0, 1]


In [32]:

# 组合训练数据batching
# 这里假设Example结构中i表示一个样例的特征向量，j表示样例对应的标签
example, label = features['i'], features['j']
print "example: ", example, " ,label: ", label

# 一个batch中样例的个数
batch_size = 3

# 组合样例的队列中最多可以存储的样例个数。 一般这个队列的大小会和每一个batch的大小相关
# 如果队列太大，那么需要占用很多内存资源； 如果太小，那么出队列可以会因为没有数据而被阻碍，从而导致训练效率降低
capacity = 1000 + 3 * batch_size

# 使用tf.train.batch组合样例
# [example, label]参数给出类需要组合的元素
# batch_size 参数给出了每个batch中样例的个数
# capacity给出了队列的最大容量。 
# 当队列长度等于最大容量时，Tensorflow将暂停入队操作，而只是等待元素出队；当元素个数小于容量时，将自动重新启动入队操作
example_batch, label_batch = tf.train.batch([example, label], batch_size=batch_size, capacity=capacity)

# 使用tf.train.shuffle_batch组合样例
# min_after_dequeue参数限制类出队列时队列中元素的最少个数
# 当队列中元素个数太少时，随机打乱样例的顺序作用就不大
# num_threads参数，可以指定多个线程同时执行入队操作。当大于1时，多个线程会同时去读一个文件中的不同样例并进行预处理（重要）
# example_batch, label_batch = tf.train.shuffle_batch([example, label], 
#                                                     batch_size=batch_size, capacity=capacity, min_after_dequeue==30)

# 使用tf.train.shuffle_batch_join组合样例
# 此函数会从输入文件队列中获取不同的文件分配给不同的线程

with tf.Session() as sess:
    tf.initialize_all_variables().run()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # 获取并打印组合之后的样例
    for i in range(2):
        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])
        print 'cur_example_batch: ', cur_example_batch,' ,cur_label_batch: ', cur_label_batch
        
    coord.request_stop()
    coord.join(threads)




example:  Tensor("ParseSingleExample_2/Squeeze_i:0", shape=(), dtype=int64)  ,label:  Tensor("ParseSingleExample_2/Squeeze_j:0", shape=(), dtype=int64)
cur_example_batch:  [0 0 1]  ,cur_label_batch:  [0 1 0]
cur_example_batch:  [0 1 1]  ,cur_label_batch:  [1 1 1]
