In [1]:
import tensorflow as tf
import numpy as np
import os

print(tf.__version__)
print(np.__version__)

2.1.0
1.19.2


In [2]:
strategy = tf.distribute.MirroredStrategy(cross_device_ops = tf.distribute.HierarchicalCopyAllReduce()) # kalau misalnya punya gpu yang berbeda tipe

print("Number of devices {} ".format(strategy.num_replicas_in_sync))

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
Number of devices 1 


In [3]:
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images,train_labes), (test_images, test_labels) = fashion_mnist.load_data()
print(train_images.shape)

(60000, 28, 28)


In [4]:
train_images = train_images[...,None]
test_images = test_images[..., None]
print(train_images.shape)
train_images = train_images / np.float(255.0)
test_images = test_images / np.float(255.0)

BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

train_dataset = tf.data.Dataset.from_tensor_slices((train_images,train_labes)).shuffle(buffer_size=BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images,test_labels)).batch(GLOBAL_BATCH_SIZE)


(60000, 28, 28, 1)


In [5]:
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

In [6]:
%whos

Variable                 Type                  Data/Info
--------------------------------------------------------
BATCH_SIZE_PER_REPLICA   int                   64
BUFFER_SIZE              int                   60000
GLOBAL_BATCH_SIZE        int                   64
fashion_mnist            module                <module 'tensorflow_core.<...>hion_mnist\\__init__.py'>
np                       module                <module 'numpy' from 'C:\<...>ges\\numpy\\__init__.py'>
os                       module                <module 'os' from 'C:\\Us<...>DataScience\\lib\\os.py'>
strategy                 MirroredStrategy      <tensorflow.python.distri<...>ct at 0x000001EBA06EABA8>
test_dataset             BatchDataset          <BatchDataset shapes: ((N<...>: (tf.float64, tf.uint8)>
test_dist_dataset        DistributedDataset    <tensorflow.python.distri<...>ct at 0x000001EBA6EE1E10>
test_images              ndarray               10000x28x28x1: 7840000 elems, type `float64`, 62720000 bytes (59.814

In [7]:
def def_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3 , activation= tf.nn.relu),
        tf.keras.layers.MaxPool2D(),
        tf.keras.layers.Conv2D(64, 3, activation = tf.nn.relu),
        tf.keras.layers.MaxPool2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation = tf.nn.relu),
        tf.keras.layers.Dense(10)
    ])
    return model

In [12]:
with strategy.scope():
    loss_obj = tf.keras.losses.SparseCategoricalCrossentropy(from_logits= True,reduction=tf.keras.losses.Reduction.NONE)
    
    def compute_loss(labels, prediction):
        loss_per_example= loss_obj(labels,prediction)
        
        return tf.nn.compute_average_loss(loss_per_example, global_batch_size=GLOBAL_BATCH_SIZE)
    
    test_loss = tf.keras.metrics.Mean(name="test_loss")
    
    train_acc = tf.keras.metrics.SparseCategoricalAccuracy(name="train_acc")
    test_acc = tf.keras.metrics.SparseCategoricalAccuracy(name="test_acc")
    optimizer = tf.keras.optimizers.Adam()
    model = def_model()
        

In [13]:
@tf.function
def distributed_train_step(dataset):
    per_replica_losses = strategy.experimental_run_v2(fn=train_step, args=[dataset])
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis = None)
def train_step(inputs):
    images, labels = inputs
    with tf.GradientTape() as tape:
        pred = model(images,training=True)
        loss = compute_loss(labels,pred)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients,model.trainable_variables))
    train_acc.update_state(labels,pred)
    
    return loss
def test_step(inputs):
    images, labels = inputs
    pred = model(images,training=False)
    loss = loss_obj(labels,pred)
    test_acc.update_state(labels,pred)
    test_loss.update_statep(loss)
     
def distributed_test_step(dataset):
    return strategy.experimental_run_v2(fn = test_step,args=[dataset])

    

In [10]:
print(train_dist_dataset.element_spec)


(TensorSpec(shape=(None, 28, 28, 1), dtype=tf.float64, name=None), TensorSpec(shape=(None,), dtype=tf.uint8, name=None))


In [14]:
EPOCHS = 10
for epoch in range(EPOCHS):
    total_loss = 0.0
    
    print(epoch)
    num_batches = 0
    for batch in train_dist_dataset:
        total_loss += distributed_train_step(batch)
        num_batches += 1
    train_loss = total_loss/num_batches
    print("Epoch : %d Loss : %.2f acc : %.2f" % (epoch,train_loss,train_acc.result()))
    train_acc.reset_states()
         

0


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Epoch : 0 Loss : 0.51 acc : 0.82
1
Epoch : 1 Loss : 0.33 acc : 0.88
2
Epoch : 2 Loss : 0.29 acc : 0.89
3
Epoch : 3 Loss : 0.26 acc : 0.90
4
Epoch : 4 Loss : 0.24 acc : 0.91
5
Epoch : 5 Loss : 0.22 acc : 0.92
6
Epoch : 6 Loss : 0.20 acc : 0.93
7
Epoch : 7 Loss : 0.19 acc : 0.93
8
Epoch : 8 Loss : 0.17 acc : 0.94
9
Epoch : 9 Loss : 0.16 acc : 0.94
