
### Quick examples to demostrate AML Ray/Dask cluster usage

### Interactive mode

Install at this your conda notebook environment to run interactive examples

In [None]:
pip install --upgrade adlfs ray==2.2.0 ray[air]==2.2.0 ray[data]==2.2.0 azure-ai-ml ray-on-aml 

In [None]:
#for torch
# conda install -y -c pytorch -c conda-forge cudatoolkit=11.1 pytorch torchvision torchaudio


Start ML client 

In [None]:
from azure.ai.ml import MLClient
from azure.ai.ml import command, Input
from azure.identity import DefaultAzureCredential
# Enter details of your AML workspace
subscription_id = "840b5c5c-3f4a-459a-94fc-6bad2a969f9d"
resource_group = "ml"
workspace = "ws02ent"
# get a handle to the workspace
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)


##### Please make sure to create compute cluster in the same vnet with your compute instance. You need to have vnet, otherwise compute cannot communicate with each other


In [None]:
from ray_on_aml.core import Ray_On_AML

# from src.ray_on_aml.core import Ray_On_AML

import logging
ray_on_aml =Ray_On_AML(ml_client=ml_client, compute_cluster ="d13")
#Note that if you need to customize the pip installation of the cluster, you also needs to support the ray package e.g. ray[data] which 
#match the version of the ray package(s) in your compute instance. If you don't specify pip_packages then ray[default] is inserted 
#automatically

ray = ray_on_aml.getRay(num_node=2,pip_packages=["ray[air]==2.2.0","ray[data]==2.2.0","torch==1.13.0","fastparquet==2022.12.0", 
"azureml-mlflow==1.48.0", "pyarrow==6.0.1", "dask==2022.12.0", "adlfs==2022.11.2", "fsspec==2022.11.0"])
client = ray.init(f"ray://{ray_on_aml.headnode_private_ip}:10001")

#if you want to use CI as head node set ci_is_head=True as in ray_on_aml.getRay(ci_is_head=True)

In [None]:
ray.cluster_resources()
#if may take some time for all cluster resources to show up, run this again if you don't see all resources

#### Dask on Ray

##### You can use Dask on this Ray cluster by telling Dask to use Ray as the scheduler. By doing this, you will have a cluster with both Dask and Ray without having to setup them saperately

In [None]:
#Scaling up date with Dask dataframe API.
#Please make sure you have pandas version 1.4+ and restart to run this successfully.
import dask
from ray.util.dask import enable_dask_on_ray
enable_dask_on_ray()

import dask.dataframe as dd

storage_options = {'account_name': 'azureopendatastorage'}
ddf = dd.read_parquet('az://nyctlc/green/puYear=2015/puMonth=*/*.parquet', storage_options=storage_options)
ddf.count().compute()


### Ray Dataset

In [None]:
#Ray also support Ray dataset. You can read into ray dataset then convert to Dask or other ML format which is convenient for ML training.https://docs.ray.io/en/latest/data/dataset.html
# you may need to upgrade pyarrow to run this successfully
from adlfs import AzureBlobFileSystem

abfs = AzureBlobFileSystem(account_name="azureopendatastorage",  container_name="isdweatherdatacontainer")
#if read all years and months
# data = ray.data.read_parquet("az://isdweatherdatacontainer/ISDWeather//", filesystem=abfs)
data =ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2015/"], filesystem=abfs)
data.count()
# 1,584,481,119 is the count for all data 


In [None]:
#disconnect your client and shutdown ray cluster after use
client.disconnect()
ray_on_aml.shutdown()

### Mount remote data to your ray cluster

In [None]:
#Example of using input and output for interactive job. You need to define adlsstore0001 in your Azure ML workspace first
from azure.ai.ml import command, Input, Output
from ray_on_aml.core import Ray_On_AML
import logging
ray_on_aml =Ray_On_AML(ml_client=ml_client, compute_cluster ="ds11", verbosity=logging.INFO )
#Define inputs and/or outputs and setup ray cluster & client
inputs={

    "ISDWeather": Input(
        type="uri_folder",
        path="azureml://datastores/adlsstore0001/paths/ISDWeather/year=2008",
    )
}

outputs={
    "output1": Output(
        type="uri_folder",
        path="azureml://datastores/adlsstore0001/paths/dev",
    ),
    "output2": Output(
        type="uri_folder",
        path="azureml://datastores/adlsstore0001/paths/dev",
    )
}

ray = ray_on_aml.getRay(inputs = inputs,outputs=outputs, num_node=2,
pip_packages=["ray[air]==2.2.0","ray[data]==2.2.0","torch==1.13.0","fastparquet==2022.12.0", 
"azureml-mlflow==1.48.0", "pyarrow==6.0.1", "dask==2022.2.0", "adlfs==2022.11.2", "fsspec==2022.11.0"])
client = ray.init(f"ray://{ray_on_aml.headnode_private_ip}:10001")


In [None]:
#Now you can refer to the mount points with assigned name to use 
data = ray.data.read_parquet(ray_on_aml.mount_points['ISDWeather'])
data.count()

In [None]:
#disconnect your client and shutdown ray cluster after use
client.disconnect()
ray_on_aml.shutdown()

#### Ray Tune for distributed ML tunning

Pls use azureml_py38_PT_TF environment. It has stable tensorflow and pytorch installation and install following additional library

In [None]:
pip install ray[air]==2.2.0 xgboost xgboost_ray

In [None]:
from ray_on_aml.core import Ray_On_AML
ray_on_aml =Ray_On_AML(ml_client=ml_client, compute_cluster ="d13")


# For use as client mode, uncomment these lines
ray = ray_on_aml.getRay(ci_is_head=True, num_node=2,pip_packages=["ray[air]==2.2.0","torch==1.13.0","xgboost", "xgboost_ray","azureml-mlflow==1.48.0", "pyarrow==6.0.1"])



Train ML with Ray[AIR] and pytorch

In [None]:
# import ray

# Load data.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

# Split data into train and validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

# Create a test dataset by dropping the target column.
test_dataset = valid_dataset.drop_columns(cols=["target"])

In [None]:
import numpy as np

from ray.data.preprocessors import Concatenator, Chain, StandardScaler

# Create a preprocessor to scale some columns and concatenate the result.
preprocessor = Chain(
    StandardScaler(columns=["mean radius", "mean texture"]),
    Concatenator(exclude=["target"], dtype=np.float32),
)

In [None]:
import torch
import torch.nn as nn

from ray import train
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchCheckpoint, TorchTrainer


def create_model(input_features):
    return nn.Sequential(
        nn.Linear(in_features=input_features, out_features=16),
        nn.ReLU(),
        nn.Linear(16, 16),
        nn.ReLU(),
        nn.Linear(16, 1),
        nn.Sigmoid(),
    )


def train_loop_per_worker(config):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["num_epochs"]
    num_features = config["num_features"]

    # Get the Ray Dataset shard for this data parallel worker,
    # and convert it to a PyTorch Dataset.
    train_data = session.get_dataset_shard("train")
    # Create model.
    model = create_model(num_features)
    model = train.torch.prepare_model(model)

    loss_fn = nn.BCELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for cur_epoch in range(epochs):
        for batch in train_data.iter_torch_batches(
            batch_size=batch_size, dtypes=torch.float32
        ):
            # "concat_out" is the output column of the Concatenator.
            inputs, labels = batch["concat_out"], batch["target"]
            optimizer.zero_grad()
            predictions = model(inputs)
            train_loss = loss_fn(predictions, labels.unsqueeze(1))
            train_loss.backward()
            optimizer.step()
        loss = train_loss.item()
        session.report({"loss": loss}, checkpoint=TorchCheckpoint.from_model(model))


num_features = len(train_dataset.schema().names) - 1

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={
        "batch_size": 128,
        "num_epochs": 20,
        "num_features": num_features,
        "lr": 0.001,
    },
    scaling_config=ScalingConfig(
        num_workers=3,  # Number of workers to use for data parallelism.
        use_gpu=False,
        trainer_resources={"CPU": 0},  # so that the example works on Colab.
    ),
    datasets={"train": train_dataset},
    preprocessor=preprocessor,
)
# Execute training.
result = trainer.fit()
print(f"Last result: {result.metrics}")
# Last result: {'loss': 0.6559339960416158, ...}

#### Distributed XGBoost https://docs.ray.io/en/latest/xgboost-ray.html

In [None]:
from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

seed = 42

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=0.25, random_state=42
)

clf = RayXGBClassifier(
    n_jobs=10,  # In XGBoost-Ray, n_jobs sets the number of actors
    random_state=seed
)

# scikit-learn API will automatically conver the data
# to RayDMatrix format as needed.
# You can also pass X as a RayDMatrix, in which case
# y will be ignored.

clf.fit(X_train, y_train)

pred_ray = clf.predict(X_test)
print(pred_ray.shape)

pred_proba_ray = clf.predict_proba(X_test)
print(pred_proba_ray.shape)

# It is also possible to pass a RayParams object
# to fit/predict/predict_proba methods - will override
# n_jobs set during initialization

clf.fit(X_train, y_train, ray_params=RayParams(num_actors=10))

pred_ray = clf.predict(X_test, ray_params=RayParams(num_actors=10))
print(pred_ray.shape)


In [None]:
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

evals_result = {}
bst = train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    train_set,
    evals_result=evals_result,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=RayParams(
        num_actors=10,  # Number of remote actors
        cpus_per_actor=1))

bst.save_model("model.xgb")
print("Final training error: {:.4f}".format(
    evals_result["train"]["error"][-1]))


### Reinforcement Learning

##### Install library at compute instance: 

In [None]:
pip install gym dm-tree


In [4]:
#Install additional library at Ray cluster
from ray_on_aml.core import Ray_On_AML
ray_on_aml =Ray_On_AML(ml_client=ml_client, compute_cluster ="d13")


# For use as client mode, uncomment these lines
ray = ray_on_aml.getRay(ci_is_head=True, num_node=2,pip_packages=["ray[air]==2.2.0","torch==1.13.0","xgboost", "xgboost_ray","azureml-mlflow==1.48.0", "pyarrow==6.0.1",
'gym','dm-tree','scikit-image','opencv-python','tensorflow'])


ray.cluster_resources()

.

In [None]:
# Import the RL algorithm (Trainer) we would like to use.
from ray.rllib.agents.ppo import PPOTrainer

# Configure the algorithm.
config = {
    # Environment (RLlib understands openAI gym registered strings).
    "env": "Taxi-v3",
    # Use 2 environment workers (aka "rollout workers") that parallelly
    # collect samples from their own environment clone(s).
    "num_workers": 2,
    # Change this to "framework: torch", if you are using PyTorch.
    # Also, use "framework: tf2" for tf2.x eager execution.
    "framework": "torch",
    # Tweak the default model provided automatically by RLlib,
    # given the environment's observation- and action spaces.
    "model": {
        "fcnet_hiddens": [64, 64],
        "fcnet_activation": "relu",
    },
    # Set up a separate evaluation worker set for the
    # `trainer.evaluate()` call after training (see below).
    "evaluation_num_workers": 1,
    # Only for evaluation runs, render the env.
    "evaluation_config": {
        "render_env": True,
    }
}

# Create our RLlib Trainer.
trainer = PPOTrainer(config=config)

# Run it for n training iterations. A training iteration includes
# parallel sample collection by the environment workers as well as
# loss calculation on the collected batch and a model update.
for _ in range(3):
    print(trainer.train())

# Evaluate the trained Trainer (and render each timestep to the shell's
# output).
trainer.evaluate()



##### Shutdown interactive cluster when not used


In [None]:
ray_on_aml.shutdown()


### Ray on Job Cluster 

You can ray this example with either SDK v1 or with CLI v2

SDK V1

In [2]:
#this example uses v1 sdk but you can also use v2 or CLI to submit the job
from azureml.core import Workspace, Experiment, Environment,ScriptRunConfig
# from azureml.widgets import RunDetails
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DockerConfiguration,RunConfiguration

#Remember the AML job has to have distribted setings (MPI type) for ray-on-aml to work correctly.
ws = Workspace.from_config()
compute_cluster = 'nc6s' #This can be another cluster different from the interactive cluster. 
ray_cluster = ComputeTarget(workspace=ws, name=compute_cluster)

aml_run_config_ml = RunConfiguration(communicator='OpenMpi')
docker_config = DockerConfiguration(use_docker=True, shm_size='24g')


rayEnv = Environment.from_conda_specification(name = "RLEnv",
                                             file_path = "job/conda_env.yml")
rayEnv.docker.base_image = "mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.1-cudnn8-ubuntu18.04:20211221.v1"

aml_run_config_ml.target = ray_cluster
aml_run_config_ml.node_count = 2
aml_run_config_ml.environment = rayEnv
aml_run_config_ml.docker =docker_config

src = ScriptRunConfig(source_directory='../examples/job',
                    script='aml_job.py',
                    run_config = aml_run_config_ml,
                   )

run = Experiment(ws, "rl_on_aml_job").submit(src)


For CLI v2 run this in a command line:
az ml job create -f job/job.yml

In [3]:
from azureml.widgets import RunDetails
RunDetails(run).show()


_UserRunWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', '…

In [None]:
run.cancel()