Allreduce error when continuing training from the same process (Ray elastic training) #7436

Closed
krfricke opened this issue Nov 15, 2021 · 15 comments

Comments

@krfricke
Contributor

TLDR: Since XGBoost 1.5, XGBoost-Ray's elastic training fails (it works with XGBoost 1.4). I suspect some state is retained between runs, since everything works when all actors are re-created.

XGBoost-Ray uses Ray's actor model to reduce data loading overhead when remote training workers die.

In the elastic training test, we do the following:

  • We start a remote Ray actor on each of four different nodes
  • Technically, these are separate, long-lived Python processes
  • Each of these processes starts a thread that connects to the Rabit tracker and calls the native xgb.train() method

After a number of iterations (15), we kill one of the actors. This actor is then re-started. The other actors are re-used.
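
For reference, here is a minimal sketch of the layout described above (the class and argument names are illustrative, not the actual xgboost_ray API; rabit_args stands in for the DMLC_* strings handed out by the tracker):

import threading

import ray
import xgboost as xgb
from xgboost import rabit


@ray.remote
class TrainingActor:
    """Long-lived worker process; it survives across training attempts."""

    def __init__(self, data, labels):
        # The data is loaded once and cached on the actor. In the setup
        # described above, the DMatrix is also built here, i.e. outside
        # of any Rabit context.
        self.dtrain = xgb.DMatrix(data, label=labels)

    def train(self, rabit_args, params, num_rounds):
        result = {}

        def _train_thread():
            # Connect to the Rabit tracker and run native training. This
            # thread is torn down for all actors when a single actor dies.
            rabit.init([arg.encode() for arg in rabit_args])
            try:
                result["booster"] = xgb.train(
                    params, self.dtrain, num_boost_round=num_rounds)
            finally:
                rabit.finalize()

        thread = threading.Thread(target=_train_thread)
        thread.start()
        thread.join()
        return result.get("booster")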

However, when continuing training, existing actors fail with

  File "/home/ray/anaconda3/lib/python3.7/site-packages/xgboost/training.py", line 196, in train
    early_stopping_rounds=early_stopping_rounds)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/xgboost/training.py", line 81, in _train_internal
    bst.update(dtrain, i, obj)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/xgboost/core.py", line 1682, in update
    dtrain.handle))
  File "/home/ray/anaconda3/lib/python3.7/site-packages/xgboost/core.py", line 218, in _check_call
    raise XGBoostError(py_str(_LIB.XGBGetLastError()))
xgboost.core.XGBoostError: [08:05:56] ../rabit/include/rabit/internal/utils.h:90: Allreduce failed

This is also true when not restoring from a checkpoint.

This does not happen when we re-create all actors.

The bug does not come up in XGBoost < 1.5, only in the latest release.

Are you aware of any changes in XGBoost 1.5 that may retain state across multiple calls to xgboost.train? As explained above, the actors retain their state and the same PID, but the xgb.train() call always runs in a separate thread, which is terminated for all actors when a single actor fails. We also restart the Rabit tracker between runs (and I also tried it with different ports for the Rabit tracker, all with the same result).

Any help would be much appreciated, thanks!

@trivialfis
Member

Hi, I'm not aware of any related change. If you have a reproducible script, I can bisect the changes.

@trivialfis
Member

Ah, I see the link. Will take a deeper look.

@trivialfis
Member

@krfricke Could you please provide some detailed instructions on how to run this on a local machine? I found ray-project/ray#15913, but it seems to be a huge PR.

@krfricke
Contributor Author

krfricke commented Nov 15, 2021

Sure,

consider these two files:

create_test_data.py

import argparse
import numpy as np
import os

from xgboost_ray.tests.utils import create_parquet

if __name__ == "__main__":
    if "OMP_NUM_THREADS" in os.environ:
        del os.environ["OMP_NUM_THREADS"]

    parser = argparse.ArgumentParser(description="Create fake data.")
    parser.add_argument(
        "filename", type=str, default="/data/parted.parquet/", help="ray/dask")
    parser.add_argument(
        "-r",
        "--num-rows",
        required=False,
        type=int,
        default=1e8,
        help="num rows")
    parser.add_argument(
        "-p",
        "--num-partitions",
        required=False,
        type=int,
        default=100,
        help="num partitions")
    parser.add_argument(
        "-c",
        "--num-cols",
        required=False,
        type=int,
        default=4,
        help="num columns (features)")
    parser.add_argument(
        "-C",
        "--num-classes",
        required=False,
        type=int,
        default=2,
        help="num classes")
    parser.add_argument(
        "-s",
        "--seed",
        required=False,
        type=int,
        default=1234,
        help="random seed")

    args = parser.parse_args()

    np.random.seed(args.seed)
    create_parquet(
        args.filename,
        num_rows=int(args.num_rows),
        num_partitions=int(args.num_partitions),
        num_features=int(args.num_cols),
        num_classes=int(args.num_classes))

ft_small_non_elastic.py

"""Fault tolerance test (small cluster, non-elastic training)

In this run, two training actors will die after some time. It is expected that
in both cases xgboost_ray stops training, restarts the dead actors, and
continues training with all four actors.

Test owner: krfricke

Acceptance criteria: Should run through and report final results. Intermediate
output should show that training halts when an actor dies and continues only
when all four actors are available again. The test will fail if fault
tolerance did not work correctly.

Notes: This test seems to be somewhat flaky. This might be due to
race conditions in handling dead actors. This is likely a problem of
the xgboost_ray implementation and not of this test.
"""
import ray

from xgboost_ray import RayParams


from ray.util.xgboost.release_test_util import train_ray, \
    FailureState, FailureInjection, TrackingCallback

if __name__ == "__main__":
    ray.init(num_cpus=10)

    failure_state = FailureState.remote()

    ray_params = RayParams(
        elastic_training=False,
        max_actor_restarts=2,
        num_actors=4,
        cpus_per_actor=1,
        gpus_per_actor=0)

    _, additional_results, _ = train_ray(
        path="/tmp/classification.parquet",
        num_workers=4,
        num_boost_rounds=100,
        num_files=200,
        regression=False,
        use_gpu=False,
        ray_params=ray_params,
        xgboost_params=None,
        callbacks=[
            TrackingCallback(),
            FailureInjection(
                id="first_fail", state=failure_state, ranks=[2], iteration=14),
            FailureInjection(
                id="second_fail", state=failure_state, ranks=[0], iteration=34)
        ])

    actor_1_world_size = set(additional_results["callback_returns"][1])
    assert len(actor_1_world_size) == 1 and 4 in actor_1_world_size, \
        "Training with fewer than 4 actors observed, but this was " \
        "non-elastic training. Please report to test owner."

    print("PASSED.")

Install dependencies

pip install "ray[default]==1.7.0"

Create data

 python create_test_data.py /tmp/classification.parquet --seed 1234 --num-rows 1000000 --num-cols 40 --num-partitions 100 --num-classes 2

Run

python ft_small_non_elastic.py

For me this works with xgboost<1.5 but not with xgboost==1.5

This test can be run on a single machine (confirmed it works on my laptop).

Please note that with the latest Ray master this just blocks forever after the first iteration. With Ray 1.7.0 it throws the error above.

@trivialfis
Member

trivialfis commented Nov 15, 2021

Note to myself:
The continuation is broken by 1cd20ef; the likely cause is that xgboost_ray doesn't cover the synchronization during DMatrix construction.

The silent output is caused by 7017dd5; the likely cause is that, in order to get the JVM package working, the tracker's stdout is redirected to the Python logger.

@krfricke
Contributor Author

Thanks for looking into this.

Is there something we can do about the first issue from the xgboost-ray side? Ideally we'd like to re-use the DMatrices across different starts, as that's where the benefit of re-using actors comes from (caching loaded data for re-use).

@trivialfis
Member

trivialfis commented Nov 15, 2021

re-use the dmatrices across different starts

I think that's unlikely. I looked into xgboost_ray's code a little bit. Before training, the data goes through quantization, which requires an allreduce operation. DMatrix construction itself also requires an allreduce operation to get a consistent number of columns across workers. If you reconstruct only one of the DMatrices, it will get stuck at the allreduce operation.

@trivialfis
Member

trivialfis commented Nov 15, 2021

The xgboost_ray project constructs the DMatrix before entering the RabitContext. It didn't fail before because the DMatrix constructor thought there was only one worker (as Rabit hadn't been initialized yet), so the synchronization of n_features became a no-op. In the latest XGBoost, however, the subsequent synchronization for quantiles hits the issue.
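
Roughly speaking (a sketch against the Python rabit bindings, not the xgboost_ray code), the world size a worker reports changes once rabit.init() has been called, which is why the pre-init synchronization silently degenerates into a no-op:

from xgboost import rabit

def world_size_before_and_after(rabit_args):
    # Before rabit.init(), the library reports a single-worker world, so any
    # cross-worker synchronization attempted at this point (e.g. the
    # n_features check in the DMatrix constructor) is effectively a no-op.
    print(rabit.get_world_size())  # -> 1

    # After init with the DMLC_* arguments handed out by the tracker, the
    # real group size is visible and every allreduce must be matched across
    # all workers.
    rabit.init([arg.encode() for arg in rabit_args])
    try:
        print(rabit.get_world_size())  # -> number of connected workers
    finally:
        rabit.finalize()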

@trivialfis
Member

The solution is to re-initialize all DMatrices during a restart and to move the construction of the DMatrix into the Rabit context.
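
A minimal sketch of what that could look like on each worker (a hypothetical helper that assumes the raw data is cached on the actor; not the actual xgboost_ray change):

import xgboost as xgb
from xgboost import rabit

def train_on_worker(rabit_args, raw_data, raw_labels, params, num_rounds):
    # Enter the Rabit context *before* building the DMatrix so that its
    # allreduce calls see the full worker group instead of a world size of 1.
    rabit.init([arg.encode() for arg in rabit_args])
    try:
        # Rebuilt from the cached raw data on every (re)start; never reused
        # across Rabit sessions.
        dtrain = xgb.DMatrix(raw_data, label=raw_labels)
        booster = xgb.train(params, dtrain, num_boost_round=num_rounds)
    finally:
        rabit.finalize()
    return booster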

@krfricke
Contributor Author

krfricke commented Nov 15, 2021

OK, got it - we'll keep the raw data in cache and always reconstruct the DMatrices from scratch.

I'll implement this tomorrow. Thanks for the context here, that was very helpful!

@trivialfis
Member

Let me know if there's anything I can help with. Also, it's quite pleasant to read the code, as it's very well written. ;-)

@trivialfis
Member

trivialfis commented Nov 21, 2021

Just something to think about. XGBoost used to support recovery via Rabit (the allreduce implementation in XGBoost), but we removed the feature as it was too difficult to implement at that low level. The difficulty is similar to the one in this issue. Old Rabit cached the booster model for each iteration, but the quantiles and some other things weren't properly handled. To achieve single-point recovery, we need to prevent any allreduce that could run out of order (e.g., all DMatrices should be initialized in the same order, or allreduce should be avoided in DMatrix construction altogether). Then we need to specify what should be cached for the lifetime of the Rabit handle and what should be cached only per iteration: the quantiles and the number of features belong to the former, while the booster belongs to the latter. Some other difficulties are changed parameters (learning-rate decay, etc.).

I remain optimistic that we can restore native support at some point in the future, but right now the whole dataset needs to be cached, and the process is best carried out by a distributed framework integration (like xgboost_ray).

@krfricke
Contributor Author

Thanks for the additional context - and sorry I didn't get to it last week; other stuff came up. I hope to tackle it this week, and yes, I agree we should just move it into the Rabit context to make sure we adhere to all the caching principles correctly.

@trivialfis
Member

@krfricke Can we close this issue?

@krfricke
Contributor Author

krfricke commented Jan 11, 2022

Yes, sorry for the delay!

For reference, this was fixed in xgboost_ray here: ray-project/xgboost_ray#179
