In this first notebook, we will go through the basics of using the SDK to:
 - Spin up a Ray cluster with our desired resources
 - View the status and specs of our Ray cluster
 - Take down the Ray cluster when finished

In [7]:
# Import pieces from codeflare-sdk
from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration
from codeflare_sdk.job.jobs import DDPJobDefinition
# Ignoring auth for now

Here, we define our cluster by specifying the resources we require for our batch workload. Creating the cluster object below also generates a corresponding AppWrapper.

In [None]:
# Create and configure our cluster object (and appwrapper)
cluster = Cluster(ClusterConfiguration(
    name='raytest',
    namespace='default',
    min_worker=1,
    max_worker=1,
    min_cpus=2,
    max_cpus=2,
    min_memory=8,
    max_memory=8,
    image="quay.io/project-codeflare/ray:2.5.0-py38-cu116", #optional, default is ray 2.1.0
    gpu=0,
    instascale=False,
    local_interactive=False #set to True if using cluster interactively from local machine, requires auth
))
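Before submitting, it can help to sanity-check the request. The following is a minimal sketch, not part of the SDK: `WorkerRequest`, `validate`, and `peak_totals` are hypothetical names that only mirror the worker fields used above, and memory is assumed to be in gigabytes per worker. It checks that each minimum does not exceed its maximum and computes the most the workers could claim at full scale (the head node is not counted).

```python
from dataclasses import dataclass


@dataclass
class WorkerRequest:
    """Hypothetical stand-in mirroring the worker fields passed to ClusterConfiguration."""
    min_worker: int
    max_worker: int
    min_cpus: int
    max_cpus: int
    min_memory: int  # assumed to be gigabytes per worker
    max_memory: int
    gpu: int = 0

    def validate(self) -> None:
        # Each minimum must not exceed its corresponding maximum.
        for field in ("worker", "cpus", "memory"):
            low = getattr(self, f"min_{field}")
            high = getattr(self, f"max_{field}")
            if low > high:
                raise ValueError(f"min_{field} ({low}) exceeds max_{field} ({high})")

    def peak_totals(self) -> dict:
        # Upper bound on what the workers can claim when fully scaled out.
        return {
            "cpus": self.max_worker * self.max_cpus,
            "memory": self.max_worker * self.max_memory,
            "gpus": self.max_worker * self.gpu,
        }


request = WorkerRequest(min_worker=1, max_worker=1, min_cpus=2, max_cpus=2,
                        min_memory=8, max_memory=8, gpu=0)
request.validate()
totals = request.peak_totals()
print(totals)
```

With the values from the cell above, the single worker can claim at most 2 CPUs and 8 units of memory, and no GPUs.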

Next, we bring our cluster up. Calling the `up()` function below submits our cluster's AppWrapper YAML to the MCAD queue and begins the process of obtaining our resource cluster.

In [11]:
# Bring up the cluster
cluster.up()

Now we check the status of our resource cluster and wait until it is ready for use.

In [None]:
cluster.status()

In [None]:
cluster.wait_ready()

In [None]:
cluster.status()
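Conceptually, waiting for readiness is a poll-until-ready loop with a timeout. The sketch below illustrates that pattern in plain Python; it is not the SDK's implementation. `wait_until_ready` and `get_status` are hypothetical names, and the status source is simulated so the example runs without a cluster.

```python
import time


def wait_until_ready(get_status, timeout=60.0, interval=5.0, sleep=time.sleep):
    """Poll get_status() until it returns "ready" or the timeout elapses.

    get_status is a hypothetical zero-argument callable returning a status string.
    Returns the number of seconds spent waiting between polls.
    """
    waited = 0.0
    while True:
        status = get_status()
        if status == "ready":
            return waited
        if waited >= timeout:
            raise TimeoutError(f"still {status!r} after {waited:.0f}s")
        sleep(interval)
        waited += interval


# Simulated status source: pending twice, then ready.
states = iter(["pending", "pending", "ready"])
# A no-op sleep keeps the example instant; a real loop would use time.sleep.
elapsed = wait_until_ready(lambda: next(states), timeout=30, interval=5,
                           sleep=lambda seconds: None)
print(f"ready after about {elapsed:.0f}s of waiting")
```

Passing the sleep function in as a parameter is a design choice that makes the loop easy to exercise without real delays.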

Let's quickly verify that the specs of the cluster are as expected.

In [None]:
cluster.details()

Now that we have our cluster, there are two ways we can use it:
 - Submit direct batch jobs via TorchX
 - Develop interactively with Ray remote function execution

Let's start with the direct, fire-and-forget job submission approach:

In [None]:
jobdef = DDPJobDefinition(
    name="mnisttest",
    script="mnist.py",
    scheduler_args={"requirements": "requirements.txt"}
)
job = jobdef.submit(cluster)

Now we can take a look at the status of our submitted job, as well as the logs:

In [None]:
job.status()

In [None]:
job.logs()

We can also go to the dashboard of our Ray cluster to see additional details:

In [None]:
cluster.cluster_dashboard_uri()

The second approach is interacting with your Ray cluster cell-by-cell, by having local functions executed on your remote resources:

In [None]:
import ray

# Reset the Ray context in case one already exists.
ray.shutdown()

# Install additional libraries that will be required for model training
runtime_env = {"pip": ["transformers", "datasets", "evaluate", "pyarrow<7.0.0", "accelerate"]}

# Establish the connection to the Ray cluster
ray.init(address=cluster.cluster_uri(), runtime_env=runtime_env)

print("Ray cluster is up and running: ", ray.is_initialized())

Now that we are connected to the Ray cluster, we can submit functions to be executed remotely.

In [None]:
@ray.remote
def train_fn():
    from datasets import load_dataset, load_metric
    import numpy as np

    import transformers
    from transformers import AutoTokenizer, TrainingArguments
    from transformers import AutoModelForSequenceClassification
    
    import ray
    from ray.train.huggingface import HuggingFaceTrainer
    from ray.air.config import ScalingConfig

    # Dataset retrieval/preprocessing

    dataset = load_dataset("imdb")
    tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    tokenized_datasets = dataset.map(tokenize_function, batched=True)

    # Using a fraction of the dataset here, but you can run with the full dataset
    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(100))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(100))

    print(f"len of train {len(small_train_dataset)} and test {len(small_eval_dataset)}")

    ray_train_ds = ray.data.from_huggingface(small_train_dataset)
    ray_evaluation_ds = ray.data.from_huggingface(small_eval_dataset)


    # Assessment metrics

    def compute_metrics(eval_pred):
        metric = load_metric("accuracy")
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    # Training

    def trainer_init_per_worker(train_dataset, eval_dataset, **config):
        model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)

        training_args = TrainingArguments("/tmp/hf_imdb/test", eval_steps=1, disable_tqdm=True, 
                                          num_train_epochs=1, skip_memory_metrics=True,
                                          learning_rate=2e-5,
                                          per_device_train_batch_size=4,
                                          per_device_eval_batch_size=4,                                
                                          weight_decay=0.01,)
        return transformers.Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            compute_metrics=compute_metrics
        )

    scaling_config = ScalingConfig(num_workers=2, use_gpu=False)  # num_workers is the number of training workers; each gets a GPU only when use_gpu=True

    # We are using the Ray-native HuggingFaceTrainer, but you can swap in the non-Ray Hugging Face Trainer. Both have the same method signature.
    # The Ray-native HuggingFaceTrainer has built-in support for scaling to multiple GPUs.
    trainer = HuggingFaceTrainer(
        trainer_init_per_worker=trainer_init_per_worker,
        scaling_config=scaling_config,
        datasets={"train": ray_train_ds, "evaluation": ray_evaluation_ds},
    )
    result = trainer.fit()
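
The `compute_metrics` function above turns logits into predictions with an argmax and then scores them against the labels. As a small worked illustration of that arithmetic, here is a plain-Python sketch; `argmax` and `accuracy` are hypothetical helpers standing in for `np.argmax` and the accuracy metric, and the logits and labels are made-up toy values.

```python
def argmax(row):
    # Index of the largest value in one row of logits.
    return max(range(len(row)), key=row.__getitem__)


def accuracy(logits, labels):
    # Fraction of rows whose highest-scoring class matches the label.
    predictions = [argmax(row) for row in logits]
    correct = sum(p == y for p, y in zip(predictions, labels))
    return correct / len(labels)


# Toy batch: four examples, two classes each.
logits = [[0.2, 1.5], [2.0, -1.0], [0.1, 0.3], [0.9, 0.4]]
labels = [1, 0, 0, 0]
score = accuracy(logits, labels)
print(score)  # 0.75
```

Here the predicted classes are 1, 0, 1, 0, so three of the four predictions match their labels.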

In [None]:
# Call the above cell as a remote ray function
ray.get(train_fn.remote())
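
The submit-then-fetch pattern used here can be illustrated with the standard library alone. The sketch below is only an analogy, not Ray: it swaps in `concurrent.futures` with local threads, where `submit()` plays the role of `.remote()` and `result()` plays the role of `ray.get()`. The `square` function is a made-up placeholder task.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Placeholder task standing in for a remote function.
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns immediately with a handle, much as .remote() does.
    futures = [pool.submit(square, n) for n in range(4)]
    # result() blocks until the value is available, much as ray.get() does.
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9]
```

Unlike this local analogy, Ray schedules the work on the cluster's workers and hands back object references rather than `Future` objects.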

To disconnect at any time, simply call the following:

In [None]:
ray.shutdown()

Once complete, we can bring our Ray cluster down and clean up:

In [6]:
cluster.down()