Skip to content

Commit

Permalink
[backport] jvm-packages 1.6.1 (#7849)
Browse files Browse the repository at this point in the history
* [jvm-packages] move the dmatrix building into rabit context (#7823)

This fixes the QuantileDeviceDMatrix in distributed environment.

* [doc] update the jvm tutorial to 1.6.1 [skip ci] (#7834)

* [Breaking][jvm-packages] Use barrier execution mode (#7836)

With the introduction of the barrier execution mode. we don't need to kill SparkContext when some xgboost tasks failed. Instead, Spark will handle the errors for us. So in this PR, `killSparkContextOnWorkerFailure` parameter is deleted.

* [doc] remove the doc about killing SparkContext [skip ci] (#7840)

* [jvm-package] remove the coalesce in barrier mode (#7846)

* [jvm-packages] Fix model compatibility (#7845)

* Ignore all Java exceptions when looking for Linux musl support (#7844)

Co-authored-by: Bobby Wang <wbo4958@gmail.com>
Co-authored-by: Michael Allman <msa@allman.ms>
  • Loading branch information
3 people committed Apr 29, 2022
1 parent f75c007 commit f4eb6b9
Show file tree
Hide file tree
Showing 24 changed files with 173 additions and 601 deletions.
4 changes: 2 additions & 2 deletions doc/jvm/xgboost4j_spark_gpu_tutorial.rst
@@ -1,5 +1,5 @@
#############################################
XGBoost4J-Spark-GPU Tutorial (version 1.6.0+)
XGBoost4J-Spark-GPU Tutorial (version 1.6.1+)
#############################################

**XGBoost4J-Spark-GPU** is an open source library aiming to accelerate distributed XGBoost training on Apache Spark cluster from
Expand Down Expand Up @@ -220,7 +220,7 @@ application jar is iris-1.0.0.jar
cudf_version=22.02.0
rapids_version=22.02.0
xgboost_version=1.6.0
xgboost_version=1.6.1
main_class=Iris
app_jar=iris-1.0.0.jar
Expand Down
8 changes: 1 addition & 7 deletions doc/jvm/xgboost4j_spark_tutorial.rst
Expand Up @@ -16,12 +16,6 @@ This tutorial is to cover the end-to-end process to build a machine learning pip
* Building a Machine Learning Pipeline with XGBoost4J-Spark
* Running XGBoost4J-Spark in Production

.. note::

**SparkContext will be stopped by default when XGBoost training task fails**.

XGBoost4J-Spark 1.2.0+ exposes a parameter **kill_spark_context_on_worker_failure**. Set **kill_spark_context_on_worker_failure** to **false** so that the SparkContext will not be stopping on training failure. Instead of stopping the SparkContext, XGBoost4J-Spark will throw an exception instead. Users who want to re-use the SparkContext should wrap the training code in a try-catch block.

.. contents::
:backlinks: none
:local:
Expand Down Expand Up @@ -129,7 +123,7 @@ labels. A DataFrame like this (containing vector-represented features and numeri

.. note::

There is no need to assemble feature columns from version 1.6.0+. Instead, users can specify an array of
There is no need to assemble feature columns from version 1.6.1+. Instead, users can specify an array of
feture column names by ``setFeaturesCol(value: Array[String])`` and XGBoost4j-Spark will do it.

Dealing with missing values
Expand Down
Expand Up @@ -69,7 +69,7 @@ public void testBooster() throws XGBoostError {
.hasHeader().build();

int maxBin = 16;
int round = 100;
int round = 10;
//set params
Map<String, Object> paramMap = new HashMap<String, Object>() {
{
Expand Down
Expand Up @@ -56,18 +56,20 @@ class GpuPreXGBoost extends PreXGBoostProvider {
}

/**
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
*
* @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]]
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
* RDD[Watches] will be used as the training input
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
}

Expand Down Expand Up @@ -116,19 +118,21 @@ object GpuPreXGBoost extends PreXGBoostProvider {
}

/**
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
*
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
* RDD[Watches] will be used as the training input
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {

val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) =
estimator match {
Expand Down Expand Up @@ -166,7 +170,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
xgbExecParams: XGBoostExecutionParams =>
val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers,
xgbExecParams.cacheTrainingSet)
(buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
(true, buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
}

/**
Expand Down Expand Up @@ -403,14 +407,9 @@ object GpuPreXGBoost extends PreXGBoostProvider {
}

private def repartitionInputData(dataFrame: DataFrame, nWorkers: Int): DataFrame = {
// We can't check dataFrame.rdd.getNumPartitions == nWorkers here, since dataFrame.rdd is
// a lazy variable. If we call it here, we will not directly extract RDD[Table] again,
// instead, we will involve Columnar -> Row -> Columnar and decrease the performance
if (nWorkers == 1) {
dataFrame.coalesce(1)
} else {
dataFrame.repartition(nWorkers)
}
// we can't involve any coalesce operation here, since Barrier mode will check
// the RDD patterns which does not allow coalesce.
dataFrame.repartition(nWorkers)
}

private def repartitionForGroup(
Expand Down Expand Up @@ -448,7 +447,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
private def buildRDDWatches(
dataMap: Map[String, ColumnDataBatch],
xgbExeParams: XGBoostExecutionParams,
noEvalSet: Boolean): RDD[Watches] = {
noEvalSet: Boolean): RDD[() => Watches] = {

val sc = dataMap(TRAIN_NAME).rawDF.sparkSession.sparkContext
val maxBin = xgbExeParams.toMap.getOrElse("max_bin", 256).asInstanceOf[Int]
Expand All @@ -459,7 +458,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
GpuUtils.toColumnarRdd(dataMap(TRAIN_NAME).rawDF).mapPartitions({
iter =>
val iterColBatch = iter.map(table => new GpuColumnBatch(table, null))
Iterator(buildWatches(
Iterator(() => buildWatches(
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
colIndicesForTrain, iterColBatch, maxBin))
})
Expand All @@ -469,7 +468,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
val nameAndColIndices = dataMap.map(nc => (nc._1, nc._2.colIndices))
coPartitionForGpu(dataMap, sc, xgbExeParams.numWorkers).mapPartitions {
nameAndColumnBatchIter =>
Iterator(buildWatchesWithEval(
Iterator(() => buildWatchesWithEval(
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
nameAndColIndices, nameAndColumnBatchIter, maxBin))
}
Expand Down
Expand Up @@ -39,13 +39,8 @@ trait GpuTestSuite extends FunSuite with TmpFolderSuite {

def enableCsvConf(): SparkConf = {
new SparkConf()
.set(RapidsConf.ENABLE_READ_CSV_DATES.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_BYTES.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_SHORTS.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_LONGS.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_DOUBLES.key, "true")
.set("spark.rapids.sql.csv.read.float.enabled", "true")
.set("spark.rapids.sql.csv.read.double.enabled", "true")
}

def withGpuSparkSession[U](conf: SparkConf = new SparkConf())(f: SparkSession => U): U = {
Expand Down Expand Up @@ -246,12 +241,13 @@ object SparkSessionHolder extends Logging {
Locale.setDefault(Locale.US)

val builder = SparkSession.builder()
.master("local[1]")
.master("local[2]")
.config("spark.sql.adaptive.enabled", "false")
.config("spark.rapids.sql.enabled", "false")
.config("spark.rapids.sql.test.enabled", "false")
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
.config("spark.rapids.memory.gpu.pooling.enabled", "false") // Disable RMM for unit tests.
.config("spark.sql.files.maxPartitionBytes", "1000")
.appName("XGBoost4j-Spark-Gpu unit test")

builder.getOrCreate()
Expand Down
Expand Up @@ -96,19 +96,21 @@ object PreXGBoost extends PreXGBoostProvider {
}

/**
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
*
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
* RDD[Watches] will be used as the training input
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input
* Option[RDD[_]\] is the optional cached RDD
*/
override def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
params: Map[String, Any]): XGBoostExecutionParams =>
(Boolean, RDD[() => Watches], Option[RDD[_]]) = {

if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) {
return optionProvider.get.buildDatasetToRDD(estimator, dataset, params)
Expand Down Expand Up @@ -170,12 +172,12 @@ object PreXGBoost extends PreXGBoostProvider {
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
}

}
Expand Down Expand Up @@ -311,30 +313,31 @@ object PreXGBoost extends PreXGBoostProvider {


/**
* Converting the RDD[XGBLabeledPoint] to the function to build RDD[Watches]
* Converting the RDD[XGBLabeledPoint] to the function to build RDD[() => Watches]
*
* @param trainingSet the input training RDD[XGBLabeledPoint]
* @param evalRDDMap the eval set
* @param hasGroup if has group
* @return function to build (RDD[Watches], the cached RDD)
* @return function to build (RDD[() => Watches], the cached RDD)
*/
private[spark] def buildRDDLabeledPointToRDDWatches(
trainingSet: RDD[XGBLabeledPoint],
evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(),
hasGroup: Boolean = false): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
hasGroup: Boolean = false):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {

xgbExecParams: XGBoostExecutionParams =>
composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match {
case Left(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
}
}

Expand Down Expand Up @@ -374,34 +377,34 @@ object PreXGBoost extends PreXGBoostProvider {
}

/**
* Build RDD[Watches] for Ranking
* Build RDD[() => Watches] for Ranking
* @param trainingData the training data RDD
* @param xgbExecutionParams xgboost execution params
* @param evalSetsMap the eval RDD
* @return RDD[Watches]
* @return RDD[() => Watches]
*/
private def trainForRanking(
trainingData: RDD[Array[XGBLabeledPoint]],
xgbExecutionParam: XGBoostExecutionParams,
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[Watches] = {
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[() => Watches] = {
if (evalSetsMap.isEmpty) {
trainingData.mapPartitions(labeledPointGroups => {
val watches = Watches.buildWatchesWithGroup(xgbExecutionParam,
val buildWatches = () => Watches.buildWatchesWithGroup(xgbExecutionParam,
DataUtils.processMissingValuesWithGroup(labeledPointGroups, xgbExecutionParam.missing,
xgbExecutionParam.allowNonZeroForMissing),
getCacheDirName(xgbExecutionParam.useExternalMemory))
Iterator.single(watches)
Iterator.single(buildWatches)
}).cache()
} else {
coPartitionGroupSets(trainingData, evalSetsMap, xgbExecutionParam.numWorkers).mapPartitions(
labeledPointGroupSets => {
val watches = Watches.buildWatchesWithGroup(
val buildWatches = () => Watches.buildWatchesWithGroup(
labeledPointGroupSets.map {
case (name, iter) => (name, DataUtils.processMissingValuesWithGroup(iter,
xgbExecutionParam.missing, xgbExecutionParam.allowNonZeroForMissing))
},
getCacheDirName(xgbExecutionParam.useExternalMemory))
Iterator.single(watches)
Iterator.single(buildWatches)
}).cache()
}
}
Expand Down Expand Up @@ -462,35 +465,35 @@ object PreXGBoost extends PreXGBoostProvider {
}

/**
* Build RDD[Watches] for Non-Ranking
* Build RDD[() => Watches] for Non-Ranking
* @param trainingData the training data RDD
* @param xgbExecutionParams xgboost execution params
* @param evalSetsMap the eval RDD
* @return RDD[Watches]
* @return RDD[() => Watches]
*/
private def trainForNonRanking(
trainingData: RDD[XGBLabeledPoint],
xgbExecutionParams: XGBoostExecutionParams,
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[Watches] = {
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[() => Watches] = {
if (evalSetsMap.isEmpty) {
trainingData.mapPartitions { labeledPoints => {
val watches = Watches.buildWatches(xgbExecutionParams,
val buildWatches = () => Watches.buildWatches(xgbExecutionParams,
DataUtils.processMissingValues(labeledPoints, xgbExecutionParams.missing,
xgbExecutionParams.allowNonZeroForMissing),
getCacheDirName(xgbExecutionParams.useExternalMemory))
Iterator.single(watches)
Iterator.single(buildWatches)
}}.cache()
} else {
coPartitionNoGroupSets(trainingData, evalSetsMap, xgbExecutionParams.numWorkers).
mapPartitions {
nameAndLabeledPointSets =>
val watches = Watches.buildWatches(
val buildWatches = () => Watches.buildWatches(
nameAndLabeledPointSets.map {
case (name, iter) => (name, DataUtils.processMissingValues(iter,
xgbExecutionParams.missing, xgbExecutionParams.allowNonZeroForMissing))
},
getCacheDirName(xgbExecutionParams.useExternalMemory))
Iterator.single(watches)
Iterator.single(buildWatches)
}.cache()
}
}
Expand Down
@@ -1,5 +1,5 @@
/*
Copyright (c) 2021 by Contributors
Copyright (c) 2021-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,19 +45,21 @@ private[scala] trait PreXGBoostProvider {
def transformSchema(xgboostEstimator: XGBoostEstimatorCommon, schema: StructType): StructType

/**
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
*
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
* RDD[Watches] will be used as the training input
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD
*/
def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]])
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]])

/**
* Transform Dataset
Expand Down

0 comments on commit f4eb6b9

Please sign in to comment.