##### Copyright 2019 The TensorFlow Authors.

In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Лучшая производительность с API tf.data

<table class="tfo-notebook-buttons" align="left">
  <td><a target="_blank" href="https://www.tensorflow.org/guide/data_performance"><img src="https://www.tensorflow.org/images/tf_logo_32px.png"> Посмотреть на TensorFlow.org</a></td>
  <td><a target="_blank" href="https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/guide/data_performance.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png"> Запустить в Google Colab</a></td>
  <td><a target="_blank" href="https://github.com/tensorflow/docs/blob/master/site/en/guide/data_performance.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png"> Посмотреть источник на GitHub</a></td>
  <td><a href="https://storage.googleapis.com/tensorflow_docs/docs/site/en/guide/data_performance.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png"> Скачать блокнот</a></td>
</table>

## TRANSLATION WITH TYPO

GPUs and TPUs can radically reduce the time required to execute a single training step. Achieving peak performance requires an efficient input pipeline that delivers data for the next step before the current step has finished. The `tf.data` API helps to build flexible and efficient input pipelines. This document demonstrates how to use the `tf.data` API to build highly performant TensorFlow input pipelines.

Before you continue, read the "[Build TensorFlow input pipelines](./data.ipynb)" guide, to learn how to use the `tf.data` API.

## Ресурсы

- [Построить входные конвейеры TensorFlow](./data.ipynb)
- API `tf.data.Dataset`
- [Проанализируйте производительность `tf.data` с помощью TF Profiler](./data_performance_analysis.md)

## Настроить

In [0]:
import tensorflow as tf

import time

В этом руководстве вы будете перебирать набор данных и измерять производительность. Создание воспроизводимых эталонов производительности может быть затруднено, на него влияют различные факторы:

- текущая загрузка процессора,
- сетевой трафик,
- сложные механизмы, такие как кэш и т. д.

Следовательно, чтобы обеспечить воспроизводимый тест, создайте искусственный пример.

### Набор данных

Определите класс, унаследованный от `tf.data.Dataset` именем `ArtificialDataset` . Этот набор данных:

- генерирует образцы `num_samples` (по умолчанию 3)
- спит какое-то время перед первым элементом, имитирующим открытие файла
- спит в течение некоторого времени перед созданием каждого элемента для имитации чтения данных из файла

In [0]:
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)
        
        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)
            
            yield (sample_idx,)
    
    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=tf.dtypes.int64,
            output_shapes=(1,),
            args=(num_samples,)
        )

Этот набор данных аналогичен `tf.data.Dataset.range` , добавляя фиксированную задержку в начале и между каждой выборкой.

### Тренировочная петля

Напишите фиктивный обучающий цикл, который измеряет, сколько времени требуется для итерации по набору данных. Время тренировки моделируется.

In [0]:
def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    tf.print("Execution time:", time.perf_counter() - start_time)

## Оптимизировать производительность

Чтобы продемонстрировать, как можно оптимизировать производительность, вы улучшите производительность `ArtificialDataset` .

### Наивный подход

Начните с простого конвейера, не используя трюков, итерируя по набору данных как есть.

In [0]:
benchmark(ArtificialDataset())

Под капотом вот как тратится ваше время исполнения:

![наивный](https://www.tensorflow.org/guide/images/data_performance/naive.svg)

Вы можете видеть, что выполнение этапа обучения включает в себя:

- открытие файла, если он еще не открыт,
- извлечение записи данных из файла,
- используя данные для обучения.

Однако в наивной синхронной реализации, как здесь, пока ваш конвейер извлекает данные, ваша модель бездействует. И наоборот, пока ваша модель тренируется, входной конвейер бездействует. Таким образом, время шага обучения - это сумма времени всех, времени открытия, чтения и обучения.

Следующие разделы основаны на этом входном конвейере, иллюстрируя лучшие практики для разработки производительных входных конвейеров TensorFlow.

### Предзагрузка

Предварительная выборка перекрывает предварительную обработку и выполнение модели этапа обучения. Пока модель выполняет этап обучения `s` , входной конвейер считывает данные для этапа `s+1` . Это сокращает время шага до максимума (в отличие от суммы) обучения и время, необходимое для извлечения данных.

The `tf.data` API provides the `tf.data.Dataset.prefetch` transformation. It can be used to decouple the time when data is produced from the time when data is consumed. In particular, the transformation uses a background thread and an internal buffer to prefetch elements from the input dataset ahead of the time they are requested. The number of elements to prefetch should be equal to (or possibly greater than) the number of batches consumed by a single training step. You could either manually tune this value, or set it to `tf.data.experimental.AUTOTUNE` which will prompt the `tf.data` runtime to tune the value dynamically at runtime.

Обратите внимание, что преобразование предварительной выборки дает преимущества каждый раз, когда есть возможность совмещать работу «производителя» с работой «потребителя».

In [0]:
benchmark(
    ArtificialDataset()
    .prefetch(tf.data.experimental.AUTOTUNE)
)

![опережающая выборка](https://www.tensorflow.org/guide/images/data_performance/prefetched.svg)

На этот раз вы можете видеть, что во время этапа обучения для выборки 0 входной конвейер считывает данные для выборки 1 и так далее.

### Распараллеливание извлечения данных

В реальных условиях входные данные могут храниться удаленно (например, GCS или HDFS). Конвейер набора данных, который хорошо работает при локальном чтении данных, может стать узким местом при вводе-выводе при удаленном чтении данных из-за следующих различий между локальным и удаленным хранилищем:

- **Время до первого байта:** чтение первого байта файла из удаленного хранилища может занять на несколько порядков больше времени, чем из локального хранилища.
- Пропускная способность чтения **:** хотя удаленное хранилище обычно предлагает большую совокупную пропускную способность, чтение одного файла может использовать только небольшую часть этой пропускной способности.

Кроме того, после загрузки необработанных байтов в память может также потребоваться десериализация и / или дешифрование данных (например, [protobuf](https://developers.google.com/protocol-buffers/) ), что требует дополнительных вычислений. Эти издержки присутствуют независимо от того, хранятся ли данные локально или удаленно, но могут быть хуже в удаленном случае, если данные не были предварительно эффективно извлечены.

To mitigate the impact of the various data extraction overheads, the `tf.data.Dataset.interleave` transformation can be used to parallelize the data loading step, interleaving the contents of other datasets (such as data file readers). The number of datasets to overlap can be specified by the `cycle_length` argument, while the level of parallelism can be specified by the `num_parallel_calls` argument. Similar to the `prefetch` transformation, the `interleave` transformation supports `tf.data.experimental.AUTOTUNE` which will delegate the decision about what level of parallelism to use to the `tf.data` runtime.

#### Последовательное чередование

The default arguments of the `tf.data.Dataset.interleave` transformation make it interleave single samples from two datasets sequentially.

In [0]:
benchmark(
    tf.data.Dataset.range(2)
    .interleave(ArtificialDataset)
)

![Последовательное чередование](https://www.tensorflow.org/guide/images/data_performance/sequential_interleave.svg)

Этот график позволяет продемонстрировать поведение преобразования `interleave` , выборочно выбирая выборки из двух доступных наборов данных. Тем не менее, никакого улучшения производительности здесь не происходит.

#### Параллельное чередование

Теперь используйте аргумент `num_parallel_calls` преобразования `interleave` . Это загружает несколько наборов данных параллельно, сокращая время ожидания открытия файлов.

In [0]:
benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        ArtificialDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
)

![Параллельное чередование](https://www.tensorflow.org/guide/images/data_performance/parallel_interleave.svg)

На этот раз чтение двух наборов данных распараллелено, что сокращает общее время обработки данных.

### Распараллеливание преобразования данных

При подготовке данных элементы ввода могут нуждаться в предварительной обработке. Для этого API `tf.data` предлагает преобразование `tf.data.Dataset.map` , которое применяет пользовательскую функцию к каждому элементу входного набора данных. Поскольку входные элементы не зависят друг от друга, предварительная обработка может быть распараллелена на нескольких ядрах ЦП. Чтобы сделать это возможным, подобно преобразованиям `prefetch` и `interleave` преобразование `map` предоставляет аргумент `num_parallel_calls` для указания уровня параллелизма.

Choosing the best value for the `num_parallel_calls` argument depends on your hardware, characteristics of your training data (such as its size and shape), the cost of your map function, and what other processing is happening on the CPU at the same time. A simple heuristic is to use the number of available CPU cores. However, as for the `prefetch` and `interleave` transformation, the `map` transformation supports `tf.data.experimental.AUTOTUNE` which will delegate the decision about what level of parallelism to use to the `tf.data` runtime.

In [0]:
def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

#### Последовательное отображение

Начните с использования преобразования `map` без параллелизма в качестве базового примера.

In [0]:
benchmark(
    ArtificialDataset()
    .map(mapped_function)
)

![Последовательное отображение](https://www.tensorflow.org/guide/images/data_performance/sequential_map.svg)

Что касается [наивного подхода](#The-naive-approach) , то здесь время, затраченное на открытие, чтение, предварительную обработку (отображение) и этапы обучения, суммируется за одну итерацию.

#### Параллельное отображение

Теперь используйте ту же функцию предварительной обработки, но примените ее параллельно на нескольких выборках.

In [0]:
benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
)

![Параллельное отображение](https://www.tensorflow.org/guide/images/data_performance/parallel_map.svg)

Теперь на графике видно, что этапы предварительной обработки перекрываются, сокращая общее время одной итерации.

### Кэширование

Преобразование `tf.data.Dataset.cache` может кэшировать набор данных либо в памяти, либо в локальном хранилище. Это спасет некоторые операции (такие как открытие файла и чтение данных) от выполнения во время каждой эпохи.

In [0]:
benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)

![Кэшированный набор данных](https://www.tensorflow.org/guide/images/data_performance/cached_dataset.svg)

Когда вы кешируете набор данных, преобразования перед `cache` (например, открытие файла и чтение данных) выполняются только в течение первой эпохи. Следующие эпохи будут повторно использовать данные, кэшированные с помощью преобразования `cache` .

Если пользовательская функция, переданная в преобразование `map` является дорогостоящей, применяйте преобразование `cache` после преобразования `map` если полученный набор данных все еще может помещаться в память или локальное хранилище. Если пользовательская функция увеличивает пространство, необходимое для хранения набора данных, за пределами емкости кеша, либо примените ее после преобразования `cache` либо рассмотрите возможность предварительной обработки данных перед тренировкой, чтобы уменьшить использование ресурсов.

### Векторизация картирования

Вызов пользовательской функции, переданной в преобразование `map` , связан с планированием и выполнением пользовательской функции. Мы рекомендуем векторизовать определяемую пользователем функцию (то есть, чтобы она работала с партией входов одновременно) и применить `batch` преобразование *перед* преобразованием `map` .

Чтобы проиллюстрировать эту хорошую практику, ваш искусственный набор данных не подходит. Задержка планирования составляет около 10 микросекунд (10e-6 секунд), что намного меньше, чем десятки миллисекунд, используемых в `ArtificialDataset` , и, следовательно, ее влияние трудно увидеть.

Для этого примера используйте базовую функцию `tf.data.Dataset.range` и упростите обучающий цикл до его самой простой формы.

In [0]:
fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)
    
def increment(x):
    return x+1

#### Скалярное отображение

In [0]:
fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)

![Скалярная карта](https://www.tensorflow.org/guide/images/data_performance/scalar_map.svg)

График выше иллюстрирует, что происходит (с меньшим количеством образцов). Вы можете видеть, что отображенная функция применяется для каждого образца. Хотя эта функция очень быстрая, она имеет некоторые накладные расходы, которые влияют на производительность по времени.

#### Векторизация

In [0]:
fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)

![Векторизованная карта](https://www.tensorflow.org/guide/images/data_performance/vectorized_map.svg)

На этот раз сопоставленная функция вызывается один раз и применяется к партии выборки. Хотя выполнение функции может занять больше времени, накладные расходы появляются только один раз, что повышает общую производительность по времени.

### Уменьшение памяти

Ряд преобразований, включая `interleave` , `prefetch` и `shuffle` , поддерживают внутренний буфер элементов. Если пользовательская функция, переданная в преобразование `map` изменяет размер элементов, то порядок преобразования карты и преобразования, которые буферизуют элементы, влияют на использование памяти. В общем, мы рекомендуем выбирать порядок, который приводит к уменьшению объема памяти, если для производительности не требуется другой порядок.

#### Кэширование частичных вычислений

Рекомендуется кэшировать набор данных после преобразования `map` за исключением случаев, когда это преобразование делает данные слишком большими для размещения в памяти. Компромисс может быть достигнут, если ваша отображенная функция может быть разделена на две части: одну, занимающую много времени, и часть, занимающую память. В этом случае вы можете связать свои преобразования, как показано ниже:

```python
dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)
```

Таким образом, трудоемкая часть выполняется только в течение первой эпохи, и вы избегаете использовать слишком много места в кеше.

## Краткое изложение практики

Вот краткое изложение лучших практик для проектирования высокопроизводительных входных конвейеров TensorFlow:

- [Используйте преобразование `prefetch`](#Pipelining) чтобы перекрыть работу производителя и потребителя.
- [Распараллелить преобразование чтения данных,](#Parallelizing-data-extraction) используя преобразование `interleave` .
- [Распараллелить преобразование `map`](#Parallelizing-data-transformation) , установив аргумент `num_parallel_calls` .
- [Используйте преобразование `cache`](#Caching) для кэширования данных в памяти в течение первой эпохи
- [Векторизация пользовательских функций,](#Map-and-batch) переданных в преобразование `map`
- [Сократите использование памяти](#Reducing-memory-footprint) при применении преобразований `interleave` , `prefetch` и `shuffle` .

## Воспроизведение фигур

Примечание. В остальной части этого блокнота рассказывается о том, как воспроизвести приведенные выше рисунки. Не стесняйтесь поиграть с этим кодом, но его понимание не является важной частью этого руководства.

To go deeper in the `tf.data.Dataset` API understanding, you can play with your own pipelines. Below is the code used to plot the images from this guide. It can be a good starting point, showing some workarounds for common difficulties such as:

- Воспроизводимость времени исполнения;
- Сопоставленные функции нетерпеливого исполнения;
- преобразование `interleave`

In [0]:
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

### Набор данных

По аналогии с `ArtificialDataset` вы можете создать набор данных, возвращающий время, затраченное на каждый шаг.

In [0]:
class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))
    
    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset
    
    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])
        
        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter
        
        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter
            
            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered
            
    
    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Этот набор данных предоставляет образцы формы `[[2, 1], [2, 2], [2, 3]]` и типа `[tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]` . Каждый образец:

```
(   [("Open"), ("Read")],   [(t0, d), (t0, d)],   [(i, e, -1), (i, e, s)] )
```

Куда:

- `Open` и `Read` - идентификаторы шагов
- `t0` - отметка времени, когда начался соответствующий шаг
- `d` - время, проведенное на соответствующем шаге
- `i` индекс экземпляра
- `e` - индекс эпохи (количество повторений набора данных)
- `s` - индекс выборки

### Цикл итерации

Сделайте цикл итерации немного сложнее, чтобы агрегировать все тайминги. Это будет работать только с наборами данных, генерирующими образцы, как описано выше.

In [0]:
def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)
    
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)
            
            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter
            
            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)
        
        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)
    
    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

### Метод построения

Наконец, определите функцию, способную построить временную шкалу с учетом значений, возвращаемых функцией `timelined_benchmark` .

In [0]:
def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()
    
    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)
    
    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)
    
    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")
        
        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]
        
        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

### Используйте обертки для сопоставленной функции

Чтобы запустить сопоставленную функцию в активном контексте, вы должны обернуть их внутри вызова `tf.py_function` .

In [0]:
def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

### Сравнение трубопроводов

In [0]:
_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

#### наивный

In [0]:
@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)

### оптимизированный

In [0]:
@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.experimental.AUTOTUNE
    )
    .unbatch(),
    5
)

In [0]:
draw_timeline(naive_timeline, "Naive", 15)

In [0]:
draw_timeline(optimized_timeline, "Optimized", 15)