Skip to content

Commit

Permalink
upate
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangxb1987 committed Aug 3, 2018
1 parent 33768fd commit f3ea9c6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,7 @@ class DAGScheduler(
*/
private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = {
if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) {
throw new SparkException("Don't support run a barrier stage with dynamic resource " +
"allocation enabled for now, please disable dynamic resource allocation by setting " +
"\"spark.dynamicAllocation.enabled\" to \"false\".")
throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}
}

Expand Down Expand Up @@ -2022,4 +2020,10 @@ private[spark] object DAGScheduler {
"PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
"(scala) or barrierRdd.collect()[0] (python).\n" +
"2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."

// Error message when running a barrier stage with dynamic resource allocation enabled.
val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
"now. You can disable dynamic resource allocation by setting Spark conf " +
"\"spark.dynamicAllocation.enabled\" to \"false\"."
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
.barrier()
.mapPartitions((iter, context) => iter)
testSubmitJob(sc, rdd,
"Don't support run a barrier stage with dynamic resource allocation enabled")
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}

test("submit a barrier ShuffleMapStage with dynamic resource allocation enabled") {
Expand All @@ -165,6 +165,6 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
.repartition(2)
.map(x => x + 1)
testSubmitJob(sc, rdd,
"Don't support run a barrier stage with dynamic resource allocation enabled")
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}
}

0 comments on commit f3ea9c6

Please sign in to comment.