Revert "[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky"
This reverts commit 1306411.
Marcelo Vanzin committed Apr 20, 2018
1 parent 9b562d6 commit 8eb64a5
Showing 1 changed file with 33 additions and 45 deletions: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -17,16 +17,14 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-
 import scala.concurrent.duration._
 import scala.math.abs
 import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -154,53 +152,39 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually {
   }
 
   test("Cancelling stage in a query with Range.") {
-    // Save and restore the value because SparkContext is shared
-    val savedInterruptOnCancel = sparkContext
-      .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
-
-    try {
-      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
-
-      for (codegen <- Seq(true, false)) {
-        // This countdown latch used to make sure with all the stages cancelStage called in listener
-        val latch = new CountDownLatch(2)
-
-        val listener = new SparkListener {
-          override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-            sparkContext.cancelStage(taskStart.stageId)
-            latch.countDown()
-          }
-        }
-
-        sparkContext.addSparkListener(listener)
-        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-          val ex = intercept[SparkException] {
-            sparkContext.range(0, 10000L, numSlices = 10).mapPartitions { x =>
-              x.synchronized {
-                x.wait()
-              }
-              x
-            }.toDF("id").agg(sum("id")).collect()
-          }
-          ex.getCause() match {
-            case null =>
-              assert(ex.getMessage().contains("cancelled"))
-            case cause: SparkException =>
-              assert(cause.getMessage().contains("cancelled"))
-            case cause: Throwable =>
-              fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
-          }
-        }
-        latch.await(20, TimeUnit.SECONDS)
-        eventually(timeout(20.seconds)) {
-          assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
-        }
-        sparkContext.removeSparkListener(listener)
-      }
-    } finally {
-      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
-        savedInterruptOnCancel)
-    }
-  }
+    val listener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        eventually(timeout(10.seconds), interval(1.millis)) {
+          assert(DataFrameRangeSuite.stageToKill > 0)
+        }
+        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      }
+    }
+
+    sparkContext.addSparkListener(listener)
+    for (codegen <- Seq(true, false)) {
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
+        DataFrameRangeSuite.stageToKill = -1
+        val ex = intercept[SparkException] {
+          spark.range(0, 100000000000L, 1, 1).map { x =>
+            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
+            x
+          }.toDF("id").agg(sum("id")).collect()
+        }
+        ex.getCause() match {
+          case null =>
+            assert(ex.getMessage().contains("cancelled"))
+          case cause: SparkException =>
+            assert(cause.getMessage().contains("cancelled"))
+          case cause: Throwable =>
+            fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
+        }
+      }
+      eventually(timeout(20.seconds)) {
+        assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+      }
+    }
+    sparkContext.removeSparkListener(listener)
+  }
 
   test("SPARK-20430 Initialize Range parameters in a driver side") {
@@ -220,3 +204,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually {
     }
   }
 }
+
+object DataFrameRangeSuite {
+  @volatile var stageToKill = -1
+}
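
The restored version of the test cancels the stage from a SparkListener: a running task publishes its stage id through the volatile DataFrameRangeSuite.stageToKill field, and onJobStart waits for that id before calling cancelStage, instead of cancelling from onTaskStart behind a CountDownLatch as in the reverted version. The standalone sketch below illustrates that pattern under stated assumptions: the object name, the bounded wait (standing in for ScalaTest's eventually), and the use of the RDD API instead of a Dataset are illustrative choices, not code from this commit.

import org.apache.spark.{SparkContext, SparkException, TaskContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession

object StageCancellationSketch {
  // Written by a running task, read by the listener on the driver; -1 means "not known yet".
  @volatile var stageToKill: Int = -1

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("cancel-sketch").getOrCreate()
    val sc: SparkContext = spark.sparkContext

    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // Wait (bounded, instead of ScalaTest's eventually) until a task has published its
        // stage id, then cancel that stage from the driver.
        val deadline = System.nanoTime() + 10L * 1000 * 1000 * 1000
        while (stageToKill < 0 && System.nanoTime() < deadline) Thread.sleep(1)
        if (stageToKill >= 0) sc.cancelStage(stageToKill)
      }
    }
    sc.addSparkListener(listener)

    try {
      // A deliberately long-running single-partition job so it cannot finish before the cancel.
      sc.range(0L, 100000000000L, numSlices = 1).map { x =>
        stageToKill = TaskContext.get().stageId()
        x
      }.count()
      println("Job finished before it could be cancelled (unexpected for this sketch).")
    } catch {
      case e: SparkException =>
        println("Job failed as expected after cancellation: " + e.getMessage)
    } finally {
      sc.removeSparkListener(listener)
      spark.stop()
    }
  }
}

Note that the sketch, like the test suite's SharedSQLContext, only behaves as intended in local mode: in cluster mode a task would write to an executor-side copy of the field, so the driver-side listener would never observe the update.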
