# Tensorflow ライブラリ読解(Queue)

## はじめに(Queue)とは
一般的な回路におけるQueueと同じ

マルチスレッドにおいて、複数のスレッドで処理したデータを複数のスレッドで利用したい場合、間にQueueを挟むことにより
処理速度の差を吸収することができ、スレッド間でのデータのやり取りを効率的にする。

## Queueの種類
https://www.tensorflow.org/api_docs/python/io_ops/queues 参照
### tf.FIFOQueue  
一般的なFIFO
### tf.PaddingFIFOQueue  
A FIFOQueue that supports batching variable-sized tensors by padding.  
色々なサイズのテンソルを、パディングしながら格納できるFIFO
### tf.RandomShuffleQueue
Create a queue that dequeues elements in a random order.  
ランダムな順番にデータを取り出すQueue
### tf.PriorityQueue
A queue implementation that dequeues elements in prioritized order.  
優先度順にデータを取り出すQueue

## それぞれのQueueを使ってみる
(tf.PaddingFIFOQueueは使い方分からず...)

In [1]:
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division

import tensorflow as tf

d = [[1, 2, 3, 4, 5]]

q = tf.FIFOQueue(capacity=10, dtypes=tf.int32)
init_op = q.enqueue_many(d)
dequeue_op = q.dequeue()

with tf.Session() as sess:
    sess.run([init_op])
    sess.run(q.close())
    print("Size = %d" % sess.run(q.size()))
    for i in range(3):
        print("q[%d] = %s" % (i, sess.run([dequeue_op])))
    

Size = 5
q[0] = [1]
q[1] = [2]
q[2] = [3]


In [2]:
q = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=1, dtypes=tf.int32)
init_op = q.enqueue_many(d)
dequeue_op = q.dequeue()

with tf.Session() as sess:
    sess.run([init_op])
    sess.run(q.close())
    print("Size = %d" % sess.run(q.size()))
    for i in range(3):
        print("q[%d] = %s" % (i, sess.run([dequeue_op])))

Size = 5
q[0] = [4]
q[1] = [2]
q[2] = [5]


In [3]:
q = tf.PriorityQueue(capacity=10, types=[tf.int32], shapes=[])
enqueue_ops = [
    q.enqueue((3, 3)), # (priority, tensor)
    q.enqueue((2, 2)),
    q.enqueue((1, 1))
]
dequeue_op = q.dequeue()

with tf.Session() as sess:
    for op in enqueue_ops:
        sess.run([op])
    sess.run(q.close())
    print("Size = %d" % sess.run(q.size()))
    for i in range(3):
        print("q[%d] = %s" % (i, sess.run([dequeue_op])))

Size = 3
q[0] = [[1, 1]]
q[1] = [[2, 2]]
q[2] = [[3, 3]]


## マルチスレッドでQueueを使う
マルチスレッドでQueueを使う場合は、CoordinatorとQueueRunnerを利用する(らしい)  
https://www.tensorflow.org/how_tos/threading_and_queues/
この辺を参照


In [2]:
import random

# 深さ10のQueueを作成
q = tf.FIFOQueue(capacity=10, dtypes=tf.int32)
# 初期化関数
init_op = q.enqueue_many(d)
# Queueに詰める関数を2つ定義(100, 101をそれぞれ入れる)
enqueue_ops = map(lambda e: q.enqueue(e), [100, 101])
# Queueを取り出して実際に利用する関数
dequeue_op = q.dequeue()

with tf.Session() as sess:
#     sess.run([init_op]) # Queueを初期化
    coord = tf.train.Coordinator() # Coordinatorをインスタンス
    qr = tf.train.QueueRunner(q, enqueue_ops) # QueueRunnerでそれぞれのopをスレッド化
    tf.train.add_queue_runner(qr) # デフォルトグラフに登録する
    threads = tf.train.start_queue_runners(coord=coord) # Queueの動作開始, 戻り値はthreadのlist
    
    for i in range(10):
        print(sess.run([dequeue_op]), end=", ") # 15回取り出す、マルチスレッドなので100と101が観測されるはず
    
    coord.request_stop() # すべての命令が終わり次第Queueを停止させる
    coord.join(threads) # 止まるまで待機

    for i in range(10): # 10段分はQueueに残っている
        print(sess.run([dequeue_op]), end=", ")

    try:
        # CoordinatorでQueueRunnerを動かすと停止時にQueueをクローズする
        # なので、段数以上取り出そうとするとエラーが発生
        print(sess.run([dequeue_op])) # may be error
    except tf.errors.OutOfRangeError:
        print("Queue Empty!")

[100], [100], [101], [100], [101], [100], [101], [100], [101], [100], [101], [100], [101], [100], [101], [100], [101], [100], [101], [100], Queue Empty!


## Queue作成関数
この辺の無限Queueを作成するための関数が用意されている。  

- tf.train.input_producer
- tf.train.range_input_producer
- tf.train.slice_input_producer
- tf.train.string_input_producer

### 例: string_input_producer

In [5]:

d = ["a", "b", "c", "d", "e"]

# shuffle=Trueの場合 → ランダムな順番で出力(デフォルト)
# shuffle=Falseの場合 → 入れた順番で出力
# 戻り値はQueue、だが内部でQueueRunnerを作成し・かつadd_queue_runnerされた状態で出力される
# 入力されたリストのテキストを無限に出力する
q = tf.train.string_input_producer(d)
# 回数を指定することも可能
# q = tf.train.string_input_producer(d, num_epochs=2)

dequeue_op = q.dequeue()

with tf.Session() as sess:
    # 回数を指定した場合は初期化する必要があるらしい
    # sess.run(tf.local_variables_initializer())
    
    coord = tf.train.Coordinator()
    
    # 既に登録されているためaddせずに利用できる(何スレッドなんだろう？)
    threads = tf.train.start_queue_runners(coord=coord)

    for i in range(10):
        print(sess.run([dequeue_op]))

    coord.request_stop()
    coord.join(threads)


['e']
['c']
['b']
['a']
['d']
['a']
['b']
['e']
['d']
['c']
