[FLINK-21401] Consolidate JobGraph generation to properly instantiate streaming and batch JobGraphs #15093
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot.
Automated Checks: last check on commit ff8af80 (Fri Mar 05 09:05:42 UTC 2021).
Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Thanks a lot for all the cleanups! Except for one minor oversight, I couldn't find any major issues in the change.
+1 to merge once CI passed.
JobGraphBuilder.newStreamingJobGraphBuilder()
        .addJobVertices(Arrays.asList(source, target))
        .addClasspaths(Collections.singletonList(jarFileInJobGraph.toUri().toURL()))
        .build();

jobGraph.setClasspaths(Collections.singletonList(jarFileInJobGraph.toUri().toURL()));
jobGraph.setClasspaths(Collections.singletonList(jarFileInJobGraph.toUri().toURL()));
Good catch. Will remove it.
@@ -260,100 +245,6 @@ public static void teardownClass() {
}
}

@Test
public void testDeclineCheckpointInvocationWithUserException() throws Exception {
I should have added a reference to this discussion. Thanks for digging it up.
fail("Previous statement should have failed");
} catch (ExecutionException t) {
assertTrue(t.getCause() instanceof UnavailableDispatcherOperationException);
}

// submission has succeeded, let the initialization finish.
blockingJobGraph.f1.unblock();
latch.trigger();
This is much nicer than the Tuple2 with the vertex! Thanks for the cleanup.
Yeah, Tuple2 is actually quite ugly. We should try to never use it if possible.
.setJobId(jobId)
.addJobVertex(blockingJobVertex)
.build(),
blockingJobVertex);
}

private static class FailingJobVertex extends JobVertex {
Thanks a lot for the various cleanups! I took some notes.
I also added a couple of tests which relied on scheduler details. I think before we did not have the necessity to separate tests thoroughly.
The AdaptiveScheduler needs to make the JobVertices serializable and let them use static fields to communicate with the test because the JobGraph is copied. This closes apache#15093.
Force-pushed: 80919f4 to c1139e5
Thanks for the review @rmetzger. I've addressed your comments. I will merge this PR once AZP gives the green light.
Force-pushed: c1139e5 to c285494
…rFactoryTest to use streaming jobs
…hUserException to not need JobMaster instance
…endent of underlying JobMaster implementation This commit makes the DispatcherTest.testErrorDuringInitialization independent of the underlying JobMaster implementation by splitting it into DispatcherTest.testJobManagerRunnerInitializationFailureFailsJob and JobManagerRunnerImplTest.testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError which tests in two steps what DispatcherTest.testErrorDuringInitialization tested by relying on the DefaultScheduler to eagerly create an ExecutionGraph and to fail.
… not depend on underlying Scheduler implementation By using the BlockingJobManagerRunnerFactory it is possible to decouple the DispatcherTest.testInvalidCallDuringInitialization from the underlying Scheduler implementation which was used for blocking the creation of the JobManagerRunner before.
…ndependent of Scheduler implementation By letting the DispatcherTest.testWaitingForJobMasterLeadership use the TestingJobManagerRunnerFactory we can abstract this test from the implementation details of the Scheduler and when it shows which JobStatus.
…dToExecutionGraph independent of Scheduler implementation Change Dispatcher.testInitializationTimestampForwardedToExecutionGraph to testInitializationTimestampForwardedToJobManagerRunner which only tests that the initialization timestamp is forwarded to the JobManagerRunner. Additionally, this commit adds AdaptiveSchedulerTest.testExecutionGraphGenerationSetsInitializationTimestamp, AdaptiveSchedulerTest.testInitializationTimestampForwarding and DefaultSchedulerTest.testCorrectSettingOfInitializationTimestamp to test the timestamp forwarding for the different Scheduler implementations.
…BeStarted independent of Scheduler implementation This commit makes the DispatcherTest.testFatalErrorIfRecoveredJobsCannotBeStarted independent of the scheduler implementation by using the TestingJobManagerRunnerFactory and letting the TestingJobManagerRunner complete with an initialization error.
…ndent of underlying scheduler Make DispatcherTest.testNonBlockingJobSubmission independent of underlying scheduler by using the BlockingJobManagerRunnerFactory and blocking the creation of the JobManagerRunner.
…ot needed The leader elections are only needed where we instantiate a proper JobManagerRunnerImpl which needs leader election.
The AdaptiveScheduler needs to make the JobVertices serializable and let them use static fields to communicate with the test because the JobGraph is copied. This closes apache#15093.
…nough slots for test jobs
…or the adaptive scheduler The adaptive scheduler does not support stop with savepoint yet. Therefore, this test cannot work.
…otCreation and .testStopWithSavepointFailingAfterSnapshotCreation for the AdaptiveScheduler The AdaptiveScheduler does not support stopping jobs with savepoint yet.
…ncreasing resource timeout to 30s for the AdaptiveScheduler The test seems to take some time to start the required TaskManagers. This can sometimes lead to resource timeouts in the WaitingForResources state if it takes too long. This commit fixes this problem by increasing the resource timeout to 30s.
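The commit note above about serializable JobVertices and static fields can be illustrated with a small sketch. It does not use any Flink classes: SignallingVertex and copyViaSerialization are hypothetical stand-ins, and the sketch assumes the copy is produced through Java serialization. It shows why a test that keeps a reference to the original object cannot observe instance state changed on the copy, while a static field is visible to both.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

public class StaticFieldSignalSketch {

    /** Hypothetical stand-in for a test JobVertex; not a Flink class. */
    static class SignallingVertex implements Serializable {
        private static final long serialVersionUID = 1L;

        // Static, so it is shared by the original and every deserialized copy.
        private static volatile CountDownLatch initialized = new CountDownLatch(1);

        // Instance state: only set on the object whose initialize() is called.
        private boolean initializedOnThisInstance;

        /** Resets the shared signal; a test would call this before each run. */
        static void reset() {
            initialized = new CountDownLatch(1);
        }

        static boolean wasAnyCopyInitialized() {
            return initialized.getCount() == 0;
        }

        boolean wasThisInstanceInitialized() {
            return initializedOnThisInstance;
        }

        void initialize() {
            initializedOnThisInstance = true;
            initialized.countDown();
        }
    }

    /** Deep-copies an object by serializing and deserializing it (assumed copy mechanism). */
    static <T extends Serializable> T copyViaSerialization(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Copying failed", e);
        }
    }

    public static void main(String[] args) {
        SignallingVertex.reset();
        SignallingVertex original = new SignallingVertex();

        // The component under test works on a copy, not on the test's object.
        SignallingVertex copy = copyViaSerialization(original);
        copy.initialize();

        System.out.println("original instance flag: " + original.wasThisInstanceInitialized());
        System.out.println("shared static signal:   " + SignallingVertex.wasAnyCopyInitialized());
    }
}
```

The trade-off of static fields is shared state between tests, which is why the sketch includes an explicit reset() that would have to run before each test.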
Force-pushed: c285494 to 364a8ef
This PR consolidates the JobGraph generation into as few places as possible. Moreover, it introduces explicit factory methods for streaming and batch JobGraphs (JobType.STREAMING and JobType.BATCH) in order to better classify which tests can be run with the AdaptiveScheduler and which strictly require the DefaultScheduler.

This PR also introduces a JobGraphBuilder which can be used to make the JobGraph an immutable data structure. This builder is then used for the more complex JobGraph creations for tests.

Last but not least, this PR adapts some failing tests (mainly DispatcherTest) to also work with the AdaptiveScheduler.
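As a rough illustration of the builder idea described above, here is a minimal, self-contained sketch. It is not the Flink implementation: SketchJobGraph, SketchJobGraphBuilder and SketchJobType are hypothetical stand-ins, vertices and classpaths are simplified to strings, and newBatchJobGraphBuilder() is assumed by analogy with the newStreamingJobGraphBuilder() call visible in the review diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

public class JobGraphBuilderSketch {

    /** Stand-in for the job type distinction (streaming vs. batch). */
    enum SketchJobType { STREAMING, BATCH }

    /** Immutable stand-in for a job graph: final fields, unmodifiable lists. */
    static final class SketchJobGraph {
        private final String jobId;
        private final SketchJobType jobType;
        private final List<String> vertices;
        private final List<String> classpaths;

        private SketchJobGraph(
                String jobId, SketchJobType jobType, List<String> vertices, List<String> classpaths) {
            this.jobId = jobId;
            this.jobType = jobType;
            // Defensive copies, so later builder changes cannot leak into the graph.
            this.vertices = Collections.unmodifiableList(new ArrayList<>(vertices));
            this.classpaths = Collections.unmodifiableList(new ArrayList<>(classpaths));
        }

        String getJobId() { return jobId; }
        SketchJobType getJobType() { return jobType; }
        List<String> getVertices() { return vertices; }
        List<String> getClasspaths() { return classpaths; }
    }

    /** Builder with one explicit factory method per job type. */
    static final class SketchJobGraphBuilder {
        private final SketchJobType jobType;
        private final List<String> vertices = new ArrayList<>();
        private final List<String> classpaths = new ArrayList<>();
        private String jobId = UUID.randomUUID().toString();

        private SketchJobGraphBuilder(SketchJobType jobType) {
            this.jobType = jobType;
        }

        static SketchJobGraphBuilder newStreamingJobGraphBuilder() {
            return new SketchJobGraphBuilder(SketchJobType.STREAMING);
        }

        // Assumed counterpart for batch jobs; the name is a guess.
        static SketchJobGraphBuilder newBatchJobGraphBuilder() {
            return new SketchJobGraphBuilder(SketchJobType.BATCH);
        }

        SketchJobGraphBuilder setJobId(String jobId) {
            this.jobId = jobId;
            return this;
        }

        SketchJobGraphBuilder addJobVertex(String vertex) {
            vertices.add(vertex);
            return this;
        }

        SketchJobGraphBuilder addJobVertices(List<String> newVertices) {
            vertices.addAll(newVertices);
            return this;
        }

        SketchJobGraphBuilder addClasspaths(List<String> newClasspaths) {
            classpaths.addAll(newClasspaths);
            return this;
        }

        SketchJobGraph build() {
            return new SketchJobGraph(jobId, jobType, vertices, classpaths);
        }
    }

    public static void main(String[] args) {
        SketchJobGraph graph =
                SketchJobGraphBuilder.newStreamingJobGraphBuilder()
                        .addJobVertices(Arrays.asList("source", "target"))
                        .addClasspaths(Collections.singletonList("file:///tmp/job.jar"))
                        .build();
        System.out.println(graph.getJobType() + " job with vertices " + graph.getVertices());
    }
}
```

Because the job type is fixed by the factory method and the built graph exposes no setters, a test that needs a streaming job states that at construction time instead of mutating the graph afterwards.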