[SPARK-27568][CORE] Fix readLock leak while calling take()/first() on a cached rdd #24467
Conversation
Test build #104933 has finished for PR 24467 at commit
Jenkins, retest this please.
Test build #104965 has finished for PR 24467 at commit
Is there any better way to do this than interrogating the class of the delegate? It's a little hacky.
@srowen Yeah, I agree with that. I'm thinking about it.
AFAIK this should not lead to any job failure, because the config "spark.storage.exceptionOnPinLeak" is normally turned off. However, this really is an issue when people submit jobs from the Python side, and I submitted #24542 to catch the AssertionError. To me the fix proposed in this PR is acceptable, but I'm not sure whether we should still fix this, since it no longer causes critical issues and the fix itself is kind of hacky.
This reminds me of the memory leak issue in sort-merge-join. We use Scala iterators, but what we really need is a traditional database-style iterator. For this particular case, I think using a task completion listener is good enough?
The block-level read/write lock mechanism has a basic assumption that all block locks should be released when a task finishes, and that's why we check for leaked locks after the task has finished. Since a task completion listener would also be triggered after the task finishes, I think using it would not make a big difference. Actually, the process of checking for leaked locks (by calling …)
Test build #106053 has finished for PR 24467 at commit
What changes were proposed in this pull request?
Currently, if we run the code below in Spark:
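The snippet itself is missing from this page; a minimal reproduction consistent with the PR title and the log output below might look like the following (a hypothetical reconstruction, assuming an existing SparkContext `sc`):

```scala
// Hypothetical reproduction (the original snippet is not shown on this page):
// take() on a cached RDD stops consuming elements early, so the
// CompletionIterator wrapping the cached block is never exhausted and the
// read lock on block rdd_0_0 is never released.
val rdd = sc.parallelize(1 to 10, 1).cache()
rdd.take(1)
```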
we'll see the following lines in the log:
19/04/25 23:48:54 INFO Executor: 1 block locks were not released by TID = 0:
[rdd_0_0]
And if we set "spark.storage.exceptionOnPinLeak" to true, the job will fail.
Normally, we always release the read lock for a block once all of its elements have been consumed through the CompletionIterator. However, operations like take()/first() do not need to consume all elements, so the release is never triggered.

This PR suggests manually calling completion() on the CompletionIterator if the iterator still has a next element after the task has finished, so that the read lock can be released within completion().
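The idea can be sketched roughly as follows (a simplified illustration, not the actual patch; it uses Spark's internal CompletionIterator and TaskContext APIs, and the helper name is hypothetical):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.util.CompletionIterator

// Sketch only: register a task completion listener so that when the task
// ends, a not-fully-consumed CompletionIterator still gets its completion()
// callback, which releases the block's read lock.
def releaseLockOnTaskEnd[T](iter: CompletionIterator[T, Iterator[T]]): Iterator[T] = {
  TaskContext.get().addTaskCompletionListener[Unit] { _ =>
    // take()/first() may have left elements unconsumed; force the cleanup.
    if (iter.hasNext) {
      iter.completion()
    }
  }
  iter
}
```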
How was this patch tested?
Added a unit test.