[FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions #8654
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
ed6b5c0 to 86ffffe
Thanks for opening this PR @zentol!
It looks generally good to me; I only left some small comments.
Thanks for the updates @zentol.
LGTM!
@zhijiangW could you have another look at the latest commit? It addresses an issue raised in #8687: #8687 (comment)
public void testExternallyManagedBlockingPartitionConsumableMultipleTimes() throws IOException {
    final ResultPartition partition = new ResultPartitionBuilder()
        .setIsExternallyManaged(true)
        .setResultPartitionType(ResultPartitionType.BLOCKING)
nit: I guess we do not need to set the partition type and the number of subpartitions; the defaults would make sense.
The partition being blocking is important for the test so I'd like to not omit it.
Actually, I thought the isExternallyManaged property should control whether the partition can be consumed multiple times, independent of ResultPartitionType. But in reality the pipelined type cannot support this at the moment because of the readView == null checkState in PipelinedSubpartition#createReadView. So we can keep the current way of setting the blocking type.
Yes, I have reviewed the last commit; I thought you might create a separate ticket for the ref-count issue. :)
ResultPartitionID partitionId = partition.getPartitionId();
LOG.debug("Released partition {} produced by {}.",
    partitionId.getPartitionId(), partitionId.getProducerId());
if (!partition.isManagedExternally() || isReleaseExternallyManagedPartitionsOnConsumption) {
This isn't sufficient; the partition is already removed from registeredPartitions above, leading to wrong behavior of getUnreleasedPartitions.
Yes, I also noticed this just now; the state is inconsistent here if the partition is removed but not released.
int refCnt = pendingReferences.decrementAndGet();

if (refCnt == 0) {
    if (isManagedExternally) {
        partitionManager.onConsumedPartition(this);
If isManagedExternally, we should not call partitionManager.onConsumedPartition(this) after one subpartition is consumed.
It should still be safe to do so (because the RPM basically ignores the call), but yeah this shows that I haven't thought this through properly.
Could we call pin() in createSubpartitionView and decrement the refcount as usual in onConsumedPartition?
The pin would increase the counter by the number of subpartitions, but createSubpartitionView only refers to one of the subpartitions. I need to think through this issue again.
Maybe we could handle the pin issue in a separate ticket, so that this PR only introduces the flag without changing any current behavior, if you like.
I think I've found a way to do it; pin() will continue to increment the refCount by the number of subpartitions, but createSubpartitionView will increment it by 1 (if the partition is externally managed).
I'm not sure whether this is a good approach, and what plans people had on how this should be implemented, so I wouldn't oppose deferring this to a follow-up.
TBH I do not like the current way because it seems a bit hacky.
During ResultPartition#onConsumedSubpartition the counter would never reach 0 in the external case, so ResultPartitionManager#onConsumedPartition would never be called, and the internal checking logic in ResultPartitionManager#onConsumedPartition seems redundant.
Maybe we could always pin as before, but migrate the checking logic from ResultPartitionManager to ResultPartition, as I mentioned before. I mean that ResultPartitionManager would not be aware of the external management; the tag ResultPartitionManager#isReleaseExternallyManagedPartitionsOnConsumption could be merged with ResultPartition#isManagedExternally to generate a final tag in ResultPartition.
ResultPartition#onConsumedSubpartition could then check the final tag to decide whether to call ResultPartitionManager#onConsumedPartition. The reference counter only takes effect when the final tag is false.
In conclusion:
- Do not modify the previous ResultPartitionManager; only add the final tag in ResultPartition.
- The pin logic can stay the same as before.
- The ref counter check in ResultPartition#createSubpartitionView only applies when the tag is false.
- The ref counter check in ResultPartition#onConsumedSubpartition only applies when the tag is false.
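The four points above can be sketched roughly as follows. This is a simplified stand-alone model, not the Flink classes: `SketchPartition`, `SketchPartitionManager`, the `keepAfterConsumption` field and the simplified `pin()` are all hypothetical names and shapes chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ResultPartitionManager; it only counts notifications. */
class SketchPartitionManager {
    int consumedNotifications = 0;

    void onConsumedPartition(SketchPartition partition) {
        consumedNotifications++;
    }
}

/** Hypothetical stand-in for ResultPartition holding the single merged tag. */
class SketchPartition {
    private final SketchPartitionManager manager;
    private final int numSubpartitions;
    // Final tag: true only if the partition is externally managed AND the
    // configuration does not force a release on consumption.
    private final boolean keepAfterConsumption;
    private final AtomicInteger pendingReferences = new AtomicInteger();

    SketchPartition(
            SketchPartitionManager manager,
            int numSubpartitions,
            boolean isManagedExternally,
            boolean releaseExternallyManagedOnConsumption) {
        this.manager = manager;
        this.numSubpartitions = numSubpartitions;
        this.keepAfterConsumption =
                isManagedExternally && !releaseExternallyManagedOnConsumption;
    }

    /** Simplified pin: adds one reference per subpartition, regardless of the tag. */
    void pin() {
        pendingReferences.addAndGet(numSubpartitions);
    }

    void createSubpartitionView() {
        // The ref counter check only applies when the tag is false.
        if (!keepAfterConsumption && pendingReferences.get() <= 0) {
            throw new IllegalStateException("Partition already released.");
        }
    }

    void onConsumedSubpartition() {
        if (keepAfterConsumption) {
            return; // such a partition is released later by an explicit external call
        }
        if (pendingReferences.decrementAndGet() == 0) {
            manager.onConsumedPartition(this);
        }
    }
}
```

With this shape the manager never inspects the flag itself; it is only notified for partitions whose tag is false.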
partition.getPartitionId(),
(resultPartitionID, resultPartition) -> {
    if (partition == resultPartition) {
        if (partition.isManagedExternally() && !isReleaseExternallyManagedPartitionsOnConsumption) {
I have another option for this. At the moment both ResultPartition and ResultPartitionManager need to check the conditions for releasing consumed partitions, which makes the logic difficult to trace.
We could make ResultPartitionManager unaware of this. The isReleaseExternallyManagedPartitionsOnConsumption flag could be merged with partition.isManagedExternally() to generate one final boolean tag in ResultPartition. ResultPartition then checks this tag to decide whether to call partitionManager.onConsumedPartition when a subpartition is consumed. The logic in ResultPartitionManager would be simple:
- Release based on consumption: the behavior is the same as before. After ResultPartitionManager#onConsumedPartition is called, the partition is removed from the map and released directly.
- Release based on an external call: via ResultPartitionManager#releasePartition.
And we only have one boolean tag in ResultPartition to handle the external check. WDYT?
This suggestion is not related to the pin issue.
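A minimal sketch of the manager side of this suggestion, assuming a plain map keyed by a string id. `TinyPartition` and `TinyPartitionManager` are hypothetical stand-ins, not the Flink classes; only the method names onConsumedPartition, releasePartition and getUnreleasedPartitions are borrowed from the discussion.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical minimal partition: an id plus a released marker. */
class TinyPartition {
    final String id;
    boolean released = false;

    TinyPartition(String id) {
        this.id = id;
    }

    void release() {
        released = true;
    }
}

/** Hypothetical manager that knows nothing about external management. */
class TinyPartitionManager {
    private final Map<String, TinyPartition> registeredPartitions = new HashMap<>();

    void registerPartition(TinyPartition partition) {
        registeredPartitions.put(partition.id, partition);
    }

    /** Consumption-based path: remove from the map and release in one step. */
    void onConsumedPartition(TinyPartition partition) {
        // remove(key, value) only succeeds if this exact instance is registered
        if (registeredPartitions.remove(partition.id, partition)) {
            partition.release();
        }
    }

    /** External path: the owner of the partition lifecycle asks for the release. */
    void releasePartition(String partitionId) {
        TinyPartition partition = registeredPartitions.remove(partitionId);
        if (partition != null) {
            partition.release();
        }
    }

    Set<String> getUnreleasedPartitions() {
        return new HashSet<>(registeredPartitions.keySet());
    }
}
```

Because both paths remove and release together, a partition is never removed from the map while still unreleased, which keeps getUnreleasedPartitions consistent in this model.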
... I never thought about combining isManagedExternally and the configuration flag. That's a great idea.
Implemented.
Thanks for opening this PR @zentol and the review @zhijiangW. I had some more comments. In particular, I would be interested whether we want to introduce a ReleaseOnConsumptionResultPartition which encapsulates the release logic.
@tillrohrmann I implemented both of your suggestions. I like the renaming of externallyManaged to releasedOnConsumption; it makes things a lot easier to understand. As for the ResultPartition sub-class, the empty
Thanks for addressing my comments @zentol. LGTM. +1 for merging once Travis gives green light.
@Override
void pin() {
    while (true) {
Actually, I am not sure why a while loop was needed here in the first place.
Following Zhijiang's thought, I think we can now remove the pin method from ResultPartition, since it is only an implementation detail of ReleaseOnConsumptionResultPartition. We can then move the initialization of the pending references to the constructor, which ensures there is no concurrent access to the variable, and use pendingReferences.set(subpartitions.length) to initialize it.
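As a rough illustration of that idea, the counter could be set once in the constructor instead of through a retry loop in pin(). The class below is a hypothetical stand-alone sketch (`RefCountedPartitionSketch` and its `released` marker are made up for this example), not the ReleaseOnConsumptionResultPartition from the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: the reference counter is initialized in the constructor. */
class RefCountedPartitionSketch {
    private final AtomicInteger pendingReferences = new AtomicInteger();
    private volatile boolean released = false;

    RefCountedPartitionSketch(int numSubpartitions) {
        // No other thread can see this object yet, so a plain set() is enough
        // and no compare-and-set loop is required.
        pendingReferences.set(numSubpartitions);
    }

    /** Called once per fully consumed subpartition. */
    void onConsumedSubpartition() {
        if (pendingReferences.decrementAndGet() == 0) {
            released = true; // stands in for notifying the partition manager
        }
    }

    boolean isReleased() {
        return released;
    }
}
```

This only holds as long as the object is not handed to other threads before the constructor returns.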
I created FLINK-12842 and FLINK-12843 for the ref-counter issues, which could be solved separately after this PR is merged, because these issues already existed before and are not in the scope of this PR.
I agree that we can resolve these in follow-ups.
Thanks for the updates @zentol and the reviews @tillrohrmann.
What is the purpose of the change
Based on #8608; only the last 2 commits are relevant.
The first commit introduces the concept of externally managed partitions, i.e. partitions that are not automatically released by the ShuffleEnvironment on consumption, but instead when instructed to do so by the TaskManager, or more generally as part of the partition lifecycle management. On its own this commit does not change any behavior, as no part of the system makes use of this flag.
This is primarily a preparatory step for the subsequent commit and future partition lifecycle management.
The second commit introduces a dev feature flag to enable/disable the automatic release of consumed blocking result partitions by the shuffle environment.
If the flag is enabled, which is the default, then the system does not behave differently to how it does currently.
Once the partition lifecycle is fully introduced we will either switch the default to false, or remove the flag entirely.
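To make the default behavior concrete, here is a small stand-alone sketch of how such a flag could be read and applied. It uses java.util.Properties rather than Flink's configuration classes, and the key name, the `ReleaseFlagSketch` class and the rule that non-blocking partitions are always released on consumption are assumptions for illustration, not taken from the PR.

```java
import java.util.Properties;

/** Hypothetical helper; the key name and the use of Properties are assumptions. */
class ReleaseFlagSketch {
    // Assumed key for illustration only, not the option name introduced by the PR.
    static final String FORCE_RELEASE_KEY = "sketch.partition.force-release-on-consumption";

    /**
     * Reads the flag. It is enabled when the key is absent; any value other
     * than "true" (case-insensitive) disables it.
     */
    static boolean forceReleaseOnConsumption(Properties config) {
        return Boolean.parseBoolean(config.getProperty(FORCE_RELEASE_KEY, "true"));
    }

    /**
     * Assumption: the flag only affects blocking partitions, so non-blocking
     * partitions are always released on consumption.
     */
    static boolean releasedOnConsumption(boolean isBlocking, boolean forceRelease) {
        return !isBlocking || forceRelease;
    }
}
```

With an empty configuration the flag is on, so blocking partitions are released on consumption exactly as before; only an explicit opt-out changes that.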