[SPARK-19514] Making range interruptible. #16872
Conversation
```scala
test("Cancelling stage in a query with Range.") {
  val listener = new SparkListener {
    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
      Thread.sleep(100)
```
Can we do something a little less coarse (it probably also blocks other events from being emitted), and with less potential for flakiness?
The only other thing in the SparkListener interface that seems to fit here is `onExecutorMetricsUpdate()`. However, the metrics are sent with the heartbeats, and the heartbeats are sent around 10s apart. Hence, using this method makes the test much slower (4s vs 20s).
cc @zsxwing do you have a suggestion to do this nicely?
What's the purpose of this listener? Blocking `onTaskStart` only slows down the listener bus thread. It should not block other events from being emitted (it may drop events if the queue is full, though). If that does happen, then it's a bug.
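The behavior described here can be illustrated with a minimal sketch (plain Java, not Spark's actual `LiveListenerBus` implementation): events go into a bounded queue and are dispatched from a single thread, so a slow listener only delays dispatch, while posting never blocks and simply drops events once the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Toy sketch of an asynchronous listener bus: posting never blocks the
// producer thread; a slow listener only delays the dispatcher thread.
class ToyListenerBus {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    private int dropped = 0;

    // Non-blocking post: drops the event if the bounded queue is full.
    boolean post(String event) {
        boolean accepted = queue.offer(event);
        if (!accepted) dropped++;
        return accepted;
    }

    int droppedCount() { return dropped; }

    // Dispatcher loop body, meant to run on a dedicated thread; a listener
    // that blocks here stalls only this thread, not the producers.
    void dispatchOne(Consumer<String> listener) throws InterruptedException {
        listener.accept(queue.take());
    }
}
```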
The problem is that we are trying to kill a query once it has actually started executing. If we do not call `sleep(...)`, the query gets killed before it starts executing. The other option is to wait for `onExecutorMetricsUpdate(..)`, but that is tied to the heartbeat (10s) and makes the test much slower.
Should we move to the other approach, or do you have any other suggestions?
Sorry that I didn't see `sparkContext.cancelStage(taskStart.stageId)`. How about this:
```scala
class DataFrameRangeSuite {
  ...
  test("Cancelling stage in a query with Range.") {
    DataFrameRangeSuite.isTaskStarted = false
    val listener = new SparkListener {
      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
        eventually(timeout(20.seconds)) {
          assert(DataFrameRangeSuite.isTaskStarted)
        }
        sparkContext.cancelStage(taskStart.stageId)
      }
    }
    sparkContext.addSparkListener(listener)
    for (codegen <- Seq(true, false)) {
      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
        import testImplicits._
        val ex = intercept[SparkException] {
          spark.range(100000L).map { i =>
            DataFrameRangeSuite.isTaskStarted = true
            i
          }.crossJoin(spark.range(100000L))
            .toDF("a", "b").agg(sum("a"), sum("b")).collect()
        }
        ex.getCause() match {
          case null =>
            assert(ex.getMessage().contains("cancelled"))
          case cause: SparkException =>
            assert(cause.getMessage().contains("cancelled"))
          case cause: Throwable =>
            fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
        }
      }
      eventually(timeout(20.seconds)) {
        assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
      }
    }
  }
}

object DataFrameRangeSuite {
  @volatile var isTaskStarted = false
}
```
Thanks, Ryan. Worked like a charm.
Actually, there is an issue with the test case: how do we know that we are cancelling the original tasks, and not the ones from the shuffle? We need to add another flag to block the range operator from proceeding.
OK, so how about this:
```scala
test("Cancelling stage in a query with Range.") {
  val listener = new SparkListener {
    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
      eventually(timeout(10.seconds)) {
        assert(DataFrameRangeSuite.stageToKill > 0)
      }
      sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
    }
  }
  sparkContext.addSparkListener(listener)
  for (codegen <- Seq(true, false)) {
    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
      DataFrameRangeSuite.stageToKill = -1
      val ex = intercept[SparkException] {
        spark.range(1000000000L).map { x =>
          DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
          x
        }.toDF("id").agg(sum("id")).collect()
      }
      ex.getCause() match {
        case null =>
          assert(ex.getMessage().contains("cancelled"))
        case cause: SparkException =>
          assert(cause.getMessage().contains("cancelled"))
        case cause: Throwable =>
          fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
      }
    }
    eventually(timeout(20.seconds)) {
      assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
    }
  }
}
```
Now we should be 100% sure what exactly is getting killed. I just checked that it fails without the changes from this PR.
Do I need a new issue in Apache Jira for this?
Just create another PR under the same ticket.
```
@@ -443,6 +443,10 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   | if (shouldStop()) return;
   | }
   |
   | if (TaskContext.get().isInterrupted()) {
```
You could also move TaskContext.get() out of the loop. It should not change during evaluation.
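The suggestion amounts to fetching the context once before the loop and checking the interrupt flag only periodically inside it, so the hot loop stays cheap. A hypothetical non-Spark sketch of that shape (the flag stands in for `TaskContext.isInterrupted()`):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a cancellable range sum: the "context" (here just a flag) is
// fetched once outside the loop, and the cancellation check runs only
// every 1024 iterations rather than on every element.
class CancellableRange {
    static long sum(long n, AtomicBoolean interrupted) {
        AtomicBoolean ctx = interrupted;  // hoisted: does not change during evaluation
        long total = 0;
        for (long i = 0; i < n; i++) {
            if ((i & 1023) == 0 && ctx.get()) {
                throw new RuntimeException("task cancelled");
            }
            total += i;
        }
        return total;
    }
}
```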
```scala
spark.range(100000L).crossJoin(spark.range(100000L))
  .toDF("a", "b").agg(sum("a"), sum("b")).collect()
}
val msg = if (ex.getCause() != null) ex.getCause().getMessage() else ex.getMessage()
```
Which exception do we actually expect here?
Either this:

```
org.apache.spark.SparkException: Job 43 cancelled because Stage 85 was cancelled
```

or the same wrapped by this:

```scala
throw new SparkException("Exception thrown in awaitResult: ", t)
```
Test build #72643 has finished for PR 16872 at commit
LGTM
Test build #72647 has finished for PR 16872 at commit
I'm going to merge this in master. If we find a way to optimize the test we can do a follow-up pr.
## What changes were proposed in this pull request?

Previously range operator could not be interrupted. For example, using DAGScheduler.cancelStage(...) on a query with range might have been ineffective.

This change adds periodic checks of TaskContext.isInterrupted to codegen version, and InterruptibleOperator to non-codegen version.

I benchmarked the performance of codegen version on a sample query `spark.range(1000L * 1000 * 1000 * 10).count()` and there is no measurable difference.

## How was this patch tested?

Adds a unit test.

Author: Ala Luszczak <ala@databricks.com>

Closes apache#16872 from ala/SPARK-19514b.
(cherry picked from commit 4064574)
Signed-off-by: Reynold Xin <rxin@databricks.com>