# %% [markdown]

 # Introduction to the Ray AI Libraries: An example of using Ray Data, Ray Train, Ray Tune, and Ray Serve to implement an XGBoost regression model

 © 2025, Anyscale. All Rights Reserved

# %% [markdown]

 💻 **Launch Locally**: You can run this notebook locally, but performance will be reduced.

 🚀 **Launch on Cloud**: We recommend running this notebook on a Ray cluster with 4 GPUs (click [here](http://console.anyscale.com/register) to start a Ray cluster on Anyscale).

# %% [markdown]

 Let's start with a quick end-to-end example to get a sense of what the Ray AI Libraries can do.
 <div class="alert alert-block alert-info">
 <b> Here is the roadmap for this notebook:</b>
 <ul>
     <li>Overview of the Ray AI Libraries</li>
     <li>Quick end-to-end example</li>
     <ul>
       <li>Vanilla XGBoost code</li>
       <li>Hyperparameter tuning with Ray Tune</li>
       <li>Distributed training with Ray Train</li>
       <li>Serving an ensemble model with Ray Serve</li>
       <li>Batch inference with Ray Data</li>
     </ul>
 </ul>
 </div>

# %% [markdown]

 **Imports**

# %%

# (Optional) On macOS, if importing xgboost raises an XGBoostError, you may need to run `brew install libomp` and then import xgboost again
# !brew install libomp

# %% [markdown]

 **Note:** Make sure to import the correct `RunConfig`:
 * For distributed training with Ray Train, use `from ray.train import RunConfig`.
 * For hyperparameter tuning with Ray Tune, use `from ray.tune import RunConfig`.

# %%

import asyncio
import fastapi
import pandas as pd
import requests
# On macOS, if importing xgboost raises an XGBoostError, run `brew install libomp` first and then import xgboost again
import xgboost
from pydantic import BaseModel
from sklearn.model_selection import train_test_split

import ray
import ray.tune
import ray.train
from ray.train.xgboost import XGBoostTrainer as RayTrainXGBoostTrainer
from ray.train import RunConfig

import ray.data
import ray.serve

# %% [markdown]

 ## 1. Overview of the Ray AI Libraries

 <img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_AI_Libraries/Ray+AI+Libraries.png" width="700px" loading="lazy">

 Built on top of Ray Core, the Ray AI Libraries inherit Core's performance and scalability while providing a convenient abstraction layer for machine learning. These Python-native libraries let ML practitioners distribute individual workloads, build end-to-end applications, and develop custom use cases within a single, unified framework.

 The Ray AI Libraries bring together an ever-growing ecosystem of integrations with popular machine learning frameworks to create a common interface for development.

 |<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Introduction_to_Ray_AIR/e2e_air.png" width="100%" loading="lazy">|
 |:-:|
 |Ray AI Libraries enable end-to-end ML development and provide multiple options for integrating with other tools and libraries from the MLOps ecosystem.|



# %% [markdown]

 ## 2. Quick end-to-end example

 For this regression task, you will apply a simple [XGBoost](https://xgboost.readthedocs.io/en/stable/) (a gradient-boosted trees framework) model to the March 2021 [New York City Taxi & Limousine Commission's Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

 The full dataset contains millions of samples of yellow cab rides, and the goal is to predict the tip amount.

 **Dataset features**
 * **`passenger_count`**
     * Float (whole number) representing number of passengers.
 * **`trip_distance`**
     * Float representing trip distance in miles.
 * **`fare_amount`**
     * Float representing the time-and-distance fare calculated by the meter (excluding taxes, tips, tolls, and surcharges).
 * **`tolls_amount`**
     * Float representing the total paid on tolls if any.

 **Target**
 * **`tip_amount`**
     * Float representing the amount paid as a tip.

# %% [markdown]

 ### 2.1 Vanilla XGBoost code

 Let's start with vanilla XGBoost code to predict the tip amount on the NYC taxi cab data.

# %%

features = [
    "passenger_count", 
    "trip_distance",
    "fare_amount",
    "tolls_amount",
]

label_column = "tip_amount"

# %% [markdown]

 Define a function to load the data and split it into training and test sets.
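 `train_test_split(..., test_size=0.2, random_state=42)` shuffles the rows with a fixed seed and holds out 20% of them for evaluation. As a rough, standard-library-only sketch of that idea (it does not reproduce scikit-learn's exact split, and `split_indices` is a hypothetical helper, not part of any library):

```python
import math
import random


def split_indices(n_rows: int, test_size: float = 0.2, seed: int = 42) -> tuple[list[int], list[int]]:
    """Shuffle row indices with a fixed seed and hold out a `test_size` fraction for testing."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)  # fixed seed -> identical split on every run
    n_test = math.ceil(n_rows * test_size)  # round the test set size up
    return indices[n_test:], indices[:n_test]  # (train indices, test indices)


train_idx, test_idx = split_indices(10)
print(len(train_idx), len(test_idx))  # 8 2
```

 Fixing the seed matters here because the same split is recomputed inside every trial and worker; without it, each run would evaluate on a different test set.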

# %%

def load_data():
    path = "s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_2021-03.parquet"
    # The bucket is public, so read it anonymously (reading s3:// paths with pandas requires s3fs)
    df = pd.read_parquet(
        path,
        columns=features + [label_column],
        storage_options={"anon": True},
    )
    # Hold out 20% of the rows for evaluation; fix the seed for a reproducible split
    X_train, X_test, y_train, y_test = train_test_split(
        df[features], df[label_column], test_size=0.2, random_state=42
    )
    dtrain = xgboost.DMatrix(X_train, label=y_train)
    dtest = xgboost.DMatrix(X_test, label=y_test)
    return dtrain, dtest

# %% [markdown]

 Define a function that runs `xgboost.train` with a given hyperparameter dictionary `params`.
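 The training function reports `eval-rmse`, the root-mean-squared error on the held-out set. XGBoost records one value per boosting round in `evals_result["eval"]["rmse"]`, and the function returns the last one. For reference, here is a minimal pure-Python sketch of the metric itself (the helper `rmse` and the numbers are illustrative, not XGBoost's implementation):

```python
import math


def rmse(y_true: list[float], y_pred: list[float]) -> float:
    """Root-mean-squared error: sqrt(mean((y_true - y_pred) ** 2))."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of the same length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical tip amounts (in dollars) and model predictions, for illustration only
print(rmse([2.0, 0.0, 3.5], [1.5, 0.5, 3.0]))  # 0.5
```

 Because the errors are squared before averaging, RMSE is in the same unit as the target (dollars of tip) and penalizes large misses more than small ones.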

# %%

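# Modify this path to a folder on your machine when running locally.
# On a multi-node cluster, use storage that every node can access: the saved model is
# read back later by Ray Serve replicas and Ray Data actors, possibly on other nodes.
storage_folder = "/Users/phil/Documents/GITHUB/any_scale_academy/INTRO_RAY/data/01_Intro_Ray_AI_Libs_Overview/"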

# %%

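from pathlib import Path

# Make sure the output folder exists before saving the model into it
Path(storage_folder).mkdir(parents=True, exist_ok=True)
model_path = Path(storage_folder) / "model.ubj"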

def my_xgboost_func(params):    
    evals_result = {}
    dtrain, dtest = load_data()
    bst = xgboost.train(
        params, 
        dtrain, 
        num_boost_round=10, 
        evals=[(dtest, "eval")], 
        evals_result=evals_result,
    )
    # Save the booster (UBJSON format, from the .ubj extension) for serving and batch inference
    bst.save_model(model_path)
    print(f"{evals_result['eval']}")
    return {"eval-rmse": evals_result["eval"]["rmse"][-1]}

params = {
    "objective": "reg:squarederror",
    "eval_metric": "rmse",
    "tree_method": "hist",
    "max_depth": 6,
    "eta": 0.1,
}
my_xgboost_func(params)

[0]	eval-rmse:2.18114
[1]	eval-rmse:2.13805
[2]	eval-rmse:2.10221
[3]	eval-rmse:2.07294
[4]	eval-rmse:2.04855
[5]	eval-rmse:2.02852
[6]	eval-rmse:2.01225
[7]	eval-rmse:1.99868
[8]	eval-rmse:1.98771
[9]	eval-rmse:1.97872
OrderedDict([('rmse', [2.18113709207776, 2.138052274494217, 2.1022143627953036, 2.072936825276888, 2.0485457212693987, 2.028522863406997, 2.0122461934067273, 1.99868078532301, 1.9877117047436585, 1.9787180742813582])])


{'eval-rmse': 1.9787180742813582}

# %% [markdown]

 ### 2.2 Hyperparameter tuning with Ray Tune

 Let's use Ray Tune to run distributed hyperparameter tuning for the XGBoost model.

# %%

tuner = ray.tune.Tuner(  # Create a tuner
    my_xgboost_func,  # Pass it the training function, which Ray Tune calls a Trainable
    param_space={  # The parameter space to search over
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "tree_method": "hist",
        "max_depth": 6,
        "eta": ray.tune.uniform(0.01, 0.3),  # Sample the learning rate uniformly
    },
    tune_config=ray.tune.TuneConfig(  # Tell it which metric to optimize
        metric="eval-rmse",
        mode="min",
        num_samples=10,  # Run 10 trials
    ),
    run_config=ray.tune.RunConfig(  # Use ray.tune.RunConfig (not ray.train.RunConfig) with a Tuner
        storage_path=storage_folder,
    ),
)


results = tuner.fit()  # Run the tuning job
print(results.get_best_result().config)  # Get back the best hyperparameters

Current time: 2025-12-10 14:45:47
Running for: 00:00:14.10
Memory: 22.6/32.0 GiB

| Trial name | status | loc | eta | iter | total time (s) | eval-rmse |
|---|---|---|---|---|---|---|
| my_xgboost_func_ef28a_00000 | TERMINATED | 127.0.0.1:72583 | 0.18427 | 1 | 8.54033 | 1.94223 |
| my_xgboost_func_ef28a_00001 | TERMINATED | 127.0.0.1:72580 | 0.0251196 | 1 | 6.58364 | 2.12233 |
| my_xgboost_func_ef28a_00002 | TERMINATED | 127.0.0.1:72585 | 0.0842778 | 1 | 8.42419 | 1.99422 |
| my_xgboost_func_ef28a_00003 | TERMINATED | 127.0.0.1:72584 | 0.125585 | 1 | 3.99442 | 1.96126 |
| my_xgboost_func_ef28a_00004 | TERMINATED | 127.0.0.1:72586 | 0.0439054 | 1 | 8.00385 | 2.066 |
| my_xgboost_func_ef28a_00005 | TERMINATED | 127.0.0.1:72581 | 0.167708 | 1 | 6.78896 | 1.94573 |
| my_xgboost_func_ef28a_00006 | TERMINATED | 127.0.0.1:72587 | 0.21994 | 1 | 6.11836 | 1.93804 |
| my_xgboost_func_ef28a_00007 | TERMINATED | 127.0.0.1:72582 | 0.222276 | 1 | 3.93908 | 1.93806 |
| my_xgboost_func_ef28a_00008 | TERMINATED | 127.0.0.1:72579 | 0.270253 | 1 | 8.27694 | 1.93537 |
| my_xgboost_func_ef28a_00009 | TERMINATED | 127.0.0.1:72588 | 0.0175397 | 1 | 6.75479 | 2.15092 |


(my_xgboost_func pid=72582) [0]	eval-rmse:2.12367
(my_xgboost_func pid=72584) [0]	eval-rmse:2.16855
(my_xgboost_func pid=72582) [1]	eval-rmse:2.05393
(my_xgboost_func pid=72582) [2]	eval-rmse:2.01048
(my_xgboost_func pid=72584) [1]	eval-rmse:2.11772
(my_xgboost_func pid=72584) [2]	eval-rmse:2.07765
(my_xgboost_func pid=72582) [3]	eval-rmse:1.98287
(my_xgboost_func pid=72584) [3]	eval-rmse:2.04667
(my_xgboost_func pid=72582) [4]	eval-rmse:1.96573
(my_xgboost_func pid=72582) [5]	eval-rmse:1.95506
(my_xgboost_func pid=72584) [4]	eval-rmse:2.02234
(my_xgboost_func pid=72584) [5]	eval-rmse:2.00350
(my_xgboost_func pid=72582) [6]	eval-rmse:1.94800
(my_xgboost_func pid=72582) [7]	eval-rmse:1.94348
(my_xgboost_func pid=72584) [6]	eval-rmse:1.98867
(my_xgboost_func pid=72582) [8]	eval-rmse:1.94036
(my_xgboost_func pid=72584) [7]	eval-rmse:1.977

2025-12-10 14:45:47,488	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/Users/phil/Documents/GITHUB/any_scale_academy/INTRO_RAY/data/01_Intro_Ray_AI_Libs_Overview/my_xgboost_func_2025-12-10_14-45-27' in 0.0075s.


(my_xgboost_func pid=72583) [7]	eval-rmse:1.95042
(my_xgboost_func pid=72585) [9]	eval-rmse:1.99422
(my_xgboost_func pid=72579) [9]	eval-rmse:1.93537
(my_xgboost_func pid=72583) [8]	eval-rmse:1.94556
(my_xgboost_func pid=72583) [9]	eval-rmse:1.94223


2025-12-10 14:45:47,491	INFO tune.py:1041 -- Total run time: 15.16 seconds (14.09 seconds for the tuning loop).


{'objective': 'reg:squarederror', 'eval_metric': 'rmse', 'tree_method': 'hist', 'max_depth': 6, 'eta': 0.2702531923080533}


# %% [markdown]

 Here is a diagram that shows what Tune does:

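 Tune schedules many trials, each running the training function with a different hyperparameter configuration sampled from the search space, and then returns the best-performing one according to the chosen metric.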

 <img src="https://bair.berkeley.edu/static/blog/tune/tune-arch-simple.png" width="700px" loading="lazy">
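 Conceptually, sampling with `ray.tune.uniform` under Tune's default search settings amounts to random search. The sequential sketch below samples `eta` uniformly, evaluates each configuration, and keeps the one with the lowest score. `random_search` and `toy_objective` are hypothetical stand-ins. Ray Tune additionally runs the trials in parallel across the cluster and handles scheduling, fault tolerance, and result storage.

```python
import random


def random_search(objective, low: float, high: float, num_samples: int = 10, seed: int = 0):
    """Sample `eta` uniformly, evaluate each trial, and return the best (lowest-scoring) one."""
    rng = random.Random(seed)
    trials = []
    for _ in range(num_samples):
        config = {"eta": rng.uniform(low, high)}  # analogous to ray.tune.uniform(low, high)
        trials.append((objective(config), config))  # each evaluation is one "trial"
    best_score, best_config = min(trials, key=lambda trial: trial[0])  # mode="min"
    return best_config, best_score


# Hypothetical stand-in for my_xgboost_func: a made-up curve with its minimum at eta=0.2
def toy_objective(config: dict) -> float:
    return (config["eta"] - 0.2) ** 2 + 1.9


best_config, best_score = random_search(toy_objective, 0.01, 0.3)
print(best_config, best_score)
```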

# %% [markdown]

 ### 2.3 Distributed training with Ray Train

 If your training data is large, training on a single machine can take a long time.

 To speed it up, shard the dataset across training workers and perform distributed XGBoost training.

 Let's redefine `load_data` so that each worker loads a different slice of the data based on its rank.
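 The idea is a deterministic mapping from worker rank to a monthly Parquet file, so that no two workers read the same shard. Below is a standalone sketch of such a mapping (`shard_path` is a hypothetical helper, and it assumes the bucket holds one file per month). Note that a formula such as `month = (worker_rank + 1) % 12` yields month `00` for rank 11, whereas `worker_rank % 12 + 1` always stays in the range 1 to 12:

```python
def shard_path(worker_rank: int) -> str:
    """Map a worker rank to one monthly file: rank 0 -> 2021-01, rank 11 -> 2021-12, rank 12 -> 2022-01."""
    month = worker_rank % 12 + 1  # always 1..12
    year = 2021 + worker_rank // 12  # roll over to the next year every 12 ranks
    return f"s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_{year}-{month:02}.parquet"


for rank in (0, 1, 11, 12):
    print(rank, shard_path(rank))
```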

# %%

def load_data():
    # Find out which training worker is running this code
    train_ctx = ray.train.get_context()
    worker_rank = train_ctx.get_world_rank()
    print(f"Loading data for worker {worker_rank}...")

    # Build the path from the worker rank so each worker reads a different monthly file:
    # rank 0 -> 2021-01, rank 1 -> 2021-02, ..., rank 11 -> 2021-12, rank 12 -> 2022-01
    month = worker_rank % 12 + 1
    year = 2021 + worker_rank // 12
    path = f"s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_{year}-{month:02}.parquet"

    # Same as before: read the public bucket anonymously, then split into train and test
    df = pd.read_parquet(
        path,
        columns=features + [label_column],
        storage_options={"anon": True},
    )
    X_train, X_test, y_train, y_test = train_test_split(
        df[features], df[label_column], test_size=0.2, random_state=42
    )
    dtrain = xgboost.DMatrix(X_train, label=y_train)
    dtest = xgboost.DMatrix(X_test, label=y_test)
    return dtrain, dtest

# %% [markdown]

 Now we can run distributed XGBoost training using Ray Train's `XGBoostTrainer`. Similar trainers exist for other popular ML frameworks, such as `TorchTrainer` for PyTorch and `LightGBMTrainer` for LightGBM.

# %%

trainer = RayTrainXGBoostTrainer(  # Create a trainer
    my_xgboost_func,  # Pass it the training function
    scaling_config=ray.train.ScalingConfig(
        num_workers=2, use_gpu=False
    ),  # Define how many training workers
    train_loop_config=params,  # Pass it the hyperparameters
)

trainer.fit()  # Run the training job

(TrainController pid=72620) Attempting to start training worker group of size 2 with the following resources: [{'CPU': 1}] * 2
(TrainController pid=72620) Started training worker group of size 2: 
(TrainController pid=72620) - (ip=127.0.0.1, pid=72633) world_rank=0, local_rank=0, node_rank=0
(TrainController pid=72620) - (ip=127.0.0.1, pid=72634) world_rank=1, local_rank=1, node_rank=0
(RayTrainWorker pid=72633) [14:45:55] Task [xgboost.ray-rank=00000000]:14831c1751e6b686b383bb6301000000 got rank 0
(RayTrainWorker pid=72633) Loading data for worker 0...
(TrainController pid=72620) [14:45:57] [0]	eval-rmse:2.28346
(TrainController pid=72620) [14:45:57] [1]	eval-rmse:2.25069
(TrainController pid=72620) [14:45:57] [2]	eval-rmse:2.22460
(TrainController pid=72620) [14:45:57] [3]	eval-rmse:2.20430
(TrainController pid=72620) [14:45:57] [4]	eval-rmse:2.18836
(TrainController pid=72620)

Result(metrics=None, checkpoint=None, error=None, path='/Users/phil/ray_results/ray_train_run-2025-12-10_14-45-47', metrics_dataframe=None, best_checkpoints=[], _storage_filesystem=<pyarrow._fs.LocalFileSystem object at 0x3e69c8630>)

# %% [markdown]

 Here is a diagram that shows what Train does:

 A train controller creates a group of training workers and runs the training function on each worker.

 Ray Train delegates the distributed training logic itself to the underlying XGBoost framework.

 <img src="https://docs.ray.io/en/latest/_images/overview.png" width="700px" loading="lazy">
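 Roughly speaking, XGBoost's distributed `hist` training has each worker build gradient histograms over its own data shard and then combine them with an allreduce, so every worker sees the same global statistics and picks the same split. The pure-Python sketch below uses made-up numbers, a single feature, and only first-order gradients (real XGBoost also aggregates second-order statistics). It shows why summing per-shard histograms gives the same result as building one histogram over all the data. `local_histogram` and `allreduce_sum` are hypothetical helpers.

```python
from collections import Counter


def local_histogram(feature_values, gradients, bin_edges):
    """Sum gradients per feature bin over one worker's shard."""
    hist = Counter()
    for x, g in zip(feature_values, gradients):
        bin_id = sum(x >= edge for edge in bin_edges)  # index of the bin that x falls into
        hist[bin_id] += g
    return hist


def allreduce_sum(histograms):
    """Stand-in for the allreduce step: every worker ends up with the element-wise sum."""
    total = Counter()
    for hist in histograms:
        total.update(hist)  # Counter.update adds values key by key
    return total


bin_edges = [2.0, 5.0]  # bins: x < 2, 2 <= x < 5, x >= 5 (hypothetical trip_distance edges)
shard_0 = ([1.0, 3.0, 6.0], [0.5, -1.0, 0.25])  # (feature values, gradients) on worker 0
shard_1 = ([1.5, 4.0, 7.0], [1.0, -0.5, 2.0])  # (feature values, gradients) on worker 1

global_hist = allreduce_sum(
    [local_histogram(*shard_0, bin_edges), local_histogram(*shard_1, bin_edges)]
)
single_machine = local_histogram(shard_0[0] + shard_1[0], shard_0[1] + shard_1[1], bin_edges)
print(dict(global_hist) == dict(single_machine))  # True
```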

# %% [markdown]

 ### 2.4 Serving an ensemble model with Ray Serve

 Ray Serve allows for distributed serving of models and complex inference pipelines.

 Here is a diagram showing how to deploy an ensemble model with Ray Serve:

 <img src="https://images.ctfassets.net/xjan103pcp94/3DJ7vVRxYIvcFO7JmIUMCx/77a45caa275ffa46f5135f4d6726dd4f/Figure_2_-_Fanout_and_ensemble.png" width="700px" loading="lazy">
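 The ensemble fans each request out to two models concurrently and then fans the results back in by averaging them. Stripped of Ray Serve, the same fan-out/fan-in pattern can be sketched with plain coroutines. `ToyModel` is a hypothetical stand-in for a model deployment, with `asyncio.sleep` simulating inference latency:

```python
import asyncio


class ToyModel:
    """Hypothetical stand-in for a model deployment that returns a fixed prediction."""

    def __init__(self, prediction: float, delay_s: float):
        self.prediction = prediction
        self.delay_s = delay_s

    async def predict(self, rows: list[dict]) -> list[float]:
        await asyncio.sleep(self.delay_s)  # simulate inference latency
        return [self.prediction for _ in rows]


async def ensemble_predict(models: list[ToyModel], row: dict) -> float:
    # Fan out: query every model concurrently; fan in: average the single-row predictions
    predictions = await asyncio.gather(*(model.predict([row]) for model in models))
    return sum(p[0] for p in predictions) / len(predictions)


models = [ToyModel(2.0, 0.05), ToyModel(3.0, 0.05)]
print(asyncio.run(ensemble_predict(models, {"trip_distance": 2.5})))  # 2.5
```

 Because the calls run concurrently, the ensemble's latency is close to that of the slowest model rather than the sum of both.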

 Here is what the resulting code looks like:

# %%

app = fastapi.FastAPI()

class Payload(BaseModel):
    passenger_count: int
    trip_distance: float
    fare_amount: float
    tolls_amount: float


@ray.serve.deployment
@ray.serve.ingress(app)
class Ensemble:
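    def __init__(self, model1, model2):
        # model1 and model2 are handles to the two Model deployments bound below
        self.model1 = model1
        self.model2 = model2

    @app.post("/predict")
    async def predict(self, data: Payload) -> dict:
        # Fan out: query both models concurrently, then fan in by averaging
        model1_prediction, model2_prediction = await asyncio.gather(
            self.model1.predict.remote([data.model_dump()]),
            self.model2.predict.remote([data.model_dump()]),
        )
        # Each model returns one prediction per input row; we sent a single row
        out = {"prediction": (model1_prediction[0] + model2_prediction[0]) / 2}
        return out


@ray.serve.deployment
class Model:
    def __init__(self, path: str):
        # Load the trained booster once per replica
        self._model = xgboost.Booster()
        self._model.load_model(path)

    def predict(self, data: list[dict]) -> list[float]:
        # Use the same feature columns (and column order) as during training
        dmatrix = xgboost.DMatrix(pd.DataFrame(data)[features])
        model_prediction = self._model.predict(dmatrix)
        # Convert the NumPy array to plain Python floats
        return model_prediction.tolist()


# Run the deployment (in this demo both models load the same saved file)
handle = ray.serve.run(
    Ensemble.bind(
        model1=Model.bind(str(model_path)),
        model2=Model.bind(str(model_path)),
    ),
    route_prefix="/ensemble",
)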

INFO 2025-12-10 14:46:02,108 serve 72513 -- Started Serve in namespace "serve".
(ProxyActor pid=72648) INFO 2025-12-10 14:46:02,018 proxy 127.0.0.1 -- Proxy starting on node 066c79cb5eded2851576d4533c020db7a4a1bf5e0ce9db48ca869e4c (HTTP port: 8000).
(ProxyActor pid=72648) INFO 2025-12-10 14:46:02,091 proxy 127.0.0.1 -- Got updated endpoints: {}.
(ServeController pid=72646) INFO 2025-12-10 14:46:02,155 controller 72646 -- Deploying new version of Deployment(name='Model', app='default') (initial target replicas: 1).
(ServeController pid=72646) INFO 2025-12-10 14:46:02,156 controller 72646 -- Deploying new version of Deployment(name='Model_1', app='default') (initial target replicas: 1).
(ServeController pid=72646) INFO 2025-12-10 14:46:02,157 controller 72646 -- Deploying new version of Deployment(name='Ensemble', app='default') (initial target replicas: 1).
(ProxyActor pid=72648) INFO 2025-12-10 14:46:02,159 proxy 127.0.0.1 -- Got up

# %% [markdown]

 Let's make an HTTP request to the Ray Serve instance.
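 The FastAPI endpoint declares its input as a pydantic `Payload` model, which FastAPI reads from the JSON request body rather than from query parameters. The standard-library sketch below builds that request without sending it, next to the query-string URL that `requests.post(url, params=payload)` would have produced instead (sending the latter to this endpoint would fail request validation):

```python
import json
import urllib.parse
import urllib.request

payload = {"passenger_count": 1, "trip_distance": 2.5, "fare_amount": 10.0, "tolls_amount": 0.5}
url = "http://localhost:8000/ensemble/predict"

# JSON body: what the endpoint expects for its pydantic `Payload` argument
json_request = urllib.request.Request(
    url,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# Query string: what passing the same dict as `params=` would send instead (no body)
query_url = url + "?" + urllib.parse.urlencode(payload)

print(json_request.get_method(), json_request.data)
print(query_url)

# Only send the request while the Serve application is running:
# with urllib.request.urlopen(json_request) as response:
#     print(json.load(response))
```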

# %%

requests.post(
    "http://localhost:8000/ensemble/predict",
    json={  # Send the payload as a JSON request body (not as query parameters)
        "passenger_count": 1,
        "trip_distance": 2.5,
        "fare_amount": 10.0,
        "tolls_amount": 0.5,
    },
).json()

(ServeReplica:default:Model pid=72650) INFO 2025-12-10 14:46:05,359 default_Model oujxjs6t 6ea1f8ec-f3a0-4536-ab99-6014dff54379 -- CALL predict OK 4.2ms
(ServeReplica:default:Model_1 pid=72649) INFO 2025-12-10 14:46:05,359 default_Model_1 1ybsyodk 6ea1f8ec-f3a0-4536-ab99-6014dff54379 -- CALL predict OK 3.4ms
(ServeReplica:default:Ensemble pid=72651) INFO 2025-12-10 14:46:05,319 default_Ensemble e8hf4j7g 6ea1f8ec-f3a0-4536-ab99-6014dff54379 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x15f3c7950>.


{'prediction': 2.0076115131378174}

# %% [markdown]

 ### 2.5 Batch inference with Ray Data

 Ray Data allows for distributed data processing through streaming execution across a heterogeneous cluster of CPUs and GPUs.

 This makes Ray Data ideal for workloads like compute-intensive data processing, data ingestion, and batch inference.
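 Passing a class (rather than a function) to `map_batches` lets Ray Data construct each instance once per actor and reuse it for many batches, so the model is loaded once instead of once per batch. Below is a single-process sketch of that pattern, with a made-up linear "model" in place of XGBoost. `ToyPredictor` and `iter_batches` are hypothetical helpers, the latter standing in for Ray Data's batching into column-oriented dicts:

```python
from typing import Iterator


class ToyPredictor:
    """Hypothetical stand-in for OfflinePredictor: setup runs once, then each batch is cheap."""

    init_count = 0  # counts how many times the "expensive" setup ran

    def __init__(self):
        ToyPredictor.init_count += 1
        self.weights = {"fare_amount": 0.15, "tolls_amount": 0.0}  # made-up "model"

    def __call__(self, batch: dict[str, list[float]]) -> dict[str, list[float]]:
        n_rows = len(batch["fare_amount"])
        batch["predictions"] = [
            sum(weight * batch[col][i] for col, weight in self.weights.items())
            for i in range(n_rows)
        ]
        return batch


def iter_batches(rows: list[dict], batch_size: int) -> Iterator[dict[str, list[float]]]:
    """Yield column-oriented batches (column name -> list of values)."""
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        yield {col: [row[col] for row in chunk] for col in chunk[0]}


rows = [{"fare_amount": 10.0 * (i + 1), "tolls_amount": 0.0} for i in range(5)]
predictor = ToyPredictor()  # one "actor": the setup cost is paid once
outputs = [predictor(batch) for batch in iter_batches(rows, batch_size=2)]
print(ToyPredictor.init_count, [len(b["predictions"]) for b in outputs])  # 1 [2, 2, 1]
```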

# %%

class OfflinePredictor:
    def __init__(self):
        # Load expensive state once per actor, not once per batch
        self._model = xgboost.Booster()
        self._model.load_model(model_path)

    def predict(self, batch: dict):
        # Ray Data passes each batch as a dict mapping column names to NumPy arrays;
        # returns a NumPy array with one prediction per row
        dmatrix = xgboost.DMatrix(pd.DataFrame(batch)[features])
        return self._model.predict(dmatrix)

    def __call__(self, batch: dict) -> dict:
        # Add the predictions as a new column and return the batch
        batch["predictions"] = self.predict(batch)
        return batch


# Read the public bucket anonymously through an s3fs (fsspec) filesystem
import s3fs

fs = s3fs.S3FileSystem(anon=True)

# Define the pipeline: read the Parquet file, keep only the model's features,
# and apply OfflinePredictor with an autoscaling pool of 2 to 10 actors
prediction_pipeline = (
    ray.data.read_parquet(
        "s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_2021-03.parquet",
        filesystem=fs,
    )
    .select_columns(features)
    .map_batches(OfflinePredictor, concurrency=(2, 10))
)

(ServeReplica:default:Ensemble pid=72651) INFO 2025-12-10 14:46:05,396 default_Ensemble e8hf4j7g 6ea1f8ec-f3a0-4536-ab99-6014dff54379 -- POST /ensemble/predict 200 101.7ms


Parquet dataset sampling 0:   0%|          | 0.00/1.00 [00:00<?, ? file/s]

2025-12-10 14:46:08,895	INFO parquet_datasource.py:728 -- Estimated parquet encoding ratio is 9.752.
2025-12-10 14:46:08,896	INFO parquet_datasource.py:788 -- Estimated parquet reader batch size at 883012 rows
2025-12-10 14:46:08,896	INFO parquet_datasource.py:788 -- Estimated parquet reader batch size at 883012 rows


# %% [markdown]

 Ray Data executes lazily: defining the pipeline does not process any data. We can execute it in a distributed manner by consuming it, for example by writing the output to a sink.
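 Python generators behave similarly to a lazy pipeline: chaining them only records the steps, and work starts when the result is consumed. This makes them a handy single-process mental model. The sketch below is an analogy, not how Ray Data is implemented, and `read_rows` and `add_prediction` are hypothetical helpers:

```python
executed = []  # log of work that has actually happened


def read_rows(n: int):
    for i in range(n):
        executed.append(f"read {i}")  # record when a row is actually read
        yield {"fare_amount": float(i)}


def add_prediction(rows):
    for row in rows:
        executed.append("predict")  # record when a prediction is actually made
        yield {**row, "predictions": 0.1 * row["fare_amount"]}


pipeline = add_prediction(read_rows(3))  # builds the pipeline; nothing has run yet
print(len(executed))  # 0

results = list(pipeline)  # consuming it (like writing to a sink) triggers execution
print(len(executed), len(results))  # 6 3
```

 The execution log also interleaves reads and predictions row by row, loosely mirroring how streaming execution lets downstream stages start before upstream stages finish.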

# %%

prediction_pipeline.write_parquet(f"{storage_folder}/xgboost_predictions")

2025-12-10 14:46:10,157	INFO logging.py:397 -- Registered dataset logger for dataset dataset_4_0
2025-12-10 14:46:10,168	INFO streaming_executor.py:174 -- Starting execution of Dataset dataset_4_0. Full logs are in /tmp/ray/session_2025-12-10_14-45-27_396825_72513/logs/ray-data
2025-12-10 14:46:10,169	INFO streaming_executor.py:175 -- Execution plan of Dataset dataset_4_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> ActorPoolMapOperator[MapBatches(OfflinePredictor)] -> TaskPoolMapOperator[Write]
2025-12-10 14:46:10,204	INFO streaming_executor.py:682 -- [dataset]: A new progress UI is available. To enable, set `ray.data.DataContext.get_current().enable_rich_progress_bars = True` and `ray.data.DataContext.get_current().use_ray_tqdm = False`.


Running 0: 0.00 row [00:00, ? row/s]

- ReadParquet->SplitBlocks(100) 1: 0.00 row [00:00, ? row/s]

- MapBatches(OfflinePredictor) 2: 0.00 row [00:00, ? row/s]

- Write 3: 0.00 row [00:00, ? row/s]

2025-12-10 14:46:15,782	INFO streaming_executor.py:300 -- ✔️  Dataset dataset_4_0 execution finished in 5.61 seconds
2025-12-10 14:46:16,005	INFO dataset.py:5193 -- Data sink Parquet finished. 1925152 rows and 66.1MiB data written.


# %% [markdown]

 Let's inspect the produced predictions.

# %%

!ls {storage_folder}/xgboost_predictions/

2_5a868b9ca2d24c7f87bd5a45441926dc_000000_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000001_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000002_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000003_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000004_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000005_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000006_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000007_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000008_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000009_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000010_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000011_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000012_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000013_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000014_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000015_000000-0.parquet
2_5a868b9ca2d24c7f87bd5a45441926dc_000016_000000-0.parqu

# %% [markdown]

 ### 2.6 Clean up

# %%

# Run this cell to stop the Serve application and clean up the generated files
# ray.serve.shutdown()
# !rm -rf {storage_folder}/xgboost_predictions/
# !rm {model_path}