
[SPARK-26713][CORE] Interrupt pipe IO threads in PipedRDD when task is finished #23638

Closed
wants to merge 3 commits

Conversation

advancedxy
Contributor

What changes were proposed in this pull request?

Manually release the stdin writer and stderr reader threads when the task is finished. This commit also marks ShuffleBlockFetcherIterator as fully consumed if isZombie is set.
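For readers skimming the thread, the core of the PipedRDD change has roughly the following shape. This is a hedged sketch, not the exact patch: proc, stdinWriterThread and stderrReaderThread stand in for the corresponding locals in PipedRDD.compute, and TaskContext.get() is assumed to be non-null because compute runs inside a task.

val context = TaskContext.get()
context.addTaskCompletionListener[Unit] { _ =>
  // Destroy the subprocess if it is still running, so its streams unblock.
  if (proc.isAlive) {
    proc.destroy()
  }
  // Interrupt the stdin writer, which may be blocked consuming the parent
  // RDD's iterator (e.g. a shuffle fetch that never completes).
  if (stdinWriterThread.isAlive) {
    stdinWriterThread.interrupt()
  }
  // Interrupt the stderr reader, which may be blocked on the subprocess.
  if (stderrReaderThread.isAlive) {
    stderrReaderThread.interrupt()
  }
}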

How was this patch tested?

Added new test

@advancedxy
Contributor Author

cc @xuanyuanking @cloud-fan @gatorsmile
and also @srowen, as you reviewed related PipedRDD PRs.

@advancedxy changed the title from "[SPARK-26713][CORE] Release pipe IO threads in PipedRDD when task is finished" to "[SPARK-26713][CORE] Interrupt pipe IO threads in PipedRDD when task is finished" Jan 24, 2019
@advancedxy
Contributor Author

@srowen I addressed your comments. Any more comments?

currentResult = result.asInstanceOf[SuccessFetchResult]
(currentResult.blockId, new BufferReleasingInputStream(input, this))
} else { // the iterator has already been closed
throw new NoSuchElementException
Member

Tiny nit: new NoSuchElementException(). You could also write:

if (result == null) {
  throw ..
}

but doesn't really matter, maybe just slightly cleaner to follow.

Contributor Author

OK.

L387 in this file is also throw new NoSuchElementException. Shall I modify that too?

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala (outdated; resolved)
// val abnormalRDD = pipeRDD.mapPartitions(_ => Iterator.empty)
// the iterator generated by PipedRDD is never involved. If the parent RDD's iterator is time
// consuming to generate(ShuffledRDD's shuffle operation for example), the outlived stdin writer
// thread will consume significant memory and cpu time. Also, there's race condition for
Member

Does the second part of the comment, beginning at "Also,", belong below in the change to ShuffleBlockFetcherIterator?

Contributor Author

This is a tricky one. After the fix in this PR to ShuffleBlockFetcherIterator, the race condition should be really rare, as only one potential next call may hang. However, I am unable to find a good place to put the above comment in ShuffleBlockFetcherIterator. So it's inside this task completion listener.

Do you have any suggestions?

Member

Why not put the comment next to the change in ShuffleBlockFetcherIterator?

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala (outdated; resolved)
@@ -372,7 +372,7 @@ final class ShuffleBlockFetcherIterator(
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
}

override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
override def hasNext: Boolean = !isZombie && (numBlocksProcessed < numBlocksToFetch)
Member

Is the ShuffleBlockFetcherIterator change related to PipedRDD?

Contributor Author

Kind of. The root cause of the OOM I described in SPARK-26713 is ShuffledRDD + PipedRDD.

I found PipedRDD's stderr writer hangs at ShuffleBlockFetcherIterator.next() and leaks memory. I believe this change to ShuffleBlockFetcherIterator could reduce the possibility of the race condition, and it seems right to mark the iterator as fully consumed if it is already cleaned up.

Member

After the change to PipedRDD in this PR, won't the stdin writer thread (you wrote stderr writer, but I think it is a typo) be interrupted? If so, it stops consuming the ShuffleBlockFetcherIterator. Isn't that enough to solve the problem?

Contributor Author

(you wrote stderr writer, but I think it is a typo)

Sorry for the typo.

If so, it stops consuming the ShuffleBlockFetcherIterator. Isn't that enough to solve the problem?

Yes, for the ShuffledRDD + PipedRDD case, the cleanup logic in PipedRDD is enough to solve the potential leak.
However, since a ShuffledRDD can be transformed by arbitrary operations, there may be other cases where a ShuffleBlockFetcherIterator is cleaned up but still being consumed. So this change makes ShuffleBlockFetcherIterator defensive.

Member

OK, it sounds good to me. You can make the related comment general and move it to ShuffleBlockFetcherIterator.

@SparkQA

SparkQA commented Jan 25, 2019

Test build #4529 has finished for PR 23638 at commit ef7643e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Contributor Author

Gently ping @srowen, do you have any concerns or suggestions?

Member

@srowen left a comment

@advancedxy I wouldn't ping so often. We have tens of PRs to review each day in our spare time.

// val abnormalRDD = pipeRDD.mapPartitions(_ => Iterator.empty)
// the iterator generated by PipedRDD is never involved. If the parent RDD's iterator is time
// consuming to generate(ShuffledRDD's shuffle operation for example), the outlived stdin writer
// thread will consume significant memory and cpu time. Also, there's race condition for
Member

Why not put the comment next to the change in ShuffleBlockFetcherIterator?

// consuming to generate(ShuffledRDD's shuffle operation for example), the outlived stdin writer
// thread will consume significant memory and cpu time. Also, there's race condition for
// ShuffledRDD + PipedRDD if the subprocess command is failed. The task will be marked as failed
// , ShuffleBlockFetcherIterator will be cleaned up at task completion, which may hang
Member

Nit: this comma seems misplaced

Member

@srowen left a comment

Just comments on comments now

* When the iterator is inactive, [[hasNext]] and [[next()]] calls will honor that as there are
* cases the iterator is still being consumed. For example, there was a race condition for
* ShuffledRDD + PipedRDD if the subprocess command is failed. The task will be marked as failed,
* then the iterator will be cleaned up at task completion, the [[next()]] call(called in the
Member

nit: space in call(called
I also don't know that [[ will do anything here as scaladoc won't be generated for this anyway
"there was a race condition" -> don't know if it's useful, just explain what the code is doing now, rather than the previous issue

Contributor Author

I also don't know that [[ will do anything here as scaladoc won't be generated for this anyway

I noticed [[ is used here and there in this file and other source files. I think it's clear to others (who are reading the code) that this points to the actual method/variable. Also, there's a bonus that we can jump straight to the code in IntelliJ IDEA, as it's treated as a code block.

Will fix the other issues.

stdinWriterThread.start()

// interrupts stdin writer and stderr reader threads when the corresponding task is finished as
// a safe belt. Otherwise, these threads could outlive the task's lifetime. For example:
Member

safe belt -> remove this? not sure what it's getting at.

Contributor Author

Will fix.

// a safe belt. Otherwise, these threads could outlive the task's lifetime. For example:
// val pipeRDD = sc.range(1, 100).pipe(Seq("cat"))
// val abnormalRDD = pipeRDD.mapPartitions(_ => Iterator.empty)
// the iterator generated by PipedRDD is never involved. If the parent RDD's iterator is time
Member

is time consuming -> takes a long time?
outlived stdin writer -> stdin writer?
cpu -> CPU

Contributor Author

Thanks. Will fix.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jan 27, 2019

Test build #101726 has finished for PR 23638 at commit be18ba7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 27, 2019

Test build #4533 has finished for PR 23638 at commit be18ba7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Jan 28, 2019

Merged to master

@srowen closed this in 1280bfd Jan 28, 2019
@@ -395,7 +402,7 @@ final class ShuffleBlockFetcherIterator(
// then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch
// is also corrupt, so the previous stage could be retried.
// For local shuffle block, throw FailureFetchResult for the first IOException.
while (result == null) {
while (!isZombie && result == null) {
Contributor

is it possible that hasNext returns true and next throws NoSuchElementException? isZombie may get changed by other threads?

Member

Yeah that can happen. Right now I think it's 'worse' in that the iterator might be cleaned up and yet next() will keep querying the iterator that's being drained by cleanup().

To really tighten it up I think more or all of next() and cleanup() would have to be synchronized (?) and I'm not sure what the implications are of that.

We could follow this up with small things like making hasNext() synchronized at least, as isZombie is marked GuardedBy("this"). That still doesn't prevent this from happening but is a little tighter.

@advancedxy what do you think? I think the argument is merely that this fixes the potential issue in 99% of cases, not 100%.
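To illustrate the "slightly tighter" variant: since isZombie is marked @GuardedBy("this"), hasNext could read it under the same lock that cleanup() holds when setting it. A sketch, which narrows the race window but, as noted above, does not eliminate it:

override def hasNext: Boolean = synchronized {
  // isZombie is @GuardedBy("this"); read it under the lock so a concurrent
  // cleanup() is observed, though next() can still race after we return.
  !isZombie && (numBlocksProcessed < numBlocksToFetch)
}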

Contributor Author

is it possible that hasNext returns true and next throws NoSuchElementException? isZombie may get changed by other threads?

@cloud-fan Yeah, it can happen. But I agree with @srowen. The isZombie flag indicates the whole task is finished; there's no point in the consumer of the iterator still being active. This changes the semantics of Iterator in rare cases, but I think it is acceptable.

We could follow this up with small things like making hasNext() synchronized at least, as isZombie is marked GuardedBy("this"). That still doesn't prevent this from happening but is a little tighter.

Maybe. But I would leave it as it is, if it's up to me. Like you said, this doesn't prevent the semantics from changing, but it is a little tighter.

* subprocess command is failed. The task will be marked as failed, then the iterator will be
* cleaned up at task completion, the [[next]] call (called in the stdin writer thread of
* PipedRDD if not exited yet) may hang at [[results.take]]. The defensive check in [[hasNext]]
* and [[next]] reduces the possibility of such race conditions.
Contributor

When a task finishes, do we really need to guarantee that all the iterators stop producing data? I agree it's better, but I'm afraid it's too much effort to guarantee it. It's not only the shuffle reader; we would also need to fix the sort iterator, the aggregate iterator, and so on.

And why can't PipedRDD stop consuming input? I think it's better to fix the sole consumer side, instead of fixing different kinds of producer sides.
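One way to picture "fix the sole consumer side" is a guard in PipedRDD's stdin writer loop, so it stops pulling from the upstream iterator once the task completes. This is a hypothetical sketch only (the PR interrupts the writer thread instead); context, parentIterator and out are assumed names:

// Stop consuming the parent iterator once the task has completed, instead
// of making every producer iterator zombie-aware.
while (!context.isCompleted() && parentIterator.hasNext) {
  out.println(parentIterator.next())
}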

Contributor Author

And why can't PipedRDD stop consuming input? I think it's better to fix the sole consumer side, instead of fixing different kinds of producer sides.

The PipedRDD stops consuming input in this PR. For the ShuffledRDD + PipedRDD case alone, the fixes in PipedRDD are sufficient. But I noticed that the iterator still producing data is also a cause, therefore I made the corresponding changes.

When a task finishes, do we really need to guarantee that all the iterators stop producing data? I agree it's better, but I'm afraid it's too much effort to guarantee it. It's not only the shuffle reader; we would also need to fix the sort iterator, the aggregate iterator, and so on.

I think we can try our best to guarantee that. If it's too much effort, we could stop trying or try different approaches.

Contributor

"try best" is not a "guarantee".

If we don't need to do this, I suggest we not do it at all. The new changes in ShuffleBlockFetcherIterator make it harder for people to understand the code (at least for me), and they also break the semantics of Iterator. And I don't see much benefit in doing it, as PipedRDD has been fixed. Can we revert the changes in ShuffleBlockFetcherIterator?

Contributor Author

Can we revert the changes in ShuffleBlockFetcherIterator?

If you insist, I can revert those changes. But let's wait and see if others have other opinions.
cc @srowen, @HyukjinKwon and @viirya.

also break the semantics of Iterator.

Rarely, and it shouldn't matter that much if the task is already finished.

Member

I agree that this is not the expected behavior of Iterator. If there are elements in the Iterator, it should return true when hasNext is called. It sounds more reasonable for the consumer side to stop consuming the Iterator.

Member

Sure, how about a follow-up that tries a different approach? The current change isn't harmful per se, and it is a small improvement.

You're suggesting reading and storing the next element that's available, if not already read, in hasNext? And then next must call hasNext to ensure this is filled if it isn't already, which it does already? Yeah, that seems reasonable. That pattern is used in other iterators sometimes.
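For concreteness, the look-ahead pattern described here might look like the following generic sketch (hypothetical names, not the actual follow-up): hasNext pre-fetches and caches the next element, and next() consumes the cache, so a true hasNext can no longer be followed by a NoSuchElementException within one thread.

abstract class LookAheadIterator[A] extends Iterator[A] {
  // Caches the element fetched by hasNext until next() consumes it.
  private var buffered: Option[A] = None

  // Subclasses implement the actual (possibly blocking) fetch;
  // None signals exhaustion.
  protected def fetchNext(): Option[A]

  override def hasNext: Boolean = {
    if (buffered.isEmpty) {
      buffered = fetchNext()
    }
    buffered.isDefined
  }

  override def next(): A = {
    if (!hasNext) {
      throw new NoSuchElementException("end of stream")
    }
    val elem = buffered.get
    buffered = None
    elem
  }
}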

Contributor

how about a follow-up that tries a different approach

Sure, if we can do it soon. Hopefully we don't leave this partial fix in the code base for years...

Contributor Author

Sorry for the late reply. We are on the same page now, and I think @cloud-fan's proposal seems reasonable. I may create a new JIRA and come up with a different fix. However, I am leaving for the holidays (lunar new year) soon. I cannot guarantee it will be finished in a couple of days, but I will try my best to resolve it before the end of February. Others are welcome to take it over if there is too much delay.

P.S. I just looked through potentially similar issues to PipedRDD. I believe RRunner may have the same issue, as it doesn't clean up its threads. On the other hand, PythonRunner gracefully stops its threads.

Contributor

@advancedxy thanks for working on it! I'm leaving for the lunar new year soon too; the end of February sounds good.

Contributor

@advancedxy I am facing the same issue discussed by @cloud-fan: there are two threads consuming the results queue at the same time, causing the Spark application to hang. Is the fix for this issue being worked on right now? Is there a JIRA to track the fix?

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…s finished

## What changes were proposed in this pull request?
Manually release the stdin writer and stderr reader threads when the task is finished. This commit also marks ShuffleBlockFetcherIterator as fully consumed if isZombie is set.

## How was this patch tested?
Added a new test

Closes apache#23638 from advancedxy/SPARK-26713.

Authored-by: Xianjin YE <advancedxy@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
@cloud-fan
Contributor

@advancedxy any updates?

@advancedxy
Contributor Author

@cloud-fan sorry, I lost track of this. I will submit a new PR this weekend.

@advancedxy
Contributor Author

However, I haven't gotten hanging reports from our internal users since this patch, which is one of the reasons I lost track of this issue.

@cloud-fan
Contributor

I haven't gotten hanging reports from our internal users since this patch

I think it's because the problem has been fixed in PipedRDD. According to the previous discussion, we should either revert the changes in ShuffleBlockFetcherIterator, or fix it in the right way for future safety.

@dongjoon-hyun
Member

Hi, all.
Since branch-2.4 is our LTS for the 2.x release line, can we have this fix in branch-2.4?

@cloud-fan
Contributor

I'm fine with it; let's also include the follow-up #25049.

@advancedxy
Contributor Author

I can submit a PR to branch-2.4.

@dongjoon-hyun
Member

Thank you, @cloud-fan and @advancedxy!
Yes. Please submit a backport PR, @advancedxy.

dongjoon-hyun pushed a commit that referenced this pull request Sep 18, 2019
…ask is finished

### What changes were proposed in this pull request?
Manually release the stdin writer and stderr reader threads when the task is finished. This is a backport of #23638, including #25049.

### Why are the changes needed?
This is a bug fix. PipedRDD's IO threads may hang even when the corresponding task is already finished. Without this fix, it would leak resources (memory especially).

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added a new test

Closes #25825 from advancedxy/SPARK-26713_for_2.4.

Authored-by: Xianjin YE <advancedxy@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
rshkv pushed a commit to palantir/spark that referenced this pull request Mar 12, 2021
…ask is finished

### What changes were proposed in this pull request?
Manually release the stdin writer and stderr reader threads when the task is finished. This is a backport of apache#23638, including apache#25049.

### Why are the changes needed?
This is a bug fix. PipedRDD's IO threads may hang even when the corresponding task is already finished. Without this fix, it would leak resources (memory especially).

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added a new test

Closes apache#25825 from advancedxy/SPARK-26713_for_2.4.

Authored-by: Xianjin YE <advancedxy@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
rshkv pushed a commit to palantir/spark that referenced this pull request Mar 15, 2021