-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-13452] Ensure to fail global when exception happens during reseting tasks of regions #9268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 3c6363e (Wed Aug 07 13:19:15 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
|
CI failed due to FLINK-13488 and travis file cache not found. |
|
This fix still has problem when |
|
Reopen this wrt the comments from @zhuzhurk in FLINK-13452. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Yun for opening this PR. I have a few comments.
| cancelTasks(verticesToRestart) | ||
| .thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor()) | ||
| .handle(failGlobalOnError())); | ||
| .thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think failure handling for the first part is still needed.
| // re-schedule tasks | ||
| rescheduleTasks(unmodifiedVertices, globalModVersion); | ||
| } catch (GlobalModVersionMismatch e) { | ||
| throw new IllegalStateException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All exceptions should be handled.
| }; | ||
| } | ||
|
|
||
| private BiFunction<Object, Throwable, Object> failGlobalOnError() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is still needed for failure handling of the first part of failover process.
| if (t != null) { | ||
| } catch (Throwable t) { | ||
| LOG.info("Unexpected error happens in region failover. Fail globally.", t); | ||
| failGlobal(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use FatalExitExceptionHandler to handle exceptions thrown from failGlobal directly.
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for opening this PR @Myasuka. I think we cannot merge these changes because they change the handling of exceptionally completed cancel futures.
| FutureUtils.assertNoException( | ||
| cancelTasks(verticesToRestart) | ||
| .thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor()) | ||
| .handle(failGlobalOnError())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should not change this logic since cancelTasks can return an exceptionally completed future which would now cause the process to terminate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, I am refactoring this PR by changing the result of RestartStrategy#restart to a CompletableFuture<?>
| if (t != null) { | ||
| } catch (Throwable t) { | ||
| LOG.info("Unexpected error happens in region failover. Fail globally.", t); | ||
| failGlobal(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is better to let the exception handling happen where it has been. Usually it is a good idea to do the exception handling at the highest possible level.
| * @return A {@link CompletableFuture} that will be completed when the restarting process is done. | ||
| */ | ||
| void restart(RestartCallback restarter, ScheduledExecutor executor); | ||
| CompletableFuture<?> restart(RestartCallback restarter, ScheduledExecutor executor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that CompletableFuture<Void> would convey clearly that "nothing" is returned by the future.
Also, wildcards in return types should be avoided: https://stackoverflow.com/a/22815270
| * @param <T> type of the result | ||
| * @return Future which schedule the given operation with given delay. | ||
| */ | ||
| public static <T> CompletableFuture<T> scheduleAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not unit tested but I think it should be because it could be used outside of the restart strategies. See FutureUtilsTest
| final JobGraph jobGraph, | ||
| final RestartStrategy restartStrategy) throws Exception { | ||
| final RestartStrategy restartStrategy, | ||
| final boolean failOnRecoverCheckpoint) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For unit testing the new behavior (i.e., failGlobal is invoked if an exception occurs while restarting), wouldn't it be enough to implement a RestartStrategy that returns an exceptionally completed future? I think it would be good enough because the exception doesn't have to be necessarily thrown while restoring checkpoints – in practice also an OOM can be thrown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, combine testFailGlobalIfErrorOnResetTasks and previous testFailGlobalIfErrorOnRescheduleTasks into one unit test as testFailGlobalIfErrorOnRestartTasks with given RestartStrategy which would fail when restarting.
| unionListState = context.getOperatorStateStore().getUnionListState(unionStateDescriptor); | ||
| Set<Integer> actualIndices = StreamSupport.stream(unionListState.get().spliterator(), false).collect(Collectors.toSet()); | ||
| Assert.assertTrue(CollectionUtils.isEqualCollection(EXPECTED_INDICES, actualIndices)); | ||
| if (getRuntimeContext().getTaskName().contains(SINGLE_REGION_SOURCE_NAME)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this change really necessary to resolve FLINK-13452, or can it be in a separate ticket?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to let this integration test to include global failover scenario, this is necessary.
| HighAvailabilityServicesUtilsTest.TestHAFactory.haServices = new FailHaServices(new TestingCheckpointRecoveryFactory(new FailRecoverCompletedCheckpointStore(1, 1), new StandaloneCheckpointIDCounter()), TestingUtils.defaultExecutor()); | ||
| Configuration configuration = new Configuration(); | ||
| configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region"); | ||
| configuration.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityServicesUtilsTest.TestHAFactory.class.getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting feature, didn't know about it before. However, I have two questions regarding this integration test:
- As before, isn't it enough to use a RestartStrategy that returns an exceptionally completed future?
- We already test that
failGlobalis invoked from the unit tests. Does the integration test add coverage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, the RegionFailoverITCase did not catch bug of FLINK-13452 due to it did not involve global failover but only region failover. I prefer to add global failover in this integration test to verify the job could be restarted and result still correct.
|
Thank you for your contribution to Apache Flink @Myasuka. I think the PR is going in the right direction. |
|
@GJL new commit content is updated in description. UPDATED:
|
|
Thanks for the update. I'll have a look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All in all, the PR looks good. There are some minor things that require attention. If you want, I can apply the last changes myself. WDYT?
| } | ||
|
|
||
| private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { | ||
| public static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be made private again.
| } | ||
|
|
||
| private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) { | ||
| private CompletableFuture<?> resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would return Function<Object, CompletableFuture<Void>> here so that we can write:
FutureUtils.assertNoException(
cancelTasks(verticesToRestart)
.thenComposeAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())
.handle(failGlobalOnError()));
This is consistent with the style before.
| private static class TestException extends IOException{ | ||
| private static final long serialVersionUID = 1L; | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would revert this change (adding empty line).
| * | ||
| * @param configuration Configuration containing the parameter values for the restart strategy | ||
| * @return Initialized instance of FixedDelayRestartStrategy | ||
| * @throws Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| * @param <T> type of the result | ||
| * @return Future which schedule the given operation with given delay. | ||
| */ | ||
| public static <T> CompletableFuture<T> scheduleAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should add the following overload:
public static CompletableFuture<Void> scheduleWithDelay(
final Runnable operation,
final Time delay,
final ScheduledExecutor scheduledExecutor) {
Supplier<Void> operationSupplier = () -> {
operation.run();
return null;
};
return scheduleWithDelay(operationSupplier, delay, scheduledExecutor);
}
This will reduce in the RestartStrategy implementations.
| @Override | ||
| public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) { | ||
| if (restartedTimes.incrementAndGet() <= failureMultiTimes) { | ||
| CompletableFuture<Void> exceptionFuture = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use return FutureUtils.completedExceptionally(new FlinkRuntimeException(...))
| TestingUtils.defaultScheduledExecutor()); | ||
|
|
||
| int result = scheduleAsyncFuture.get(); | ||
| long completionTime = System.currentTimeMillis() - start; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There some issues that I see here:
System.currentTimeMillis()is impacted by system clock changes – this may cuase test failures in conjunction with NTP. ThereforeSystem.nanoTime()should be preferred.- Test will run at least 500ms which is on the slow side for unit tests. I think it's enough to use
ManuallyTriggeredScheduledExecutorand see if the correct result is produced. - I don't think using
AtomicIntegeradds test coverage. Imo it's enough to assert on the value ofresult.
|
|
||
| final ScheduledFuture<?> scheduledFuture = scheduledTasks.iterator().next(); | ||
|
|
||
| assertFalse(scheduledFuture.isDone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this assertion is needed because you already test if the future can be cancelled:
assertTrue(scheduledFuture.isCancelled());
This assertion will fail if the future is finished normally.
| public CompletableFuture<Void> restart(final RestartCallback restarter, ScheduledExecutor executor) { | ||
| currentRestartAttempt++; | ||
| executor.schedule(restarter::triggerFullRecovery, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS); | ||
| Supplier<Void> restartSupplier = () -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This conversion to Supplier is duplicated multiple times. See my other comment in FutureUtils.
|
Sorry for late reply. |
|
Please review @tillrohrmann |
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
Merging as soon as build is green. |
…uring reseting tasks of a region
|
Merged manually. |

What is the purpose of the change
After FLINK-13060, we would run
createResetAndRescheduleTasksCallbackwithin another runnableresetAndRescheduleTasks. Unfortunately, any exception happened increateResetAndRescheduleTasksCallbackwould cause the thread terminated silently but record the exception inoutcomeofFutureTask. We should change the code back to previously that wouldfailGlobalwithin thecreateResetAndRescheduleTasksCallbackrunnable.Brief change log
59b1a6d5 :
createResetAndRescheduleTasksCallbackfail global if come across any exception.RegionFailoverITCaseto mock the exception that checkpoint store would failed when recover from checkpoint for the 1st time.b732229 : Refactor interface of
RestartStrategy#restartand add UT to verifyfailGlobalIfErrorOnResetTasks073d815
Verifying this change
This change added tests and can be verified as follows:
RegionFailoverITCaseto mock the exception that checkpoint store would failed when recover from checkpoint for the 1st time.Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation