## Reading the data

### Feed Dict

- Read from memory: declare inputs with `tf.placeholder` and pass the values through `feed_dict` when calling `sess.run`.

### Async Queue
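- Read from disk asynchronously through a filename queue.
- `tf.train.string_input_producer(string_tensor, num_epochs=None, shuffle=True)`: creates the filename queue.
- `tf.train.start_queue_runners`: starts the threads that fill the queue.
- `tf.errors.OutOfRangeError`: raised once every string has been produced `num_epochs` times and the queue is closed.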
![](https://pic3.zhimg.com/50/v2-3cd597df7e855af6d59ff60af6b13cb2_hd.jpg)
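
The queue mechanism listed above can be modeled with the Python standard library. This is a minimal sketch and not TensorFlow code: `start_producer`, `dequeue`, `read_all`, and the `OutOfRange` exception are hypothetical names, standing in loosely for `tf.train.string_input_producer`, a dequeue op, and `tf.errors.OutOfRangeError`. It assumes a single background thread and a sentinel value to mark the closed queue.

```python
import queue
import random
import threading


class OutOfRange(Exception):
    """Hypothetical stand-in for tf.errors.OutOfRangeError."""


_CLOSED = object()  # sentinel that marks the queue as closed


def start_producer(filenames, num_epochs, shuffle=True, seed=None, capacity=32):
    """Fill a bounded queue with filenames from a background thread."""
    q = queue.Queue(maxsize=capacity)
    rng = random.Random(seed)

    def produce():
        for _ in range(num_epochs):
            epoch = list(filenames)
            if shuffle:
                rng.shuffle(epoch)  # shuffle within each epoch
            for name in epoch:
                q.put(name)  # blocks while the queue is full
        q.put(_CLOSED)  # close the queue after the last epoch

    thread = threading.Thread(target=produce, daemon=True)
    thread.start()
    return q, thread


def dequeue(q):
    """Return the next filename, or raise OutOfRange once the queue is closed."""
    item = q.get()
    if item is _CLOSED:
        q.put(_CLOSED)  # keep the queue closed for later callers
        raise OutOfRange("queue is closed and has insufficient elements")
    return item


def read_all(filenames, num_epochs, seed=0):
    """Consume the queue until it is exhausted, as a training loop would."""
    q, thread = start_producer(filenames, num_epochs, seed=seed)
    seen = []
    try:
        while True:
            seen.append(dequeue(q))
    except OutOfRange:
        pass  # normal end of input
    thread.join()
    return seen
```

Calling `read_all(['A.jpg', 'C.jpg', 'D.jpg'], num_epochs=3)` returns nine filenames, three per epoch. Catching the exception is how the consumer learns that the input is finished.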

### DataSet API

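- Read from both memory and disk.
- A Dataset can be viewed as an ordered list of "elements" of the same type. In practice, a single element can be a vector, a string, an image, or even a tuple or dict.
- `tf.data.Dataset.from_tensor_slices`: slices the given tensors along their first dimension to build a dataset from in-memory data.
- `tf.data.TextLineDataset`: takes a list of files and returns a dataset in which each element is one line of those files. It can be used to read CSV files.
- `tf.data.FixedLengthRecordDataset`: takes a list of files and a `record_bytes` value; each element of the dataset is a fixed-size chunk of `record_bytes` bytes from the files. It is typically used for data stored in binary form, such as the CIFAR10 dataset.
- `tf.data.TFRecordDataset`: as the name suggests, reads TFRecord files; each element of the dataset is one record, typically a serialized `tf.train.Example`.
- `tf.errors.OutOfRangeError`: raised when the iterator reaches the end of the dataset.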

![](https://pic2.zhimg.com/50/v2-f9f42cc5c00573f7baaa815795f1ce45_hd.jpg)


#### Iterator

- one-shot iterator
- initializable iterator: lets you feed a placeholder into the iterator at initialization time, which makes it easy to define a new iterator quickly by changing a parameter
- reinitializable iterator
- feedable iterator
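
The difference between the first two iterator kinds can be modeled in plain Python. This is a sketch under stated assumptions, not the TensorFlow API: the classes `OneShotIterator` and `InitializableIterator`, the `initialize` method, and the `OutOfRange` exception are hypothetical names chosen for illustration. Reinitializable and feedable iterators are not covered.

```python
class OutOfRange(Exception):
    """Hypothetical stand-in for tf.errors.OutOfRangeError."""


class OneShotIterator:
    """Ready to use immediately; cannot be reset or parameterized."""

    def __init__(self, items):
        self._it = iter(items)

    def get_next(self):
        try:
            return next(self._it)
        except StopIteration:
            raise OutOfRange("End of sequence") from None


class InitializableIterator:
    """Must be initialized before use; each initialization may pass new values."""

    def __init__(self, make_items):
        # make_items plays the role of a dataset defined in terms of placeholders
        self._make_items = make_items
        self._it = None

    def initialize(self, **feed):
        # Rebuild the underlying sequence from the fed "placeholder" values
        self._it = iter(self._make_items(**feed))

    def get_next(self):
        if self._it is None:
            raise RuntimeError("iterator has not been initialized")
        try:
            return next(self._it)
        except StopIteration:
            raise OutOfRange("End of sequence") from None


iterator = InitializableIterator(lambda limit: range(limit))
iterator.initialize(limit=3)
first_pass = [iterator.get_next() for _ in range(3)]
iterator.initialize(limit=5)  # same iterator, new parameter value
second_pass = [iterator.get_next() for _ in range(5)]
```

The point of the second class is that one iterator definition can be reused with different inputs, which is what feeding a placeholder to the initializer achieves.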


### Transformation with DataSet API

- A Transformation turns one Dataset into a new Dataset. Transformations typically handle data conversion, shuffling, batching, and repeating the data for multiple epochs.
- Commonly used Transformations:
  - map
  - batch
  - shuffle
  - repeat
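
The four transformations can be modeled with plain Python generators to make their semantics concrete. This is a sketch, not `tf.data` itself: `from_list`, `ds_map`, `ds_shuffle`, `ds_batch`, and `ds_repeat` are hypothetical helpers, and the buffer-based shuffle is only an approximation of how a shuffle buffer behaves. Each "dataset" here is a zero-argument function that returns a fresh iterator, so that repeating can restart the upstream pipeline.

```python
import random


def from_list(items):
    return lambda: iter(items)


def ds_map(ds, fn):
    return lambda: (fn(x) for x in ds())


def ds_shuffle(ds, buffer_size, rng):
    def gen():
        buffer = []
        for x in ds():
            buffer.append(x)
            if len(buffer) >= buffer_size:
                # Buffer is full: emit one randomly chosen element
                yield buffer.pop(rng.randrange(len(buffer)))
        while buffer:
            # Input exhausted: drain the buffer in random order
            yield buffer.pop(rng.randrange(len(buffer)))
    return gen


def ds_batch(ds, batch_size):
    def gen():
        batch = []
        for x in ds():
            batch.append(x)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch  # final, smaller batch
    return gen


def ds_repeat(ds, count):
    def gen():
        for _ in range(count):
            yield from ds()  # restart the upstream pipeline for each epoch
    return gen


rng = random.Random(0)
ds = from_list([1.0, 2.0, 3.0, 4.0, 5.0])
ds = ds_map(ds, lambda x: x + 1)
ds = ds_shuffle(ds, buffer_size=10000, rng=rng)
ds = ds_batch(ds, 2)
ds = ds_repeat(ds, 5)
batches = list(ds())
```

Because batching happens before repeating, every epoch ends with its own partial batch: five elements give batches of sizes 2, 2, 1, and five epochs give 15 batches in total. Swapping the order of the last two steps would let batches span epoch boundaries.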

## Demo

### Queue API

In [1]:
import tensorflow as tf
with tf.Session() as sess:
    filename = ['../data/A.jpg', '../data/C.jpg', '../data/D.jpg']
    filename_queue = tf.train.string_input_producer(filename, num_epochs=3, shuffle=True)
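    # The reader reads data from the filename queue via reader.read
    reader = tf.WholeFileReader()
    # key is the filename, value is the file content
    key, value = reader.read(filename_queue)
    # tf.train.string_input_producer creates a local epoch counter, which must be initialized
    tf.local_variables_initializer().run()
    threads = tf.train.start_queue_runners(sess=sess)
    i = 0
    # Loops until the queue is exhausted, at which point sess.run raises OutOfRangeError
    while True:
        i += 1
        # Fetch the image data and save it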
        image_data = sess.run(value)
        with open('../data/test_%d.jpg' % i, 'wb') as f:
            f.write(image_data)




OutOfRangeError: FIFOQueue '_0_input_producer' is closed and has insufficient elements (requested 1, current size 0)
	 [[Node: ReaderReadV2 = ReaderReadV2[_device="/job:localhost/replica:0/task:0/device:CPU:0"](WholeFileReaderV2, input_producer)]]

Caused by op 'ReaderReadV2', defined at:
  File "<ipython-input-1-0c1148731a07>", line 7, in <module>
    key, value = reader.read(filename_queue)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/ops/io_ops.py", line 195, in read
    return gen_io_ops._reader_read_v2(self._reader_ref, queue_ref, name=name)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/ops/gen_io_ops.py", line 673, in _reader_read_v2
    queue_handle=queue_handle, name=name)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper
    op_def=op_def)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/framework/ops.py", line 2956, in create_op
    op_def=op_def)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/framework/ops.py", line 1470, in __init__
    self._traceback = self._graph._extract_stack()  # pylint: disable=protected-access



In [2]:
print(tf.train.string_input_producer.__doc__)

Output strings (e.g. filenames) to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    string_tensor: A 1-D string tensor with the strings to produce.
    num_epochs: An integer (optional). If specified, `string_input_producer`
      produces each string from `string_tensor` `num_epochs` times before
      generating an `OutOfRange` error. If not specified,
      `string_input_producer` can cycle through the strings in `string_tensor`
      an unlimited number of times.
    shuffle: Boolean. If true, the strings are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions. All sessions open to the device which has
      this queue

In [3]:
tf.train.slice_input_producer?

In [None]:
tf.WholeFileReader?

### DataSet API

In [46]:
import numpy as np

# Slice the input tensor along its first dimension to build the dataset
dataset = tf.data.Dataset.from_tensor_slices(np.array([1.0, 2.0, 3.0, 4.0, 5.0]))
dataset = dataset.map(lambda x: x + 1)
dataset = dataset.shuffle(buffer_size=10000, reshuffle_each_iteration=True)
# https://github.com/tensorflow/tensorflow/issues/14857
# dataset.shuffle?
dataset = dataset.batch(2)
# Repeat the batched data for 5 epochs
dataset = dataset.repeat(5)
iterator = dataset.make_one_shot_iterator()

# Returns a nested structure of `tf.Tensor`s containing the next element.
one_element = iterator.get_next()
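with tf.Session() as sess:
    # 5 elements in batches of 2 give 3 batches per epoch, so 5 epochs yield 15 batches;
    # the 16th call below runs past the end and raises OutOfRangeError.
    for i in range(16):
        print(sess.run(one_element))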

[4. 3.]
[5. 6.]
[2.]
[4. 6.]
[5. 2.]
[3.]
[5. 6.]
[4. 3.]
[2.]
[6. 5.]
[2. 4.]
[3.]
[6. 5.]
[3. 4.]
[2.]


OutOfRangeError: End of sequence
	 [[Node: IteratorGetNext_27 = IteratorGetNext[output_shapes=[[?]], output_types=[DT_DOUBLE], _device="/job:localhost/replica:0/task:0/device:CPU:0"](OneShotIterator_27)]]

Caused by op 'IteratorGetNext_27', defined at:
  File "<ipython-input-46-cfc112af2b70>", line 13, in <module>
    one_element = iterator.get_next()
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/data/ops/iterator_ops.py", line 259, in get_next
    name=name))
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/ops/gen_dataset_ops.py", line 706, in iterator_get_next
    output_shapes=output_shapes, name=name)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper
    op_def=op_def)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/framework/ops.py", line 2956, in create_op
    op_def=op_def)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/framework/ops.py", line 1470, in __init__
    self._traceback = self._graph._extract_stack()  # pylint: disable=protected-access



In [15]:
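# Catch OutOfRangeError to detect the end of the data instead of counting iterations.
# The output below (scalars 1.0 to 5.0) corresponds to a plain, untransformed dataset;
# going by the execution counts, this cell ran before In [46].
with tf.Session() as sess:
    try:
        while True:
            print(sess.run(one_element))
    except tf.errors.OutOfRangeError:
        print('end')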

1.0
2.0
3.0
4.0
5.0
end


In [17]:
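# Eager execution needs a TensorFlow release that supports it; the AttributeError below
# indicates that the installed version does not provide tfe.enable_eager_execution.
import tensorflow.contrib.eager as tfe
tfe.enable_eager_execution()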

dataset = tf.data.Dataset.from_tensor_slices(np.array([1.0, 2.0, 3.0, 4.0, 5.0]))

for one_element in tfe.Iterator(dataset):
    print(one_element)

AttributeError: module 'tensorflow.contrib.eager' has no attribute 'enable_eager_execution'

In [21]:
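# tf.data.Dataset.from_tensor_slices slices the array along its first dimension, so the resulting dataset contains 5 elements, each of shape (2,), i.e. each element is one row of the matrix
dataset = tf.data.Dataset.from_tensor_slices(np.random.uniform(size=(5, 2)))

# In image recognition problems, an element can take the form {"image": image_tensor, "label": label_tensor}, which is more convenient to work with
dataset = tf.data.Dataset.from_tensor_slices(
    {
        "a": np.array([1.0, 2.0, 3.0, 4.0, 5.0]),
        "b": np.random.uniform(size=(5, 2))
    }
)
# Here the function slices the values in "a" and the values in "b" separately, so one element of the final dataset looks like {"a": 1.0, "b": [0.9, 0.1]}.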
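
The slicing behavior described in this cell can be modeled in plain Python, with nested lists standing in for arrays. This is a sketch and not the TensorFlow implementation: `slice_first_dim` is a hypothetical helper, and it assumes a flat tuple or dict of components rather than arbitrarily nested structures.

```python
def slice_first_dim(structure):
    """Yield elements by slicing every component along its first dimension."""
    if isinstance(structure, dict):
        keys = list(structure)
        components = [structure[k] for k in keys]
    elif isinstance(structure, tuple):
        keys = None
        components = list(structure)
    else:
        # A single array: each element is simply one row
        yield from structure
        return

    # All components must agree on the size of the first dimension
    if len({len(c) for c in components}) > 1:
        raise ValueError("all components must share the same first dimension")

    for values in zip(*components):
        if keys is None:
            yield tuple(values)
        else:
            yield dict(zip(keys, values))


a = [1.0, 2.0, 3.0]
b = [[0.9, 0.1], [0.4, 0.6], [0.5, 0.5]]
dict_elements = list(slice_first_dim({"a": a, "b": b}))
tuple_elements = list(slice_first_dim((a, b)))
```

Each element keeps the structure of the input (dict or tuple), while every component loses its leading dimension.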

In [22]:
tf.data.Dataset.from_tensor_slices?

In [47]:
dataset = tf.data.Dataset.from_tensor_slices(
  (np.array([1.0, 2.0, 3.0, 4.0, 5.0]), np.random.uniform(size=(5, 2)))
)
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
print(iterator.output_shapes)
with tf.Session() as sess:
    for i in range(5):
        print(sess.run(one_element))

(TensorShape([]), TensorShape([Dimension(2)]))
(1.0, array([0.13130811, 0.1273082 ]))
(2.0, array([0.45789475, 0.40324293]))
(3.0, array([0.18529983, 0.31733896]))
(4.0, array([0.18864479, 0.42943603]))
(5.0, array([0.40007379, 0.92600191]))


In [48]:
# Read images and their labels from disk, shuffle them, and group them into training batches with batch_size=32. Repeat for 10 epochs during training


In [56]:
limit = tf.placeholder(dtype=tf.int32, shape=[])
dataset = tf.data.Dataset.from_tensor_slices(tf.range(start=0, limit=limit))
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()
with tf.Session() as sess:
    # initialize the iterator
    sess.run(iterator.initializer, feed_dict={limit: 10})
    for i in range(10):
        value = sess.run(next_element)
        assert i == value

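When you call tf.data.Dataset.from_tensor_slices(array), what actually happens is that **array is stored in the computation graph as a `tf.constant`**. When array is large, the graph becomes large as well, which makes it inconvenient to transfer and save. In that case, replace the array with a placeholder and use an initializable iterator, passing the array in only when it is needed; this avoids storing the large array in the graph.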

In [None]:
# Key-based access (data["features"]) assumes the file is an .npz-style archive;
# a plain .npy file would load as a single array.
with np.load("/var/data/training_data.npy") as data:
  features = data["features"]
  labels = data["labels"]

features_placeholder = tf.placeholder(features.dtype, features.shape)
labels_placeholder = tf.placeholder(labels.dtype, labels.shape)

dataset = tf.data.Dataset.from_tensor_slices((features_placeholder, labels_placeholder))
iterator = dataset.make_initializable_iterator()
with tf.Session() as sess:
  # Feed the arrays only at initialization, so they are not stored in the graph
  sess.run(iterator.initializer, feed_dict={features_placeholder: features,
                                            labels_placeholder: labels})

In [2]:
from bp.dl.dataset import make_one_shot_iterator
filenames = ['../data/A.jpg', '../data/C.jpg', '../data/D.jpg']
labels = [1, 2, 3]
one_element = make_one_shot_iterator(filenames, labels, batchsize=3, epochsize=3)
with tf.Session() as sess:
    print(sess.run(one_element))

ValueError: 'images' contains no shape.