Skip to content

Commit

Permalink
[SPARK-24954][CORE] Fail fast on job submit if run a barrier stage wi…
Browse files Browse the repository at this point in the history
…th dynamic resource allocation enabled

## What changes were proposed in this pull request?

We don't support run a barrier stage with dynamic resource allocation enabled, it shall lead to some confusing behaviors (eg. with dynamic resource allocation enabled, it may happen that we acquire some executors (but not enough to launch all the tasks in a barrier stage) and later release them due to executor idle time expire, and then acquire again).

We perform the check on job submit and fail fast if running a barrier stage with dynamic resource allocation enabled.

## How was this patch tested?

Added new test suite `BarrierStageOnSubmittedSuite` to cover all the fail fast cases that submitted a job containing one or more barrier stages.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21915 from jiangxb1987/SPARK-24954.
  • Loading branch information
jiangxb1987 authored and mengxr committed Aug 3, 2018
1 parent c32dbd6 commit 92b4884
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 11 deletions.
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Expand Up @@ -364,6 +364,7 @@ class DAGScheduler(
*/
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)
Expand All @@ -384,6 +385,23 @@ class DAGScheduler(
stage
}

/**
* We don't support run a barrier stage with dynamic resource allocation enabled, it shall lead
* to some confusing behaviors (eg. with dynamic resource allocation enabled, it may happen that
* we acquire some executors (but not enough to launch all the tasks in a barrier stage) and
* later release them due to executor idle time expire, and then acquire again).
*
* We perform the check on job submit and fail fast if running a barrier stage with dynamic
* resource allocation enabled.
*
* TODO SPARK-24942 Improve cluster resource management with jobs containing barrier stage
*/
private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = {
if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) {
throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}
}

/**
* Create a ResultStage associated with the provided jobId.
*/
Expand All @@ -393,6 +411,7 @@ class DAGScheduler(
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
Expand Down Expand Up @@ -2001,4 +2020,10 @@ private[spark] object DAGScheduler {
"PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
"(scala) or barrierRdd.collect()[0] (python).\n" +
"2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."

// Error message when running a barrier stage with dynamic resource allocation enabled.
val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
"now. You can disable dynamic resource allocation by setting Spark conf " +
"\"spark.dynamicAllocation.enabled\" to \"false\"."
}
Expand Up @@ -20,8 +20,6 @@ package org.apache.spark
import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.scheduler.DAGScheduler
import org.apache.spark.util.ThreadUtils
Expand All @@ -30,16 +28,13 @@ import org.apache.spark.util.ThreadUtils
* This test suite covers all the cases that shall fail fast on job submitted that contains one
* of more barrier stages.
*/
class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
with LocalSparkContext {

override def beforeEach(): Unit = {
super.beforeEach()
class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext {

val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("test")
sc = new SparkContext(conf)
private def createSparkContext(conf: Option[SparkConf] = None): SparkContext = {
new SparkContext(conf.getOrElse(
new SparkConf()
.setMaster("local[4]")
.setAppName("test")))
}

private def testSubmitJob(
Expand All @@ -62,6 +57,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}

test("submit a barrier ResultStage that contains PartitionPruningRDD") {
sc = createSparkContext()
val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
val rdd = prunedRdd
.barrier()
Expand All @@ -71,6 +67,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}

test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") {
sc = createSparkContext()
val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
val rdd = prunedRdd
.barrier()
Expand All @@ -82,6 +79,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}

test("submit a barrier stage that doesn't contain PartitionPruningRDD") {
sc = createSparkContext()
val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
val rdd = prunedRdd
.repartition(2)
Expand All @@ -93,6 +91,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}

test("submit a barrier stage with partial partitions") {
sc = createSparkContext()
val rdd = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
Expand All @@ -101,6 +100,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}

test("submit a barrier stage with union()") {
sc = createSparkContext()
val rdd1 = sc.parallelize(1 to 10, 2)
.barrier()
.mapPartitions((iter, context) => iter)
Expand All @@ -114,6 +114,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}

test("submit a barrier stage with coalesce()") {
sc = createSparkContext()
val rdd = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
Expand All @@ -125,6 +126,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}

test("submit a barrier stage that contains an RDD that depends on multiple barrier RDDs") {
sc = createSparkContext()
val rdd1 = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
Expand All @@ -139,6 +141,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}

test("submit a barrier stage with zip()") {
sc = createSparkContext()
val rdd1 = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
Expand All @@ -150,4 +153,36 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
val result = rdd3.collect().sorted
assert(result === Seq(12, 14, 16, 18, 20, 22, 24, 26, 28, 30))
}

test("submit a barrier ResultStage with dynamic resource allocation enabled") {
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))

val rdd = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}

test("submit a barrier ShuffleMapStage with dynamic resource allocation enabled") {
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))

val rdd = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
.repartition(2)
.map(x => x + 1)
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}
}

0 comments on commit 92b4884

Please sign in to comment.