
[FLINK-7216] [distr. coordination] Guard against concurrent global failover #4364

Closed

Conversation

StephanEwen (Contributor)

This is one of the blocker issues for the 1.3.2 release.

What is the purpose of the change

This fixes the bug FLINK-7216, where race conditions can trigger concurrent failovers, leading to a restart storm.

The heart of the bug is that we allow initiating another restart while already being in state RESTARTING. That behavior was introduced as a safety net to catch exceptions (implementation bugs) that are reported in that state and need a full recovery to ensure consistency.

However, this means that multiple restarts may accidentally be triggered/queued and then execute one after another. While one attempt is executing the failover, the next one interferes or aborts (detecting a conflict) and schedules another recovery, leading to the above-mentioned restart storm. The restart storm subsides once one restart attempt makes enough progress (before another interferes) to actually finish the scheduling phase.
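To make the failure mode concrete, here is a hypothetical interleaving (illustrative only, not actual Flink code or log output):

// t0: a task failure triggers a global failover; the graph enters RESTARTING and restart #1 is queued
// t1: a second exception is reported while RESTARTING; the safety net queues restart #2
// t2: restart #1 resets the vertices and starts the scheduling phase
// t3: restart #2 runs concurrently, detects the conflict, fails the job globally again and queues restart #3
// ... the cycle repeats until one attempt finishes its scheduling phase before the next one interferes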

Brief change log

This PR covers three issues, because the first two were needed to prepare the fix.

  • FLINK-6665 and FLINK-6667 introduce an indirection so that the RestartStrategy no longer calls restart() on the ExecutionGraph directly. Instead, it calls a callback to initiate the restart.
  • The actual fix makes sure that the globalModVersion (which tracks global changes such as full restarts in the ExecutionGraph) is unchanged between triggering the restart and executing it. When multiple restart requests are scheduled, only one will actually take effect, while the others detect that they have been subsumed (see the sketch below).
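For illustration, here is a minimal sketch of the guarded restart callback, pieced together from the fields visible in the diff excerpts further down (the ExecutionGraph reference, the atomic "used" flag, and the expected globalModVersion). The constructor and method signatures are assumptions and may differ from the actual code in this PR.

import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutionGraphRestartCallback implements RestartCallback {

    /** The ExecutionGraph to restart. */
    private final ExecutionGraph execGraph;

    /** Atomic flag to make sure this is used only once. */
    private final AtomicBoolean used = new AtomicBoolean(false);

    /** The globalModVersion that the ExecutionGraph needs to have for the restart to go through. */
    private final long expectedGlobalModVersion;

    public ExecutionGraphRestartCallback(ExecutionGraph execGraph, long expectedGlobalModVersion) {
        this.execGraph = execGraph;
        this.expectedGlobalModVersion = expectedGlobalModVersion;
    }

    @Override
    public void triggerFullRecovery() {
        // trigger at most one restart per callback; the ExecutionGraph additionally
        // ignores the call if its globalModVersion has changed in the meantime
        if (used.compareAndSet(false, true)) {
            execGraph.restart(expectedGlobalModVersion);
        }
    }
}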

Verifying this change

This change added the following tests:

  • ExecutionGraphRestartTest#testConcurrentGlobalFailAndRestarts() explicitly tests the behavior under concurrent global failures and restarts.
  • ExecutionGraphRestartTest#testConcurrentLocalFailAndRestart() tests a similar setup for the local failover path.

The general working of that mechanism is also covered by various existing tests in org.apache.flink.runtime.executiongraph.restart.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes:

The change affects the restart logic on the JobManager.

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

…nd a ScheduledExecutor for ExecutionGraph restarts

Initial work by zjureel@gmail.com, improved by sewen@apache.org.
@zentol (Contributor) left a comment

Had some checkstyle-related comments, to reduce the number of changes necessary once we introduce checkstyle to this part of flink-runtime.

/** The ExecutionGraph to restart */
private final ExecutionGraph execGraph;

/** Atomic flag to make sure this is used only once */
Contributor

Please add a period here.

*/
public class ExecutionGraphRestartCallback implements RestartCallback {

/** The ExecutionGraph to restart */
Contributor

Please add a period here.


/**
* A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
*
Contributor

Please remove the trailing space.

* Called by the ExecutionGraph to eventually trigger a full recovery.
* The recovery must be triggered on the given callback object, and may be delayed
* with the help of the given scheduled executor.
*
Contributor

Please remove the trailing space.

@@ -37,15 +37,26 @@
/** Atomic flag to make sure this is used only once */
private final AtomicBoolean used;

public ExecutionGraphRestartCallback(ExecutionGraph execGraph) {
/** The globalModVersion that the ExecutionGraph needs to have for the restart to go through */
Contributor

Please add a period.

@StephanEwen (Contributor Author)

Concerning the 'period' check style rule:

I think that the common language rules (not JavaDoc specific) are to add a period after complete sentences. That would mean that parameter descriptions, when not complete sentences, are not terminated by a period.

Are we rolling out a rule that every text line has to be terminated with a period/full stop?

@zentol (Contributor)

zentol commented Jul 19, 2017

With the current rules, the first sentence of any javadoc must end in a period.

So, this is invalid:

/** some parameter */
private final int myParameter ...

But, this is fine:

// some parameter
private final int myParameter ...
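For reference, the compliant Javadoc form under that rule would simply be:

/** Some parameter. */
private final int myParameter ...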

@StephanEwen (Contributor Author)

Okay, will update the periods. The linguist in my heart cries a bit, but I guess it makes sense that we cannot expect checkstyle to figure out if a sentence is a complete sentence or not...

@aljoscha (Contributor) left a comment

I had a few nitpicks about comments and questions about some parts and the tests.

@@ -159,34 +161,6 @@ public void testRestartAutomatically() throws Exception {
}

@Test
public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
Contributor

These tests are superseded by the newly added tests in FailureRateRestartStrategyTest?

Contributor Author

Yes, as part of introducing the "callback" indirection, we can now also test the restart strategies much better, without always setting up a full ExecutionGraph. I added it to the refactoring.

// ------------------------------------------------------------------------

/**
* This method makes sure that the actual interval and is not spuriously waking up.
Contributor

"This method makes sure to sleep for the required interval and that we don't spuriously wake up."?

Also, what happens if Thread.sleep() is interrupted?

Contributor Author

Then the whole method and test abort exceptionally anyway.

Contributor

Perfect

try {
synchronized (progressLock) {
JobStatus current = state;
// check and increment the global version to move this recovery up
Contributor

"check the current global version to determine whether our recovery attempt is still current"?

It's not incrementing the global version here.
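As a hypothetical illustration of the suggested wording and semantics (the helper name is an assumption for illustration, not code in this PR; progressLock, state, and globalModVersion are the fields referenced above):

// inside the ExecutionGraph, under the progress lock
private boolean recoveryStillCurrent(long expectedGlobalVersion) {
    synchronized (progressLock) {
        // check the current global version to determine whether this recovery
        // attempt is still current; a newer global failover bumps globalModVersion
        return state == JobStatus.RESTARTING && globalModVersion == expectedGlobalVersion;
    }
}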

}
}

private static final class TriggeredRestartStrategy implements RestartStrategy {
Contributor

"A {@link RestartStrategy} that blocks restarting on a given {@link OneShotLatch}."?

// from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers
// which cannot easily tell us when that condition has happened, unfortunately.
try {
Thread.sleep(2);
Contributor

😢 but it seems there's no way around it. Could this lead to flaky tests?

Contributor Author

In very rare cases, it might. I want to change the Execution a bit on the master to make this unnecessary.

However, that is too much surgery in a critical part for a bugfix release, so I decided to be conservative in the runtime code and rather pay this price in the tests.

@@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
assertEquals(JobStatus.SUSPENDED, eg.getState());
}

@Test
public void testConcurrentLocalFailAndRestart() throws Exception {
Contributor

This only verifies that we don't break the existing and working local failover, right? This test should also succeed on the current master and I checked and it indeed does.

Contributor Author

Right, this one was a test that should have been there in the first place and I took this chance to add it.

failTrigger.trigger();

waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
completeCancellingForAllVertices(eg);
Contributor

By the way, I noticed that completeCancellingForAllVertices() and finishAllVertices() have slightly misleading Javadoc. That threw me off a bit when reviewing.

Contributor Author

True, those docs are copy/paste wrong ;-) I fixed them...

}

@Test
public void testConcurrentGlobalFailAndRestarts() throws Exception {
Contributor

I tried running this on current master and the test failed but I didn't see a "storm of restarts"

Contributor Author

From the offline chat: I think you are missing the asynchrony in the restarting, leading to a lock in the cherry-picked code.

Contributor

Jip, I think so too.

@StephanEwen (Contributor Author)

Thanks for the reviews. Addressing the comments, rerunning tests, and merging...

@aljoscha (Contributor)

+1 for merging!
