In [None]:
!pip install ray==2.6.1 tensorflow==2.12.1 pyarrow tblib

In [6]:
import argparse
from filelock import FileLock
import json
import os

import numpy as np
from ray.air.result import Result
import tensorflow as tf

from ray.train.tensorflow import TensorflowTrainer
from ray.air.integrations.keras import ReportCheckpointCallback
from ray.air.config import ScalingConfig


def mnist_dataset(batch_size: int) -> tf.data.Dataset:
    with FileLock(os.path.expanduser("~/.mnist_lock")):
        (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(batch_size)
    )
    return train_dataset


def build_cnn_model() -> tf.keras.Model:
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    return model


def train_func(config: dict):
    per_worker_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    tf_config = json.loads(os.environ["TF_CONFIG"])
    num_workers = len(tf_config["cluster"]["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_cnn_model()
        learning_rate = config.get("lr", 0.001)
        multi_worker_model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
            metrics=["accuracy"],
        )

    history = multi_worker_model.fit(
        multi_worker_dataset,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        callbacks=[ReportCheckpointCallback()],
    )
    results = history.history
    return results


def train_tensorflow_mnist(
    num_workers: int = 2, use_gpu: bool = False, epochs: int = 4
) -> Result:
    config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs}
    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    )
    results = trainer.fit()
    return results

2023-08-08 11:00:11.689710: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-08-08 11:00:11.754890: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-08-08 11:00:11.757280: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [1]:
import ray

In [None]:
runtime_env = {
    'env_vars': {'RAY_AIR_NEW_OUTPUT': '0'}
}

In [None]:
ray.init(address="ray://example-cluster-head-svc:10001", runtime_env=runtime_env)

In [4]:
@ray.remote(num_gpus=2, runtime_env=runtime_env)
def f():
    print(ray.get_gpu_ids())

In [5]:
f.remote()

ClientObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

[2m[36m(f pid=254)[0m [0, 1]


In [None]:
train_tensorflow_mnist(num_workers=2, use_gpu=True, epochs=3)

[2m[36m(TunerInternal pid=292)[0m [output] This will use the new output engine with verbosity 1. To disable the new output and use the legacy output engine, set the environment variable RAY_AIR_NEW_OUTPUT=0. For more information, please see https://github.com/ray-project/ray/issues/36949


[2m[36m(TunerInternal pid=292)[0m 
[2m[36m(TunerInternal pid=292)[0m View detailed results here: /home/ray/ray_results/TensorflowTrainer_2023-08-08_04-00-31
[2m[36m(TunerInternal pid=292)[0m To visualize your results with TensorBoard, run: `tensorboard --logdir /home/ray/ray_results/TensorflowTrainer_2023-08-08_04-00-31`
[2m[36m(TunerInternal pid=292)[0m 


[2m[36m(TunerInternal pid=292)[0m AIR_VERBOSITY is set, ignoring passed-in ProgressReporter for now.
[2m[36m(pid=405)[0m 2023-08-08 04:00:41.034899: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
[2m[36m(pid=405)[0m To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
[2m[36m(pid=405)[0m 2023-08-08 04:00:41.273134: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[2m[36m(pid=405)[0m 2023-08-08 04:00:42.614142: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.

[2m[36m(TunerInternal pid=292)[0m Training started with configuration:
[2m[36m(TunerInternal pid=292)[0m ╭──────────────────────────────────────╮
[2m[36m(TunerInternal pid=292)[0m │ Training config                      │
[2m[36m(TunerInternal pid=292)[0m ├──────────────────────────────────────┤
[2m[36m(TunerInternal pid=292)[0m │ train_loop_config/batch_size      64 │
[2m[36m(TunerInternal pid=292)[0m │ train_loop_config/epochs           3 │
[2m[36m(TunerInternal pid=292)[0m │ train_loop_config/lr           0.001 │
[2m[36m(TunerInternal pid=292)[0m ╰──────────────────────────────────────╯
[2m[36m(TunerInternal pid=292)[0m 


[2m[36m(TensorflowTrainer pid=405)[0m Starting distributed worker processes: ['500 (10.42.1.29)', '539 (10.42.1.29)']
[2m[36m(RayTrainWorker pid=500)[0m 2023-08-08 04:00:51.329339: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
[2m[36m(RayTrainWorker pid=500)[0m To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
[2m[36m(RayTrainWorker pid=539)[0m 2023-08-08 04:00:51.330414: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
[2m[36m(RayTrainWorker pid=539)[0m To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
[2

[2m[36m(RayTrainWorker pid=539)[0m Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
   24576/11490434 [..............................] - ETA: 25s
   49152/11490434 [..............................] - ETA: 27s
  147456/11490434 [..............................] - ETA: 21s
  278528/11490434 [..............................] - ETA: 15s
  557056/11490434 [>.............................] - ETA: 9s 
 1064960/11490434 [=>............................] - ETA: 5s
 2138112/11490434 [====>.........................] - ETA: 3s


[2m[36m(RayTrainWorker pid=500)[0m 2023-08-08 04:01:03.930382: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
[2m[36m(RayTrainWorker pid=500)[0m op: "TensorSliceDataset"
[2m[36m(RayTrainWorker pid=500)[0m input: "Placeholder/_0"
[2m[36m(RayTrainWorker pid=500)[0m input: "Placeholder/_1"
[2m[36m(RayTrainWorker pid=500)[0m attr {
[2m[36m(RayTrainWorker pid=500)[0m   key: "Toutput_types"
[2m[36m(RayTrainWorker pid=500)[0m   value {
[2m[36m(RayTrainWorker pid=500)[0m     list {
[2m[36m(RayTrainWorker pid=500)[0m       type: DT_FLOAT
[2m[36m(RayTrainWorker pid=500)[0m       type: DT_INT64
[2m[36m(RayTrainWorker pid=500)[0m     }
[2m[36m(RayTrainWorker pid=500)[0m   }
[2m[36m(RayTrainWorker pid=500)[0m }
[2m[36m(RayTrainWorker pid=500)[0m attr

[2m[36m(RayTrainWorker pid=539)[0m Epoch 1/3
[2m[36m(RayTrainWorker pid=500)[0m Epoch 1/3
 1/70 [..............................] - ETA: 5:33 - loss: 4.5878 - accuracy: 0.3125
 1/70 [..............................] - ETA: 5:38 - loss: 4.5878 - accuracy: 0.3125
 2/70 [..............................] - ETA: 20s - loss: 4.5900 - accuracy: 0.2656 
 2/70 [..............................] - ETA: 15s - loss: 4.5900 - accuracy: 0.2656 
 3/70 [>.............................] - ETA: 19s - loss: 4.5907 - accuracy: 0.2656
 3/70 [>.............................] - ETA: 17s - loss: 4.5907 - accuracy: 0.2656
 4/70 [>.............................] - ETA: 19s - loss: 4.5930 - accuracy: 0.2656
 4/70 [>.............................] - ETA: 17s - loss: 4.5930 - accuracy: 0.2656
 5/70 [=>............................] - ETA: 19s - loss: 4.5958 - accuracy: 0.2656
 5/70 [=>............................] - ETA: 18s - loss: 4.5958 - accuracy: 0.2656
 6/70 [=>............................] - ETA: 18s - loss: 4.