# Quickstart with Ray AI Runtime

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Generic/ray_logo.png" width="20%" loading="lazy">

## Preliminaries

### Install libraries

In [1]:
!pip install -U "ray[data,train]"
!pip install -U xgboost_ray



### Imports

In [2]:
import ray
from ray.air.config import ScalingConfig
from ray.data.preprocessors import MinMaxScaler
from ray.train.xgboost import XGBoostTrainer

### Initialize Ray runtime

In [3]:
ray.init()

2025-03-16 19:18:20,139	INFO worker.py:1841 -- Started a local Ray instance.


| Field | Value |
|:--|:--|
| Python version | 3.11.11 |
| Ray version | 2.43.0 |


## Load and prepare data with Ray Datasets

### Read Parquet file to Ray Dataset

In [4]:
dataset = ray.data.read_parquet(
    "s3://anyscale-training-data/intro-to-ray-air/nyc_taxi_2021.parquet"
)


The returned `dataset` is a [Ray Dataset](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html#ray-data-dataset), the standard way to load and exchange data in Ray AI Runtime.

In AIR, Datasets are used extensively for data loading and transformation. They are meant as a last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray.

### Split data into training and validation subsets

In [5]:
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

2025-03-16 19:18:32,637	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-03-16_19-18-14_958222_3663/logs/ray-data
2025-03-16 19:18:32,638	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet]
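
As a rough illustration of what a 70/30 split amounts to, the following plain-Python sketch holds out the last 30% of a small list. The helper `split_head_tail` and the toy `rows` are hypothetical. The sketch assumes an unshuffled split where the held-out rows come from the end; it is not Ray's implementation.

```python
def split_head_tail(rows, test_size):
    """Return (train, test), holding out the last `test_size` fraction of rows."""
    if not 0 < test_size < 1:
        raise ValueError("test_size must be a fraction between 0 and 1")
    # Number of rows that go to the held-out subset.
    n_test = round(len(rows) * test_size)
    split_at = len(rows) - n_test
    return rows[:split_at], rows[split_at:]


# Hypothetical toy data standing in for dataset rows.
rows = list(range(10))
train_rows, valid_rows = split_head_tail(rows, test_size=0.3)

print(len(train_rows), len(valid_rows))
```

With ten rows and `test_size=0.3`, seven rows land in the training subset and three in the validation subset.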



### Define a preprocessor to normalize the columns by their range

In [7]:
preprocessor = MinMaxScaler(columns=["trip_distance", "trip_duration"])

[Preprocessors](https://docs.ray.io/en/latest/ray-air/key-concepts.html#preprocessors) are primitives that transform input data into features. They operate on Datasets, making them scalable and compatible with a variety of datasources and dataframe libraries.

Ray AI Runtime comes with a collection of built-in preprocessors, and you can also define your own with simple templates (see [Using preprocessors](https://docs.ray.io/en/latest/ray-air/preprocessors.html) for more information).

### Split datasets into blocks for parallel preprocessing

In [11]:
train_dataset = train_dataset.repartition(num_blocks=3)
valid_dataset = valid_dataset.repartition(num_blocks=3)

# Fit and transform the preprocessor on the training dataset
train_dataset = preprocessor.fit_transform(train_dataset)
# Transform the validation dataset using the fitted preprocessor
valid_dataset = preprocessor.transform(valid_dataset)

2025-03-16 19:22:01,671	INFO dataset.py:2787 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2025-03-16 19:22:01,678	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-03-16_19-18-14_958222_3663/logs/ray-data
2025-03-16 19:22:01,679	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Repartition] -> AllToAllOperator[Repartition] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=1]

`num_blocks` should be lower than the number of CPU cores in the cluster.
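
To make the fit and transform steps of a min-max preprocessor concrete, here is a minimal plain-Python sketch. The helpers `fit_min_max` and `transform_min_max` and the sample values are hypothetical, and mapping a constant column to zeros is a choice made for this sketch. It shows the arithmetic only and is not how `MinMaxScaler` is implemented.

```python
def fit_min_max(values):
    """Learn the column range from the training data only."""
    return min(values), max(values)


def transform_min_max(values, lo, hi):
    """Rescale values with a previously fitted range: (v - lo) / (hi - lo)."""
    span = hi - lo
    if span == 0:
        # Assumption for this sketch: a constant column maps to zeros.
        return [0.0 for _ in values]
    return [(v - lo) / span for v in values]


# Hypothetical values for a single numeric column.
train_column = [2.0, 4.0, 10.0]
valid_column = [6.0, 12.0]

lo, hi = fit_min_max(train_column)                          # fit on training data
train_scaled = transform_min_max(train_column, lo, hi)      # [0.0, 0.25, 1.0]
valid_scaled = transform_min_max(valid_column, lo, hi)      # [0.5, 1.25]

print(train_scaled, valid_scaled)
```

Because the range comes from the training data alone, validation values outside that range fall outside `[0, 1]`, as the `1.25` shows.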

## Train the model with Ray Train

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Scaling_model_training/data_parallelism.png" width="50%" loading="lazy">|
|:--|
|Ray Train provides distributed data parallel training capabilities. A large dataset is sharded across multiple worker nodes each containing a model copy. Gradients calculated on independent nodes are continuously synchronized with others to produce a final trained model.|
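
The idea in the caption can be sketched in plain Python: each simulated worker holds one shard and a copy of the model, computes a gradient locally, and the gradients are averaged before every update. The one-parameter linear model, the toy samples, and the helper names are hypothetical. This is a generic illustration of data parallelism, not how `XGBoostTrainer` works internally.

```python
def mse_gradient(w, samples):
    """Gradient of the mean squared error of y = w * x with respect to w."""
    return sum(2 * (w * x - y) * x for x, y in samples) / len(samples)


def shard_evenly(samples, num_workers):
    """Deal samples round-robin so every simulated worker gets one shard."""
    return [samples[i::num_workers] for i in range(num_workers)]


# Hypothetical toy data generated by y = 2 * x.
samples = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
shards = shard_evenly(samples, num_workers=2)

w = 0.0               # every worker starts from the same model copy
learning_rate = 0.05
for _ in range(100):
    # Each worker computes a gradient on its own shard ...
    local_gradients = [mse_gradient(w, shard) for shard in shards]
    # ... and the gradients are synchronized by averaging.
    synced_gradient = sum(local_gradients) / len(local_gradients)
    w -= learning_rate * synced_gradient

print(f"learned w = {w:.4f}")
```

With equally sized shards, the averaged gradient equals the gradient computed over the full dataset, so the sharded loop follows the same trajectory as single-node training.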

### Create XGBoost trainer

In [8]:
trainer = XGBoostTrainer(
    label_column="is_big_tip",
    num_boost_round=100,
    scaling_config=ScalingConfig(
        use_gpu=False,  # True for the GPU training, 1 GPU per worker
    ),
    params={
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "tree_method": "approx",  # use "gpu_hist" for GPU training
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    #preprocessor=preprocessor,
)

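The number of training workers is controlled by the `num_workers` argument of `ScalingConfig`, not by the `num_blocks` value used when repartitioning the dataset. `num_workers` is not set here, so the trainer falls back to its default.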

Ray AI Runtime comes with built-in integrations with many popular ML projects, such as PyTorch, Keras, and LightGBM. Refer to the [Ray Train docs](https://docs.ray.io/en/latest/train/train.html#quick-start-to-distributed-training-with-ray-train) for more details. Optionally, read more about the Ray-XGBoost integration in the [Introducing Distributed XGBoost Training with Ray](https://www.anyscale.com/blog/distributed-xgboost-training-with-ray) blog post.

### Invoke training - this is a computationally intensive operation

In [9]:
result = trainer.fit()

2025-03-16 19:20:07,810	INFO tensorboardx.py:193 -- pip install "ray[tune]" to see TensorBoard files.



View detailed results here: /root/ray_results/XGBoostTrainer_2025-03-16_19-20-07

Training started without custom configuration.


(XGBoostTrainer pid=5970) Started distributed worker processes: 
(XGBoostTrainer pid=5970) - (node_id=7b53d495479f92d3f5f9a6990397e19533a4aeb55c5bb3fcb36baba6, ip=172.28.0.12, pid=6029) world_rank=0, local_rank=0, node_rank=0
(RayTrainWorker pid=6029) [19:20:18] Task [xgboost.ray-rank=00000000]:4fb05cb982867324148f564101000000 got rank 0
(SplitCoordinator pid=6071) Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-03-16_19-18-14_958222_3663/logs/ray-data
(SplitCoordinator pid=6071) Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Repartition] -> OutputSplitter[split(1, equal=True)]

2025-03-16 19:20:23,557	ERROR tune_controller.py:1331 -- Trial task failed for trial XGBoostTrainer_ac0b4_00000
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/ray/_private/worker.py", line 2771, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/ray/_private/worker.py",


Training errored after 0 iterations at 2025-03-16 19:20:23. Total running time: 15s
Error file: /tmp/ray/session_2025-03-16_19-18-14_958222_3663/artifacts/2025-03-16_19-20-07/XGBoostTrainer_2025-03-16_19-20-07/driver_artifacts/XGBoostTrainer_ac0b4_00000_0_2025-03-16_19-20-07/error.txt



TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: `trainer = XGBoostTrainer.restore("/root/ray_results/XGBoostTrainer_2025-03-16_19-20-07")`.
To start a new run that will retry on training failures, set `train.RunConfig(failure_config=train.FailureConfig(max_failures))` in the Trainer's `run_config` with `max_failures > 0`, or `max_failures = -1` for unlimited retries.

The `result` object returned by `trainer.fit()` grants access to metrics, checkpoints, and errors.

### Report results

In [10]:
print(f"train acc = {1 - result.metrics['train-error']:.4f}")
print(f"valid acc = {1 - result.metrics['valid-error']:.4f}")
print(f"iteration = {result.metrics['training_iteration']}")

NameError: name 'result' is not defined
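
The report cell turns the `error` metric into accuracy as `1 - error`. For reference, the two metrics named in `eval_metric` can be written out in plain Python. The labels and predicted probabilities here are hypothetical, and the functions are a sketch of the standard definitions (misclassification rate at a 0.5 threshold, and mean negative log-likelihood), not XGBoost's own code.

```python
import math


def binary_error(labels, probs, threshold=0.5):
    """Fraction of examples whose thresholded prediction differs from the label."""
    wrong = sum(1 for y, p in zip(labels, probs) if (p > threshold) != bool(y))
    return wrong / len(labels)


def log_loss(labels, probs, eps=1e-15):
    """Mean negative log-likelihood of the labels under the predicted probabilities."""
    total = 0.0
    for y, p in zip(labels, probs):
        p = min(max(p, eps), 1 - eps)  # clip to avoid log(0)
        total -= y * math.log(p) + (1 - y) * math.log(1 - p)
    return total / len(labels)


# Hypothetical labels and predicted probabilities of the positive class.
labels = [1, 0, 1, 0]
probs = [0.9, 0.2, 0.4, 0.6]

error = binary_error(labels, probs)   # two of four predictions are wrong
accuracy = 1 - error

print(f"error = {error:.2f}, accuracy = {accuracy:.2f}, logloss = {log_loss(labels, probs):.4f}")
```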

## Shutdown Ray runtime

In [None]:
ray.shutdown()

Disconnect the worker and terminate processes started by `ray.init()`.

## Connect with the Ray community

You can learn more about Ray and get involved with its community of developers and researchers:

* [**Ray documentation**](https://docs.ray.io/en/latest)

* [**Official Ray site**](https://www.ray.io/)  
Browse the ecosystem and use this site as a hub to get the information that you need to get going and building with Ray.

* [**Join the community on Slack**](https://forms.gle/9TSdDYUgxYs8SA9e8)  
Find friends to discuss your new learnings in our Slack space.

* [**Use the discussion board**](https://discuss.ray.io/)  
Ask questions, follow topics, and view announcements on this community forum.

* [**Join a meetup group**](https://www.meetup.com/Bay-Area-Ray-Meetup/)  
Tune in to meetups to listen to compelling talks, get to know other users, and meet the team behind Ray.

* [**Open an issue**](https://github.com/ray-project/ray/issues/new/choose)  
Ray is constantly evolving to improve the developer experience. Submit feature requests and bug reports, and get help via GitHub issues.

* [**Become a Ray contributor**](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html)  
We welcome community contributions to improve our documentation and the Ray framework.
