In [1]:
import tensorflow as tf 

import numpy as np 
import os 
tf.__version__

'2.1.0'

In [2]:
fashion_mnist = tf.keras.datasets.fashion_mnist

In [3]:
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

In [4]:
train_images.shape

(60000, 28, 28)

In [5]:
train_labels.shape

(60000,)

In [6]:
train_images = train_images[..., None]
train_images.shape

(60000, 28, 28, 1)

In [7]:
test_images = test_images[...,None]

In [8]:
test_images.shape

(10000, 28, 28, 1)

In [9]:
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

In [10]:
strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [11]:
BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

In [12]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_images)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)

In [13]:
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

In [14]:
train_dataset

<BatchDataset shapes: ((None, 28, 28, 1), (None, 28, 28, 1)), types: (tf.float32, tf.float32)>

In [15]:
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)

In [16]:
train_dist_dataset

<tensorflow.python.distribute.input_lib.DistributedDataset at 0x7f3ee81e4780>

In [17]:
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

In [18]:
def create_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Conv2D(64, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    return model

In [19]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')

In [20]:
checkpoint_prefix

'./training_checkpoints/ckpt'

In [21]:
with strategy.scope():
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE
    )

    def compute_loss(labels, predictions):
        pre_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(
            per_example_loss=pre_example_loss,
            global_batch_size=GLOBAL_BATCH_SIZE
        )

In [22]:
with strategy.scope():
    test_loss = tf.keras.metrics.Mean(name='test_loss')

    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

In [23]:
# 必须在`strategy.scope`下创建模型和优化器。
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

In [24]:
with strategy.scope():
  def train_step(inputs):
    images, labels = inputs

    with tf.GradientTape() as tape:
      predictions = model(images, training=True)
      loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_accuracy.update_state(labels, predictions)
    return loss 

  def test_step(inputs):
    images, labels = inputs

    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)

    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)

In [25]:
with strategy.scope():
  # `experimental_run_v2`将复制提供的计算并使用分布式输入运行它。
  @tf.function
  def distributed_train_step(dataset_inputs):
    per_replica_losses = strategy.experimental_run_v2(train_step,
                                                      args=(dataset_inputs,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                           axis=None)
 
  @tf.function
  def distributed_test_step(dataset_inputs):
    return strategy.experimental_run_v2(test_step, args=(dataset_inputs,))

  for epoch in range(EPOCHS):
    # 训练循环
    total_loss = 0.0
    num_batches = 0
    for x in train_dist_dataset:
      total_loss += distributed_train_step(x)
      num_batches += 1
    train_loss = total_loss / num_batches

    # 测试循环
    for x in test_dist_dataset:
      distributed_test_step(x)

    if epoch % 2 == 0:
      checkpoint.save(checkpoint_prefix)

    template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
                "Test Accuracy: {}")
    print (template.format(epoch+1, train_loss,
                           train_accuracy.result()*100, test_loss.result(),
                           test_accuracy.result()*100))

    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()

INFO:tensorflow:Error reported to Coordinator: in converted code:

    <ipython-input-24-5aded70f8f45>:7 train_step  *
        loss = compute_loss(labels, predictions)
    <ipython-input-21-590804caf2e0>:7 compute_loss  *
        pre_example_loss = loss_object(labels, predictions)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/keras/losses.py:126 __call__
        losses = self.call(y_true, y_pred)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/keras/losses.py:221 call
        return self.fn(y_true, y_pred, **self._fn_kwargs)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/keras/losses.py:978 sparse_categorical_crossentropy
        y_true, y_pred, from_logits=from_logits, axis=axis)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/keras/backend.py:4573 s

ValueError: in converted code:

    <ipython-input-25-6439d0e9d271>:5 distributed_train_step  *
        per_replica_losses = strategy.experimental_run_v2(train_step,
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/distribute/distribute_lib.py:763 experimental_run_v2
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    <ipython-input-24-5aded70f8f45>:7 train_step  *
        loss = compute_loss(labels, predictions)
    <ipython-input-21-590804caf2e0>:7 compute_loss  *
        pre_example_loss = loss_object(labels, predictions)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/keras/losses.py:126 __call__
        losses = self.call(y_true, y_pred)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/keras/losses.py:221 call
        return self.fn(y_true, y_pred, **self._fn_kwargs)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/keras/losses.py:978 sparse_categorical_crossentropy
        y_true, y_pred, from_logits=from_logits, axis=axis)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/keras/backend.py:4573 sparse_categorical_crossentropy
        labels=target, logits=output)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/ops/nn_ops.py:3537 sparse_softmax_cross_entropy_with_logits_v2
        labels=labels, logits=logits, name=name)
    /home/mark/.pyenv/versions/anaconda3-5.0.0/envs/tf2/lib/python3.6/site-packages/tensorflow_core/python/ops/nn_ops.py:3453 sparse_softmax_cross_entropy_with_logits
        logits.get_shape()))

    ValueError: Shape mismatch: The shape of labels (received (50176,)) should equal the shape of logits except for the last dimension (received (64, 10)).
