
[jvm-packages][spark-gpu] Add GPU support for XGBoost4j-Spark-Gpu #7361

Closed
wants to merge 6 commits

Conversation

@wbo4958 (Contributor) commented Oct 26, 2021

This PR brings spark-rapids in to accelerate XGBoost end to end on GPU. It is a follow-up to (and replacement for) #5950; for more history, refer to this comment.

This PR first reworks the CPU train/transform pipeline and then adds a GPU pipeline for xgboost4j-spark-gpu.

Reworked CPU pipeline

  1. Reworked the CPU train/transform pipeline
  2. Moved data preparation into PreXGBoost, ahead of XGBoost training
  3. Changed the XGBoost API to train on RDD[Watches] instead of RDD[LabeledPoint]; a Watches is a collection of DMatrix objects (see the sketch just below)
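
To make the new contract concrete, here is a minimal sketch of the Watches-based entry point; the class and method shapes are approximations for illustration, not the PR's exact code:

    import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
    import org.apache.spark.rdd.RDD

    // Watches bundles the DMatrix instances (train set plus any eval sets)
    // for one partition; delete() releases the native memory they hold.
    class Watches(val datasets: Map[String, DMatrix]) {
      def delete(): Unit = datasets.values.foreach(_.delete())
    }

    object XGBoost {
      // Reworked entry point: train directly on prepared Watches instead of
      // converting RDD[LabeledPoint] internally (signature illustrative).
      def trainDistributed(
          watchesRDD: RDD[Watches],
          params: Map[String, Any]): RDD[Booster] = ???
    }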

Added GPU pipeline in XGBoost4j-Spark-Gpu

The goal of XGBoost4j-Spark-Gpu is to support training/transforming on both CPU and GPU.

  1. Soft-link the CPU pipeline into XGBoost4j-Spark-Gpu, so it fully supports CPU
  2. Add some GPU-only parameters
  3. Add the spark-rapids dependency to accelerate ETL on GPU
  4. Define PreXGBoostProvider and implement it for GPU
  5. Add a ServiceLoader entry to discover the GPU implementation
  6. Implement the APIs of PreXGBoostProvider

Design and APIs

[Flowchart: xgboost-plugin-way]

The flowchart above gives the whole picture.

This PR defines the APIs below, with separate implementations for CPU and GPU.

buildDatasetToRDD

buildDatasetToRDD converts a Dataset into RDD[Watches], where a Watches is a collection of DMatrix objects. This PR moves the data-preparation code into PreXGBoost. For the built-in CPU pipeline, data preparation first converts the Dataset into RDD[LabeledPoint], then into RDD[Watches], and training finally yields RDD[Booster]. The LabeledPoint path is inherently CPU-oriented: each Row is converted into a row-wise LabeledPoint. The GPU, by contrast, operates on column-wise data and does not need LabeledPoint at all; it can build a DMatrix directly from the column data.

transformSchema

The built-in transformSchema in the Spark ML framework requires the feature column and the label column, where the feature column is a vectorized assembly of all the individual feature columns. As described above, the GPU path operates on column data and does not need to vectorize the feature columns, so for GPU we need to intercept transformSchema and perform the check ourselves; a sketch follows.
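
For illustration only (validateGpuSchema and its parameters are hypothetical names, not the PR's API), the GPU-side check could look like this:

    import org.apache.spark.sql.types.{NumericType, StructType}

    // Hypothetical sketch: every feature column and the label column must
    // exist and be a plain numeric column; no vectorization step is needed.
    def validateGpuSchema(
        schema: StructType,
        featureNames: Seq[String],
        labelName: String): Unit = {
      (featureNames :+ labelName).foreach { name =>
        val field = schema(name) // throws IllegalArgumentException if missing
        require(field.dataType.isInstanceOf[NumericType],
          s"Column '$name' must be numeric for the GPU pipeline, " +
            s"found ${field.dataType}")
      }
    }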

transformDataset

As with buildDatasetToRDD, CPU and GPU have different implementations for transforming (predicting over) the Dataset. Together, these three hooks form the provider interface sketched below.
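
A hedged sketch of that provider interface, using the Watches type sketched earlier; the method signatures are an approximation based on the description above, not copied from the PR:

    import org.apache.spark.ml.PipelineStage
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import org.apache.spark.sql.types.StructType

    trait PreXGBoostProvider {
      // Whether this provider should take over (e.g. the GPU build is on
      // the classpath and spark-rapids is active); otherwise CPU is used.
      def providerEnabled(ss: Option[SparkSession]): Boolean
      // GPU intercepts schema validation instead of Spark ML's built-in check.
      def transformSchema(stage: PipelineStage, schema: StructType): StructType
      // Convert the input Dataset into RDD[Watches] ready for training.
      def buildDatasetToRDD(estimator: PipelineStage, dataset: Dataset[_],
          params: Map[String, Any]): RDD[Watches]
      // CPU and GPU implement prediction over the Dataset differently.
      def transformDataset(model: PipelineStage, dataset: Dataset[_]): DataFrame
    }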

How the GPU implementation is discovered

This PR packages the GPU implementation as a plugin and loads it via ServiceLoader.

For xgboost4j-spark, this PR does not declare the service in its resources, so by default it will not detect a GPU implementation.

For xgboost4j-spark-gpu, this PR declares the GPU implementation in the service resource, so the GPU implementation is detected by default. The PR also defines an API to check whether the GPU implementation is enabled; if it is not, XGBoost falls back to the CPU implementation. A sketch of the registration and lookup follows.
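
Standard ServiceLoader discovery is driven by a provider-configuration file on the classpath. The file path and class names below are illustrative, not copied from the PR:

    # xgboost4j-spark-gpu resource file (illustrative path and class name):
    # src/main/resources/META-INF/services/ml.dmlc.xgboost4j.scala.spark.PreXGBoostProvider
    ml.dmlc.xgboost4j.scala.rapids.spark.GpuPreXGBoost

On the loading side, the CPU module can pick the first registered and enabled provider, and otherwise fall back to its built-in pipeline:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    // Returns the registered GPU provider when present and enabled;
    // None means use the built-in CPU pipeline (sketch, names illustrative).
    def discoverProvider(): Option[PreXGBoostProvider] =
      ServiceLoader.load(classOf[PreXGBoostProvider])
        .asScala.find(_.providerEnabled(None))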

Usage for GPU

    // Users can omit this and instead specify the configs when submitting the application
    val conf = new SparkConf()
      .set("spark.rapids.sql.enabled", "true")
      .set("spark.plugins", "com.nvidia.spark.SQLPlugin")

    val spark = SparkSession.builder()
      .master("local[1]")
      .config(conf)
      .appName(classOf[BobbyXGBoostSuite].getSimpleName)
      .getOrCreate()

    val schema = new StructType(Array(
      StructField("sepal length", DoubleType, true),
      StructField("sepal width", DoubleType, true),
      StructField("petal length", DoubleType, true),
      StructField("petal width", DoubleType, true),
      StructField("class", StringType, true)))
    val rawInput = spark.read.schema(schema).csv(path)

    val label = "class"
    // get all feature column names
    val featuresNames = schema.fieldNames.filterNot(_ == label)

    val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 1,
      "tree_method" -> "gpu_hist")

    val xgbClassifier = new XGBoostClassifier(xgbParam)
      .setLabelCol(label)
      .setFeaturesCols(featuresNames)  // API for GPU-only

    val xgbClassificationModel = xgbClassifier.fit(rawInput)

    val df = xgbClassificationModel.transform(rawInput)
    df.show()

Commits:
  • Add PreXGBoost to build RDD[Watches] from Dataset
  • Feed RDD[Watches] built from PreXGBoost to XGBoost to train
  • Extract the common transform code from XGBoostClassifier and XGBoostRegressor
  • Add Rapids plugin support
Oct 26, 2021: @wbo4958 changed the title to [jvm-packages][spark-gpu] Add GPU support for XGBoost4j-Spark-Gpu
@wbo4958 (Contributor Author) commented Oct 26, 2021

@trivialfis @hcho3 @RAMitchell please help review. Thanks.

@trivialfis (Member) left a comment

Have a few preliminary questions:

  • Does the user need to call functions from pre-xgboost before training/prediction?
  • The functions marked "GPU Only": is that a limitation of the interface, or can they be extended to CPU in the future?
  • What's buildUnsafeRows and why do you need it?

Assuming the API is fine, we can work with the C++ code first.

}

test("distributed training with group data") {
  val trainingRDD = sc.parallelize(Ranking.train, 5)
  val buildTrainingRDD = PreXGBoost.buildRDDLabeledPointToRDDWatches(trainingRDD, hasGroup = true)
@trivialfis (Member): Is this required for users? If so, that's a breaking change; can we hide it inside XGBoost?

@wbo4958 (Contributor Author): It's not really a breaking change, since trainDistributed is restricted to the [spark] package, meaning users are not supposed to use this function.

@trivialfis (Member), Nov 7, 2021, quoting the reply above: Then why do you need to change the test?

@@ -254,85 +253,28 @@ class XGBoostRegressionModel private[ml] (

def setInferBatchSize(value: Int): this.type = set(inferBatchSize, value)

/**
* This API is only used in GPU train pipeline of xgboost4j-spark-gpu, which requires
* all feature columns must be numeric types.
@trivialfis (Member): So the CPU can have other types? Also, the GPU now handles categorical data types.

try {
  cb.close();
} catch (Exception e) {
  e.printStackTrace();
@trivialfis (Member) Oct 28, 2021: So ... is the exception being thrown?

import ml.dmlc.xgboost4j.gpu.java.CudfColumnBatch;

/**
* CudfTable with schema for scala
@trivialfis (Member): Please document its functionality, i.e., what it's for.

@wbo4958 (Contributor Author) commented Oct 28, 2021


  • Does the user need to call functions from pre-xgboost before training/prediction?

No. PreXGBoost is a utility that converts a Dataset into RDD[DMatrix]; users will not use this function directly.

  • The functions marked "GPU Only": is that a limitation of the interface, or can they be extended to CPU in the future?

Yes, it's a limitation, since the GPU is column-wise while the CPU is row-wise. In the future we may be able to unify the interface for GPU and CPU.

  • What's buildUnsafeRows and why do you need it?

On the GPU the transformed data is row-flattened, so buildUnsafeRows converts it into the format Spark needs, copies it to the CPU, and finally feeds it to Spark.

@trivialfis (Member) commented

Sharing the offline discussion here: I think the user interface is fine since it aligns with the existing CPU implementation. We will start with the smaller, lower-level infrastructure first by eliminating unnecessary code.

cc @hcho3 @RAMitchell

@wbo4958 (Contributor Author) commented Nov 3, 2021

Thanks @trivialfis, I just removed the unsafe-row building since it's unnecessary. Could you review it again?

@trivialfis (Member) left a comment

Initial review. Great work: this PR is a lot cleaner than the previous version.

Let me try to summarize the PR based on my own understanding; feel free to correct me if I'm wrong. You have extracted the pre-processing steps into individual CPU and GPU modules as PreXGBoostXXX and added glue code for converting a Dataset into an RDD. That seems to be a viable approach.

Some questions are inlined in the review.

@@ -18,6 +18,7 @@ endif (ENABLE_ALL_WARNINGS)
target_link_libraries(xgboost4j PRIVATE objxgboost)
target_include_directories(xgboost4j
PRIVATE
${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES}
@trivialfis (Member): Now that we no longer have new CUDA code, is this necessary?

@wbo4958 (Contributor Author): This is a rough PR; will clean it up.

<scala.version>2.12.8</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop.version>2.7.3</hadoop.version>
<maven.wagon.http.retryHandler.count>5</maven.wagon.http.retryHandler.count>
<log.capi.invocation>OFF</log.capi.invocation>
<use.cuda>OFF</use.cuda>
<cudf.version>21.08.2</cudf.version>
@trivialfis (Member): Is it necessary to have this in the CPU package?

@wbo4958 (Contributor Author): No big deal; it's just a property, not a dependency.

@@ -0,0 +1,276 @@
/*
Copyright (c) 2014 by Contributors
@trivialfis (Member): Is it necessary to put files under the nvidia namespace? xgboost is not an NVIDIA project by itself.

@wbo4958 (Contributor Author): OK, will change the package name.


/** Slice the columns indicated by indices into a Table */
public Table slice(List<Integer> indices) {
  if (indices == null || indices.size() == 0) {
@trivialfis (Member): When will it be NULL?

@wbo4958 (Contributor Author): no

}

/** Slice the columns indicated by indices into a Table */
public Table slice(List<Integer> indices) {
@trivialfis (Member): Does the cuDF Java binding support slicing?

@wbo4958 (Contributor Author): no

  return schema;
}

public double getMaxInColumn(int colIndex) {
@trivialfis (Member): Where is this function being used?

@wbo4958 (Contributor Author): Nowhere; will clean it up.

* @param weightInfo Weight information calculated from earlier batches.
* @return The group id of last group in current column batch.
*/
public int groupAndAggregateOnColumnsHost(int groupIdx, int weightIdx, int prevTailGid,
@trivialfis (Member): Is this function used anywhere?

@wbo4958 (Contributor Author): This is a rough PR; will clean it up.

    missing: Float,
    maxBin: Int): DMatrix = {
  // FIXME add option or dynamic to check.
  if (true) {
@trivialfis (Member): What is this?

@wbo4958 (Contributor Author): Rough PR; will clean it up.


private[this] class RapidsIterator(base: Iterator[GpuColumnBatch],
    indices: ColumnIndices) extends Iterator[CudfColumnBatch] {
  var maxLabels: Double = 0.0f
@trivialfis (Member): Where is this used?

@wbo4958 (Contributor Author): Draft PR; will clean it up.

@@ -0,0 +1,53 @@
/*
Copyright (c) 2014 by Contributors
@trivialfis (Member) Nov 7, 2021: Is it intended to put this file in the CPU package?

@wbo4958 (Contributor Author): Yes, the parameter is shared by CPU and GPU, since the entry points (XGBoostClassifier/XGBoostRegressor) live in the CPU package.

@wbo4958 (Contributor Author) commented Nov 8, 2021

Since this is a draft PR meant to get the GPU pipeline implementation in place, I didn't take much care over the trivial things; those will be resolved in the follow-up smaller PRs. Let me start putting up the real PR. Thx @trivialfis
