In [1]:
import numpy as np
import tensorflow as tf

def mnist_dataset(batch_size):
    """Build a shuffled, endlessly repeating, batched MNIST training dataset.

    Args:
        batch_size: Number of examples grouped into each emitted batch.

    Returns:
        A `tf.data.Dataset` of `(images, labels)` batches built from the
        MNIST training split (the test split is discarded).
    """
    (images, labels), _ = tf.keras.datasets.mnist.load_data()

    # The image arrays are uint8 with values in [0, 255]; rescale them to
    # float32 values in the [0, 1] range.
    images = images / np.float32(255)
    labels = labels.astype(np.int64)

    # Build the input pipeline one stage at a time.
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    dataset = dataset.shuffle(60000)
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    return dataset


def build_and_compile_cnn_model():
    """Create the small convolutional classifier and compile it.

    The network takes (28, 28) inputs, reshapes them to (28, 28, 1), and
    ends in a 10-unit dense layer that emits logits (no activation).

    Returns:
        A compiled `tf.keras.Sequential` model using sparse categorical
        cross-entropy on logits, SGD with learning rate 0.001, and the
        'accuracy' metric.
    """
    layers = tf.keras.layers
    layer_stack = [
        layers.InputLayer(input_shape=(28, 28)),
        layers.Reshape(target_shape=(28, 28, 1)),
        layers.Conv2D(32, 3, activation='relu'),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dense(10),
    ]
    cnn = tf.keras.Sequential(layer_stack)

    # The final Dense layer has no activation, so the loss consumes logits.
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    sgd = tf.keras.optimizers.SGD(learning_rate=0.001)
    cnn.compile(loss=loss_fn, optimizer=sgd, metrics=['accuracy'])
    return cnn

2023-08-11 13:02:43.928210: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-08-11 13:02:43.977882: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
import json
import os

def train_func_distributed():
    """Per-worker training loop: fit the MNIST CNN under a multi-worker strategy.

    Reads the worker list from the `TF_CONFIG` environment variable, scales
    the batch size by the number of workers, and trains for 3 epochs of
    70 steps each.

    Raises:
        KeyError: If `TF_CONFIG` is not present in the environment or lacks
            the `cluster` / `worker` entries.
    """
    # This environment variable will be set by Ray Train.
    cluster_spec = json.loads(os.environ['TF_CONFIG'])['cluster']
    worker_count = len(cluster_spec['worker'])

    # The strategy is created before any dataset or model is built.
    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    # Each worker handles 64 examples per step, so the dataset is batched
    # with the global size (64 * number of workers).
    batch_per_worker = 64
    train_data = mnist_dataset(batch_per_worker * worker_count)

    # Model building/compiling need to be within `strategy.scope()`.
    with strategy.scope():
        model = build_and_compile_cnn_model()

    model.fit(train_data, epochs=3, steps_per_epoch=70)

In [6]:
from ray.train.tensorflow import TensorflowTrainer
from ray.air.config import ScalingConfig

# For GPU Training, set `use_gpu` to True.
use_gpu = False

# Launch `train_func_distributed` on 4 Ray Train workers.
trainer = TensorflowTrainer(
    train_loop_per_worker=train_func_distributed,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu),
)

# Run the job and show the metrics attached to the returned result.
result = trainer.fit()
print(result.metrics)

0,1
Current time:,2023-08-11 13:04:27
Running for:,00:00:24.72
Memory:,11.1/31.1 GiB

Trial name,status,loc
TensorflowTrainer_12319_00000,TERMINATED,172.31.108.40:38188


2023-08-11 13:04:02,320	INFO data_parallel_trainer.py:404 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6: no version information available (required by bash)
[2m[36m(TrainTrainable pid=38188)[0m GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[36m(TensorflowTrainer pid=38188)[0m GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6: no version information available (required by bash)
[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6:

[2m[36m(RayTrainWorker pid=38237)[0m Epoch 1/3
 2/70 [..............................] - ETA: 4s - loss: 9.1739 - accuracy: 0.3828 
 3/70 [>.............................] - ETA: 3s - loss: 9.1661 - accuracy: 0.4167
 4/70 [>.............................] - ETA: 3s - loss: 9.1645 - accuracy: 0.4336
 5/70 [=>............................] - ETA: 3s - loss: 9.1632 - accuracy: 0.4500
 7/70 [==>...........................] - ETA: 3s - loss: 9.1649 - accuracy: 0.4643
 9/70 [==>...........................] - ETA: 3s - loss: 9.1591 - accuracy: 0.4688
11/70 [===>..........................] - ETA: 3s - loss: 9.1558 - accuracy: 0.4773
13/70 [====>.........................] - ETA: 3s - loss: 9.1509 - accuracy: 0.4988
15/70 [=====>........................] - ETA: 3s - loss: 9.1523 - accuracy: 0.5031
[2m[36m(RayTrainWorker pid=38235)[0m Epoch 2/3[32m [repeated 4x across cluster][0m
 6/70 [=>............................] - ETA: 3s - loss: 8.8721 - accuracy: 1.5911
 7/70 [==>.....................

2023-08-11 13:04:27,038	INFO tune.py:1148 -- Total run time: 24.74 seconds (24.72 seconds for the tuning loop).
- /home/drew/ray_results/TensorflowTrainer_2023-08-11_13-04-02/TensorflowTrainer_12319_00000_0_2023-08-11_13-04-02


{'trial_id': '12319_00000', 'date': '2023-08-11_13-04-05', 'timestamp': 1691773445, 'pid': 38188, 'hostname': 'drew-lenovo', 'node_ip': '172.31.108.40', 'config': {}, 'done': True}


In [1]:
from ray.air import session, Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

import numpy as np

def train_func(config):
    """Per-worker loop: fit a 1-layer regressor on random data, checkpointing each epoch.

    Args:
        config: Mapping that must contain the integer key "num_epochs",
            the number of fit/report rounds to run.
    """
    import tensorflow as tf

    # Toy dataset drawn from NumPy's global random state:
    #   features : shape (sample_count, 4), standard normal
    #   targets  : shape (sample_count, 1), uniform on [0, 1)
    sample_count = 100
    features = np.random.normal(0, 1, size=(sample_count, 4))
    targets = np.random.uniform(0, 1, size=(sample_count, 1))

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        # Toy neural network: a single linear Dense unit over 4 inputs.
        linear_layer = tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))
        model = tf.keras.Sequential([linear_layer])
        model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])

    for epoch in range(config["num_epochs"]):
        # Each `fit` call runs Keras' default single epoch.
        model.fit(features, targets, batch_size=20)

        # Snapshot the epoch index and current weights, then report them
        # together with an empty metrics dict.
        snapshot = {"epoch": epoch, "model_weights": model.get_weights()}
        checkpoint = Checkpoint.from_dict(snapshot)
        session.report({}, checkpoint=checkpoint)

# Run the toy training loop on 2 workers for 5 epochs.
trainer = TensorflowTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={"num_epochs": 5},
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()

# Show the contents of the checkpoint attached to the result.
print(result.checkpoint.to_dict())

0,1
Current time:,2023-08-11 13:20:25
Running for:,00:00:10.64
Memory:,8.2/31.1 GiB

Trial name,status,loc,iter,total time (s)
TensorflowTrainer_55de2_00000,TERMINATED,172.31.108.40:43760,5,5.3894


2023-08-11 13:20:14,851	INFO data_parallel_trainer.py:404 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6: no version information available (required by bash)
[2m[36m(TrainTrainable pid=43760)[0m GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[36m(TensorflowTrainer pid=43760)[0m GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6: no version information available (required by bash)
[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6:

1/5 [=====>........................] - ETA: 0s - loss: 5.5933 - mse: 5.5933


2023-08-11 13:20:25,485	INFO tune.py:1148 -- Total run time: 10.66 seconds (10.63 seconds for the tuning loop).


{'epoch': 4, 'model_weights': [array([[0.5412861 ],
       [0.77299863],
       [0.52936023],
       [0.7033395 ]], dtype=float32), array([0.0204476], dtype=float32)], '_metadata': _CheckpointMetadata(checkpoint_type=<class 'ray.train.tensorflow.tensorflow_checkpoint.TensorflowCheckpoint'>, checkpoint_state={'_current_checkpoint_id': 4, '_flavor': None, '_h5_file_path': None}), '_preprocessor': None, '_ray_additional_checkpoint_files': {'.tune_metadata': b'././@PaxHeader\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x000000000\x000000000\x000000000\x0000000000034\x0000000000000\x00010212\x00 x\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x

In [1]:
import ray
import tensorflow as tf

from ray.air import session
from ray.air.integrations.keras import ReportCheckpointCallback
from ray.train.tensorflow import TensorflowTrainer
from ray.air.config import ScalingConfig


# If using GPUs, set this to True.
use_gpu = False

# NOTE(review): `a`, `b` and `size` are not referenced by the cells visible
# here; confirm whether anything else still needs them.
a, b, size = 5, 10, 100


def build_model() -> tf.keras.Model:
    """Assemble the (uncompiled) regression network for scalar inputs.

    Returns:
        A `tf.keras.Sequential` model: scalar input -> Flatten -> Dense(10)
        -> Dense(1). Compilation is left to the caller.
    """
    layers = tf.keras.layers
    layer_stack = [
        layers.InputLayer(input_shape=()),
        # Add feature dimension, expanding (batch_size,) to (batch_size, 1).
        layers.Flatten(),
        layers.Dense(10),
        layers.Dense(1),
    ]
    return tf.keras.Sequential(layer_stack)


def train_func(config: dict):
    """Per-worker loop: train the regression model on the "train" dataset shard.

    Args:
        config: Optional hyperparameters. Recognized keys are "batch_size"
            (default 64), "epochs" (default 3) and "lr" (default 1e-3).

    Returns:
        A list with one Keras `history.history` dict per epoch.
    """
    batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        model = build_model()
        sgd = tf.keras.optimizers.SGD(learning_rate=config.get("lr", 1e-3))
        model.compile(
            optimizer=sgd,
            loss=tf.keras.losses.mean_squared_error,
            metrics=[tf.keras.metrics.mean_squared_error],
        )

    shard = session.get_dataset_shard("train")

    def _fit_one_epoch():
        # A fresh tf.data view of the shard is created for every epoch, and
        # a new ReportCheckpointCallback instance is passed to each fit call.
        epoch_data = shard.to_tf(
            feature_columns="x", label_columns="y", batch_size=batch_size
        )
        fit_result = model.fit(epoch_data, callbacks=[ReportCheckpointCallback()])
        return fit_result.history

    return [_fit_one_epoch() for _ in range(epochs)]


# Hyperparameters handed to `train_func` on every worker.
config = {
    "lr": 1e-3,
    "batch_size": 32,
    "epochs": 4,
}

# 200 rows following y = 2 * x, with x in [0, 1).
train_dataset = ray.data.from_items(
    [{"x": i / 200, "y": 2 * i / 200} for i in range(200)]
)

scaling_config = ScalingConfig(num_workers=2, use_gpu=use_gpu)
trainer = TensorflowTrainer(
    train_func,
    train_loop_config=config,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)

# Run the job and show the metrics attached to the returned result.
result = trainer.fit()
print(result.metrics)

0,1
Current time:,2023-08-14 14:07:15
Running for:,00:00:12.45
Memory:,8.5/31.1 GiB

Trial name,status,loc,iter,total time (s),loss,mean_squared_error
TensorflowTrainer_5ee55_00000,TERMINATED,172.31.108.40:17290,4,7.13188,1.3967,1.3967


2023-08-14 14:07:03,003	INFO data_parallel_trainer.py:404 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6: no version information available (required by bash)
[2m[36m(TrainTrainable pid=17290)[0m GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[36m(TensorflowTrainer pid=17290)[0m GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6: no version information available (required by bash)
[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6:

[2m[36m(SplitCoordinator pid=17454)[0m [dataset]: Run `pip install tqdm` to enable progress reporting.
      1/Unknown - 1s 1s/step - loss: 0.0949 - mean_squared_error: 0.0949
      6/Unknown - 1s 12ms/step - loss: 3.4796 - mean_squared_error: 3.4796


[2m[33m(raylet)[0m bash: /home/drew/miniconda3/envs/imgt/lib/libtinfo.so.6: no version information available (required by bash)




[2m[36m(SplitCoordinator pid=17454)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(2, equal=True)]
[2m[36m(SplitCoordinator pid=17454)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['c71245cc0888d6d76191a491cf572db0842a320d46f154a5269d46b0', 'c71245cc0888d6d76191a491cf572db0842a320d46f154a5269d46b0'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=17454)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


      4/Unknown - 0s 19ms/step - loss: 1.3588 - mean_squared_error: 1.3588 


[2m[36m(SplitCoordinator pid=17454)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(2, equal=True)]
[2m[36m(SplitCoordinator pid=17454)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['c71245cc0888d6d76191a491cf572db0842a320d46f154a5269d46b0', 'c71245cc0888d6d76191a491cf572db0842a320d46f154a5269d46b0'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=17454)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


      4/Unknown - 0s 18ms/step - loss: 1.1859 - mean_squared_error: 1.1859 


[2m[36m(SplitCoordinator pid=17454)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(2, equal=True)]
[2m[36m(SplitCoordinator pid=17454)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['c71245cc0888d6d76191a491cf572db0842a320d46f154a5269d46b0', 'c71245cc0888d6d76191a491cf572db0842a320d46f154a5269d46b0'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=17454)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-08-14 14:07:15,453	INFO tune.py:1148 -- Total run time: 12.56 seconds (12.45 seconds for the tuning loop).


{'loss': 1.396697998046875, 'mean_squared_error': 1.396697998046875, 'timestamp': 1692036433, 'time_this_iter_s': 0.6089682579040527, 'should_checkpoint': True, 'done': True, 'training_iteration': 4, 'trial_id': '5ee55_00000', 'date': '2023-08-14_14-07-13', 'time_total_s': 7.131880760192871, 'pid': 17290, 'hostname': 'drew-lenovo', 'node_ip': '172.31.108.40', 'config': {'train_loop_config': {'lr': 0.001, 'batch_size': 32, 'epochs': 4}}, 'time_since_restore': 7.131880760192871, 'iterations_since_restore': 4, 'experiment_tag': '0'}
