# Introduction to Scalable DL: Day 3, Tutorial 2

**Content creators**: Jan Ebert

**Content reviewers / testers**: Stefan Kesselheim, Alexandre Strube

**Content supervisors**: Stefan Kesselheim

In this tutorial, we will introduce some options to work around file system performance problems and take a more in-depth look at `tf.data` pipelines and their performance in various settings.

We have learned that some file systems – such as the one the Jülich supercomputers use – are bad at handling many small files and suffer in performance accordingly.\
Assuming our dataset has many small files and is affected by this degradation, we have multiple solutions available to tackle the performance problem. The main factor is whether our dataset fits into working memory (RAM) or not. We will try out two solutions in this tutorial: one for smaller datasets and one for larger datasets.

To get a feel for how some of the functions of `tf.data` work, you will first inspect a simple, synthetic dataset and try out some transformations on it.\
After that, you will work on many files stored on disk and try to optimize the pipeline's throughput performance using various functions and flags available to us.

## Setup

In [1]:
import itertools
import os
from pathlib import Path
import time

from IPython.display import display, HTML
from PIL import Image
# Disable TensorFlow GPU usage so this works on login nodes.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import tensorflow as tf

from tiny_imagenet_tfrecords import from_tfrecords
from dataset_wrapper import TFDataset


def display_image(image_data):
    display(Image.fromarray(image_data.numpy()))


def decode_jpeg(image_string):
    return tf.io.decode_jpeg(image_string, channels=3)


class TFDatasetProfilingResults:
    """Collect profiling results and nicely format them."""

    def __init__(self):
        self.results = []

    @staticmethod
    def _escape_html(text):
        text = text.replace('&', '&amp;')
        text = text.replace('<', '&lt;')
        text = text.replace('>', '&gt;')
        text = text.replace('"', '&quot;')
        text = text.replace("'", '&#39;')
        return text

    def add_result(self, dataset_set, dataset, duration):
        """Add a profiling result with `duration` for the given
        `dataset` on `dataset_set`.
        """
        self.results.append((
            dataset_set,
            dataset,
            duration,
            dataset.num_iterations,
        ))
        
    def to_html_table(self):
        """Return the collected results formatted as a HTML table."""
        html_results = ['<table><thead><tr>'
                '<th style="text-align: left">Dataset</th>'
                '<th style="text-align: left">Pipeline</th>'
                '<th>Duration [sec]</th>'
                '<th>Iteration</th>'
                '</tr></thead><tbody>']

        for (dataset_set, dataset, duration, num_iters) in self.results:
            pipeline = ',<br>'.join(map(self._escape_html, dataset.pipeline))
            html_results.append(
                f'<tr>'
                f'<td style="text-align: left;">{self._escape_html(dataset_set)}</td>'
                f'<td style="text-align: left;">{pipeline}</td>'
                f'<td>{duration:.3f}</td>'
                f'<td>{num_iters}</td>'
                f'</tr>',
            )

        html_results.append('</tbody></table>')
        return ''.join(html_results)
    
    def averages_to_html_table(self, num_warmup_iterations):
        """Return averaged results formatted as a HTML table.
        Results are only considered after the given number of
        warmup iterations `num_warmup_iterations`.
        """
        html_results = ['<table><thead><tr>'
                        '<th style="text-align: left">Dataset</th>'
                        '<th style="text-align: left">Pipeline</th>'
                        '<th>Average Duration [sec]</th>'
                        '<th>Samples</th>'
                        '</tr></thead><tbody>']

        pipelines = {}
        for (dataset_set, dataset, duration, num_iters) in self.results:
            if num_iters <= num_warmup_iterations:
                continue

            key = (dataset_set, tuple(dataset.pipeline))
            durations = pipelines.setdefault(key, [])
            durations.append(duration)

        for ((dataset_set, pipeline), durations) in pipelines.items():
            num_samples = len(durations)
            avg_duration = sum(durations) / num_samples
            pipeline = ',<br>'.join(map(self._escape_html, pipeline))
            html_results.append(
                f'<tr>'
                f'<td style="text-align: left;">{self._escape_html(dataset_set)}</td>'
                f'<td style="text-align: left;">{pipeline}</td>'
                f'<td>{avg_duration:.3f}</td>'
                f'<td>{num_samples}</td>'
                f'</tr>',
            )

        html_results.append('</tbody></table>')
        if len(html_results) <= 2:
            html_results.append('<i>No run had enough iterations!</i>')
        return ''.join(html_results)
    
    def to_csv(self, separator=';'):
        """Return the collected results in CSV format with the
        given `separator`.
        """
        csv_results = [
            separator.join(['dataset', 'pipeline', 'duration', 'num_iterations']),
        ]

        for (dataset_set, dataset, duration, num_iters) in self.results:
            pipeline = ','.join(dataset.pipeline)
            csv_results.append(separator.join(map(str,
                (dataset_set, pipeline, duration, num_iters),
            )))

        return '\n'.join(csv_results)

## Handling Datasets with Many Files

### This section of the tutorial is very Linux-specific and will not work on Windows machines!

Whenever we have to work with many individual files on the Jülich supercomputers, we are likely to encounter runtime performance degradations. For data-intensive work like deep learning, this loss in performance is usually unacceptable and we need to adapt our dataset to work with our file system.

Usually, this entails packing the dataset into a single file with whatever means necessary. However, doing this means that we would also have to change our program's code to read data from this new, single file instead. Ideally, we would like to keep our code the same while still avoiding the file system issues. We'll take a look at two solutions: the first uses the temporary [_shared memory_ drive `/dev/shm`](https://en.wikipedia.org/wiki/Shared_memory#Support_on_Unix-like_systems) to make in-memory data available to the whole node. The other solution uses [SquashFS](https://en.wikipedia.org/wiki/SquashFS) combined with a virtual file system (a [filesystem in userspace (FUSE)](https://en.wikipedia.org/wiki/Filesystem_in_Userspace) via [`squashfuse`](https://github.com/vasi/squashfuse)).

Let's assume we have two datasets that consist of many files.
1. One of them (_dataset A_) is relatively small (< 500 GB) and fits into the working memory (RAM) of each compute node (together with our program state).
2. The other dataset (_dataset B_) is larger (> 500 GB) and does not fit into RAM.

Depending on whether our dataset fits in RAM, we can adapt our handling accordingly. For the smaller dataset, this will benefit us in two ways: more speed and simplicity.
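
Whether a dataset "fits" can be estimated before anything is copied. The following is a minimal sketch and not part of the course material: the helper names `directory_size` and `fits_into` as well as the `max_fraction` safety margin are assumptions made for illustration. It compares the total size of all files below a directory with the free space reported for a target directory.

```python
import shutil
from pathlib import Path


def directory_size(path):
    """Return the total size in bytes of all regular files below `path`."""
    return sum(f.stat().st_size for f in Path(path).rglob('*') if f.is_file())


def fits_into(data_path, target_dir, max_fraction=0.5):
    """Return whether the data below `data_path` occupies at most
    `max_fraction` of the free space reported for `target_dir`.

    `max_fraction` is a hypothetical safety margin that leaves room
    for the program state; tune it for your own job.
    """
    free_bytes = shutil.disk_usage(target_dir).free
    return directory_size(data_path) <= max_fraction * free_bytes
```

A call such as `fits_into(data_path, '/dev/shm')` would then give a rough answer. Treat it as an estimate only: walking a directory tree with many small files is itself slow on the parallel file system, and the free space reported for a temporary file system is typically bounded by its configured size limit rather than by the RAM that is actually free.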

Let's first take a look at how to handle _dataset A_, which fits into memory. This uses the temporary file system at `/dev/shm`.

### Moving to an In-memory Dataset

If our data fits into memory, the simplest option on a Linux system (like the Jülich supercomputers) is to copy it into `/dev/shm` and read from there. In practical terms, we move our data from disk into RAM. We need to be careful to copy the data _exactly once_ on each node, because **all processes on the same node** can read from and write to the same `/dev/shm`, but processes on different nodes see a different `/dev/shm` – `/dev/shm` is node-local storage. If we copied the data multiple times in parallel on one node, it would likely get corrupted; if we copied the data only on the launch node, no other node would be able to find it because it would not exist there.

Another thing to worry about with this approach: `/dev/shm` is a temporary file system, so changes made there are not persistent and are not accessible from other nodes. If you plan to write data, it is easiest to do so in a location other than `/dev/shm`.

It may be slow to copy a bunch of files from the file system since you run into the same problem as always – accessing individual files causes performance problems. Even though we only do this once at the start of the job, the time may pile up. To alleviate this slightly, an easy option is to create a `tar` archive of the data, which is then extracted right from the single archive file into `/dev/shm`.

A template that you can adapt to your own jobs can be found in `shm.sbatch`. In there, the data is automatically `tar`ed once and then `untar`ed into `/dev/shm`.
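
When several processes share a node, the "exactly once per node" requirement needs some coordination. The sketch below shows one possible way to do this with the standard library only; it is not taken from `shm.sbatch`, and the function name `run_once_per_node` and the marker names `copy.lock` and `copy.done` are assumptions made for illustration. It relies on directory creation being atomic: the one process that manages to create the lock directory does the work, and all others wait for a "done" marker.

```python
import time
from pathlib import Path


def run_once_per_node(work, state_dir, poll_interval=0.1, timeout=600.0):
    """Run `work()` in exactly one of the processes sharing `state_dir`.

    Returns `True` in the process that did the work and `False` in
    every process that only waited for it to finish.
    """
    state_dir = Path(state_dir)
    state_dir.mkdir(parents=True, exist_ok=True)
    lock_dir = state_dir / 'copy.lock'    # hypothetical marker name
    done_marker = state_dir / 'copy.done'  # hypothetical marker name

    try:
        # Creating a directory either succeeds or raises, so only
        # one process can get past this line.
        lock_dir.mkdir()
    except FileExistsError:
        # Another process is (or was) doing the work; wait for it.
        deadline = time.monotonic() + timeout
        while not done_marker.exists():
            if time.monotonic() > deadline:
                raise TimeoutError(
                    f'waited more than {timeout} s for {done_marker}')
            time.sleep(poll_interval)
        return False

    work()
    done_marker.touch()
    return True
```

With `state_dir` placed in a node-local location such as a user-specific directory below `/dev/shm`, every process on a node could call `run_once_per_node(copy_data, state_dir)` with its own copy function. Stale markers from previous jobs would have to be removed first. Under Slurm, comparing the `SLURM_LOCALID` environment variable with `0` is another common way to select one process per node.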

Let's try out the technique on the Tiny-ImageNet-200 dataset. In Python (and for future usage in this notebook), we can do the following, assuming we are not running multiple Python processes in parallel:

In [None]:
import getpass
import os
from pathlib import Path
import shutil
import tarfile

data_path = Path('/p/project/training2306/datasets/tiny-imagenet-200').expanduser()
tar_path = data_path.with_suffix('.tar')
shm_path = Path(f'/dev/shm/{getpass.getuser()}/{data_path.name}')

# Create a tar archive of the data if it doesn't exist.
if not tar_path.exists():
    with tarfile.open(tar_path, 'w') as tar:
        tar.add(data_path, arcname=data_path.name)

# Clean up any remains of previous jobs.
shutil.rmtree(shm_path, ignore_errors=True)
shm_path.mkdir(parents=True)

# Extract the tar file into `shm_path`.
with tarfile.open(tar_path, 'r') as tar:
    tar.extractall(shm_path.parent)
# Make our data private so future jobs of other users will not be able
# to access possibly sensitive data in this shared location.
shm_path.chmod(0o700)

# Read data from `shm_path`.

### Moving to a FUSE-mounted SquashFS

For data that does not fit into memory, we can pack all data into a SquashFS. This SquashFS is a compressed, read-only archive of the data. That means if our data changes, we have to re-create the SquashFS every time. When your data is very dynamic, this may become annoying.

After creating the SquashFS, we are left with a single file that contains all our data; reading from this would be different than reading from the file system, meaning we'd have to re-write our data reading code. Thankfully, the `squashfuse_ll` binary allows us to access the SquashFS contents like a standard directory. `squashfuse_ll` will _mount_ the SquashFS to a different path and make it look like any other file system path. The FUSE mount is like a link to the SquashFS, only that it presents the data transparently as a standard file system. Once again, we avoid changing the code that is responsible for reading data, but retain the speed benefits of a single-file dataset. One thing we should always take care of, though, is cleaning up the mount directory afterwards, so we do not pollute resources. This is achieved by unconditionally calling `fusermount3 -u <mount_dir>` when the script exits.

The same caveats as for the in-memory usage of `/dev/shm` apply: first off, make sure that we only create, mount, and un-mount once per node. Also, mount locations are limited: on Jülich machines, you have to use directories in `/dev/shm` or `/tmp`. Finally, it's important to mention again that SquashFS files are _read-only_, so we cannot write to the mount path.

Even if we mount the data in `/dev/shm`, remember that the mount is like a link – the data does not actually reside in RAM, unlike in the previous section.

To summarize, the basic steps for using a SquashFS are:
1. Create the SquashFS using `mksquashfs`.
2. Mount the SquashFS to a directory in `/dev/shm` using `squashfuse_ll`.
3. Do your work...
4. Unmount the SquashFS mount directory using `fusermount3 -u`.

For a template that you can adapt to your own jobs, see `squashfs.sbatch`. Note that the template is much more complex than what is described here due to having to handle creation and clean-up on multiple nodes in parallel while also working around Slurm limitations. We hope that the extensive documentation in the template can clear up what is happening.
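
The "always unmount afterwards" requirement can also be expressed as a context manager. This is a minimal sketch under stated assumptions, not the approach used in `squashfs.sbatch`: the name `mounted_squashfs` and the injectable `run` parameter are made up for illustration. The `run` parameter defaults to `subprocess.run` and exists only so that the control flow can be tried out on machines without the `squashfuse_ll` and `fusermount3` binaries.

```python
import contextlib
import subprocess
from pathlib import Path


@contextlib.contextmanager
def mounted_squashfs(sqsh_path, mount_path, run=subprocess.run):
    """Mount `sqsh_path` at `mount_path` and always unmount on exit."""
    mount_path = Path(mount_path)
    mount_path.mkdir(mode=0o700, parents=True, exist_ok=True)
    # If mounting fails, `check=True` raises and nothing needs cleanup.
    run(['squashfuse_ll', str(sqsh_path), str(mount_path)], check=True)
    try:
        yield mount_path
    finally:
        # Runs even if the body of the `with` block raised an exception.
        run(['fusermount3', '-u', str(mount_path)], check=False)
```

Usage would look like `with mounted_squashfs(sqsh_path, mount_path) as path: ...`, reading data from `path` inside the block. Compared to a clean-up function registered at interpreter exit, the mount lives only as long as the `with` block. Neither variant can clean up if the process is killed without a chance to run its exit handlers.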

Let's create and mount a SquashFS of Tiny-ImageNet-200 right here in Python, assuming we have only one Python process:

In [None]:
import atexit
import getpass
import os
from pathlib import Path
import shutil
import subprocess

data_path = Path('/p/project/training2306/datasets/tiny-imagenet-200').expanduser()
sqsh_path = Path(f'{data_path}.sqsh')
mount_path = Path(f'/dev/shm/{getpass.getuser()}/sqsh/{data_path.name}')

# Create a SquashFS of the data if it doesn't exist.
if not sqsh_path.exists():
    subprocess.run(['mksquashfs', data_path, sqsh_path], check=True)

def clean_up_squashfuse(mount_path):
    if mount_path.is_dir():
        subprocess.run(['fusermount3', '-u', mount_path])
    shutil.rmtree(mount_path, ignore_errors=True)

# Clean up any remains of previous jobs.
clean_up_squashfuse(mount_path)
# Create our mount directory.
mount_path.mkdir(mode=0o700, parents=True, exist_ok=True)

# Unmount the SquashFS when Python exits.
atexit.register(clean_up_squashfuse, mount_path)
# Mount the SquashFS at our mount path.
subprocess.run(['squashfuse_ll', sqsh_path, mount_path], check=True);

# Read data from `mount_path`.

### What are the Gains?

Here we will compare how the methods of
1. loading from the standard file system,
2. loading data directly inside `/dev/shm`, and
3. loading from a FUSE-mounted SquashFS

stack up against each other. Remember that the only thing we change is the path we read the data from – everything else is exactly the same across the three methods.

What do you expect will be fastest on the Jülich machines? What will be slowest?

In [None]:
def profile_dataset(dataset_path):
    assert dataset_path.exists(), 'dataset directory does not exist'
    print('Reading from', dataset_path)

    # Start a timer so we can measure starting from dataset creation.
    start_time = time.perf_counter()

    # Create a generator containing all JPEG images in the dataset.
    files = dataset_path.glob('**/*.JPEG')
    files = list(files)

    iteration_start_time = time.perf_counter()

    # Iterate through the dataset; just access every file once.
    for filename in files:
        # We try to avoid file system caching with this call.
        filename.stat()
        with filename.open('rb') as f:
            f.read()

    end_time = time.perf_counter()
    duration = end_time - start_time
    iteration_duration = end_time - iteration_start_time

    print('Total duration:', duration, 'seconds')
    print('Iteration duration:', iteration_duration, 'seconds')

In [None]:
profile_dataset(data_path)

In [None]:
profile_dataset(shm_path)

In [None]:
assert mount_path.exists(), 'dataset directory does not exist'
assert (mount_path / 'val').exists(), 'dataset directory is not mounted'

profile_dataset(mount_path)
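
A single timing can be misleading, for example because the operating system may keep recently read files in its cache. The sketch below repeats the read several times and reports each pass separately; the function name `time_read_passes` and its parameters are assumptions made for illustration and are not part of the tutorial code.

```python
import time
from pathlib import Path


def time_read_passes(dataset_path, num_passes=3, pattern='**/*.JPEG'):
    """Read every file matching `pattern` below `dataset_path`
    `num_passes` times and return each pass's duration in seconds.
    """
    files = list(Path(dataset_path).glob(pattern))
    durations = []
    for _ in range(num_passes):
        start_time = time.perf_counter()
        for filename in files:
            with filename.open('rb') as f:
                f.read()
        durations.append(time.perf_counter() - start_time)
    return durations
```

Calling `time_read_passes(data_path)` and comparing the first entry with the later ones gives a hint of how much caching influences the measurement on a given system. Whether later passes are actually faster depends on the file system and on how much memory is available for caching.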

### Summary

On the Jülich machines, the ranking should look like this:
1. Data directly in `/dev/shm`
2. SquashFS + FUSE
3. Standard file system.

Between these options, you should always prefer `/dev/shm` since reading directly from RAM is the fastest of the three. However, due to the caveat of the dataset having to fit into memory together with the program state, `/dev/shm` cannot always be used. In that case and if your data does not change very often, using a SquashFS will usually yield better performance compared to the standard file system.

## Getting to Know `tf.data`

### From here on out, the tutorial is Windows-compatible again!

You will now be able to play around with some `tf.data` pipelines in various settings. First use the provided `synthetic_data` generator or write your own. Get a feel for how the `shard` and `batch` methods work. Can you guess what the difference is between `dataset.shard(2, 0).batch(2)` and `dataset.batch(2).shard(2, 0)`?

In [1]:
def synthetic_data():
    for i in range(16):
        yield i

dataset = tf.data.Dataset.from_generator(
    synthetic_data,
    output_signature=tf.TensorSpec(shape=(), dtype=tf.int32),
)

sharded = dataset.shard(2, 0)
print('Sharded:', [d.numpy() for d in sharded])

batched = dataset.batch(2)
print('Batched:\n[' + ',\n '.join([str(d.numpy()) for d in batched]) + ']')

Sharded: [0, 2, 4, 6, 8, 10, 12, 14]
Batched:
[[0 1],
 [2 3],
 [4 5],
 [6 7],
 [8 9],
 [10 11],
 [12 13],
 [14 15]]
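
To check your guess about the order of `shard` and `batch` without TensorFlow, the two operations can be imitated with `itertools`. This is a pure-Python emulation written for illustration, assuming the semantics shown in the output above (sharding keeps every n-th element, batching groups consecutive elements); the helper functions `shard` and `batch` below are stand-ins and not the `tf.data` implementation.

```python
import itertools


def shard(iterable, num_shards, index):
    """Keep every `num_shards`-th element, starting at `index`."""
    return itertools.islice(iterable, index, None, num_shards)


def batch(iterable, batch_size):
    """Group consecutive elements into lists of up to `batch_size`."""
    iterator = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk


shard_then_batch = list(batch(shard(range(16), 2, 0), 2))
batch_then_shard = list(shard(batch(range(16), 2), 2, 0))
print('shard -> batch:', shard_then_batch)
print('batch -> shard:', batch_then_shard)
```

Sharding first selects individual elements and then groups the survivors, while batching first forms the groups and then selects whole batches.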


### Cache and Prefetch
Now, the dataset will emulate a short loading time and print out which element is generated when. Please try to understand the code examples and the output they produce. Afterwards, you can create similar examples for [other methods in the module](https://www.tensorflow.org/api_docs/python/tf/data/Dataset).\
There are some task suggestions at the bottom of this subsection.

In [1]:
def sleepy_synthetic_data():
    for i in range(6):
        time.sleep(0.5)
        print('Yielding element', i)
        yield i

sleepy_dataset = tf.data.Dataset.from_generator(
    sleepy_synthetic_data,
    output_signature=tf.TensorSpec(shape=(), dtype=tf.int32),
)
cached = sleepy_dataset.cache()
print('First read of cached dataset')
for d in cached:
    print('Getting element', d)

First read of cached dataset
Yielding element 0
Getting element tf.Tensor(0, shape=(), dtype=int32)
Yielding element 1
Getting element tf.Tensor(1, shape=(), dtype=int32)
Yielding element 2
Getting element tf.Tensor(2, shape=(), dtype=int32)
Yielding element 3
Getting element tf.Tensor(3, shape=(), dtype=int32)
Yielding element 4
Getting element tf.Tensor(4, shape=(), dtype=int32)
Yielding element 5
Getting element tf.Tensor(5, shape=(), dtype=int32)


In [1]:
print('Second read of cached dataset')
for d in cached:
    print('Getting element', d)

Second read of cached dataset
Getting element tf.Tensor(0, shape=(), dtype=int32)
Getting element tf.Tensor(1, shape=(), dtype=int32)
Getting element tf.Tensor(2, shape=(), dtype=int32)
Getting element tf.Tensor(3, shape=(), dtype=int32)
Getting element tf.Tensor(4, shape=(), dtype=int32)
Getting element tf.Tensor(5, shape=(), dtype=int32)


In [1]:
sharded = sleepy_dataset.shard(2, 0)
for d in sharded:
    print('Getting element', d)

Yielding element 0
Getting element tf.Tensor(0, shape=(), dtype=int32)
Yielding element 1
Yielding element 2
Getting element tf.Tensor(2, shape=(), dtype=int32)
Yielding element 3
Yielding element 4
Getting element tf.Tensor(4, shape=(), dtype=int32)
Yielding element 5


In [1]:
shuffled = sleepy_dataset.shuffle(3)
for d in shuffled:
    print('Getting element', d)

print('\nReshuffled upon next iteration.')
for d in shuffled:
    print('Getting element', d)

Yielding element 0
Yielding element 1
Yielding element 2
Getting element tf.Tensor(1, shape=(), dtype=int32)
Yielding element 3
Getting element tf.Tensor(3, shape=(), dtype=int32)
Yielding element 4
Getting element tf.Tensor(4, shape=(), dtype=int32)
Yielding element 5
Getting element tf.Tensor(5, shape=(), dtype=int32)
Getting element tf.Tensor(2, shape=(), dtype=int32)
Getting element tf.Tensor(0, shape=(), dtype=int32)

Reshuffled upon next iteration.
Yielding element 0
Yielding element 1
Yielding element 2
Getting element tf.Tensor(2, shape=(), dtype=int32)
Yielding element 3
Getting element tf.Tensor(3, shape=(), dtype=int32)
Yielding element 4
Getting element tf.Tensor(4, shape=(), dtype=int32)
Yielding element 5
Getting element tf.Tensor(0, shape=(), dtype=int32)
Getting element tf.Tensor(5, shape=(), dtype=int32)
Getting element tf.Tensor(1, shape=(), dtype=int32)
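
The output above shows that three elements are produced before the first one is returned, which matches a shuffle buffer of size 3. The following pure-Python sketch imitates this buffering idea; the function `buffered_shuffle` is written for illustration and is not the `tf.data` implementation.

```python
import random


def buffered_shuffle(iterable, buffer_size, seed=None):
    """Yield the elements of `iterable` in a locally shuffled order
    using a buffer that holds at most `buffer_size` elements.
    """
    rng = random.Random(seed)
    buffer = []
    for item in iterable:
        buffer.append(item)
        if len(buffer) < buffer_size:
            # Keep filling the buffer before emitting anything.
            continue
        # Buffer is full: emit one random element to free a slot.
        index = rng.randrange(len(buffer))
        buffer[index], buffer[-1] = buffer[-1], buffer[index]
        yield buffer.pop()
    # Source is exhausted: emit the remaining elements in random order.
    rng.shuffle(buffer)
    yield from buffer


print(list(buffered_shuffle(range(6), buffer_size=3)))
```

One consequence of the buffer is that an element can only move forward by a limited distance: with a buffer of size 3, the first returned element is always one of the first three. A buffer at least as large as the dataset is needed for a uniform shuffle.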


### Tasks

1. The `prefetch` method tries to fill a buffer with future data as soon as the first element has been requested. Write an example that uses the `prefetch` method. It expects a single argument: the buffer size for how many elements to process in advance. Do you see the behavior you expected? Why or why not? Try to insert a `time.sleep(2)` call in the `for`-loop to simulate an expensive neural network training step.
1. What happens when you `shuffle` and then `cache` a dataset? You can use `repeat(<number>)` to iterate through the dataset more than once.

### Solutions

In [1]:
prefetched = sleepy_dataset.prefetch(2)
for d in prefetched:
    # Prefetching cannot have an effect if we process each data point
    # faster than the data pipeline is able to prefetch the next element.
    # So if our calculations on the data are too fast, prefetching won't
    # give us any improvement.
    time.sleep(2)
    print('Getting element', d)

Yielding element 0
Yielding element 1
Yielding element 2
Getting element tf.Tensor(0, shape=(), dtype=int32)
Yielding element 3
Getting element tf.Tensor(1, shape=(), dtype=int32)
Yielding element 4
Getting element tf.Tensor(2, shape=(), dtype=int32)
Yielding element 5
Getting element tf.Tensor(3, shape=(), dtype=int32)
Getting element tf.Tensor(4, shape=(), dtype=int32)
Getting element tf.Tensor(5, shape=(), dtype=int32)
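
The idea behind prefetching can be imitated with a background thread and a bounded queue. This is a simplified sketch for illustration, not how `tf.data` implements `prefetch`; the function name `prefetch` below is a stand-in. It does not forward exceptions from the producer, and it leaves the producer thread blocked if the consumer stops early.

```python
import queue
import threading


def prefetch(iterable, buffer_size):
    """Yield the elements of `iterable` while a background thread
    produces up to `buffer_size` elements ahead of the consumer.
    """
    buffer = queue.Queue(maxsize=buffer_size)
    sentinel = object()  # Marks the end of the data.

    def producer():
        for item in iterable:
            # Blocks while the buffer is full.
            buffer.put(item)
        buffer.put(sentinel)

    threading.Thread(target=producer, daemon=True).start()

    while True:
        item = buffer.get()
        if item is sentinel:
            return
        yield item


for element in prefetch(range(6), buffer_size=2):
    print('Getting element', element)
```

While the consumer is busy with one element, the producer keeps filling the queue, so the loading time overlaps with the processing time instead of adding to it.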


In [1]:
shuffled = sleepy_dataset.shuffle(6)
shuffled_cached = shuffled.cache()
for d in shuffled_cached.repeat(2):
    print('Getting element', d)

Yielding element 0
Yielding element 1
Yielding element 2
Yielding element 3
Yielding element 4
Yielding element 5
Getting element tf.Tensor(3, shape=(), dtype=int32)
Getting element tf.Tensor(0, shape=(), dtype=int32)
Getting element tf.Tensor(4, shape=(), dtype=int32)
Getting element tf.Tensor(5, shape=(), dtype=int32)
Getting element tf.Tensor(2, shape=(), dtype=int32)
Getting element tf.Tensor(1, shape=(), dtype=int32)
Getting element tf.Tensor(3, shape=(), dtype=int32)
Getting element tf.Tensor(0, shape=(), dtype=int32)
Getting element tf.Tensor(4, shape=(), dtype=int32)
Getting element tf.Tensor(5, shape=(), dtype=int32)
Getting element tf.Tensor(2, shape=(), dtype=int32)
Getting element tf.Tensor(1, shape=(), dtype=int32)
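
In the output above, both passes return the elements in the same order: the cache stores the result of the first, shuffled pass, and the second pass replays it without shuffling again. The following pure-Python sketch imitates this behavior; the class `CachedIterable` and the function `shuffled_source` are made up for illustration and are not part of `tf.data`.

```python
import random


class CachedIterable:
    """Replay the elements of `make_iterator()` from memory once a
    complete first pass has been made.
    """

    def __init__(self, make_iterator):
        self._make_iterator = make_iterator
        self._cache = None

    def __iter__(self):
        if self._cache is not None:
            # Later passes never touch the source again.
            yield from self._cache
            return
        items = []
        for item in self._make_iterator():
            items.append(item)
            yield item
        # Only a complete pass populates the cache.
        self._cache = items


def shuffled_source():
    """Return the numbers 0 to 5 in a new random order on each call."""
    items = list(range(6))
    random.shuffle(items)
    return iter(items)


shuffled_cached = CachedIterable(shuffled_source)
print('First pass: ', list(shuffled_cached))
print('Second pass:', list(shuffled_cached))
```

If a new order is wanted in every pass, the shuffling has to happen after the caching step instead of before it.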


## Accessing the File System

To get some more perspective on different functions and what reading from individual files entails, we will now work on a subset of the Tiny-ImageNet-200 dataset. In total, there are 10,000 images.

Below, you will see we created a list that contains file paths to all images ending in ".JPEG" in a certain directory tree.
You'll also notice that we used a simple for-loop to read, convert, and display the data in our "dataset". However, this way, we cannot take advantage of the advanced functionality of `tf.data` pipelines. You are going to change this!

In [None]:
# This dataset contains only the validation set from Tiny ImageNet,
# 10,000 images in total.
dataset_path = Path('/p/project/training2306/datasets/tiny-tiny-imagenet').expanduser()
assert dataset_path.exists(), 'dataset directory does not exist'

# Create a generator containing all JPEG images in the dataset.
files = dataset_path.glob('**/*.JPEG')
# Convert from `Path` objects to strings
files = list(map(str, files))

boring_dataset = files

for (i, path) in enumerate(boring_dataset):
    print(path)

    file_content = tf.io.read_file(path)
    image = decode_jpeg(file_content)

    display_image(image)

    if i >= 5:
        break

# Instead of putting the operations in the above for-loop,
# we could also have written something like this:

# boring_dataset = map(tf.io.read_file, boring_dataset)
# boring_dataset = map(decode_jpeg, boring_dataset)
#
# for image in itertools.islice(boring_dataset, 5):
#     display_image(image)

In [None]:
dataset = TFDataset.from_tensor_slices(files)

# Task: The dataset already contains the file paths;
#       read the file contents here by mapping `tf.io.read_file`
#       onto the dataset.
dataset = dataset.map(tf.io.read_file)

# Task: Now that you have the file contents, convert them
#       to raw image data by mapping the function `decode_jpeg`.
dataset = dataset.map(decode_jpeg)

In [None]:
# When you are done with the tasks, this should show you
# 5 images like before!
# Notice how slick this code looks compared to before? You'll see
# what other advantages `tf.data.Dataset`s have in the rest of
# this exercise.
for image in dataset.take(5):
    display_image(image)

### Tasks

1. Convert the iterative method we used to read and convert the data to use `tf.data.Dataset` methods. Similar to the Python built-in `map` function, the `map` method of `tf.data.Dataset`s takes a function to call on each element in the dataset; the function's return value becomes the corresponding element of the resulting dataset.

## Playground

### Pipeline Profiling on Data in Individual Files

To get some more perspective on different functions and what reading from individual files entails, you can now try to optimize a pre-built pipeline on the same data you just wrote your own pipeline for.

Change the pipeline below and run iterations to register the experiments. You can later display them in a table. Remember that we are reading from 10,000 different files, so try to see and understand how and why different functions in the pipeline change the runtime. We suggest starting by comparing the total runtime with `use_tensor_slices = False` vs. `use_tensor_slices = True` (see below). Afterwards, check out what happens below the "Your pipeline starts here" comment.

In [None]:
# Setup

# Create a result collector.
# It has some methods to format your results nicely
# in tables.
# Execute again to reset your results.
fs_profiling_results = TFDatasetProfilingResults()

In [None]:
# This dataset contains only the validation set from Tiny ImageNet,
# 10,000 images in total.
dataset_path = Path('/p/project/training2306/datasets/tiny-tiny-imagenet').expanduser()
assert dataset_path.exists(), 'dataset directory does not exist'

# Create a generator containing all JPEG images in the dataset.
files = dataset_path.glob('**/*.JPEG')
# Convert from `Path` objects to strings
files = map(str, files)

# Try changing this flag and see how the runtime performance changes.
use_tensor_slices = False

# Start a timer so we can measure starting from dataset creation.
start_time = time.perf_counter()

if use_tensor_slices:
    dataset = TFDataset.from_tensor_slices(list(files))
else:
    dataset = TFDataset.from_generator(
        lambda: files,
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string),
    )

# The cardinality is the _known_ length of the dataset.
if dataset.cardinality() == tf.data.UNKNOWN_CARDINALITY:
    print("TensorFlow does not know the dataset's cardinality.")
elif dataset.cardinality() == tf.data.INFINITE_CARDINALITY:
    print('TensorFlow knows the dataset has infinite cardinality.')
else:
    print(f"TensorFlow knows the dataset's cardinality is {dataset.cardinality()}.")

# Your pipeline starts here
# =========================

dataset = dataset.map(
    tf.io.read_file,
    
    # TensorFlow can automatically parallelize some functions.
    # To let TensorFlow decide based on the available CPU,
    # use the value `tf.data.AUTOTUNE`.
    # num_parallel_calls=4,

    # When the function is deterministic, it will always give us
    # the results in the same order. Being able to forgo this
    # guarantee could increase our performance (only relevant if
    # `num_parallel_calls` is set).
    # deterministic=False,
)
dataset = dataset.map(
    lambda file_content: tf.io.decode_jpeg(file_content, channels=3),
    # num_parallel_calls=4,
    # deterministic=False,
)

# dataset = dataset.shard(8, 0)
# dataset = dataset.shuffle(dataset.cardinality() if dataset.cardinality() > 0 else 1024)
# dataset = dataset.prefetch(2048)

iteration_start_time = time.perf_counter()

# Iterate through the dataset
for d in dataset:
    pass

end_time = time.perf_counter()
duration = end_time - start_time
iteration_duration = end_time - iteration_start_time
fs_profiling_results.add_result('tiny-tiny-imagenet', dataset, iteration_duration)

print('\nTotal duration:', duration, 'seconds')
print('Iteration duration:', iteration_duration, 'seconds')
print('Pipeline:\n[' + ',\n '.join(dataset.pipeline) + ']')
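
The commented-out `num_parallel_calls` and `deterministic` arguments in the pipeline above trade ordering guarantees for throughput. The standard-library sketch below mimics that trade-off with a thread pool. It is an analogy written for illustration, not how `tf.data` works internally, and the function names are assumptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x):
    """Square `x`; earlier elements deliberately take longer."""
    time.sleep(0.01 * (4 - x))
    return x * x


def ordered_parallel_map(fn, items, num_parallel_calls):
    """Apply `fn` in parallel; results keep the input order."""
    with ThreadPoolExecutor(max_workers=num_parallel_calls) as pool:
        return list(pool.map(fn, items))


def unordered_parallel_map(fn, items, num_parallel_calls):
    """Apply `fn` in parallel; results arrive as they finish."""
    with ThreadPoolExecutor(max_workers=num_parallel_calls) as pool:
        futures = [pool.submit(fn, item) for item in items]
        return [future.result() for future in as_completed(futures)]


print('Ordered:  ', ordered_parallel_map(slow_square, range(4), 4))
print('Unordered:', unordered_parallel_map(slow_square, range(4), 4))
```

In the ordered variant, a slow element at the front holds back results that are already finished. The unordered variant hands them out immediately, at the price of an order that can differ between runs.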

#### Results

##### Results table

In [None]:
display(HTML(fs_profiling_results.to_html_table()))

##### Averages

In [None]:
display(HTML(fs_profiling_results.averages_to_html_table(0)))

In [None]:
num_warmup_iterations = 1

display(HTML(
    fs_profiling_results.averages_to_html_table(num_warmup_iterations),
))

##### Format results as CSV

Use this in case you want to store your results in a standard text format.

In [None]:
print(fs_profiling_results.to_csv())