# MNIST with TensorFlow using MPI through Horovod

*Last edited: 2023-12-27*

Based on: https://jean-zay-doc.readthedocs.io/en/latest/examples/tf/tf_mpi/

This notebook runs on the [SDumont](https://sdumont.lncc.br/support_manual.php) computer on Sequana nodes, using V100 GPU. For more information, [see the Manual](https://sdumont.lncc.br/support_manual.php).

In [30]:
SCRA = ! SCRA=/scratch${HOME#/prj} && echo $SCRA
SCRA = SCRA[0]
%env SCRA {SCRA}
SPWD = ! SPWD=/scratch${PWD#/prj} && echo $SPWD
SPWD = SPWD[0]
%env SPWD {SPWD}
DATA = SCRA + '/data/MNIST/raw'
%env DATA {DATA}

env: SCRA=/scratch/yyyy/xxxx
env: SPWD=/scratch/yyyy/xxxx/horov-mnist
env: DATA=/scratch/yyyy/xxxx/data/MNIST/raw


---

In [36]:
%%writefile {SPWD}/hv-tf1-mnist.py
import argparse
import errno
import gzip
import logging
import os
from os.path import join
import time
import numpy as np
import horovod.tensorflow as hvd
import tensorflow as tf
from tensorflow import keras

logging.disable(logging.WARNING)
logging.getLogger("tensorflow").disabled = True
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

DATA_PATH = join(os.environ["PWD"], "data~", "MNIST",
                 "raw").replace("/prj/", "/scratch/")

filename = [
    ["training_images",
     join(DATA_PATH, "train-images-idx3-ubyte.gz")],
    ["test_images", join(DATA_PATH, "t10k-images-idx3-ubyte.gz")],
    ["training_labels",
     join(DATA_PATH, "train-labels-idx1-ubyte.gz")],
    ["test_labels", join(DATA_PATH, "t10k-labels-idx1-ubyte.gz")],
]

layers = tf.layers

# Training settings
parser = argparse.ArgumentParser(description="Tensorflow MNIST Example")
parser.add_argument("--mnist", help="location of mnist.npz", required=True)
args = parser.parse_args()


def load_mnist():
    mnist = {}
    for name in filename[:2]:
        with gzip.open(name[1], "rb") as f:
            mnist[name[0]] = np.frombuffer(f.read(), np.uint8,
                                           offset=16).reshape(-1, 28 * 28)
    for name in filename[-2:]:
        with gzip.open(name[1], "rb") as f:
            mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=8)
    return mnist


def conv_model(feature, target, mode):
    """2-layer convolution model."""

    # Convert the target to a one-hot tensor of shape (batch_size, 10)
    # and with a on-value of 1 for each one-hot vector of length 10
    target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0)

    # Reshape feature to 4d tensor with 2nd and 3rd dimensions
    # being image width and height final dimension being the
    # number of color channels
    feature = tf.reshape(feature, [-1, 28, 28, 1])

    # First conv layer will compute 32 features for each 5x5 patch
    with tf.variable_scope("conv_layer1"):
        h_conv1 = layers.conv2d(feature,
                                32,
                                kernel_size=[5, 5],
                                activation=tf.nn.relu,
                                padding="SAME")
        h_pool1 = tf.nn.max_pool(h_conv1,
                                 ksize=[1, 2, 2, 1],
                                 strides=[1, 2, 2, 1],
                                 padding="SAME")

    # Second conv layer will compute 64 features for each 5x5 patch
    with tf.variable_scope("conv_layer2"):
        h_conv2 = layers.conv2d(h_pool1,
                                64,
                                kernel_size=[5, 5],
                                activation=tf.nn.relu,
                                padding="SAME")
        h_pool2 = tf.nn.max_pool(h_conv2,
                                 ksize=[1, 2, 2, 1],
                                 strides=[1, 2, 2, 1],
                                 padding="SAME")

        # reshape tensor into a batch of vectors
        h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])

    # Densely connected layer with 1024 neurons
    h_fc1 = layers.dropout(
        layers.dense(h_pool2_flat, 1024, activation=tf.nn.relu),
        rate=0.5,
        training=mode == tf.estimator.ModeKeys.TRAIN,
    )

    # Compute logits (1 per class) and compute loss
    logits = layers.dense(h_fc1, 10, activation=None)
    loss = tf.losses.softmax_cross_entropy(target, logits)

    return tf.argmax(logits, 1), loss


def train_input_generator(x_train, y_train, batch_size=64):
    assert len(x_train) == len(y_train)
    while True:
        p = np.random.permutation(len(x_train))
        x_train, y_train = x_train[p], y_train[p]
        index = 0
        while index <= len(x_train) - batch_size:
            yield x_train[index:index + batch_size], y_train[index:index +
                                                             batch_size],
            index += batch_size


def main(_):
    start = time.time()

    # Horovod: initialize Horovod
    hvd.init()

    # Load MNIST dataset
    dataset = load_mnist()
    x_train = dataset[filename[0][0]].reshape((60000, 28, 28))
    y_train = dataset[filename[2][0]].reshape((60000))
    x_test = dataset[filename[1][0]].reshape((10000, 28, 28))
    y_test = dataset[filename[3][0]].reshape((10000))

    # The shape of downloaded data is (-1, 28, 28), hence we need
    # to reshape it into (-1, 784) to feed into our network. Also,
    # need to normalize the features between 0 and 1
    x_train = np.reshape(x_train, (-1, 784)) / 255.0
    x_test = np.reshape(x_test, (-1, 784)) / 255.0

    # Build model...
    with tf.name_scope("input"):
        image = tf.placeholder(tf.float32, [None, 784], name="image")
        label = tf.placeholder(tf.float32, [None], name="label")
    predict, loss = conv_model(image, label, tf.estimator.ModeKeys.TRAIN)

    # Horovod: adjust learning rate based on lr_scaler
    lr_scaler = hvd.size()
    opt = tf.train.AdamOptimizer(0.001 * lr_scaler)

    # Horovod: add Horovod Distributed Optimizer
    opt = hvd.DistributedOptimizer(opt)

    global_step = tf.train.get_or_create_global_step()
    train_op = opt.minimize(loss, global_step=global_step)

    print("Mumber of GPUs:", hvd.size())
    hooks = [

        # Horovod: BroadcastGlobalVariablesHook broadcasts initial
        # variable states from rank 0 to all other processes. This is
        # necessary to ensure consistent initialization of all workers
        # when training is started with random weights or restored
        # from a checkpoint
        hvd.BroadcastGlobalVariablesHook(0),

        # Horovod: adjust number of steps based on number of GPUs.
        tf.train.StopAtStepHook(last_step=20000 // hvd.size()),
        tf.train.LoggingTensorHook(tensors={
            "step": global_step,
            "loss": loss
        },
                                   every_n_iter=10),
    ]

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    print("Hvd local rank(one GPU per process):", hvd.local_rank())
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Horovod: save checkpoints only on worker 0 to prevent other
    # workers from corrupting them
    print("Hdv rank:", hvd.rank())
    checkpoint_dir = "./checkpoints~" if hvd.rank() == 0 else None
    training_batch_generator = train_input_generator(x_train,
                                                     y_train,
                                                     batch_size=100)

    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing
    # when done or an error occurs
    with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                           hooks=hooks,
                                           config=config) as mon_sess:
        while not mon_sess.should_stop():

            # Run a training step synchronously.
            image_, label_ = next(training_batch_generator)
            mon_sess.run(train_op, feed_dict={image: image_, label: label_})

    if hvd.rank() == 0:
        print("Elapsed time: ", time.time() - start)


if __name__ == "__main__":
    tf.app.run()

Overwriting /scratch/yyyy/xxxx/horov-mnist/hv-tf1-mnist.py


---

Creates the working directory if it does not exist. The output file will be written to this directory:

In [2]:
! mkdir -p /scratch${PWD#/prj}

In [47]:
%%writefile hv-tf1-mnist.srm
#!/bin/bash
#SBATCH -p sequana_gpu_shared       # SLURM_JOB_PARTITION
#SBATCH --job-name=hv-tf1-mnist     # SLURM_JOB_NAME
#SBATCH --nodes=1                   # SLURM_JOB_NUM_NODES
#SBATCH --ntasks-per-node=1         # SLURM_NTASKS_PER_NODE
#SBATCH --cpus-per-task=1           # SLURM_CPUS_PER_TASK
#SBATCH --distribution=block:cyclic # SLURM_DISTRIBUTION
#SBATCH --time=00:20:00             # Limit execution time
# SLURM output environment variables:
# https://slurm.schedmd.com/sbatch.html#SECTION_OUTPUT-ENVIRONMENT-VARIABLES

echo '======================================='
echo '- Job ID:' $SLURM_JOB_ID
echo '- Nº of nodes in the job:' $SLURM_JOB_NUM_NODES
echo '- Nº of tasks per node:' $SLURM_NTASKS_PER_NODE
echo '- Nº of tasks:' $SLURM_NTASKS
echo '- Nº of cpus per task:' $SLURM_CPUS_PER_TASK
echo '- Partition:' $SLURM_JOB_PARTITION
echo '- Dir from which sbatch was invoked:' ${SLURM_SUBMIT_DIR##*/}
echo -n '- Nodes allocated to the job: '
nodeset -e $SLURM_JOB_NODELIST
echo '----------------------------------------'

cd $SLURM_SUBMIT_DIR

# Modules
module purge
module load sequana/current
module load cuda/10.0_sequana cudnn/7.6_cuda-10.0_sequana
module load openmpi/gnu/4.0.1_sequana

# Environment
DATA=data~/MNIST/raw
SPWD=/scratch${PWD#/prj}
SCRA=/scratch${HOME#/prj}
source $SCRA/miniconda3/bin/activate
conda activate tfh01 &>/dev/null

# Executable
PRMT="--mnist $SCRA/$DATA/mnist.npz"
EXEC="python $SPWD/hv-tf1-mnist.py"
OPTI="--mpi=pmi2 --cpu_bind=cores"

# Run
echo -n '<1. starting python script > ' && date
echo '-- output -----------------------------'
srun $OPTI $EXEC $PRMT
echo '-- end --------------------------------'
echo -n '<2. quit>                    ' && date

Overwriting hv-tf1-mnist.srm


In [32]:
def runtest(batch, nodes, tasks):
    import time
    sub = ! sbatch --nodes={nodes} --ntasks-per-node={tasks} {batch}
    print(sub[0], end='.')
    job = sub[0].replace('Submitted batch job ', '')
    c = [job]
    while job in c:
        time.sleep(20)
        print(end='.')
        c = ! squeue --job {job} --noheader --format "%i"
    print('')
    %cat /scratch${{PWD#/prj}}/slurm-{job}.out

In [45]:
runtest('hv-tf1-mnist.srm', 1, 2)

Submitted batch job 11109057...............................
- Job ID: 11109057
- Nº of nodes in the job: 1
- Nº of tasks per node: 2
- Nº of tasks: 2
- Nº of cpus per task: 1
- Partition: sequana_gpu_shared
- Dir from which sbatch was invoked: horov-mnist
- Nodes allocated to the job: sdumont8038
----------------------------------------
Loading SEQUANA Software environment
<1. starting python script > Qua Dez 27 22:27:21 -03 2023
-- output -----------------------------
2023-12-27 22:30:24.989571: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-27 22:30:24.989614: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-27 22:30:33.917253: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 AVX512F FMA
2023-12-27 22:30:33.917253: I tensorflow/core/platfo

In [25]:
! sacct -o JobID,JobName,AllocCPUS,Elapsed -j 11109057

       JobID    JobName  AllocCPUS    Elapsed 
------------ ---------- ---------- ---------- 
11109057     hv-tf1-mn+          2   00:09:42 
11109057.ba+      batch          2   00:09:42 
11109057.0       python          2   00:09:18 


In [41]:
runtest('hv-tf1-mnist.srm', 1, 4)

Submitted batch job 11110682..........................
- Job ID: 11110682
- Nº of nodes in the job: 1
- Nº of tasks per node: 4
- Nº of tasks: 4
- Nº of cpus per task: 1
- Partition: sequana_gpu_shared
- Dir from which sbatch was invoked: horov-mnist
- Nodes allocated to the job: sdumont8033
----------------------------------------
Loading SEQUANA Software environment
<1. starting python script > Qui Dez 28 12:18:44 -03 2023
-- output -----------------------------
2023-12-28 12:22:31.107821: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-28 12:22:31.107759: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-28 12:22:31.107762: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-28 12:22:31.107791: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] S

In [42]:
! sacct -o JobID,AllocCPUS,Elapsed -j 11110682.0

       JobID  AllocCPUS    Elapsed 
------------ ---------- ---------- 
11110682.0            4   00:07:36 


In [48]:
runtest('hv-tf1-mnist.srm', 2, 4)

Submitted batch job 11111009.............................
- Job ID: 11111009
- Nº of nodes in the job: 2
- Nº of tasks per node: 4
- Nº of tasks: 8
- Nº of cpus per task: 1
- Partition: sequana_gpu_shared
- Dir from which sbatch was invoked: horov-mnist
- Nodes allocated to the job: sdumont8037 sdumont8038
----------------------------------------
Loading SEQUANA Software environment
<1. starting python script > Qui Dez 28 13:49:56 -03 2023
-- output -----------------------------
2023-12-28 13:54:14.777233: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-28 13:54:14.777233: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-28 13:54:14.777233: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-28 13:54:14.777233: I tensorflow/stream_executor/platform/default/dso_

In [49]:
! sacct -o JobID,AllocCPUS,Elapsed -j 11111009.0

       JobID  AllocCPUS    Elapsed 
------------ ---------- ---------- 
11111009.0            8   00:08:26 


- This job will run 32 MPI process on 8 nodes, each node will host 4 MPI process, each one pinned on a GPU.

In [50]:
runtest('hv-tf1-mnist.srm', 8, 4)

Submitted batch job 11111179..........................................................................................................................................................................................................................................................................

KeyboardInterrupt: 

In [63]:
! squeue -j 11111179 --start

             JOBID PARTITION     NAME     USER ST          START_TIME  NODES SCHEDNODES           NODELIST(REASON)
          11111179 sequana_g hv-tf1-m eduardo. PD 2023-12-28T17:19:36      8 sdumont[8033,8036-80 (Resources)


In [69]:
! sacct -o JobID,AllocCPUS,Elapsed -j 11111179.0

       JobID  AllocCPUS    Elapsed 
------------ ---------- ---------- 
11111179.0           32   00:05:24 


In [71]:
! cat /scratch${PWD#/prj}/slurm-11111179.out

- Job ID: 11111179
- Nº of nodes in the job: 8
- Nº of tasks per node: 4
- Nº of tasks: 32
- Nº of cpus per task: 1
- Partition: sequana_gpu_shared
- Dir from which sbatch was invoked: horov-mnist
- Nodes allocated to the job: sdumont8029 sdumont8030 sdumont8037 sdumont8038 sdumont8039 sdumont8040 sdumont8041 sdumont8042
----------------------------------------
Loading SEQUANA Software environment
<1. starting python script > Qui Dez 28 17:20:19 -03 2023
-- output -----------------------------
2023-12-28 17:23:12.723306: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-28 17:23:12.739354: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-28 17:23:12.723347: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.0
2023-12-28 17:23:12.723342: I tensorflow/stream_executor/platfo

---

In [24]:
! nvidia-smi

Sat Dec 23 16:39:42 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.82.01    Driver Version: 470.82.01    CUDA Version: 11.4     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla V100-PCIE...  Off  | 00000000:3B:00.0 Off |                    0 |
| N/A   38C    P0    27W / 250W |    106MiB / 32510MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla V100-PCIE...  Off  | 00000000:5E:00.0 Off |                    0 |
| N/A   30C    P0    25W / 250W |     15MiB / 32510MiB |      0%      Default |
|       