Skip to content

Conversation

@StephanEwen
Copy link
Contributor

Currently, the thread pools of the JobManager do not have any UncaughtExceptionHandler.

While uncaught exceptions are rare (Flink handles exceptions aggressively in most places), when exceptions slip through in these threads (which execute future responses and delayed actions), the JobManager may be in an inconsistent state and not function properly any more.

This pull request adds a handler that results in a process kill in the case of uncaught exceptions. Letting the JobManager be restarted by the respective cluster framework is the only guaranteed way to be safe.

This also unifies the ExecutorThreadFactory and NamedThreadFactory.

Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes look good to me. +1 for merging :-)

The change adds an important safety net to not allow the system to go into a corrupted state. Really good addition!

Copy link
Contributor

@StefanRRichter StefanRRichter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a very good change to me, +1. I had only ver minor comments.


this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
this.executorService = Executors.newScheduledThreadPool(10,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any rational behind the magic number 10 or do we use this because it was 10 before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR just did not want to change anything else than what its goal was.
The 10 is pretty magic, though, agreed. Something relative to the number of cores seems to make more sense, intuitively.

futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
new ExecutorThreadFactory("mesos-jobmanager-future"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if 'akkaExecutor' and 'mesos-jobmanager-akka' (or 'coordinationFutureExecutor' if we want to be more general than 'akka') would carry more information for people not familiar with the code. As far as I can see, this pool is only used by Akka, whereas the name could imply that it is somehow used for general futures or even async user code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pool is not really tied to Akka. Akka has its own threads for the actors. The JobManager actor uses the "future" pool for futures produced by the actors. The ExecutionGraph also uses that pool for some callbacks.

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A thread
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line is either incomplete or could be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, incomplete, will fix that.

timeout = AkkaUtils.getTimeout(config);
} catch (NumberFormatException e) {
}
catch (NumberFormatException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary reformatting that complicates git blame

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, we should have a discuss about whether needing line break behand }. enable it to checkstyle

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is still my old code style config. IntelliJ sometimes triggers some local reformatting.
@shijinkui Updating the code style has been a discussion forever. To include this into the style, one would need to fix many styles. But it is ultimately a good idea to have this, agreed.

NicoK pushed a commit to NicoK/flink that referenced this pull request Feb 10, 2017
This adds a JVM-terminating handler that logs errors from uncaught exceptions
and terminates the process so that critical exceptions are not accidentally
lost and leave the system running in an inconsistent state.

It borrows and re-uses code from @StephanEwen from this PR:
apache#3290
@StephanEwen
Copy link
Contributor Author

Addressing the comments and merging this...

StephanEwen added a commit to StephanEwen/flink that referenced this pull request Feb 10, 2017
@asfgit asfgit closed this in ef77c25 Feb 10, 2017
joseprupi pushed a commit to joseprupi/flink that referenced this pull request Feb 12, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants