## Input Pipeline
-----
In an input pipeline, multiple threads can help reduce the bottleneck in the data-reading phase, because reading data involves a lot of waiting. For example, when using queues to prepare inputs for training a model, we have:
* Multiple threads prepare training examples and push them in the queue.
* A training thread executes a training op that dequeues mini-batches from the queue.

Three concepts for multi-threaded programming in TensorFlow:
* Queue: holds the data; threads enqueue to it and dequeue from it.
* QueueRunner: creates and wraps the threads that run the enqueue ops.
* Coordinator: coordinates the threads so that they stop together.

Please check [threading and queues](https://www.tensorflow.org/programmers_guide/threading_and_queues) for details.
All related interfaces can be found in [inputs and readers](https://www.tensorflow.org/api_guides/python/io_ops).
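
As a rough mental model of the producer/consumer pattern described above, here is a plain-Python sketch that uses only the standard library, not TensorFlow. The names `producer`, `NUM_PRODUCERS`, and `ITEMS_PER_PRODUCER` are made up for illustration; `queue.Queue` stands in for a TensorFlow queue, and the list of producer threads plays roughly the role of a QueueRunner.

```python
import queue
import threading

NUM_PRODUCERS = 4          # hypothetical number of threads preparing examples
ITEMS_PER_PRODUCER = 25    # hypothetical number of examples each thread prepares
BATCH_SIZE = 10

# A bounded queue: producers block when it is full,
# the consumer blocks when it is empty.
example_queue = queue.Queue(maxsize=50)

def producer(worker_id):
    """Prepare examples and push them into the shared queue."""
    for i in range(ITEMS_PER_PRODUCER):
        example_queue.put((worker_id, i))

producers = [threading.Thread(target=producer, args=(w,))
             for w in range(NUM_PRODUCERS)]
for t in producers:
    t.start()

# The "training" thread (here, the main thread) dequeues mini-batches.
num_batches = NUM_PRODUCERS * ITEMS_PER_PRODUCER // BATCH_SIZE
batches = []
for _ in range(num_batches):
    batches.append([example_queue.get() for _ in range(BATCH_SIZE)])

for t in producers:
    t.join()

print(len(batches), "batches of", BATCH_SIZE, "examples each")
```

Because the producers only wait on the queue, the consumer rarely sits idle while data is being prepared, which is the point of using several threads here.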

Example:

```python
import tensorflow as tf
import numpy as np

N_SAMPLES = 1000
NUM_THREADS = 4

# Generating some simple data
# create 1000 random samples, each a 1D array of 4 values drawn from
# a normal distribution with mean 1 and standard deviation 10
data = 10 * np.random.randn(N_SAMPLES, 4) + 1

# create 1000 random labels of 0 and 1
target = np.random.randint(0, 2, size=N_SAMPLES)
queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])
enqueue_op = queue.enqueue_many([data, target])
dequeue_op = queue.dequeue()

# create NUM_THREADS threads to do the enqueue
qr = tf.train.QueueRunner(queue, [enqueue_op] * NUM_THREADS)
with tf.Session() as sess:
    # Create a coordinator, launch the queue runner threads.
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    for step in range(100): # run 100 iterations
        if coord.should_stop():
            break
        data_batch, label_batch = sess.run(dequeue_op)
        #print(data_batch)
        #print(label_batch)
    coord.request_stop()
    coord.join(enqueue_threads)
```

You don't have to use tf.train.Coordinator only with TensorFlow queues; you can use it to manage any threads you create. For example, if you use the Python threading package to create threads that do some crazy job, you can still use tf.train.Coordinator to manage them. The target and args syntax is similar to that of a classic thread pool. For more details on threading, you should take CS 110. The example below is based on the TensorFlow documentation.
```python
import threading
import time
import tensorflow as tf

# thread body: loop until the coordinator indicates a stop was requested.
# if some condition becomes true, ask the coordinator to stop.
def my_loop(coord):
    steps = 0
    while not coord.should_stop():
        time.sleep(0.01)  # placeholder for the real work ("do something")
        steps += 1
        if steps >= 10:  # placeholder for "some condition"
            coord.request_stop()

# main code: create a coordinator.
coord = tf.train.Coordinator()

# create 10 threads that run 'my_loop()'
# you can also create threads using QueueRunner as in the example above
threads = [threading.Thread(target=my_loop, args=(coord,)) for _ in range(10)]

# start the threads and wait for all of them to stop.
for t in threads:
    t.start()
coord.join(threads)
```
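
To see what the coordinator contributes in the loop above, it can help to look at a stripped-down stand-in. The sketch below is not TensorFlow's implementation; `MiniCoordinator` and `worker` are hypothetical names, and the class only models the three calls used above (`should_stop`, `request_stop`, `join`) with a `threading.Event`. The real tf.train.Coordinator does more, for example reporting exceptions raised in threads.

```python
import threading
import time

class MiniCoordinator:
    """Simplified stand-in for a coordinator: a shared stop flag plus a join helper."""

    def __init__(self):
        self._stop_event = threading.Event()

    def should_stop(self):
        # True once any thread has requested a stop.
        return self._stop_event.is_set()

    def request_stop(self):
        self._stop_event.set()

    def join(self, threads):
        # Wait for every managed thread to finish.
        for t in threads:
            t.join()

def worker(coord, counter, lock, limit):
    # Loop until some thread asks everyone to stop.
    while not coord.should_stop():
        with lock:
            counter[0] += 1
            if counter[0] >= limit:
                coord.request_stop()
        time.sleep(0.001)

coord = MiniCoordinator()
counter = [0]              # shared counter, protected by the lock
lock = threading.Lock()
threads = [threading.Thread(target=worker, args=(coord, counter, lock, 20))
           for _ in range(4)]
for t in threads:
    t.start()
coord.join(threads)
print("stopped after", counter[0], "increments")
```

A single thread calling `request_stop()` is enough to make every other thread leave its loop, which is the behavior the example above relies on.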

## Data Readers
----
We have talked about this topic in [manage experiments](https://github.com/AppleFairy/CS20SI-Tensorflow-for-Deep-Learning-Research/blob/master/manage-experiments.ipynb). TensorFlow provides several readers:
* tf.TextLineReader: outputs the lines of a file delimited by newlines, e.g. text files and CSV files.
* tf.FixedLengthRecordReader: outputs fixed-length records, for files in which every record has the same length, e.g. each MNIST image has 28 x 28 pixels and each CIFAR-10 image 32 x 32 x 3.
* tf.WholeFileReader: outputs the entire file content. This is useful when each file contains one sample.
* tf.TFRecordReader: reads samples from TensorFlow's own binary format (TFRecord).
* tf.ReaderBase: allows you to create your own readers.

To use a data reader, we first need to create a queue that holds the names of all the files we want to read. We do this through tf.train.string_input_producer, which creates a FIFOQueue under the hood, so to run the queue we'll need tf.train.Coordinator and tf.train.QueueRunner.

```python
# string_input_producer expects a list of file names
filename_queue = tf.train.string_input_producer(["data/heart.csv"])
reader = tf.TextLineReader(skip_header_lines=1) # skip the first line in the file
key, value = reader.read(filename_queue)
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    # fetch key and value in one run call so that they come from the same line
    k, v = sess.run([key, value])
    print(k) # data/heart.csv:2
    print(v) # 144,0.01,4.41,28.61,Absent,55,28.87,2.06,63,1
    coord.request_stop()
    coord.join(threads)
```

The line of code below parses value into a list of tensors according to record_defaults, which we have to create ourselves. The record defaults serve two purposes:
* First, they tell the decoder what type of data to expect in each column.
* Second, if a field in a column happens to be empty, the decoder fills it in with the default value that we specify.

```python
content = tf.decode_csv(value, record_defaults=record_defaults)
```

For this specific dataset, record_defaults should have 10 elements. All elements are either integers or floats, except for the fifth, which is a string. To make things easier, we treat all integer features as floats (we still specify the 10th column as an integer, because we want our labels to be integers).
```python
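N_FEATURES = 9 # the first 9 columns are features, the 10th is the label
record_defaults = [[1.0] for _ in range(N_FEATURES)] # define all features to be floats
record_defaults[4] = [''] # make the fifth feature a string
record_defaults.append([1]) # the label column is an integer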
content = tf.decode_csv(value, record_defaults=record_defaults)
```
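
To make the two roles of record_defaults concrete, here is a plain-Python sketch of the idea. It is not how tf.decode_csv is implemented; `parse_csv_line` is a hypothetical helper that splits on commas only (no quoting support), and the second input line with an empty field is made up for illustration.

```python
def parse_csv_line(line, record_defaults):
    """Parse one CSV line; each default fixes the column type and fills empty fields."""
    fields = line.split(',')
    if len(fields) != len(record_defaults):
        raise ValueError('expected %d columns, got %d'
                         % (len(record_defaults), len(fields)))
    parsed = []
    for field, (default,) in zip(fields, record_defaults):
        column_type = type(default)  # the type of the default decides how the column is read
        parsed.append(default if field == '' else column_type(field))
    return parsed

N_FEATURES = 9
record_defaults = [[1.0] for _ in range(N_FEATURES)]  # features are floats
record_defaults[4] = ['']                             # except the fifth, a string
record_defaults.append([1])                           # the label is an integer

row = parse_csv_line('144,0.01,4.41,28.61,Absent,55,28.87,2.06,63,1', record_defaults)
# same line with the second field left empty (made-up input)
row_with_gap = parse_csv_line('144,,4.41,28.61,Absent,55,28.87,2.06,63,1', record_defaults)

print(row)
print(row_with_gap)
```

In the second call the empty field takes the float default 1.0, and in both calls the integer-looking feature columns come back as floats because their defaults are floats.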
You can also do whatever pre-processing your data needs before feeding it in. For example, content is now a list of 10 elements: 8 floats, 1 string, and 1 integer. We have to convert the string to a float (Absent as 0 and Present as 1), and then pack the first 9 features into a tensor that can be fed into the model.
```python
# convert the 5th column (Present/Absent) to the binary values 1.0 and 0.0
condition = tf.equal(content[4], tf.constant('Present'))
content[4] = tf.where(condition, tf.constant(1.0), tf.constant(0.0))
# stack all 9 features into a single tensor
features = tf.stack(content[:N_FEATURES])
# assign the last column to label
label = content[-1]
```
With that, every time the reader reads a line from our CSV file, it converts that line into a feature tensor and a label!
But we often don't want to feed a single sample into our model; instead, we want to batch 'em up. You can do so using tf.train.batch, or tf.train.shuffle_batch if you want to shuffle your batches.
```python
# minimum number elements in the queue after a dequeue, used to ensure
# that the samples are sufficiently mixed
# I think 10 times the BATCH_SIZE is sufficient
min_after_dequeue = 10 * BATCH_SIZE
# the maximum number of elements in the queue
capacity = 20 * BATCH_SIZE
# shuffle the data to generate BATCH_SIZE sample pairs
data_batch, label_batch = tf.train.shuffle_batch([features, label], batch_size=BATCH_SIZE, capacity=capacity, min_after_dequeue=min_after_dequeue)
```
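
The interplay between capacity and min_after_dequeue can be illustrated with a single-threaded, plain-Python sketch. This is a simplified model of the idea, not TensorFlow's queue implementation; `shuffle_batches` is a hypothetical helper, the input is just the integers 0 to 999, and leftover samples that do not fill a whole batch are dropped in this sketch.

```python
import random

def shuffle_batches(samples, batch_size, capacity, min_after_dequeue, seed=0):
    """Yield shuffled batches drawn from a bounded buffer."""
    rng = random.Random(seed)
    source = iter(samples)
    buffer, batch = [], []
    exhausted = False
    while True:
        # Refill up to capacity whenever too few elements are left to mix well.
        if not exhausted and len(buffer) <= min_after_dequeue:
            while len(buffer) < capacity:
                try:
                    buffer.append(next(source))
                except StopIteration:
                    exhausted = True
                    break
        if not buffer:
            break
        # Dequeue a random element; while input remains, at least
        # min_after_dequeue elements stay behind in the buffer.
        batch.append(buffer.pop(rng.randrange(len(buffer))))
        if len(batch) == batch_size:
            yield batch
            batch = []

BATCH_SIZE = 10
batches = list(shuffle_batches(range(1000), BATCH_SIZE,
                               capacity=20 * BATCH_SIZE,
                               min_after_dequeue=10 * BATCH_SIZE))
print(len(batches), "batches; first batch:", batches[0])
```

A larger min_after_dequeue means each sample is picked from a bigger pool, so batches are better mixed, at the cost of more memory and a longer wait before the first batch is available.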
And with that we're done. You can simply use data_batch and label_batch the way you would have used input_placeholder and label_placeholder in our previous model, except you don't need to feed them in through the feed_dict parameter.

## TFRecord
----
Like many machine learning frameworks, TensorFlow has its own binary data format which is called TFRecord. A TFRecord is a serialized tf.train.Example Protobuf object. They can be created in a few lines of code. Below is an example to convert an image into a TFRecord.
First, we need to read in the image and convert it to a byte string.
```python
import numpy as np
from PIL import Image

def get_image_binary(filename):
    """ Read an image and return its shape and its pixel data as raw bytes. """
    image = Image.open(filename)
    image = np.asarray(image, np.uint8)
    shape = np.array(image.shape, np.int32)
    return shape.tobytes(), image.tobytes() # raw bytes of the shape and pixel arrays
```
Next, you write these byte strings into a TFRecord file using tf.python_io.TFRecordWriter and tf.train.Features. You need the shape information so you can reconstruct the image from the binary format later.
```python
def write_to_tfrecord(label, shape, binary_image, tfrecord_file):
    """ This example writes a single sample to a TFRecord file. To write more
    samples, write them in a loop before closing the writer.
    label, shape, and binary_image must all be byte strings.
    """
    writer = tf.python_io.TFRecordWriter(tfrecord_file)
    # write label, shape, and image content to the TFRecord file
    example = tf.train.Example(features=tf.train.Features(feature={
            'label': tf.train.Feature(bytes_list=tf.train.BytesList(value=[label])),
            'shape': tf.train.Feature(bytes_list=tf.train.BytesList(value=[shape])),
            'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[binary_image]))
            }))
    writer.write(example.SerializeToString())
    writer.close()
```
To read a TFRecord file, you use TFRecordReader and tf.decode_raw.
```python
def read_from_tfrecord(filenames):
    tfrecord_file_queue = tf.train.string_input_producer(filenames, name='queue')
    reader = tf.TFRecordReader()
    _, tfrecord_serialized = reader.read(tfrecord_file_queue)
    # label and image are stored as bytes but could be stored as
    # int64 or float64 values in a serialized tf.Example protobuf.
    tfrecord_features = tf.parse_single_example(tfrecord_serialized,
                        features={
                        'label': tf.FixedLenFeature([], tf.string),
                        'shape': tf.FixedLenFeature([], tf.string),
                        'image': tf.FixedLenFeature([], tf.string),
                        }, name='features')
    # image was saved as uint8, so we have to decode as uint8.
    image = tf.decode_raw(tfrecord_features['image'], tf.uint8)
    shape = tf.decode_raw(tfrecord_features['shape'], tf.int32)
    # the image tensor is flattened out, so we have to reconstruct the shape
    image = tf.reshape(image, shape)
    label = tf.cast(tfrecord_features['label'], tf.string)
    return label, shape, image
```
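Keep in mind that the label, shape, and image returned are tensor objects. To get their values, you'll have to evaluate them in a tf.Session() with the queue runners started, because they are read through the queue created by tf.train.string_input_producer.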
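
The reason the shape has to be stored next to the pixel data can be seen without TensorFlow at all. The NumPy sketch below mirrors the write and read steps above under the assumption of a tiny made-up 2 x 3 RGB array in place of a real image file; `np.frombuffer` plays the role that tf.decode_raw plays in the reader.

```python
import numpy as np

# A hypothetical 2 x 3 RGB "image" standing in for a real picture file.
image = np.arange(2 * 3 * 3, dtype=np.uint8).reshape(2, 3, 3)

# Writing side: store the shape and the pixels as raw bytes.
shape_bytes = np.array(image.shape, np.int32).tobytes()
image_bytes = image.tobytes()

# Reading side: decode with the same dtypes that were used for writing.
shape = np.frombuffer(shape_bytes, dtype=np.int32)
flat = np.frombuffer(image_bytes, dtype=np.uint8)

# The decoded pixels are a flat 1D array; the stored shape restores the layout.
restored = flat.reshape(tuple(int(d) for d in shape))

print(flat.shape, "->", restored.shape)
```

Decoding with a different dtype than the one used for writing would silently produce wrong values, which is why the reader above decodes the image as uint8 and the shape as int32.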
