In [1]:
from tensorflow import keras
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

In [2]:
# Device-placement logging is kept OFF here; pass True to have TensorFlow
# log which device every op is placed on.
tf.debugging.set_log_device_placement(enabled=False)

In [3]:
# Enumerate the physical GPUs TensorFlow can see on this machine.
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')

In [4]:
# Display the physical-GPU list (Jupyter renders the cell's last expression).
gpus

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [5]:
# Split the first physical GPU into several fixed-size logical GPUs so a
# multi-replica MirroredStrategy can run on a single physical card.
# Must execute before the GPU runtime is initialised; TensorFlow rejects the
# configuration once the devices are already set up.
NUM_LOGICAL_GPUS = 3
LOGICAL_GPU_MEMORY_LIMIT = 2048  # memory_limit per logical GPU (MB)

if gpus:  # guard: gpus[0] would raise IndexError on a machine without a GPU
    tf.config.experimental.set_virtual_device_configuration(
        gpus[0],
        [tf.config.experimental.VirtualDeviceConfiguration(
            memory_limit=LOGICAL_GPU_MEMORY_LIMIT)
         for _ in range(NUM_LOGICAL_GPUS)]
    )

In [6]:
# Logical GPUs: the devices TensorFlow exposes after any virtual-device split.
logical_gpus = tf.config.experimental.list_logical_devices(device_type='GPU')

In [7]:
# Load Fashion-MNIST; the first 5,000 training examples become the validation split.
fashion_mnist = keras.datasets.fashion_mnist
(x_train_all, y_train_all), (x_test, y_test) = fashion_mnist.load_data()
n_valid = 5000
x_valid, y_valid = x_train_all[:n_valid], y_train_all[:n_valid]
x_train, y_train = x_train_all[n_valid:], y_train_all[n_valid:]

for split_x, split_y in [(x_valid, y_valid), (x_train, y_train), (x_test, y_test)]:
    print(split_x.shape, split_y.shape)

(5000, 28, 28) (5000,)
(55000, 28, 28) (55000,)
(10000, 28, 28) (10000,)


In [8]:
# Sanity check: flattening each 28x28 image yields 784 features per example.
np.reshape(x_train.astype(np.float32), (-1, 784)).shape

(55000, 784)

In [9]:
# Standardise features: fit mean/std on the training split only, then apply the
# same transform to the validation and test splits.
from sklearn.preprocessing import StandardScaler


def flatten_images(images):
    """Cast an image array to float32 and flatten each image into one row.

    The row count is taken from the array itself instead of being hard-coded,
    so the cell keeps working if the train/validation split sizes change.
    """
    return images.astype(np.float32).reshape(len(images), -1)


scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(flatten_images(x_train))
x_valid_scaled = scaler.transform(flatten_images(x_valid))
x_test_scaled = scaler.transform(flatten_images(x_test))

In [10]:
def make_dataset(data, target, epochs, batch_size, shuffle=True,
                 shuffle_buffer_size=10000, prefetch_buffer_size=50):
    """Build a batched tf.data pipeline from in-memory arrays.

    Args:
        data: feature array, sliced along its first axis into examples.
        target: label array aligned example-for-example with `data`.
        epochs: number of passes over the data (via `repeat`).
        batch_size: examples per batch; callers that later distribute the
            dataset pass the global (all-replica) batch size here.
        shuffle: whether to shuffle examples before repeating/batching.
        shuffle_buffer_size: shuffle buffer size. A buffer smaller than the
            dataset gives only approximate shuffling.
        prefetch_buffer_size: number of batches prefetched ahead of use.

    Returns:
        A tf.data.Dataset yielding `(data_batch, target_batch)` tuples.
    """
    dataset = tf.data.Dataset.from_tensor_slices((data, target))
    if shuffle:
        dataset = dataset.shuffle(shuffle_buffer_size)
    return dataset.repeat(epochs).batch(batch_size).prefetch(prefetch_buffer_size)

# Build the input pipelines and shard them across the strategy's replicas.
strategy = tf.distribute.MirroredStrategy()

# Global batch = per-replica batch x number of replicas the strategy really
# uses. Unlike len(logical_gpus), this is never 0: a CPU-only machine still
# yields one replica.
batch_size_per_replica = 32
batch_size = batch_size_per_replica * strategy.num_replicas_in_sync

train_dataset = make_dataset(x_train_scaled, y_train, 1, batch_size)
valid_dataset = make_dataset(x_valid_scaled, y_valid, 1, batch_size)

# Split each global batch into per-replica shards.
train_dataset_distribute = strategy.experimental_distribute_dataset(train_dataset)
valid_dataset_distribute = strategy.experimental_distribute_dataset(valid_dataset)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2')


In [11]:
# Variables must be created under the strategy scope so they are mirrored
# onto every replica.
with strategy.scope():
    model = keras.models.Sequential([
        keras.layers.Dense(512, activation='relu', input_shape=(784,)),
        keras.layers.Dense(256, activation='relu'),
        keras.layers.Dense(10, activation='softmax'),
    ])
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


In [12]:
# Print the layer table with output shapes and parameter counts.
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 512)               401920    
_________________________________________________________________
dense_1 (Dense)              (None, 256)               131328    
_________________________________________________________________
dense_2 (Dense)              (None, 10)                2570      
Total params: 535,818
Trainable params: 535,818
Non-trainable params: 0
_________________________________________________________________


In [13]:
# Unshuffled, single-pass, non-distributed validation pipeline (batches of 32).
eval_dataset = make_dataset(x_valid_scaled, y_valid, 1, 32, shuffle=False)

In [14]:
# 自定义训练过程
with strategy.scope():
    loss_func = keras.losses.SparseCategoricalCrossentropy(
        reduction=keras.losses.Reduction.NONE)
    def compute_loss(labels, predictions):
        per_relica_loss = loss_func(labels, predictions)
        return tf.nn.compute_average_loss(per_relica_loss, global_batch_size=batch_size)


    test_loss = keras.metrics.Mean(name='test_loss')

    train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
    test_accuracy = keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

    optimizer = keras.optimizers.Adam()

    def train_step(inputs):
        images, labels = inputs
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = loss_func(labels, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        train_accuracy.update_state(labels, predictions)
        return loss
    
    # 分布式训练
    @tf.function
    def distributed_train_step(inputs):
        per_replica_average_loss = strategy.run(train_step, 
                                                args=(inputs, ))
        return strategy.reduce(tf.distribute.ReduceOp.SUM, 
                               per_replica_average_loss, 
                               axis=None)
        

    def test_step(inputs):
        images, labels = inputs
        predictions = model(images)
        t_loss = loss_func(labels, predictions)
        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)
        
    @tf.function 
    def distributed_test_step(inputs):
        strategy.run(test_step, args=(inputs,))
    

INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


In [17]:
#  训练过程
epochs = 10
for epoch in range(epochs):
    total_loss = 0.0
    num_batches = 0
    for x in train_dataset:
        start_time = time.time()
        total_loss += tf.reduce_sum(distributed_train_step(x))
        run_time = time.time() - start_time
        num_batches += 1
        
#         print(total_loss)
        print('\rtotal: %3.3f, num_batches: %d, '
              'average: %3.3f, time: %3.3f'
              % (total_loss, num_batches, total_loss / num_batches, run_time), end='')
        
    train_loss = total_loss / num_batches
    for x in eval_dataset:
        distributed_test_step(x)
        
    print('\rEpoch: %d, Loss: %3.3f, Acc: %3.3f, '
         'Val_Loss: %3.3f, Val_acc:%3.3f' % (epoch + 1, total_loss, train_accuracy.result(),
                                             test_loss.result(), test_accuracy.result()))
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()

Epoch: 1, Loss: 70674.820, Acc: 0.843, Val_Loss: 0.349, Val_acc:0.874
Epoch: 2, Loss: 52547.234, Acc: 0.882, Val_Loss: 0.336, Val_acc:0.879
Epoch: 3, Loss: 46004.730, Acc: 0.896, Val_Loss: 0.361, Val_acc:0.876
Epoch: 4, Loss: 40624.246, Acc: 0.906, Val_Loss: 0.311, Val_acc:0.888
Epoch: 5, Loss: 37430.109, Acc: 0.915, Val_Loss: 0.330, Val_acc:0.884
Epoch: 6, Loss: 33788.613, Acc: 0.923, Val_Loss: 0.351, Val_acc:0.887
Epoch: 7, Loss: 31725.959, Acc: 0.927, Val_Loss: 0.337, Val_acc:0.892
Epoch: 8, Loss: 28040.779, Acc: 0.936, Val_Loss: 0.353, Val_acc:0.892
Epoch: 9, Loss: 26345.023, Acc: 0.939, Val_Loss: 0.366, Val_acc:0.885
Epoch: 10, Loss: 23572.512, Acc: 0.945, Val_Loss: 0.347, Val_acc:0.901


In [None]:
# Scratch cell: `strategy.run()` with no arguments raises TypeError, because it
# requires the per-replica function, e.g. `strategy.run(train_step, args=(inputs,))`.
# Show its documentation instead of leaving a failing cell in the notebook.
help(strategy.run)