
[jvm-packages] XGBoost running Spark local on Windows in IDEA, Rabit returns with exit code 3 #3418

Closed
KimiRaikking opened this issue Jun 28, 2018 · 1 comment


@KimiRaikking

Environment info

  • Operating System: Windows 7
  • Compiler: Scala 2.11.8, Spark 2.11
  • Package used (python/R/jvm/C++): jvm packages
  • xgboost version used: release-0.72

Steps to reproduce

1. Import jvm-packages into an IDEA project.
2. Add the following test code for running on Spark local:

import ml.dmlc.xgboost4j.scala.spark.{TrackerConf, XGBoost}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{Row, SparkSession}

class XGBoostUtils {

  def testBinaryClass(
              numWorker: Int,
              trackerType: String,
              trackerTimeout: Long,
              objectiveType: String ): Unit = {

    val spark = SparkSession.builder.master("local[8]").appName("example").getOrCreate()
    // val spark = SparkSession.builder.appName("example").getOrCreate()

    spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
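    // agaricus.txt.train / agaricus.txt.test are the demo data files from
    // xgboost's demo/data directory; they are assumed to be in the working directory here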
    val trainString = "agaricus.txt.train"
    val testString = "agaricus.txt.test"

    val train = spark.read.format("libsvm").load(trainString).toDF("label", "feature")

    val test = spark.read.format("libsvm").load(testString).toDF("label", "feature")

    val numRound = 2

    val paramMap = List(
      "eta" -> 1f,
      "max_depth" -> 5, // tree depth, range: [1, ∞]
      "silent" -> 0, // print runtime log
      "objective" -> objectiveType, // objective type, e.g. binary:logistic
      "lambda" -> 2.5,
      "timeout_request_workers" -> 10000L,
      "nthread" -> 1,
      "tracker_conf" -> new TrackerConf(trackerTimeout, trackerType)
    ).toMap
    println(paramMap)

    val model = XGBoost.trainWithDataFrame(
      train,
      paramMap,
      numRound,
      numWorker,
      obj = null,
      eval = null,
      useExternalMemory = false,
      Float.NaN,
      "feature",
      "label" )
    val predict = model.transform(test)
    predict.take(5).foreach(println)
    predict.printSchema()

    val scoreAndLabels = predict.select(model.getPredictionCol, model.getLabelCol)
      .rdd
      .map { case Row(score: Double, label: Double) => (score, label) }

    val probabilities = predict.select("probabilities")
    probabilities.take(10).foreach(println)

    // get the auc
    val metric = new BinaryClassificationMetrics(scoreAndLabels)
    val auc = metric.areaUnderROC()
    println("auc:" + auc)

  }
}

3. Run the code:

public static void main(String[] args) {
        System.out.println("Start test");
        XGBoostUtils utils = new XGBoostUtils();     
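        // 60*1000*1000L below is the tracker timeout that ends up in TrackerConf;
        // as far as I can tell it is interpreted as milliseconds (~16.7 hours here)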
        utils.testBinaryClass(1,"scala",60*1000*1000L,"binary:logistic");
        System.out.println("End test");
    }
4. The exception happens. The log looks like this:
18/06/28 15:09:51|INFO|XGBoostSpark:Rabit returns with exit code 3
Exception in thread "main" ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.ml$dmlc$xgboost4j$scala$spark$XGBoost$$postTrackerReturnProcessing(XGBoost.scala:408)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:358)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:339)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:338)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:139)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:36)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:96)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainWithDataFrame(XGBoost.scala:195)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostUtils.testBinaryClass(XGBoostUtils.scala:115)
	at ml.dmlc.spark.SparkMain.main(SparkMain.java:14)

What have you tried?

  1. I tried to print the real exception, so I added a printStackTrace call in the waitFor function of RabitTracker.scala:
private def waitFor(atMost: Duration): Int = {
    // request the completion Future from the tracker actor
    Try(Await.result(handler ? RabitTrackerHandler.RequestCompletionFuture, askTimeout.duration)
      .asInstanceOf[Future[Int]]) match {
      case Success(futureCompleted) =>
        // wait for all workers to complete synchronously.
        val statusCode = Try(Await.result(futureCompleted, atMost)) match {
          case Success(n) if n == numWorkers =>
            IRabitTracker.TrackerStatus.SUCCESS.getStatusCode
          case Success(n) if n < numWorkers =>
            IRabitTracker.TrackerStatus.TIMEOUT.getStatusCode
          case Failure(e) =>
            IRabitTracker.TrackerStatus.FAILURE.getStatusCode
        }
        system.shutdown()

        statusCode
      case Failure(ex: Throwable) =>
        ex.printStackTrace()
        if (!system.isTerminated) {
          system.shutdown()
        }
        IRabitTracker.TrackerStatus.FAILURE.getStatusCode
    }
  }
  2. That printed the following stack trace. The ask to the tracker handler fails because the handler actor has already terminated; waitFor then returns TrackerStatus.FAILURE, which (as far as I can tell) is the status code 3 seen in the log:
akka.pattern.AskTimeoutException: Recipient[Actor[akka://RabitTracker/user/Handler#-1269920980]] had already been terminated.
	at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
	at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144)
	at ml.dmlc.xgboost4j.scala.rabit.RabitTracker$$anonfun$5.apply(RabitTracker.scala:158)
	at ml.dmlc.xgboost4j.scala.rabit.RabitTracker$$anonfun$5.apply(RabitTracker.scala:159)
	at scala.util.Try$.apply(Try.scala:192)
	at ml.dmlc.xgboost4j.scala.rabit.RabitTracker.waitFor(RabitTracker.scala:158)
	at ml.dmlc.xgboost4j.scala.rabit.RabitTracker.waitFor(RabitTracker.scala:192)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anonfun$2.apply$mcI$sp(XGBoost.scala:356)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anonfun$2.apply(XGBoost.scala:356)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anonfun$2.apply(XGBoost.scala:356)
	at org.apache.spark.SparkParallelismTracker.safeExecute(SparkParallelismTracker.scala:82)
	at org.apache.spark.SparkParallelismTracker.execute(SparkParallelismTracker.scala:108)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:356)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:339)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:338)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:139)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:36)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:96)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainWithDataFrame(XGBoost.scala:195)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostUtils.testBinaryClass(XGBoostUtils.scala:115)
	at ml.dmlc.spark.SparkMain.main(SparkMain.java:14)
  3. So, looking at this exception, I tried to stop it from happening: I deleted one line in the handleRabitWorkerMessage function of RabitTrackerHandler.scala, so that the tracker handler does not stop its own actor when it receives a worker shutdown message (the modified code is shown below), and it works.
    Setting my tracker conf to "python" also works; see the sketch after the code below.

My question is: why does this problem happen, and why does it seem to work fine in a Linux environment?

case WorkerShutdown(rank, _, _) =>
      assert(rank >= 0, "Invalid rank.")
      assert(!shutdownWorkers.contains(rank))
      shutdownWorkers.add(rank)

      log.info(s"Received shutdown signal from $rank")

      if (shutdownWorkers.size == numWorkers) {
        promisedShutdownWorkers.success(shutdownWorkers.size)
        println(s"Do not stop self handler ${self}")
        // context.stop(self)
      }
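For reference, the second workaround does not require patching the tracker source at all; it only changes the parameter map to use the Python-based tracker. A minimal sketch (it assumes a python executable is available on the driver's PATH, and the 60-second timeout is just an example value):

// Same paramMap as above, but with the Python-based Rabit tracker instead of
// the Scala/Akka one. TrackerConf(timeoutMillis, trackerImpl): "python" runs
// the tracker script bundled with xgboost, so python must be on the PATH.
val paramMapWithPythonTracker = paramMap + (
  "tracker_conf" -> TrackerConf(60 * 1000L, "python"))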
@tqchen closed this as completed Jul 4, 2018
@jfgosselin

@KimiRaikking, @tqchen, @xydrolase: is this a bug? I ran into the exact same issue. @KimiRaikking's suggestion to remove context.stop(self) solves the problem.

  • CentOS 7
  • Scala 2.11.12
  • Apache Spark 2.2.2
  • xgboost4j 0.72
  • xgboost4j-spark 0.72
