In [1]:
import matplotlib.pyplot as plt
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time

import tensorflow as tf
from tensorflow import keras

In [2]:
# Uncomment to restrict TensorFlow to a single physical GPU.
# os.environ['CUDA_VISIBLE_DEVICES'] = "2"

# Let TensorFlow pick a fallback device automatically instead of requiring
# manual placement.
tf.config.set_soft_device_placement(True)

# Log the device on which every op / variable is placed.
tf.debugging.set_log_device_placement(True)

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    # Allocate GPU memory on demand; this must be configured before the
    # GPUs are initialized (i.e. before logical devices are listed below).
    tf.config.experimental.set_memory_growth(gpu, True)

print(gpus)
print(len(gpus))

# Logical devices are what the runtime actually schedules work on; they can
# differ from the physical ones (e.g. when virtual devices are configured),
# so they must be queried with list_logical_devices, not list_physical_devices.
logical_gpus = tf.config.experimental.list_logical_devices('GPU')
print(len(logical_gpus))

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:2', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:3', device_type='GPU')]
4
4


In [3]:
# Fetch Fashion-MNIST as (train, test) pairs of image / label arrays.
fashion_mnist = keras.datasets.fashion_mnist
(x_train_all, y_train_all), (x_test, y_test) = fashion_mnist.load_data()

# The first 5000 training samples become the validation split;
# everything after them stays in the training split.
x_valid, y_valid = x_train_all[:5000], y_train_all[:5000]
x_train, y_train = x_train_all[5000:], y_train_all[5000:]

In [4]:
from sklearn.preprocessing import StandardScaler

# Standardize pixel values. Every pixel is flattened into a single column so
# that one global mean / std is fitted on the training split only; the same
# statistics are then re-used for the validation and test splits. The result
# is reshaped to (N, 28, 28, 1) to add the channel axis the conv layers need.
scaler = StandardScaler()

x_train_scaled = scaler.fit_transform(
    x_train.astype(np.float32).reshape(-1, 1)
).reshape(-1, 28, 28, 1)

x_valid_scaled = scaler.transform(
    x_valid.astype(np.float32).reshape(-1, 1)
).reshape(-1, 28, 28, 1)

x_test_scaled = scaler.transform(
    x_test.astype(np.float32).reshape(-1, 1)
).reshape(-1, 28, 28, 1)

In [5]:
def make_dataset(images,labels,epochs,batch_size,shuffle=True):
    """Build a batched `tf.data.Dataset` from in-memory images and labels.

    Args:
        images: Array-like of input samples, sliced along the first axis.
        labels: Array-like of targets aligned with `images`.
        epochs: Number of times the data is repeated.
        batch_size: Number of samples per emitted batch.
        shuffle: When true, shuffle samples with a 10000-element buffer
            before repeating and batching.

    Returns:
        A dataset yielding `(images, labels)` batches, prefetching up to
        50 batches.
    """
    pipeline = tf.data.Dataset.from_tensor_slices((images, labels))
    if shuffle:
        pipeline = pipeline.shuffle(10000)
    pipeline = pipeline.repeat(epochs)
    pipeline = pipeline.batch(batch_size)
    return pipeline.prefetch(50)

# Synchronous data-parallel training: variables are mirrored on every replica.
strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    batch_size_per_replica = 256
    # The global batch is split across the replicas the strategy really uses,
    # so derive it from the strategy itself rather than from a separately
    # collected device list (which may differ, or be empty on a CPU-only host).
    batch_size = batch_size_per_replica * strategy.num_replicas_in_sync

    # One pass over the data per dataset iteration (epochs=1); the outer
    # training loop decides how many passes are made.
    train_dataset = make_dataset(x_train_scaled, y_train, 1, batch_size)
    # Shuffling is pointless for evaluation, so keep validation order fixed.
    valid_dataset = make_dataset(x_valid_scaled, y_valid, 1, batch_size, shuffle=False)

    # Distributed views of the datasets: each global batch is split into
    # per-replica batches when iterated.
    train_dataset_distribute = strategy.experimental_distribute_dataset(train_dataset)
    valid_dataset_distribute = strategy.experimental_distribute_dataset(valid_dataset)
    

Executing op TensorSliceDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op AnonymousRandomSeedGenerator in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op ShuffleDatasetV2 in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op RepeatDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op BatchDatasetV2 in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op PrefetchDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op RebatchDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op AutoShardDataset in device /job:localhost/replica:0/task:0/device:CPU:0


In [6]:
with strategy.scope():
    # Three conv blocks (two 3x3 'same' convolutions followed by 2x2 max
    # pooling) with 128 / 256 / 512 filters, then a dense classifier head
    # producing a softmax over the 10 classes. The model is created inside
    # the strategy scope so its variables are mirrored across replicas.
    model = keras.models.Sequential([
        keras.layers.Conv2D(filters=128, kernel_size=3, padding='same',
                            activation='relu', input_shape=(28, 28, 1)),
        keras.layers.Conv2D(filters=128, kernel_size=3, padding='same',
                            activation='relu'),
        keras.layers.MaxPool2D(pool_size=2),

        keras.layers.Conv2D(filters=256, kernel_size=3, padding='same',
                            activation='relu'),
        keras.layers.Conv2D(filters=256, kernel_size=3, padding='same',
                            activation='relu'),
        keras.layers.MaxPool2D(pool_size=2),

        keras.layers.Conv2D(filters=512, kernel_size=3, padding='same',
                            activation='relu'),
        keras.layers.Conv2D(filters=512, kernel_size=3, padding='same',
                            activation='relu'),
        keras.layers.MaxPool2D(pool_size=2),

        keras.layers.Flatten(),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dense(10, activation='softmax'),
    ])

Executing op RandomUniform in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Sub in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Mul in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Add in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarIsInitializedOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op LogicalNot in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Assert in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op AssignVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op ReadVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarIsInitializedOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op LogicalNot in device /job:localho

In [7]:
# Print the layer-by-layer architecture with output shapes and parameter counts.
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 28, 28, 128)       1280      
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 28, 28, 128)       147584    
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 14, 14, 128)       0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 14, 14, 256)       295168    
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 14, 14, 256)       590080    
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 7, 7, 256)         0         
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 7, 7, 512)         1

In [None]:
with strategy.scope():
    # Keep per-example losses (Reduction.NONE): each replica only sees
    # batch_size / #replicas samples, so the averaging must be done
    # explicitly against the GLOBAL batch size.
    loss_func = keras.losses.SparseCategoricalCrossentropy(reduction=keras.losses.Reduction.NONE)

    def compute_loss(labels, predictions):
        """Return this replica's share of the global-batch mean loss.

        The per-example losses are summed and divided by the global batch
        size, so that summing the result over all replicas yields the mean
        loss of the whole global batch.
        """
        per_example_loss = loss_func(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=batch_size)

    # Metrics must be created inside the strategy scope.
    test_loss = keras.metrics.Mean(name='test_loss')
    train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
    test_accuracy = keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

    optimizer = keras.optimizers.SGD(learning_rate=0.01)

    # `Strategy.run` superseded `experimental_run_v2` in newer TensorFlow
    # releases; use whichever one this installation provides.
    run_on_replicas = getattr(strategy, 'run', None) or strategy.experimental_run_v2

    def train_step(inputs):
        """Run one forward/backward pass on a single replica.

        Args:
            inputs: `(images, labels)` per-replica batch.

        Returns:
            The replica's contribution to the global-batch mean loss.
        """
        images, labels = inputs
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = compute_loss(labels, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        train_accuracy.update_state(labels, predictions)
        return loss

    @tf.function
    def distributed_train_step(inputs):
        """Run `train_step` on every replica and return the global mean loss."""
        per_replica_average_loss = run_on_replicas(train_step, args=(inputs,))
        # Each replica already divided by the global batch size, so SUM
        # (not MEAN) gives the mean loss over the global batch.
        return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_average_loss, axis=None)

    def test_step(inputs):
        """Evaluate a single per-replica batch and update validation metrics."""
        images, labels = inputs
        predictions = model(images, training=False)
        t_loss = loss_func(labels, predictions)
        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)

    @tf.function
    def distributed_test_step(inputs):
        """Run `test_step` on every replica."""
        run_on_replicas(test_step, args=(inputs,))

    epochs = 10
    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0

        # Iterate the DISTRIBUTED dataset so each replica receives its own
        # slice of the global batch instead of the full batch.
        for x in train_dataset_distribute:
            start_time = time.time()
            total_loss += distributed_train_step(x)
            run_time = time.time() - start_time
            num_batches += 1
            print('\rtotal: %3.3f, num_batches:%d, average: %3.3f, time:%3.3f'% (total_loss,num_batches,total_loss/num_batches,run_time),end='')

        # Guard against an empty dataset to avoid a division by zero.
        train_loss = total_loss / max(num_batches, 1)

        # Validate once per epoch, after all training batches have been seen.
        for x in valid_dataset_distribute:
            distributed_test_step(x)

        print('\rEpoch: %d,Loss:%3.3f, Acc:%3.3f,Val_Loss: %3.3f,Val_Acc:%3.3f'%(epoch+1,train_loss,train_accuracy.result(),test_loss.result(),test_accuracy.result()))

        # Start the next epoch with fresh metric accumulators.
        test_loss.reset_states()
        train_accuracy.reset_states()
        test_accuracy.reset_states()

Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:2
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:3
Executing op OptimizeDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op ModelDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op AnonymousIteratorV2 in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op MakeIterator in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op IteratorGetNextSync in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op AssignVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op ReadVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing 