In [2]:
import tensorflow as tf
import tensorflow.data as tfd
import numpy as np

# Creating datasets from Sequence
In general I am quite sick of the `tf.data.Dataset` object. Its mappers work only on graph tensors, even under eager execution, which makes the whole API pretty unintuitive. An alternative is to build my own pipeline and wrap it inside a `tf.keras.utils.Sequence` object.

The `__getitem__` method should return a batch. Keras will take care of shuffling the batches, so that `batch[n+1]` will not always follow `batch[n]`, but my code has to take care of shuffling within the batch, so that `batch[m]` in one epoch is not exactly the same as `batch[m]` in another epoch.

In [12]:
class MyDataset(tf.keras.utils.Sequence):
    def __init__(self):
        self.w = np.array([1., 2., 3.])
        self.b = 0.5
        self._num_batches = 100
        self._batch_size = 8
    
    def __len__(self):
        return self._num_batches
    
    def __getitem__(self, idx):
        x = np.random.random((self._batch_size, 3))
        y = (x@self.w) + self.b
        d = np.random.standard_normal(self._batch_size)
        return x, y+d
        

Now let's use this in a simple linear regression model.

In [4]:
import tensorflow.keras as keras
import tensorflow.keras.layers as layers

In [5]:
x = layers.Input(shape=(3,))
y_hat = layers.Dense(1)(x)
model = keras.Model(x, y_hat)
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 3)]               0         
_________________________________________________________________
dense (Dense)                (None, 1)                 4         
Total params: 4
Trainable params: 4
Non-trainable params: 0
_________________________________________________________________


In [16]:
model.compile(optimizer="sgd", loss="mse", metrics=["mae"])

In [17]:
model.fit(MyDataset(), epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x130f78c18>
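
`MyDataset` draws fresh random data on every call, so the within-batch shuffling concern mentioned earlier never really arises there. For a fixed, finite dataset, one possible approach is to keep a permutation of sample indices and reshuffle it in `on_epoch_end`, a hook that `tf.keras.utils.Sequence` defines for end-of-epoch work. The sketch below is a plain Python class that only mirrors that interface (`__len__`, `__getitem__`, `on_epoch_end`) so that it runs without TensorFlow. The class name `ShuffledBatches` and its parameters are hypothetical; a real version would subclass `tf.keras.utils.Sequence` and return NumPy arrays.

```python
import math
import random


class ShuffledBatches:
    """Mirrors the Sequence interface; hypothetical name, not a Keras class."""

    def __init__(self, samples, batch_size, seed=0):
        self._samples = list(samples)
        self._batch_size = batch_size
        self._rng = random.Random(seed)
        # Permutation of sample indices used for the current epoch.
        self._order = list(range(len(self._samples)))
        self._rng.shuffle(self._order)

    def __len__(self):
        # Number of batches per epoch; the last batch may be smaller.
        return math.ceil(len(self._samples) / self._batch_size)

    def __getitem__(self, idx):
        # Batch idx is a slice of the current permutation.
        start = idx * self._batch_size
        picked = self._order[start:start + self._batch_size]
        return [self._samples[i] for i in picked]

    def on_epoch_end(self):
        # Reshuffle so that batch idx holds different samples next epoch.
        self._rng.shuffle(self._order)


batches = ShuffledBatches(range(20), batch_size=8)
epoch_1 = [batches[i] for i in range(len(batches))]
batches.on_epoch_end()
epoch_2 = [batches[i] for i in range(len(batches))]
print(epoch_1[0])
print(epoch_2[0])
```

Every epoch still covers each sample exactly once; only the assignment of samples to batches changes between epochs.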

# Creating datasets with Dataset

Given that I don't like `Dataset`, why bother with it at all? There are a bunch of ready-made datasets offered by the `tensorflow-datasets` package. These output the data as `Dataset` objects, so it is useful to know how to use them.

The `Dataset` object is the workhorse object of all data manipulations. This is an abstract base class and every data provider must implement their own `Dataset` subclass. However, there are a number of built-in static methods that can be used for some generic scenarios. The two most common ones are to create a dataset from a numpy/tf tensor, and to create a dataset from a python generator.

## Creating datasets from tensors

When given a simple array, each element of the array will form a single element. When given a matrix, each row will form a single element. It is also possible to give a tuple of tensors as input. In this case the tensors are sliced in lockstep along their first dimension, so each dataset element is a tuple holding one slice from every tensor.

In [2]:
ds = tfd.Dataset.from_tensor_slices([8, 3, 0, 8, 2, 1])
ds

<TensorSliceDataset shapes: (), types: tf.int32>

The most general way to access the dataset is to run it through a `for` loop.

In [3]:
for x in ds:
    print(x)

tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)


Some dataset implementations will also expose an iterator. The `TensorSliceDataset` does that. But not all datasets will.

In [5]:
it = iter(ds)
x = next(it)
x

<tf.Tensor: id=30, shape=(), dtype=int32, numpy=8>

In [6]:
ds = tfd.Dataset.from_tensor_slices([
    [1., 2., 3.],
    [4., 5., 6.]
])
for i, x in enumerate(ds):
    print(i, x)

0 tf.Tensor([1. 2. 3.], shape=(3,), dtype=float32)
1 tf.Tensor([4. 5. 6.], shape=(3,), dtype=float32)


In [8]:
mat = tf.constant(np.array([
    [1., 2., 3.],
    [4., 5., 6.]
]))
vec = tf.constant(np.array([
    [10.],
    [20.]
]))
ds = tfd.Dataset.from_tensor_slices((vec, mat))
for i, x in enumerate(ds):
    # x will be a two element tuple, with the first element from vec and the second from mat
    print(i, x)

0 (<tf.Tensor: id=51, shape=(1,), dtype=float64, numpy=array([10.])>, <tf.Tensor: id=52, shape=(3,), dtype=float64, numpy=array([1., 2., 3.])>)
1 (<tf.Tensor: id=55, shape=(1,), dtype=float64, numpy=array([20.])>, <tf.Tensor: id=56, shape=(3,), dtype=float64, numpy=array([4., 5., 6.])>)


## Creating datasets from generators
It is good practice to provide the `output_shapes` arg when creating datasets from generators. If a particular dimension is unknown or is variable, use `None`.

In [9]:
def count(stop):
    i = 0
    while i < stop:
        yield np.random.randint(10, 1000, (3, 2))
        i += 1

In [10]:
ds = tfd.Dataset.from_generator(count, args=[5], output_types=tf.int32, output_shapes=(3, 2))

W0815 17:25:05.838013 4577142208 deprecation.py:323] From /Users/avilay/venvs/ai/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:505: py_func (from tensorflow.python.ops.script_ops) is deprecated and will be removed in a future version.
Instructions for updating:
tf.py_func is deprecated in TF V2. Instead, there are two
    options available in V2.
    - tf.py_function takes a python function which manipulates tf eager
    tensors instead of numpy arrays. It's easy to convert a tf eager tensor to
    an ndarray (just call tensor.numpy()) but having access to eager tensors
    means `tf.py_function`s can use accelerators such as GPUs as well as
    being differentiable using a gradient tape.
    - tf.numpy_function maintains the semantics of the deprecated tf.py_func
    (it is not differentiable, and manipulates numpy arrays). It drops the
    stateful argument making all functions stateful.
    


In [11]:
for i, x in enumerate(ds):
    print(i, x)

0 tf.Tensor(
[[691 875]
 [688 677]
 [365 742]], shape=(3, 2), dtype=int32)
1 tf.Tensor(
[[706 518]
 [291 967]
 [177 168]], shape=(3, 2), dtype=int32)
2 tf.Tensor(
[[483 919]
 [299 290]
 [677 379]], shape=(3, 2), dtype=int32)
3 tf.Tensor(
[[644 994]
 [278 486]
 [673 152]], shape=(3, 2), dtype=int32)
4 tf.Tensor(
[[178  29]
 [823 978]
 [249 164]], shape=(3, 2), dtype=int32)


In [19]:
def count_rnd(stop):
    i = 0
    while i < stop:
        nrows = np.random.randint(2, 5)
        yield np.random.random((nrows, 3))
        i += 1

In [20]:
ds = tfd.Dataset.from_generator(count_rnd, args=[3], output_types=tf.float32, output_shapes=(None, 3))

In [21]:
for i, x in enumerate(ds):
    print(i, x)

0 tf.Tensor(
[[0.38219634 0.40371072 0.7774332 ]
 [0.98708916 0.30993912 0.00245317]], shape=(2, 3), dtype=float32)
1 tf.Tensor(
[[0.57388437 0.89134675 0.44393557]
 [0.80804914 0.1617799  0.8996212 ]], shape=(2, 3), dtype=float32)
2 tf.Tensor(
[[0.18673097 0.5564268  0.08767605]
 [0.65918577 0.31532907 0.04343607]
 [0.7139551  0.42873502 0.7228817 ]], shape=(3, 3), dtype=float32)


## Zipping arbitrary datasets
TODO

## Creating pipelines

The `Dataset` object has a bunch of Spark-like methods to process data, e.g., `map`. The weird thing about `map` is that it traces the mapper as a graph function, so inside the mapper the elements show up as graph tensors rather than eager tensors. This means that I cannot call `x.numpy()`, for example. In the example below, see that the type of the tensor inside the mapper is the symbolic graph tensor `tensorflow.python.framework.ops.Tensor`, and that it is printed only once because the mapper is traced a single time. Inside the for loop the tensors are available again as eager tensors, `tensorflow.python.framework.ops.EagerTensor`.

`map` can be parallelized. For the most part I can use the `tf.data.experimental.AUTOTUNE` constant to let tf dynamically scale with the number of available CPUs. This is very useful when reading data from disk or network. See the image processing example for how this is used.

Another useful method is `take(n)` which works as expected.

In [34]:
def gen_vec(count=5):
    for _ in range(count):
        yield tf.random.uniform((3,))

def mapper(x):
    print(type(x))
    return x
    
    
for x in tfd.Dataset.from_generator(gen_vec, output_types=tf.float32).map(mapper):
    print(type(x))

<class 'tensorflow.python.framework.ops.Tensor'>
<class 'tensorflow.python.framework.ops.EagerTensor'>
<class 'tensorflow.python.framework.ops.EagerTensor'>
<class 'tensorflow.python.framework.ops.EagerTensor'>
<class 'tensorflow.python.framework.ops.EagerTensor'>
<class 'tensorflow.python.framework.ops.EagerTensor'>


In [35]:
for x in tfd.Dataset.from_generator(gen_vec, output_types=tf.float32).take(2):
    print(x)

tf.Tensor([0.36576295 0.7977334  0.35978234], shape=(3,), dtype=float32)
tf.Tensor([0.77115643 0.8696778  0.64151216], shape=(3,), dtype=float32)


## Batching and Shuffling
Typically I want to shuffle the input dataset for each epoch and I want to access it in batches. `Dataset` supports both these use cases. It also has a `prefetch` method which will prefetch the next batch while the current batch is being processed. See its use in the image processing example.

`shuffle()` will work "best", i.e., with maximum randomization, if the shuffle size is at least as big as the dataset size. Of course, if the full dataset does not fit in memory, use the biggest size possible. Conceptually, here is what happens. Let's say I specify a shuffle size of 100. Then tf will create a queue of size 100 and fill it with elements from the dataset, let's say items with index 0 to 99. In the first iteration it will randomly pick an item from the queue, i.e., any item from 0 to 99, let's say item number 50. Then it will add item 100 to the queue. In the next iteration it will randomly select from this queue and add item 101 to the queue, and so on.

It is important to call `shuffle()` before `batch()`. Otherwise it will shuffle the batches, but not items within the batch. In other words, each queue element will be a full batch, not individual instances from the dataset.
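
The queue behaviour described above can be mimicked in plain Python. This is a minimal sketch of the idea only, not TensorFlow's actual implementation; the function name `buffered_shuffle` and its `seed` parameter are made up for illustration.

```python
import random


def buffered_shuffle(items, buffer_size, seed=None):
    """Yield items in the order a fixed-size shuffle buffer would emit them."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            # Still filling the buffer; nothing is emitted yet.
            buffer.append(item)
            continue
        # Buffer is full: emit a random element and put the new item in its slot.
        slot = rng.randrange(buffer_size)
        yield buffer[slot]
        buffer[slot] = item
    # Source exhausted: drain what is left in random order.
    rng.shuffle(buffer)
    yield from buffer


out = list(buffered_shuffle(range(500), buffer_size=16, seed=0))
print(out[:5])
```

With a buffer of 16, the k-th emitted value (counting from zero) can never be larger than `k + 15`, which is why a small shuffle size only perturbs the order locally. When `buffer_size` is at least the dataset size, the loop only fills the buffer and the final drain amounts to a full shuffle.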

In [63]:
def counter():
    i = 0
    while i < 500:
        yield i
        i += 1

In [64]:
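# Caution: int8 only covers -128..127, so counter values past 127 do not fit; tf.int32 is a safer choice.
ds = tfd.Dataset.from_generator(counter, output_types=tf.int8)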
for x in ds.take(5):
    print(x)

tf.Tensor(0, shape=(), dtype=int8)
tf.Tensor(1, shape=(), dtype=int8)
tf.Tensor(2, shape=(), dtype=int8)
tf.Tensor(3, shape=(), dtype=int8)
tf.Tensor(4, shape=(), dtype=int8)


With `shuffle`, the elements come out of order.

In [49]:
for x in ds.shuffle(16).take(5):
    print(x)

tf.Tensor(3, shape=(), dtype=int8)
tf.Tensor(7, shape=(), dtype=int8)
tf.Tensor(9, shape=(), dtype=int8)
tf.Tensor(6, shape=(), dtype=int8)
tf.Tensor(12, shape=(), dtype=int8)


When batching, each element is a batch of the specified size.

In [50]:
for x in ds.batch(8).take(3):
    # Each x will be 8 elements long
    print(x)

tf.Tensor([0 1 2 3 4 5 6 7], shape=(8,), dtype=int8)
tf.Tensor([ 8  9 10 11 12 13 14 15], shape=(8,), dtype=int8)
tf.Tensor([16 17 18 19 20 21 22 23], shape=(8,), dtype=int8)


When calling `shuffle` before `batch`, each batch is shuffled. The elements in each batch are out of order.

In [51]:
for x in ds.shuffle(16).batch(8).take(3):
    print(x)

tf.Tensor([12  0 13  7  9 20  1 18], shape=(8,), dtype=int8)
tf.Tensor([ 4 24  6 10 11 21  8 19], shape=(8,), dtype=int8)
tf.Tensor([17 26 27 29 22 30 25 31], shape=(8,), dtype=int8)


When calling `shuffle` after `batch`, while the batches are out of order, internally each batch is in-order.

In [52]:
for x in ds.batch(8).shuffle(16).take(3):
    print(x)

tf.Tensor([24 25 26 27 28 29 30 31], shape=(8,), dtype=int8)
tf.Tensor([88 89 90 91 92 93 94 95], shape=(8,), dtype=int8)
tf.Tensor([0 1 2 3 4 5 6 7], shape=(8,), dtype=int8)


### Batch Sizes
With a dataset of 500 elements and a batch size of 8, I expect 500/8 = 62.5 batches, i.e., 63 batches with the last one holding only 4 elements. To get uniform batch sizes I can drop the last batch with `drop_remainder=True`. The last batch is dropped only if it has fewer elements than the batch size; otherwise I get all the batches.

In [65]:
batch_sizes = []
for x in ds.batch(8):
    batch_sizes.append(len(x))

In [68]:
print(len(batch_sizes))
for batch_size in batch_sizes:
    print(batch_size, end=" ")

63
8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 4 

In [69]:
batch_sizes = []
for x in ds.batch(8, drop_remainder=True):
    batch_sizes.append(len(x))

In [70]:
print(len(batch_sizes))
for batch_size in batch_sizes:
    print(batch_size, end=" ")

62
8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 

In [71]:
batch_sizes = []
for x in ds.batch(10, drop_remainder=True):
    batch_sizes.append(len(x))

In [72]:
len(batch_sizes)

50
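
The counts above follow from simple integer arithmetic. The helper below is a small sketch of that arithmetic; `batch_count` is a made-up name and not part of `tf.data`.

```python
def batch_count(num_items, batch_size, drop_remainder=False):
    """Return (number of batches, size of the last batch)."""
    full, leftover = divmod(num_items, batch_size)
    if leftover == 0 or drop_remainder:
        # Every emitted batch is full (or there are no batches at all).
        return full, batch_size if full else 0
    # One extra, smaller batch holds the leftover items.
    return full + 1, leftover


print(batch_count(500, 8))                        # (63, 4)
print(batch_count(500, 8, drop_remainder=True))   # (62, 8)
print(batch_count(500, 10, drop_remainder=True))  # (50, 10)
```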

## Repeating
Typically `keras` built-in models have a single method `fit` which takes a dataset that yields tuples of (X, y), the number of epochs to train for, and the number of "steps" per epoch. In such cases I want the dataset to loop endlessly and let `keras` take care of iterating the correct number of times. The number of steps here would typically be the number of batches. In pseudocode, instead of saying:
```
for epoch in range(num_epochs):
    for batch in dataset:
        model.train(batch)
```
I say the following:
```
model.fit(dataset, epochs=num_epochs, steps_per_epoch=num_batches)
```

Again, it is important to call `shuffle` before `repeat`. Otherwise the shuffle buffer can mix elements from different epochs.

In [74]:
ds = tfd.Dataset.from_generator(gen_vec, output_types=tf.float32)  # gen_vec yields float32 vectors
ds = ds.shuffle(16)
ds = ds.repeat()  # --> Important to call after shuffle
ds = ds.batch(8)
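
To see how an endless dataset and a fixed number of steps per epoch fit together, here is a plain-Python imitation of the shuffle, repeat, batch chain. It is only a sketch: `shuffle_then_repeat` and `batch` are made-up helpers, the shuffle is a full reshuffle per pass (as if the shuffle size covered the whole dataset), and the numbers are chosen so that the dataset size is a multiple of the batch size.

```python
import itertools
import random


def shuffle_then_repeat(items, rng):
    """Reshuffle the whole dataset, emit one full pass, and start over."""
    while True:
        epoch = list(items)
        rng.shuffle(epoch)
        yield from epoch


def batch(stream, batch_size):
    """Group a (possibly endless) stream into lists of batch_size items."""
    stream = iter(stream)
    while True:
        chunk = list(itertools.islice(stream, batch_size))
        if not chunk:
            return
        yield chunk


rng = random.Random(0)
num_items, batch_size, num_epochs = 20, 4, 3
steps_per_epoch = num_items // batch_size  # 5 full batches per pass

batches = batch(shuffle_then_repeat(range(num_items), rng), batch_size)
for epoch in range(num_epochs):
    # Pull exactly steps_per_epoch batches from the endless stream.
    seen = [x for b in itertools.islice(batches, steps_per_epoch) for x in b]
    print(epoch, sorted(seen) == list(range(num_items)))
```

Because the reshuffle happens once per pass and before the stream is repeated, every epoch sees each item exactly once. When the dataset size is not a multiple of the batch size, batching after `repeat` lets a batch straddle two passes.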