# Ray on interactive compute cluster

In the [ray-on-compute-instance notebook](../1.ray-on-compute-instance/ray-on-compute-instance.ipynb), we learned how to start a local Ray cluster and execute Ray scripts interactively on a compute instance.

In the [ray-on-compute-cluster notebook](../2.ray-on-compute-cluster/ray-on-compute-cluster.ipynb), we learned how to submit a distributed training job with a Ray cluster enabled to multi-node Azure ML compute clusters.

In this notebook, we combine these two scenarios to build a multi-node heterogeneous Ray cluster and execute a Ray application interactively.


## Prerequisites
To build an interactive multi-node heterogeneous Ray cluster, we need one compute instance as the head node and one or more CPU/GPU compute clusters as worker nodes.

The compute instance and the compute clusters must be placed in the same virtual network and subnet.

Follow [this document](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-secure-training-vnet?view=azureml-api-2&tabs=cli%2Crequired) to set up one CPU compute instance and a two-node GPU compute cluster.
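A network mismatch would likely surface only later, as worker nodes that never reach the head node, so it can be worth comparing the subnet resource IDs of the compute instance and the compute cluster up front. This is a minimal sketch, assuming you have copied both full subnet resource IDs (for example from the Azure portal); `parse_subnet_id` and `same_subnet` are hypothetical helpers, and the IDs in the usage line are made up.

```python
from typing import Tuple


def parse_subnet_id(resource_id: str) -> Tuple[str, str, str, str]:
    """Split a subnet resource ID into (subscription, resource group, vnet, subnet)."""
    # Expected layout:
    # /subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.Network/
    #     virtualNetworks/<vnet>/subnets/<subnet>
    parts = resource_id.strip("/").split("/")
    # Pair each segment name with the value that follows it; names are
    # lower-cased because Azure resource IDs are case-insensitive.
    segments = {key.lower(): value for key, value in zip(parts[0::2], parts[1::2])}
    try:
        return (
            segments["subscriptions"],
            segments["resourcegroups"],
            segments["virtualnetworks"],
            segments["subnets"],
        )
    except KeyError as err:
        raise ValueError(f"not a subnet resource ID: {resource_id!r}") from err


def same_subnet(id_a: str, id_b: str) -> bool:
    """Return True if both IDs name the same subnet, ignoring case."""
    a = tuple(part.lower() for part in parse_subnet_id(id_a))
    b = tuple(part.lower() for part in parse_subnet_id(id_b))
    return a == b
```

For example, `same_subnet(compute_instance_subnet_id, gpu_cluster_subnet_id)` should return `True` before you continue, where both variables hold the IDs you copied.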

## Install required packages

More information about installing Ray can be found [here](https://docs.ray.io/en/latest/ray-overview/installation.html).

In [None]:
%pip install \
  "azure-ai-ml>=1.6.0" \
  "ray[default, air, tune]==2.4.0" \
  gpustat==1.0.0 \
  torch \
  torchvision

## Start a Ray cluster on compute instance

We use the current compute instance as the head node of the Ray cluster we are building.

In [None]:
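import ray
import configparser

dashboard_port = 8265

# Start a local Ray cluster; this compute instance becomes the head node.
ray_instance = ray.init(
    include_dashboard=True,
    dashboard_port=dashboard_port,
    ignore_reinit_error=True,
)


# Update the Ray dashboard link so it points at the compute instance
# endpoint "<instance>-<port>.<domainsuffix>".
try:
    parser = configparser.ConfigParser()
    # .nbvm has no section header, so add one before parsing.
    with open("/mnt/azmnt/.nbvm") as stream:
        parser.read_string("[config]\n" + stream.read())

    config = parser["config"]
    ci_name = config["instance"]
    domainsuffix = config["domainsuffix"]

    dashboard_url = f"{ci_name}-{dashboard_port}.{domainsuffix}"
except (OSError, KeyError, configparser.Error):
    # Not on a compute instance (or unexpected file layout): keep Ray's default URL.
    dashboard_url = ray_instance.dashboard_url

ray_instance.dashboard_url = dashboard_url
ray_instance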
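The URL logic in the cell above can be pulled out into a small function so it can be tried without a compute instance. This sketch only assumes the `key=value` layout of `/mnt/azmnt/.nbvm` that the cell already relies on; `dashboard_url_from_nbvm` is a hypothetical name, and the sample file contents in the usage are made up.

```python
import configparser


def dashboard_url_from_nbvm(nbvm_text: str, port: int) -> str:
    """Build the dashboard host name from the contents of a .nbvm file."""
    parser = configparser.ConfigParser()
    # The file has no section header, so prepend one, as the cell above does.
    parser.read_string("[config]\n" + nbvm_text)
    config = parser["config"]
    # Raises KeyError if either expected key is missing.
    return f"{config['instance']}-{port}.{config['domainsuffix']}"
```

With a hypothetical file containing `instance=my-ci` and `domainsuffix=westus2.instances.azureml.ms`, the function returns `my-ci-8265.westus2.instances.azureml.ms` for port `8265`.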

## Attach worker nodes using compute cluster

Once the head node has started, we can submit a job that starts only worker nodes by passing it the head node's address.

### Import required libraries

In [None]:
from azure.identity import DefaultAzureCredential

from azure.ai.ml import MLClient, command
from azure.ai.ml.entities import Environment

### Connect to workspace using DefaultAzureCredential

`DefaultAzureCredential` handles most Azure SDK authentication scenarios by trying several credential sources in turn, such as environment variables, a managed identity, and an Azure CLI login.


In [None]:
credential = DefaultAzureCredential()
try:
    # Read the workspace details from a config.json file, if one is present.
    ml_client = MLClient.from_config(credential)
    workspace = ml_client.workspace_name
    ws = ml_client.workspaces.get(workspace)  # fetch the workspace once
    subscription_id = ws.id.split("/")[2]
    resource_group = ws.resource_group
except Exception as ex:
    print(ex)
    # Enter details of your AML workspace
    subscription_id = "<SUBSCRIPTION_ID>"
    resource_group = "<RESOURCE_GROUP>"
    workspace = "<AML_WORKSPACE_NAME>"
    ml_client = MLClient(credential, subscription_id, resource_group, workspace)

## Build environment

We use an Azure ML image and a conda YAML file to build an environment. More information about building environments can be found [here](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-manage-environments-v2?view=azureml-api-2&tabs=python).

Because Ray requires the `python` and `ray` versions to match exactly across all nodes of the cluster, we generate a `conda.yml` file that matches the current kernel.


In [None]:
import ray
import yaml
from platform import python_version as get_python_version

# Pin Python and Ray to the exact versions running in this kernel.
python_version = get_python_version()
ray_version = ray.__version__

conda = {
    "name": "ray-environment",
    "dependencies": [
        f"python={python_version}",
        "pip",
        {
            "pip": [
                f"ray[default, tune]=={ray_version}",
                "torch",
                "torchvision",
            ]
        },
    ],
}

# Write to conda.yml file
with open("conda.yml", "w") as conda_file:
    yaml.dump(conda, conda_file, default_flow_style=False)


# Build environment using AzureML image and conda.yml we built
environment = Environment(
    image="mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.3-cudnn8-ubuntu20.04",
    conda_file="conda.yml",
)
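Because workers built from a mismatched environment may fail to join the head node, a quick check of the generated spec against the running kernel can catch a stale pin before the job is submitted. This is a minimal sketch: `check_conda_pins` is a hypothetical helper, and it assumes the spec is a dict shaped like the `conda` object built above (conda dependency strings plus one `pip` list).

```python
def check_conda_pins(spec: dict, python_version: str, ray_version: str) -> list:
    """Return a list of problems with the python/ray pins in a conda spec dict."""
    problems = []
    deps = spec.get("dependencies", [])
    conda_deps = [d for d in deps if isinstance(d, str)]
    pip_deps = [p for d in deps if isinstance(d, dict) for p in d.get("pip", [])]

    if f"python={python_version}" not in conda_deps:
        problems.append(f"python is not pinned to {python_version}")

    # Drop extras such as "[default, tune]" before matching the package name.
    ray_pins = [
        p.split("==", 1)[1]
        for p in pip_deps
        if "==" in p and p.split("[", 1)[0].split("==", 1)[0].strip() == "ray"
    ]
    if ray_pins != [ray_version]:
        problems.append(f"ray is not pinned to {ray_version}: found {ray_pins}")
    return problems
```

In the notebook, `check_conda_pins(conda, platform.python_version(), ray.__version__)` should return an empty list.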


### Configure and Run Command
In this section, we configure and run a `command` job.

The `command` function lets you configure the following key aspects:
- `command` - The command to run. In this example, we run `sleep infinity`, which keeps the job (and therefore the worker nodes) running until it is cancelled.
- `environment` - The environment the command runs in. In this example, we use the environment we just built.
- `compute` - The compute target the command runs on. In this example, we specify the compute cluster created in the same virtual network as the current compute instance.
- `instance_count` - The number of nodes to use for the job. In this example, we use `2` nodes.
- `distribution` - The distribution configuration for distributed training scenarios. In this example, we set it to `ray`, and the Azure ML job engine sets up the Ray cluster automatically.
  - `port` - \[Optional\] The port of the Ray head process. The default is `6379`.
  - `address` - \[Optional\] The address of the Ray head node. When set, as in this example, the job starts only worker nodes, which join that existing head node.
  - `worker_node_additional_args` - \[Optional\] Additional arguments passed to `ray start` on the worker nodes.
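The `distribution` options listed above are plain keys in a dictionary, so a small builder can make explicit which ones are set. This is only a sketch: `ray_distribution` is a hypothetical helper, and it assumes that leaving a key out lets Azure ML fall back to its default (for `port`, `6379` as noted above).

```python
from typing import Optional


def ray_distribution(
    address: Optional[str] = None,
    port: Optional[int] = None,
    worker_node_additional_args: Optional[str] = None,
) -> dict:
    """Build a Ray `distribution` dict, leaving out options that are not set."""
    distribution = {"type": "ray"}
    optional = {
        "address": address,
        "port": port,
        "worker_node_additional_args": worker_node_additional_args,
    }
    # Only include keys the caller set, so unset options keep their defaults.
    distribution.update({k: v for k, v in optional.items() if v is not None})
    return distribution
```

It could then be passed as `distribution=ray_distribution(address=ray_instance.address_info["address"])` in the `command(...)` call.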

In [None]:
job = command(
    experiment_name="mnist_pytorch",
    # Keep the job (and therefore the worker nodes) alive until it is cancelled.
    command="sleep infinity",
    environment=environment,
    compute="gpu-cluster",
    instance_count=2,  # The compute cluster was created with 2 nodes.
    distribution={
        "type": "ray",
        # [Optional] Address of the Ray head node; workers join this existing head.
        "address": ray_instance.address_info["address"],
        # "worker_node_additional_args": "--verbose",  # [Optional] Additional arguments passed to `ray start` on worker nodes.
    },
)

### Submit the job

When the command job is submitted, Azure ML scales up the compute cluster and the worker nodes connect to the Ray head node.

In [None]:
active_job = ml_client.jobs.create_or_update(job)

active_job
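Submitting the job returns as soon as Azure ML accepts it; the cluster then has to scale up, and each worker has to start Ray and join the head. Until that happens, the GPUs on the worker nodes are not part of the Ray cluster. A minimal sketch of waiting for them, assuming the head plus 2 workers (3 nodes) from this example and using `ray.nodes()`, which returns one dict per node with an `"Alive"` flag. `wait_for_nodes` is a hypothetical helper, not part of Ray or the Azure ML SDK, and the timeout and polling interval are arbitrary choices.

```python
import time
from typing import Callable, Dict, List


def wait_for_nodes(
    get_nodes: Callable[[], List[Dict]],
    expected: int,
    timeout: float = 1800.0,
    interval: float = 10.0,
) -> int:
    """Poll until at least `expected` nodes are alive; return the alive count."""
    deadline = time.monotonic() + timeout
    while True:
        # Dead nodes can stay listed, so count only the ones marked alive.
        alive = sum(1 for node in get_nodes() if node.get("Alive"))
        if alive >= expected:
            return alive
        if time.monotonic() >= deadline:
            raise TimeoutError(f"only {alive} of {expected} nodes alive after {timeout}s")
        time.sleep(interval)
```

In the notebook, calling `wait_for_nodes(ray.nodes, expected=3)` before the training cell would block until both workers have joined or the timeout expires.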

## Prepare the training script
We continue to use the same PyTorch example from Ray:
[https://github.com/ray-project/ray/blob/master/python/ray/tune/examples/mnist_pytorch.py](https://github.com/ray-project/ray/blob/master/python/ray/tune/examples/mnist_pytorch.py)

The script has been downloaded to [src/mnist_pytorch.py](./src/mnist_pytorch.py).

We run the application _interactively_ and watch its output in real time.

In [None]:
%run src/mnist_pytorch.py --cuda

### Show Ray cluster resources

In [None]:
ray.cluster_resources()
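`ray.cluster_resources()` returns a dictionary mapping resource names such as `"CPU"` and `"GPU"` to their totals across all connected nodes, so it can also confirm that the worker GPUs actually joined. A minimal sketch; `missing_resources` is a hypothetical helper, and requiring 2 GPUs assumes one GPU per worker node in the two-node cluster, which depends on the VM size you chose.

```python
def missing_resources(available: dict, required: dict) -> dict:
    """Return {resource: shortfall} for each required resource not fully available."""
    return {
        name: amount - available.get(name, 0.0)
        for name, amount in required.items()
        if available.get(name, 0.0) < amount
    }
```

For example, `missing_resources(ray.cluster_resources(), {"GPU": 2})` returns an empty dict once both GPU workers are connected.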

## Shut down the head and worker nodes

In [None]:
# Shut down the head node.
ray.shutdown()

# Cancelling the worker job automatically shuts down the worker nodes.
poller = ml_client.jobs.begin_cancel(name=active_job.name)

# Wait until the job is cancelled.
poller.wait()