[jvm-packages] call setGroup for ranking task #2066

Merged
merged 12 commits into from Mar 6, 2017

@cloverrose
Contributor

cloverrose commented Feb 26, 2017

To my knowledge, xgboost4j-spark cannot currently be used for ranking tasks.
This patch adds ranking support (only trainWithRDD and train for now;
I could not find a way to support trainWithDataFrame.)

Example

Building groupData correctly may be confusing: it must have the same number of partitions as the training set, and each of its partitions must contain the same number of elements as the corresponding training partition.
For example, I build it from a pair RDD (RDD[(Seq[String], Int)]): the keys produce the LibSVM-format lines and the values produce the group input format.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

case class Item(label: Double, feature: Array[Double]) {
  override def toString: String = {
    val featurePart = feature.zipWithIndex flatMap { case (v, index) =>
      if (v != 0) {
        Some(f"${index + 1}:${v}")
      } else {
        None
      }
    } mkString(" ")
    f"${label} ${featurePart}"
  }
}
case class Query(items: Seq[Item])

object MakeLearningData {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("LearningToRankSample")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(sparkConf)
  }

  def convert(sc: SparkContext, queries: Seq[Query], output: String): Unit = {
    val temp: RDD[(Seq[String], Int)] = sc.parallelize(queries) map { case query: Query =>
      val texts: Seq[String] = query.items map { _.toString }
      val size: Int = query.items.size
      (texts, size)
    }

    temp.persist() // persist so both outputs share the same partitioning and per-partition sizes
    temp flatMap { case (texts, size) => texts } saveAsTextFile(output)
    temp map { case (texts, size) => size } saveAsTextFile(output + ".group")
    temp.unpersist()
  }
}

In the learning phase, I use a helper class:

import ml.dmlc.xgboost4j.scala.spark.{XGBoost, XGBoostModel}
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.rdd.RDD

object XgbHelper {

  def convGroupRDD(groupData: RDD[Int]): Seq[Seq[Int]] = {
    groupData mapPartitions { it =>
      Iterator(it.toList)
    } collect()
  }

  def learn(trainRDD: RDD[LabeledPoint], groupData: RDD[Int],
            params: Map[String, Any], numRound: Int): XGBoostModel = {
    val groupDataSeq: Seq[Seq[Int]] = convGroupRDD(groupData)
    XGBoost.trainWithRDD(trainRDD, params, numRound, nWorkers = trainRDD.partitions.length,
      groupData = groupDataSeq)
  }
}
And the main application:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.feature.LabeledPoint

import ml.dmlc.xgboost4j.scala.Booster

object LearningToRankSample {
  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      sys.error("Usage: <numRound> <inputPath> <outputPath>")
    }

    val sparkConf = new SparkConf().setAppName("LearningToRankSample")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Kryo classes must be registered on the conf before the SparkContext is created
    sparkConf.registerKryoClasses(Array(classOf[Booster]))

    implicit val sc = new SparkContext(sparkConf)

    val numRound = args(0).toInt
    val inputPath = args(1)
    val outputPath = args(2)

    val params: Map[String, Any] = Map(
      "objective" -> "rank:ndcg",
      "eval_metric" -> "ndcg",
      "eta" -> 0.1,
      "gamma" -> 1.0,
      "min_child_weight" -> 0.1,
      "max_depth" -> 6
    )

    val trainRDD = MLUtils.loadLibSVMFile(sc, inputPath) map { lp =>
      LabeledPoint(lp.label, lp.features.asML)
    }
    val groupData = sc.textFile(inputPath + ".group") map { _.toInt }
    val model = XgbHelper.learn(trainRDD, groupData, params, numRound)
    model.saveModelAsHadoopFile(outputPath)
  }
}

I want to know whether this approach is right.
If it is, I'm willing to write test code.


codecov-io commented Feb 26, 2017

Codecov Report

Merging #2066 into master will decrease coverage by 0.04%.
The diff coverage is n/a.

@@            Coverage Diff             @@
##           master    #2066      +/-   ##
==========================================
- Coverage   30.65%   30.62%   -0.04%     
==========================================
  Files          72       72              
  Lines        5836     5836              
  Branches      583      583              
==========================================
- Hits         1789     1787       -2     
- Misses       3970     3971       +1     
- Partials       77       78       +1
Impacted Files Coverage Δ
src/data/sparse_page_raw_format.cc 80.76% <0%> (-3.85%)
src/objective/regression_obj.cc 89.83% <0%> (-0.57%)
src/data/simple_dmatrix.cc 58.66% <0%> (+0.66%)

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ab13fd7...56fcfbc.

@CodingCat CodingCat self-requested a review Feb 26, 2017

@CodingCat

instead of breaking the current API signature, you can just pass groupData through xgBoostConfMap

for DF, you can just add a parameter to hold groupData
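To make this concrete, here is a minimal plain-Scala sketch of passing groupData through the params map and reading it back; the key name "groupData" follows this thread, and the surrounding names are illustrative, not the final merged API:

```scala
// Sketch: carry the per-partition group sizes inside the existing params map
// instead of adding a new argument to trainWithRDD.
val groupDataSeq: Seq[Seq[Int]] = Seq(Seq(2, 3), Seq(1, 4))

val params: Map[String, Any] = Map(
  "objective" -> "rank:ndcg",
  "eta" -> 0.1,
  "groupData" -> groupDataSeq // extra key consumed by the Spark layer, not by native XGBoost
)

// Inside the training code the value can be recovered with a cast:
val recovered: Option[Seq[Seq[Int]]] =
  params.get("groupData").map(_.asInstanceOf[Seq[Seq[Int]]])
```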

@cloverrose

Contributor

cloverrose commented Feb 26, 2017

Thank you for your advice. I updated the code.

I also understood that trainWithDataFrame's params will be passed to buildDistributedBoosters as xgBoostConfMap.

@CodingCat

you have to create a corresponding param which will be captured by XGBoostEstimator to make it runnable
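A hedged sketch of what such a param might look like with Spark ML's Param machinery; the trait and names below are hypothetical illustrations, not the merged code:

```scala
import org.apache.spark.ml.param.{Param, Params}

// Hypothetical trait: expose groupData as an ML param so that an estimator
// like XGBoostEstimator could capture it from the DataFrame-based API.
trait HasGroupData extends Params {
  final val groupData: Param[Seq[Seq[Int]]] =
    new Param[Seq[Seq[Int]]](this, "groupData",
      "group sizes per training partition, for ranking objectives")

  final def getGroupData: Seq[Seq[Int]] = $(groupData)
}
```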

...boost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
// remove groupData because this value is very large
val xgBoostConfMapFiltered = xgBoostConfMap.filterKeys(_ != "groupData")
partitionedTrainingSet.mapPartitionsWithIndex {


@CodingCat

CodingCat Feb 27, 2017

Member

please put the original comments back


...boost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
booster = SXGBoost.train(trainingSet, xgBoostConfMap, round,
if (groupData != null) {
trainingSet.setGroup(groupData(partIndex).toArray)
}


@CodingCat

CodingCat Feb 27, 2017

Member

what's the difference between serializing groupData as part of xgBoostConfMap and as an independent variable?



@cloverrose

cloverrose Mar 2, 2017

Contributor

Sorry, I may not understand the intent of the question correctly.
If my answer does not make sense, please let me know.

I want to call setGroup(group: Array[Int]).

And looking the value up in xgBoostConfMap repeatedly is costly.

So I read it from xgBoostConfMap once and keep it in an independent variable.



@CodingCat

CodingCat Mar 2, 2017

Member

say xgBoostConfMap has M bytes and groupData has N bytes; with a naive (and not exact, considering JVM overhead etc.) estimate, xgBoostConfMapFiltered will have M - N bytes.

You still call groupData(partIndex) in the closure of mapPartitions, so groupData (N bytes) will still be serialized and passed to the executors, and xgBoostConfMapFiltered is also referenced within mapPartitions, so another M - N bytes will be serialized and passed to the executors...

I don't see any difference here


...boost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
val groupData: Seq[Seq[Int]] = xgBoostConfMap.get("groupData") match {
case Some(gpData) => gpData.asInstanceOf[Seq[Seq[Int]]]
case None => null


@CodingCat

CodingCat Mar 2, 2017

Member

because of your optimization, you will serialize a null and pass it to the executors when users are not doing ranking... why not just check whether .get('xxxx').isDefined?
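The difference can be illustrated in plain Scala (a minimal sketch; the key name "groupData" follows the diff above):

```scala
// No ranking requested: "groupData" is absent from the params map.
val params: Map[String, Any] = Map("objective" -> "reg:linear")

// Matching None to null (as in the diff above) captures a null reference
// that is then serialized into every task closure:
val asNull: Seq[Seq[Int]] = params.get("groupData") match {
  case Some(gpData) => gpData.asInstanceOf[Seq[Seq[Int]]]
  case None => null
}

// Keeping the Option avoids the null; callers just test isDefined:
val asOption: Option[Seq[Seq[Int]]] =
  params.get("groupData").map(_.asInstanceOf[Seq[Seq[Int]]])
```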



@CodingCat

CodingCat Mar 2, 2017

Member

Additionally, if you really find that groupData makes xgBoostConfMap so large that it becomes a performance bottleneck, you can pass xgBoostConfMap by broadcasting it
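A sketch of explicit broadcasting; this is an illustration of the suggestion, not the merged code:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hypothetical sketch: ship the (possibly large) conf map to each executor
// once via an explicit broadcast, instead of capturing it in every task closure.
def broadcastConf(sc: SparkContext,
                  xgBoostConfMap: Map[String, Any]): Broadcast[Map[String, Any]] = {
  sc.broadcast(xgBoostConfMap)
}
// Inside a mapPartitions closure one would then read bcast.value("groupData").
```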



@CodingCat

CodingCat Mar 2, 2017

Member

Sorry, I just forgot one thing: right now, xgBoostConfMap is broadcast by Spark automatically



@CodingCat

CodingCat Mar 2, 2017

Member

IIRC, Spark should broadcast xgBoostConfMap automatically in this case


@cloverrose

Contributor

cloverrose commented Mar 2, 2017

Thank you for the detailed explanation. I think I now understand your point and my mistake.

@CodingCat

Member

CodingCat commented Mar 3, 2017

unit test failed

@CodingCat

Member

CodingCat commented Mar 3, 2017

can you create a dummy dataset and the corresponding unit test?

cloverrose added some commits Mar 5, 2017

Merge remote-tracking branch 'upstream/master' into setGroup-spark
 Conflicts:
	jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/LearningTaskParams.scala
@CodingCat

Member

CodingCat commented Mar 5, 2017

@terrytangyuan, any idea what happened with the Python test?

@terrytangyuan

Member

terrytangyuan commented Mar 5, 2017

Not sure why it's flaky. It also seems like we cannot re-run a particular task on Travis anymore.

@CodingCat

Member

CodingCat commented Mar 6, 2017

I found that there is a restart-build button; it seems we can click that to restart the build?

@terrytangyuan

Member

terrytangyuan commented Mar 6, 2017

Yeah, it seems like it passed this time. I don't know why I couldn't find that button before.

@CodingCat

Member

CodingCat commented Mar 6, 2017

permission issues?

@CodingCat

for the demo data, can you use a much smaller dataset, or move all these resources/datasets to the root path and access them directly?

@@ -341,4 +342,50 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
assert(loadedXGBoostModel.getLabelCol == "label")
assert(loadedXGBoostModel.getPredictionCol == "prediction")
}
test("test use groupData") {


@CodingCat

CodingCat Mar 6, 2017

Member

can you also test whether the DF-based approach is working?


...boost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
partitionedTrainingSet.mapPartitions {
trainingSamples =>
partitionedTrainingSet.mapPartitionsWithIndex {
case (partIndex, trainingSamples) =>


@CodingCat

CodingCat Mar 6, 2017

Member

I just noticed this change: you don't need to use mapPartitionsWithIndex; you can just use TaskContext.getPartitionId() directly to get the partition index
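A sketch of that suggestion; the method and RDD types here are hypothetical, not the merged code:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

// Hypothetical sketch: keep plain mapPartitions and read the partition index
// from TaskContext instead of switching to mapPartitionsWithIndex.
def groupSizeSums(trainingSet: RDD[Double], groupData: Seq[Seq[Int]]): RDD[Int] = {
  trainingSet.mapPartitions { trainingSamples =>
    val partIndex = TaskContext.getPartitionId()
    // the real code would call trainingMatrix.setGroup(groupData(partIndex).toArray) here
    Iterator(groupData(partIndex).sum)
  }
}
```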


@CodingCat

Member

CodingCat commented Mar 6, 2017

LGTM, merged, thanks

@CodingCat CodingCat merged commit 288f309 into dmlc:master Mar 6, 2017

3 of 4 checks passed

codecov/project 30.62% (-0.04%) compared to ab13fd7
Details
codecov/patch Coverage not affected when comparing ab13fd7...56fcfbc
Details
continuous-integration/appveyor/pr AppVeyor build succeeded
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details