Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient #5215

Closed

Conversation

tillrohrmann
Copy link
Contributor

What is the purpose of the change

Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication
with Yarn and, thus, gives a better separation of concerns.

Brief change log

  • Replace the PollingThread with the YarnApplicationStatusMonitor
  • Decouple YarnClusterClient from Yarn ApplicationStatus polling

Verifying this change

  • Changes covered by existing tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

Copy link
Member

@GJL GJL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general it looks good. I left some comments.


private static void printClusterMessages(YarnClusterClient clusterClient) {
final List<String> messages = clusterClient.getNewMessages();
if (messages != null && messages.size() > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if (!messages.isEmpty()) should suffice because messages is never null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true. Will change it.


@Override
public void close() throws Exception {
applicationStatusUpdateFuture.cancel(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to declare throws Exception here because cancel() does not throw any checked exceptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, will remove it.

yarnCluster,
yarnApplicationStatusMonitor,
true);
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing YarnApplicationStatusMonitor should not throw any checked exceptions. If you change the signature, this catch block won't be needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, will remove it.

try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
yarnDescriptor.getYarnClient(),
yarnCluster.getApplicationId(),
new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to use the ScheduledExecutor interface from Flink? Why not use Java's ScheduledExecutorService directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ScheduledExecutor gives a better abstraction because it does not expose service control methods like shutdown to the callee. I think the Java abstraction is slightly broken in this regard.

yarnApplicationStatusMonitor,
acceptInteractiveInput);
} catch (Exception e) {
LOG.info("Could not properly close the Yarn application status monitor.", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. Catch block could be avoided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it.

@@ -660,7 +570,25 @@ public int run(
"yarn application -kill " + applicationId.getOpt());
yarnCluster.disconnect();
} else {
runInteractiveCli(yarnCluster, true);
ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the executor could as well be in the Monitor. If needed in the future, one could provide a constructor that accepts an external executor (e.g., for unit tests).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it could be. That way, however, we support that we can use an arbitrary executor which is available (as you've mentioned for tests). Since refactoring wouldn't add much value, I'll keep it like this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

runInteractiveCli(
yarnCluster,
yarnApplicationStatusMonitor,
acceptInteractiveInput);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code block looks duplicated except for this flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is. In one of my later PRs, I removed this code duplication. Therefore I leave it like this for the moment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
boolean continueRepl = true;
int numTaskmanagers = 0;
long unknownStatusSince = System.currentTimeMillis();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: System.nanoTime() should be preferred to measure elapsed time because it does not depend on wall clock, i.e., it is not affected by the user changing the system's time: https://stackoverflow.com/a/351571
However, if you use nanoTime(), the trick in line 729 with negative unknownStatusSince won't work.

@tillrohrmann
Copy link
Contributor Author

Thanks for the review @GJL. I've addressed your comments. Once Travis gives green light, I'll merge the PR.

@tillrohrmann tillrohrmann force-pushed the removeSpecialClusterClients branch 2 times, most recently from daf7536 to d7f4d2c Compare January 10, 2018 14:22
…lusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication
with Yarn and, thus, gives a better separation of concerns.
@asfgit asfgit closed this in 2ce5b98 Jan 11, 2018
@tillrohrmann tillrohrmann deleted the removeSpecialClusterClients branch January 11, 2018 16:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants