Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10411] Make ClusterEntrypoint more compositional #6743

Closed

Conversation

tillrohrmann
Copy link
Contributor

What is the purpose of the change

Introduce the ClusterComponent which encapsulates the logic for starting the cluster
components, Dispatcher, RestServerEndpoint and the ResourceManager. The individual
components are created by using a factory instance. The ClusterEntrypoint is now
only responsible for managing the required services needed by the ClusterComponent.
This design should make the testing of these components easier, improve reusability
and reduce code duplication.

Move the logic of when to exit the JVM process out of the ClusterEntrypoint
so that the caller is now responsible to make this call. This improves the
usage of the ClusterEntrypoint for testing purposes.

Brief change log

  • Introduce ClusterComponent responsible for managing the cluster components: Dispatcher, RestServerEndpoint and ResourceManager
  • Introduce factories for the cluster components
  • Update existing ClusterEntrypoints to use the new ClusterComponent

Verifying this change

  • Most changes are covered by existing tests since it is mainly restructuring code
  • Added ClassPathJobGraphRetrieverTest to test ClassPathJobGraphRetriever

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

Copy link
Contributor

@StefanRRichter StefanRRichter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the refactoring is good! Being new to this code, I was wondering if we could not simplify ClusterEntrypoint even more be separating the creation of collaborator object even more from the construction of the entry point. I think that is should be possible to avoid lazy initialization and to get the main fields final. We should also be able to avoid the need for locking in entry point and cluster component. Probably that could be improved in a separate PR if you think it is a good improvement.

@@ -163,9 +135,6 @@
@GuardedBy("lock")
private ClusterInformation clusterInformation;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this field could become just a local variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we don't need it at all.

INSTANCE;

@Override
public WebMonitorEndpoint<RestfulGateway> createRestEndpoint(Configuration configuration, LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever, LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, TransientBlobService transientBlobService, Executor executor, MetricQueryServiceRetriever metricQueryServiceRetriever, LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some newlines to break down the length of parameter list.

schedulerConfiguration,
taskManagerParameters,
taskManagerContainerSpec),
new FileJobGraphRetriever(configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph")));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use a helper method that does construct FileJobGraphRetriever from configuration because this is duplicated. At least "job.graph" could become a string constant somewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll move it into the FileJobGraphRetriever.

@GuardedBy("lock")
private JobManagerMetricGroup jobManagerMetricGroup;

public ClusterComponent(DispatcherFactory<T> dispatcherFactory, ResourceManagerFactory<?> resourceManagerFactory, RestEndpointFactory<?> restEndpointFactory) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

linebreaks.

INSTANCE;

@Override
public ResourceManager<YarnWorkerNode> createResourceManager(Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newlines.

@Nonnull
private final String[] programArguments;

public ClassPathJobGraphRetriever(@Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings, @Nonnull String[] programArguments) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newlines.

}

@Override
public MiniDispatcher createDispatcher(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, HistoryServerArchivist historyServerArchivist) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

linebreaks.

* Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
* in the same process.
*/
public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsync {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As somebody looking into this code for the fist time, I found that the name ClusterComponent for this class was not really helpful to understand what it does or what it is good for. Maybe there is a name that makes this a it clearer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I'm also not super happy with the name. Before, I named it DispatcherComponent but this is also not perfect. Maybe it should be named DispatcherResourceManagerComponent/DispatcherResourceManagerRpcEndpoints?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe ClusterCoreCollaborators? It also does not give info about what it contains, but gives a hint that it is holding references to core collaborators and you can dig into the class for more details?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or a different view, if we cannot even find a name for it, then maybe it should not exists in the current form? (given that: one class = one concept/responsibility)

try {
clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {
LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName, e));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

return terminationFuture;
}

protected void startCluster() {
protected void startCluster() throws ClusterEntrypointException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could currently be private

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. I actually use it in a follow-up PR for testing purposes. Would, thus, just keep it like this.

Copy link
Contributor

@StefanRRichter StefanRRichter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had some minor fixes and suggestions, but overall looking good!

@tillrohrmann
Copy link
Contributor Author

Thanks a lot for your review @StefanRRichter. I agree that instantiating the ClusterComponent in an eager way is a good improvement. I'll add another commit for that.

Introduce a ClusterComponent which encapsulates the logic for starting the cluster
components, Dispatcher, RestServerEndpoint and the ResourceManager. The individual
components are created by using a factory instance. The ClusterEntrypoint is now
only responsible for managing the required services needed by the ClusterComponent.
This design should make the testing of these components easier, improve reusability
and reduce code duplication.
Move the logic of when to exit the JVM process out of the ClusterEntrypoint
so that the caller is now responsible to make this call. This improves the
usage of the ClusterEntrypoint for testing purposes.
This commit introduces the DispatcherResourceManagerComponentFactory which is used
to create a DispatcherResourceManagerComponent. That way, it is possible to eagerly
initialize all fields of the DispatcherResourceManagerComponent making it possible
to make all fields final and remove the lock.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants