# Ray TF MNIST

https://docs.ray.io/en/latest/train/examples/tf/tensorflow_mnist_example.html

Using `tensorflow-2.9.0` NERSC kernel.



In [1]:
import sys
import os

In [2]:
#!{sys.executable} -m pip install "ray[air]==2.3.1"

# Start Ray

In [3]:
# Start the Ray cluster through the local `ray_utils` helper and keep the
# returned handle: it is used at the end of the notebook (`ray_cluster.kill()`)
# to tear the cluster down again.
from ray_utils import start_ray_cluster
ray_cluster = start_ray_cluster()

<> Starting Ray head node


In case of issues, please refer to our known issues: https://docs.nersc.gov/current/
and open a help ticket if your issue is not listed: https://help.nersc.gov/


2023-04-22 21:49:51,277	INFO usage_lib.py:435 -- Usage stats collection is disabled.
2023-04-22 21:49:51,278	INFO scripts.py:710 -- [37mLocal node IP[39m: [1m128.55.64.15[22m
2023-04-22 21:49:53,378	SUCC scripts.py:747 -- [32m--------------------[39m
2023-04-22 21:49:53,378	SUCC scripts.py:748 -- [32mRay runtime started.[39m
2023-04-22 21:49:53,378	SUCC scripts.py:749 -- [32m--------------------[39m
2023-04-22 21:49:53,378	INFO scripts.py:751 -- [36mNext steps[39m
2023-04-22 21:49:53,378	INFO scripts.py:752 -- To connect to this Ray runtime from another node, run
2023-04-22 21:49:53,378	INFO scripts.py:755 -- [1m  ray start --address='128.55.64.15:6379'[22m
2023-04-22 21:49:53,378	INFO scripts.py:771 -- Alternatively, use the following Python code:
2023-04-22 21:49:53,378	INFO scripts.py:773 -- [35mimport[39m[26m ray
2023-04-22 21:49:53,378	INFO scripts.py:777 -- ray[35m.[39m[26minit(address[35m=[39m[26m[33m'auto'[39m[26m)
2023-04-22 21:49:53,378	INFO scripts.

In [4]:
import logging

import ray

# Make the cell re-runnable: drop any existing connection before reconnecting.
# `ray.is_initialized` is a function and has to be *called*; the bare attribute
# is always truthy, which made the shutdown unconditional.
if ray.is_initialized():
    ray.shutdown()

# Set to True to keep Ray's default (more verbose) driver logging.
ray_log = False
if not ray_log:
    # Connect to the already-running cluster and only surface errors.
    ray.init(address='auto', logging_level=logging.ERROR)
else:
    ray.init(address='auto')

### Check ray cluster resources

In [5]:
# Show the resources (CPU, GPU, memory, ...) Ray reports for the connected cluster.
ray.cluster_resources()

{'accelerator_type:A100': 1.0,
 'object_store_memory': 142508620185.0,
 'memory': 322520113767.0,
 'GPU': 1.0,
 'node:128.55.64.15': 1.0,
 'CPU': 256.0}

### View dashboards

In [6]:
# URL of the Ray dashboard (localhost:8265) as reached through the JupyterHub proxy.
# NOTE(review): if JUPYTERHUB_SERVICE_PREFIX is unset, os.getenv returns None and
# the literal text "None" ends up in the URL.
f'https://jupyter.nersc.gov{os.getenv("JUPYTERHUB_SERVICE_PREFIX")}proxy/localhost:8265/#/new/overview'

'https://jupyter.nersc.gov/user/asnaylor/perlmutter-shared-node-cpu/proxy/localhost:8265/#/new/overview'

In [7]:
# URL of the `rayDefaultDashboard` page served on port 3000, as reached through
# the JupyterHub proxy (presumably the Grafana metrics dashboard — confirm).
# NOTE(review): if JUPYTERHUB_SERVICE_PREFIX is unset, os.getenv returns None and
# the literal text "None" ends up in the URL.
f'https://jupyter.nersc.gov{os.getenv("JUPYTERHUB_SERVICE_PREFIX")}proxy/3000/d/rayDefaultDashboard'

'https://jupyter.nersc.gov/user/asnaylor/perlmutter-shared-node-cpu/proxy/3000/d/rayDefaultDashboard'

# MNIST Code

In [8]:
import numpy as np
import json
from ray.air.result import Result
import tensorflow as tf

from ray.train.tensorflow import TensorflowTrainer
from ray.air.integrations.keras import Callback as TrainCheckpointReportCallback
from ray.air.config import ScalingConfig

In [9]:
def mnist_dataset(batch_size: int) -> tf.data.Dataset:
    """Build the MNIST training input pipeline.

    Loads the training split with ``tf.keras.datasets.mnist.load_data()``
    (the test split is discarded), rescales the images by 1/255 and casts
    the labels to ``int64``.

    Args:
        batch_size: Number of examples per emitted batch.

    Returns:
        A ``tf.data.Dataset`` of ``(images, labels)`` batches that is shuffled
        with a 60000-element buffer and repeated without a count, so callers
        must bound iteration themselves (e.g. with ``steps_per_epoch``).
    """
    (images, labels), _ = tf.keras.datasets.mnist.load_data()

    # Pixels arrive as uint8 in [0, 255]; dividing by a float32 scalar maps
    # them to floating point values in [0, 1].
    images = images / np.float32(255)
    labels = labels.astype(np.int64)

    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    dataset = dataset.shuffle(60000)
    dataset = dataset.repeat()
    return dataset.batch(batch_size)

In [10]:
def build_cnn_model() -> tf.keras.Model:
    """Create the (uncompiled) CNN used for MNIST classification.

    The network takes 28x28 inputs, reshapes them to 28x28x1, applies one
    32-filter 3x3 ReLU convolution, flattens, and finishes with a 128-unit
    ReLU dense layer followed by a 10-unit dense layer with no activation.

    Returns:
        A ``tf.keras.Sequential`` model; compiling it is left to the caller.
    """
    layers = tf.keras.layers
    return tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            layers.Reshape(target_shape=(28, 28, 1)),
            layers.Conv2D(32, 3, activation="relu"),
            layers.Flatten(),
            layers.Dense(128, activation="relu"),
            layers.Dense(10),
        ]
    )


In [11]:
#import atexit

def train_func(config: dict):
    """Training loop executed on each worker.

    Args:
        config: Hyperparameters. Recognised keys and their defaults:
            ``"batch_size"`` (64, per worker), ``"epochs"`` (3),
            ``"steps_per_epoch"`` (70) and ``"lr"`` (0.001).

    Returns:
        The ``history`` dict of the Keras ``History`` object returned by ``fit``.

    Raises:
        KeyError: If the ``TF_CONFIG`` environment variable is not set
            (it is expected to be provided by the trainer that launches
            the workers — confirm against the Ray Train backend).
    """
    batch_per_worker = config.get("batch_size", 64)
    n_epochs = config.get("epochs", 3)
    n_steps = config.get("steps_per_epoch", 70)

    # The worker count is taken from the cluster spec in TF_CONFIG.
    cluster_spec = json.loads(os.environ["TF_CONFIG"])["cluster"]
    worker_count = len(cluster_spec["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    # Disabled workaround kept for reference:
    #atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore

    # The dataset is batched at the global size (per-worker size x workers).
    dataset = mnist_dataset(batch_per_worker * worker_count)

    with strategy.scope():
        # The model has to be created and compiled inside the strategy scope.
        model = build_cnn_model()
        lr = config.get("lr", 0.001)
        model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer=tf.keras.optimizers.SGD(learning_rate=lr),
            metrics=["accuracy"],
        )

    # The Ray AIR Keras callback is attached so each epoch is reported to Ray.
    history = model.fit(
        dataset,
        epochs=n_epochs,
        steps_per_epoch=n_steps,
        callbacks=[TrainCheckpointReportCallback()],
    )
    return history.history

# Run MNIST Code

In [17]:
# Size the job from the GPUs Ray reports. The 'GPU' entry is read with a
# default rather than indexed, so a cluster that reports no GPUs falls back to
# a single CPU worker instead of failing with KeyError.
num_gpus = int(ray.cluster_resources().get('GPU', 0))
args = {
    'num_workers': max(1, num_gpus),
    'use_gpu': num_gpus > 0,
    'epochs': 20
}
# Hyperparameters handed to `train_func` via `train_loop_config`.
config = {"lr": 1e-3, "batch_size": 64, "epochs": args['epochs']}

In [18]:
# Configure the distributed trainer: one `train_func` run per worker, scaled
# according to `args`, with `config` passed through as the loop's hyperparameters.
trainer = TensorflowTrainer(
    scaling_config=ScalingConfig(
        use_gpu=args['use_gpu'],
        num_workers=args['num_workers'],
    ),
    train_loop_config=config,
    train_loop_per_worker=train_func,
)
# Run training and keep the returned result for inspection below.
results = trainer.fit()

0,1
Current time:,2023-04-22 21:53:01
Running for:,00:00:19.35
Memory:,96.3/503.1 GiB

Trial name,status,loc,iter,total time (s),loss,accuracy,_timestamp
TensorflowTrainer_ae678_00000,TERMINATED,128.55.64.15:165042,20,13.1295,0.553993,0.86808,1682225578


[2m[36m(RayTrainWorker pid=166532)[0m 2023-04-22 21:52:49.022524: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
[2m[36m(RayTrainWorker pid=166532)[0m To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
[2m[36m(RayTrainWorker pid=166532)[0m 2023-04-22 21:52:49.578053: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 38219 MB memory:  -> device: 0, name: NVIDIA A100-PCIE-40GB, pci bus id: 0000:c3:00.0, compute capability: 8.0
[2m[36m(RayTrainWorker pid=166532)[0m 2023-04-22 21:52:49.598188: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:worker/replica:0/task:0/device:GPU:0 with 38219 MB memory:  -> device: 0, name: NVIDIA A100-PCIE-40GB, pci bus id: 0000:c3:00.0,

[2m[36m(RayTrainWorker pid=166532)[0m Epoch 1/20


[2m[36m(RayTrainWorker pid=166532)[0m 2023-04-22 21:52:53.733165: I tensorflow/stream_executor/cuda/cuda_dnn.cc:384] Loaded cuDNN version 8302
[2m[36m(RayTrainWorker pid=166532)[0m 2023-04-22 21:52:54.461153: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.




Trial name,_time_this_iter_s,_timestamp,_training_iteration,accuracy,date,done,episodes_total,experiment_id,experiment_tag,hostname,iterations_since_restore,loss,node_ip,pid,should_checkpoint,time_since_restore,time_this_iter_s,time_total_s,timestamp,timesteps_since_restore,timesteps_total,training_iteration,trial_id,warmup_time
TensorflowTrainer_ae678_00000,0.212433,1682225578,20,0.86808,2023-04-22_21-52-59,True,,0a3ecf71f4694fd294f6e0dbaaefdc8b,0,login06,20,0.553993,128.55.64.15,165042,True,13.1295,0.212926,13.1295,1682225579,0,,20,ae678_00000,0.00950599


[2m[36m(RayTrainWorker pid=166532)[0m Epoch 2/20
 1/70 [..............................] - ETA: 0s - loss: 2.2556 - accuracy: 0.2656
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 3/20
 1/70 [..............................] - ETA: 0s - loss: 2.1918 - accuracy: 0.4844
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 4/20
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 5/20
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 6/20
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 7/20
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 8/20
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 9/20
 1/70 [..............................] - ETA: 0s - loss: 1.4502 - accuracy: 0.8281
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 10/20
 1/70 [..............................] - ETA: 0s - loss: 1.1883 - accuracy: 0.8906
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 11/20
 1/70 [..............................] - ETA: 0s - loss: 0.9863 - accuracy: 0.8750
[2m[36m(RayTrainWorker pid=166532)[0m Epoch 12/20
[2m[36m(

[2m[36m(RayTrainWorker pid=166532)[0m Exception ignored in: <function Pool.__del__ at 0x7f5bec4b5e50>
[2m[36m(RayTrainWorker pid=166532)[0m Traceback (most recent call last):
[2m[36m(RayTrainWorker pid=166532)[0m   File "/global/common/software/nersc/pm-2022q4/sw/tensorflow/2.9.0/lib/python3.9/multiprocessing/pool.py", line 268, in __del__
[2m[36m(RayTrainWorker pid=166532)[0m     self._change_notifier.put(None)
[2m[36m(RayTrainWorker pid=166532)[0m   File "/global/common/software/nersc/pm-2022q4/sw/tensorflow/2.9.0/lib/python3.9/multiprocessing/queues.py", line 377, in put
[2m[36m(RayTrainWorker pid=166532)[0m     self._writer.send_bytes(obj)
[2m[36m(RayTrainWorker pid=166532)[0m   File "/global/common/software/nersc/pm-2022q4/sw/tensorflow/2.9.0/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes
[2m[36m(RayTrainWorker pid=166532)[0m     self._send_bytes(m[offset:offset + size])
[2m[36m(RayTrainWorker pid=166532)[0m   File "/global/common/

In [19]:
# Display the result object returned by `trainer.fit()` (metrics, error, log dir).
results

Result(metrics={'loss': 0.5539929866790771, 'accuracy': 0.8680803775787354, '_timestamp': 1682225578, '_time_this_iter_s': 0.212432861328125, '_training_iteration': 20, 'should_checkpoint': True, 'done': True, 'trial_id': 'ae678_00000', 'experiment_tag': '0'}, error=None, log_dir=PosixPath('/global/homes/a/asnaylor/ray_results/TensorflowTrainer_2023-04-22_21-52-42/TensorflowTrainer_ae678_00000_0_2023-04-22_21-52-42'))

### Shutdown

In [20]:
# Close this notebook's Ray connection before the cluster itself is torn down.
ray.shutdown()

In [21]:
# Stop the cluster started by `start_ray_cluster()` at the top of the notebook
# (presumably terminates the Ray head-node process — confirm in `ray_utils`).
ray_cluster.kill()