In [4]:
from pprint import pprint
import ray
ray.init()
pprint(ray.cluster_resources())

2024-10-30 16:39:10,396	INFO worker.py:1807 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


{'CPU': 24.0,
 'GPU': 1.0,
 'accelerator_type:G': 1.0,
 'memory': 3689597339.0,
 'node:172.20.221.186': 1.0,
 'node:__internal_head__': 1.0,
 'object_store_memory': 1844798668.0}


In [6]:
from typing import Tuple

import ray
from ray.data import Dataset, Preprocessor
from ray.data.preprocessors import StandardScaler
from ray.train.xgboost import XGBoostTrainer
from ray.train import Result, ScalingConfig
import xgboost
use_gpu = False
num_workers = 5

2024-10-30 16:47:20,557	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [7]:
def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
    train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
    test_dataset = valid_dataset.drop_columns(["target"])
    return train_dataset, valid_dataset, test_dataset

In [8]:
def train_xgboost(num_workers: int, use_gpu: bool = False) -> Result:
    train_dataset, valid_dataset, _ = prepare_data()

    # Scale some random columns
    columns_to_scale = ["mean radius", "mean texture"]
    preprocessor = StandardScaler(columns=columns_to_scale)
    train_dataset = preprocessor.fit_transform(train_dataset)
    valid_dataset = preprocessor.transform(valid_dataset)

    # XGBoost specific params
    params = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    }

    trainer = XGBoostTrainer(
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        label_column="target",
        params=params,
        datasets={"train": train_dataset, "valid": valid_dataset},
        num_boost_round=100,
        metadata = {"preprocessor_pkl": preprocessor.serialize()}
    )
    result = trainer.fit()
    print(result.metrics)

    return result

In [9]:
import pandas as pd
from ray.train import Checkpoint


class Predict:

    def __init__(self, checkpoint: Checkpoint):
        self.model = XGBoostTrainer.get_model(checkpoint)
        self.preprocessor = Preprocessor.deserialize(checkpoint.get_metadata()["preprocessor_pkl"])

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        preprocessed_batch = self.preprocessor.transform_batch(batch)
        dmatrix = xgboost.DMatrix(preprocessed_batch)
        return {"predictions": self.model.predict(dmatrix)}


def predict_xgboost(result: Result):
    _, _, test_dataset = prepare_data()

    scores = test_dataset.map_batches(
        Predict, 
        fn_constructor_args=[result.checkpoint], 
        concurrency=1, 
        batch_format="pandas"
    )
    
    predicted_labels = scores.map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
    print(f"PREDICTED LABELS")
    predicted_labels.show()

In [10]:
result = train_xgboost(num_workers=10, use_gpu=False)

2024-10-30 16:48:19,637	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-30_16-39-09_402576_721550/logs/ray-data
2024-10-30 16:48:19,638	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
                                                                                                                  
✔️  Dataset execution finished in 11.46 seconds: 100%|██████████| 569/569 [00:11<00:00, 49.6 row/s]                          

- ReadCSV->SplitBlocks(48): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 27.4KB object store: : 569 row [00:11, 49.6 row/s]
2024-10-30 16:48:31,120	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-30_16-39-09_402576_721550/logs/ray-data
2024-10-30 16:48:31,121	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
         

== Status ==
Current time: 2024-10-30 16:48:38 (running for 00:00:00.11)
Using FIFO scheduling algorithm.
Logical resource usage: 11.0/24 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2024-10-30_16-39-09_402576_721550/artifacts/2024-10-30_16-48-37/XGBoostTrainer_2024-10-30_16-48-37/driver_artifacts
Number of trials: 1/1 (1 PENDING)




[36m(XGBoostTrainer pid=728375)[0m GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[36m(XGBoostTrainer pid=728375)[0m Started distributed worker processes: 
[36m(XGBoostTrainer pid=728375)[0m - (node_id=d3facf9112a40477b403d0144a025de36aee0cf1253cfb4842b96cb1, ip=172.20.221.186, pid=728460) world_rank=0, local_rank=0, node_rank=0
[36m(XGBoostTrainer pid=728375)[0m - (node_id=d3facf9112a40477b403d0144a025de36aee0cf1253cfb4842b96cb1, ip=172.20.221.186, pid=728461) world_rank=1, local_rank=1, node_rank=0
[36m(XGBoostTrainer pid=728375)[0m - (node_id=d3facf9112a40477b403d0144a025de36aee0cf1253cfb4842b96cb1, ip=172.20.221.186, pid=728459) world_rank=2, local_rank=2, node_rank=0
[36m(XGBoostTrainer pid=728375)[0m - (node_id=d3facf9112a40477b403d0144a025de36aee0cf1253cfb4842b96cb1, ip=172.20.221.186, pid=728462) world_rank=3, local_rank=3, node_rank=0
[36m(X

                                                 
[A                                                       

(pid=729068) Running 0: 0.00 row [00:00, ? row/s]                
[A

== Status ==
Current time: 2024-10-30 16:48:43 (running for 00:00:05.15)
Using FIFO scheduling algorithm.
Logical resource usage: 11.0/24 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2024-10-30_16-39-09_402576_721550/artifacts/2024-10-30_16-48-37/XGBoostTrainer_2024-10-30_16-48-37/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




2024-10-30 16:48:48,501	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/home/sat/ray_results/XGBoostTrainer_2024-10-30_16-48-37' in 0.0069s.
2024-10-30 16:48:48,505	INFO tune.py:1041 -- Total run time: 11.21 seconds (9.90 seconds for the tuning loop).


== Status ==
Current time: 2024-10-30 16:48:48 (running for 00:00:09.90)
Using FIFO scheduling algorithm.
Logical resource usage: 11.0/24 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2024-10-30_16-39-09_402576_721550/artifacts/2024-10-30_16-48-37/XGBoostTrainer_2024-10-30_16-48-37/driver_artifacts
Number of trials: 1/1 (1 TERMINATED)


OrderedDict([('train-logloss', np.float64(0.00595748214697009)), ('train-error', np.float64(0.0)), ('valid-logloss', np.float64(0.07639243121041493)), ('valid-error', np.float64(0.04117647058823528)), ('timestamp', 1730281727), ('checkpoint_dir_name', 'checkpoint_000000'), ('should_checkpoint', True), ('done', True), ('training_iteration', 101), ('trial_id', '22b6a_00000'), ('date', '2024-10-30_16-48-47'), ('time_this_iter_s', 0.005759477615356445), ('time_total_s', 7.536529064178467), ('pid', 728375), ('hostname', 'DESKTOP-VIHKCAB'), ('node_ip', '172.20.221.186'), ('config', {}), ('time_since_restore', 7.536529064178467), 

In [13]:
predict_xgboost(result)

2024-10-30 16:50:27,276	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-30_16-39-09_402576_721550/logs/ray-data
2024-10-30 16:50:27,277	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
                                                                                                                  
✔️  Dataset execution finished in 7.00 seconds: 100%|██████████| 569/569 [00:06<00:00, 81.3 row/s]                           

- ReadCSV->SplitBlocks(48): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 27.4KB object store: : 569 row [00:07, 81.3 row/s]
2024-10-30 16:50:34,286	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-30_16-39-09_402576_721550/logs/ray-data
2024-10-30 16:50:34,287	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
         

PREDICTED LABELS


Running 0: 0.00 row [00:00, ? row/s]
[A

                                                                                                               
[A                                                                         

[A[A                                             


✔️  Dataset execution finished in 1.35 seconds: 100%|██████████| 20.0/20.0 [00:01<00:00, 14.8 row/s]

[A

[A[A
[A                                                                                                                                                                                               

[A[A                                             


- MapBatches(drop_columns)->MapBatches(Predict): Tasks: 3; Actors: 1; Queued blocks: 0; Resources: 1.0 CPU, 222.0B object store; [locality off]: 100%|██████████| 138/138 [00:00<00:00, 931 row/s]

[A

[A[A
[A
[A
[A
[A

[A[A                                                                                                                       

{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(0)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(0)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(0)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(1)}
{'predictions': np.int64(0)}



