# tf.data.Dataset

<b>tf.data</b> is an API for input pipelines: makes it possible to handle large amounts of data, read from different data formats, and perform complex transformations<br>

It has several classes, such as:
* <b><i>class Dataset</i></b>: Base class containing methods to create and transform datasets. Also allows you initialize a dataset from data in memory, or from a Python generator.
* <b><i>class DatasetSpec</i></b>: Type specification for tf.data.Dataset.
* <b><i>class FixedLengthRecordDataset</i></b>: A Dataset of fixed-length records from one or more binary files.
* <b><i>class Options</i></b>: Represents options for tf.data.Dataset.
* <b><i>class TFRecordDataset</i></b>: A Dataset comprising records from one or more TFRecord files.
* <b><i>class TextLineDataset</i></b>: A Dataset comprising lines from one or more text files.

This notebook focuses on <b>tf.data.Dataset</b> which is: <br>
* Abstraction representing a sequence of "elements"
* "elements" may consist of one or more components (elem could be train example w 2-components: features and label)
	⁃	of any type representable by tf.TypeSpec, including tf.Tensor, tf.data.Dataset, tf.SparseTensor, * tf.RaggedTensor, or tf.TensorArray
* creation from data source in mem or files OR as output of some transformation
* more performant than feed_dict
* base class: allows init dataset from in-mem data or Python generator

This notebook has sections demonstrating:
* Creation
* Iterators
* Mods - such as shuffle and batch
* End 2 End Examples
* Performance

<b>Resources:</b><br>
API: https://www.tensorflow.org/api_docs/python/tf/data/Dataset <br>
Guides - Build Input Pipeline: https://www.tensorflow.org/guide/data <br>
Tutorial: https://www.tensorflow.org/tutorials/estimator/linear <br>
blogpost: https://developers.googleblog.com/2017/09/introducing-tensorflow-datasets.html <br>
blogpost: https://towardsdatascience.com/how-to-use-dataset-in-tensorflow-c758ef9e4428 <br>
blogpost: https://medium.com/ymedialabs-innovation/how-to-use-dataset-and-iterators-in-tensorflow-with-code-samples-3bb98b6b74ab


In [1]:
import tensorflow as tf
import pprint
import numpy as np
import pandas as pd
tf.enable_eager_execution()

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


## Creating Datasets

In [None]:
# from a NUMPY ARRAY
x = np.random.sample((100,2))    # make a numpy array

dataset = tf.data.Dataset.from_tensor_slices(x)

for tensorObj in dataset:
    print(str(tensorObj.numpy()))

# NOTE: could user iterator, but apparently that is deprecated
#iter = dataset.make_one_shot_iterator()
#el = iter.get_next()

#with tf.Session() as sess:
    #print(sess.run(el))
    

In [None]:
# from TUPLE of NUMPY ARRAYS
features, labels = (np.random.sample((100,2)), np.random.sample((100,1)))  #TYPE: (numpy.ndarray, numpy.ndarray)
dataset = tf.data.Dataset.from_tensor_slices((features,labels))            #TYPE: tensorflow.python.data.ops.dataset_ops.DatasetV1Adapter

iter = dataset.make_one_shot_iterator()
el = iter.get_next()

with tf.Session() as sess:
    print(sess.run(el))

In [None]:
# from a TENSOR 
tensor = tf.random_uniform([100, 2])                  #TYPE: tensorflow.python.framework.ops.Tensor
dataset = tf.data.Dataset.from_tensor_slices(tensor)  #TYPE: tensorflow.python.data.ops.dataset_ops.DatasetV1Adapter

iter = dataset.make_initializable_iterator()
el = iter.get_next()

with tf.Session() as sess:
    sess.run(iter.initializer)
    print(sess.run(el))

In [None]:
# using a placeholder
x = tf.placeholder(tf.float32, shape=[None,2])
dataset = tf.data.Dataset.from_tensor_slices(x)

data = np.random.sample((100,2))

iter = dataset.make_initializable_iterator()
el = iter.get_next()

with tf.Session() as sess:
    sess.run(iter.initializer, feed_dict={ x: data })
    print(sess.run(el))

In [None]:
# from generator
sequence = np.array([[[1]],[[2],[3]],[[3],[4],[5]]])

def generator():
    for el in sequence:
        yield el

dataset = tf.data.Dataset().batch(1).from_generator(generator,
                                           output_types= tf.int64, 
                                           output_shapes=(tf.TensorShape([None, 1])))

iter = dataset.make_initializable_iterator()
el = iter.get_next()

with tf.Session() as sess:
    sess.run(iter.initializer)
    print(sess.run(el))
    print(sess.run(el))
    print(sess.run(el))


In [None]:
# From a CSV file
CSV_PATH = './tweets.csv'
dataset = tf.contrib.data.make_csv_dataset(CSV_PATH, batch_size=32)
iter = dataset.make_one_shot_iterator()
next = iter.get_next()
print(next) # next is a dict with key=columns names and value=column data
inputs, labels = next['text'], next['sentiment']

with  tf.Session() as sess:
    print(sess.run([inputs,labels]))

In [2]:
# From Pandas Dataframe or Series

# get some data into a Pandas Dataframe
URL = 'https://storage.googleapis.com/applied-dl/heart.csv'
dataframe = pd.read_csv(URL)
print(dataframe.head())

# convert features and labels into SERIES object
labels = dataframe.pop('target')              #TYPE: pandas.core.series.Series
features = dict(dataframe)                    #TYPE: dict where KEY is column name and VAL is col as pandas.core.series.Series

print("\nCOLUMNS")
print(features.keys())
print("\nAGE Column")
print(features['age'][0:4])

# make dataset from tupple of Pandas Series objs
dataset = tf.data.Dataset.from_tensor_slices((features,labels))
pp = pprint.PrettyPrinter()
pp.pprint(dataset.output_classes)
pp.pprint(dataset.output_types)
pp.pprint(dataset.output_shapes)

# make a small batch for looking at
dataset = dataset.batch(5)

# now take a batch and look at it
for feature_batch, label_batch in dataset.take(1):
    print('Every feature:', list(feature_batch.keys()))
    print('A batch of ages:', feature_batch['age'].numpy())   #note: the numpy() bit only works w "eager execution"
    print('A batch of targets:', label_batch.numpy())
    break
    

   age  sex  cp  trestbps  chol  fbs  restecg  thalach  exang  oldpeak  slope  \
0   63    1   1       145   233    1        2      150      0      2.3      3   
1   67    1   4       160   286    0        2      108      1      1.5      2   
2   67    1   4       120   229    0        2      129      1      2.6      2   
3   37    1   3       130   250    0        0      187      0      3.5      3   
4   41    0   2       130   204    0        2      172      0      1.4      1   

   ca        thal  target  
0   0       fixed       0  
1   3      normal       1  
2   2  reversible       0  
3   0      normal       0  
4   0      normal       0  

COLUMNS
dict_keys(['age', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg', 'thalach', 'exang', 'oldpeak', 'slope', 'ca', 'thal'])

AGE Column
0    63
1    67
2    67
3    37
Name: age, dtype: int64
Instructions for updating:
Use `tf.compat.v1.data.get_output_classes(dataset)`.
({'age': <class 'tensorflow.python.framework.ops.Tensor'>,
  'ca

# Iterators

In [None]:
# initializable iterator to switch between data
EPOCHS = 10

x, y = tf.placeholder(tf.float32, shape=[None,2]), tf.placeholder(tf.float32, shape=[None,1])
dataset = tf.data.Dataset.from_tensor_slices((x, y))

train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.array([[1,2]]), np.array([[0]]))

iter = dataset.make_initializable_iterator()
features, labels = iter.get_next()

with tf.Session() as sess:
#     initialise iterator with train data
    sess.run(iter.initializer, feed_dict={ x: train_data[0], y: train_data[1]})
    for _ in range(EPOCHS):
        sess.run([features, labels])
#     switch to test data
    sess.run(iter.initializer, feed_dict={ x: test_data[0], y: test_data[1]})
    print(sess.run([features, labels]))

    
    

In [None]:
# Reinitializable iterator to switch between Datasets
EPOCHS = 10
# making fake data using numpy
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.random.sample((10,2)), np.random.sample((10,1)))
# create two datasets, one for training and one for test
train_dataset = tf.data.Dataset.from_tensor_slices(train_data)
test_dataset = tf.data.Dataset.from_tensor_slices(test_data)
# create a iterator of the correct shape and type
iter = tf.data.Iterator.from_structure(train_dataset.output_types,
                                           train_dataset.output_shapes)
features, labels = iter.get_next()
# create the initialisation operations
train_init_op = iter.make_initializer(train_dataset)
test_init_op = iter.make_initializer(test_dataset)

with tf.Session() as sess:
    sess.run(train_init_op) # switch to train dataset
    for _ in range(EPOCHS):
        sess.run([features, labels])
    sess.run(test_init_op) # switch to val dataset
    print(sess.run([features, labels]))

    
    

In [None]:
# feedable iterator to switch between iterators
EPOCHS = 10
# making fake data using numpy
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.random.sample((10,2)), np.random.sample((10,1)))
# create placeholder
x, y = tf.placeholder(tf.float32, shape=[None,2]), tf.placeholder(tf.float32, shape=[None,1])
# create two datasets, one for training and one for test
train_dataset = tf.data.Dataset.from_tensor_slices((x,y))
test_dataset = tf.data.Dataset.from_tensor_slices((x,y))
# create the iterators from the dataset
train_iterator = train_dataset.make_initializable_iterator()
test_iterator = test_dataset.make_initializable_iterator()
# same as in the doc https://www.tensorflow.org/programmers_guide/datasets#creating_an_iterator
handle = tf.placeholder(tf.string, shape=[])
iter = tf.data.Iterator.from_string_handle(
    handle, train_dataset.output_types, train_dataset.output_shapes)
next_elements = iter.get_next()

with tf.Session() as sess:
    train_handle = sess.run(train_iterator.string_handle())
    test_handle = sess.run(test_iterator.string_handle())
    
    # initialise iterators. In our case we could have used the 'one-shot' iterator instead,
    # and directly feed the data insted the Dataset.from_tensor_slices function, but this
    # approach is more general
    sess.run(train_iterator.initializer, feed_dict={ x: train_data[0], y: train_data[1]})
    sess.run(test_iterator.initializer, feed_dict={ x: test_data[0], y: test_data[1]})
    
    for _ in range(EPOCHS):
        x,y = sess.run(next_elements, feed_dict = {handle: train_handle})
        print(x, y)
        
    x,y = sess.run(next_elements, feed_dict = {handle: test_handle})
    print(x,y)

# Mods - shuffle, batch, repeat, map, etc

In [None]:
# BATCHING
BATCH_SIZE = 4
x = np.random.sample((100,2))
# make a dataset from a numpy array
dataset = tf.data.Dataset.from_tensor_slices(x).batch(BATCH_SIZE)

iter = dataset.make_one_shot_iterator()
el = iter.get_next()

with tf.Session() as sess:
    print(sess.run(el))

In [None]:
# REPEAT
BATCH_SIZE = 4
x = np.array([[1],[2],[3],[4]])
# make a dataset from a numpy array
dataset = tf.data.Dataset.from_tensor_slices(x)
dataset = dataset.repeat()

iter = dataset.make_one_shot_iterator()
el = iter.get_next()

with tf.Session() as sess:
    for _ in range(8):
        print(sess.run(el))

In [None]:
# MAP
x = np.array([[1],[2],[3],[4]])
# make a dataset from a numpy array
dataset = tf.data.Dataset.from_tensor_slices(x)
dataset = dataset.map(lambda x: x*2)

iter = dataset.make_one_shot_iterator()
el = iter.get_next()

with tf.Session() as sess:
#     this will run forever
        for _ in range(len(x)):
            print(sess.run(el))

In [None]:
# SHUFFLE
BATCH_SIZE = 4
x = np.array([[1],[2],[3],[4]])
# make a dataset from a numpy array
dataset = tf.data.Dataset.from_tensor_slices(x)
dataset = dataset.shuffle(buffer_size=100)
dataset = dataset.batch(BATCH_SIZE)

iter = dataset.make_one_shot_iterator()
el = iter.get_next()

with tf.Session() as sess:
    print(sess.run(el))

# End 2 End Examples

In [None]:
# how to pass the value to a model
EPOCHS = 10
BATCH_SIZE = 16
# using two numpy arrays
features, labels = (np.array([np.random.sample((100,2))]), 
                    np.array([np.random.sample((100,1))]))

dataset = tf.data.Dataset.from_tensor_slices((features,labels)).repeat().batch(BATCH_SIZE)

iter = dataset.make_one_shot_iterator()
x, y = iter.get_next()

# make a simple model
net = tf.layers.dense(x, 8, activation=tf.tanh) # pass the first value from iter.get_next() as input
net = tf.layers.dense(net, 8, activation=tf.tanh)
prediction = tf.layers.dense(net, 1, activation=tf.tanh)

loss = tf.losses.mean_squared_error(prediction, y) # pass the second value from iter.get_net() as label
train_op = tf.train.AdamOptimizer().minimize(loss)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for i in range(EPOCHS):
        _, loss_value = sess.run([train_op, loss])
        print("Iter: {}, Loss: {:.4f}".format(i, loss_value))

In [None]:
# Wrapping all together -> Switch between train and test set using Initializable iterator
EPOCHS = 10
# create a placeholder to dynamically switch between batch sizes
batch_size = tf.placeholder(tf.int64)
BATCH_SIZE = 32

x, y = tf.placeholder(tf.float32, shape=[None,2]), tf.placeholder(tf.float32, shape=[None,1])
dataset = tf.data.Dataset.from_tensor_slices((x, y)).batch(batch_size).repeat()

# using two numpy arrays
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.random.sample((20,2)), np.random.sample((20,1)))

iter = dataset.make_initializable_iterator()
features, labels = iter.get_next()
# make a simple model
net = tf.layers.dense(features, 8, activation=tf.tanh) # pass the first value from iter.get_next() as input
net = tf.layers.dense(net, 8, activation=tf.tanh)
prediction = tf.layers.dense(net, 1, activation=tf.tanh)

loss = tf.losses.mean_squared_error(prediction, labels) # pass the second value from iter.get_net() as label
train_op = tf.train.AdamOptimizer().minimize(loss)

n_batches = train_data[0].shape[0] // BATCH_SIZE

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # initialise iterator with train data
    sess.run(iter.initializer, feed_dict={ x: train_data[0], y: train_data[1], batch_size: BATCH_SIZE})
    print('Training...')
    for i in range(EPOCHS):
        tot_loss = 0
        for _ in range(n_batches):
            _, loss_value = sess.run([train_op, loss])
            tot_loss += loss_value
        print("Iter: {}, Loss: {:.4f}".format(i, tot_loss / n_batches))
    # initialise iterator with test data
    sess.run(iter.initializer, feed_dict={ x: test_data[0], y: test_data[1], batch_size: test_data[0].shape[0]})
    print('Test Loss: {:4f}'.format(sess.run(loss)))


In [None]:
# Wrapping all together -> Switch between train and test set using Reinitializable iterator
EPOCHS = 10
# create a placeholder to dynamically switch between batch sizes
batch_size = tf.placeholder(tf.int64)

x, y = tf.placeholder(tf.float32, shape=[None,2]), tf.placeholder(tf.float32, shape=[None,1])
train_dataset = tf.data.Dataset.from_tensor_slices((x,y)).batch(batch_size).repeat()
test_dataset = tf.data.Dataset.from_tensor_slices((x,y)).batch(batch_size) # always batch even if you want to one shot it
# using two numpy arrays
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.random.sample((20,2)), np.random.sample((20,1)))

# create a iterator of the correct shape and type
iter = tf.data.Iterator.from_structure(train_dataset.output_types,
                                           train_dataset.output_shapes)
features, labels = iter.get_next()
# create the initialisation operations
train_init_op = iter.make_initializer(train_dataset)
test_init_op = iter.make_initializer(test_dataset)

# make a simple model
net = tf.layers.dense(features, 8, activation=tf.tanh) # pass the first value from iter.get_next() as input
net = tf.layers.dense(net, 8, activation=tf.tanh)
prediction = tf.layers.dense(net, 1, activation=tf.tanh)

loss = tf.losses.mean_squared_error(prediction, labels) # pass the second value from iter.get_net() as label
train_op = tf.train.AdamOptimizer().minimize(loss)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # initialise iterator with train data
    sess.run(train_init_op, feed_dict = {x : train_data[0], y: train_data[1], batch_size: 16})
    print('Training...')
    for i in range(EPOCHS):
        tot_loss = 0
        for _ in range(n_batches):
            _, loss_value = sess.run([train_op, loss])
            tot_loss += loss_value
        print("Iter: {}, Loss: {:.4f}".format(i, tot_loss / n_batches))
    # initialise iterator with test data
    sess.run(test_init_op, feed_dict = {x : test_data[0], y: test_data[1], batch_size:len(test_data[0])})
    print('Test Loss: {:4f}'.format(sess.run(loss)))


# Performance

In [None]:
log_time = {}
# copied form https://medium.com/pythonhive/python-decorator-to-measure-the-execution-time-of-methods-fa04cb6bb36d
def how_much(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        
        if 'log_time' in kw:
            name = kw.get('log_name', method.__name__)
            kw['log_time'][name] = (te - ts)
            
        return result
    return timed

In [None]:
# benchmark
import time
DATA_SIZE = 5000
DATA_SHAPE = ((32,32),(20,))
BATCH_SIZE = 64 
N_BATCHES = DATA_SIZE // BATCH_SIZE
EPOCHS = 10

test_size = (DATA_SIZE//100)*20 

train_shape = ((DATA_SIZE, *DATA_SHAPE[0]),(DATA_SIZE, *DATA_SHAPE[1]))
test_shape = ((test_size, *DATA_SHAPE[0]),(test_size, *DATA_SHAPE[1]))
print(train_shape, test_shape)
train_data = (np.random.sample(train_shape[0]), np.random.sample(train_shape[1]))
test_data = (np.random.sample(test_shape[0]), np.random.sample(test_shape[1])) 

In [None]:
# used to keep track of the methodds
log_time = {}

tf.reset_default_graph()
sess = tf.InteractiveSession()

input_shape = [None, *DATA_SHAPE[0]] # [None, 64, 64, 3]
output_shape = [None,*DATA_SHAPE[1]] # [None, 20]
print(input_shape, output_shape)

x, y = tf.placeholder(tf.float32, shape=input_shape), tf.placeholder(tf.float32, shape=output_shape)

@how_much
def one_shot(**kwargs):
    print('one_shot')
    train_dataset = tf.data.Dataset.from_tensor_slices(train_data).batch(BATCH_SIZE).repeat()
    train_el = train_dataset.make_one_shot_iterator().get_next()
    
    test_dataset = tf.data.Dataset.from_tensor_slices(test_data).batch(BATCH_SIZE).repeat()
    test_el = test_dataset.make_one_shot_iterator().get_next()
    for i in range(EPOCHS):
        print(i)
        for _ in range(N_BATCHES):
            sess.run(train_el)
        for _ in range(N_BATCHES):
            sess.run(test_el)
            
@how_much
def initialisable(**kwargs):
    print('initialisable')
    dataset = tf.data.Dataset.from_tensor_slices((x, y)).batch(BATCH_SIZE).repeat()

    iter = dataset.make_initializable_iterator()
    elements = iter.get_next()
    
    for i in range(EPOCHS):
        print(i)
        sess.run(iter.initializer, feed_dict={ x: train_data[0], y: train_data[1]})
        for _ in range(N_BATCHES):
            sess.run(elements)
        sess.run(iter.initializer, feed_dict={ x: test_data[0], y: test_data[1]})
        for _ in range(N_BATCHES):
            sess.run(elements)
@how_much            
def reinitializable(**kwargs):
    print('reinitializable')
    # create two datasets, one for training and one for test
    train_dataset = tf.data.Dataset.from_tensor_slices((x,y)).batch(BATCH_SIZE).repeat()
    test_dataset = tf.data.Dataset.from_tensor_slices((x,y)).batch(BATCH_SIZE).repeat()
    # create a iterator of the correct shape and type
    iter = tf.data.Iterator.from_structure(train_dataset.output_types,
                                               train_dataset.output_shapes)
    elements = iter.get_next()
    # create the initialisation operations
    train_init_op = iter.make_initializer(train_dataset)
    test_init_op = iter.make_initializer(test_dataset)
    
    for i in range(EPOCHS):
        print(i)
        sess.run(train_init_op, feed_dict={ x: train_data[0], y: train_data[1]})
        for _ in range(N_BATCHES):
            sess.run(elements)
        sess.run(test_init_op, feed_dict={ x: test_data[0], y: test_data[1]})
        for _ in range(N_BATCHES):
            sess.run(elements)
@how_much            
def feedable(**kwargs):
    print('feedable')
    # create two datasets, one for training and one for test
    train_dataset = tf.data.Dataset.from_tensor_slices((x,y)).batch(BATCH_SIZE).repeat()
    test_dataset = tf.data.Dataset.from_tensor_slices((x,y)).batch(BATCH_SIZE).repeat()
    # create the iterators from the dataset
    train_iterator = train_dataset.make_initializable_iterator()
    test_iterator = test_dataset.make_initializable_iterator()

    handle = tf.placeholder(tf.string, shape=[])
    iter = tf.data.Iterator.from_string_handle(
        handle, train_dataset.output_types, train_dataset.output_shapes)
    elements = iter.get_next()

    train_handle = sess.run(train_iterator.string_handle())
    test_handle = sess.run(test_iterator.string_handle())

    sess.run(train_iterator.initializer, feed_dict={ x: train_data[0], y: train_data[1]})
    sess.run(test_iterator.initializer, feed_dict={ x: test_data[0], y: test_data[1]})

    for i in range(EPOCHS):
        print(i)
        for _ in range(N_BATCHES):
            sess.run(elements, feed_dict={handle: train_handle})
        for _ in range(N_BATCHES):
            sess.run(elements, feed_dict={handle: test_handle})
            
one_shot(log_time=log_time)
initialisable(log_time=log_time)
reinitializable(log_time=log_time)
feedable(log_time=log_time)

sorted((value,key) for (key,value) in log_time.items())
