In [10]:
import tensorflow as tf

## tensorflow数据读取机制中内存队列，文件名队列详解见[十图详解tensorflow数据读取机制](https://zhuanlan.zhihu.com/p/27238630)

## 多线程与queue（内存队列）
Tensorflow中提供了FIFOQueue和RandomShuffleQueue两种内存队列。

In [11]:
# 创建一个先进先出队列，指定队列中最多可以保存两个元素，并指定类型
q = tf.FIFOQueue(2, dtypes='int32')

# 创建一个随机队列，它会将队列中元素打乱，每次出队操作得到的是从当前队列
# 所有元素中随机挑选的一个，必须传入min_after_dequeue（队列中最少元素个数）参数
# q = tf.RandomShuffleQueue(2, min_after_dequeue=0, dtypes='int32')

# 使用enqueue_many初始化函数，使用队列之前必须明确调用初始化
init = q.enqueue_many(([0, 10], ))

# 使用dequeue函数出队
x = q.dequeue()

y = x + 1

# 重新入队
q_inc = q.enqueue([y])

with tf.Session() as sess:
    # 运行初始化队列操作
    init.run()
    for _ in range(5):
        v, _  = sess.run([x, q_inc])
        print(v)

0
10
1
11
2


Tensorflow提供了**tf.Coordinator**和**tf.QueueRunner**两个类来完成多线程协同的功能。**tf.Coordinator**主要用于协同多个线程一起停止，并提供了**should_stop**，**request_stop**和**join**三个函数。

启动线程之前，需要声明一个`tf.Coordinator`类，并将这个类传入每一个创建的建成中。启动的线程需要一直查询`tf.Coordinator`类中提供的`should_stop`函数，当这个函数的返回值为True时，当前线程退出。每一个启动的线程都可以通过调用`request_stop`函数通知其他线程退出。

当一个线程调用`request_stop`函数后，`should_stop`函数的返回值被设置为True。这样其他线程就可以同时终止。

In [12]:
import tensorflow as tf
import numpy as np
import threading
import time

# 线程中运行的程序，该程序每隔1s判断是否需要停止并打印自己的ID
def my_loop(coord, worker_id):
    # 使用tf.Coordinator类提供的协同工具判断当前线程是否需要停止
    while not coord.should_stop():
        # 随机停止所有线程
        if(np.random.rand() < 0.1):
            print('Stoping from id: %d\n' % worker_id)
            # 调用coord.request_stop()通知其他线程停止
            coord.request_stop()
        else:
            # 打印当前线程id
            print('Working on id: %d\n' % worker_id)
        time.sleep(1)
        
# 声明一个tf.train.Coordinator类协同多个线程
coord = tf.train.Coordinator()
# 创建5个线程
threads = [threading.Thread(target=my_loop, args=(coord, i)) for i in range(5)]
# 启动所有线程
for t in threads:
    t.start()

# 等待所有线程退出
coord.join(threads)

Working on id: 1
Working on id: 0
Working on id: 2


Working on id: 3
Working on id: 4



Working on id: 0

Working on id: 1

Working on id: 2

Working on id: 3

Working on id: 4

Working on id: 0

Working on id: 1

Working on id: 2

Working on id: 3

Working on id: 4

Working on id: 0

Working on id: 1

Working on id: 2

Working on id: 3

Working on id: 4

Working on id: 0

Working on id: 1

Working on id: 2

Working on id: 3

Working on id: 4

Working on id: 0

Working on id: 1

Working on id: 2

Working on id: 3

Working on id: 4

Working on id: 0

Working on id: 1

Stoping from id: 2



**tf.QueueRunner**用于启动多个线程操作同一个队列，启动的这些线程可以通过`tf.Coordinator`统一管理。

以下程序利用多线程将随机数入队，单线程出队打印。

In [1]:
import tensorflow as tf

queue = tf.FIFOQueue(100, 'float')
# 定义队列的入队操作，tf.random_normal()从正态分布输出随机值
enqueue_op = queue.enqueue([tf.random_normal([1])])

# 创建多个线程运行队列的入队操作，queue是被操作的队列，[enqueue_op] * 5表示启动5个
# 线程，每个线程中运行的是enqueue_op操作
qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)

# 将定义过的QueueRunner加入Tensorflow计算图上指定的集合
# tf.train.add_queue_runner函数没有指定集合
# 则加入默认集合tf.GraphKeys.QUEUE_RUNNERS
tf.train.add_queue_runner(qr)
# 定义出队操作
out_tensor = queue.dequeue()

with tf.Session() as sess:
    # 使用tf.train.Coordinator协同启动的线程
    coord = tf.train.Coordinator()
    # 使用tf.train.QueueRunner时，需要明确调用tf.train.start_queue_runners
    # 启动所有线程。tf.train.start_queue_runners函数默认启动tf.GraphKeys.QUEUE_RUNNERS
    # 集合中所有的QueueRunner。
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 获取队列中的取值
    for _ in range(3):
        print(sess.run(out_tensor)[0])
    
    # 读取完值后，使用tf.train.Coordinator停止所有线程
    coord.request_stop()
    coord.join(threads)

1.67256
-0.947661
2.3614


## 输入文件名队列
**tf.train.string_input_producer**函数会使用初始化时提供的文件列表创建一个**文件名队列**。

通过设置**shuffle**参数，可以打乱文件名队列中的顺序。当`shuffle`参数为True时，文件在加入文件名队列之前就会被打乱顺序。`tf.train.string_input_producer`生成的文件名队列可以同时被多个文件读取线程操作，而且会将队列中的文件均匀地分给不同线程。

通过设置**num_epochs**参数限制文件名队列加载文件列表的最大轮数。

In [15]:
import os
import tensorflow as tf

data_path = os.path.join('.', 'data')
if(not os.path.exists(data_path)):
    os.mkdir(data_path)

# 模拟海量数据情况下将数据写入不同的文件。num_shards定义了总共写入多少个TFRecords文件。
# instatnces_per_shard定义了每个TFRcords文件中有多少个数据。
num_shards = 2
instances_per_shard = 2

for i in range(num_shards):
    # 将数据分为多个文件时，可以将不同文件以类似0000n-of-0000m的前缀区分，m表示TFRecords数据
    # 总个数（2个），n表示第几个TFRecords数据
    filename = os.path.join(data_path, 'data_000%d_of_000%d.tfrecords' % (i, num_shards))
    writer = tf.python_io.TFRecordWriter(filename)
    # 将数据封装为Example结构写入TFRecord文件。这里instances_per_shard=2，意味着每个TFRecords中
    # example的数量为2。第一个example中'i'的值为i,'j'的值为0，第二个example中'i'的值为i，
    # 'j'的值为1 
    for j in range(instances_per_shard):
        example = tf.train.Example(features=tf.train.Features(feature={
            'i': tf.train.Feature(int64_list=tf.train.Int64List(value=[i])),
            'j': tf.train.Feature(int64_list=tf.train.Int64List(value=[j]))
        }))
        writer.write(example.SerializeToString())
    writer.close()

使用`tf.train.match_filenames_once`和`tf.train.string_input_producer`读取TFRecrods中的数据。

使用`tf.train.match_filenames_once`创建文件名列表读取TFRecords数据时，一定要进行**变量初始化**。

```python
...
init = (tf.global_variables_initializer(), tf.local_variables_initializer())

with tf.Session() as sess:
   sess.run(init)
   ... 
```

In [10]:
import os
import tensorflow as tf

data_path = os.path.join('.', 'data')

# 要读取train.match_filenames_once中的结果，在run之前需要先调用
# global_variables_initializer()和local_variables_initializer()初始化。
# train.match_filenames_once中的参数是一个正则表达式pattern，用以匹配文件名，
# 例如这里传入的参数是./data/*，会匹配data中所有文件。
# 它会返回一个文件名列表，其中每个元素是bytes格式的文件名。
filenames = tf.train.match_filenames_once(os.path.join(data_path, '*'))

# 如果不给定num_epochs参数值，则会一直生成下去（类似于python generator）
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)

features = tf.parse_single_example(
    serialized_example,
    features={
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    }
)

init = (tf.global_variables_initializer(), tf.local_variables_initializer())
with tf.Session() as sess:
    sess.run(init)
    
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # 多次执行获取数据的操作
    for _ in range(6):
        # 打印出来的结果，左边一列是sample值，右边一列是label值
        print(sess.run([features['i'], features['j']]))
    
    coord.request_stop()
    coord.join(threads)

[0, 0]
[0, 1]
[1, 0]
[1, 1]
[0, 0]
[0, 1]


## 组合训练数据（batching）
Tensorflow提供了**tf.train.batch**和**tf.train.shuffle_batch**函数将单个样例组织成batch形式输出。这两个函数会生成一个**队列**，队列的入队操作是**生成单个sample**的方法，而每次出队得到的是一个**batch**。

batching队列同文件名队列一样，使用之前必须**初始化变量**。

下面一个cell中展示的时`tf.train.batch`的使用方法。

In [15]:
import os
import tensorflow as tf

data_path = os.path.join('.', 'data')

# 要读取train.match_filenames_once中的结果，在run之前需要先调用
# global_variables_initializer()和local_variables_initializer()初始化。
# train.match_filenames_once返回一个文件名列表，其中每个元素是bytes格式的文件名
filenames = tf.train.match_filenames_once(os.path.join(data_path, '*'))

# 如果不给定num_epochs参数值，则会一直生成下去（类似于python generator）
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)

features = tf.parse_single_example(
    serialized_example,
    features={
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    }
)

# ------ 以上是上一节中匹配文件名并创建读取TFRecorder op的代码 ------

# 使用上一节中的Example读取到的数据。假设i表示一个sample的特征向量，
# 比如一张图像的像素矩阵；j表示该样例对应的label。
example, label = features['i'], features['j']

batch_size = 3
# capacity参数是batch queue中元素个数的最大值
capacity = 1000 + 3 * batch_size

example_batch, label_batch = tf.train.batch([example, label], batch_size=batch_size, capacity=capacity)

init = (tf.global_variables_initializer(), tf.local_variables_initializer())

with tf.Session() as sess:
    
    sess.run(init)
    
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    for i in range(2):
        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])
        # batch为3，一个列表中是3个example值或3个label值
        print(cur_example_batch, cur_label_batch)
        
    coord.request_stop()
    coord.join(threads)

[0 0 1] [0 1 0]
[1 0 0] [1 0 1]


下面一个cell中展示了`tf.train.shuffle_batch`的使用方法。

In [20]:
import os
import tensorflow as tf

data_path = os.path.join('.', 'data')

# 要读取train.match_filenames_once中的结果，在run之前需要先调用
# global_variables_initializer()和local_variables_initializer()初始化。
# train.match_filenames_once返回一个文件名列表，其中每个元素是bytes格式的文件名
filenames = tf.train.match_filenames_once(os.path.join(data_path, '*'))

# 如果不给定num_epochs参数值，则会一直生成下去（类似于python generator）
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)

features = tf.parse_single_example(
    serialized_example,
    features={
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    }
)

# ------ 以上是上一节中匹配文件名并创建读取TFRecorder op的代码 ------

example, label = features['i'], features['j']

# tf.train.shuffle_batch中，min_after_dequeue参数是特有的。min_after_dequeue参数限制
# 了出队队列中元素的最少个数。当队列中元素太少时，随机打乱sample顺序的作用不大。当出
#队函数被调用但是队列中元素不够时，出队操作会等待更多元素入队才会完成。

batch_size = 3
capacity = 1000 + batch_size * 3

example_batch, label_batch = tf.train.shuffle_batch(
    [example, label], batch_size=batch_size,
    capacity=capacity, min_after_dequeue=30
)

init = (tf.global_variables_initializer(), tf.local_variables_initializer())

with tf.Session() as sess:
    
    sess.run(init)
    
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    for i in range(2):
        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])
        # batch为3，一个列表中是3个example值或3个label值。可以看到sample和对应label的
        # 顺序已经被打乱了
        print(cur_example_batch, cur_label_batch)
        
    coord.request_stop()
    coord.join(threads)

<class 'tensorflow.python.framework.ops.Tensor'>
[1 0 0] [1 1 1]
[1 1 0] [1 1 1]


`tf.train.batch`和`tf.train.shuffle_batch`也提供了并行化处理输入数据的方法。通过设置`tf.train.shuffle_batch`（`tf.train.batch`同理）中的**num_thread**参数，可以指定多个线程同时执行入队操作。这样多个线程会同时读取**一个文件**中的不同**sample**进行预处理。

如果需要多个线程处理**不同文件**中的sample时，可以使用**tf.train.shuffle_batch_join**（不打乱顺序使用**tf.train.batch_join**）。

+ **tf.train.shuffle_batch**和**tf.train.shuffle_batch_join**的优劣：

   + **tf.train.shuffle_batch**中不同线程会读取同一个文件，如果一个文件中的sample比较相似，那么神经网络训练效果会有影响。所有使用`tf.train.shuffle_batch`前创建TFRecord文件时就要把sample随即打乱。
   + **tf.train.shuffle_batch_join**中不同线程会读取不同文件，如果读取的线程数比总文件数还大，那么多个线程会读取同一个文件中相近部分的数据。而且多个线程读取多个文件可能会导致过多的硬盘寻址。从而使读取效率降低。

## 输入数据处理框架

将以上内存队列、文件名队列、batch队列组合处理输入数据。

假设TFRecord文件全部放置在`/data/`目录中。TFRecord文件中每个example存储的是一张图片的bytes。格式如下：
```python
example = tf.train.Example(features=tf.train.Features(feature={
    'image': tf.train.Feature(int64_list=tf.train.BytesList(value=[image_raw])),
    'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
    'height': tf.train.Feature(int64_list=tf.train.Int64List(value=[height])),
    'width': tf.train.Feature(int64_list=tf.train.Int64List(value=[weight])),
    'channels': tf.train.Feature(int64_list=tf.train.Int64List(value=[channels])),
}))
```

**注：案例中没有原始数据，不能直接运行。需要根据以上格式处理源数据并放置到`/data/`目录中方可运行。另外引入了preprocess模块。**

In [None]:
import os
import tensorflow as tf

import preprocess

# TRAINING_THREADS是并行的线程个数
TRAINING_THREADS = 8

# 创建文件名队列。在调用输入数据处理流程前，需要统一所有原始文件格式并存储到TFRecord
# 文件中。并放置到/data/目录中。

data_path = os.path.join('.', 'data')

filenames = tf.train.match_filenames_once(os.path.join(data_path, '*'))
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)

# 使用TFRecordReader读取并解析TFRecords数据。
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_example,
    features={
        'image': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64),
        'height': tf.FixedLenFeature([], tf.int64),
        'width': tf.FixedLenFeature([], tf.int64),
        'channels': tf.FixedLenFeature([], tf.int64),
    }
)

image, label = features['image'], features['label']
height, width = features['height'], features['width']
channels = features['channels']

# 从原始图像数据解析出像素矩阵，并根据图像尺寸还原图像
decode_image = tf.decode_raw(image, tf.uint8)
decode_image.set_shape([height, width, channels])

# 定义神经网络输入层图片的大小
image_size = 299

# 图像预处理，调用preprocess.py中的preprocess_for_train函数。
distorted_image = preprocess.preprocess_for_train(decoded_image, image_size, image_size, None)

# 将处理后的图像和标签数据通过tf.train.shuffle_batch整理成神经网络训练时需要的batch
min_after_dequeue = 10000
batch_size = 100
capacity = min_after_dequeue + 3 * batch_size
image_batch, label_batch = tf.train.shuffle_batch(
    [distorted_image, label], batch_size=batch_size,
    capacity=capacity, min_after_dequeue=min_after_dequeue
)

# 定义神经网络的结构以及优化过程。image_batch可以作为输入提供给神经网络的输入层。
# label_batch则提供了输入batch中对应的label
logit = inference(image_batch)
loss = calc_loss(logit, label_batch)
train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)

# 初始化器
init = (tf.global_variables_initializer(), tf.local_variables_initializer())

with tf.Session() as sess:
    sess.run(init)
    
    # 创建线程协同器
    coord = tf.train.Coordinator()
    # 启动多线程
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # 训练
    for i range(TRAINING_THREADS):
        sess.run(train_step)
    
    # 停止所有线程
    coord.request_stop()
    coord.join(threads)