# Feed Dataset through Queue to Tensorflow from HDFS

## Populate HDFS with Sample Dataset

In [20]:
%%bash

hadoop fs -copyFromLocal /root/datasets/linear /

copyFromLocal: `/linear/training.csv': File exists
copyFromLocal: `/linear/validation.csv': File exists


In [21]:
%%bash

hadoop fs -ls /linear

Found 4 items
-rw-r--r--   3 root supergroup    5000000 2017-05-08 01:38 /linear/test.csv
-rw-r--r--   3 root supergroup    5000000 2017-05-08 01:38 /linear/train.csv
-rw-r--r--   3 root supergroup    5000000 2017-05-08 01:38 /linear/training.csv
-rw-r--r--   3 root supergroup    5000000 2017-05-08 01:38 /linear/validation.csv


## Feed TensorFlow from HDFS through a `FIFOQueue`
`tf.train.string_input_producer()` uses `tf.FIFOQueue` internally.

In [60]:
import tensorflow as tf
import numpy as np

hdfs_queue = tf.train.string_input_producer([
    "hdfs://127.0.0.1:39000/linear/training.csv",
])

reader = tf.TextLineReader()
_, value = reader.read(hdfs_queue)
x_observed, y_observed = tf.decode_csv(value, [[0.0],[0.0]])

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord,
                                           sess=sess)
    try:
        n = 10
        print("First %d:\n" % n)
        for _ in range(n):
            x, y = sess.run([x_observed, y_observed])
            print(np.asarray([x, y]))
    finally:
        coord.request_stop()
        coord.join(threads)

First 10:

[ 0.95979553  0.39002556]
[ 0.00741443  0.29535058]
[ 0.3858389   0.34356433]
[ 0.1863874   0.30246931]
[ 0.52435893  0.34973004]
[ 0.59856516  0.34064981]
[ 0.94997454  0.39089239]
[ 0.7457779   0.37103787]
[ 0.65097833  0.35566217]
[ 0.58418411  0.36767057]


## Feed Batched Data from HDFS to TensorFlow
`tf.train.batch` uses `tf.FIFOQueue` internally - similar to `tf.train.string_input_producer()`

`capacity` must be larger than `min_after_dequeue`.  The difference in size becomes the prefetch maximum.

`capacity` = `batch_size` * `num_threads`

In [65]:
import tensorflow as tf

hdfs_queue = tf.train.string_input_producer([
    "hdfs://127.0.0.1:39000/linear/training.csv",
])

reader = tf.TextLineReader()
_, value = reader.read(hdfs_queue)
x_observed, y_observed = tf.decode_csv(value, [[0.0],[0.0]])

x_observed_batch, y_observed_batch = \
    tf.train.batch([x_observed, y_observed], 
                            batch_size=10,
                            capacity=80,
                            num_threads=8)
    
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord,
                                           sess=sess)
    try:
        print("Batch:\n")
        x, y = sess.run([x_observed_batch, y_observed_batch])
        print(x, y)
    finally:
        coord.request_stop()
        coord.join(threads)

Batch:

[ 0.95979553  0.1863874   0.3858389   0.52435893  0.59856516  0.00741443
  0.94997454  0.65097833  0.22963113  0.7457779 ] [ 0.39002556  0.30246931  0.34356433  0.34973004  0.34064981  0.29535058
  0.39089239  0.35566217  0.29871479  0.37103787]


## Feed Shuffled Batch Data from HDFS to TensorFlow
`tf.train.shuffle_batch` uses `tf.RandomShuffleQueue` internally.

`min_after_dequeue` defines the buffer size when randomly sampling.  Larger buffers require more RAM, but provide better shuffling characteristics.

`capacity` must be larger than `min_after_dequeue`.  The difference in size becomes the prefetch maximum.

`capacity` = `batch_size` * (`num_threads` + `some_safety_margin`) + `min_after_dequeue`

In [30]:
import tensorflow as tf

shuffled_hdfs_queue = tf.train.string_input_producer([
    "hdfs://127.0.0.1:39000/linear/training.csv",
])

reader = tf.TextLineReader()
_, value = reader.read(shuffled_hdfs_queue)
x_observed, y_observed = tf.decode_csv(value, [[0.0],[0.0]])

x_observed_shuffled_batch, y_observed_shuffled_batch = \
    tf.train.shuffle_batch([x_observed, y_observed], 
                            batch_size=10,
                            capacity=100,
                            min_after_dequeue=10,
                            num_threads=8)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord,
                                           sess=sess)
    try:
        print("Shuffled batch:\n")
        x, y = sess.run([x_observed_shuffled_batch, y_observed_shuffled_batch])
        print(x, y)
    finally:
        coord.request_stop()
        coord.join(threads)

Shuffled batch:

[ 0.65097833  0.94997454  0.16309556  0.58418411  0.59856516  0.3858389
  0.22963113  0.00741443  0.83741975  0.82964957] [ 0.35566217  0.39089239  0.31556964  0.36767057  0.34064981  0.34356433
  0.29871479  0.29535058  0.38860601  0.38306496]
