### Queues and Coordinators
Queues: important TensorFlow objects for computing tensors asynchronously in a graph

In [None]:
# In an input pipeline, multiple threads help reduce the bottleneck of the data-reading phase,
# because reading in data involves a lot of waiting.
# in using queues to prepare inputs for training a model, we have:
# Multiple threads prepare training examples and push them in the queue.
# A training thread executes a training op that dequeues mini-batches from the queue.

# The TensorFlow Session object is designed multithreaded, so multiple threads can easily use the same session 
# and run ops in parallel.
# In Python, however, driving such threads takes care: all threads must be able to stop together,
# exceptions must be caught and reported, and queues must be properly closed when stopping.

# TensorFlow provides two classes to help with the threading: tf.train.Coordinator and tf.train.QueueRunner.
# The Coordinator class helps multiple threads stop together and report exceptions to a program 
# that waits for them to stop. The QueueRunner class is used to create a number of threads 
# cooperating to enqueue tensors in the same queue.

# There are two main queue classes: tf.FIFOQueue and tf.RandomShuffleQueue.
# FIFOQueue creates a queue that dequeues elements in first-in, first-out order,
# while RandomShuffleQueue dequeues elements in, well, a random order.
# Both queues support the enqueue, enqueue_many, and dequeue operations.

# A common practice is to enqueue many examples at once when you read your data, but dequeue them one by one.
# If you want to get multiple elements at once for your batch training,
# you’ll have to use tf.train.batch, or tf.train.shuffle_batch if you want your batch to be shuffled.

# There is also tf.PaddingFIFOQueue which is a FIFOQueue that supports batching variable-sized tensors by padding.
# tf.PriorityQueue, which is a FIFOQueue whose enqueues and dequeues take in another argument: the priority

# in practice, you rarely use a queue by itself, but always with string_input_producer

In [None]:
N_SAMPLES = 1000
NUM_THREADS = 4
# Generating some simple data
# create 1000 random samples, each a 1D array of 4 values drawn from a normal
# distribution with mean 1 and standard deviation 10 (i.e. 10 * N(0, 1) + 1)
data = 10 * np.random.randn(N_SAMPLES, 4) + 1
# create 1000 random labels of 0 and 1
target = np.random.randint(0, 2, size=N_SAMPLES)
# FIFO queue holding at most 50 (features, label) pairs
queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])
# enqueue_op pushes all N_SAMPLES examples; dequeue_op pops a single example
enqueue_op = queue.enqueue_many([data, target])
dequeue_op = queue.dequeue()
# create NUM_THREADS to do enqueue
qr = tf.train.QueueRunner(queue, [enqueue_op] * NUM_THREADS)
with tf.Session() as sess:
    # Create a coordinator, launch the queue runner threads.
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    try:
        for step in range(100):  # do 100 iterations
            if coord.should_stop():
                break
            data_batch, label_batch = sess.run(dequeue_op)
    finally:
        # always stop and join the enqueue threads, even if a dequeue fails,
        # so no background thread is left running against a closed session
        coord.request_stop()
        coord.join(enqueue_threads)

In [None]:
# tf.train.Coordinator can manage any threads we create, e.g., threads created with Python's threading package
import threading
# thread body: loop until the coordinator indicates a stop was requested.
# if some condition becomes true, ask the coordinator to stop.
# NOTE: this is a pseudo-code template, not runnable Python -- the "..." lines
# are placeholders for the real per-iteration work and the real stop condition.
def my_loop(coord):
    # coord: the coordinator object that is polled for a stop request
    while not coord.should_stop():
        ... do  something ... 
        if   ... some condition ...:
            # request a stop; the loop condition above is checked on the next iteration
            coord.request_stop() 

# main code: create a coordinator.
# (the class lives in tf.train, matching the other examples in this file)
coord = tf.train.Coordinator()
# create 10 threads that run 'my_loop()', all sharing the same coordinator
# you can also create threads using QueueRunner as the example above
threads = [threading.Thread(target=my_loop, args=(coord,)) for _ in range(10)]
# start the threads and wait for all of them to stop.
for t in threads:
    t.start()
coord.join(threads)

### Data Reader

In [None]:
tf.TextLineReader
# outputs the lines of a file delimited by newlines, e.g. text files, CSV files

tf.FixedLengthRecordReader
# outputs fixed-length records from a file, for files whose records all have the same length
# e.g. each MNIST image is 28 * 28 pixels, each CIFAR-10 image is 32 * 32 * 3

tf.WholeFileReader
# outputs the entire file content

tf.TFRecordReader
# reads samples from TensorFlow's own binary format (TFRecords)

tf.ReaderBase
# allows you to create your own readers

In [None]:
# To use a data reader, we first need to create a queue to hold the names of all the files
# that you want to read in, through tf.train.string_input_producer.
filename_queue = tf.train.string_input_producer(["heart.csv"])
# skip_header_lines=1 means the first line of every file in the queue is skipped
reader = tf.TextLineReader(skip_header_lines=1)
# think of readers as ops that return a different value every time you run them:
# each read returns a (key, value) pair, in which key identifies the file and record
# (useful for debugging if you have some weird records), and value is a scalar string
key, value = reader.read(filename_queue)
# e.g., once evaluated in a session (illustrative values, not executable code):
#   key   = data/heart.csv:2   (2nd line in the file heart.csv)
#   value = 144,0.01,4.41,28.61,Absent,55,28.87,2.06,63,1

# tf.train.string_input_producer creates a FIFOQueue under the hood, so to run the queue,
# we’ll need tf.train.Coordinator and tf.train.QueueRunner.
# filenames: list of CSV file paths to read, e.g. ["heart.csv"]
filename_queue = tf.train.string_input_producer(filenames)
reader = tf.TextLineReader(skip_header_lines=1) # skip the first line in the file
key, value = reader.read(filename_queue)
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    try:
        # fetch key and value in ONE run call: every run of the read op consumes a
        # new record, so separate sess.run(key) / sess.run(value) calls would print
        # the key of one line next to the value of the following line
        key_out, value_out = sess.run([key, value])
        print(key_out)    # data/heart.csv:2
        print(value_out)  # 144,0.01,4.41,28.61,Absent,55,28.87,2.06,63,1
    finally:
        # stop and join the queue-runner threads even if the read fails
        coord.request_stop()
        coord.join(threads)

# To convert the string tensor into a vector representation of features
# we need the TensorFlow CSV decoder, tf.decode_csv, which takes the record defaults.
# The record defaults serve two purposes:
# 1. they tell the decoder what type of data to expect in each column.
# 2. if a field in a column happens to be empty, the decoder fills it in
#    with the default value that we specify for that column.
# The defaults must be built BEFORE the decoder is called, so they come first.

record_defaults = [[1.0] for _ in range(N_FEATURES)] # define all features to be floats
record_defaults[4] = [''] # make the fifth feature string
# specify the last column (the 10th, assuming N_FEATURES is 9) to be integer,
# because we like our labels to be integer
record_defaults.append([1])
# content is a list with one tensor per CSV column
content = tf.decode_csv(value, record_defaults=record_defaults)

# Any pre-processing the data needs can be done here, before it is fed to the model.
# At this point content is a list of 10 elements: 8 floats, 1 string, and 1 integer.
# The string is mapped to a float (Absent -> 0, Present -> 1), and the first
# 9 features are then combined into a single tensor that the model can consume.
# NOTE: tf.select / tf.pack are the pre-1.0 names (tf.where / tf.stack from TensorFlow 1.0 on).

# the last column is the label
label = content[-1]
# replace the 5th column (Present/Absent) with the binary value 1.0 / 0.0
condition = tf.equal(content[4], tf.constant('Present'))
content[4] = tf.select(condition, tf.constant(1.0), tf.constant(0.0))
# combine the first N_FEATURES columns into one feature tensor
features = tf.pack(content[:N_FEATURES])

# Every sample read from the csv file is now a (features, label) pair of tensors.
# To group them into batches use tf.train.batch, or tf.train.shuffle_batch
# when the batches should be shuffled.

# upper bound on the number of elements held in the queue
capacity = 20 * BATCH_SIZE
# lower bound on the number of elements left in the queue after a dequeue;
# it ensures that the samples are sufficiently mixed
# (10 times the BATCH_SIZE should be sufficient)
min_after_dequeue = 10 * BATCH_SIZE
# shuffle the data to generate BATCH_SIZE sample pairs
data_batch, label_batch = tf.train.shuffle_batch(
    [features, label],
    batch_size=BATCH_SIZE,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue
)


### TFRecord: TensorFlow's own binary data format

In [None]:
# E.g. convert an image to a TFRecord
def get_image_binary(filename):
    """Load an image file and return its shape and pixel data as raw bytes.

    Args:
        filename: path of the image file, opened with PIL's Image.open.

    Returns:
        A (shape_bytes, image_bytes) tuple: the array shape serialized as
        int32 values and the pixel data serialized as uint8 values.
    """
    # the context manager closes the underlying file handle once the pixel
    # data has been copied into the numpy array
    with Image.open(filename) as image:
        pixels = np.asarray(image, np.uint8)
    # the shape is needed later to rebuild the image from its flat byte string
    shape = np.array(pixels.shape, np.int32)
    return shape.tobytes(), pixels.tobytes()  # convert image to raw data bytes in the array.
# Next, you write these byte strings into a TFRecord file using tf.python_io.TFRecordWriter and tf.train.Features.
# You need the shape information so you can reconstruct the image from the binary format later.

def write_to_tfrecord(label, shape, binary_image, tfrecord_file):
    """Write a single sample to a TFRecord file.

    This example writes one sample only; to write more samples, write one
    tf.train.Example per sample in a loop before closing the writer.

    Args:
        label: label of the sample, as a byte string.
        shape: image shape, as a byte string.
        binary_image: image content, as a byte string.
        tfrecord_file: path of the TFRecord file to write.
    """
    writer = tf.python_io.TFRecordWriter(tfrecord_file)
    try:
        # store label, shape, and image content as three bytes features
        example = tf.train.Example(features=tf.train.Features(feature={
            'label': tf.train.Feature(bytes_list=tf.train.BytesList(value=[label])),
            'shape': tf.train.Feature(bytes_list=tf.train.BytesList(value=[shape])),
            'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[binary_image]))
        }))
        writer.write(example.SerializeToString())
    finally:
        # close the file even if building or writing the example fails
        writer.close()
    
# To read a TFRecord file
                       
def read_from_tfrecord(filenames):
    """Build the ops that read one (label, shape, image) sample from TFRecord files.

    Args:
        filenames: list of TFRecord file paths to read from.

    Returns:
        A (label, shape, image) tuple of tensors: the label as a string, the
        shape decoded as int32, and the image decoded as uint8 and reshaped
        to that shape.
    """
    file_queue = tf.train.string_input_producer(filenames, name='queue')
    _, serialized_example = tf.TFRecordReader().read(file_queue)
    # All three fields are stored as bytes here, although a serialized
    # tf.Example protobuf could also hold int64 or float values.
    feature_spec = {
        field: tf.FixedLenFeature([], tf.string)
        for field in ('label', 'shape', 'image')
    }
    parsed = tf.parse_single_example(serialized_example,
                                     features=feature_spec,
                                     name='features')
    # The image was saved as uint8 and the shape as int32, so each byte
    # string is decoded with its matching type.
    flat_image = tf.decode_raw(parsed['image'], tf.uint8)
    shape = tf.decode_raw(parsed['shape'], tf.int32)
    # The decoded image is flat; restore its original dimensions.
    image = tf.reshape(flat_image, shape)
    label = tf.cast(parsed['label'], tf.string)
    return label, shape, image
