In [1]:
import tensorflow as tf
import numpy as np
import os


# 패션 MNIST 데이터셋 다운로드

In [2]:
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels),(test_images,test_labels) = fashion_mnist.load_data()

In [3]:
train_images = train_images[...,None]
test_images = test_images[...,None]

In [4]:
train_images.shape

(60000, 28, 28, 1)

In [5]:
# 이미지를 [0,1]범위로 변경하기
train_images = train_images/np.float32(255)
test_images = test_images/np.float32(255)

In [6]:
# 변수와 그래프를 분산하는 전략 만들기
strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [7]:
# 그래프와 변수를 플랫폼과 무관한 SavedModel형식으로 내보냅니다. 
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS=10

In [8]:
# 분산 데이터셋들을 strategy.scope내에 생성
with strategy.scope():
    train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
    train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)

    test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
    test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

In [9]:
def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

  return model

In [10]:
# 체크포인트들을 저장하기 위해서 체크포인트 디렉토리를 생성합니다.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

In [11]:
# tf.distribute.Strategy를 사용할 때, 손실은 어떻게 계산되어야 하나요
# 4개의 GPU가 있고 입력 배치 크기가 64라고 하면 입력 배치 하나가 여러 개의 장치(4개의 GPU)에 분배
# 16이 아니고 64로 나누는 이유는 그래디언터들이 각 장치에서 계산된 다음, 모든 장치를 동기화하기 위해 이 그래디언트 값들을 전부 더하기 때문
with strategy.scope():
    # reduction을 'none'으로 설정. 축소를 나중에 하고,
    # GLOBAL_BATCH_SIZE(64)로 나눌 수 있습니다.
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(reduction=tf.keras.losses.Reduction.NONE)
    # 또는 loss_fn = tf.keras.losses.sparse_categorical_crossentropy를 사용해도 됩니다.
    def compute_loss(labels, predictions):
        per_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

In [12]:
with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

In [13]:
# 모델과 옵티마이저는 `strategy.scope`에서 만들어져야 합니다.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

In [14]:
with strategy.scope():
  def train_step(inputs):
    images, labels = inputs

    with tf.GradientTape() as tape:
      predictions = model(images, training=True)
      loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_accuracy.update_state(labels, predictions)
    return loss 

  def test_step(inputs):
    images, labels = inputs

    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)

    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)

In [16]:
with strategy.scope():
  # `experimental_run_v2`는 주어진 계산을 복사하고,
  # 분산된 입력으로 계산을 수행합니다.
  
  @tf.function
  def distributed_train_step(dataset_inputs):
    per_replica_losses = strategy.experimental_run_v2(train_step,
                                                      args=(dataset_inputs,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                           axis=None)
 
  @tf.function
  def distributed_test_step(dataset_inputs):
    return strategy.experimental_run_v2(test_step, args=(dataset_inputs,))

  for epoch in range(EPOCHS):
    # 훈련 루프
    total_loss = 0.0
    num_batches = 0
    for x in train_dist_dataset:
      total_loss += distributed_train_step(x)
      num_batches += 1
    train_loss = total_loss / num_batches

    # 테스트 루프
    for x in test_dist_dataset:
      distributed_test_step(x)

    if epoch % 2 == 0:
      checkpoint.save(checkpoint_prefix)

    template = ("에포크 {}, 손실: {}, 정확도: {}, 테스트 손실: {}, "
                "테스트 정확도: {}")
    print (template.format(epoch+1, train_loss,
                           train_accuracy.result()*100, test_loss.result(),
                           test_accuracy.result()*100))

    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()

에포크 1, 손실: 0.1509631723165512, 정확도: 94.36333465576172, 테스트 손실: 0.27013859152793884, 테스트 정확도: 90.94000244140625
에포크 2, 손실: 0.1411147266626358, 정확도: 94.76333618164062, 테스트 손실: 0.2779633104801178, 테스트 정확도: 90.8499984741211
에포크 3, 손실: 0.13062159717082977, 정확도: 95.08666229248047, 테스트 손실: 0.28512436151504517, 테스트 정확도: 90.55000305175781
에포크 4, 손실: 0.11979230493307114, 정확도: 95.5133285522461, 테스트 손실: 0.28708377480506897, 테스트 정확도: 91.11000061035156
에포크 5, 손실: 0.11318346112966537, 정확도: 95.77999877929688, 테스트 손실: 0.2867371439933777, 테스트 정확도: 91.0199966430664
에포크 6, 손실: 0.10535977780818939, 정확도: 96.10000610351562, 테스트 손실: 0.3474065661430359, 테스트 정확도: 89.95000457763672
에포크 7, 손실: 0.09534232318401337, 정확도: 96.38833618164062, 테스트 손실: 0.3346357047557831, 테스트 정확도: 90.5999984741211
에포크 8, 손실: 0.08927672356367111, 정확도: 96.65333557128906, 테스트 손실: 0.3230447471141815, 테스트 정확도: 91.00999450683594
에포크 9, 손실: 0.08220059424638748, 정확도: 96.94332885742188, 테스트 손실: 0.35810238122940063, 테스트 정확도: 90.8699951171875
에포크 

In [17]:
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

In [18]:
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)

In [19]:
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도: {}'.format(
    eval_accuracy.result()*100))

전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도: 90.8699951171875
