In [1]:
import tensorflow as tf
import random
import numpy as np
import os
import cv2

## Feed_dict system의 문제점

우리는 이번 tutorial을 통해서 Data pipelining을 효율적으로 만드는 것에 중점적으로 확인하려고 한다.

TensorFlow를 통해서 학습을 위한 코드를 작성할 때 크게 세 부분으로 나뉘는데,

첫 째 input data를 로드하는 부분, 

둘 째 model을 디자인하고 input data를 받아 prediction을 출력하는 부분, 

셋 째 prediction을 정답과 비교하고 model의 파라미터를 갱신하는 부분으로 나눌 수 있다.

우리는 코드를 통해 학습 연산을 구현할 때 우리의 연산 자원인 GPU의 효율성을 최대로 발휘하도록 해야 한다. 이때 많은 경우에 걸쳐 Bottle-neck으로 작용하는 것이 바로 첫 번째 input loading part이다. GPU를 효율적으로 사용하여 학습 속도를 가속하기 위해서는 지속적으로 쉬지 않고 data를 전달해주어야 한다. 만약 program이 data를 가져와서 model에 전달하는 부분과 data를 통해 연산하는 부분이 순차적으로 수행되게 된다면 GPU는 data를 가져와 넣어줄 때까지 놀게 될 것이다.

TensorFlow의 feed_dict는 이점에서 문제가 있다. feed_dict는 python data를 session에게 복사하여 넘겨준다. 만약 single threading을 하는 program이라면 GPU는 data를 대기하며 idle이 발생할 것이다.

In [1]:
import time
import tensorflow as tf

# We simulate some raw input data 
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_inputs_data = tf.random_normal([128, 1024], mean=0, stddev=1)
# We will try to predict this law:
# predict 1 if the sum of the elements is positive and 0 otherwise
y_inputs_data = tf.cast(tf.reduce_sum(x_inputs_data, axis=1, keep_dims=True) > 0, tf.int32)

# We build our small model: a basic two layers neural net with ReLU
with tf.variable_scope("placeholder"):
    input = tf.placeholder(tf.float32, shape=[None, 1024])
    y_true = tf.placeholder(tf.int32, shape=[None, 1])
with tf.variable_scope('FullyConnected'):
    w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1))
    b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(input, w) + b
    y = tf.nn.relu(z)

    w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1))
    b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(y, w2) + b2
with tf.variable_scope('Loss'):
    losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z)
    loss_op = tf.reduce_mean(losses)
with tf.variable_scope('Accuracy'):
    y_pred = tf.cast(z > 0, tf.int32)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32))
    accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:")

# We add the training operation, ...
adam = tf.train.AdamOptimizer(1e-2)
train_op = adam.minimize(loss_op, name="train_op")

startTime = time.time()
with tf.Session() as sess:
    # ... init our variables, ...
    sess.run(tf.global_variables_initializer())

    # ... check the accuracy before training, ...
    x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
    sess.run(accuracy, feed_dict={
        input: x_input,
        y_true: y_input
    })

    # ... train ...
    for i in range(5000):
        #  ... by sampling some input data (fetching) ...
        x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
        # ... and feeding it to our model
        _, loss = sess.run([train_op, loss_op], feed_dict={
            input: x_input,
            y_true: y_input
        })

        # We regularly check the loss
        if i % 500 == 0:
            print('iter:%d - loss:%f' % (i, loss))

    # Finally, we check our final accuracy
    x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
    sess.run(accuracy, feed_dict={
        input: x_input,
        y_true: y_input
    })

print("Time taken: %f" % (time.time() - startTime))

Instructions for updating:
keep_dims is deprecated, use keepdims instead
iter:0 - loss:2.912185
iter:500 - loss:0.369858
iter:1000 - loss:0.851633
iter:1500 - loss:1.058526
iter:2000 - loss:0.988433
iter:2500 - loss:0.315225
iter:3000 - loss:0.246361
iter:3500 - loss:0.849347
iter:4000 - loss:0.897075
iter:4500 - loss:2.264365
Time taken: 7.456434


# Solution

위에서 언급한 바대로 '데이터 입력' 부분과 '연산' 부분을 비동기화하여 GPU가 쉬지 않고 일할 수 잇다면, 훨씬 효율적으로 연산 자원을 활용할 수 있을 것이다. 우리는 multi-threading을 사용하여 데이터 입력 부분을 효율적으로 만들어줄 수 있다. TensorFlow에서는 이를 위해 Queue와 Queue runner를 API로 제공하고 있다.

reference : https://blog.metaflow.fr/tensorflow-how-to-optimise-your-input-pipeline-with-queues-and-multi-threading-e7c3874157e0

## Queue_runner 사용 방법

In [None]:

import tensorflow as tf

# This time, let's start with 6 samples of 1 data point
x_input_data = tf.random_normal([6], mean=-1, stddev=4)

# Note that the FIFO queue has still a capacity of 3
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

# To check what is happening in this case:
# we will print a message each time "x_input_data" is actually computed
# to be used in the "enqueue_many" operation
x_input_data = tf.Print(x_input_data, data=[x_input_data], message="Raw inputs data generated:", summarize=6)
enqueue_op = q.enqueue_many(x_input_data)

# To leverage multi-threading we create a "QueueRunner"
# that will handle the "enqueue_op" outside of the main thread
# We don't need much parallelism here, so we will use only 1 thread
numberOfThreads = 1 
qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
# Don't forget to add your "QueueRunner" to the QUEUE_RUNNERS collection
tf.train.add_queue_runner(qr) 

input = q.dequeue() 
input = tf.Print(input, data=[q.size(), input], message="Nb elements left, input:")

# fake graph: START
y = input + 1
# fake graph: END 

# We start the session as usual ...
with tf.Session() as sess:
    # But now we build our coordinator to coordinate our child threads with
    # the main thread
    coord = tf.train.Coordinator()
    # Beware, if you don't start all your queues before runnig anything
    # The main threads will wait for them to start and you will hang again
    # This helper start all queues in tf.GraphKeys.QUEUE_RUNNERS
    threads = tf.train.start_queue_runners(coord=coord)

    # The QueueRunner will automatically call the enqueue operation
    # asynchronously in its own thread ensuring that the queue is always full
    # No more hanging for the main process, no more waiting for the GPU
    sess.run(y)
    sess.run(y) 
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)

    # We request our child threads to stop ...
    coord.request_stop()
    # ... and we wait for them to do so before releasing the main thread
    coord.join(threads)

## 개선된 문제점

In [3]:
import time
import tensorflow as tf

# We simulate some raw input data 
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_input_data = tf.random_normal([128, 1024], mean=0, stddev=1)

# We build our small model: a basic two layers neural net with ReLU

with tf.variable_scope("While_Queue_runner"):

    with tf.variable_scope("queue"):
        q = tf.FIFOQueue(capacity=5, dtypes=tf.float32) # enqueue 5 batches
        # We use the "enqueue" operation so 1 element of the queue is the full batch
        enqueue_op = q.enqueue(x_input_data)
        numberOfThreads = 1
        qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
        tf.train.add_queue_runner(qr)
        input = q.dequeue() # It replaces our input placeholder
        # We can also compute y_true right into the graph now
        y_true = tf.cast(tf.reduce_sum(input, axis=1, keep_dims=True) > 0, tf.int32)

    with tf.variable_scope('FullyConnected'):
        w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1))
        b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1))
        z = tf.matmul(input, w) + b
        y = tf.nn.relu(z)

        w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1))
        b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1))
        z = tf.matmul(y, w2) + b2

    with tf.variable_scope('Loss'):
        losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z)
        loss_op = tf.reduce_mean(losses)

    with tf.variable_scope('Accuracy'):
        y_pred = tf.cast(z > 0, tf.int32)
        accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32))
        accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:")

    # We add the training op ...
    adam = tf.train.AdamOptimizer(1e-2)
    train_op = adam.minimize(loss_op, name="train_op")

startTime = time.time()
with tf.Session() as sess:
    # ... init our variables, ...
    sess.run(tf.global_variables_initializer())

    # ... add the coordinator, ...
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    # ... check the accuracy before training (without feed_dict!), ...
    sess.run(accuracy)

    # ... train ...
    for i in range(5000):
        #  ... without sampling from Python and without a feed_dict !
        _, loss = sess.run([train_op, loss_op])

        # We regularly check the loss
        if i % 500 == 0:
            print('iter:%d - loss:%f' % (i, loss))

    # Finally, we check our final accuracy
    sess.run(accuracy)

    coord.request_stop()
    coord.join(threads)

print("Time taken: %f" % (time.time() - startTime))

iter:0 - loss:2.171846
iter:500 - loss:0.469907
iter:1000 - loss:0.633990
iter:1500 - loss:1.030779
iter:2000 - loss:0.545669
iter:2500 - loss:0.323410
iter:3000 - loss:0.787239
iter:3500 - loss:0.652673
iter:4000 - loss:0.695005
iter:4500 - loss:1.328690
Time taken: 4.120565


# reference

https://www.tensorflow.org/api_docs/python/tf/data/Dataset

https://www.tensorflow.org/guide/datasets

# importing data with Tensorflow tf.data API

tf.data는 간단한 수준에서 복잡한 수준까지의 Input pipeline을 구성할 수 있도록 하는 API이다. 구체적으로 제공하는 기능은 다음과 같다.

# 1. tf.data.Dataset

tf.data.Dataset은 연속된 element 집합으로, 각각의 element가 Tensor object로 구성되어있다. 이때 각각의 element는 Training을 위한 data와 label의 pair로 볼 수 있을 것이다.

크게 다음과 같은 구성을 가진다.

- Creating source : 다수의 tf.Tensor object 또는 file로부터 dataset을 구성한다.

  e.g : Dataset.from_tensor_slices()
  
  그 외에도 from_generator, list_files, interleave


- Applying a transformation : 하나 또는 여러 개의 dataset object들로부터 새로운 dataset을 구성한다.

  e.g : Dataset.batch()
  
  그 외에도 concatenate, filter, reduce, map, flat_map, padded_batch, shard, zip, shuffle

### Definition of data source
1. tf.data.Dataset.from_tensors() or tf.data.Dataset.from_tensor_slices() :
   memory 상의 tensor들로 dataset을 만드는 경우
2. tf.data.TFRecordDataset :
   Disk 상의 file들로 dataset을 만드는 경우
3. tf.data.from_generator  :
   python iterator로부터 dataset을 만드는 경우

## Create dataset and Show information
output_shapes : dataset의 각 element의 shape 정보
output_types : dataset의 각 element의 type

In [5]:
dataset1 = tf.data.Dataset.from_tensor_slices( tf.random_uniform([4, 10], dtype = tf.float32) )
print("dataset1 : ")
print(dataset1.output_types)
print(dataset1.output_shapes)

dataset2 = tf.data.Dataset.from_tensor_slices( 
    (tf.random_uniform( [4, 1] ),
     tf.random_uniform( [4, 10], maxval=100, dtype = tf.int32)))

print("dataset2 : ")
print(dataset2.output_types)
print(dataset2.output_shapes)
    
dataset3 = tf.data.Dataset.zip( (dataset1, dataset2) )
print("dataset3 : ")
print(dataset3.output_types)
print(dataset3.output_shapes)
    

dataset1 : 
<dtype: 'float32'>
(10,)
dataset2 : 
(tf.float32, tf.int32)
(TensorShape([Dimension(1)]), TensorShape([Dimension(10)]))
dataset3 : 
(tf.float32, (tf.float32, tf.int32))
(TensorShape([Dimension(10)]), (TensorShape([Dimension(1)]), TensorShape([Dimension(10)])))


In [28]:
file_list_dataset = tf.data.Dataset.list_files(
    '/home/dan/datasets/flower_photos/daisy/*.jpg',
    shuffle=None,
    seed=None
)
iterator = file_list_dataset.make_one_shot_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:

SyntaxError: unexpected EOF while parsing (<ipython-input-28-95f8998e7bfa>, line 9)

<TensorSliceDataset shapes: (10,), types: tf.float32>

# 2. Creating iterator

우리의 데이터를 표현하는 데이터셋을 만든 뒤에, 데이터셋의 element에 접근하는 iterator를 만들어야 한다. tf.data API는 다음과 같은 iterator를 제공하고 있다. 

- one-shot,
- initializable,
- reinitializable, and
- feedable.

## 2.1 one_shot_iterator

one shot iterator는 

In [25]:
dataset = tf.data.Dataset.range(100)
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
    for i in range(10):
      value = sess.run(next_element)
      print(value)
      assert i == value

0
1
2
3
4
5
6
7
8
9


1. 다양한 source로부터의 입력
   1. local file system
   2. distributed file system
   3. On-memory data
   4. real-time data generator
