<img src="https://cdn.comet.ml/img/notebook_logo.png">

[Comet](https://www.comet.com/site/products/ml-experiment-tracking/?utm_campaign=ray_train&utm_medium=colab) is an MLOps platform designed to help data scientists and teams build better models faster! Comet provides tooling to track, explain, manage, and monitor your models in a single place. It works with Jupyter notebooks and scripts, and, most importantly, it's 100% free to get started!

[Ray Train](https://docs.ray.io/en/latest/train/train.html) abstracts away the complexity of setting up a distributed training system.

Instrument your runs with Comet to start managing experiments, creating dataset versions, and tracking hyperparameters for faster and easier reproducibility and collaboration.

[Find more information about our integration with Ray Train](https://www.comet.com/docs/v2/integrations/ml-frameworks/ray/)

Get a preview of what's to come: check out a completed experiment created from this notebook [here](https://www.comet.com/examples/comet-example-ray-train-keras/99d169308c854be7ac222c995a2bfa26?experiment-tab=systemMetrics).

This example is based on the [following Ray Train TensorFlow example](https://docs.ray.io/en/latest/train/examples/tf/tensorflow_mnist_example.html).

# Install Dependencies

In [None]:
%pip install -U "comet_ml>=3.44.0" "ray[air]>=2.1.0" "keras<3" "tensorflow<2.16.0"

# Login to Comet

In [None]:
import comet_ml

# Log in to Comet and select the project that experiments created by this
# notebook are logged to. NOTE(review): comet_ml.login presumably prompts for
# an API key when none is configured — confirm against the comet_ml docs.
comet_ml.login(project_name="comet-example-ray-train-keras")

# Import Dependencies

In [None]:
import argparse
import json
import os

import comet_ml.integration.ray

import numpy as np
import ray
from ray.air.config import RunConfig, ScalingConfig
from ray.air.result import Result
from ray.train.tensorflow import TensorflowTrainer

import tensorflow as tf

# Prepare your dataset

In [None]:
def mnist_dataset(batch_size: int) -> tf.data.Dataset:
    """Build a shuffled, infinitely repeating, batched MNIST training pipeline.

    Args:
        batch_size: Number of examples in each batch the dataset yields.

    Returns:
        A ``tf.data.Dataset`` of ``(images, labels)`` batches. Images are
        float32 values in [0, 1]; labels are int64.
    """
    # Only the training split is used; the test split is discarded.
    (images, labels), _ = tf.keras.datasets.mnist.load_data()

    # Raw pixels are uint8 in [0, 255]; dividing by a float32 scalar rescales
    # them to float32 values in [0, 1].
    images = images / np.float32(255)
    labels = labels.astype(np.int64)

    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    dataset = dataset.shuffle(60000)
    # repeat() makes the dataset endless, so the consumer must bound each
    # epoch itself (train_func passes steps_per_epoch to model.fit).
    dataset = dataset.repeat()
    return dataset.batch(batch_size)

# Define your model

In [None]:
def build_cnn_model() -> tf.keras.Model:
    """Create a small, uncompiled convolutional classifier for 28x28 inputs.

    The last ``Dense(10)`` layer has no activation, so the model outputs raw
    logits. The loss must therefore be configured with ``from_logits=True``.
    """
    layers = tf.keras.layers
    stack = [
        tf.keras.Input(shape=(28, 28)),
        # Add a trailing channel axis so Conv2D receives (28, 28, 1) inputs.
        layers.Reshape(target_shape=(28, 28, 1)),
        layers.Conv2D(32, 3, activation="relu"),
        layers.Flatten(),
        layers.Dense(128, activation="relu"),
        # One logit per class; softmax is left to the loss function.
        layers.Dense(10),
    ]
    return tf.keras.Sequential(stack)

# Define your distributed training function

This function is distributed to, and executed on, each training worker.

In [None]:
def train_func(config: dict):
    """Per-worker training loop run by Ray Train on every worker.

    Args:
        config: Training hyperparameters. Keys read here (with defaults):
            ``batch_size`` (64, per worker), ``epochs`` (3),
            ``steps_per_epoch`` (70) and ``lr`` (0.001). The whole dict is
            also handed to ``comet_worker_logger``.

    Returns:
        The Keras ``History.history`` dict of per-epoch metric values.
    """
    # Imported locally because this function body executes on each worker.
    from comet_ml.integration.ray import comet_worker_logger
    from ray.air import session

    batch_per_worker = config.get("batch_size", 64)
    n_epochs = config.get("epochs", 3)
    n_steps = config.get("steps_per_epoch", 70)

    with comet_worker_logger(config) as experiment:
        # TF_CONFIG is assumed to be set in the worker environment (by Ray's
        # TensorflowTrainer); its worker list gives the cluster size.
        cluster_spec = json.loads(os.environ["TF_CONFIG"])["cluster"]
        worker_count = len(cluster_spec["worker"])

        strategy = tf.distribute.MultiWorkerMirroredStrategy()

        # The dataset is batched with the global batch size, i.e. the
        # per-worker batch size multiplied by the number of workers.
        dataset = mnist_dataset(batch_per_worker * worker_count)

        with strategy.scope():
            # Model building/compiling need to be within `strategy.scope()`.
            model = build_cnn_model()
            model.compile(
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.SGD(
                    learning_rate=config.get("lr", 0.001)
                ),
                metrics=["accuracy"],
            )

        # Only the rank-0 worker attaches the Comet Keras callback, so metrics
        # are reported to Comet once rather than once per worker.
        is_rank_zero = session.get_world_rank() == 0
        callbacks = [experiment.get_callback("tf-keras")] if is_rank_zero else []

        history = model.fit(
            dataset,
            epochs=n_epochs,
            steps_per_epoch=n_steps,
            callbacks=callbacks,
        )

    return history.history

# Define the function that schedules the distributed job

In [None]:
def train_tensorflow_mnist(
    num_workers: int = 2, use_gpu: bool = False, epochs: int = 4
) -> Result:
    """Launch distributed MNIST training with Ray Train, logging to Comet.

    Args:
        num_workers: Number of Ray Train workers to start.
        use_gpu: Whether each worker should be given a GPU.
        epochs: Number of training epochs, forwarded to ``train_func``.

    Returns:
        The ``Result`` produced by ``TensorflowTrainer.fit()``.
    """
    train_config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs}

    # The Comet callback is registered on the RunConfig and receives the same
    # config dict that is sent to every worker.
    comet_callback = comet_ml.integration.ray.CometTrainLoggerCallback(
        train_config
    )

    return TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=train_config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        run_config=RunConfig(callbacks=[comet_callback]),
    ).fit()

# Train the model

Ray will wait indefinitely if we request more workers (`num_workers`) than the available resources can support. The code below ensures we never request more CPUs than are available locally.

In [None]:
# Request up to `ideal_num_workers` workers, keeping one local CPU in reserve
# (presumably for the driver/notebook process), but always at least one worker.
ideal_num_workers = 2

# os.cpu_count() returns None when the CPU count cannot be determined; treat
# that as a single CPU instead of failing with `None - 1` (TypeError).
available_local_cpu_count = (os.cpu_count() or 1) - 1
num_workers = max(1, min(ideal_num_workers, available_local_cpu_count))

train_tensorflow_mnist(num_workers, use_gpu=False, epochs=10)