[BUGFIX] Fix resource leak during shuffle read #174
Conversation
```diff
@@ -201,13 +202,21 @@ public Configuration getHadoopConf() {
       RssShuffleDataIterator iterator = new RssShuffleDataIterator<K, C>(
           shuffleDependency.serializer(), shuffleReadClient,
           readMetrics);
       iterators.add(iterator);
       CompletionIterator<Product2<K, C>, RssShuffleDataIterator<K, C>> completionIterator =
```
If we use AQE, we could have many iterators, and if we don't release their resources after use, we may hit an OOM. I mean that we should use one iterator and then release that iterator, rather than releasing all the iterators at the end of the task.
`org.apache.spark.util.CompletionIterator` will clean up as soon as its wrapped `RssShuffleDataIterator.hasNext` returns false:

```scala
// org.apache.spark.util.CompletionIterator
def hasNext: Boolean = {
  val r = iter.hasNext  // iter => RssShuffleDataIterator
  if (!r && !completed) {
    completed = true
    // reassign to release resources of highly resource consuming iterators early
    iter = Iterator.empty.asInstanceOf[I]
    completion()
  }
  r
}

def completion(): Unit  // completion() => RssShuffleDataIterator.cleanup
```

After this PR, if there is no special case for the Spark task, the timing of resource cleanup is still when the `RssShuffleDataIterator` ends, not when the Spark task ends. This is the same behavior as before the PR.
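To make the early-cleanup behavior concrete, here is a minimal, runnable Java sketch of the same pattern. It is a simplified stand-in, not the actual `org.apache.spark.util.CompletionIterator` (which is Scala and Spark-internal): exhausting the wrapper triggers the cleanup callback immediately, and the reference to the wrapped iterator is dropped so its resources can be reclaimed before the task ends.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Simplified sketch of the completion-iterator pattern; not Spark's class.
public class CompletionIteratorDemo {
    static class CompletionIterator<T> implements Iterator<T> {
        private Iterator<T> iter;
        private final Runnable completion;
        private boolean completed = false;

        CompletionIterator(Iterator<T> iter, Runnable completion) {
            this.iter = iter;
            this.completion = completion;
        }

        @Override
        public boolean hasNext() {
            boolean r = iter.hasNext();
            if (!r && !completed) {
                completed = true;
                // Drop the reference so the wrapped iterator's resources can
                // be reclaimed as soon as it is exhausted, not at task end.
                iter = Collections.emptyIterator();
                completion.run();
            }
            return r;
        }

        @Override
        public T next() {
            return iter.next();
        }
    }

    public static void main(String[] args) {
        final boolean[] cleaned = {false};
        Iterator<Integer> it = new CompletionIterator<>(
            List.of(1, 2, 3).iterator(), () -> cleaned[0] = true);
        int sum = 0;
        while (it.hasNext()) {
            sum += it.next();
        }
        // Cleanup ran as soon as the last element was consumed.
        System.out.println("sum=" + sum + " cleaned=" + cleaned[0]);
    }
}
```

Draining the wrapper fully is what fires `completion`; an iterator that is abandoned part-way through (a failed or cancelled task, or `LocalLimit`) never reaches that point, which is exactly the gap this PR closes with a task-completion listener.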
OK, I got it. Good catch.
Codecov Report
@@ Coverage Diff @@
## master #174 +/- ##
============================================
- Coverage 58.29% 56.92% -1.38%
+ Complexity 1262 1183 -79
============================================
Files 158 149 -9
Lines 8397 7902 -495
Branches 779 749 -30
============================================
- Hits 4895 4498 -397
+ Misses 3251 3162 -89
+ Partials 251 242 -9
LGTM, thanks @izchen
Thanks for your review, @jerqi!
What changes were proposed in this pull request?
Use `org.apache.spark.TaskContext#addTaskCompletionListener` to clean up the resources used by `RssShuffleDataIterator`. This avoids possible resource leaks.

Why are the changes needed?
Before this PR, `RssShuffleDataIterator` would only clean up its resources after all records had been read. When the Spark task fails, is cancelled, or runs special logic such as `LocalLimit`, the resources are never cleaned up, which creates a potential resource leak.

Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a UT case
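The fix described above can be sketched in a Spark-free way. The snippet below is a hypothetical stand-in, not real Spark code: `TaskContextStub` mimics the role of `org.apache.spark.TaskContext`, whose real `addTaskCompletionListener` runs registered callbacks when the task ends, whether it succeeded, failed, or was cancelled.

```java
import java.util.ArrayList;
import java.util.List;

// Spark-free sketch of the fix: register cleanup with the task context so
// resources are released even if the task fails or stops reading early.
// TaskContextStub is a hypothetical stand-in for org.apache.spark.TaskContext.
public class TaskCompletionDemo {
    static class TaskContextStub {
        private final List<Runnable> listeners = new ArrayList<>();

        void addTaskCompletionListener(Runnable listener) {
            listeners.add(listener);
        }

        // Spark invokes the listeners whenever the task ends: success,
        // failure, or cancellation.
        void markTaskCompleted() {
            for (Runnable l : listeners) {
                l.run();
            }
        }
    }

    public static void main(String[] args) {
        TaskContextStub context = new TaskContextStub();
        final boolean[] cleaned = {false};

        // As in the PR: register cleanup when the iterator is created,
        // instead of relying on the iterator being fully drained.
        context.addTaskCompletionListener(() -> cleaned[0] = true);

        // Simulate a task that stops reading early (e.g. LocalLimit)
        // and then ends; cleanup still runs.
        context.markTaskCompleted();
        System.out.println("cleaned=" + cleaned[0]);
    }
}
```

The design point is that cleanup is tied to the task lifecycle rather than to iterator exhaustion, so the `CompletionIterator` early-release path and the task-completion listener complement each other.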