
[SPARK-28769][CORE] Improve warning message of BarrierExecutionMode when required slots > maximum slots #25487

Closed

sarutak wants to merge 3 commits into apache:master from sarutak:barrier-exec-mode-warning-message

Conversation

sarutak (Member) commented Aug 18, 2019

What changes were proposed in this pull request?

Improved the warning message shown in Barrier Execution Mode when the required slots exceed the maximum slots.
The new message reports the number of required slots, the maximum number of slots, and how many retries have failed.

Why are the changes needed?

Providing users with the number of required slots, the maximum number of slots, and how many retries have failed can help them decide what to do next,
for example, whether to keep waiting for a retry to succeed or to kill the job.

Does this PR introduce any user-facing change?

Yes.
If spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures=3, we get the following warning messages.

Before applying this change:

19/08/18 15:18:09 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:24 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:39 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:54 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
  at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:439)
  at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:453)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:983)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2140)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2132)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2121)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:749)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2145)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:961)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:960)
  ... 47 elided

After applying this change:

19/08/18 16:52:23 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently.
19/08/18 16:52:38 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 1/3 failed).
19/08/18 16:52:53 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 2/3 failed).
19/08/18 16:53:08 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 3/3 failed).
org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
  at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:439)
  at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:453)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:983)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2140)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2132)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2121)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:749)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2145)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:961)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:960)
  ... 47 elided

How was this patch tested?

I tested manually using the Spark shell with the following configuration and script, and then checked the log messages.

$ bin/spark-shell --master local[2] --conf spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures=3
scala> sc.parallelize(1 to 100, sc.defaultParallelism+1).barrier.mapPartitions(identity(_)).collect
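
(For context: --master local[2] provides 2 slots and sc.defaultParallelism is 2, so the barrier stage requests defaultParallelism + 1 = 3 partitions, i.e. 3 required slots against 2 available, which reproduces the warnings above.)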

SparkQA commented Aug 18, 2019

Test build #109298 has finished for PR 25487 at commit bc5042b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sarutak (Member, Author) commented Aug 20, 2019

cc: @jiangxb1987

s" (Retry ${retryCount}/$maxFailureNumTasksCheck failed)"
}

logWarning(s"The job $jobId requires to run a barrier stage " +
Review comment (Member):

I think this needs to be rewritten a little, and doesn't need the special case above

Barrier stage in job $jobId  requires ${...} slots, but only ${...} are available. Failure ${numCheckFailures} / ${maxFailureNumTasksCheck}

sarutak (Member, Author) replied Aug 21, 2019:

My first idea was like what you suggest, but if we do so, we get the following messages.

19/08/22 01:48:34 WARN DAGScheduler: Barrier stage in job 0  requires 3 slots, but only 2 are available. Failure 1 / 3
19/08/22 01:48:49 WARN DAGScheduler: Barrier stage in job 0  requires 3 slots, but only 2 are available. Failure 2 / 3
19/08/22 01:49:04 WARN DAGScheduler: Barrier stage in job 0  requires 3 slots, but only 2 are available. Failure 3 / 3
19/08/22 01:49:19 WARN DAGScheduler: Barrier stage in job 0  requires 3 slots, but only 2 are available. Failure 4 / 3

Failure 4 / 3 looks weird. Another solution would be to use maxFailureNumTasksCheck + 1 as the maximum number of attempts.
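
To make the off-by-one concrete, a tiny illustrative loop (not scheduler code): with maxFailures = 3 the check fails maxFailures + 1 = 4 times before the job is aborted, so a plain "Failure n / maxFailures" label overshoots on the final attempt.

    val maxFailures = 3
    // initial attempt + maxFailures retries = maxFailures + 1 failed checks
    for (attempt <- 1 to maxFailures + 1) {
      println(s"Failure $attempt / $maxFailures")  // last iteration prints "Failure 4 / 3"
    }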

Review comment (Member):

How about rendering a message like "Will retry up to x more times"?

sarutak (Member, Author) replied:

Hmm... something like the following?

19/08/22 02:54:37 WARN DAGScheduler: Barrier stage in job 0 requires 3 slots, but only 2 are available. Will retry up to 3 more times
19/08/22 02:54:52 WARN DAGScheduler: Barrier stage in job 0 requires 3 slots, but only 2 are available. Will retry up to 2 more times
19/08/22 02:55:07 WARN DAGScheduler: Barrier stage in job 0 requires 3 slots, but only 2 are available. Will retry up to 1 more times
19/08/22 02:55:22 WARN DAGScheduler: Barrier stage in job 0 requires 3 slots, but only 2 are available. Will retry up to 0 more times
org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
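
One way to produce that countdown, as an illustrative sketch only (the exact expression in the patch may differ):

    val maxFailures = 3
    // numCheckFailures counts the failed checks so far, including the current one
    for (numCheckFailures <- 1 to maxFailures + 1) {
      val remaining = maxFailures - numCheckFailures + 1
      println(s"Barrier stage in job 0 requires 3 slots, but only 2 are available. " +
        s"Will retry up to $remaining more times")
    }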

Review comment (Member):

Heh OK I accept that's a little odd too, but I think it's understandable enough and a little less complex to deal with.

SparkQA commented Aug 21, 2019

Test build #109507 has finished for PR 25487 at commit 1207e00.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

DAGScheduler.scala (diff excerpt under review; old and new lines interleaved):

    private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
      if (rdd.isBarrier() && rdd.getNumPartitions > sc.maxNumConcurrentTasks) {
        throw new BarrierJobSlotsNumberCheckFailed
      lazy val numPartitions = rdd.getNumPartitions
Review comment (Member):

Oh wait, one last little thing: why are these local vars lazy? They're cheap to access. Even if not, just nest two if statements below to conditionally access them.
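
For illustration, a sketch of the nested-if alternative, assuming the method shape from the excerpt above (any constructor arguments of BarrierJobSlotsNumberCheckFailed are elided):

    private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
      if (rdd.isBarrier()) {  // outer check: only barrier stages need the slot check
        val numPartitions = rdd.getNumPartitions        // computed only when needed
        val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
        if (numPartitions > maxNumConcurrentTasks) {
          throw new BarrierJobSlotsNumberCheckFailed    // as in the excerpt above
        }
      }
    }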

sarutak (Member, Author) replied Aug 22, 2019:

Yes, they are cheap to access, especially rdd.getNumPartitions. Making them lazy was just to avoid an unused variable and method call when rdd.isBarrier() returns false.
I have no strong preference among keeping them lazy, nesting if statements, or simply removing the lazy modifier.

Just out of curiosity: does making those variables lazy introduce extra overhead or hurt readability?

Anyway, I'll remove lazy

Review comment (Member):

I'm actually not sure what, if anything, it does for a local. It introduces another lambda/function and a check on each access, though, so it probably isn't even saving anything.
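
For reference, a rough sketch of what the Scala 2 compiler generates for a local lazy val (simplified; the real generated code uses a compiler-synthesized holder and synchronizes on it):

    // Source:
    //   lazy val numPartitions = rdd.getNumPartitions
    // roughly desugars into a holder plus an accessor that checks it on every use:
    val numPartitionsHolder = new scala.runtime.LazyInt()
    def numPartitions: Int =
      if (numPartitionsHolder.initialized) numPartitionsHolder.value
      else numPartitionsHolder.initialize(rdd.getNumPartitions)  // evaluated on first access

So the lazy form trades one extra object and a branch per access for skipping the computation when the value is never used.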

sarutak (Member, Author) replied:

Thanks, I understand.

jiangxb1987 (Contributor) left a comment:

LGTM

SparkQA commented Aug 22, 2019

Test build #109586 has finished for PR 25487 at commit 1967e9c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

srowen (Member) commented Aug 22, 2019

Merged to master

srowen closed this in 33e45ec on Aug 22, 2019
