New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12667][runtime] Add JobID to TaskExecutorGateway#releasePartitions #8630
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @zentol, I left some comments
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
Show resolved
Hide resolved
@@ -641,7 +641,7 @@ private void stopTaskExecutorServices() throws Exception { | |||
} | |||
|
|||
@Override | |||
public void releasePartitions(Collection<ResultPartitionID> partitionIds) { | |||
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a bit weird that we will leave jobId
here unused, is the future change that needs it going to be so big?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's not gonna be big, but this part is simply already done.
final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); | ||
taskManagerGateway.setReleasePartitionsConsumer(releasedPartitions::setFields); | ||
|
||
final SimpleSlot slot = new SimpleSlot( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could provide an utility for creating SimpleSlot
in this class, because it would be reused for many tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
such a utility already exists in #createProgrammedSlotProvider, will update the test to use that instead
Collections.emptySet(), | ||
TestingUtils.infiniteTime()); | ||
|
||
execution.deploy(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could further provide an utility for creating Execution like below:
private CompletableFuture<Execution> createExecution(
TaskManagerGateway taskManagerGateway,
JobVertex... vertices) throws Exception {
SimpleSlot slot = new SimpleSlot(
new SingleSlotTestingSlotOwner(),
new LocalTaskManagerLocation(),
0,
taskManagerGateway);
ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
slotProvider.addSlot(vertices[0].getID(), 0, CompletableFuture.completedFuture(slot));
ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
new JobID(),
slotProvider,
new NoRestartStrategy(),
vertices);
executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertices[0].getID());
ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
Execution execution = executionVertex.getCurrentExecutionAttempt();
CompletableFuture<Execution> allocationFuture = execution.allocateAndAssignSlotForExecution(
slotProvider,
false,
LocationPreferenceConstraint.ALL,
Collections.emptySet(),
TestingUtils.infiniteTime());
return allocationFuture;
}
Then we could get Execution from future, and further get ExecutionVertex and ExecutionGraph from Execution. Then this helper could be reused for many existing tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that we could de-duplicate some code here. I'm concerned that we're baking in a few assumptions (vertex parallelism should be 1, first vertex in the array is special), and there a few subtle differences in tests that where I don't know whether they are significant or not. (For example, #testTaskRestoreStateIsNulledAfterDeployment
doesn't allocate a slot beforehand). I don't have the time right now to really look into these things, so I'd move any larger refactoring to this class into a follow-up.
execution.markFinished(); | ||
postFinishedExecutionAction.accept(execution); | ||
|
||
assertEquals(executionGraph.getJobID(), releasedPartitions.f0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe use assertThat
instead for assertEquals
IntermediateResultPartition intermediateResultPartition = executionVertex | ||
.getProducedPartitions() | ||
.get(partitionId.getPartitionId()); | ||
assertNotNull(intermediateResultPartition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this assert might be not necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this check is done to ensure that the ids of all released partitions are actually valid. without it the test would pass even if completely random partitions were passed to the task executor.
I'll add a comment to clarify this; I had to think for a bit myself as to what we're checking here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for opening this PR @zentol .
It looks good to me, I think the added JobID
might be used in future, only has some deduplication concerns in test.
* Tests that the partitions are released in case of an execution cancellation after the execution is already finished. | ||
*/ | ||
@Test | ||
public void testPartitionReleaseOnCancelAfterFinished() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think "...OnCancelingAfterFinished" would be better in considering of the gramma, but I think I won't argue for that. :)
* Tests that the partitions are released in case of an execution suspension after the execution is already finished. | ||
*/ | ||
@Test | ||
public void testPartitionReleaseOnSuspendAfterFinished() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, I think "...OnSuspendingAfterFinished" would be better, but I won't argue for that too.
ab7cf17
to
3c10fed
Compare
What is the purpose of the change
Adds
JobID
as an argument toTaskExecutorGateway#releasePartitions
to simplify bookkeeping on the taskmanager side.Additionally adds tests for calls to said method originating in
Execution
.