# Keras 를 사용한 Multi-worker 훈련
tf.distribute.Strategy API 를 사용하여 케라스 모델을 다중 워커로 분산 훈련하는 방법을 살펴봅니다.<BR>
tf.distribute.Strategy API 에 관한 내용은
[텐서플로로 분산 훈련하기](https://www.tensorflow.org/guide/distributed_training?hl=ko) 를 참고해주세요.

## 설정

In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals

In [None]:
try:
  # %tensorflow_version 기능은 코랩에서만 사용할 수 있습니다.
  %tensorflow_version 2.x
except Exception:
  pass
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

## Dataset 준비

In [None]:
BUFFER_SIZE = 10000
BATCH_SIZE = 64

# MNIST 데이터를 (0, 255] 범위에서 (0., 1.] 범위로 조정
def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255
  return image, label

datasets, info = tfds.load(name='mnist',
                           with_info=True,
                           as_supervised=True)

train_datasets_unbatched = datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)
train_datasets = train_datasets_unbatched.batch(BATCH_SIZE)

[1mDownloading and preparing dataset mnist/3.0.1 (download: 11.06 MiB, generated: 21.00 MiB, total: 32.06 MiB) to /root/tensorflow_datasets/mnist/3.0.1...[0m


local data directory. If you'd instead prefer to read directly from our public
GCS bucket (recommended if you're running on GCP), you can instead pass
`try_gcs=True` to `tfds.load` or set `data_dir=gs://tfds-data/datasets`.



[1mDataset mnist downloaded and prepared to /root/tensorflow_datasets/mnist/3.0.1. Subsequent calls will reuse this data.[0m


## Keras 모델 만들기

In [None]:
def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])
  model.compile(
      loss=tf.keras.losses.sparse_categorical_crossentropy,
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model

- 단일 워커로 모델 테스트

In [None]:
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(x=train_datasets, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7f91fee0d250>

## Multi-worker 구성
텐서플로에서 여러 장비를 사용할 때는 TF_CONFIG 환경 변수를 설정해야 합니다. 하나의 클러스터를 구성하는 각 장비에 클러스터 구성을 알려주고 각각 다른 역할을 부여할 수 있습니다.<BR>
- cluster : worker 같은 여러 타입의 작업 이름을 키로 하는 파이썬 딕셔너리로 훈련 클러스터에 대한 정보를 지정합니다. 이 중에 체크포인트를 저장하거나, 서머리를 쓰는 일 등을 추가로 담당하는 워커가 필요한데 이를 Chief 워커라고 합니다. 관례적으로 index 번호가 0인 워커가 치프워커가 됩니다.
- task : 현재 워커의 작업에 대한 정보를 지정합니다.

### TF_CONFIG 예시
두개의 워커를 localhost 에 띄우는 예시입니다. 실제로는 각 워커를 다른 장비에서 띄울텐데, 실제 IP 주소와 포트를 할당하고, 그에 맞게 TF_CONFIG 를 지정해야 합니다.<BR>
(주의. 해당 코드는 코랩에서는 실행하면 안됩니다. 주어진 IP와 포트로 gRPC 서버를 띄우려할텐데, 아마도 실패할 것입니다.)

In [None]:
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        # 'worker': ["13.124.188.202:12345", "3.37.345.50:23456"]
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

## 전략 선택
MultiWorkerMirroredStrategy 는 동기 다중 워커 훈련에서 추천하는 전략으로 모델의 레이어에 있는 모든 변수으 ㅣ복사본을 각 워커의 장치마다 만들어서 CollectiveOps 를 사용하여 그래디언트를 모으고, 각 변수의 값을 동기화합니다.<BR>
더 자세한 내용은 [tf.distribute.Strategy 가이드](https://www.tensorflow.org/guide/distributed_training?hl=ko)를 참고하세요.

In [None]:
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

## 모델 훈련
MultiWorkerMirroredStrategy 를 Keras 와 함께 사용하려면 모델 구성과 compile() 호출 코드를 strategy.scope() 안으로 넣어주면 됩니다.

In [None]:
NUM_WORKERS = 2
# 여기서 배치 크기는 워커의 수를 곱한 크기로 늘려야 합니다. `tf.data.Dataset.batch`에는
# 전역 배치 크기를 지정해야 하기 때문입니다. 전에는 64였지만, 이제 128이 됩니다.
GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS
train_datasets = train_datasets_unbatched.batch(GLOBAL_BATCH_SIZE)
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(x=train_datasets, epochs=3)

## 데이터셋 샤딩과 배치 크기
tfdistribute.Strategy API가 다중워커 훈련에 맞게 자동으로 데이터셋을 샤딩해줍니다.<BR>
만약에 직접 샤딩을 하고 싶다면 다음과 같이 자동 샤딩 기능을 끌 수 있습니다.

In [None]:
options = tf.data.Options()
options.experimental_distribute.auto_shard = False
train_datasets_no_auto_shard = train_datasets.with_options(options)

## 성능
MultiWorkerMirroredStrategy 를 사용하여 여러 워커를 사용하여 훈련할 수 있습니다.<BR>
가능하면 변수를 tf.float 타입으로 바꾸십시오.<BR>
[공식 ResNet 모델 예제](https://github.com/tensorflow/models/blob/8367cf6dabe11adf7628541706b660821f397dce/official/resnet/resnet_model.py#L466)

## ModelCheckpoint 콜백
다중 워커 훈련의 내결함 기능을 사용하려면, Model.fit() 을 호출할 때 ModelCheckpoint 의 인스턴스를 제공해야 합니다.

In [None]:
# `filepath` 매개변수를 모든 워커가 접근할 수 있는 파일 시스템 경로로 바꾸십시오.
callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath='/tmp/keras-ckpt')]
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(x=train_datasets, epochs=3, callbacks=callbacks)

* 워커가 정지당하면, 정지당한 워커가 다시 살아날 때까지 전체 클러스터가 잠시 멈추고, 워커가 다시 들어오면 다른 워커도 재시작 됩니다. 모든 워커가 이전에 저장한 체크포인트 파일을 읽고, 예전 상태를 불러오면 클러스터가 다시 일관된 상태가 됩니다.