##TensorFlow中的分布式培训

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


In [3]:
import os
os.chdir("/content/drive/My Drive/data")
!pwd

/content/drive/My Drive/data


##概观
该[tf.distribute.StrategyAPI](https://www.tensorflow.org/api_docs/python/tf/distribute/Strategy)提供了多个处理单元分配你的训练的抽象。目标是允许用户使用现有模型和培训代码启用分布式培训，只需进行最少的更改。

本教程使用[tf.distribute.MirroredStrategy](https://www.tensorflow.org/api_docs/python/tf/distribute/MirroredStrategy)，它在一台计算机上的许多GPU上进行同步培训的图形内复制。实质上，它将所有模型的变量复制到每个处理器。然后，它使用[all-reduce](http://mpitutorial.com/tutorials/mpi-reduce-and-allreduce/)来组合来自所有处理器的渐变，并将组合值应用于模型的所有副本。

`MirroredStategy`是TensorFlow核心中可用的几种分发策略之一。您可以在[分销策略指南中](https://www.tensorflow.org/guide/distribute_strategy)阅读更多策略。

###Keras API
此示例使用[tf.kerasAPI](https://www.tensorflow.org/api_docs/python/tf/keras)来构建模型和训练循环。有关自定义训练循环，请参阅[本教程](https://www.tensorflow.org/tutorials/distribute/training_loops)。

##导入依赖项

In [0]:
from __future__ import absolute_import, division, print_function, unicode_literals


In [0]:
#Import TensorFlow
!pip install -q tf-nightly-gpu
import tensorflow as tf
import tensorflow_datasets as tfds

import os

[K     |████████████████████████████████| 411.0MB 50kB/s 


###下载数据集
下载MNIST数据集并从[TensorFlow数据集](https://www.tensorflow.org/datasets)加载它。这将返回[tf.data格式](https://www.tensorflow.org/api_docs/python/tf/data)的数据集。

设置`with_info`为`True`包括整个数据集的元数据，该数据集将保存到此处`ds_info`。除此之外，该元数据对象包括列车和测试示例的数量。

In [0]:
datasets, ds_info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']

##定义分销策略
创建一个MirroredStrategy对象。这将处理分发，并提供一个上下文管理器（[tf.distribute.MirroredStrategy.scope](https://www.tensorflow.org/api_docs/python/tf/distribute/MirroredStrategy#scope)）来构建你的模型。

In [0]:
strategy = tf.distribute.MirroredStrategy()

In [53]:
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

Number of devices: 1


##设置输入管道
如果在多个GPU上训练模型，则应相应地增加批量大小，以便有效地利用额外的计算能力。此外，应相应调整学习率。

In [0]:
# You can also do ds_info.splits.total_num_examples to get the total
# number of examples in the dataset.

num_train_examples = ds_info.splits['train'].num_examples
num_test_examples = ds_info.splits['test'].num_examples

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

像素值（0-255）必须标准化为0-1范围。在函数中定义此比例。

In [0]:
def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255
  
  return image, label

将此功能应用于培训和测试数据，随机播放培训数据，并将其[批量进行培训](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#batch)。

In [0]:
train_dataset = mnist_train.map(scale).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

##创建模型
在上下文中创建和编译Keras模型strategy.scope。

In [57]:
with strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])

  model.compile(loss='sparse_categorical_crossentropy',
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])

RuntimeError: ignored

###定义回调。
这里使用的回调是：

  - Tensorboard：此回调为Tensorboard写入日志，允许您可视化图形。
  - 模型检查点：此回调在每个纪元后保存模型。
  - 学习速率调度程序：使用此回调，您可以安排学习速率在每个纪元/批次之后更改。
  
为了便于说明，添加打印回调以显示笔记本中的学习速率。

In [0]:
# Define the checkpoint directory to store the checkpoints

checkpoint_dir = './models/training_checkpoints'
#Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

In [0]:
#Functin for decaying the learing rate.
#You can define any decay function you need.
def decay(epoch):
  if epoch <3:
    return 1e-3
  elif epoch >=3 and epoch <7:
    return 1e-4
  else:
    return 1e-5

In [0]:
#Callback for printing the LR at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    print('\nLearning rate for epoch {} is {}'.format(epoch + 1,
                                                     tf.keras.backend.get_value(model.optimizer.lr)))

In [0]:
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                      save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]

##训练和评估
现在，以通常的方式训练模型，调用`fit`模型并传入在教程开始时创建的数据集。无论您是否分发培训，此步骤都是相同的。

In [0]:
model.fit(train_dataset, epochs=10, callbacks=callbacks)