@FollowerOfScriabin

References:
https://www.tensorflow.org/guide/datasets

https://cyc1am3n.github.io/2018/09/13/how-to-use-dataset-in-tensorflow.html

https://www.tensorflow.org/api_docs/python/tf/data/Dataset


https://hiseon.me/2018/04/15/tensorflow-dataset/

https://locslab.github.io/Tensorflow-Dataset-API(2)/

https://stackoverflow.com/questions/47091726/difference-between-tf-data-dataset-map-and-tf-data-dataset-apply

https://stackoverflow.com/questions/45292517/how-do-i-use-the-group-by-window-function-in-tensorflow


# How to process data with tf.data.Dataset
### Explained by diverse examples

In [None]:
import numpy as np
import tensorflow as tf
import collections

# Static numpy data : one shot iterator

In [None]:
# A one-shot iterator cannot be parameterized: it does not accept a
# placeholder as its input, so the data is fixed when the graph is built.
x = tf.placeholder(tf.float32, shape=[None, 2])  # not consumed by the pipeline below

e = np.random.sample((3, 6, 2))
# Slicing happens along axis 0, giving 3 elements of shape (6, 2).
data = tf.data.Dataset.from_tensor_slices(e)
print(data.output_types, data.output_shapes)

iterator = data.make_one_shot_iterator()
features = iterator.get_next()

with tf.Session() as sess:
    # Pull elements until the dataset signals that it is exhausted.
    while True:
        try:
            print(sess.run(features))
        except tf.errors.OutOfRangeError:
            # The exception is deliberately not bound with "as e": that would
            # overwrite the array `e`, and Python 3 deletes the bound name
            # when the handler exits, leaving `e` undefined afterwards.
            break

# Dynamic numpy data : initializable iterator

In [None]:
# An initializable iterator lets the dataset be built from a placeholder;
# the actual values are supplied when the iterator is initialized.
# (It can also be used to process dict / namedtuple inputs.)
x = tf.placeholder(tf.float32, shape=[None, 2])
e = np.random.sample((10, 2))
data = tf.data.Dataset.from_tensor_slices(x)

# The iterator starts out uninitialized; iterator.initializer must be run
# (with the placeholder fed) before the first get_next() is evaluated:
#   dataset = ...
#   iterator = dataset.make_initializable_iterator()
#   sess.run(iterator.initializer)
iterator = data.make_initializable_iterator()
features = iterator.get_next()

with tf.Session() as sess:
    sess.run(iterator.initializer, feed_dict={x: e})

    # Drain the dataset; OutOfRangeError marks the end of the data.
    try:
        while True:
            print(sess.run(features))
    except tf.errors.OutOfRangeError:
        pass


In [None]:
# Each split is an (inputs, targets) pair of numpy arrays.
train_x = (np.random.sample((10, 2)), np.random.sample((10, 1)))
test_x = (np.ones([1, 2]).astype(np.int32), np.zeros([1, 1]).astype(np.int32))
# Number of get_next() evaluations on the training data. The dataset is not
# batched, so each evaluation consumes one of the 10 training rows.
epochs = 10
x, y = tf.placeholder(tf.float32, shape=[None, 2]), tf.placeholder(tf.float32, shape=[None, 1])
dataset = tf.data.Dataset.from_tensor_slices((x, y))
# Named `iterator` rather than `iter` so the builtin iter() is not shadowed.
iterator = dataset.make_initializable_iterator()
features, labels = iterator.get_next()
with tf.Session() as sess:
    # Initialise the iterator with the training data.
    sess.run(iterator.initializer, feed_dict={x: train_x[0], y: train_x[1]})
    for _ in range(epochs):
        sess.run([features, labels])
    # Switch to the test data by re-running the initializer with new feeds.
    sess.run(iterator.initializer, feed_dict={x: test_x[0], y: test_x[1]})
    print(sess.run([features, labels]))

# Dynamic numpy data : reinitializable iterator

In [None]:
# A reinitializable iterator is defined by a structure (types + shapes) rather
# than by one dataset, so it can be pointed at any dataset with that structure.
train_x = tf.data.Dataset.from_tensor_slices((np.random.sample((10, 2)), np.random.sample((10, 1))))
test_x = tf.data.Dataset.from_tensor_slices((np.random.sample((5, 2)), np.random.sample((5, 1))))
# Number of get_next() evaluations on the training dataset (one row each,
# since the dataset is not batched).
epochs = 10
# Named `iterator` rather than `iter` so the builtin iter() is not shadowed.
iterator = tf.data.Iterator.from_structure(train_x.output_types, train_x.output_shapes)
features, labels = iterator.get_next()

# One initializer op per dataset; running one rebinds the iterator to it.
train_init = iterator.make_initializer(train_x)
test_init = iterator.make_initializer(test_x)
with tf.Session() as sess:
    # Initialise the iterator with the training data.
    sess.run(train_init)
    for _ in range(epochs):
        print(sess.run([features, labels]))
    # Switch to the test data.
    sess.run(test_init)
    print(sess.run([features, labels]))

# From_generator

In [None]:
# Variable-length rows are kept as a plain list of lists. Passing a ragged
# nested list to np.array() raises ValueError on recent NumPy releases (and
# was deprecated before that), and the generator only needs to iterate rows.
sequence = [[1, 2], [3], [4, 5, 6], [7]]


def gen():
    """Yield the rows of `sequence` one at a time."""
    for element in sequence:
        yield element


# Each yielded row becomes a 1-D int32 tensor of unknown length.
dataset = tf.data.Dataset.from_generator(gen, output_types=(tf.int32),
                                         output_shapes=(tf.TensorShape([None])))  # scalar : []
# Named `iterator` rather than `iter` so the builtin iter() is not shadowed.
iterator = dataset.make_initializable_iterator()
features = iterator.get_next()
with tf.Session() as sess:
    sess.run(iterator.initializer)
    print(sess.run(features))
    print(sess.run(features))
    #print(sess.run(features))

* NOTE: The current implementation of Dataset.from_generator() uses tf.py_func and inherits the same constraints. In particular, it requires the Dataset- and Iterator-related operations to be placed on a device in the same process as the Python program that called Dataset.from_generator(). The body of generator will not be serialized in a GraphDef, and you should not use this method if you need to serialize your model and restore it in a different environment.

* NOTE: If generator depends on mutable global variables or other external state, be aware that the runtime may invoke generator multiple times (in order to support repeating the Dataset) and at any time between the call to Dataset.from_generator() and the production of the first element from the generator. Mutating global variables or external state can cause undefined behavior, and we recommend that you explicitly cache any external state in generator before calling Dataset.from_generator().

# Static dict, namedtuple data

In [None]:
# namedtuple: the dataset's types/shapes mirror the tuple's field names.
Sample = collections.namedtuple('sample_data', ['a', 'b'])  # ['a', 'b'] is equivalent to 'a b'
sample_data = Sample(
    tf.random_uniform([4], maxval=5, dtype=tf.int32),
    tf.random_uniform([4, 100], maxval=100, dtype=tf.int32),
)
dataset = tf.data.Dataset.from_tensor_slices(sample_data)
# Both fields are created with dtype=tf.int32, so both report int32.
print(dataset.output_types)     # ==> sample_data(a=tf.int32, b=tf.int32)
print(dataset.output_shapes)    # ==> sample_data(a=TensorShape([]), b=TensorShape([Dimension(100)]))
print(dataset.output_classes)
print(dataset.output_types.a)   # ==> <dtype: 'int32'>
print(dataset.output_types.b)   # ==> <dtype: 'int32'>
print(dataset.output_shapes.a)  # ==> ()
print(dataset.output_shapes.b)  # ==> (100, )


# dictionary: the dataset's types/shapes are dicts with the same keys.
dataset = tf.data.Dataset.from_tensor_slices(
    {
        'a': tf.random_uniform([4]),
        'b': tf.random_uniform([4, 100], maxval=100, dtype=tf.int32),
    }
)
print(dataset.output_types)     # ==> {'a' : tf.float32, 'b' : tf.int32}
print(dataset.output_shapes)    # ==> {'a': TensorShape([]), 'b': TensorShape([Dimension(100)])}
print(dataset.output_types['a'])    # ==> <dtype: 'float32'>
print(dataset.output_types['b'])    # ==> <dtype: 'int32'>
print(dataset.output_shapes['a'])   # ==> ()
print(dataset.output_shapes['b'])   # ==> (100, )

In [None]:
# Named `data_dict` rather than `dict` so the builtin dict() is not shadowed
# for the rest of the session.
data_dict = {
    'a': np.random.sample((10, 1)),
    'b': np.random.sample((10, 4)),
}

# One placeholder per dictionary entry; the arrays are fed at initialization.
x, y = tf.placeholder(tf.float32, shape=[None, 1]), tf.placeholder(tf.float32, shape=[None, 4])
dataset = tf.data.Dataset.from_tensor_slices((x, y))

iterator = dataset.make_initializable_iterator()
features, labels = iterator.get_next()

with tf.Session() as sess:
    sess.run(iterator.initializer, feed_dict={x: data_dict['a'], y: data_dict['b']})

    # Drain the dataset; OutOfRangeError marks the end of the data.
    while True:
        try:
            print(sess.run([features, labels]))
        except tf.errors.OutOfRangeError:
            break
#Note: The returned iterator will be in an uninitialized state, and you must run the iterator.initializer operation before using it:
#dataset = ...
#iterator = dataset.make_initializable_iterator()
# sess.run(iterator.initializer)

# Dataset Transformation by groupbywindow

In [None]:
def al(x, r):
    """Print debug info about a group, then return the group batched by 10.

    Used below as the `reduce_func` of `group_by_window`.

    Args:
        x: the group key; its `shape` and Python type are printed.
        r: the grouped elements; must provide `batch(n)`.

    Returns:
        The result of `r.batch(10)`.
    """
    print(x.shape, type(x))
    # Built only so that the type returned by batch() can be printed.
    preview = r.batch(3)
    print(r, type(r), type(preview))
    return r.batch(10)

e = np.arange(100).astype(np.int64)
data = tf.data.Dataset.from_tensor_slices(e)  # = tf.data.Dataset.range(100)
# Elements are grouped by x % 3; each group is handed to `al`, which batches it.
dataset = data.apply(
    tf.contrib.data.group_by_window(
        key_func=lambda x: x % 3, reduce_func=al, window_size=100))
iterator = dataset.make_one_shot_iterator()
features = iterator.get_next()
# The context manager closes the session (the original never closed it), and
# print() shows the result even when this is run outside a notebook.
with tf.Session() as sess:
    print(sess.run(features))

# Dataset Transformation by map

In [None]:
# Defined here so the cell does not depend on `e` left over from an earlier
# cell; the value matches the one built in the group_by_window example.
e = np.arange(100).astype(np.int64)
data = tf.data.Dataset.from_tensor_slices(e)
# map applies the function to every element independently.
dataset = data.map(lambda x: x * 2)
iterator = dataset.make_one_shot_iterator()
features = iterator.get_next()
# The context manager closes the session, which the original never did.
with tf.Session() as sess:
    print(sess.run(features))
    print(sess.run(features))

* The difference is that map will execute one function on every element of the Dataset separately, whereas apply will execute one function on the whole Dataset at once (such as group_by_window given as example in the documentation).

* The argument of apply is a function that takes a Dataset and returns a Dataset, whereas the argument of map is a function that takes one element and returns one transformed element.

# Concatenate

In [None]:
# The input dataset and the dataset to be concatenated must have the same
# nested structure and output types. For example, given
# c = { (8, 9), (10, 11), (12, 13) }
# d = { 14.0, 15.0, 16.0 }
# a.concatenate(c) and a.concatenate(d) would result in an error.

e1 = np.arange(100).astype(np.int64)
data1 = tf.data.Dataset.from_tensor_slices(e1)
e2 = np.arange(100).astype(np.int64)
data2 = tf.data.Dataset.from_tensor_slices(e2)

# concatenate() returns a new dataset and does not modify data1, so the
# result is bound to a name instead of being discarded.
data_concat = data1.concatenate(data2)  # concatenate two datasets
print(data_concat)

# Shard (for distributed computing)

In [None]:
#https://github.com/tensorflow/examples/blob/master/community/en/docs/deploy/distributed.md
# Kept inside a string so it is not executed: it needs FLAGS.num_workers and
# FLAGS.worker_index, which are presumably supplied by a distributed launcher.
'''
e = np.arange(100).astype(np.int64)
data = tf.data.Dataset.from_tensor_slices(e)
data = data.shard(FLAGS.num_workers, FLAGS.worker_index)
iterator = data.make_one_shot_iterator()
features = iterator.get_next()
with tf.Session() as sess:
    while True:
        try:
            print(sess.run(features))
        except tf.errors.OutOfRangeError:
            break
'''

# Filter

In [None]:
#static

def filter_func(x):
    """Predicate for Dataset.filter: true where `x` is a multiple of 4.

    Also prints the static shape of `x`.
    """
    print(x.shape)
    remainder = x % 4
    return tf.equal(remainder, 0)
    
e = np.arange(100).astype(np.int64)
data = tf.data.Dataset.from_tensor_slices(e)
print(data)
# Keep only the elements for which filter_func evaluates to True.
data_filtered = data.filter(predicate=filter_func)
# Named `iterator` rather than `iter` so the builtin iter() is not shadowed.
iterator = data_filtered.make_one_shot_iterator()
features = iterator.get_next()
with tf.Session() as sess:
    # Drain the dataset; OutOfRangeError marks the end of the data.
    while True:
        try:
            print(sess.run(features))
        except tf.errors.OutOfRangeError:
            break
    

In [None]:
#dynamic
def filter_func(x):
    """Predicate for Dataset.filter: true where `x` is greater than 5."""
    return tf.greater(x, 5)

shape = (10, 3)
# Random values in [0, 10), flattened to 1-D to match the placeholder below.
e = np.random.sample(shape) * 10
e = e.flatten()

x = tf.placeholder(tf.float32, shape=[None])
data = tf.data.Dataset.from_tensor_slices(x)
data_filtered = data.filter(filter_func)
# Named `iterator` rather than `iter` so the builtin iter() is not shadowed.
iterator = data_filtered.make_initializable_iterator()
features = iterator.get_next()
with tf.Session() as sess:
    # The placeholder is fed when the iterator is initialized.
    sess.run(iterator.initializer, feed_dict={x: e})
    # Drain the dataset; OutOfRangeError marks the end of the data.
    while True:
        try:
            print(sess.run(features))
        except tf.errors.OutOfRangeError:
            break

    

# Flat_map

In [None]:
# Build a dataset of 10 rows with 3 values each and show its element structure.
e = tf.data.Dataset.from_tensor_slices(np.random.sample((10, 3)))
print(e.output_shapes, e.output_types)
# flat_map turns every row into its own dataset and chains the results,
# so the printed structure is now that of a single value.
data = e.flat_map(lambda row: tf.data.Dataset.from_tensor_slices(row))
print(data.output_shapes, data.output_types)

In [None]:
# Rows of 3 random values scaled to [0, 10), then flattened to one value per element.
e = tf.data.Dataset.from_tensor_slices(10 * np.random.sample((10, 3)))
data_flatten = e.flat_map(lambda row: tf.data.Dataset.from_tensor_slices(row))

def filter_func(t):
    """Predicate for Dataset.filter: true where `t` is greater than 5.

    Also prints the static shape of the resulting boolean tensor.
    """
    keep = tf.greater(t, 5)
    print(keep.shape)
    return keep
    
data_filtered = data_flatten.filter(predicate=filter_func)
# Named `iterator` rather than `iter` so the builtin iter() is not shadowed.
iterator = data_filtered.make_one_shot_iterator()
features = iterator.get_next()
with tf.Session() as sess:
    # Drain the dataset; OutOfRangeError marks the end of the data.
    while True:
        try:
            print(sess.run(features))
        except tf.errors.OutOfRangeError:
            break

# repeat, batch, shuffle

In [None]:
# A one-shot iterator cannot be parameterized: it does not accept a
# placeholder as its input, so the data is fixed when the graph is built.
x = tf.placeholder(tf.float32, shape=[None, 2])  # not consumed by the pipeline below

e = np.random.sample((20, 2))
data = tf.data.Dataset.from_tensor_slices(e)
# Shuffle the data, repeat it 5 times, and extract batches of 10.
data = data.shuffle(30).repeat(5).batch(10)

iterator = data.make_one_shot_iterator()
features = iterator.get_next()

with tf.Session() as sess:
    # Pull batches until the dataset signals that it is exhausted.
    while True:
        try:
            print(sess.run(features))
        except tf.errors.OutOfRangeError:
            # The exception is deliberately not bound with "as e": that would
            # overwrite the array `e`, and Python 3 deletes the bound name
            # when the handler exits, leaving `e` undefined afterwards.
            break