<a href="https://colab.research.google.com/github/Praveen76/Introduction-to-RAY/blob/main/Introduction_to_Ray.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Learning Objectives

At the end of the experiment, you will be able to:

* load data into a `RayDMatrix`
* train an XGBoost-Ray model and save it
* tune hyperparameters using Ray Tune

## Introduction

Compute demands for machine learning (ML) training have grown 10x every
18 months since 2010. Over the same time period, the compute capabilities of
AI accelerators such as GPUs and TPUs have less than doubled. This means
that every year and a half organizations need 5x more AI accelerators/nodes
to train the latest ML models and leverage cutting edge ML capabilities.
Distributed computing is the only way to meet these requirements.

While solutions such as AWS SageMaker and GCP Vertex AI have emerged
to help organizations deal with scaling AI workloads, these solutions put
significant constraints on how applications are developed and which libraries
they can use. This makes it difficult to keep up with the latest models and
algorithms, and freely integrate with the rapidly evolving open ML ecosystem.

Ray addresses these challenges head-on by
allowing ML engineers and developers to scale their workloads
from their laptops to the cloud without having to build complex compute
infrastructure.

### <img src='https://global.discourse-cdn.com/business7/uploads/ray/original/1X/8f4dcb72f7cd34e2a332d548bd65860994bc8ff1.png' width=20px> **Ray**

Ray is an open-source, unified framework for scaling AI and Python applications such as machine learning. It provides the compute layer for parallel processing, which reduces the need for distributed systems expertise. Ray minimizes the complexity of running distributed individual and end-to-end machine learning workflows with these components:

- Scalable libraries for common machine learning tasks such as data preprocessing, distributed training, hyperparameter tuning, reinforcement learning, and model serving.

- Pythonic distributed computing primitives for parallelizing and scaling Python applications.

- Integrations and utilities for integrating and deploying a Ray cluster with existing tools and infrastructure such as Kubernetes, AWS, GCP, and Azure.
<br>

Common ML workloads that individuals, organizations, and companies build with Ray include:

- Batch inference on CPUs and GPUs
- Parallel training
- Model serving
- Distributed training of large models
- Parallel hyperparameter tuning experiments
- Reinforcement learning
- ML platform


### **Ray framework**

<center>
<img src="https://cdn.iisc.talentsprint.com/AIandMLOps/Images/Ray_framework.png" width=500px></center>
<br><br>

Ray's unified compute framework consists of three layers:

- ***Ray AI Libraries:*** An open-source, Python, domain-specific set of libraries that equip ML engineers, data scientists, and researchers with a scalable and unified toolkit for ML applications.

- ***Ray Core:*** An open-source, Python, general purpose, distributed computing library that enables ML engineers and Python developers to scale Python applications and accelerate machine learning workloads.

- ***Ray Clusters:*** A set of worker nodes connected to a common Ray head node. Ray clusters can be fixed-size, or they can autoscale up and down according to the resources requested by applications running on the cluster.

<br>

Each of Ray's five native libraries distributes a specific ML task:

- **`Data`**: Scalable, framework-agnostic data loading and transformation across training, tuning, and prediction

- **`Train`**: Distributed multi-node and multi-core model training with fault tolerance that integrates with popular training libraries

- **`Tune`**: Scalable hyperparameter tuning to optimize model performance

- **`Serve`**: Scalable and programmable serving to deploy models for online inference, with optional microbatching to improve performance

- **`RLlib`**: Scalable distributed reinforcement learning workloads


Find the official Ray website [here](https://www.ray.io/), and its documentation [here](https://docs.ray.io/en/latest/ray-overview/index.html).

### Setup Steps:

In [None]:
#@title Please enter your registration id to start: { run: "auto", display-mode: "form" }
Id = "" #@param {type:"string"}

In [None]:
#@title Please enter your password (your registered phone number) to continue: { run: "auto", display-mode: "form" }
password = "" #@param {type:"string"}

In [None]:
#@title Run this cell to complete the setup for this Notebook
from IPython import get_ipython

ipython = get_ipython()

notebook= "M8_AST_05_Distributed_XGBoost_with_Ray_C" #name of the notebook

def setup():
#  ipython.magic("sx pip3 install torch")

    from IPython.display import HTML, display
    display(HTML('<script src="https://dashboard.talentsprint.com/aiml/record_ip.html?traineeId={0}&recordId={1}"></script>'.format(getId(),submission_id)))
    print("Setup completed successfully")
    return

def submit_notebook():
    ipython.magic("notebook -e "+ notebook + ".ipynb")

    import requests, json, base64, datetime

    url = "https://dashboard.talentsprint.com/xp/app/save_notebook_attempts"
    if not submission_id:
      data = {"id" : getId(), "notebook" : notebook, "mobile" : getPassword()}
      r = requests.post(url, data = data)
      r = json.loads(r.text)

      if r["status"] == "Success":
          return r["record_id"]
      elif "err" in r:
        print(r["err"])
        return None
      else:
        print ("Something is wrong, the notebook will not be submitted for grading")
        return None

    elif getAnswer() and getComplexity() and getAdditional() and getConcepts() and getComments() and getMentorSupport():
      f = open(notebook + ".ipynb", "rb")
      file_hash = base64.b64encode(f.read())

      data = {"complexity" : Complexity, "additional" :Additional,
              "concepts" : Concepts, "record_id" : submission_id,
              "answer" : Answer, "id" : Id, "file_hash" : file_hash,
              "notebook" : notebook,
              "feedback_experiments_input" : Comments,
              "feedback_mentor_support": Mentor_support}
      r = requests.post(url, data = data)
      r = json.loads(r.text)
      if "err" in r:
        print(r["err"])
        return None
      else:
        print("Your submission is successful.")
        print("Ref Id:", submission_id)
        print("Date of submission: ", r["date"])
        print("Time of submission: ", r["time"])
        print("View your submissions: https://aimlops-iisc.talentsprint.com/notebook_submissions")
        #print("For any queries/discrepancies, please connect with mentors through the chat icon in LMS dashboard.")
        return submission_id
    else: return submission_id


def getAdditional():
  try:
    if not Additional:
      raise NameError
    else:
      return Additional
  except NameError:
    print ("Please answer Additional Question")
    return None

def getComplexity():
  try:
    if not Complexity:
      raise NameError
    else:
      return Complexity
  except NameError:
    print ("Please answer Complexity Question")
    return None

def getConcepts():
  try:
    if not Concepts:
      raise NameError
    else:
      return Concepts
  except NameError:
    print ("Please answer Concepts Question")
    return None


# def getWalkthrough():
#   try:
#     if not Walkthrough:
#       raise NameError
#     else:
#       return Walkthrough
#   except NameError:
#     print ("Please answer Walkthrough Question")
#     return None

def getComments():
  try:
    if not Comments:
      raise NameError
    else:
      return Comments
  except NameError:
    print ("Please answer Comments Question")
    return None


def getMentorSupport():
  try:
    if not Mentor_support:
      raise NameError
    else:
      return Mentor_support
  except NameError:
    print ("Please answer Mentor support Question")
    return None

def getAnswer():
  try:
    if not Answer:
      raise NameError
    else:
      return Answer
  except NameError:
    print ("Please answer Question")
    return None


def getId():
  try:
    return Id if Id else None
  except NameError:
    return None

def getPassword():
  try:
    return password if password else None
  except NameError:
    return None

submission_id = None
### Setup
if getPassword() and getId():
  submission_id = submit_notebook()
  if submission_id:
    setup()
else:
  print ("Please complete Id and Password cells before running setup")



### Install necessary packages

In [None]:
!pip -q install ray
!pip -q install "ray[tune]"
!pip -q install xgboost_ray

### Import necessary libraries

In [None]:
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

import xgboost as xgb
from xgboost_ray import RayDMatrix, RayParams, train, predict

from ray import tune
from ray import train as raytrain

### Load the data

In [None]:
train_x, train_y = load_breast_cancer(return_X_y=True)
train_x.shape, train_y.shape

In [None]:
train_x

In [None]:
train_y

### XGBoost-Ray uses the same API as core XGBoost

There are only two differences:

* Instead of an `xgboost.DMatrix`, it uses an `xgboost_ray.RayDMatrix` object

* There is an additional `ray_params` parameter that is used to configure distributed training (it takes a `RayParams` object)

**Data loading**

Data is passed to XGBoost-Ray via a `RayDMatrix` object.

The `RayDMatrix` loads data lazily and stores it as shards in the Ray object store. The Ray XGBoost actors then read these shards to run their training.

A `RayDMatrix` supports various data and file types, such as Pandas DataFrames, NumPy arrays, CSV files, and Parquet files.
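
To make the sharding idea concrete, here is a small plain-Python sketch that splits a dataset's row indices across actors. It is only an illustration: the helper `shard_rows` is hypothetical, and the round-robin scheme is an assumption chosen for simplicity, not a description of how `RayDMatrix` is implemented.

```python
def shard_rows(num_rows, num_actors):
    """Assign row indices to actors round-robin (illustrative scheme only)."""
    return [list(range(actor, num_rows, num_actors)) for actor in range(num_actors)]


# The breast cancer dataset loaded in this notebook has 569 rows.
shards = shard_rows(num_rows=569, num_actors=2)

for actor, rows in enumerate(shards):
    print(f"actor {actor}: {len(rows)} rows, starting with {rows[:3]}")
```

Each actor only needs to hold its own shard, which is why the number of actors in `RayParams` determines how the data is divided.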

In [None]:
train_set = RayDMatrix(train_x, train_y)

In [None]:
ray_params = RayParams(num_actors = 2,               # Number of remote actors
                       cpus_per_actor = 1
                       )

### Train the XGBoost Ray model and save it

In [None]:
evals_result = {}
bst = train(
    params={
        "objective": "binary:logistic",                # tells XGBoost that we aim to train a logistic regression model for a binary classification task
        "eval_metric": ["logloss", "error"],
    },
    dtrain=train_set,
    evals_result=evals_result,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=ray_params)

bst.save_model("model.xgb")

### Final training error

In [None]:
print("Final training error: {:.4f}".format(evals_result["train"]["error"][-1]))
print("Final training accuracy: {:.4f}".format(1 - evals_result["train"]["error"][-1]))
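
The two metrics requested in `eval_metric` can be reproduced by hand on a toy example. The sketch below assumes XGBoost's documented definitions: `error` is the fraction of misclassified rows when probabilities above 0.5 count as positive, and `logloss` is the mean negative log-likelihood. The labels and probabilities are made up for illustration, and the helper names are hypothetical.

```python
import math


def binary_error(labels, probs, threshold=0.5):
    """Fraction of rows whose thresholded prediction differs from the label."""
    wrong = sum(int(p > threshold) != y for y, p in zip(labels, probs))
    return wrong / len(labels)


def log_loss(labels, probs):
    """Mean negative log-likelihood of the true labels."""
    total = sum(
        -(y * math.log(p) + (1 - y) * math.log(1 - p))
        for y, p in zip(labels, probs)
    )
    return total / len(labels)


# Invented labels and predicted probabilities
labels = [1, 0, 1, 1]
probs = [0.9, 0.2, 0.4, 0.7]

error = binary_error(labels, probs)
print(f"error:    {error:.4f}")
print(f"accuracy: {1 - error:.4f}")
print(f"logloss:  {log_loss(labels, probs):.4f}")
```

This is why the cell above can derive the training accuracy as `1 - error`.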

### Prediction

Here, we load the saved XGBoost-Ray model into a regular, non-distributed `xgboost.Booster` instance and pass it to XGBoost-Ray's `predict()`.

In [None]:
dpred = RayDMatrix(train_x, train_y)

bst = xgb.Booster(model_file="model.xgb")                    # non-distributed XGBoost API instance

pred_ray = predict(bst,
                   dpred,
                   ray_params = RayParams(num_actors=2)      # The data will be split across two actors. The result array will integrate this data in the correct order.
                   )

print(pred_ray)
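
The comment in the cell above notes that predictions from the two actors are merged back in the original row order. As a rough local analogue (standard-library threads rather than Ray actors, with a made-up `score` function standing in for the model), the split, score, and reassemble pattern can be sketched like this. The helper `predict_in_shards` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def score(rows):
    # Stand-in for a model: one "prediction" per row (hypothetical).
    return [row * 0.1 for row in rows]


def predict_in_shards(rows, num_workers):
    # Split the rows into contiguous shards, at most one per worker.
    size = max(1, -(-len(rows) // num_workers))  # ceiling division
    shards = [rows[i:i + size] for i in range(0, len(rows), size)]
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # map() yields results in the order the shards were submitted,
        # so concatenating them restores the original row order.
        shard_preds = list(pool.map(score, shards))
    return [pred for preds in shard_preds for pred in preds]


rows = list(range(7))
sharded = predict_in_shards(rows, num_workers=2)
print(sharded == score(rows))
```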

In [None]:
# Convert model output to labels
prediction = [int(i > 0.5) for i in pred_ray]
print(prediction)

In [None]:
from sklearn.metrics import accuracy_score

accuracy_score(train_y, prediction)                          # arguments are (y_true, y_pred)

## Hyperparameter Tuning with Ray Tune

By using tuning libraries such as **Ray Tune**, we can try out combinations of hyperparameters. With sophisticated search strategies, these parameters can be selected so that they are likely to lead to good results, avoiding an expensive exhaustive search.

Also, trials that do not perform well can be stopped early to avoid wasting compute resources. Ray Tune also runs these trials in parallel, which greatly speeds up the search.
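
One simple way to decide that a trial "does not perform well" is to compare it with other trials at the same training step. The sketch below uses a median rule as an illustrative assumption; it is plain Python, not Ray Tune's scheduler API, and the accuracy histories are invented.

```python
from statistics import median


def should_stop(trial_scores, other_trials, step):
    """Stop a trial whose score at `step` is below the median of its peers."""
    peers = [scores[step] for scores in other_trials if len(scores) > step]
    if not peers:
        return False  # nothing to compare against yet
    return trial_scores[step] < median(peers)


# Invented per-step validation accuracies for three finished trials
finished = [
    [0.80, 0.88, 0.93],
    [0.78, 0.85, 0.90],
    [0.70, 0.75, 0.79],
]

weak_trial = [0.60, 0.62]
strong_trial = [0.82, 0.90]

print("stop weak trial:  ", should_stop(weak_trial, finished, step=1))
print("stop strong trial:", should_stop(strong_trial, finished, step=1))
```

Stopping the weak trial after two steps frees its resources for configurations that are more likely to pay off.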

**Steps:**
1. Put the non-distributed XGBoost training call into a function that accepts a parameter configuration (`train_breast_cancer_model()` in the example below)

2. Define the parameter search space (`config` dictionary)

3. Create a `tune.Tuner()` object:
    * pass the training function
    * pass the tuning configuration, `tune.TuneConfig()`
        * `num_samples`: number of hyperparameter configurations to sample from the search space
        * `metric`: the metric to optimize
        * `mode`: either `min` or `max`, depending on whether the metric is to be minimized or maximized
    * pass the parameter search space

4. Call `tuner.fit()`
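
The four steps can be mimicked without Ray to see what each piece contributes. The following is a sequential plain-Python sketch, not the `tune.Tuner` implementation: `objective` is a made-up stand-in for the training function, and `run_search` is a hypothetical helper whose arguments mirror `num_samples`, `metric`, and `mode`.

```python
import random


def objective(config):
    # Step 1: stand-in for model training. The invented score peaks
    # at max_depth=4 and subsample=0.8.
    penalty = 0.02 * abs(config["max_depth"] - 4) + 0.1 * abs(config["subsample"] - 0.8)
    return {"mean_accuracy": 0.95 - penalty}


# Step 2: search space, one sampling function per hyperparameter
search_space = {
    "max_depth": lambda rng: rng.randrange(1, 9),
    "subsample": lambda rng: rng.uniform(0.5, 1.0),
}


def run_search(trainable, space, num_samples, metric, mode, seed=0):
    # Steps 3 and 4: sample configurations, evaluate each, keep the best.
    rng = random.Random(seed)
    trials = []
    for _ in range(num_samples):
        config = {name: sample(rng) for name, sample in space.items()}
        trials.append((config, trainable(config)))
    pick = max if mode == "max" else min
    return pick(trials, key=lambda trial: trial[1][metric]), trials


(best_config, best_result), trials = run_search(
    objective, search_space, num_samples=10, metric="mean_accuracy", mode="max"
)
print(best_config, best_result)
```

Unlike this loop, Ray Tune runs the trials in parallel and can plug in smarter search strategies than independent random draws.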

In [None]:
# Function for XGBoost training
def train_breast_cancer_model(config):
    # Load dataset
    data, labels = load_breast_cancer(return_X_y=True)
    # Split into train and test set
    train_x, test_x, train_y, test_y = train_test_split(data, labels, test_size=0.25)

    # Build input matrices for XGBoost
    train_set = xgb.DMatrix(train_x, label=train_y)
    test_set = xgb.DMatrix(test_x, label=test_y)

    # Train the classifier
    results = {}
    xgb.train(
        params=config,
        dtrain=train_set,
        evals=[(test_set, "eval")],
        evals_result=results,
        verbose_eval=False,
    )
    # Return prediction accuracy
    accuracy = 1.0 - results["eval"]["error"][-1]
    raytrain.report({"mean_accuracy": accuracy, "done": True})        #  instead of returning the accuracy value, we report it back to Tune using ray.train.report() (imported above as raytrain)


# Define the parameter search space
config = {
    "objective": "binary:logistic",                            # tells XGBoost that we aim to train a logistic regression model for a binary classification task
    "eval_metric": ["logloss", "error"],
    "max_depth": tune.randint(1, 9),                           # hyperparameter    'tune.randint(lower, upper)' chooses a random integer from lower (inclusive) to upper (exclusive)
    "min_child_weight": tune.choice([1, 2, 3]),                # hyperparameter    'tune.choice([a, b, c])' chooses one of the items of the list at random
    "subsample": tune.uniform(0.5, 1.0),                       # hyperparameter    'tune.uniform(min, max)' samples a floating point number between min and max
    "eta": tune.loguniform(1e-4, 1e-1),                        # hyperparameter    'tune.loguniform(min, max, base=10)' samples a floating point number between min and max,
                                                               #                    uniformly in log space, so each order of magnitude is equally likely
    }

tuner = tune.Tuner(
    train_breast_cancer_model,
    tune_config = tune.TuneConfig(num_samples=10,              # sample 10 different hyperparameter configurations from the search space
                                  metric="mean_accuracy",      # the metric to optimize
                                  mode="max"                   # the mode should either be min or max, depending on whether the metric is to be minimized or maximized
                                  ),
    param_space = config                                       # parameter search space
)

results = tuner.fit()
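
To build intuition for the sampling functions used in `config`, the sketch below re-creates their behaviour with the standard library. It assumes that the upper bound of `randint` is exclusive and that `loguniform` draws uniformly in log space; the helper names are hypothetical and this is not Ray Tune's own code.

```python
import math
import random

rng = random.Random(42)


def sample_randint(lower, upper):
    # Integer from lower (inclusive) to upper (exclusive)
    return rng.randrange(lower, upper)


def sample_uniform(lower, upper):
    # Float drawn uniformly between lower and upper
    return rng.uniform(lower, upper)


def sample_loguniform(lower, upper):
    # Draw uniformly between log(lower) and log(upper), then map back
    return math.exp(rng.uniform(math.log(lower), math.log(upper)))


etas = [sample_loguniform(1e-4, 1e-1) for _ in range(10_000)]
share_small = sum(eta < 1e-3 for eta in etas) / len(etas)
print(f"share of eta samples below 1e-3: {share_small:.2f}")
```

Because each order of magnitude between `1e-4` and `1e-1` is equally likely, roughly a third of the sampled learning rates fall below `1e-3`. A plain uniform draw over the same range would put only about 1% of samples there, which is why a log-uniform distribution suits parameters such as `eta`.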

In [None]:
# Best hyperparameters
best_params = results.get_best_result().config
best_params

In [None]:
# All trial results
df = results.get_dataframe()
df.head(2)