-
Notifications
You must be signed in to change notification settings - Fork 13.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12203] Refactor ResultPartitionManager to break tie with Task #8210
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
@flinkbot attention @tillrohrmann @zhijiangW |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating this refactoring @azagrebin. All in all, it looks good. I had some minor comments which we should address before merging. It would also be great if you could resolve the current merge conflicts and to see whether Travis passes.
@@ -41,19 +35,15 @@ | |||
|
|||
private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class); | |||
|
|||
public final Table<ExecutionAttemptID, IntermediateResultPartitionID, ResultPartition> | |||
registeredPartitions = HashBasedTable.create(); | |||
private final Map<ResultPartitionID, ResultPartition> registeredPartitions = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's specify an initial capacity here.
} | ||
|
||
public void releasePartitionsProducedBy(ExecutionAttemptID executionId, Throwable cause) { | ||
public void releasePartitionsProducedBy(ResultPartitionID partitionId, Throwable cause) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rename this method into releasePartition
registeredPartitions.get(partitionId).release(cause); | ||
registeredPartitions.remove(partitionId); | ||
LOG.debug("Released partition {} produced by {}.", | ||
partitionId.getPartitionId(), partitionId.getPartitionId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is better to do it the following way:
ResultPartition resultPartition = registeredPartitions.remove(partitionId);
if (resultPartition != null) {
resultPartition.release(cause);
}
@@ -667,11 +668,9 @@ private void stopTaskExecutorServices() throws Exception { | |||
} | |||
|
|||
@Override | |||
public void failPartition(ExecutionAttemptID executionAttemptID) { | |||
log.info("Discarding the results produced by task execution {}.", executionAttemptID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure whether to retain the previous log for tracing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about it, the problem here is that previously it happened per task. Now, it gets a list of partitions without assumption that it is per task. JM will still log it in sendReleaseIntermediateResultPartitionsRpcCall
. If more verbose mode is needed, debug level will enable per partition logging in ResultPartitionManager.releasePartition
in TM.
registeredPartitions.get(partitionId).release(cause); | ||
registeredPartitions.remove(partitionId); | ||
LOG.debug("Released partition {} produced by {}.", | ||
partitionId.getPartitionId(), partitionId.getPartitionId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The last parameter in log should be partitionId.getProducerId()
?
final LogicalSlot slot = assignedResource; | ||
LOG.info("Discarding the results produced by task execution {}.", attemptId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be better to put this log in the first line of this method.
@@ -100,7 +102,9 @@ public String getAddress() { | |||
} | |||
|
|||
@Override | |||
public void failPartition(ExecutionAttemptID executionAttemptID) {} | |||
public void releasePartitions(Collection<ResultPartitionID> partitionIds) { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove empty line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this nice refactor @azagrebin .
This work not only decouples the ExecutionAttemptID
with ShuffleService
, but also avoids calling NetworkEnvironment#getResultPartitionManager
on TaskExecutor
side. The introduced NetworkEnvironment#releasePartitions
is really within our expectations.
Almost LGTM on my side, just left some minor format comments.
Thanks for the reviews @tillrohrmann @zhijiangW ! I've pushed a commit to address them |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates and I have no other concerns. LGTM! 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing my comments. Somehow all Travis builds failed. I'm not sure whether this was only a transient problem. Could you rebase this PR and trigger another build to verify this @azagrebin?
c5b90ba
to
689f972
Compare
At the moment, we have ResultPartitionManager.releasePartitionsProducedBy which uses indexing by task in network environment. These methods are eventually used only by Task which already knows its partitions so Task can use ResultPartition.fail(cause) and TaskExecutor.failPartition could directly use NetworkEnviroment.releasePartitions(Collection<ResultPartitionID>). This also requires that JM Execution sends produced partition ids instead of just ExecutionAttemptID. This closes apache#8210.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Triggered another Travis build to see whether the build failures were transient. If Travis gives green light, I'll merge this PR.
At the moment, we have ResultPartitionManager.releasePartitionsProducedBy which uses indexing by task in network environment. These methods are eventually used only by Task which already knows its partitions so Task can use ResultPartition.fail(cause) and TaskExecutor.failPartition could directly use NetworkEnviroment.releasePartitions(Collection<ResultPartitionID>). This also requires that JM Execution sends produced partition ids instead of just ExecutionAttemptID. This closes apache#8210.
689f972
to
a250829
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All builds still fail. @azagrebin could you please check why this is the case?
@tillrohrmann thanks for retrying |
@tillrohrmann PR should be good to go now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merging.
At the moment, we have ResultPartitionManager.releasePartitionsProducedBy which uses indexing by task in network environment. These methods are eventually used only by Task which already knows its partitions so Task can use ResultPartition.fail(cause) and TaskExecutor.failPartition could directly use NetworkEnviroment.releasePartitions(Collection<ResultPartitionID>). This also requires that JM Execution sends produced partition ids instead of just ExecutionAttemptID. This closes apache#8210.
What is the purpose of the change
The PR is based on #8133.
At the moment, we have ResultPartitionManager.releasePartitionsProducedBy which uses indexing by task in network environment. These methods are eventually used only by Task which already knows its partitions so Task can use ResultPartition.fail(cause) and TaskExecutor.failPartition could directly use NetworkEnviroment.releasePartitions(Collection). This also requires that JM Execution sends produced partition ids instead of just ExecutionAttemptID.
Later NetworkEnviroment.releasePartitions(Collection) could be refactored into ShuffleService.releasePartitions(Collection).
Brief change log
Verifying this change
The change is simple refactoring and should be addressed by existing tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation