# Ray on CDSW workers

This notebook is a proof-of-concept (POC) demo of [Ray](https://docs.ray.io/) running across CDSW workers. It is alpha stage and has not yet been tested on CML.

First, install and import dependencies.

In [None]:
!pip3 install "ray[tune]"

In [None]:
import os

import cdsw
import ray
from ray import tune

We'll use the local session as the head node in the Ray cluster, then attach some worker nodes using the CDSW Workers API.

In [None]:
RAY_DASHBOARD_PORT = int(os.getenv("CDSW_READONLY_PORT"))
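If `CDSW_READONLY_PORT` is unset (for example, when the notebook runs outside CDSW), `os.getenv` returns `None` and `int(None)` fails with a rather opaque `TypeError`. Below is a minimal sketch of a more explicit check. The helper name `read_port` is hypothetical and not part of the CDSW API.

```python
import os


def read_port(var_name: str = "CDSW_READONLY_PORT") -> int:
    """Read a port number from an environment variable, failing with a clear message."""
    raw = os.getenv(var_name)
    if raw is None:
        # os.getenv returns None when the variable is missing; int(None) would raise TypeError.
        raise RuntimeError(f"{var_name} is not set; are you running inside a CDSW session?")
    try:
        return int(raw)
    except ValueError:
        raise RuntimeError(f"{var_name}={raw!r} is not a valid port number") from None


# In the notebook this would replace the cell above:
# RAY_DASHBOARD_PORT = read_port()
```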

In [None]:
ray_head = ray.init(dashboard_port=RAY_DASHBOARD_PORT)

Since Ray runs in the background, we need to keep our CDSW workers awake. We do this with a simple sleep loop in the command we pass to the workers.

Note that this means the workers won't shut down when we shut down Ray. If we restart the Ray process, we should be sure to stop our idle workers.
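The worker command in the next cells is just a string, so it can help to build it in one place. Below is a minimal sketch that reproduces the string passed to `cdsw.launch_workers`. In the notebook, `head_address` would be `ray_head['redis_address']`. The helper name `build_worker_code` and the address `10.0.0.5:6379` are placeholders for illustration. Like the original cell, the string relies on the worker's Python kernel supporting an IPython-style `!` shell escape.

```python
def build_worker_code(head_address: str, sleep_seconds: int = 10) -> str:
    """Build the `code` string for each CDSW worker.

    The leading "!" hands the rest of the line to a shell: `ray start` joins the
    head node, then an endless sleep loop keeps the worker session (and the
    background Ray processes it started) alive.
    """
    if sleep_seconds <= 0:
        raise ValueError("sleep_seconds must be positive")
    return (
        f"!ray start --address={head_address}; "
        f"while true; do sleep {sleep_seconds}; done"
    )


print(build_worker_code("10.0.0.5:6379"))
# !ray start --address=10.0.0.5:6379; while true; do sleep 10; done
```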

In [None]:
# This session's IP address; it should match the host in the Ray head address used below.
os.getenv("CDSW_IP_ADDRESS")

In [None]:
ray_nodes = cdsw.launch_workers(
    n=2,
    cpu=1,
    memory=2,
    kernel="python3",
    code=f"!ray start --address={ray_head['redis_address']}; while true; do sleep 10; done"
)

This should give us access to some compute. We can view the Ray Dashboard at the following URL:

In [None]:
print(f"""http://read-only-{os.getenv('CDSW_MASTER_ID')}.{os.getenv("CDSW_DOMAIN")}""")
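If `CDSW_MASTER_ID` or `CDSW_DOMAIN` is unset, `os.getenv` returns `None` and the cell above silently prints a URL containing the text `None`. Below is a minimal sketch that uses the same variables and URL pattern as that cell but fails loudly instead. The helper name `dashboard_url` and the sample values are hypothetical.

```python
import os
from typing import Mapping, Optional


def dashboard_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Build the read-only Dashboard URL from the same variables the cell above uses."""
    env = os.environ if env is None else env
    missing = [name for name in ("CDSW_MASTER_ID", "CDSW_DOMAIN") if not env.get(name)]
    if missing:
        # Without this check, a missing variable would appear as "None" in the URL.
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return f"http://read-only-{env['CDSW_MASTER_ID']}.{env['CDSW_DOMAIN']}"


print(dashboard_url({"CDSW_MASTER_ID": "abc123", "CDSW_DOMAIN": "cdsw.example.com"}))
# http://read-only-abc123.cdsw.example.com
```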

Let's test out Ray. The Ray docs include a [simple hyperparameter tuning routine](https://docs.ray.io/en/latest/tune/index.html#quick-start) built with Ray Tune. We scan over a larger range of parameters than the docs example so that there is enough work to see the workers doing something on the Dashboard. The results are reported below the cell.
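To see what the search computes, here is a serial, Ray-free sketch of the same search space. `alpha` takes 100 grid values and each trial runs 10 steps, so Tune has 100 trials and 1,000 objective evaluations to spread across the nodes. The sketch assumes each trial is judged by its last reported `mean_loss`. Here `beta` is drawn with Python's `random` module rather than by Tune, so the sampled values, and therefore the best config, will generally differ from Tune's. The helper name `serial_search` is hypothetical.

```python
import random


def objective(step, alpha, beta):
    # Same toy objective as the Tune example below.
    return (0.1 + alpha * step / 100) ** (-1) + beta * 0.1


def serial_search(seed: int = 0, steps: int = 10):
    """Evaluate the search space one trial at a time, without Ray.

    alpha is grid-searched over 0.00..0.99, and beta is drawn once per trial
    from range(10). Each trial's score is the objective at its final step.
    """
    rng = random.Random(seed)
    results = []
    for alpha in [x / 100 for x in range(100)]:
        beta = rng.choice(range(10))
        final_score = objective(steps - 1, alpha, beta)
        results.append(({"alpha": alpha, "beta": beta}, final_score))
    # Lowest final score wins, matching metric="mean_loss", mode="min".
    best_config, best_score = min(results, key=lambda r: r[1])
    return results, best_config, best_score


results, best_config, best_score = serial_search()
print(len(results), "trials; best config:", best_config)
```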

In [None]:
def objective(step, alpha, beta):
    return (0.1 + alpha * step / 100)**(-1) + beta * 0.1


def training_function(config):
    # Hyperparameters
    alpha, beta = config["alpha"], config["beta"]
    for step in range(10):
        # Iterative training function - can be any arbitrary training procedure.
        intermediate_score = objective(step, alpha, beta)
        # Feed the score back to Tune.
        tune.report(mean_loss=intermediate_score)


analysis = tune.run(
    training_function,
    config={
        "alpha": tune.grid_search([x / 100 for x in range(100)]),
        "beta": tune.choice(range(10))
    })

print("Best config: ", analysis.get_best_config(
    metric="mean_loss", mode="min"))

# Get a dataframe for analyzing trial results.
df = analysis.results_df


Hopefully that went smoothly, and you saw some work happening in the Dashboard.

Once our analysis is complete, we should stop hogging cluster resources. Ray nodes run as background processes, so killing this notebook won't kill them (stopping the entire Jupyter session would). We can explicitly shut down the Ray processes started by this session like so:

In [None]:
ray.shutdown()

We should also stop the workers we created.

In [None]:
cdsw.stop_workers(*[worker['id'] for worker in ray_nodes])
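Because the workers keep running until they are stopped, one option is to tie cleanup to a `try`/`finally` block so that Ray is shut down and the workers are stopped even if the tuning run raises. Below is a minimal sketch with a hypothetical helper `run_then_stop`. In the notebook, the callables would be `cdsw.stop_workers` and `ray.shutdown`. Here fake callables stand in so the sketch runs without CDSW or Ray.

```python
from typing import Callable, Mapping, Optional, Sequence, TypeVar

T = TypeVar("T")


def run_then_stop(
    workers: Sequence[Mapping[str, object]],
    work: Callable[[], T],
    stop_workers: Callable[..., object],
    shutdown: Optional[Callable[[], object]] = None,
) -> T:
    """Run `work`; then shut down Ray (if given) and stop the workers, even if `work` raises."""
    try:
        return work()
    finally:
        try:
            if shutdown is not None:
                # In the notebook this would be ray.shutdown.
                shutdown()
        finally:
            # Same call shape as the cell above: every worker id as a positional argument.
            stop_workers(*[worker["id"] for worker in workers])


# Demo with fake callables: the work fails, but the workers are still stopped.
stopped = []
fake_workers = [{"id": "w1"}, {"id": "w2"}]
try:
    run_then_stop(fake_workers, lambda: 1 / 0, lambda *ids: stopped.extend(ids))
except ZeroDivisionError:
    pass
print(stopped)  # ['w1', 'w2']
```

In the notebook, a call might look like `run_then_stop(ray_nodes, lambda: tune.run(training_function, config=...), cdsw.stop_workers, ray.shutdown)`. This is an untested sketch against a live CDSW cluster.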

That's it! Ray working across CDSW workers!