# TensorFlow 数据处理方式

## 使用流水线并行读取数据


步骤是：<br/>
1、创建文件名列表<br/>
2、创建文件名队列<br/>
3、创建 Reader 和 Decoder<br/>
4、创建样例队列<br/>

### 1、读取 CSV 文件

In [1]:
import tensorflow as tf
import os
import features.utils as utils



# Build a shuffled filename queue over the three CSV shards.
# num_epochs=1 makes string_input_producer create a local epoch-counter
# variable, which is why local_variables_initializer() is run below.
file_path = os.path.join(utils.localDir(), '../data/csv_read_test/')
filename_queue = tf.train.string_input_producer(
    [os.path.join(file_path, 'file_0.csv'),
     os.path.join(file_path, 'file_1.csv'),
     os.path.join(file_path, 'file_2.csv')], num_epochs=1, shuffle=True)

# Line-based reader for CSV text; skip_header_lines=1 drops each file's header.
reader = tf.TextLineReader(skip_header_lines=1)
# Read up to 2 lines per sess.run (reader.read(filename_queue) would read one).
_, value = reader.read_up_to(filename_queue, 2)
# Four integer columns; [0] is the default used for empty fields.
record_id, age, income, outgo = tf.decode_csv(
    value, record_defaults=[[0], [0], [0], [0]], field_delim=",")
# Each column is a vector of up to 2 values, so the stacked result has one row
# per column and one column per record read in this step.
features = tf.stack([record_id, age, income, outgo])

with tf.Session() as sess:
    sess.run(tf.local_variables_initializer())
    # The Coordinator lets us stop and join the queue-runner threads before the
    # session closes, instead of leaving them running against a closed session.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    try:
        for _ in range(3):
            if coord.should_stop():
                break
            print(sess.run(features))
    except tf.errors.OutOfRangeError:
        # Raised once every file has been consumed num_epochs times.
        print("Catch OutOfRangeError")
    finally:
        coord.request_stop()
        coord.join(threads)


[[   4    5]
 [  23   24]
 [2048 4096]
 [1024 2048]]
[[   2    3]
 [  23   24]
 [2048 4096]
 [1024 2048]]
[[   0    1]
 [  23   24]
 [2048 4096]
 [1024 2048]]


### 2、读取和存储TFRecords

In [2]:
# Serialize two tf.train.Example records (id = 1 and 2) into a TFRecord file.
# The writer is used as a context manager so the file is flushed and closed
# even if building or serializing an example raises part-way through.
with tf.python_io.TFRecordWriter(
        os.path.join(utils.localDir(), '../data/csv_read_test/stat.tfrecord')) as writer:
    for i in range(1, 3):
        # One example: int64 features for id/age, float features for income/outgo.
        example = tf.train.Example(features=tf.train.Features(
            feature={
                "id": tf.train.Feature(int64_list=tf.train.Int64List(value=[i])),
                "age": tf.train.Feature(int64_list=tf.train.Int64List(value=[i * 24])),
                "income": tf.train.Feature(float_list=tf.train.FloatList(value=[i * 2048.0])),
                "outgo": tf.train.Feature(float_list=tf.train.FloatList(value=[i * 1024.0])),
            }
        ))
        writer.write(example.SerializeToString())


In [3]:
# Read the TFRecord file back, handling end-of-input explicitly.
# num_epochs=2 makes the queue yield the file twice and then raise
# OutOfRangeError from sess.run.
filename_queue = tf.train.string_input_producer(
    [os.path.join(utils.localDir(), '../data/csv_read_test/stat.tfrecord')],
    num_epochs=2)
reader = tf.TFRecordReader()
# Dequeue one serialized example per sess.run.
_, example = reader.read(filename_queue)
# Parse it back with the same keys and dtypes used when writing.
features = tf.parse_single_example(example,
                                   features={
                                       "id": tf.FixedLenFeature([], tf.int64),
                                       "age": tf.FixedLenFeature([], tf.int64),
                                       "income": tf.FixedLenFeature([], tf.float32),
                                       "outgo": tf.FixedLenFeature([], tf.float32),
                                   })

# `with` guarantees the session is closed even if an unexpected error escapes.
with tf.Session() as sess:
    # num_epochs creates a local epoch-counter variable inside
    # string_input_producer; it must be initialized before the queue runs.
    sess.run(tf.local_variables_initializer())
    # The Coordinator is used to stop and join the background queue-runner threads.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    try:
        for _ in range(10):
            if coord.should_stop():
                break
            print(sess.run(features))
    except tf.errors.OutOfRangeError:
        print("Catch OutOfRangeError")
    finally:
        # Ask all background threads to stop.
        coord.request_stop()
        print("Finish reading")
    # Wait for the background threads to exit before the session closes.
    coord.join(threads)


{'age': 24, 'id': 1, 'income': 2048.0, 'outgo': 1024.0}
{'age': 48, 'id': 2, 'income': 4096.0, 'outgo': 2048.0}
{'age': 24, 'id': 1, 'income': 2048.0, 'outgo': 1024.0}
{'age': 48, 'id': 2, 'income': 4096.0, 'outgo': 2048.0}
Catch OutOfRangeError
Finish reading


### 3、创建批样例数据


In [4]:
# Build a shuffled filename queue over the three CSV shards; num_epochs=3 makes
# every file be read three times before OutOfRangeError is raised.
file_path = os.path.join(utils.localDir(), '../data/csv_read_test/')
filename_queue = tf.train.string_input_producer(
    [os.path.join(file_path, 'file_0.csv'),
     os.path.join(file_path, 'file_1.csv'),
     os.path.join(file_path, 'file_2.csv')], num_epochs=3, shuffle=True)

# Line-based reader for CSV text; skip_header_lines=1 drops each file's header.
reader = tf.TextLineReader(skip_header_lines=1)
# Read a single CSV line per dequeue; shuffle_batch below groups lines into batches.
_, value = reader.read(filename_queue)
# Four integer columns; [0] is the default used for empty fields.
record_id, age, income, outgo = tf.decode_csv(
    value, record_defaults=[[0], [0], [0], [0]], field_delim=",")
# Stack the four scalar columns into one length-4 record vector.
features = tf.stack([record_id, age, income, outgo])
# Shuffled batches of 6 records. capacity bounds the internal queue size;
# min_after_dequeue is how many records must remain queued after a dequeue,
# which controls how well records are mixed. With the default
# allow_smaller_final_batch=False, a trailing partial batch is discarded.
example_batch = tf.train.shuffle_batch([features], batch_size=6, capacity=18, min_after_dequeue=6)

# `with` guarantees the session is closed even if an unexpected error escapes.
with tf.Session() as sess:
    # Local variables include the num_epochs counter of string_input_producer.
    init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
    sess.run(init_op)
    # The Coordinator is used to stop and join the background queue-runner threads.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    try:
        for _ in range(1000):
            if coord.should_stop():
                break
            print(sess.run(example_batch))
    except tf.errors.OutOfRangeError:
        # All epochs consumed: no full batch can be formed any more.
        print("Catch out of range")
    finally:
        coord.request_stop()
        print("Finish reading")
    # Wait for the background threads to exit before the session closes.
    coord.join(threads)

[[   0   23 2048 1024]
 [   5   24 4096 2048]
 [   5   24 4096 2048]
 [   1   24 4096 2048]
 [   2   23 2048 1024]
 [   3   24 4096 2048]]
[[   1   24 4096 2048]
 [   4   23 2048 1024]
 [   4   23 2048 1024]
 [   0   23 2048 1024]
 [   4   23 2048 1024]
 [   2   23 2048 1024]]
[[   5   24 4096 2048]
 [   2   23 2048 1024]
 [   3   24 4096 2048]
 [   3   24 4096 2048]
 [   0   23 2048 1024]
 [   1   24 4096 2048]]
Catch out of range
Finish reading
