## Import required libraries

In [None]:
import argparse
from typing import Dict

import numpy as np
from sklearn.model_selection import train_test_split

import xgboost_ray as xgb
from xgboost.core import QuantileDMatrix

from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication

## Set up a distributed cluster

### Log in to the OpenShift cluster

In [None]:
import os
from dotenv import load_dotenv

# Pull settings from a local .env file into the process environment.
load_dotenv(override=True)

# Read the token with .get() so that a missing CLUSTER_ACCESS_TOKEN is treated
# the same as an empty one (login is skipped) instead of raising KeyError.
access_token = os.environ.get("CLUSTER_ACCESS_TOKEN", "")
if access_token:
    auth = TokenAuthentication(
        token=access_token,
        server="https://api.mindaro.int.nefast.me:6443",
        # NOTE(review): TLS verification is skipped for this login — confirm
        # this is acceptable for the target environment.
        skip_tls=True,
    )
    auth.login()

### Create a Ray cluster to distribute the workload

In [None]:
# The same name is reused further down when generating the TLS certificate.
cluster_name = "xgboost-distributed-cluster"

# Describe the Ray cluster: one head node plus two workers, all running the
# same container image and sharing the same request/limit values.
cluster_config = ClusterConfiguration(
    name=cluster_name,
    namespace="ray",
    image="quay.io/modh/ray:2.44.1-py311-cu121",
    # Head node resources.
    head_cpu_requests="250m",
    head_cpu_limits=1,
    head_memory_requests="250Mi",
    head_memory_limits=6,
    # Worker resources.
    num_workers=2,
    worker_cpu_requests="250m",
    worker_cpu_limits=1,
    worker_memory_requests="250Mi",
    worker_memory_limits=6,
    # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources
    write_to_file=False,
)
cluster = Cluster(cluster_config)

### Bring up the Ray cluster and print its status

In [None]:
# Start the cluster, wait for it to become ready, then show its details.
cluster.up()
cluster.wait_ready()
cluster.details()

# Fetch both endpoints (dashboard first, then the Ray client URI) and print
# them one per line.
ray_dashboard_uri, ray_cluster_uri = (
    cluster.cluster_dashboard_uri(),
    cluster.cluster_uri(),
)
print(ray_dashboard_uri, ray_cluster_uri, sep="\n")

### Bind Ray to the new cluster

In [None]:
from codeflare_sdk import generate_cert

# Create required TLS cert and export the environment variables to enable TLS
generate_cert.generate_tls_cert(cluster_name, cluster.config.namespace)
generate_cert.export_env(cluster_name, cluster.config.namespace)

import ray

# NOTE(review): this hard-coded client route replaces the value previously
# obtained from cluster.cluster_uri() — confirm it matches the deployed cluster.
ray_cluster_uri = "ray://rayclient-xgboost-distributed-cluster-ray.apps.mindaro.int.nefast.me"

print("Connecting to remote Ray cluster at: ", ray_cluster_uri)
# Reset the ray context in case there's already one.
ray.shutdown()

# Load dependencies to then install on the Ray cluster.
# The file handle is deliberately NOT named `f`: this notebook defines a
# function called `f`, and binding the handle to that name would replace the
# function with a closed file object whenever this cell is re-run after it.
with open("compute.requirements.txt", "r", encoding="utf-8") as requirements_file:
    requirements = requirements_file.read().splitlines()

# Ask Ray to pip-install the listed requirements for this session.
runtime_env = {"pip": requirements}
ray.init(address=ray_cluster_uri, runtime_env=runtime_env)

print("Ray cluster is up and running: ", ray.is_initialized())

## Define the function to be approximated through quantile regression

In [None]:
def f(x: np.ndarray) -> np.ndarray:
    """Evaluate the target curve ``sin(x) * x`` element-wise.

    Args:
        x: Input values.

    Returns:
        ``sin(x) * x`` computed for every element of ``x``.
    """
    sine = np.sin(x)
    return sine * x

In [None]:
import xgboost_ray as xgb

"""Train a quantile regression model."""
# Seeded generator shared by the data generation and the train/test split.
rng = np.random.RandomState(1994)

# Generate a synthetic dataset for the demo; the generation process is taken
# from the sklearn example.
# 100000 uniform draws between 0 and 20, arranged as a single feature column.
X = rng.uniform(0, 20.0, size=100000).reshape(-1, 1)
expected_y = f(X).ravel()

# Log-normal noise whose sigma grows with x, shifted by exp(sigma^2 / 2).
sigma = 0.5 + X.ravel() / 20.0
noise = rng.lognormal(sigma=sigma) - np.exp(sigma**2.0 / 2.0)
y = expected_y + noise

# Quantiles to train on: 0.05, 0.25, 0.5, 0.75 and 0.95. The model is similar
# to multi-class and multi-target models.
alpha = np.array([0.05, 0.25, 0.5, 0.75, 0.95])
evals_result: Dict[str, Dict] = {}

# Hold out 10% of the samples for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    random_state=rng,
)

# The models trained on these matrices use the `hist` tree method.
# Do not use the `exact` tree method for quantile regression, otherwise the
# performance might drop.
Xy = xgb.RayDMatrix(X_train, y_train)
# The test matrix is given Xy as its reference.
Xy_test = xgb.RayDMatrix(X_test, y_test, ref=Xy)

# Train a single model that predicts every quantile listed in `alpha`.
booster = xgb.train(
    {
        # Use the quantile objective function.
        "objective": "reg:quantileerror",
        "tree_method": "hist",
        "quantile_alpha": alpha,
        # Let's try not to overfit.
        "learning_rate": 0.04,
        "max_depth": 15,
    },
    Xy,
    num_boost_round=32,
    ray_params=xgb.RayParams(num_actors=2, cpus_per_actor=1),
    early_stopping_rounds=2,
    # The evaluation result is a weighted average across multiple quantiles.
    evals=[(Xy, "Train"), (Xy_test, "Test")],
    evals_result=evals_result,
)

# Predict on a dense, evenly spaced grid over the same x range as the data.
xx = np.atleast_2d(np.linspace(0, 20, 100000)).T
scores = booster.inplace_predict(xx)
# dim 1 is the quantiles
assert scores.shape[0] == xx.shape[0]
assert scores.shape[1] == alpha.shape[0]

# Select columns by quantile value rather than by hard-coded position: the
# columns follow the order of `alpha`, so fixed indices silently pick the wrong
# quantile as soon as `alpha` has more than three entries. A quantile missing
# from `alpha` fails loudly with a KeyError.
quantile_column = {float(q): idx for idx, q in enumerate(alpha)}
y_lower = scores[:, quantile_column[0.05]]  # alpha=0.05
y_med = scores[:, quantile_column[0.5]]  # alpha=0.5, median
y_upper = scores[:, quantile_column[0.95]]  # alpha=0.95

# Baseline for comparison: a squared-error model trained on the same data with
# the same tree settings as the quantile model.
mse_params = {
    "objective": "reg:squarederror",
    "tree_method": "hist",
    # Let's try not to overfit.
    "learning_rate": 0.04,
    "max_depth": 15,
}
# NOTE(review): `evals_result` is the same dict that was handed to the quantile
# training run, and `booster` is rebound here — confirm neither earlier value
# is needed afterwards.
booster = xgb.train(
    mse_params,
    Xy,
    num_boost_round=32,
    evals=[(Xy, "Train"), (Xy_test, "Test")],
    evals_result=evals_result,
    early_stopping_rounds=2,
    ray_params=xgb.RayParams(num_actors=2, cpus_per_actor=1),
)

# Predict on an evenly spaced grid of 100000 points between 0 and 20.
xx = np.atleast_2d(np.linspace(0, 20, 100000)).T
y_pred = booster.inplace_predict(xx)

## Plot the results

In [None]:
from matplotlib import pyplot as plt

fig = plt.figure(figsize=(20, 30))

# Line series, drawn in this exact order (the order also fixes the legend
# order): true function, test observations, predicted median, predicted mean,
# then the unlabelled upper and lower quantile bounds.
_line_series = [
    (xx, f(xx), "g:", {"linewidth": 3, "label": r"$f(x) = x\,\sin(x)$"}),
    (X_test, y_test, "b.", {"markersize": 10, "label": "Test observations"}),
    (xx, y_med, "r-", {"label": "Predicted median"}),
    (xx, y_pred, "m-", {"label": "Predicted mean"}),
    (xx, y_upper, "k-", {}),
    (xx, y_lower, "k-", {}),
]
for _xs, _ys, _fmt, _style in _line_series:
    plt.plot(_xs, _ys, _fmt, **_style)

# Shade the band between the lower and upper quantile predictions.
plt.fill_between(
    xx.ravel(),
    y_lower,
    y_upper,
    alpha=0.4,
    label="Predicted 90% interval",
)

# Axis labels, fixed y-range and legend placement.
plt.xlabel("$x$")
plt.ylabel("$f(x)$")
plt.ylim(-40, 40)
plt.legend(loc="upper left")
plt.show()