
[SPARK-29177] [Core] fix zombie tasks after stage abort #25850

Closed
adrian-wang wants to merge 5 commits into master from adrian-wang:zombie

@adrian-wang (Contributor) commented Sep 19, 2019

What changes were proposed in this pull request?

Handle the task even when its result exceeds the configured `maxResultSize`. More details are in the JIRA description: https://issues.apache.org/jira/browse/SPARK-29177.

Why are the changes needed?

Without this patch, zombie tasks prevent YARN from reclaiming the containers that run them, which affects other applications.

Does this PR introduce any user-facing change?

No

How was this patch tested?

A unit test, plus a production test with a very large `SELECT` in the Spark Thrift Server.
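
For illustration, a minimal sketch of the core change in `TaskResultGetter.enqueueSuccessfulTask` (surrounding context and the exact kill message are assumed; the real hunk appears in the review thread below):

```scala
case directResult: DirectTaskResult[_] =>
  if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
    // canFetchMoreResults has already aborted the stage and marked the task set as a
    // zombie, but until now the oversized task itself was never reported back, so the
    // scheduler kept treating it as running and (e.g. with dynamic allocation on YARN)
    // its container was never released. Reporting it as killed triggers the normal
    // failed-task cleanup path.
    scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED,
      TaskKilled("Tasks result size has exceeded maxResultSize"))
    return
  }
```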

@adrian-wang (Contributor, Author)

retest this please.


SparkQA commented Sep 19, 2019

Test build #110991 has finished for PR 25850 at commit 82740d0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 19, 2019

Test build #110993 has finished for PR 25850 at commit 82740d0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 19, 2019

Test build #110999 has finished for PR 25850 at commit d1e744e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 20, 2019

Test build #111040 has finished for PR 25850 at commit d1e744e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 20, 2019

Test build #111042 has finished for PR 25850 at commit b9dc92b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@adrian-wang (Contributor, Author)

@xuanyuanking Could you please help review this?

@xuanyuanking (Member) left a comment

Thanks for pinging me. I think it makes sense to handle a successful task as a killed task for resource cleanup. We do the same thing in TaskSetManager.handleSuccessfulTask for speculative tasks.
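
For reference, the corresponding part of `TaskSetManager.handleSuccessfulTask` looks roughly like the sketch below (paraphrased; exact field names and logging omitted):

```scala
// When one attempt of a task succeeds, any other attempts of the same task that are
// still running (i.e. speculative copies) are explicitly killed so their resources
// can be reclaimed rather than lingering as zombies.
for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
  killedByOtherAttempt += attemptInfo.taskId
  sched.backend.killTask(
    attemptInfo.taskId,
    attemptInfo.executorId,
    interruptThread = true,
    reason = "another attempt succeeded")
}
```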

@@ -64,6 +64,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
      val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
        case directResult: DirectTaskResult[_] =>
          if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
            scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
Member (inline comment):

How about directly calling taskSetManager.handleFailedTask here? If canFetchMoreResults returns false, taskSetManager.isZombie has already been set to true, so scheduler.handleFailedTask behaves the same as taskSetManager.handleFailedTask here, and calling the latter directly would make the unit test easier to write.
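
For context, `TaskSchedulerImpl.handleFailedTask` is roughly the following (paraphrased, conditions simplified):

```scala
def handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: TaskFailedReason): Unit = synchronized {
  taskSetManager.handleFailedTask(tid, taskState, reason)
  if (!taskSetManager.isZombie) {
    // Re-offer resources so that failed tasks can be rescheduled;
    // skipped when the task set is already a zombie.
    backend.reviveOffers()
  }
}
```

Since canFetchMoreResults has already made the task set a zombie, the reviveOffers() branch is skipped and the two call sites end up doing the same work here.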

Contributor Author (reply):

Calling scheduler.handleFailedTask keeps this consistent with the other cases in this function.


SparkQA commented Sep 21, 2019

Test build #111119 has finished for PR 25850 at commit fe3c674.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member) left a comment

Just one nit.
LGTM, cc @jiangxb1987 @cloud-fan

@@ -64,6 +64,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
      val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
        case directResult: DirectTaskResult[_] =>
          if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
            scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
Member (inline comment):

Better to leave a comment here explaining why we handle the oversized task as a killed task.

Contributor Author (reply):

Updated, thanks.


SparkQA commented Sep 23, 2019

Test build #111204 has finished for PR 25850 at commit aa41348.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan closed this in c08bc37 on Sep 23, 2019.
cloud-fan pushed a commit that referenced this pull request on Sep 23, 2019:
### What changes were proposed in this pull request?
Handle the task even when its result exceeds the configured `maxResultSize`. More details are in the JIRA description: https://issues.apache.org/jira/browse/SPARK-29177.

### Why are the changes needed?
Without this patch, zombie tasks prevent YARN from reclaiming the containers that run them, which affects other applications.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
A unit test, plus a production test with a very large `SELECT` in the Spark Thrift Server.

Closes #25850 from adrian-wang/zombie.

Authored-by: Daoyuan Wang <me@daoyuan.wang>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c08bc37)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan (Contributor)

thanks, merging to master/2.4!

@srowen (Member) commented Sep 23, 2019

@cloud-fan @adrian-wang oops, looks like this doesn't compile in 2.4:

[error] /home/jenkins/workspace/spark-branch-2.4-test-sbt-hadoop-2.7/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala:155: overloaded method constructor DirectTaskResult with alternatives:
[error]   ()org.apache.spark.scheduler.DirectTaskResult[T] <and>
[error]   (valueBytes: java.nio.ByteBuffer,accumUpdates: Seq[org.apache.spark.util.AccumulatorV2[_, _]])org.apache.spark.scheduler.DirectTaskResult[T]
[error]  cannot be applied to (java.nio.ByteBuffer, scala.collection.immutable.Nil.type, Array[Nothing])
[error]     val directTaskResult = new DirectTaskResult(ByteBuffer.allocate(0), Nil, Array())
[error] 

Want to revert it or just hot-fix forward? It may be pretty easy.
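
Given the constructor alternatives listed in the error, a hot-fix on branch-2.4 would presumably just drop the third argument from the new test (a sketch, untested):

```scala
// branch-2.4's DirectTaskResult only has the (valueBytes, accumUpdates) constructor,
// so the test would build an empty result without the extra Array() argument:
val directTaskResult = new DirectTaskResult(ByteBuffer.allocate(0), Nil)
```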

@cloud-fan (Contributor) commented Sep 23, 2019

@srowen thanks for catching! I've pushed a commit to fix it.

@adrian-wang deleted the zombie branch on October 17, 2019.