# Intro to Notebook
I made this notebook to learn how to distribute training across multiple GPUs. My local machine has only one GPU, an Nvidia GTX 960 (4 GB).

# Start to Learn
Following the tutorial on the TensorFlow website (with some minor changes to variable names and directory structure): 
https://www.tensorflow.org/tutorials/distribute/keras

### Import Dependencies and Download Dataset

In [1]:
import tensorflow as tf
import tensorflow_datasets as tfds
tfds.disable_progress_bar()
import os

tf.__version__

'2.3.0'

In [2]:
datasets, info = tfds.load(name="mnist", with_info=True, as_supervised=True) # with_info=True also returns the dataset metadata
mnist_train, mnist_test = datasets['train'], datasets['test']

In [3]:
info

tfds.core.DatasetInfo(
    name='mnist',
    version=3.0.1,
    description='The MNIST database of handwritten digits.',
    homepage='http://yann.lecun.com/exdb/mnist/',
    features=FeaturesDict({
        'image': Image(shape=(28, 28, 1), dtype=tf.uint8),
        'label': ClassLabel(shape=(), dtype=tf.int64, num_classes=10),
    }),
    total_num_examples=70000,
    splits={
        'test': 10000,
        'train': 60000,
    },
    supervised_keys=('image', 'label'),
    citation="""@article{lecun2010mnist,
      title={MNIST handwritten digit database},
      author={LeCun, Yann and Cortes, Corinna and Burges, CJ},
      journal={ATT Labs [Online]. Available: http://yann.lecun.com/exdb/mnist},
      volume={2},
      year={2010}
    }""",
    redistribution_info=,
)

### Define Distribution Strategy
Using MirroredStrategy: https://www.tensorflow.org/api_docs/python/tf/distribute/MirroredStrategy#attributes_1
Other useful resources: 
https://keras.io/guides/distributed_training/
https://towardsdatascience.com/distributed-training-in-tf-keras-with-w-b-ccf021f9322e

MirroredStrategy is used with one machine that has multiple GPUs. Each GPU holds a copy of the model (a replica). The global batch is split (equally, if possible) among the available GPUs into local batches. Each GPU independently runs a forward pass on its own local batch and then a backward pass, calculating gradients with respect to that local batch. Finally, the gradients are aggregated across replicas and every replica applies the same update at the same time, so all copies of the weights stay in sync. 
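The synchronization step described above can be sketched without TensorFlow. This is only a toy illustration under stated assumptions: a one-parameter model `y = w * x` with squared-error loss, two simulated replicas, and helper names (`local_gradient`, `split_batch`) that are made up for this sketch and are not part of `tf.distribute`. Each replica scales its gradient by the global batch size, so summing the local gradients gives the same value a single device would compute on the whole global batch.

```python
def local_gradient(w, batch, global_batch_size):
    # Sum of per-example gradients of (w*x - y)**2 with respect to w,
    # scaled by the GLOBAL batch size (assumption of this sketch).
    return sum(2.0 * (w * x - y) * x for x, y in batch) / global_batch_size


def split_batch(batch, num_replicas):
    # Split the global batch into equally sized local batches.
    size = len(batch) // num_replicas
    return [batch[i * size:(i + 1) * size] for i in range(num_replicas)]


global_batch = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
w = 0.5             # every replica starts from the same weight
num_replicas = 2    # hypothetical: pretend there are two GPUs
learning_rate = 0.01

# Each replica computes a gradient on its own local batch only.
local_batches = split_batch(global_batch, num_replicas)
local_grads = [local_gradient(w, b, len(global_batch)) for b in local_batches]

# Aggregation step: sum the local gradients across replicas.
synced_grad = sum(local_grads)

# Reference: one device processing the whole global batch.
single_device_grad = local_gradient(w, global_batch, len(global_batch))

# Every replica applies the same update, so the copies stay identical.
replica_weights = [w - learning_rate * synced_grad for _ in range(num_replicas)]

print(local_grads, synced_grad, single_device_grad, replica_weights)
```

The aggregated gradient matches the single-device gradient, which is why splitting the batch across replicas does not change the update itself.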

In [4]:
strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


In [5]:
print("Num of devices: {}".format(strategy.num_replicas_in_sync))

Num of devices: 1


As expected on my own machine, only one GPU is discovered.

### Setup Input Pipeline
Setting up how to distribute data among the GPUs

In [6]:
# Get the number of train and test examples (can see in the output of "info")
num_train_exs = info.splits["train"].num_examples
num_test_exs = info.splits["test"].num_examples

BUFFER_SIZE = 10000 # size of the shuffle buffer passed to shuffle(): elements are drawn at random from a buffer of this many examples

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync # The global batch size

In [7]:
BATCH_SIZE # with only one GPU, BATCH_SIZE == BATCH_SIZE_PER_REPLICA

64
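To see how the global batch size would scale on a machine with more GPUs, here is a small sketch in plain Python. The replica counts (1, 2, 4) are hypothetical, since only one GPU is discovered here; the 60,000 figure is the size of the train split reported by `info`. The helper names are made up for this sketch.

```python
import math

BATCH_SIZE_PER_REPLICA = 64
NUM_TRAIN_EXAMPLES = 60000  # size of the MNIST train split


def global_batch_size(num_replicas):
    # Each replica processes BATCH_SIZE_PER_REPLICA examples per step.
    return BATCH_SIZE_PER_REPLICA * num_replicas


def steps_per_epoch(num_replicas):
    # The last batch may be smaller, hence the ceiling.
    return math.ceil(NUM_TRAIN_EXAMPLES / global_batch_size(num_replicas))


for n in (1, 2, 4):  # hypothetical GPU counts
    print(n, global_batch_size(n), steps_per_epoch(n))
```

Doubling the number of replicas doubles the global batch and roughly halves the number of steps per epoch.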

In [8]:
def scale(image, label):
    print(type(image))
    image = tf.cast(image, tf.float32) # cast image from uint8 to a float32 tensor (still a tf.Tensor, not a numpy array)
    print(type(image))
    image /= 255 # normalize pixel values to have a range of [0,1]
    
    return image, label

Prepare train and test sets for training and evaluation. <br>
- Train: 
    - scale the images as defined by the function scale above
    - cache the images for improved performance (reduces reads from disk)
    - shuffle data
    - put in batches
- Dev:
    - scale images
    - put in batches

In [9]:
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE) # cache() keeps the scaled images in host memory (RAM) after the first pass, so the dataset must fit in RAM (not GPU memory)
dev_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

<class 'tensorflow.python.framework.ops.Tensor'>
<class 'tensorflow.python.framework.ops.Tensor'>
<class 'tensorflow.python.framework.ops.Tensor'>
<class 'tensorflow.python.framework.ops.Tensor'>
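`BUFFER_SIZE` is the argument given to `shuffle()` in the pipeline above. A rough pure-Python sketch of buffer-based shuffling: keep a buffer of `buffer_size` elements, emit a random one, and refill its slot with the next incoming element. `buffered_shuffle` is a hypothetical helper written for illustration; it imitates the idea and is not the tf.data implementation.

```python
import random


def buffered_shuffle(items, buffer_size, seed=0):
    # Yield the items in a shuffled order using a fixed-size buffer.
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # still filling the buffer
            continue
        idx = rng.randrange(len(buffer))
        yield buffer[idx]        # emit a random buffered element
        buffer[idx] = item       # refill the slot with the new element
    rng.shuffle(buffer)          # input exhausted: drain the rest in random order
    for item in buffer:
        yield item


small_buffer = list(buffered_shuffle(range(20), buffer_size=3))
no_shuffle = list(buffered_shuffle(range(20), buffer_size=1))
print(small_buffer)
print(no_shuffle)
```

With a buffer of 1 nothing is shuffled, and with a small buffer elements can only move a limited distance, so a buffer at least as large as the dataset is needed for a full uniform shuffle.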


### Create and Compile Model

In [10]:
with strategy.scope(): # The model must be defined inside the scope in order to create MirroredVariables
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation="relu", input_shape=(28,28,1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(), # flatten so we can then apply a dense layer
        tf.keras.layers.Dense(64, activation="relu"),
        tf.keras.layers.Dense(10)
    ])
    
    opt = tf.keras.optimizers.Adam()
    
    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                 optimizer=opt,
                 metrics=["accuracy"])
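The last `Dense(10)` layer has no activation, so the model outputs raw logits, which is why the loss is created with `from_logits=True`. A minimal pure-Python sketch of what that means: apply a softmax to the logits, then take the negative log probability of the true class. The function names are made up for this sketch and this is not the Keras implementation.

```python
import math


def softmax(logits):
    # Subtract the max for numerical stability before exponentiating.
    m = max(logits)
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sparse_crossentropy_from_logits(logits, label):
    # label is an integer class index (hence "sparse").
    return -math.log(softmax(logits)[label])


uniform_logits = [0.0] * 10          # model has no preference between the 10 digits
confident_logits = [0.0] * 10
confident_logits[7] = 10.0           # model strongly predicts class 7

uniform_loss = sparse_crossentropy_from_logits(uniform_logits, 7)
confident_loss = sparse_crossentropy_from_logits(confident_logits, 7)
print(uniform_loss, confident_loss)
```

An untrained model that spreads probability evenly over 10 classes has a loss of about ln(10), roughly 2.3, which is a handy sanity check for the first training steps.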

In [11]:
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 26, 26, 32)        320       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 13, 13, 32)        0         
_________________________________________________________________
flatten (Flatten)            (None, 5408)              0         
_________________________________________________________________
dense (Dense)                (None, 64)                346176    
_________________________________________________________________
dense_1 (Dense)              (None, 10)                650       
Total params: 347,146
Trainable params: 347,146
Non-trainable params: 0
_________________________________________________________________
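The parameter counts in the summary can be checked by hand. A short sketch of the arithmetic, assuming the default `valid` padding for the convolution and the default 2x2 pooling window (helper names are made up for this sketch):

```python
def conv2d_params(kernel, in_channels, filters):
    # One kernel*kernel*in_channels weight block plus one bias per filter.
    return kernel * kernel * in_channels * filters + filters


def dense_params(inputs, units):
    # One weight per (input, unit) pair plus one bias per unit.
    return inputs * units + units


conv_out = 28 - 3 + 1          # 3x3 kernel without padding: 28 -> 26
pooled = conv_out // 2         # 2x2 max pooling: 26 -> 13
flat = pooled * pooled * 32    # flatten 13 x 13 x 32 feature maps

layer_params = {
    "conv2d": conv2d_params(3, 1, 32),
    "dense": dense_params(flat, 64),
    "dense_1": dense_params(64, 10),
}
total_params = sum(layer_params.values())
print(flat, layer_params, total_params)
```

Almost all of the parameters sit in the first dense layer, because it connects every one of the 5408 flattened features to 64 units.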


### Callbacks and Training

In [12]:
checkpoint_dir = './start_to_learn/training_checkpoints' # directory to save training checkpoints
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}") # name the files of the checkpoints
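The `{epoch}` part of the prefix is a placeholder. As far as I can tell, Keras fills it in with Python string formatting using a 1-based epoch number; the sketch below only imitates that with `str.format` and does not call Keras, so treat the exact file names as an assumption.

```python
import os

checkpoint_dir = './start_to_learn/training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Imitate the placeholder being filled in once per epoch (1-based, assumed).
checkpoint_names = [checkpoint_prefix.format(epoch=e) for e in range(1, 13)]

print(checkpoint_names[0])
print(checkpoint_names[-1])
```

Because each epoch gets its own prefix, earlier checkpoints are kept rather than overwritten.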

In [13]:
# Learning Rate Scheduler: change the learning rate with the epoch
def decay(epoch):
    if epoch < 3:
        return 1e-3
    elif epoch < 7:
        return 1e-4
    else:
        return 1e-5
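A quick check of the schedule in plain Python, with the same thresholds as `decay` above. The `epoch` argument passed by the scheduler is 0-based, so the rate drops at the start of the 4th and 8th epochs as they are numbered in the training log.

```python
def decay(epoch):
    # Same thresholds as the notebook's schedule; epoch is 0-based.
    if epoch < 3:
        return 1e-3
    elif epoch < 7:
        return 1e-4
    else:
        return 1e-5


schedule = [decay(epoch) for epoch in range(12)]
for epoch, lr in enumerate(schedule):
    print("epoch {} (logged as {}): lr = {}".format(epoch, epoch + 1, lr))
```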

In [14]:
# The Callback to print the learning rate after each epoch
class PrintLR(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        # Use self.model (set by Keras when the callback is attached) instead of the global `model`
        print('\nLearning rate for epoch {} is {}'.format(epoch + 1, self.model.optimizer.lr.numpy()))

In [15]:
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./start_to_learn/logs'), # TensorBoard logs to evaluate model
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True), # Save model checkpoints
    tf.keras.callbacks.LearningRateScheduler(decay), # Update Learning Rate
    PrintLR() # Print Learning Rate
]

In [16]:
# TODO: check warning about broadcast: why is it saying 'CPU'
model.fit(train_dataset, epochs=12, callbacks=callbacks)

Epoch 1/12
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


Instructions for updating:
use `tf.profiler.experimental.stop` instead.

Learning rate for epoch 1 is 0.0010000000474974513
Epoch 2/12
Learning rate for epoch 2 is 0.0010000000474974513
Epoch 3/12
Learning rate for epoch 3 is 0.0010000000474974513
Epoch 4/12
Learning rate for epoch 4 is 9.999999747378752e-05
Epoch 5/12
Learning rate for epoch 5 is 9.999999747378752e-05
Epoch 6/12
Learning rate for epoch 6 is 9.999999747378752e-05
Epoch 7/12
Learning rate for epoch 7 is 9.999999747378752e-05
Epoch 8/12
Learning rate for epoch 8 is 9.999999747378752e-06
Epoch 9/12
Learning rate for epoch 9 is 9.999999747378752e-06
Epoch 10/12
Learning rate for epoch 10 is 9.999999747378752e-06
Epoch 11/12
Learning rate for epoch 11 is 9.999999747378752e-06
Epoch 12/12
Learning rate for epoch 12 is 9.999999747378752e-06


<tensorflow.python.keras.callbacks.History at 0x1d441a76c18>

In [17]:
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
dev_loss, dev_acc = model.evaluate(dev_dataset)
print("Dev Loss: {}, Dev Acc: {}".format(dev_loss, dev_acc))

Dev Loss: 0.039630163460969925, Dev Acc: 0.9861999750137329


### Export Model

In [18]:
model.save('start_to_learn/saved_model/', save_format="tf") # save_format="tf" is already the default here, so it could be omitted

Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


INFO:tensorflow:Assets written to: start_to_learn/saved_model/assets


Now let's load in the model. This can be done either with or without scope.

In [19]:
# Without Scope
unreplicated_model = tf.keras.models.load_model('start_to_learn/saved_model/')
unreplicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                 optimizer=tf.keras.optimizers.Adam(),
                 metrics=["accuracy"]
)

dev_loss, dev_acc = unreplicated_model.evaluate(dev_dataset) # fixed typo: was def_acc, so the print below reused the earlier dev_acc
print("Dev loss: {}, Dev Accuracy: {}".format(dev_loss, dev_acc))

Dev loss: 0.039630163460969925, Dev Accuracy: 0.9861999750137329


In [20]:
# With Scope
with strategy.scope():
    replicated_model = tf.keras.models.load_model('start_to_learn/saved_model/')
    replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                     optimizer=tf.keras.optimizers.Adam(),
                     metrics=["accuracy"]
    )
    dev_loss, dev_acc = replicated_model.evaluate(dev_dataset) # fixed typo: was def_acc, so the print below reused the earlier dev_acc
    print("Dev loss: {}, Dev Accuracy: {}".format(dev_loss, dev_acc))

Dev loss: 0.039630163460969925, Dev Accuracy: 0.9861999750137329
