[FLINK-20663][runtime] Instantly release unsafe memory on freeing segment. #14904
Conversation
@flinkbot: Thanks a lot for your contribution to the Apache Flink project. Automated checks last ran on commit e4771ac (Fri Feb 19 07:33:15 UTC 2021). Mention the bot in a comment to re-run the automated checks.
Force-pushed b194119 to ce6b3c3.
cc @tillrohrmann One thing I'm not entirely sure about: do we still need a configuration option to fall back to the original behavior, given that we have already forbidden wrapping buffers from unsafe segments? This branch contains an additional commit compared to this PR, which introduces such a configuration option by touching quite a few files. (FYI, this branch also passed the CI tests.)
Thanks for creating this PR @xintongsong. Assuming that nobody calls wrap on an unsafe memory segment, I think the solution should work. I think the solution with processAsByteBuffer is actually quite nice :-)

I had a couple of comments concerning the remaining wrap calls and the costs of the processAsByteBuffer method. Please take a look.
One additional thought is whether we couldn't solve the problem a bit differently by changing how UnsafeMemoryBudget works. If we had two counters, expectedFreeMemory and actuallyAvailableMemory, where expectedFreeMemory is the memory which should be available but has not necessarily been freed yet, we could wait indefinitely in the UnsafeMemoryBudget.reserveMemory method until the memory has been returned.

Whenever we call MemorySegment.free, we increase UnsafeMemoryBudget.expectedFreeMemory. Whenever the cleaner of the unsafe memory runs, we increase actuallyAvailableMemory. That way we should always know when to expect more memory to be freed by the GC procedure and, thus, can decide when to wait and when not to wait for the memory.

I am not saying that we have to do it like this, but I wanted to make sure that we had this solution on the radar.
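The two-counter idea could be sketched roughly as follows. This is a hypothetical simplification, not Flink's actual UnsafeMemoryBudget: instead of the two absolute counters (expectedFreeMemory / actuallyAvailableMemory), it tracks their equivalent difference — memory that was freed logically but whose cleaner has not yet run — and reserveMemory blocks only while such memory is still outstanding.

```java
// Hedged sketch of the two-counter idea (assumed names/structure, not
// Flink's UnsafeMemoryBudget). reserveMemory() waits only while memory
// freed via MemorySegment.free() has a cleaner that is still pending,
// i.e. only when the GC is guaranteed to eventually return memory.
class TwoCounterBudget {
    private final long totalSize;
    private long reserved;     // memory currently handed out
    private long pendingFree;  // freed logically, cleaner has not run yet

    TwoCounterBudget(long totalSize) {
        this.totalSize = totalSize;
    }

    // Called from MemorySegment.free(): memory will come back eventually.
    synchronized void announceFree(long size) {
        pendingFree += size;
    }

    // Called from the unsafe-memory cleaner: memory is actually back.
    synchronized void releaseMemory(long size) {
        pendingFree -= size;
        reserved -= size;
        notifyAll();
    }

    // Block only while outstanding cleaners can still return memory.
    synchronized boolean reserveMemory(long size) {
        while (totalSize - reserved < size && pendingFree > 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        if (totalSize - reserved < size) {
            return false; // genuinely over budget; no cleaner will help
        }
        reserved += size;
        return true;
    }
}
```

With this shape, a reservation that exceeds the budget fails immediately when no cleaner is pending, instead of timing out.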
flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
```java
@Override
public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
    return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size));
}
```
Which calls to wrap cannot be replaced with this call?
Do we know what the performance penalty for the additional Function is?
> Which calls to wrap cannot be replaced with this call?

Technically, I don't think there are wrap calls that CANNOT be replaced. Some of the calls only wrap part of the segment, which can be solved by simply adding offset and length as arguments. I have not replaced all calls to wrap with processAsByteBuffer, mostly because the remaining calls are on direct segments, so it is not necessary to migrate them.
> Do we know what the performance penalty for the additional Function is?

We don't have a measurement. However, given that this method is used for per-segment operations (by default 32KB segments), I don't expect a significant performance penalty from using the Function.
We could think about replacing these calls with processAsByteBuffer also for the direct memory, in order to be able to get rid of the wrap method.
I'll look into them together with the follow-up refactoring effort. Since not all wrap calls are on the entire segment, I'll need to check the performance impact of using Function on those calls.
Maybe it is also ok to provide additional methods on the MemorySegment which internally use a ByteBuffer implementation to be efficient.
```java
if (cleaner != null) {
    cleaner.run();
}
```
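In context, this snippet implements the PR's core change: running the unsafe-memory cleaner eagerly when the segment is freed, rather than waiting for the GC to trigger it. A minimal sketch of that behavior, with a plain Runnable standing in for the cleaner (assumed structure, not the actual HybridMemorySegment code):

```java
// Simplified model of "instantly release unsafe memory on freeing segment":
// free() runs the cleaner immediately and nulls it out so that a double
// free() cannot release the native memory twice.
class UnsafeSegmentSketch {
    private Runnable cleaner; // stand-in for the unsafe-memory cleaner
    private boolean freed;

    UnsafeSegmentSketch(Runnable cleaner) {
        this.cleaner = cleaner;
    }

    void free() {
        freed = true;
        if (cleaner != null) {
            cleaner.run();   // release native memory right now, not at GC time
            cleaner = null;  // make free() idempotent
        }
    }

    boolean isFreed() {
        return freed;
    }
}
```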
Looking at the special casing in this class, why don't we introduce a DirectMemorySegment and an UnsafeMemorySegment? Also, why do we have the HeapMemorySegment and still allow the HybridMemorySegment to be used with heap memory?
If we had a DirectMemorySegment, then we could only allow wrap on this type, for example.
HybridMemorySegment is currently the only MemorySegment implementation used in production. HeapMemorySegment is no longer used, but seems intentionally not removed, as it is annotated with @SuppressWarnings("unused").

From my side, it makes sense to split HybridMemorySegment into HeapMemorySegment, DirectMemorySegment, and UnsafeMemorySegment. This should get rid of lots of if-else branches in the implementation.

I'm leaning towards addressing this refactoring as a follow-up effort, and only doing that for the master branch.
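The proposed split could be sketched as below. This is a hypothetical design drawn from the discussion, not the actual Flink refactoring: only the direct variant exposes wrap(), so an unsafe segment can never leak a ByteBuffer that outlives its freed memory, and the per-type if-else branches disappear into subclasses.

```java
// Hypothetical class split (names from the review discussion; the real
// refactoring may differ). Heap and direct memory live in separate types,
// and wrap() exists only where handing out a ByteBuffer is safe.
import java.nio.ByteBuffer;

abstract class AbstractSegment {
    final int size;
    AbstractSegment(int size) { this.size = size; }
}

class HeapSegment extends AbstractSegment {
    final byte[] array;
    HeapSegment(int size) { super(size); this.array = new byte[size]; }
}

class DirectSegment extends AbstractSegment {
    final ByteBuffer buffer;
    DirectSegment(int size) {
        super(size);
        this.buffer = ByteBuffer.allocateDirect(size);
    }

    // Safe to hand out: the buffer's lifetime is managed by the JVM.
    ByteBuffer wrap(int offset, int length) {
        ByteBuffer dup = buffer.duplicate();
        dup.position(offset).limit(offset + length);
        return dup;
    }
}

class UnsafeSegment extends AbstractSegment {
    UnsafeSegment(int size) { super(size); }
    // Deliberately no wrap(): callers must go through processAsByteBuffer,
    // so no ByteBuffer can outlive the eagerly freed native memory.
}
```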
I'm leaning towards addressing this refactor as a follow-up effort, and only do that for the master branch.
Ok, this sounds reasonable. Then let's do this refactoring soon. In fact, HeapMemorySegment does not seem to be that unused anymore. Somehow the table API started using this class for some RawFormatDeserializationSchema. This might be a good indicator for pulling things straight.
With this refactoring we could also think about getting rid of the whole native memory cleaner logic.
I'll drive the refactor effort and try to get it done in the 1.13 release cycle.
flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
@tillrohrmann Thanks for the review. Concerning waiting indefinitely until enough memory has been returned: I think this approach indeed solves the failures caused by GC not releasing memory in time. However, it solves the problem by waiting longer, which may not perform as well as the current solution that tries to release earlier. In case there is not enough heap activity to trigger GC in advance, we may frequently experience full GCs when finishing existing tasks and deploying new ones, which is costly for batch scenarios where deploying new tasks is frequent and TMs are usually large.
Force-pushed ce6b3c3 to d1936a8.
Comments addressed.
Thanks for the further explanation. I am ok with the proposed changes. Maybe we could add one additional test which ensures that the cleaner is called when calling free.

I would also be in favour of doing the refactoring for the HybridMemorySegment soonish, because it feels that this class is used a bit differently now compared to when it was written.
flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
This method is only used in tests.
…orySegment#processAsByteBuffer.
Force-pushed d1936a8 to e4771ac.
Thanks for the review, @tillrohrmann.
Passed CI here:
Merging this.

thanks!