## Queue

An illustration of how to use queues with the TensorFlow 1.x queue API.

In [1]:
import tensorflow as tf
import memory_util


Create a queue with a specified capacity and datatype

In [2]:
q = tf.FIFOQueue(capacity=3,dtypes=tf.float32)

Define an op that enqueues a single element (0)

In [3]:
enq_op = q.enqueue(0)

Define an op that dequeues one element

In [4]:
x = q.dequeue()

Add 1 to the dequeued value and print the result

In [5]:
y = x+1
y = tf.Print(y,data=[y],message="y = ")

Enqueue the result back into the queue

In [6]:
q_inc = q.enqueue(y)

In [7]:
sess = tf.InteractiveSession()

In [8]:
enq_op.run()

In [9]:
def yf():
    # tf.Print writes to stderr, so capture it to show the message in the notebook.
    with memory_util.capture_stderr() as mw:
        sess.run(q_inc)
    print(mw.getvalue())

In [10]:
yf()
yf()
yf()
yf()

y = [1]

y = [2]

y = [3]

y = [4]
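
The dequeue, increment, enqueue round trip above can be mimicked without TensorFlow. The following is a minimal sketch that uses Python's standard `queue.Queue` purely as an analogy for `tf.FIFOQueue`; the helper name `increment_once` is hypothetical and is not part of any TensorFlow API.

```python
import queue

# Analogy only: queue.Queue stands in for tf.FIFOQueue(capacity=3).
q = queue.Queue(maxsize=3)
q.put(0.0)  # plays the role of enq_op.run()


def increment_once(q):
    """Dequeue one value, add 1, and enqueue the result (like running q_inc)."""
    x = q.get()
    y = x + 1
    q.put(y)
    return y


results = [increment_once(q) for _ in range(4)]
print(results)  # [1.0, 2.0, 3.0, 4.0]
```

Because each call puts back exactly one element for the one it takes out, the queue always holds a single value and the counter grows by 1 per call, which matches the `y = [1]` to `y = [4]` output.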



**Remember**:
> Dequeuing from an empty queue blocks until an element is enqueued. If nothing ever enqueues one, the program hangs.
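
The blocking behaviour noted above is common to most queue implementations. As a rough illustration, the sketch below uses Python's standard `queue.Queue` (an analogy, not the TensorFlow API) and a timeout so that an empty queue produces a recoverable result instead of a hang. The helper `try_dequeue` and its `timeout_s` parameter are hypothetical names chosen for this example.

```python
import queue

# Analogy only: a plain Python queue, not a tf.FIFOQueue.
empty_q = queue.Queue(maxsize=3)


def try_dequeue(q, timeout_s=0.1):
    """Return the next item, or None if nothing arrives within timeout_s seconds."""
    try:
        return q.get(timeout=timeout_s)
    except queue.Empty:
        return None


first = try_dequeue(empty_q)   # nothing has been enqueued yet
empty_q.put(7)
second = try_dequeue(empty_q)  # now an element is available
print(first, second)
```

Without the timeout, the first `get` call would wait forever, which is the same situation as running a dequeue op on an empty TensorFlow queue.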

**Queue Runner**

A `QueueRunner` runs enqueue operations asynchronously in background threads to **ensure that our queues never run dry**.

In [11]:
q = tf.FIFOQueue(capacity=3,dtypes=tf.float32)
enq_op = q.enqueue_many([[0,0,0],])

Create a queue runner object. It starts one thread per enqueue op in the list it is given and runs them all asynchronously. Here, `[enq_op] * 3` creates 3 threads.

In [12]:
qr = tf.train.QueueRunner(q, [enq_op] * 3)
tf.train.add_queue_runner(qr)

The graph of `y` contains a dequeue op. Without a queue runner, running `y` again after the queue has been emptied would block forever. With a queue runner, the enqueue threads keep refilling the queue, so the dequeue always finds an element.

In [13]:
x = q.dequeue()
y = x+1
y = tf.Print(y,data=[y],message="y = ")

Start the queue runner threads (enables asynchronous enqueuing)

In [14]:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)

In [15]:
print(sess.run(y))
print(sess.run(y+1))
print(sess.run(y+2))

1.0
2.0
3.0


Stop and join the threads (disables parallelization)

**Remember**:
> Output printed from the running threads may appear out of order until the threads are joined.

In [16]:
coord.request_stop()
coord.join(threads)
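
To make the division of labour concrete, here is a minimal sketch of the same pattern in plain Python threads. It is an analogy under stated assumptions, not how TensorFlow implements it: `producer` plays the role of the queue runner's enqueue threads, and the `threading.Event` named `stop_event` plays the role of the `Coordinator`'s stop flag. Both names are hypothetical.

```python
import queue
import threading

q = queue.Queue(maxsize=3)
stop_event = threading.Event()  # stands in for the Coordinator's stop request


def producer():
    """Keep enqueuing zeros until asked to stop."""
    while not stop_event.is_set():
        try:
            # Use a timeout so the stop flag is rechecked while the queue is full.
            q.put(0.0, timeout=0.05)
        except queue.Full:
            continue


# Three producer threads, mirroring [enq_op] * 3.
threads = [threading.Thread(target=producer) for _ in range(3)]
for t in threads:
    t.start()

# The consumer can dequeue repeatedly because the producers keep refilling.
values = [q.get(timeout=1.0) + 1 for _ in range(10)]

stop_event.set()      # analogous to coord.request_stop()
for t in threads:
    t.join()          # analogous to coord.join(threads)

print(values[:3])
```

The consumer dequeues ten times from a queue of capacity 3 without blocking for long, because the producers refill it in the background. Setting the event and joining the threads shuts everything down cleanly.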

**Example 1**

[data]

In [25]:
q = tf.FIFOQueue(capacity=5,dtypes=tf.int32) 
enq = q.enqueue_many([[1,2,3,4,5],])
sess.run(enq)

In [26]:
print(sess.run(q.dequeue()))
print(sess.run(q.dequeue()))
print(sess.run(q.dequeue()))
print(sess.run(q.dequeue()))
print(sess.run(q.dequeue()))

1
2
3
4
5


**Example 2**

[data, label]

In [27]:
data = tf.random_normal((20,5))
labels = tf.range(0,limit=20)

q = tf.FIFOQueue(capacity=20,dtypes=[tf.float32,tf.int32]) 
data_q = q.enqueue_many([data,labels])
x,l = q.dequeue()

In [28]:
sess.run(data_q)
print(sess.run([x,l]))
print(sess.run([x,l]))
print(sess.run([x,l]))
print(sess.run([x,l]))
print(sess.run([x,l]))

[array([ 0.36239254,  2.1364446 ,  0.5059319 , -1.3780541 , -0.5089628 ],
      dtype=float32), 0]
[array([ 0.6716266 , -2.2305787 ,  0.4523175 , -0.6288688 , -0.20258127],
      dtype=float32), 1]
[array([ 1.2920688 , -0.30235445,  0.529395  , -0.61158234, -0.99176794],
      dtype=float32), 2]
[array([-1.9537941 , -0.7215987 , -1.0443631 , -1.6029885 ,  0.38794613],
      dtype=float32), 3]
[array([-0.25549248,  0.9723263 ,  0.4431345 ,  0.02318975,  1.1907212 ],
      dtype=float32), 4]
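
The output shows that each dequeue returns one row of `data` together with its matching label. The same pairing can be sketched in plain Python by enqueuing `(row, label)` tuples. This uses the standard `queue` and `random` modules as an analogy only, and the names `num_examples` and `num_features` are introduced here for illustration.

```python
import queue
import random

random.seed(0)  # fixed seed so the sketch is repeatable
num_examples, num_features = 20, 5

# Stand-ins for tf.random_normal((20, 5)) and tf.range(0, limit=20).
data = [[random.gauss(0.0, 1.0) for _ in range(num_features)]
        for _ in range(num_examples)]
labels = list(range(num_examples))

# Enqueue rows and labels together so they cannot drift apart.
q = queue.Queue(maxsize=num_examples)
for row, label in zip(data, labels):
    q.put((row, label))

# Dequeue the first five pairs, as in the cell above.
dequeued = [q.get() for _ in range(5)]
for row, label in dequeued:
    print(label, [round(v, 3) for v in row])
```

Each dequeued element carries both parts, so the first five labels come out as 0 to 4 in order, each next to the row it was enqueued with.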
