
Commit

[jvm-packages] Integration with Spark Dataframe/Dataset (#1559)
* bump up to scala 2.11

* framework of data frame integration

* test consistency between RDD and DataFrame

* order preservation

* test order preservation

* example code and fix makefile

* improve type checking

* improve APIs

* user docs

* work around travis CI's limitation on log length

* adjust test structure

* integrate with Spark 1.x

* spark 2.x integration

* remove spark 1.x implementation but provide instructions on how to downgrade
CodingCat committed Sep 11, 2016
1 parent 7ff742e commit fb02797
Showing 15 changed files with 623 additions and 137 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -79,3 +79,5 @@ tags
*.class
target
*.swp

.DS_Store
2 changes: 1 addition & 1 deletion doc/jvm/index.md
@@ -13,7 +13,7 @@ Before you install XGBoost4J, you need to define environment variable `JAVA_HOME`

After your `JAVA_HOME` is defined correctly, installing XGBoost4J is as simple as running `mvn package` under the jvm-packages directory. You can also skip the tests by running `mvn -DskipTests=true package` if you are sure about the correctness of your local setup.

XGBoost4J-Spark, which integrates XGBoost with Spark, requires Spark 1.6 or newer (the default version is 1.6.1). You can build XGBoost4J-Spark as a component of XGBoost4J by running `mvn package`, or specify the Spark version with `mvn -Dspark.version=1.6.0 package`.
After integrating with the DataFrame/Dataset APIs of Spark 2.0, XGBoost4J-Spark only compiles against Spark 2.x. You can build XGBoost4J-Spark as a component of XGBoost4J by running `mvn package`, and you can specify the Spark version with `mvn -Dspark.version=2.0.0 package`. (To continue working with Spark 1.x, users need to update pom.xml by modifying properties such as `spark.version`, `scala.version`, and `scala.binary.version`. They also need to change the implementation by replacing SparkSession with SQLContext and changing the type of the API parameters from Dataset[_] to DataFrame.)
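
For illustration, a minimal sketch of the entry-point difference involved in such a downgrade (the shipped code targets Spark 2.x; the Spark 1.x lines are shown commented out and are not part of this package):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Spark 2.x (what XGBoost4J-Spark now compiles against): use SparkSession and Dataset[_]
val conf = new SparkConf().setAppName("xgboost4j-spark")
val spark = SparkSession.builder().config(conf).getOrCreate()

// Spark 1.x (after the manual downgrade described above): use SQLContext and DataFrame instead
// import org.apache.spark.SparkContext
// import org.apache.spark.sql.SQLContext
// val sqlContext = new SQLContext(new SparkContext(conf))
```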

Contents
--------
53 changes: 52 additions & 1 deletion jvm-packages/README.md
@@ -49,12 +49,17 @@ object XGBoostScalaExample {
```

### XGBoost Spark

XGBoost4J-Spark supports training an XGBoost model through either the RDD or the DataFrame API.

RDD Version:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.util.MLUtils
import ml.dmlc.xgboost4j.scala.spark.XGBoost

object DistTrainWithSpark {
object SparkWithRDD {
def main(args: Array[String]): Unit = {
if (args.length != 3) {
println(
@@ -85,6 +90,52 @@ object DistTrainWithSpark {
}
```

DataFrame Version:

```scala
import ml.dmlc.xgboost4j.scala.Booster
import ml.dmlc.xgboost4j.scala.spark.{DataUtils, XGBoost}
import org.apache.spark.SparkConf
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

object SparkWithDataFrame {
def main(args: Array[String]): Unit = {
if (args.length != 5) {
println(
"usage: program num_of_rounds num_workers training_path test_path model_path")
sys.exit(1)
}
// create SparkSession
val sparkConf = new SparkConf().setAppName("XGBoost-spark-example")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[Booster]))
val sparkSession = SparkSession.builder().appName("XGBoost-spark-example").config(sparkConf).
getOrCreate()
// create training and testing dataframes
val inputTrainPath = args(2)
val inputTestPath = args(3)
val outputModelPath = args(4)
// number of iterations
val numRound = args(0).toInt
import DataUtils._
val trainRDDOfRows = MLUtils.loadLibSVMFile(sparkSession.sparkContext, inputTrainPath).
map{ labeledPoint => Row(labeledPoint.features, labeledPoint.label)}
val trainDF = sparkSession.createDataFrame(trainRDDOfRows, StructType(
Array(StructField("features", ArrayType(FloatType)), StructField("label", IntegerType))))
val testRDDOfRows = MLUtils.loadLibSVMFile(sparkSession.sparkContext, inputTestPath).
zipWithIndex().map{ case (labeledPoint, id) =>
Row(id, labeledPoint.features, labeledPoint.label)}
val testDF = sparkSession.createDataFrame(testRDDOfRows, StructType(
Array(StructField("id", LongType),
StructField("features", ArrayType(FloatType)), StructField("label", IntegerType))))
// training parameters
val paramMap = List(
"eta" -> 0.1f,
"max_depth" -> 2,
"objective" -> "binary:logistic").toMap
val xgboostModel = XGBoost.trainWithDataFrame(
trainDF, paramMap, numRound, nWorkers = args(1).toInt, useExternalMemory = true)
// xgboost-spark appends the column containing prediction results
xgboostModel.transform(testDF).show()
}
}
```
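
`trainWithDataFrame` also accepts the feature and label column names through its `inputCol` and `labelCol` parameters (they default to `"features"` and `"label"`). A minimal sketch, assuming a hypothetical DataFrame `myTrainDF` whose columns are named `fv` and `target` (those names are illustrative, not part of the example above):

```scala
// a sketch only: myTrainDF, "fv" and "target" are hypothetical names
val modelWithCustomCols = XGBoost.trainWithDataFrame(
  myTrainDF, paramMap, numRound, nWorkers = 2,
  useExternalMemory = true, inputCol = "fv", labelCol = "target")
```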

### XGBoost Flink
```scala
import ml.dmlc.xgboost4j.scala.flink.XGBoost
10 changes: 5 additions & 5 deletions jvm-packages/pom.xml
@@ -14,8 +14,6 @@
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<maven.version>3.3.9</maven.version>
<scala.version>2.10.5</scala.version>
<scala.binary.version>2.10</scala.binary.version>
</properties>
<modules>
<module>xgboost4j</module>
@@ -25,13 +23,15 @@
</modules>
<profiles>
<profile>
<id>spark-1.x</id>
<id>spark-2.x</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.version>1.6.1</spark.version>
<scala.binary.version>2.10</scala.binary.version>
<spark.version>2.0.0</spark.version>
<flink.suffix>_2.11</flink.suffix>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
</profile>
</profiles>
@@ -0,0 +1,65 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ml.dmlc.xgboost4j.scala.example.spark

import ml.dmlc.xgboost4j.scala.Booster
import ml.dmlc.xgboost4j.scala.spark.{XGBoost, DataUtils}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.{SparkContext, SparkConf}

object SparkWithDataFrame {
def main(args: Array[String]): Unit = {
if (args.length != 5) {
println(
"usage: program num_of_rounds num_workers training_path test_path model_path")
sys.exit(1)
}
// create SparkSession
val sparkConf = new SparkConf().setAppName("XGBoost-spark-example")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[Booster]))
val sqlContext = new SQLContext(new SparkContext(sparkConf))
// create training and testing dataframes
val inputTrainPath = args(2)
val inputTestPath = args(3)
val outputModelPath = args(4)
// number of iterations
val numRound = args(0).toInt
import DataUtils._
val trainRDDOfRows = MLUtils.loadLibSVMFile(sqlContext.sparkContext, inputTrainPath).
map{ labeledPoint => Row(labeledPoint.features, labeledPoint.label)}
val trainDF = sqlContext.createDataFrame(trainRDDOfRows, StructType(
Array(StructField("features", ArrayType(FloatType)), StructField("label", IntegerType))))
val testRDDOfRows = MLUtils.loadLibSVMFile(sqlContext.sparkContext, inputTestPath).
zipWithIndex().map{ case (labeledPoint, id) =>
Row(id, labeledPoint.features, labeledPoint.label)}
val testDF = sqlContext.createDataFrame(testRDDOfRows, StructType(
Array(StructField("id", LongType),
StructField("features", ArrayType(FloatType)), StructField("label", IntegerType))))
// training parameters
val paramMap = List(
"eta" -> 0.1f,
"max_depth" -> 2,
"objective" -> "binary:logistic").toMap
val xgboostModel = XGBoost.trainWithDataFrame(
trainDF, paramMap, numRound, nWorkers = args(1).toInt, useExternalMemory = true)
// xgboost-spark appends the column containing prediction results
xgboostModel.transform(testDF).show()
}
}
@@ -21,7 +21,7 @@ import ml.dmlc.xgboost4j.scala.spark.{DataUtils, XGBoost}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.util.MLUtils

object DistTrainWithSpark {
object SparkWithRDD {
def main(args: Array[String]): Unit = {
if (args.length != 5) {
println(
@@ -45,7 +45,7 @@ object DistTrainWithSpark {
"eta" -> 0.1f,
"max_depth" -> 2,
"objective" -> "binary:logistic").toMap
val xgboostModel = XGBoost.train(trainRDD, paramMap, numRound, nWorkers = args(1).toInt,
val xgboostModel = XGBoost.trainWithRDD(trainRDD, paramMap, numRound, nWorkers = args(1).toInt,
useExternalMemory = true)
xgboostModel.booster.predict(new DMatrix(testSet))
// save model to HDFS path
11 changes: 3 additions & 8 deletions jvm-packages/xgboost4j-flink/pom.xml
@@ -35,22 +35,17 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<artifactId>flink-scala${flink.suffix}</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala</artifactId>
<artifactId>flink-clients${flink.suffix}</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml</artifactId>
<artifactId>flink-ml${flink.suffix}</artifactId>
<version>0.10.2</version>
</dependency>
</dependencies>
@@ -18,10 +18,9 @@ package ml.dmlc.xgboost4j.scala.spark

import scala.collection.JavaConverters._

import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector}
import org.apache.spark.mllib.regression.{LabeledPoint => SparkLabeledPoint}

import ml.dmlc.xgboost4j.LabeledPoint
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.{LabeledPoint => SparkLabeledPoint}

object DataUtils extends Serializable {
implicit def fromSparkPointsToXGBoostPointsJava(sps: Iterator[SparkLabeledPoint])
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.{SparkContext, TaskContext}

object XGBoost extends Serializable {
@@ -111,6 +112,33 @@
}.cache()
}

/**
* @param trainingData the training set represented as a DataFrame
* @param params Map containing the parameters used to configure XGBoost
* @param round the number of iterations
* @param nWorkers the number of xgboost workers; 0 by default, which means that the number of
* workers equals the number of partitions of the trainingData RDD
* @param obj the user-defined objective function, null by default
* @param eval the user-defined evaluation function, null by default
* @param useExternalMemory whether to use the external memory cache; setting this flag to true
* may reduce the RAM cost of running XGBoost within Spark
* @param missing the value representing a missing value in the dataset
* @param inputCol the name of the input (features) column, "features" by default
* @param labelCol the name of the label column, "label" by default
* @throws ml.dmlc.xgboost4j.java.XGBoostError when model training fails
* @return the trained XGBoostModel
*/
@throws(classOf[XGBoostError])
def trainWithDataFrame(trainingData: Dataset[_],
params: Map[String, Any], round: Int,
nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
useExternalMemory: Boolean = false, missing: Float = Float.NaN,
inputCol: String = "features", labelCol: String = "label"): XGBoostModel = {
new XGBoostEstimator(inputCol, labelCol, params, round, nWorkers, obj, eval,
useExternalMemory, missing).fit(trainingData)
}

/**
*
* @param trainingData the trainingset represented as RDD
@@ -127,9 +155,9 @@
* @return XGBoostModel when successful training
*/
@throws(classOf[XGBoostError])
def train(trainingData: RDD[LabeledPoint], configMap: Map[String, Any], round: Int,
nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
def trainWithRDD(trainingData: RDD[LabeledPoint], configMap: Map[String, Any], round: Int,
nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
require(nWorkers > 0, "you must specify more than 0 workers")
val tracker = new RabitTracker(nWorkers)
implicit val sc = trainingData.sparkContext
@@ -0,0 +1,81 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ml.dmlc.xgboost4j.scala.spark

import ml.dmlc.xgboost4j.scala.{EvalTrait, ObjectiveTrait}
import org.apache.spark.ml.{Predictor, Estimator}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{NumericType, DoubleType, StructType}
import org.apache.spark.sql.{DataFrame, TypedColumn, Dataset, Row}

/**
* the estimator wrapping XGBoost to produce a trained model
*
* @param inputCol the name of the input (features) column
* @param labelCol the name of the label column
* @param xgboostParams the parameters configuring XGBoost
* @param round the number of iterations to train
* @param nWorkers the total number of xgboost workers
* @param obj the customized objective function; defaults to null, which uses the model's default objective
* @param eval the customized evaluation function; defaults to null, which uses the model's default metric
* @param useExternalMemory whether to use external memory when training
* @param missing the value treated as missing
*/
class XGBoostEstimator(
inputCol: String, labelCol: String,
xgboostParams: Map[String, Any], round: Int, nWorkers: Int,
obj: ObjectiveTrait = null,
eval: EvalTrait = null, useExternalMemory: Boolean = false, missing: Float = Float.NaN)
extends Estimator[XGBoostModel] {

override val uid: String = Identifiable.randomUID("XGBoostEstimator")


/**
* produce a XGBoostModel by fitting the given dataset
*/
def fit(trainingSet: Dataset[_]): XGBoostModel = {
val instances = trainingSet.select(
col(inputCol), col(labelCol).cast(DoubleType)).rdd.map {
case Row(feature: Vector, label: Double) =>
LabeledPoint(label, feature)
}
transformSchema(trainingSet.schema, logging = true)
val trainedModel = XGBoost.trainWithRDD(instances, xgboostParams, round, nWorkers, obj,
eval, useExternalMemory, missing).setParent(this)
copyValues(trainedModel)
}

override def copy(extra: ParamMap): Estimator[XGBoostModel] = {
defaultCopy(extra)
}

override def transformSchema(schema: StructType): StructType = {
// check the input type; for now we only support VectorUDT as the input feature type
val inputType = schema(inputCol).dataType
require(inputType.equals(new VectorUDT), s"the type of input column $inputCol has to be VectorUDT")
// check the label type
val labelType = schema(labelCol).dataType
require(labelType.isInstanceOf[NumericType], s"the type of label column $labelCol has to" +
s" be NumericType")
schema
}
}
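
For context, using the new estimator directly would look roughly like the sketch below; `trainDF` and the parameter values are assumptions for illustration, not code from this commit.

```scala
// a sketch of invoking the estimator directly rather than via XGBoost.trainWithDataFrame;
// trainDF and the numbers below are illustrative assumptions
val estimator = new XGBoostEstimator(
  inputCol = "features", labelCol = "label",
  xgboostParams = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "binary:logistic"),
  round = 5, nWorkers = 4)
val model = estimator.fit(trainDF)
```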

9 comments on commit fb02797

@tanwanirahul
Contributor

@CodingCat
Member Author
commented on fb02797 · Sep 14, 2016

if possible, please keep the dev-related discussion on GitHub...

@CodingCat
Member Author

1. The XGBoostEstimator and the XGBoostModel do not use/implement the Params interface. All the ML models in Spark use the Params trait/interface for their hyper-parameters. Because of this, we cannot use the current implementation with CrossValidator in Spark. Also, any new Spark functionality relying on assumptions about Params/ParamMap could not be seamlessly integrated with this implementation. For example, if someone adds a spearmint package for Spark, it would likely work through the Params interface only.

as mentioned in the PR, this will be included in another PR
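
(For reference, the Spark ML Params pattern referred to in point 1 looks roughly like the sketch below; it is illustrative only — the trait and param names are made up and not part of this commit.)

```scala
import org.apache.spark.ml.param.{IntParam, Params}

// a minimal sketch of the Params pattern used by Spark ML estimators;
// exposing hyper-parameters this way is what lets CrossValidator/ParamGridBuilder tune them
trait XGBoostRoundParam extends Params {
  final val round: IntParam = new IntParam(this, "round", "number of boosting iterations")
  def getRound: Int = $(round)
  def setRound(value: Int): this.type = set(round, value)
}
```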

2. Dependency on Spark 2.0 - Most of the current production deployments are on Spark 1.x. It may take more than a year before these apps are migrated to 2.0, especially because the apps also need to upgrade to Scala 2.11. I feel we should support Spark 1.6 side by side as well and provide support for the DataFrame and Pipeline APIs.

please feel free to maintain a Spark 1.6 version; since Spark 2.0 has broken backward compatibility with Spark 1.x, we have to choose only one of them

3. Backward-incompatible changes - XGBoost.train has been removed and substituted by trainWithRDD and trainWithDataFrame. We should rather keep the train method as is and add an overloaded method for the Dataset.

if you had tried that, you would have found that Scala does not allow overloading methods with default parameters
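
(To illustrate the restriction: the Scala compiler rejects a pair of overloads that both declare default arguments — a small standalone sketch with made-up names:)

```scala
object OverloadSketch {
  def train(data: Seq[Double], round: Int = 10): Unit = ()
  // Uncommenting the overload below makes compilation fail with an error like:
  // "multiple overloaded alternatives of method train define default arguments"
  // def train(data: Map[String, Double], round: Int = 10): Unit = ()
}
```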

@CodingCat
Member Author

I suggest that you merge your Params-related part into master

@CodingCat
Member Author

@tanwanirahul, if you decide to contribute the Params implementation, please let me know ASAP; otherwise, I will start working on it after tomorrow

@CodingCat
Member Author

@tanwanirahul, for your question about release management:

JVM APIs are subject to change in recent releases; basically, jvm-packages 0.5 was released with XGBoost 0.6 because we skipped XGBoost 0.5...

There will be incompatible changes from 0.6 to 0.7 for the reasons mentioned above

Sorry for any inconvenience

@tanwanirahul
Contributor

@CodingCat Thanks for the explanation. I will push the Params-related change by tomorrow. Also, let me see if I can get it working on Spark 1.6 and maintain a separate branch for it.

Backward-incompatible changes - XGBoost.train has been removed and substituted by trainWithRDD and trainWithDataFrame. We should rather keep the train method as is and add an overloaded method for the Dataset.

-- if you had tried that, you would have found that Scala does not allow overloading methods with default parameters

I didn't notice the default parameter values there. I would still have liked to keep the train method working on RDDs as is and add the new method for DataFrame/Dataset. My only aim is to keep the changes compatible so that users keep upgrading to the latest version quite often.

@CodingCat
Member Author

@tanwanirahul for now I do not think maintaining different branches in the XGBoost repository is the adopted release strategy. If you like, you can maintain a version in a different repository

@doctapp

Maintaining a Spark 1.6 branch would be very helpful, as many enterprise software stacks still use Spark 1.6. Thanks.
