In [2]:
import tensorflow as tf

### Data Readers 

Ops that return different values every time you call them
<strong>(Think Python’s generator)</strong>

* tf.TextLineReader: Outputs the lines of a file, delimited by newlines (e.g. text files, CSV files)
* tf.FixedLengthRecordReader: Outputs fixed-length records from a file, for data where every record has the same number of bytes
   (e.g. MNIST images of 28 x 28 pixels, or CIFAR-10 binary records of 1 label byte + 32 x 32 x 3 image bytes)
* tf.WholeFileReader: Outputs the entire contents of a file as a single value
* tf.TFRecordReader: Reads records from TensorFlow's own binary format (TFRecord)
* tf.ReaderBase: Base class for readers, which lets you create your own
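Because "fixed length" refers to the size of each record, not of each file, here is a plain-Python sketch (not TensorFlow code) of the idea: a byte stream is cut into records of exactly `record_bytes` bytes, optionally after skipping a header. The function names are hypothetical; `record_bytes` and `header_bytes` are modeled on the reader's arguments of the same name, and the CIFAR-10 layout is the dataset's binary format.

```python
# Plain-Python sketch (not TensorFlow code) of a fixed-length record reader:
# the byte stream is cut into records of exactly `record_bytes` bytes.
from typing import Iterator, Tuple


def fixed_length_records(data: bytes, record_bytes: int,
                         header_bytes: int = 0) -> Iterator[bytes]:
    """Yield consecutive records of record_bytes bytes after skipping a header."""
    body = data[header_bytes:]
    if len(body) % record_bytes != 0:
        # This sketch simply rejects a trailing partial record.
        raise ValueError("data length is not a multiple of record_bytes")
    for start in range(0, len(body), record_bytes):
        yield body[start:start + record_bytes]


# CIFAR-10 binary layout: 1 label byte followed by 32 * 32 * 3 pixel bytes.
CIFAR10_RECORD_BYTES = 1 + 32 * 32 * 3  # 3073


def split_cifar_record(record: bytes) -> Tuple[int, bytes]:
    """Return (label, raw pixel bytes) for one CIFAR-10-style record."""
    return record[0], record[1:]


# Two fake records (labels 3 and 7, all-zero pixels) just to exercise the code.
fake_file = bytes([3]) + bytes(3072) + bytes([7]) + bytes(3072)
labels = [split_cifar_record(r)[0]
          for r in fixed_length_records(fake_file, CIFAR10_RECORD_BYTES)]
print(labels)  # [3, 7]
```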


### Read in files from queues

filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"]) <br>
reader = tf.TextLineReader() <br>
key, value = reader.read(filename_queue)
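The three lines above can be mirrored in plain Python (a sketch, not TensorFlow code) to show the division of labor: a queue holds filenames, and the reader pulls the next filename only when the current file runs out, returning one `(key, value)` pair per call. The in-memory `FILES` dict and the `name:lineno` key format are assumptions for illustration, and unlike the real producer, which by default cycles through the filenames forever, this sketch reads each file once.

```python
# Plain-Python analogue (not TensorFlow code) of
#   filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])
#   reader = tf.TextLineReader()
#   key, value = reader.read(filename_queue)
import io
import queue
from typing import Dict, Iterator, Tuple

# Hypothetical in-memory file contents so the sketch runs on its own.
FILES: Dict[str, str] = {
    "file0.csv": "a,1\nb,2\n",
    "file1.csv": "c,3\n",
}


def text_line_reader(filename_queue: queue.Queue) -> Iterator[Tuple[str, str]]:
    """Yield (key, value) pairs line by line, taking the next filename from
    the queue whenever the current file is exhausted."""
    while not filename_queue.empty():
        name = filename_queue.get()
        for lineno, line in enumerate(io.StringIO(FILES[name]), start=1):
            # The key records where the line came from; the newline is stripped.
            yield f"{name}:{lineno}", line.rstrip("\n")


filename_queue: queue.Queue = queue.Queue()
for fname in ["file0.csv", "file1.csv"]:
    filename_queue.put(fname)

reader = text_line_reader(filename_queue)
key, value = next(reader)
print(key, value)  # file0.csv:1 a,1
```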

### tf.FIFOQueue 

In [3]:
q = tf.FIFOQueue(3, "float")

* capacity: size of the queue (integer)
* dtypes: data type(s) of the elements
* shapes (optional): TensorShape of each element
* names (optional)
...


In [30]:
init = q.enqueue_many(([1.,2.,3.],))

In [32]:
with tf.Session() as sess:
    sess.run(init)
    print(sess.run(q.dequeue()))
    print(sess.run(q.dequeue()))
    print(sess.run(q.dequeue()))
    sess.run(q.enqueue([5]))
    print(sess.run(q.dequeue()))

1.0
2.0
3.0
5.0
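The same sequence can be replayed with Python's `queue.Queue` as a rough analogue (not TensorFlow code): a bounded first-in, first-out buffer of capacity 3. The extra `put_nowait` step is not in the session above; it is added only to show what "capacity" means when the queue is full.

```python
# Plain-Python mirror (not TensorFlow code) of the FIFOQueue session above.
import queue

q = queue.Queue(maxsize=3)        # capacity 3, like tf.FIFOQueue(3, "float")
for x in [1., 2., 3.]:            # like q.enqueue_many(([1., 2., 3.],))
    q.put(x)

full_on_fourth = False
try:
    # TensorFlow's enqueue would block on a full queue; put_nowait raises
    # queue.Full instead, so the sketch does not hang.
    q.put_nowait(4.)
except queue.Full:
    full_on_fourth = True

outputs = [q.get(), q.get(), q.get()]  # first in, first out
q.put(5.)                             # like q.enqueue([5])
outputs.append(q.get())
print(outputs)  # [1.0, 2.0, 3.0, 5.0]
```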


### Threads & Queues

tf.train.Coordinator() and tf.train.start_queue_runners can be used to manage the queues and the threads that fill them.

## Example: reading a CSV file

In [35]:
DATA_PATH = 'data/heart.csv'
BATCH_SIZE = 3
N_FEATURES = 9

In [36]:
def batch_generator(filenames):
    """ filenames is the list of files you want to read from. 
    In this case, it contains only heart.csv
    """
    filename_queue = tf.train.string_input_producer(filenames)
    reader = tf.TextLineReader(skip_header_lines=1) # 파일의 첫째줄 스킵
    _, value = reader.read(filename_queue)

    # record_defaults are the default values used in case some of our columns are empty.
    # They also tell TensorFlow the format of our data (the type of each decoded column).
    # For this dataset, out of the 9 feature columns,
    # 8 are floats (some are integers, but to keep our features homogeneous
    # we treat them as floats), and 1 is a string (at position 5).
    # The last column is the label, an integer.

    # Because some values may be missing, build a default record:
    # each value is read as-is, or replaced by its default if it is empty.
    record_defaults = [[1.0] for _ in range(N_FEATURES)]
    record_defaults[4] = ['']
    record_defaults.append([1])

    # decode one line into 10 columns (9 features + 1 label)
    content = tf.decode_csv(value, record_defaults=record_defaults)
    
    # convert the 5th column (Present/Absent) to the binary values 1.0 and 0.0
    condition = tf.equal(content[4], tf.constant('Present'))
    content[4] = tf.where(condition, tf.constant(1.0), tf.constant(0.0))

    # decode_csv returns each column as a separate tensor, so stack the
    # 9 feature columns into one feature vector per instance:
    # column1 + column2 + ... + columnN
    features = tf.stack(content[:N_FEATURES])

    # assign the last column to label
    label = content[-1]


    # minimum number of elements left in the queue after a dequeue,
    # which ensures the samples are well mixed;
    # about 10x BATCH_SIZE is usually enough
    min_after_dequeue = 10 * BATCH_SIZE

    # maximum capacity of the queue
    capacity = 20 * BATCH_SIZE

    # shuffle the data to generate BATCH_SIZE sample pairs
    data_batch, label_batch = tf.train.shuffle_batch([features, label], batch_size=BATCH_SIZE, 
                                        capacity=capacity, min_after_dequeue=min_after_dequeue)

    return data_batch, label_batch

#### tf.train.string_input_producer(string_tensor, num_epochs=None, shuffle=True, seed=None, capacity=32, shared_name=None, name=None, cancel_op=None)

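Output strings (e.g. filenames) to a queue for an input pipeline. You pass it a list of filename strings. <br>
It puts multiple files into a queue so that a reader can open them one at a time and read them all.
By default (num_epochs=None) it cycles through the list an unlimited number of times, and shuffle=True shuffles the order within each epoch.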
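The `num_epochs` and `shuffle` semantics can be sketched as a plain-Python generator (not TensorFlow code; `string_input_producer_sketch` is a hypothetical name): every string appears exactly once per epoch, in a freshly shuffled order when `shuffle=True`, and `num_epochs=None` means the stream never ends.

```python
# Plain-Python sketch (not TensorFlow code) of string_input_producer's
# num_epochs / shuffle / seed behavior.
import itertools
import random
from typing import Iterator, List, Optional


def string_input_producer_sketch(strings: List[str],
                                 num_epochs: Optional[int] = None,
                                 shuffle: bool = True,
                                 seed: Optional[int] = None) -> Iterator[str]:
    """Yield every string once per epoch; num_epochs=None means forever."""
    rng = random.Random(seed)
    epochs = itertools.count() if num_epochs is None else range(num_epochs)
    for _ in epochs:
        epoch = list(strings)
        if shuffle:
            rng.shuffle(epoch)  # a new order for each epoch
        yield from epoch


order = list(string_input_producer_sketch(["file0.csv", "file1.csv"],
                                          num_epochs=2, seed=0))
print(order)  # each filename appears exactly once per epoch, in shuffled order
```

In TF 1.x, passing `num_epochs` creates a local variable, so `tf.local_variables_initializer()` has to be run, and once the epochs are used up the reader raises `tf.errors.OutOfRangeError`.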

#### class tf.TextLineReader 

A Reader that outputs the lines of a file delimited by newlines, one line per read.
E.g. text files, CSV files

Newlines are stripped from the output. See ReaderBase for supported methods.
<br>
<br>
<strong>read(queue, name=None)</strong><br>
Returns the next record (key,value pair) produced by a reader

#### tf.decode_csv(records, record_defaults, field_delim=None, name=None) 

Convert CSV records to tensors. Each column maps to one tensor.

* records: A Tensor of type string. Each string is a record/row in the csv and all records should have the same format.
* record_defaults: A list of Tensor objects with types from: float32, int32, int64, string. One tensor per column of the input record, with either a scalar default value for that column or empty if the column is required.
* field_delim: An optional string. Defaults to ",". delimiter to separate fields in a record.
* name: A name for the operation (optional).
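To make the role of `record_defaults` concrete, here is a plain-Python sketch (not TensorFlow code; `decode_csv_row` is a hypothetical helper) of what decoding one line does in `batch_generator`: each default both fills an empty field and fixes the type the field is parsed as. The input line is invented for illustration and is not taken from heart.csv; quoted fields, which the real op handles, are not supported here.

```python
# Plain-Python sketch (not TensorFlow code) of what tf.decode_csv does to
# one line, plus the Present/Absent conversion used in batch_generator.
from typing import List, Union

Value = Union[float, int, str]


def decode_csv_row(record: str, record_defaults: List[List[Value]],
                   field_delim: str = ",") -> List[Value]:
    """Return one value per column. Each record_defaults entry is a
    one-element list: the default for an empty field, whose type also
    decides how a non-empty field is parsed."""
    fields = record.split(field_delim)  # no quoting support in this sketch
    if len(fields) != len(record_defaults):
        raise ValueError(f"expected {len(record_defaults)} fields, got {len(fields)}")
    row: List[Value] = []
    for field, (default,) in zip(fields, record_defaults):
        row.append(default if field == "" else type(default)(field))
    return row


N_FEATURES = 9
record_defaults: List[List[Value]] = [[1.0] for _ in range(N_FEATURES)]
record_defaults[4] = [""]
record_defaults.append([1])

# Hypothetical line, invented for illustration; the 2nd field is left empty.
line = "120,,3.0,25.0,Present,50,26.0,0.0,45,1"
content = decode_csv_row(line, record_defaults)
content[4] = 1.0 if content[4] == "Present" else 0.0  # like the tf.where step
features, label = content[:N_FEATURES], content[-1]
print(features)  # [120.0, 1.0, 3.0, 25.0, 1.0, 50.0, 26.0, 0.0, 45.0]
print(label)     # 1
```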

#### tf.stack(values, axis=0, name='stack') 

Stacks a list of rank-R tensors into one rank-(R+1) tensor (similar to np.stack; for 1-D inputs with axis=0 it behaves like np.vstack).

#### tf.train.shuffle_batch(tensors, batch_size, capacity, min_after_dequeue, num_threads=1, seed=None, enqueue_many=False, shapes=None, allow_smaller_final_batch=False, shared_name=None, name=None)

Creates batches by randomly shuffling tensors.

<strong>Creates batches of 32 images and 32 labels</strong>
<br>
image_batch, label_batch = tf.train.shuffle_batch(<br>
      [single_image, single_label],<br>
      batch_size=32,<br>
      num_threads=4,<br>
      capacity=50000,<br>
      min_after_dequeue=10000)
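The interplay of `capacity` and `min_after_dequeue` is easier to see in a single-threaded model. The sketch below is plain Python, not how `tf.train.shuffle_batch` is implemented (the real op runs `num_threads` enqueue threads feeding a RandomShuffleQueue); `shuffle_batch_sketch` is a hypothetical name. A batch is drawn at random only when the buffer is full, so at least `min_after_dequeue` examples stay behind to mix with new arrivals.

```python
# Simplified, single-threaded model (not TensorFlow code) of shuffle_batch.
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def shuffle_batch_sketch(examples: Iterable[T], batch_size: int, capacity: int,
                         min_after_dequeue: int,
                         seed: Optional[int] = None) -> Iterator[List[T]]:
    """Buffer up to `capacity` examples and draw random batches from them."""
    if capacity < min_after_dequeue + batch_size:
        # Sketch's own requirement: a full buffer must still leave
        # min_after_dequeue elements behind after one batch is drawn.
        raise ValueError("capacity must be >= min_after_dequeue + batch_size")
    rng = random.Random(seed)
    buffer: List[T] = []

    def draw() -> List[T]:
        batch = []
        for _ in range(batch_size):
            i = rng.randrange(len(buffer))
            buffer[i], buffer[-1] = buffer[-1], buffer[i]  # swap, then pop in O(1)
            batch.append(buffer.pop())
        return batch

    for example in examples:
        buffer.append(example)        # enqueue side
        if len(buffer) >= capacity:   # queue full: dequeue one random batch
            yield draw()
    # Input exhausted ("queue closed"): drain full batches only, like
    # allow_smaller_final_batch=False; a final partial batch is dropped.
    while len(buffer) >= batch_size:
        yield draw()


batches = list(shuffle_batch_sketch(range(20), batch_size=3, capacity=9,
                                    min_after_dequeue=6, seed=1))
print(len(batches))  # 6
```

With 20 inputs and batches of 3, six full batches (18 examples) come out and the last 2 examples are dropped.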

In [37]:
def generate_batches(data_batch, label_batch):
    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord)
        for _ in range(10): # generate 10 batches
            features, labels = sess.run([data_batch, label_batch])
            print(features)
        coord.request_stop()
        coord.join(threads)

In [38]:
data_batch, label_batch = batch_generator([DATA_PATH])
generate_batches(data_batch, label_batch)

[[ 114.            0.            3.82999992   19.39999962    1.           49.
    24.86000061    2.49000001   29.        ]
 [ 124.           14.            6.23000002   35.95999908    1.           45.
    30.09000015    0.           59.        ]
 [ 142.           18.20000076    4.34000015   24.37999916    0.           61.
    26.19000053    0.           50.        ]]
[[ 132.            7.9000001     2.8499999    26.5           1.           51.
    26.15999985   25.70999908   44.        ]
 [ 206.            6.            2.95000005   32.27000046    0.           72.
    26.80999947   56.06000137   60.        ]
 [ 117.            1.52999997    2.44000006   28.95000076    1.           35.
    25.88999939   30.03000069   46.        ]]
[[ 134.           14.10000038    4.44000006   22.38999939    1.           65.
    23.09000015    0.           40.        ]
 [ 144.            4.09000015    5.55000019   31.39999962    1.           60.
    29.43000031    5.55000019   56.        ]
 [ 120.       

#### class tf.train.Coordinator

A coordinator for threads.

This class implements a simple mechanism to coordinate the termination of a set of threads. <br><br>
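In other words, it keeps track of the threads and manages when they stop: each thread checks coord.should_stop() in its loop, any thread can call coord.request_stop(), and coord.join(threads) waits for all of them to finish.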

`Create a coordinator.` <br>
coord = Coordinator() <br>
`Start a number of threads, passing the coordinator to each of them.`<br>
...start thread 1...(coord, ...)<br>
...start thread N...(coord, ...)<br>
`Wait for all the threads to terminate.`<br>
coord.join(threads)
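The pattern above can be illustrated with a stdlib stand-in built on `threading.Event`. `MiniCoordinator` is hypothetical and only mirrors the three calls discussed here (`should_stop`, `request_stop`, `join`); the real `tf.train.Coordinator` also collects exceptions reported by threads and re-raises them in `join()`.

```python
# Hypothetical stand-in for tf.train.Coordinator, built on threading.Event.
import threading
import time
from typing import List


class MiniCoordinator:
    def __init__(self) -> None:
        self._stop = threading.Event()

    def should_stop(self) -> bool:
        return self._stop.is_set()

    def request_stop(self) -> None:
        self._stop.set()

    def join(self, threads: List[threading.Thread], timeout: float = 5.0) -> None:
        for t in threads:
            t.join(timeout)


def worker(coord: MiniCoordinator, counts: List[int], idx: int) -> None:
    # Keep working until some thread (here: the main thread) requests a stop.
    while not coord.should_stop():
        counts[idx] += 1
        time.sleep(0.001)


coord = MiniCoordinator()
counts = [0, 0, 0]
threads = [threading.Thread(target=worker, args=(coord, counts, i)) for i in range(3)]
for t in threads:
    t.start()
time.sleep(0.05)      # let the workers run for a moment
coord.request_stop()  # ask every worker to finish its loop
coord.join(threads)   # wait for all of them to terminate
```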

#### tf.train.start_queue_runners(sess=None, coord=None, daemon=True, start=True, collection=tf.GraphKeys.QUEUE_RUNNERS)

Starts all queue runners collected in the graph.

This is a companion method to add_queue_runner(). It just starts threads for all queue runners collected in the graph. It returns the list of all threads.
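`start_queue_runners` only starts one thread per registered QueueRunner and returns those threads. A stdlib sketch of that producer/consumer split follows (not TensorFlow code; `start_runners` and `make_enqueue_runner` are hypothetical names): a background thread keeps a bounded queue filled while the main thread just dequeues, which is what lets reading overlap with training.

```python
# Plain-Python sketch (not TensorFlow code) of queue runners on threads.
import queue
import threading
from typing import Callable, Iterable, List

Runner = Callable[[threading.Event], None]


def start_runners(runners: List[Runner], stop: threading.Event) -> List[threading.Thread]:
    """Start one daemon thread per runner and return the list of threads."""
    threads = [threading.Thread(target=r, args=(stop,), daemon=True) for r in runners]
    for t in threads:
        t.start()
    return threads


def make_enqueue_runner(source: Iterable[int], q: queue.Queue) -> Runner:
    """Hypothetical 'queue runner': moves items from source into q until
    the source is exhausted or a stop is requested."""
    def run(stop: threading.Event) -> None:
        for item in source:
            while not stop.is_set():
                try:
                    q.put(item, timeout=0.01)  # waits while the queue is full
                    break
                except queue.Full:
                    continue                   # re-check the stop flag
            if stop.is_set():
                return
    return run


q = queue.Queue(maxsize=4)
stop = threading.Event()
threads = start_runners([make_enqueue_runner(range(100), q)], stop)

# The consumer only dequeues; loading happens on the background thread.
first_ten = [q.get() for _ in range(10)]
stop.set()
for t in threads:
    t.join(timeout=5)
print(first_ten)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```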



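If the storage holding the data and the worker are in different places, i.e. the data has to pass through the client first, feed_dict becomes a bottleneck. <br>
<strong>Data Readers</strong> let the data go from storage straight into the worker process without passing through the client (much like a generator). <br>
The main benefit is speed: reading happens on background threads inside the TensorFlow runtime, so loading the next batch can overlap with computation instead of every batch being fed in from Python. <br>
Note: the queues in the graph are not filled just by calling sess.run on them. QueueRunners load data into them on separate threads (a dequeue from a queue whose runners were never started blocks indefinitely), and those threads are managed by the Coordinator.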