## Optimize TensorFlow pipeline performance with prefetch and caching

In [1]:
import tensorflow as tf
import time

2023-08-16 09:35:21.784236: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-08-16 09:35:22.873595: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-08-16 09:35:22.881181: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
tf.__version__

'2.13.0'

## Prefetch

In [3]:
class FileDataset(tf.data.Dataset):
    """Simulates reading samples from a file, using artificial delays."""

    @staticmethod
    def read_file_in_batches(num_samples):
        # Simulate opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Simulate reading one record (line) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        # Return a generator-backed dataset rather than a FileDataset instance
        return tf.data.Dataset.from_generator(
            cls.read_file_in_batches,
            output_signature=tf.TensorSpec(shape=(1,), dtype=tf.int64),
            args=(num_samples,),
        )

In [4]:
def benchmark(dataset, num_epochs=2):
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)

In [8]:
%%timeit
benchmark(FileDataset())

550 ms ± 15.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [7]:
%%timeit
benchmark(FileDataset().prefetch(1))

542 ms ± 20.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
%%timeit
benchmark(FileDataset().prefetch(tf.data.AUTOTUNE))

520 ms ± 32.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


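**Prefetching lowers the mean run time from 550 ms to 542 ms with `prefetch(1)` and to 520 ms with `prefetch(tf.data.AUTOTUNE)`. The gains are small and fall within the reported standard deviations, so treat them as indicative rather than conclusive.**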
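
A rough back-of-envelope model helps put these timings in context. The sketch below sums only the `time.sleep` calls from `FileDataset` and `benchmark` (0.03 s to open the file, 0.015 s per read, 0.01 s per training step) and assumes an idealized pipeline in which reading the next sample fully overlaps with training on the current one. The function names and the overlap model are illustrative assumptions, not part of `tf.data`.

```python
# Sleep durations taken from FileDataset and benchmark above (seconds).
OPEN_S = 0.03
READ_S = 0.015
TRAIN_S = 0.01


def sequential_time(num_samples=3, num_epochs=2):
    """Total sleep time when every read and training step runs back to back."""
    return num_epochs * (OPEN_S + num_samples * (READ_S + TRAIN_S))


def overlapped_time(num_samples=3, num_epochs=2):
    """Idealized sleep time when the next read overlaps the current training step."""
    if num_samples == 0:
        return num_epochs * OPEN_S
    per_epoch = (
        OPEN_S
        + READ_S  # the first read cannot overlap with anything
        + (num_samples - 1) * max(READ_S, TRAIN_S)  # overlapped middle section
        + TRAIN_S  # the last training step has no read left to hide
    )
    return num_epochs * per_epoch


print(f"sequential: {sequential_time() * 1000:.0f} ms")
print(f"overlapped: {overlapped_time() * 1000:.0f} ms")
```

Under these assumptions the sleeps account for about 210 ms of the measured 550 ms, and perfect overlap would save roughly 40 ms. The remainder is presumably pipeline and generator overhead, which would explain why the measured gain from prefetching is modest for such a small dataset.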

## Cache

In [11]:
dataset = tf.data.Dataset.range(10)

In [14]:
dataset = dataset.map(lambda x: x**2)
# Note: this cell reassigns `dataset`, so running it twice squares the values twice.
# The outputs below are fourth powers, which indicates the cell was executed twice.

In [15]:
list(dataset.as_numpy_iterator())

[0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]

In [18]:
dataset = dataset.cache()
# Optionally combine caching with prefetching:
# dataset = dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
# The first full iteration fills the cache; subsequent iterations read from it.
list(dataset.as_numpy_iterator())

[0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]
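
To make the caching behaviour easier to see, here is a minimal pure-Python analogue. It is only a sketch of the idea (compute on the first pass, replay afterwards) and not how `tf.data` implements `cache()`; `CachedIterable`, `expensive_square`, and `call_count` are hypothetical names introduced for illustration.

```python
class CachedIterable:
    """Hypothetical helper: replays stored items after the first complete pass."""

    def __init__(self, make_iterator):
        self._make_iterator = make_iterator  # zero-argument callable
        self._cache = None

    def __iter__(self):
        if self._cache is not None:
            # Later passes: serve stored items and skip the source entirely.
            yield from self._cache
            return
        items = []
        for item in self._make_iterator():
            items.append(item)
            yield item
        # Store the items only once the source has been fully consumed.
        self._cache = items


call_count = {"n": 0}


def expensive_square(x):
    # Stand-in for a costly transformation; counts how often it runs.
    call_count["n"] += 1
    return x ** 2


cached = CachedIterable(lambda: (expensive_square(x) for x in range(10)))

first_pass = list(cached)
second_pass = list(cached)
print(first_pass == second_pass, call_count["n"])
```

The transformation runs once per element even though the data is iterated twice, which is the effect the cached dataset relies on in later epochs.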

In [19]:
def mapped_function(s):
    # Simulate expensive pre-processing with a 30 ms delay
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

In [20]:
%%timeit -n1 -r1

benchmark(FileDataset().map(mapped_function), num_epochs=5)

1.27 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [21]:
%%timeit -n1 -r1

benchmark(FileDataset().map(mapped_function).cache(), num_epochs=5)

504 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


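**Caching cuts the run time for 5 epochs from 1.27 s to 504 ms. After the first epoch, the file opening, reading, and `mapped_function` steps are skipped because the elements are served from the cache. Note that each figure comes from a single run (`-n1 -r1`).**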
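
The size of this speed-up can be sanity-checked with a simple estimate. The sketch below adds up only the `time.sleep` calls from the cells above (0.03 s to open, 0.015 s per read, 0.03 s per `mapped_function` call, 0.01 s per training step) and assumes that, with caching, every epoch after the first pays only for the training steps. The function names are illustrative, and real runs include overhead that this model ignores.

```python
# Sleep durations taken from the cells above (seconds).
OPEN_S = 0.03
READ_S = 0.015
MAP_S = 0.03
TRAIN_S = 0.01


def uncached_time(num_samples=3, num_epochs=5):
    """Every epoch opens the file, reads, maps, and trains on each sample."""
    return num_epochs * (OPEN_S + num_samples * (READ_S + MAP_S + TRAIN_S))


def cached_time(num_samples=3, num_epochs=5):
    """The first epoch pays the full cost; later epochs only pay for training."""
    if num_epochs == 0:
        return 0.0
    first_epoch = OPEN_S + num_samples * (READ_S + MAP_S + TRAIN_S)
    later_epochs = (num_epochs - 1) * num_samples * TRAIN_S
    return first_epoch + later_epochs


print(f"uncached: {uncached_time() * 1000:.0f} ms")
print(f"cached:   {cached_time() * 1000:.0f} ms")
```

The estimate gives about 975 ms without caching and 315 ms with it. Both measured values (1.27 s and 504 ms) sit a few hundred milliseconds above these totals, which is consistent with a roughly constant overhead on top of the simulated work.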

Further reading: https://www.tensorflow.org/guide/data_performance#caching