<a href="https://colab.research.google.com/github/dowrave/Tensorflow_Basic/blob/main/220518_PerformanceOptimization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import time

- 지연 시간이 중간중간 들어가 있는 것만 포인트

In [None]:
# 기본 3개의 샘플 생성 + 1번째 항목이 파일 열기 전 휴면 + 읽기 시뮬레이션 항목 생성 전에 휴면
class ArtificialDataset(tf.data.Dataset):
  def _generator(num_samples):
    time.sleep(.03)

    for sample_idx in range(num_samples):
      time.sleep(.015)
      yield (sample_idx, )

  def __new__(cls, num_samples = 3):
    return tf.data.Dataset.from_generator(
        cls._generator,
        output_types = tf.dtypes.int64,
        output_shapes = (1, ),
        args = (num_samples, )
    )

- 훈련 루프 : 데이터셋 반복 시간을 측정하는 더미 훈련 루프. 훈련 시간이 시뮬레이션 됨

In [None]:
def benchmark(dataset, num_epochs = 2):
  start_time = time.perf_counter()
  for epoch_num in range(num_epochs):
    for sample in dataset:
      time.sleep(.01)
  tf.print("실행 시간 : ", time.perf_counter() - start_time)

# 성능 최적화

In [None]:
# 1. 그냥 실행
benchmark(ArtificialDataset()) # 0.29 ~ 0.30

- 위 실행 시간은 파일 열기 -> 읽기 -> 훈련하기 -> 읽기 -> 훈련하기 각각의 과정이 1개씩만 진행됨.
  - 즉 하나가 진행되면 나머지 2개는 유휴 상태임
- 이를 개선하는 방법이 아래와 같음

## 가져오기(Prefetching)
- 전처리 ~ 훈련 스텝 실행을 오버랩함. 모델이 s스텝을 실행하는 동안 입력 파이프라인은 s+1 스텝의 데이터를 읽음.
- `tf.data.Dataset.prefetch` 변환이 제공되며 이는 데이터 소비 시간과 생성 시간 간의 의존성을 줄일 수 있음. 
  - 백그라운드 스레드와 내부 버퍼를 사용하여 요청된 시간 전에 입력 데이터셋에서 요소를 가져옴.
  - 요소 수는 배치 수와 같거나 커야 함
  - 이를 수동으로 조정하거나 `tf.data.experimental.AUTOTUNE`으로 설정하면 `tf.data` 런타임이 실행 시 동적으로 값을 조정함


In [None]:
benchmark(
    ArtificialDataset().prefetch(tf.data.experimental.AUTOTUNE)
) # 0.23

## 데이터 추출 병렬화
- 실제 환경에서는 입력 데이터를 원격으로(GCS, HDFS) 저장할 수 있음.
- 로컬에서 읽는 데이터셋 파이프라인은 로컬, 원격 저장소의 차이 때문에 원격으로 데이터를 읽을 때 입출력에 병목이 발생할 수 있음
  - 1번째 바이트 : 원격 저장소에서 1번째 바이트를 읽는 건 로컬보다 훨씬 오래 걸림
  - 읽기 처리량 : 원격 저장소는 큰 총 대역폭을 가지나 한 파일을 읽을 땐 일부만 사용 가능함
  - 바이트들이 메모리로 읽히면 데이터를 역직렬화하고 해독할 필요가 있을 수 있다.
    - 이는 추가 계산이 필요함.
  
- 다양한 데이터 추출 오버헤드 영향을 줄이고자, `tf.data.Dataset.interleave` 변환이 있다. 이는 데이터 추출 단계를 병렬화하는데 사용할 수 있음.
  - 중첩할 데이터셋은 `cycle_length`로 지정
  - 병렬처리 수준은 `num_parallel_calls`로 지정.
- `prefetch`, `map` 변환과 비슷하게 `interleave` 변환은 `tf.data.experimental.AUTOTUNE`을 지원한다.

In [None]:
# 순차적
benchmark(tf.data.Dataset.range(2).interleave(ArtificalDataset)) # 0.29s

- 위 코드는 실질적인 성능의 향상은 없지만, 파일을 여는 과정을 분리함(원래는 전체를 열고 시작했다면, 여기서는 1/2 열고 일부를 읽고 훈련하고 1/2 열고 일부를 읽고 훈련하고 를 반복함

In [None]:
# 병렬 인터리브
benchmark(tf.data.Dataset.range(2).interleave(ArtificialDataset, num_parallel_calls = tf.data.experimental.AUTOTUNE)) # 0.21s

- 왜 처음 실행할 때는 느렸다가 갈수록 빨라지는지 모르겠지만 얘도 0.2초대까지 빨라짐

## 데이터 변환 병렬화
- 입력 요소들은 전처리가 필요할 수 있다. 이를 위해 `tf.data.Dataset.map` 변환이 있고 이는 사용자 정의 함수를 입력 데이터셋의 각 요소에 적용함.
  - 입력 요소들이 독립적이기 때문에 여러 CPU 코어에서 병렬로 실행될 수 있다.
- 이를 위해 `map` 변환도 `num_parallel_calls` 인수를 지원한다. 
- 얘도 `tf.data.experimental.AUTOTUNE`을 지원함.

In [None]:
def mapped_function(s):
  tf.py_function(lambda: time.sleep(0.03), [], ())
  return s

In [None]:
# 순차적
benchmark(ArtificialDataset().map(mapped_function)) # 0.47 ~ 0.65s

# 병렬
benchmark(ArtificialDataset().map(mapped_function, num_parallel_calls = tf.data.experimental.AUTOTUNE)) # 0.33s

## 캐시하기
`tf.data.Dataset.cache` 변환은 데이터셋을 메모리, 로컬 저장소에 캐시할 수 있다. 각 에포크 동안 실행되는 일부 작업이 저장됨.

In [None]:
benchmark(ArtificialDataset().map(
    mapped_function # 캐시 전 시간이많이 걸리는 작업
    ).cache(), 5)

- cache 이전의 변환은 1번째 에포크 동안에만 실행된다. 다음 에포크에는 cache 변환에 의해 캐시된 데이터를 재사용한다.

## 매핑 벡터화
- `map` 변환으로 전달된 사용자 정의 함수를 호출 시 사용자 정의 함수의 스케줄링 및 실행에 관련된 오버헤드가 있다.
- 사용자 정의 함수를 벡터화(한 번에 여러 입력에 작동하도록)하고 맵을 변환하기 전에 배치 변환을 하는 것이 좋다.
- <b> 무슨 얘기다? 데이터셋을 벡터화(batch로 만듦)하고 map 함수를 가해라~ </b>

In [None]:
fast_dataset = tf.data.Dataset.range(1000)

def fast_benchmark(dataset, num_epochs = 2):
  start_time = time.perf_counter()
  for _ in tf.data.Dataset.range(num_epochs):
    for _ in dataset:
      pass
  tf.print("실행 시간 : ", time.perf_counter() - start_time)

def increment(x):
  return x + 1

In [None]:
# 스칼라 매핑
fast_benchmark(fast_dataset.map(increment).batch(256)) # 0.08s : 매핑이 여러 번에 걸쳐 일어남

In [None]:
# 벡터 매핑
fast_benchmark(fast_dataset.batch(256).map(increment)) # 0.03s : 매핑은 딱 1번만 진행됨

## 메모리 사용량 줄이기
- `interleave`, `prefetch`, `shuffle` 등의 많은 변환은 요소들의 내부 버퍼를 유지한다. 대충 메모리 사용량이 낮아지는 순서를 고르라는 얘기

In [None]:
# 부분계산 캐싱 : 데이터가 너무 큰 경우를 제외하면 map 변환 후 데이터셋을 캐시하는 것이 좋다.
dataset.map(time_consuming_mapping).cache().map(memory.consuming_mapping)

# 예제
- `prefetch 변환` : 프로듀서, 컨슈머의 작업 오버랩
- `interleave 변환` : 데이터 읽기 변환 병렬화
- `num_parallel_calls` : map 변환 병렬 처리
- `cache 변환` : 데이터가 메모리에 저장될 수 있다면 1번째 에포크 동안 데이터를 메모리에 캐시
- `map` 변환 : 사용자 정의 함수 벡터화 (이건 batch 먼저 지정하라는 뜻)
 `interleave`, `prefetch`, `shuffle` 변환 -> 메모리 사용 줄여라

## 그림 재현
- 이 안내서에 나온 이미지를 그리는 데 사용된 코드임
- 여기는 그냥 복붙함. 따라친다고 이해될 부분은 아님

In [None]:
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt 

### 데이터셋 
- 각 단계에서 소요된 시간을 리턴하는 데이터셋

In [None]:
class TimeMeasuredDataset(tf.data.Dataset):
    # 출력: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # 생성된 데이터셋 수
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # 각 데이터를 수행한 에포크 수

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # 파일 열기
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # 파일에서 데이터(줄, 기록) 읽어오기
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # 음수는 필터링됨


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

### 반복 루프

In [None]:
def timelined_benchmark(dataset, num_epochs=2):
    # 누산기 초기화
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # 데이터셋 준비 정보 기록하기
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # 훈련 시간 시뮬레이션
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # 훈련 정보 기록하기
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # 에포크 정보 기록하기
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("실행 시간:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

In [None]:
def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # 타임라인에서 유효하지 않은 항목(음수 또는 빈 스텝) 제거
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # 처음 발견될 때 순서대로 다른 스텝을 가져옵니다.
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # 시작 시간을 0으로 하고 최대 시간 값을 계산하십시오.
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # 주어진 단계에 대한 타이밍과 주석 얻기
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

In [None]:
# 즉시 실행에서 매핑된 함수라면, 그래프로 만들어준다.

def map_decorator(func):
    def wrapper(steps, times, values):
        # 자동 그래프가 메서드를 컴파일하지 못하도록 tf.py_function을 사용
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

In [None]:
_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

In [None]:
# 그냥 - 최적화 X
@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # 시간 소비 스텝
    time.sleep(0.0001)  # 메모리 소비 스텝
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)

In [None]:
@map_decorator
def time_consumming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # 시간 소비 스텝
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consumming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # 메모리 소비 스텝
    map_elapsed = time.perf_counter() - map_enter

    # 배치 차원을 다루는 데 tf.tile 사용
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # 데이터 읽기 병렬화
        dataset_generator_fun,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .batch(  # 매핑된 함수 벡터화
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # 맵 변환 병렬화
        time_consumming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .cache()  # 데이터 캐시
    .map(  # 메모리 사용량 줄이기
        memory_consumming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .prefetch(  # 프로듀서와 컨슈머 작업 오버랩
        tf.data.experimental.AUTOTUNE
    )
    .unbatch(),
    5
)