10. 하나의 머신에 있는 여러 개의 GPU에서 MirroredStrategy 전략으로 모델을 훈련해보세요(GPU를 준비하지 못하면 코랩의 GPU 런타임을 사용하여 가상 GPU 2개를 만들 수 있습니다). CentralStorageStrategy 전략으로 모델을 다시 훈련하고 훈련 시간을 비교해보세요.

In [None]:
# setting

# Python
import sys
assert sys.version_info >= (3,5)

# sklearn
import sklearn
assert sklearn.__version__ >= "0.20"

try: 
  # tensorflow_version in colab only
  %tensorflow_version 2.x
  !echo "deb http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" > /etc/apt/sources.list.d/tensorflow-serving.list
  !curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | apt-key add -
  !apt update && apt-get install -y tensorflow-model-server
  %pip install -q -U tensorflow-serving-api
  IS_COLAB = True
except Exception:
  IS_COLAB = False

# tensorflow
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

if not tf.config.list_physical_devices('GPU'):
  print("감지된 GPU가 없습니다. GPU가 없으면 LSTM과 CNN이 매우 느릴 수 있습니다.")
  if IS_COLAB:
    print("런타임 > 런타임 유현 변경 메유를 선택하고 하드웨어 가속기로 GPU를 고르세요.")

# module 
import numpy as np
import os

# consistency
np.random.seed(42)
tf.random.set_seed(42)

# graph
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize = 14)
mpl.rc('xtick', labelsize = 12)
mpl.rc('ytick', labelsize = 12)

# save pic
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "deploy"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok = True)

def save_fig(fig_id, tight_layout = True, fig_extension = "png", resolution = 300):
  """Write the current matplotlib figure to ``IMAGES_PATH``.

  The file is named ``<fig_id>.<fig_extension>`` and saved at ``resolution``
  DPI. When ``tight_layout`` is true, ``plt.tight_layout()`` is applied first.
  """
  target = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
  print("그림 저장")
  if not tight_layout:
    plt.savefig(target, format = fig_extension, dpi = resolution)
    return
  plt.tight_layout()
  plt.savefig(target, format = fig_extension, dpi = resolution)


Colab only includes TensorFlow 2.x; %tensorflow_version has no effect.
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2943  100  2943    0     0   2574      0  0:00:01  0:00:01 --:--:--  2574
OK
Get:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Get:3 http://storage.googleapis.com/tensorflow-serving-apt stable InRelease [3,026 B]
Get:4 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Get:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease [18.1 kB]
Hit:6 http://archive.ubuntu.com/ubuntu focal InRelease
Get:7 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Hit:8 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Get:9 http://storage.googleapis.com/tensorflow-serving-apt stable/tensorf

분산 훈련

많은 모델이 하나의 GPU나 CPU에서 훈련될 수 있지만, 훈련 속도가 너무 느리면 같은 머신에 있는 여러 개의 GPU에 분산할 수 있다. 훈련 속도를 높이는 방법에는 다음과 같은 몇 가지가 있다.
> 1) 구글 클라우드 AI 플랫폼에 있는 TPU 사용
https://cloud.google.com/tpu/docs/intro-to-tpu?hl=ko

> 2) GPU를 여러 개 가진 서버 여러 대에서 모델 훈련

> 3) 모델 병렬화 사용

> 같은 머신에서 GPU 여러 개로 시작한 다음, 여러 머신에서 GPU 여러 개를 사용한다.

In [None]:
# Data: MNIST with an extra trailing channel axis, pixel values divided by 255.
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full, X_test = [
    images[..., np.newaxis].astype(np.float32) / 255.
    for images in (X_train_full, X_test)
]
# Hold out the first 5,000 training examples for validation.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
# A handful of test images used later for predictions.
X_new = X_test[:3]

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [None]:
# Reset Keras' global state, then re-seed NumPy and TensorFlow.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
def create_model():
  """Build the (uncompiled) CNN used by every experiment in this notebook.

  Input: 28x28x1 images. Feature extractor: a 64-filter 7x7 conv, 2x2 max
  pooling, two 128-filter 3x3 convs, 2x2 max pooling (all convs ReLU with
  "same" padding). Classifier: flatten, 64-unit ReLU dense layer, 50% dropout,
  and a 10-way softmax output.
  """
  conv_options = dict(activation="relu", padding="same")
  feature_extractor = [
      keras.layers.Conv2D(64, 7, input_shape=[28, 28, 1], **conv_options),
      keras.layers.MaxPooling2D(pool_size=2),
      keras.layers.Conv2D(128, 3, **conv_options),
      keras.layers.Conv2D(128, 3, **conv_options),
      keras.layers.MaxPooling2D(pool_size=2),
  ]
  classifier = [
      keras.layers.Flatten(),
      keras.layers.Dense(64, activation="relu"),
      keras.layers.Dropout(0.5),
      keras.layers.Dense(10, activation="softmax"),
  ]
  return keras.models.Sequential(feature_extractor + classifier)

In [None]:
# Baseline: train the CNN on the default device, without a distribution strategy.
batch_size = 100
model = create_model()
model.compile(
    optimizer=keras.optimizers.SGD(learning_rate=1e-2),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
model.fit(
    X_train, y_train,
    batch_size=batch_size,
    epochs=10,
    validation_data=(X_valid, y_valid),
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7fa66100cc70>

* MirroredStrategy

> TensorFlow는 이러한 복잡성을 대신 처리해주는 간단한 분산 전략 API를 제공한다.
MirroredStrategy: 데이터 병렬화를 사용해 사용 가능한 모든 GPU에서 케라스 모델을 훈련시킨다. 

> MirroredStrategy() 객체 만들고, scope() 메서드를 호출하여 분산 컨텍스트를 얻는다.
이 컨텍스트로 모델 생성과 컴파일 과정을 감싼다. 
보통 모델처럼 fit() 메서드를 호출한다.

>> 내부적으로 tf.keras는 분산을 자동으로 인식한다.

>> MirroredStrategy 컨텍스트 안에서는 모든 변수와 연산을 사용 가능한 모든 GPU 장치에 복제해야 한다는 것을 알고 있다.

>> fit() 메서드는 자동으로 훈련 배치를 모든 복제 모델에 나눈다.

>> 배치 크기가 복제 모델의 개수로 나누어 떨어져야 한다.

In [None]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

# Data parallelism: one model replica per visible GPU.
distribution = tf.distribute.MirroredStrategy()

# Other options (uncomment exactly one to try it):
# - a different all-reduce algorithm:
#   distribution = tf.distribute.MirroredStrategy(
#       cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
# - only a subset of the GPUs:
#   distribution = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
# - centralized parameters instead of mirrored ones:
#   distribution = tf.distribute.experimental.CentralStorageStrategy()
# - a Colab TPU:
#   if IS_COLAB and "COLAB_TPU_ADDR" in os.environ:
#     tpu_address = "grpc://" + os.environ["COLAB_TPU_ADDR"]
#   else:
#     tpu_address = ""
#   resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_address)
#   tf.config.experimental_connect_to_cluster(resolver)
#   tf.tpu.experimental.initialize_tpu_system(resolver)
#   distribution = tf.distribute.TPUStrategy(resolver)  # was experimental.TPUStrategy

# Build and compile inside the scope so the model's variables are distributed.
with distribution.scope():
  model = create_model()
  model.compile(optimizer=keras.optimizers.SGD(learning_rate=1e-2),
                loss="sparse_categorical_crossentropy",
                metrics=["accuracy"])

In [None]:
# fit() splits each global batch across the replicas; keep batch_size a
# multiple of distribution.num_replicas_in_sync so every replica gets an equal share.
batch_size = 100
model.fit(X_train, y_train,
          batch_size=batch_size,
          epochs=10,
          validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f0e402939d0>

In [None]:
model.predict(x=X_new)  # class probabilities for the three held-aside test images



array([[8.4251001e-10, 2.9088641e-08, 3.3689443e-07, 3.0173635e-07,
        1.9761372e-10, 3.5851899e-09, 3.6592305e-12, 9.9999487e-01,
        6.3325223e-10, 4.4634776e-06],
       [3.1198788e-08, 2.0205212e-04, 9.9979395e-01, 1.1756342e-06,
        7.0201622e-10, 9.8169795e-10, 9.3250094e-08, 3.7634737e-07,
        2.2979877e-06, 2.5762759e-10],
       [1.5152257e-06, 9.9925452e-01, 1.6152777e-05, 1.9996974e-06,
        1.1874104e-04, 3.1586972e-06, 3.4926034e-04, 2.3067753e-04,
        1.9584379e-05, 4.4090898e-06]], dtype=float32)

In [None]:
# Custom training loop with MirroredStrategy, timed so it can be compared with
# the CentralStorageStrategy run below (the point of the exercise).
import time

keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

batch_size = 100  # global batch size; every step splits it across the replicas
n_epochs = 10
n_steps_per_epoch = len(X_train) // batch_size

distribution = tf.distribute.MirroredStrategy()
print("Replicas in sync:", distribution.num_replicas_in_sync)

# Variables (model weights, optimizer state) must be created inside the scope
# so that they are mirrored on every replica.
with distribution.scope():
  model = create_model()
  optimizer = keras.optimizers.SGD(learning_rate=1e-2)

# experimental_distribute_dataset() replaces the deprecated
# make_dataset_iterator(): each global batch is split across the replicas.
dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)
dist_iterator = iter(distribution.experimental_distribute_dataset(dataset))

@tf.function
def train_step(iterator):
  """Run one synchronous training step on all replicas.

  Takes the next distributed batch from ``iterator``, applies the gradients on
  every replica and returns the loss averaged over the global batch.
  """
  def step_fn(inputs):
    X, y = inputs
    with tf.GradientTape() as tape:
      # training=True is required, otherwise Dropout runs in inference mode.
      Y_proba = model(X, training=True)
      per_example_loss = keras.losses.sparse_categorical_crossentropy(y, Y_proba)
      # Divide by the *global* batch size so the SUM over replicas is the mean.
      loss = tf.nn.compute_average_loss(per_example_loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

  # Strategy.run() replaces the deprecated experimental_run().
  per_replica_losses = distribution.run(step_fn, args=(next(iterator),))
  return distribution.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

# Wall-clock time includes tf.function tracing on the first step; the
# CentralStorageStrategy run pays the same overhead, so the comparison is fair.
start_time = time.perf_counter()
for epoch in range(n_epochs):
  print("Epoch {}/{}".format(epoch + 1, n_epochs))
  for _ in range(n_steps_per_epoch):
    print("\rLoss: {:.3f}".format(train_step(dist_iterator).numpy()), end="")
  print()
mirrored_training_time = time.perf_counter() - start_time
print("MirroredStrategy training time: {:.1f}s".format(mirrored_training_time))

Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
Instructions for updating:
use run() instead


Epoch 1/10
Loss: 0.373
Epoch 2/10
Loss: 0.282
Epoch 3/10
Loss: 0.260
Epoch 4/10
Loss: 0.254
Epoch 5/10
Loss: 0.248
Epoch 6/10
Loss: 0.242
Epoch 7/10
Loss: 0.235
Epoch 8/10
Loss: 0.230
Epoch 9/10
Loss: 0.221
Epoch 10/10
Loss: 0.214


In [None]:
# To run a saved model on every device the strategy covers, load it inside
# the distribution scope.
# NOTE(review): "my_mnist_model.h5" is not saved anywhere in this notebook;
# presumably it comes from an earlier notebook — confirm the file exists first.
with distribution.scope():
  mirrored_model = keras.models.load_model(filepath="my_mnist_model.h5")


In [None]:
# To use only some of the available GPUs, pass the device list to the
# MirroredStrategy constructor.
distribution = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

- 중앙 집중적인 파라미터로 데이터 병렬화 사용

> MirroredStrategy() -> CentralStorageStrategy()

> => tf.distribute.experimental.CentralStorageStrategy()

In [None]:
# Custom training loop with CentralStorageStrategy (centralized parameters,
# replicated computation), timed for comparison with the MirroredStrategy run.
import time

keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

batch_size = 100  # global batch size; every step splits it across the replicas
n_epochs = 10
n_steps_per_epoch = len(X_train) // batch_size

distribution = tf.distribute.experimental.CentralStorageStrategy()
print("Replicas in sync:", distribution.num_replicas_in_sync)

# Variables (model weights, optimizer state) must be created inside the scope
# so the strategy controls where they are stored.
with distribution.scope():
  model = create_model()
  optimizer = keras.optimizers.SGD(learning_rate=1e-2)

# experimental_distribute_dataset() replaces the deprecated
# make_dataset_iterator(): each global batch is split across the replicas.
dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)
dist_iterator = iter(distribution.experimental_distribute_dataset(dataset))

@tf.function
def train_step(iterator):
  """Run one synchronous training step on all replicas.

  Takes the next distributed batch from ``iterator``, applies the gradients to
  the centrally stored variables and returns the loss averaged over the global batch.
  """
  def step_fn(inputs):
    X, y = inputs
    with tf.GradientTape() as tape:
      # training=True is required, otherwise Dropout runs in inference mode.
      Y_proba = model(X, training=True)
      per_example_loss = keras.losses.sparse_categorical_crossentropy(y, Y_proba)
      # Divide by the *global* batch size so the SUM over replicas is the mean.
      loss = tf.nn.compute_average_loss(per_example_loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

  # Strategy.run() replaces the deprecated experimental_run().
  per_replica_losses = distribution.run(step_fn, args=(next(iterator),))
  return distribution.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

# Wall-clock time includes tf.function tracing on the first step, exactly like
# the MirroredStrategy run, so the two timings are comparable.
start_time = time.perf_counter()
for epoch in range(n_epochs):
  print("Epoch {}/{}".format(epoch + 1, n_epochs))
  for _ in range(n_steps_per_epoch):
    print("\rLoss: {:.3f}".format(train_step(dist_iterator).numpy()), end="")
  print()
central_storage_training_time = time.perf_counter() - start_time
print("CentralStorageStrategy training time: {:.1f}s".format(central_storage_training_time))

# Only compare when the MirroredStrategy timing cell has been run in this session.
if "mirrored_training_time" in globals():
  print("MirroredStrategy: {:.1f}s vs CentralStorageStrategy: {:.1f}s".format(
      mirrored_training_time, central_storage_training_time))

Instructions for updating:
Use the iterator's `initializer` property instead.
Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
Instructions for updating:
use run() instead


Epoch 1/10
Loss: 0.406
Epoch 2/10
Loss: 0.304
Epoch 3/10
Loss: 0.282
Epoch 4/10
Loss: 0.272
Epoch 5/10
Loss: 0.268
Epoch 6/10
Loss: 0.263
Epoch 7/10
Loss: 0.259
Epoch 8/10
Loss: 0.253
Epoch 9/10
Loss: 0.247
Epoch 10/10
Loss: 0.241
