[pyspark] support stage-level for yarn/k8s #10209

Merged 1 commit on Apr 19, 2024
73 changes: 44 additions & 29 deletions python-package/xgboost/spark/core.py
@@ -347,15 +347,14 @@ def _gen_predict_params_dict(self) -> Dict[str, Any]:
             predict_params[param.name] = self.getOrDefault(param)
         return predict_params
 
-    def _validate_gpu_params(self) -> None:
+    def _validate_gpu_params(
+        self, spark_version: str, conf: SparkConf, is_local: bool = False
+    ) -> None:
         """Validate the gpu parameters and gpu configurations"""
 
         if self._run_on_gpu():
-            ss = _get_spark_session()
-            sc = ss.sparkContext
-
-            if _is_local(sc):
-                # Support GPU training in Spark local mode is just for debugging
+            if is_local:
+                # Supporting GPU training in Spark local mode is just for debugging
                 # purposes, so it's okay for printing the below warning instead of
                 # checking the real gpu numbers and raising the exception.
                 get_logger(self.__class__.__name__).warning(
@@ -364,33 +363,41 @@ def _validate_gpu_params(self) -> None:
                     self.getOrDefault(self.num_workers),
                 )
             else:
-                executor_gpus = sc.getConf().get("spark.executor.resource.gpu.amount")
+                executor_gpus = conf.get("spark.executor.resource.gpu.amount")
                 if executor_gpus is None:
                     raise ValueError(
                         "The `spark.executor.resource.gpu.amount` is required for training"
                         " on GPU."
                     )
 
-                if not (
-                    ss.version >= "3.4.0"
-                    and _is_standalone_or_localcluster(sc.getConf())
+                gpu_per_task = conf.get("spark.task.resource.gpu.amount")
+                if gpu_per_task is not None and float(gpu_per_task) > 1.0:
+                    get_logger(self.__class__.__name__).warning(
+                        "The configuration assigns %s GPUs to each Spark task, but each "
+                        "XGBoost training task only utilizes 1 GPU, which will lead to "
+                        "unnecessary GPU waste",
+                        gpu_per_task,
+                    )
+                # For 3.5.1+, Spark supports task stage-level scheduling for
+                # Yarn/K8s/Standalone/Local cluster.
+                # From 3.4.0 ~ 3.5.0, Spark only supports task stage-level scheduling
+                # for Standalone/Local cluster.
+                # For spark below 3.4.0, task stage-level scheduling is not supported.
+                #
+                # With stage-level scheduling, spark.task.resource.gpu.amount is not
+                # required to be set explicitly. Otherwise, spark.task.resource.gpu.amount
+                # is a must-have and must be set to 1.0.
+                if spark_version < "3.4.0" or (
+                    "3.4.0" <= spark_version < "3.5.1"
+                    and not _is_standalone_or_localcluster(conf)
                 ):
-                    # We will enable stage-level scheduling in spark 3.4.0+ which doesn't
-                    # require spark.task.resource.gpu.amount to be set explicitly
-                    gpu_per_task = sc.getConf().get("spark.task.resource.gpu.amount")
                     if gpu_per_task is not None:
                         if float(gpu_per_task) < 1.0:
                             raise ValueError(
-                                "XGBoost doesn't support GPU fractional configurations. "
-                                "Please set `spark.task.resource.gpu.amount=spark.executor"
-                                ".resource.gpu.amount`"
-                            )
-
-                        if float(gpu_per_task) > 1.0:
-                            get_logger(self.__class__.__name__).warning(
-                                "%s GPUs for each Spark task is configured, but each "
-                                "XGBoost training task uses only 1 GPU.",
-                                gpu_per_task,
+                                "XGBoost doesn't support GPU fractional configurations. "
+                                "Please set `spark.task.resource.gpu.amount=spark.executor"
+                                ".resource.gpu.amount`. To enable GPU fractional "
+                                "configurations, you can try standalone/localcluster with "
+                                "spark 3.4.0+ and YARN/K8S with spark 3.5.1+"
                             )
                     else:
                         raise ValueError(
@@ -475,7 +482,9 @@ def _validate_params(self) -> None:
                 "`pyspark.ml.linalg.Vector` type."
             )
 
-        self._validate_gpu_params()
+        ss = _get_spark_session()
+        sc = ss.sparkContext
+        self._validate_gpu_params(ss.version, sc.getConf(), _is_local(sc))
 
     def _run_on_gpu(self) -> bool:
         """If train or transform on the gpu according to the parameters"""
@@ -925,10 +934,14 @@ def _skip_stage_level_scheduling(self, spark_version: str, conf: SparkConf) -> bool:
             )
             return True
 
-        if not _is_standalone_or_localcluster(conf):
+        if (
+            "3.4.0" <= spark_version < "3.5.1"
+            and not _is_standalone_or_localcluster(conf)
+        ):
             self.logger.info(
-                "Stage-level scheduling in xgboost requires spark standalone or "
-                "local-cluster mode"
+                "For %s, stage-level scheduling in xgboost requires spark standalone "
+                "or local-cluster mode",
+                spark_version,
             )
             return True
 
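Read together with the comments in _validate_gpu_params, the checks above encode a version/cluster-manager support matrix. A hypothetical helper (not part of the PR) restating that gate, using the same lexicographic string comparison the code itself relies on, which is fine for the 3.x version literals involved:

    def stage_level_scheduling_supported(spark_version: str, master: str) -> bool:
        # 3.5.1+: YARN, K8s, standalone, and local-cluster all support it.
        if spark_version >= "3.5.1":
            return True
        # 3.4.0 ~ 3.5.0: standalone and local-cluster only.
        if spark_version >= "3.4.0":
            return master.startswith("spark://") or master.startswith("local-cluster")
        # Below 3.4.0: not supported at all.
        return False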
@@ -980,7 +993,9 @@ def _try_stage_level_scheduling(self, rdd: RDD) -> RDD:
         """Try to enable stage-level scheduling"""
         ss = _get_spark_session()
         conf = ss.sparkContext.getConf()
-        if self._skip_stage_level_scheduling(ss.version, conf):
+        if _is_local(ss.sparkContext) or self._skip_stage_level_scheduling(
+            ss.version, conf
+        ):
             return rdd
 
         # executor_cores will not be None
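The rest of the method (truncated above) builds a Spark ResourceProfile so the barrier training stage claims the executor's cores and a full GPU per task. A minimal sketch of the PySpark 3.4.0+ API involved, with illustrative numbers rather than the PR's exact values:

    from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

    def with_training_resources(rdd, executor_cores: int):
        # One XGBoost task per executor: request all executor cores and one GPU.
        treqs = TaskResourceRequests().cpus(executor_cores).resource("gpu", 1.0)
        rp = ResourceProfileBuilder().require(treqs).build  # `build` is a property
        return rdd.withResources(rp)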
210 changes: 171 additions & 39 deletions tests/test_distributed/test_with_spark/test_spark_local.py
@@ -929,8 +929,127 @@ def test_gpu_transform(self, clf_data: ClfData) -> None:
         model_loaded.set_device("cuda")
         assert model_loaded._run_on_gpu()
 
+    def test_validate_gpu_params(self) -> None:
+        # Standalone
+        standalone_conf = (
+            SparkConf()
+            .setMaster("spark://foo")
+            .set("spark.executor.cores", "12")
+            .set("spark.task.cpus", "1")
+            .set("spark.executor.resource.gpu.amount", "1")
+            .set("spark.task.resource.gpu.amount", "0.08")
+        )
+        classifer_on_cpu = SparkXGBClassifier(use_gpu=False)
+        classifer_on_gpu = SparkXGBClassifier(use_gpu=True)
+
+        # No exception for classifier on CPU
+        classifer_on_cpu._validate_gpu_params("3.4.0", standalone_conf)
+
+        with pytest.raises(
+            ValueError, match="XGBoost doesn't support GPU fractional configurations"
+        ):
+            classifer_on_gpu._validate_gpu_params("3.3.0", standalone_conf)
+
+        # No issues
+        classifer_on_gpu._validate_gpu_params("3.4.0", standalone_conf)
+        classifer_on_gpu._validate_gpu_params("3.4.1", standalone_conf)
+        classifer_on_gpu._validate_gpu_params("3.5.0", standalone_conf)
+        classifer_on_gpu._validate_gpu_params("3.5.1", standalone_conf)
+
+        # no spark.executor.resource.gpu.amount
+        standalone_bad_conf = (
+            SparkConf()
+            .setMaster("spark://foo")
+            .set("spark.executor.cores", "12")
+            .set("spark.task.cpus", "1")
+            .set("spark.task.resource.gpu.amount", "0.08")
+        )
+        msg_match = (
+            "The `spark.executor.resource.gpu.amount` is required for training on GPU"
+        )
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.3.0", standalone_bad_conf)
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.4.0", standalone_bad_conf)
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.4.1", standalone_bad_conf)
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.5.0", standalone_bad_conf)
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.5.1", standalone_bad_conf)
+
+        standalone_bad_conf = (
+            SparkConf()
+            .setMaster("spark://foo")
+            .set("spark.executor.cores", "12")
+            .set("spark.task.cpus", "1")
+            .set("spark.executor.resource.gpu.amount", "1")
+        )
+        msg_match = (
+            "The `spark.task.resource.gpu.amount` is required for training on GPU"
+        )
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.3.0", standalone_bad_conf)
+
+        classifer_on_gpu._validate_gpu_params("3.4.0", standalone_bad_conf)
+        classifer_on_gpu._validate_gpu_params("3.5.0", standalone_bad_conf)
+        classifer_on_gpu._validate_gpu_params("3.5.1", standalone_bad_conf)
+
+        # Yarn and K8s mode
+        for mode in ["yarn", "k8s://"]:
+            conf = (
+                SparkConf()
+                .setMaster(mode)
+                .set("spark.executor.cores", "12")
+                .set("spark.task.cpus", "1")
+                .set("spark.executor.resource.gpu.amount", "1")
+                .set("spark.task.resource.gpu.amount", "0.08")
+            )
+            with pytest.raises(
+                ValueError,
+                match="XGBoost doesn't support GPU fractional configurations",
+            ):
+                classifer_on_gpu._validate_gpu_params("3.3.0", conf)
+            with pytest.raises(
+                ValueError,
+                match="XGBoost doesn't support GPU fractional configurations",
+            ):
+                classifer_on_gpu._validate_gpu_params("3.4.0", conf)
+            with pytest.raises(
+                ValueError,
+                match="XGBoost doesn't support GPU fractional configurations",
+            ):
+                classifer_on_gpu._validate_gpu_params("3.4.1", conf)
+            with pytest.raises(
+                ValueError,
+                match="XGBoost doesn't support GPU fractional configurations",
+            ):
+                classifer_on_gpu._validate_gpu_params("3.5.0", conf)
+
+            classifer_on_gpu._validate_gpu_params("3.5.1", conf)
+
+        for mode in ["yarn", "k8s://"]:
+            bad_conf = (
+                SparkConf()
+                .setMaster(mode)
+                .set("spark.executor.cores", "12")
+                .set("spark.task.cpus", "1")
+                .set("spark.executor.resource.gpu.amount", "1")
+            )
+            msg_match = (
+                "The `spark.task.resource.gpu.amount` is required for training on GPU"
+            )
+            with pytest.raises(ValueError, match=msg_match):
+                classifer_on_gpu._validate_gpu_params("3.3.0", bad_conf)
+            with pytest.raises(ValueError, match=msg_match):
+                classifer_on_gpu._validate_gpu_params("3.4.0", bad_conf)
+            with pytest.raises(ValueError, match=msg_match):
+                classifer_on_gpu._validate_gpu_params("3.5.0", bad_conf)
+
+            classifer_on_gpu._validate_gpu_params("3.5.1", bad_conf)
+
     def test_skip_stage_level_scheduling(self) -> None:
-        conf = (
+        standalone_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
@@ -943,98 +1062,111 @@ def test_skip_stage_level_scheduling(self) -> None:
         classifer_on_gpu = SparkXGBClassifier(use_gpu=True)
 
         # the correct configurations should not skip stage-level scheduling
-        assert not classifer_on_gpu._skip_stage_level_scheduling("3.4.0", conf)
+        assert not classifer_on_gpu._skip_stage_level_scheduling(
+            "3.4.0", standalone_conf
+        )
+        assert not classifer_on_gpu._skip_stage_level_scheduling(
+            "3.4.1", standalone_conf
+        )
+        assert not classifer_on_gpu._skip_stage_level_scheduling(
+            "3.5.0", standalone_conf
+        )
+        assert not classifer_on_gpu._skip_stage_level_scheduling(
+            "3.5.1", standalone_conf
+        )
 
         # spark version < 3.4.0
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.3.0", conf)
-
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.3.0", standalone_conf)
         # not run on GPU
-        assert classifer_on_cpu._skip_stage_level_scheduling("3.4.0", conf)
+        assert classifer_on_cpu._skip_stage_level_scheduling("3.4.0", standalone_conf)
 
         # spark.executor.cores is not set
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "1")
             .set("spark.task.resource.gpu.amount", "0.08")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.executor.cores=1
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "1")
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "1")
            .set("spark.task.resource.gpu.amount", "0.08")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.executor.resource.gpu.amount is not set
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
             .set("spark.task.cpus", "1")
             .set("spark.task.resource.gpu.amount", "0.08")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.executor.resource.gpu.amount>1
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "2")
             .set("spark.task.resource.gpu.amount", "0.08")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.task.resource.gpu.amount is not set
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "1")
         )
-        assert not classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert not classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.task.resource.gpu.amount=1
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "1")
             .set("spark.task.resource.gpu.amount", "1")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
-
-        # yarn
-        badConf = (
-            SparkConf()
-            .setMaster("yarn")
-            .set("spark.executor.cores", "12")
-            .set("spark.task.cpus", "1")
-            .set("spark.executor.resource.gpu.amount", "1")
-            .set("spark.task.resource.gpu.amount", "1")
-        )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
-
-        # k8s
-        badConf = (
-            SparkConf()
-            .setMaster("k8s://")
-            .set("spark.executor.cores", "12")
-            .set("spark.task.cpus", "1")
-            .set("spark.executor.resource.gpu.amount", "1")
-            .set("spark.task.resource.gpu.amount", "1")
-        )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
+
+        # For Yarn and K8S
+        for mode in ["yarn", "k8s://"]:
+            for gpu_amount in ["0.08", "0.2", "1.0"]:
+                conf = (
+                    SparkConf()
+                    .setMaster(mode)
+                    .set("spark.executor.cores", "12")
+                    .set("spark.task.cpus", "1")
+                    .set("spark.executor.resource.gpu.amount", "1")
+                    .set("spark.task.resource.gpu.amount", gpu_amount)
+                )
+                assert classifer_on_gpu._skip_stage_level_scheduling("3.3.0", conf)
+                assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", conf)
+                assert classifer_on_gpu._skip_stage_level_scheduling("3.4.1", conf)
+                assert classifer_on_gpu._skip_stage_level_scheduling("3.5.0", conf)
+
+                # This will be fixed when spark 4.0.0 is released.
+                if gpu_amount == "1.0":
+                    assert classifer_on_gpu._skip_stage_level_scheduling("3.5.1", conf)
+                else:
+                    # Starting from 3.5.1, stage-level scheduling works for Yarn and K8s
+                    assert not classifer_on_gpu._skip_stage_level_scheduling(
+                        "3.5.1", conf
+                    )
 
 
 class XgboostLocalTest(SparkTestCase):
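End to end, the behavior these tests pin down corresponds to a user setup like the following sketch (resource values, the input path, and column names are placeholders, not taken from this PR):

    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from xgboost.spark import SparkXGBClassifier

    conf = (
        SparkConf()
        .setMaster("yarn")  # with Spark 3.5.1+, stage-level scheduling covers YARN/K8s
        .set("spark.executor.cores", "12")
        .set("spark.task.cpus", "1")
        .set("spark.executor.resource.gpu.amount", "1")
        .set("spark.task.resource.gpu.amount", "0.08")  # fractional amounts accepted on 3.5.1+
    )
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    train_df = spark.read.parquet("/path/to/train")  # placeholder path
    clf = SparkXGBClassifier(use_gpu=True, num_workers=2, label_col="label")
    model = clf.fit(train_df)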