Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-47146][CORE] Possible thread leak when doing sort merge join #45327

Closed
wants to merge 3 commits into from

Conversation

JacobZheng0927
Copy link
Contributor

@JacobZheng0927 JacobZheng0927 commented Feb 29, 2024

What changes were proposed in this pull request?

Add TaskCompletionListener to close inputStream to avoid thread leakage caused by unclosed ReadAheadInputStream.

Why are the changes needed?

SPARK-40849 modified the implementation of newDaemonSingleThreadExecutor to use newFixedThreadPool instead of newSingleThreadExecutor .The difference is that newSingleThreadExecutor uses the FinalizableDelegatedExecutorService, which provides a finalize method that automatically closes the thread pool. In some cases, sort merge join execution uses ReadAheadSteam and does not close it, so this change caused a thread leak. Since Finalization is deprecated and subject to removal in a future release, we should close the associated streams instead of relying on the finalize method.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test

Was this patch authored or co-authored using generative AI tooling?

No

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.ThreadUtils

import java.util.concurrent.{Executors, ThreadPoolExecutor}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala style error.

= ThreadUtils.newDaemonSingleThreadExecutor("async-log-purge")
private val asyncPurgeExecutorService: ThreadPoolExecutor = {
val threadFactory =
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("async-log-purge").build()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding an additional function to ThreadUtils?

@mridulm
Copy link
Contributor

mridulm commented Feb 29, 2024

Thanks for surfacing this issue !

To fix this, I would suggest to change ReadAheadInputStream to take TaskContext as a parameter - and add a task completion listener - which calls close.

That would fix the issue, more proactively release resources (instead of waiting for finalization to kick in) and also future proof this code (when support for finalize is removed from JDK)

@mridulm
Copy link
Contributor

mridulm commented Feb 29, 2024

Also, +CC @HeartSaVioR who reviewed the previous PR

@JacobZheng0927
Copy link
Contributor Author

Thanks for surfacing this issue !

To fix this, I would suggest to change ReadAheadInputStream to take TaskContext as a parameter - and add a task completion listener - which calls close.

That would fix the issue, more proactively release resources (instead of waiting for finalization to kick in) and also future proof this code (when support for finalize is removed from JDK)

To fix this, I would suggest to change ReadAheadInputStream to take TaskContext as a parameter - and add a task completion listener - which calls close.

Thanks, that's a very good point. I will try this modification.

close();
} catch (IOException e) {
logger.error("error while closing UnsafeSorterSpillReader", e);
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can log in info and drop the exception - for 'normal' flow, that is what ends up happening when close throws exception.

.find {
_.getName.startsWith("read-ahead")
}
assert(readAheadThread.isEmpty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this test can end up becoming flakey - in case some other tests result in read-ahead thread hanging around (for example, if some previous tests task is still running).

+CC @dongjoon-hyun for thoughts on how to test this more robustly (I am wondering if this will end up in a similar flakeyness with the other memory manager PR recently, where tests were interacting).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not aware of any flakiness about JoinSuite in these days, @mridulm .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was referring to this PR @dongjoon-hyun - the RC for that was some other previous test task interfering the the current (due to delays in killing tasks).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mridulm I guess this case should be fine. In the memory manager PR, the JobCancellationSuite specifically set some tasks to sleep for certain amount of time which causes them to not finish even if the test itself is finished. I don't see similar pattern in the JoinSuite.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for confirming, that was my assessment as well - but wanted to double check given we faced it very recently :-)

@dongjoon-hyun
Copy link
Member

Hi, @viirya . Did you see any thread leaks while you are working on Join (or shuffle)?

taskContext.addTaskCompletionListener(context -> {
try {
close();
} catch (IOException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, what happens if we fail to close here? Thread Leaks still happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there should be a leak, executionService.shutdownNow() will always be called.

@viirya
Copy link
Member

viirya commented Feb 29, 2024

Hi, @viirya . Did you see any thread leaks while you are working on Join (or shuffle)?

Hi, @dongjoon-hyun. No, I've not seen it. Maybe it is because our tests are not long running ones that have no chance to show any symptom of noticeable leak.

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.
Any thoughts on this @dongjoon-hyun , @sunchao ?

@mridulm mridulm closed this in dfb35be Mar 5, 2024
@mridulm
Copy link
Contributor

mridulm commented Mar 5, 2024

Merged to master.
Thanks for fixing this @JacobZheng0927 !
Thanks for reviewing @zml1206, @dongjoon-hyun :-)

@mridulm
Copy link
Contributor

mridulm commented Mar 5, 2024

@JacobZheng0927, might be a good idea to backport this to 3.5 as well - will you be able to create a backport PR ?
(I ran into some issue locally when trying to merge to branch-3.5 and bailed)

JacobZheng0927 added a commit to JacobZheng0927/spark that referenced this pull request Mar 5, 2024
### What changes were proposed in this pull request?
Add TaskCompletionListener to close inputStream to avoid thread leakage caused by unclosed ReadAheadInputStream.

### Why are the changes needed?
SPARK-40849 modified the implementation of `newDaemonSingleThreadExecutor` to use `newFixedThreadPool` instead of `newSingleThreadExecutor` .The difference is that `newSingleThreadExecutor` uses the `FinalizableDelegatedExecutorService`, which provides a `finalize` method that automatically closes the thread pool. In some cases, sort merge join execution uses `ReadAheadSteam` and does not close it, so this change caused a thread leak. Since Finalization is deprecated and subject to removal in a future release, we should close the associated streams instead of relying on the finalize method.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45327 from JacobZheng0927/SPARK-47146.

Authored-by: JacobZheng0927 <zsh517559523@163.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
@JacobZheng0927
Copy link
Contributor Author

@JacobZheng0927, might be a good idea to backport this to 3.5 as well - will you be able to create a backport PR ? (I ran into some issue locally when trying to merge to branch-3.5 and bailed)

I created this pr #45390

@sunchao
Copy link
Member

sunchao commented Mar 5, 2024

Any thoughts #45327 (comment) @dongjoon-hyun , @sunchao ?

Sorry for the delay @mridulm . Posted my reply. This PR LGTM too.

mridulm pushed a commit that referenced this pull request Mar 6, 2024
This pr backport #45327 to branch-3.5

### What changes were proposed in this pull request? Add TaskCompletionListener to close inputStream to avoid thread leakage caused by unclosed ReadAheadInputStream.

### Why are the changes needed?
To fix the issue SPARK-47146

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45390 from JacobZheng0927/SPARK-47146-3.5.

Authored-by: JacobZheng0927 <zsh517559523@163.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
mridulm pushed a commit that referenced this pull request Mar 6, 2024
This pr backport #45327 to branch-3.5

### What changes were proposed in this pull request? Add TaskCompletionListener to close inputStream to avoid thread leakage caused by unclosed ReadAheadInputStream.

### Why are the changes needed?
To fix the issue SPARK-47146

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45390 from JacobZheng0927/SPARK-47146-3.5.

Authored-by: JacobZheng0927 <zsh517559523@163.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit e9f7d36)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
@@ -36,6 +38,7 @@
* of the file format).
*/
public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the logger's name correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was careless of me, thank you very much! I'll fix it right away.

jpcorreia99 pushed a commit to jpcorreia99/spark that referenced this pull request Mar 12, 2024
### What changes were proposed in this pull request?
Add TaskCompletionListener to close inputStream to avoid thread leakage caused by unclosed ReadAheadInputStream.

### Why are the changes needed?
SPARK-40849 modified the implementation of `newDaemonSingleThreadExecutor` to use `newFixedThreadPool` instead of `newSingleThreadExecutor` .The difference is that `newSingleThreadExecutor` uses the `FinalizableDelegatedExecutorService`, which provides a `finalize` method that automatically closes the thread pool. In some cases, sort merge join execution uses `ReadAheadSteam` and does not close it, so this change caused a thread leak. Since Finalization is deprecated and subject to removal in a future release, we should close the associated streams instead of relying on the finalize method.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45327 from JacobZheng0927/SPARK-47146.

Authored-by: JacobZheng0927 <zsh517559523@163.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Mar 26, 2024
This pr backport apache#45327 to branch-3.5

To fix the issue SPARK-47146

No

Unit test

No

Closes apache#45390 from JacobZheng0927/SPARK-47146-3.5.

Authored-by: JacobZheng0927 <zsh517559523@163.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit e9f7d36)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
7 participants