## 1. 导入模块

In [1]:
%matplotlib inline
import numpy as np
import pandas as pd
import sklearn
import matplotlib as mpl
import matplotlib.pyplot as plt

from tensorflow import keras
import tensorflow as tf

import sys
import os
import time
import datetime

# Report the name and version of every key library so a reader can see the
# environment this run was produced with.
for module in (np, pd, sklearn, mpl, keras, tf):
    print("{} {}".format(module.__name__, module.__version__))

numpy 1.18.1
pandas 0.25.3
sklearn 0.22.1
matplotlib 3.1.2
tensorflow_core.python.keras.api._v2.keras 2.2.4-tf
tensorflow 2.1.0


## 2. GPU配置策略

In [2]:
# Uncomment the next line to log the device every op is placed on (debugging aid).
# tf.debugging.set_log_device_placement(True)
# Turn soft device placement on.
# NOTE(review): per the TF docs this lets an op fall back to another device when
# the requested one cannot run it — confirm this is the intended behaviour here.
tf.config.set_soft_device_placement(True)

# Enable memory growth on every physical GPU that TensorFlow can see.
# NOTE(review): the TF docs state memory growth must be configured before the
# GPUs are initialized, so keep this loop ahead of the logical-device query below.
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

print("Physical GPU: {}".format(len(gpus)))

# `logical_gpus` is read again later in the notebook when the batch size is computed.
logical_gpus = tf.config.experimental.list_logical_devices("GPU")
print("Logical GPU: {}".format(len(logical_gpus)))

Physical GPU: 1
Logical GPU: 1


## 3. 在单个 GPU 上训练

  - ### 3.1 获取 Fashion-MNIST 数据（分布式策略）

In [3]:
# Load the Fashion-MNIST dataset.
fashion_mnist = keras.datasets.fashion_mnist
(x_train_all, y_train_all), (x_test, y_test) = fashion_mnist.load_data()

# Hold out the first 5000 training samples for validation; the rest is the training set.
x_valid, y_valid = x_train_all[:5000], y_train_all[:5000]
x_train, y_train = x_train_all[5000:], y_train_all[5000:]


# Standardization
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()

# Each split is flattened to a single column before scaling, so one mean/std pair
# (fitted on the training pixels only) is applied to every pixel of every split.
# The scaled values are then reshaped to (N, 28, 28, 1).
x_train_scaled = scaler.fit_transform(
    x_train.astype(np.float32).reshape(-1, 1)
).reshape(-1, 28, 28, 1)
x_valid_scaled = scaler.transform(
    x_valid.astype(np.float32).reshape(-1, 1)
).reshape(-1, 28, 28, 1)
x_test_scaled = scaler.transform(
    x_test.astype(np.float32).reshape(-1, 1)
).reshape(-1, 28, 28, 1)

# Sanity check: print image / label shapes for the train, validation and test splits.
for split_images, split_labels in (
    (x_train_scaled, y_train),
    (x_valid_scaled, y_valid),
    (x_test_scaled, y_test),
):
    print(split_images.shape, split_labels.shape)


# Build a tf.data input pipeline from in-memory arrays.
def make_dataset(images, labels, epochs, batch_size, shuffle=True):
    """Create a batched ``tf.data.Dataset`` from ``images`` and ``labels``.

    Args:
        images: Array-like of input samples, sliced along its first axis.
        labels: Array-like of targets, sliced in step with ``images``.
        epochs: Number of times the data is repeated.
        batch_size: Number of samples per emitted batch.
        shuffle: When true, shuffle with a 10000-element buffer before repeating.

    Returns:
        The dataset after (optional) shuffle -> repeat -> batch -> prefetch.
    """
    pipeline = tf.data.Dataset.from_tensor_slices((images, labels))
    if shuffle:
        pipeline = pipeline.shuffle(10000)
    pipeline = pipeline.repeat(epochs)
    pipeline = pipeline.batch(batch_size)
    # prefetch is applied after batch, so it keeps up to 50 *batches* ready.
    return pipeline.prefetch(50)


strategy = tf.distribute.MirroredStrategy()

# The input pipeline is distributed too: each global batch is split evenly
# across the replicas driven by the strategy.
with strategy.scope():
    batch_size_per_replica = 256
    # Scale the global batch by the number of replicas the strategy actually
    # runs. Counting logical GPUs instead yields 0 on a CPU-only machine, which
    # would produce a global batch size of 0 and make batching fail.
    batch_size = batch_size_per_replica * strategy.num_replicas_in_sync

    train_dataset = make_dataset(x_train_scaled, y_train, 1, batch_size)
    # Validation metrics do not depend on sample order, so skip the shuffle.
    valid_dataset = make_dataset(x_valid_scaled, y_valid, 1, batch_size, shuffle=False)

    train_dataset_distribute = strategy.experimental_distribute_dataset(train_dataset)
    valid_dataset_distribute = strategy.experimental_distribute_dataset(valid_dataset)

(55000, 28, 28, 1) (55000,)
(5000, 28, 28, 1) (5000,)
(10000, 28, 28, 1) (10000,)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


  - ### 3.2 模型定义（分布式策略）

In [4]:
# The model is built inside the strategy scope so that it is created under the
# distribution strategy.
with strategy.scope():
    model = keras.models.Sequential()

    # Three convolutional stages (128, 256, 512 filters). Each stage is two 3x3
    # "same"-padded ReLU convolutions followed by a 2x2 max-pool.
    for stage_index, stage_filters in enumerate((128, 256, 512)):
        # Only the very first layer declares the 28x28x1 input shape.
        first_layer_kwargs = {"input_shape": (28, 28, 1)} if stage_index == 0 else {}
        model.add(keras.layers.Conv2D(
            filters=stage_filters, kernel_size=3, padding="same", activation="relu",
            **first_layer_kwargs))
        model.add(keras.layers.Conv2D(
            filters=stage_filters, kernel_size=3, padding="same", activation="relu"))
        model.add(keras.layers.MaxPool2D(pool_size=2))

    # Classifier head: flatten, one hidden dense layer, 10-way softmax output.
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(512, activation="relu"))
    model.add(keras.layers.Dense(10, activation="softmax"))


  - ### 3.3 自定义训练步骤（分布式策略）

In [5]:
# The training step definitions and the training loop run inside the strategy scope.
with strategy.scope():
    # Each replica only sees its own slice of the global batch, so the loss must
    # not be averaged per replica. Instead:
    #   1. reduction=NONE keeps one loss value per example, and
    #   2. compute_loss divides the summed loss by the *global* batch size, so
    #      summing the per-replica results gives the global-batch average.
    loss_func = keras.losses.SparseCategoricalCrossentropy(reduction=keras.losses.Reduction.NONE)

    def compute_loss(labels, predictions):
        """Return this replica's contribution to the global-batch average loss.

        Args:
            labels: Integer class labels for the replica's slice of the batch.
            predictions: Model outputs for the same samples.

        Returns:
            The per-example losses summed and divided by the global ``batch_size``.
        """
        per_example_loss = loss_func(labels, predictions)
        # NOTE(review): the last batch of an epoch may be smaller than
        # `batch_size`, in which case its loss is scaled down accordingly.
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=batch_size)

    test_loss = keras.metrics.Mean(name="test_loss")
    test_accuracy = keras.metrics.SparseCategoricalAccuracy(name="test_accuracy")
    train_accuracy = keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

    # Optimizer. `learning_rate` is the supported keyword; `lr` is only a
    # deprecated alias.
    optimizer = keras.optimizers.SGD(learning_rate=0.01)

    # Newer TensorFlow releases expose `Strategy.run`; older ones (such as the
    # 2.1 this notebook was run with) only have `experimental_run_v2`. Use
    # whichever is available so the cell works on both.
    run_on_replicas = getattr(strategy, "run", None) or strategy.experimental_run_v2

    def train_step(inputs):
        """Run one optimisation step on a single replica and return its loss.

        Args:
            inputs: An ``(images, labels)`` pair for this replica.

        Returns:
            The replica's scaled loss from ``compute_loss``.
        """
        images, labels = inputs

        # Forward pass
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = compute_loss(labels, predictions)

        # Backward pass
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        # Update the running training accuracy
        train_accuracy.update_state(labels, predictions)

        return loss

    @tf.function
    def distributed_train_step(inputs):
        """Run ``train_step`` on every replica and sum the per-replica losses."""
        per_replica_loss = run_on_replicas(train_step, args=(inputs,))
        return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_loss, axis=None)

    def test_step(inputs):
        """Evaluate one replica's slice and update the evaluation metrics.

        Args:
            inputs: An ``(images, labels)`` pair for this replica.
        """
        images, labels = inputs
        predictions = model(images, training=False)
        # Per-example losses are fed to a Mean metric, which averages them.
        t_loss = loss_func(labels, predictions)
        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)

    @tf.function
    def distributed_test_step(inputs):
        """Run ``test_step`` on every replica."""
        run_on_replicas(test_step, args=(inputs,))

    # Backward-compatible alias for the original (misspelled) name.
    distribated_test_step = distributed_test_step

    # Training loop
    epochs = 10
    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0

        # Training: the dataset is distributed, so each element holds the
        # per-replica slices of one global batch.
        for distributed_batch in train_dataset_distribute:
            start_time = time.time()
            # float() turns the scalar tensor into a plain Python number so the
            # running total stays a float and formats without relying on
            # tensor __format__ support.
            total_loss += float(distributed_train_step(distributed_batch))
            run_time = time.time() - start_time
            num_batches += 1
            print("\rtotal: {:.4f}, num: {}, average: {:.4f}, acc: {:.2f}, run_time: {:.4f}".format(
                total_loss, num_batches, total_loss / num_batches,
                float(train_accuracy.result()), run_time), end="")

        # An empty training dataset would otherwise raise ZeroDivisionError.
        train_loss = total_loss / num_batches if num_batches else float("nan")

        # Evaluation on the (distributed) validation dataset
        for distributed_batch in valid_dataset_distribute:
            distributed_test_step(distributed_batch)

        print("\rEpoch: {}, Loss:{:.4f}, train_acc: {:.3f} test_loss: {:.4f}, test_acc: {:.3f}".format(
            epoch+1, train_loss, float(train_accuracy.result()),
            float(test_loss.result()), float(test_accuracy.result())))

        # Reset the metrics so every epoch is measured independently.
        test_loss.reset_states()
        train_accuracy.reset_states()
        test_accuracy.reset_states()

INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
total: 2.3035, num: 1, average: 2.3035, acc: 0.10, run_time: 2.7735INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
total: 4.6053, num: 2, average: 2.3026, acc: 0.12, run_time: 0.0573INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
total: 6.9085, num: 3, ave