# Distributed training with Keras

## 0. Overview

### i. Keras API

## 1. Import dependencies

In [1]:
# Import TensorFlow and TensorFlow Datasets

import tensorflow_datasets as tfds
import tensorflow as tf

import os

print("The version of Tensorflow: {}".format(tf.__version__))

The version of Tensorflow: 2.4.1


## 2. Download the dataset

In [2]:
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)

mnist_train, mnist_test = datasets['train'], datasets['test']

[1mDownloading and preparing dataset Unknown size (download: Unknown size, generated: Unknown size, total: Unknown size) to C:\Users\admin\tensorflow_datasets\mnist\3.0.1...[0m


HBox(children=(HTML(value='Dl Completed...'), FloatProgress(value=1.0, bar_style='info', layout=Layout(width='…

HBox(children=(HTML(value='Dl Size...'), FloatProgress(value=1.0, bar_style='info', layout=Layout(width='20px'…

HBox(children=(HTML(value='Extraction completed...'), FloatProgress(value=1.0, bar_style='info', layout=Layout…









HBox(children=(HTML(value='Generating splits...'), FloatProgress(value=0.0, max=2.0), HTML(value='')))

HBox(children=(HTML(value='Generating train examples...'), FloatProgress(value=1.0, bar_style='info', layout=L…

HBox(children=(HTML(value='Shuffling mnist-train.tfrecord...'), FloatProgress(value=0.0, max=60000.0), HTML(va…

HBox(children=(HTML(value='Generating test examples...'), FloatProgress(value=1.0, bar_style='info', layout=La…

HBox(children=(HTML(value='Shuffling mnist-test.tfrecord...'), FloatProgress(value=0.0, max=10000.0), HTML(val…

[1mDataset mnist downloaded and prepared to C:\Users\admin\tensorflow_datasets\mnist\3.0.1. Subsequent calls will reuse this data.[0m


## 3. Define distribution strategy

In [3]:
strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


In [4]:
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

Number of devices: 1


## 4. Setup input pipeline

In [5]:
# You can also do info.splits.total_num_examples to get the total
# number of examples in the dataset.

num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

In [6]:
def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255

  return image, label

In [7]:
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

## 5. Create the model

In [8]:
with strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])

  model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])

INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


## 6. Define the callbacks

In [9]:
# Define the checkpoint directory to store the checkpoints

checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

In [10]:
# Function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
  if epoch < 3:
    return 1e-3
  elif epoch >= 3 and epoch < 7:
    return 1e-4
  else:
    return 1e-5

In [12]:
# Callback for printing the LR at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    print('\nLearning rate for epoch {} is {}'.format(epoch + 1,model.optimizer.lr.numpy()))

In [13]:
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]

## 7. Train and evaluate

In [14]:
model.fit(train_dataset, epochs=12, callbacks=callbacks)

Epoch 1/12
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).







Learning rate for epoch 1 is 0.0010000000474974513
Epoch 2/12

Learning rate for epoch 2 is 0.0010000000474974513
Epoch 3/12

Learning rate for epoch 3 is 0.0010000000474974513
Epoch 4/12

Learning rate for epoch 4 is 9.999999747378752e-05
Epoch 5/12

Learning rate for epoch 5 is 9.999999747378752e-05
Epoch 6/12

Learning rate for epoch 6 is 9.999999747378752e-05
Epoch 7/12

Learning rate for epoch 7 is 9.999999747378752e-05
Epoch 8/12

Learning rate for epoch 8 is 9.999999747378752e-06
Epoch 9/12

Learning rate for epoch 9 is 9.999999747378752e-06
Epoch 10/12

Learning rate for epoch 10 is 9.999999747378752e-06
Epoch 11/12

Learning rate for epoch 11 is 9.999999747378752e-06
Epoch 12/12

Learning rate for epoch 12 is 9.999999747378752e-06


<tensorflow.python.keras.callbacks.History at 0x27fe22d6130>

In [21]:
%ls training_checkpoints

 驱动器 D 中的卷是 Data
 卷的序列号是 12E0-D62D

 D:\miniconda\workspace\TensorFlow2\training_checkpoints 的目录

2021/04/02  00:11    <DIR>          .
2021/04/02  00:11    <DIR>          ..
2021/04/02  00:11                71 checkpoint
2021/04/02  00:11         4,168,209 ckpt_1.data-00000-of-00001
2021/04/02  00:11             1,654 ckpt_1.index
2021/04/02  00:11         4,168,209 ckpt_10.data-00000-of-00001
2021/04/02  00:11             1,654 ckpt_10.index
2021/04/02  00:11         4,168,209 ckpt_11.data-00000-of-00001
2021/04/02  00:11             1,654 ckpt_11.index
2021/04/02  00:11         4,168,209 ckpt_12.data-00000-of-00001
2021/04/02  00:11             1,654 ckpt_12.index
2021/04/02  00:11         4,168,209 ckpt_2.data-00000-of-00001
2021/04/02  00:11             1,654 ckpt_2.index
2021/04/02  00:11         4,168,209 ckpt_3.data-00000-of-00001
2021/04/02  00:11             1,654 ckpt_3.index
2021/04/02  00:11         4,168,209 ckpt_4.data-00000-of-00001
2021/04/02  00:11             1,654 c

In [22]:
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

eval_loss, eval_acc = model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

Eval loss: 0.03994723781943321, Eval Accuracy: 0.9854999780654907


In [30]:
%ls -sh logs\train

 驱动器 D 中的卷是 Data

找不到文件



 卷的序列号是 12E0-D62D

 D:\miniconda\workspace\TensorFlow2 的目录


 D:\miniconda\workspace\TensorFlow2\logs\train 的目录

2021/04/02  00:11    <DIR>          .
2021/04/02  00:11    <DIR>          ..
2021/04/02  00:11            50,706 events.out.tfevents.1617293469.LAPTOP-4SAJKMHC.7032.140334.v2
2021/04/02  00:11                40 events.out.tfevents.1617293473.LAPTOP-4SAJKMHC.profile-empty
2021/04/02  00:11    <DIR>          plugins
               2 个文件         50,746 字节
               3 个目录 319,329,013,760 可用字节


## 8. Export to SavedModel

In [31]:
path = './saved_model/'

In [32]:
model.save(path, save_format='tf')

INFO:tensorflow:Assets written to: ./saved_model/assets


INFO:tensorflow:Assets written to: ./saved_model/assets


In [33]:
# Load the model without strategy.scope

unreplicated_model = tf.keras.models.load_model(path)

unreplicated_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

Eval loss: 0.03994723781943321, Eval Accuracy: 0.9854999780654907


In [34]:
# Load the model with strategy.scope

with strategy.scope():
  replicated_model = tf.keras.models.load_model(path)
  replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                           optimizer=tf.keras.optimizers.Adam(),
                           metrics=['accuracy'])

  eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
  print ('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

Eval loss: 0.03994723781943321, Eval Accuracy: 0.9854999780654907


### i. Examples and Tutorials

## 9. Next steps

# reference

### https://tensorflow.google.cn/tutorials/distribute/keras