
[FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown #5058

Conversation

StefanRRichter
Contributor

What is the purpose of the change

When stream tasks are going through their cleanup in the failover case, pending timer threads can still access native resources of a state backend after the backend's disposal. In some cases, this can crash the JVM.

The obvious fix is to wait until all timers have finished before disposing those resources. The main reason we did not change this earlier is that, in theory, the user code of a triggering timer can block forever and suppress or delay restarts in failover cases. However, the situation has changed a bit since this topic was last discussed: there is now also a watchdog that ensures that tasks are terminated after some time of inactivity. I would propose a middle ground that probably catches close to all of these problems while still ensuring a reasonably fast shutdown: wait for the termination of the thread pool that runs all timer events, up to a time limit. Typically, there is a very limited number of in-flight timers and they are usually short-lived. A wait interval of a few seconds should basically fix this problem, even though there can still be very rare cases of very long-running timers that also happen to still access the disposed resources. But the likelihood of this scenario should be reduced by orders of magnitude, and in particular the cascading effect should be mitigated.
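As a minimal sketch of this bounded-wait idea, using a plain java.util.concurrent ScheduledExecutorService as a stand-in for the task's timer thread pool (the 7500 ms grace period mirrors the constant shown in the diff further down; all other names are illustrative, not the PR's actual code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimerShutdownSketch {

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the stream task's timer thread pool.
        ScheduledExecutorService timerThreadPool = Executors.newScheduledThreadPool(1);

        // ... timers are registered and fired here during normal operation ...

        // On (exceptional) shutdown: stop accepting new timers and give the
        // in-flight ones a bounded grace period to finish.
        timerThreadPool.shutdown();
        boolean finished = timerThreadPool.awaitTermination(7500L, TimeUnit.MILLISECONDS);
        if (!finished) {
            System.err.println("Timer threads still running after grace period; continuing cleanup.");
        }
        // Only after this point would the state backend's native resources be disposed.
    }
}

The key property is that cleanup proceeds after the grace period either way, so a stuck timer can delay a restart by at most the configured timeout.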

Brief change log

  • StreamTasks wait in their cleanup code, up to a certain time limit, to give the timer service a chance to finish all pending timer threads.

Verifying this change

This change added tests to SystemProcessingTimeServiceTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@StefanRRichter
Contributor Author

CC @aljoscha


/**
* Shuts down and clean up the timer service provider hard and immediately. This does wait
* for all timer to complete or until the time limit is exceeded. Any call to
Contributor

nit: "timers"

@@ -122,6 +123,8 @@
/** The logger used by the StreamTask and its subclasses. */
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

private static final long TIMER_SERVICE_TERMINATION_AWAIT_MS = 7500L;
Contributor

This could be a configuration setting.

@StephanEwen
Contributor

Looks good all in all.

I am torn between adding a configuration setting for the timer grace period and not having one. Seems like a setting that is unlikely to ever be used and just blows up the config space. On the other hand, magic numbers should be configurable...

@StefanRRichter
Contributor Author

So which way can we agree on? I feel like this is a very specific setting that is hard for the user to understand. On the other hand, it is nice if we can adjust it via configuration in case it causes any problems.

@StefanRRichter
Contributor Author

StefanRRichter commented Nov 23, 2017

Implemented the configurable timeout, so that we can choose between the two options.

@aljoscha
Contributor

+1 This LGTM now.

@StefanRRichter
Contributor Author

Thanks for the reviews @aljoscha and @StephanEwen. Will merge this now.

asfgit pushed a commit that referenced this pull request on Nov 23, 2017:
[FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown.

This closes #5058.

(cherry picked from commit d86c6b6)
asfgit closed this in d86c6b6 on Nov 23, 2017
@StephanEwen
Contributor

StephanEwen commented Nov 24, 2017

I would like to make a few comments for followup:

  • I think TimerServiceOptions should not be its own class; we are getting crazy fragmentation of options into classes (some of which, like here, define only one option). Let's merge these into TaskManagerOptions.

  • The config key does not follow the scheme in which all other config keys are defined. It reads like a name where the dot '.' is a word separator. The scheme used by all other config keys is hierarchical, like a path in a nested config group/object structure: think of the configuration as one huge JSON object, with the key as the dot path to the entry. Hence a key like taskmanager.timers.shutdown-timeout (or taskmanager.timers.shutdown.timeout, if we view shutdown as a full config group/object) would be in line with the overall style; a sketch of such an option follows below.
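For illustration, a sketch of how such an option might look with Flink's ConfigOptions builder under the suggested key (assuming the builder API of that Flink version; the class and field names here are hypothetical, not the PR's code):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class TaskManagerOptionsSketch {

    // Hypothetical placement of the timer shutdown timeout under the
    // hierarchical key suggested above, next to the other TaskManager options.
    public static final ConfigOption<Long> TASK_TIMER_SHUTDOWN_TIMEOUT =
            ConfigOptions.key("taskmanager.timers.shutdown-timeout")
                    .defaultValue(7500L);
}

A task could then read the value via Configuration#getLong(TASK_TIMER_SHUTDOWN_TIMEOUT) instead of relying on a hard-coded constant.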

@StefanRRichter
Contributor Author

StefanRRichter commented Nov 24, 2017

I was thinking about where to put those options, and in the end it was a choice between fragmentation and inconsistency. I considered TaskManagerOptions, but it feels like the wrong place if you look at what the other options in that class are about, and it also implies Task == StreamTask, because timers are something that currently belongs to streaming imo (well, the same would also apply to checkpointing). There are configuration classes all over the place, so if you think this is a problem, maybe we should fix it in general rather than on a per-case basis?

I agree that we can change the key string.

StefanRRichter deleted the FLINK-5465-wait-timer-shutdown branch on January 23, 2018
glaksh100 pushed a commit to lyft/flink that referenced this pull request on Jun 6, 2018:
[FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown.

This closes apache#5058.

(cherry picked from commit d86c6b6)
@Aitozi
Contributor

Aitozi commented Aug 12, 2018

Can I ask a question here? Why can't the code below in StreamTask already make sure that all processing-time timer tasks have finished before the backend is disposed, so that this PR is needed as an extra guarantee? I ran into the same bug in Flink 1.3.2, but I can't quite understand why awaitPendingAfterQuiesce doesn't work. @StefanRRichter

// make sure all timers finish
timerService.awaitPendingAfterQuiesce();

@StefanRRichter
Contributor Author

StefanRRichter commented Aug 12, 2018

@Aitozi The problem with that code is that (at least in that version of Flink) the thread could still be interrupted as part of an ongoing cancellation while waiting for the timer service to finish. It will then immediately unblock with an InterruptedException, and the bad things can still happen. The code needs to complete the shutdown even in the face of interruption, or else you can still segfault from time to time.

Edit: one more hint, FYI: FLINK-9845 is also still pending, which will make the timer service manager cooperative with cancellation, so that the waiting phase can become a bit shorter in some cases.
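To illustrate the point, a minimal sketch of an interruption-resistant wait (assumed names, not the actual Flink code): it keeps waiting for the timer pool until the deadline even if the waiting thread is interrupted, and restores the interrupt flag afterwards so the cancellation itself is not lost.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class InterruptResistantShutdown {

    // Waits for the pool to terminate until the timeout elapses, swallowing
    // interrupts during the wait and re-setting the interrupt flag at the end.
    public static boolean awaitTerminationUninterruptibly(ExecutorService pool, long timeoutMs) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        boolean interrupted = false;
        boolean terminated = false;
        try {
            while (!terminated) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0L) {
                    break;
                }
                try {
                    terminated = pool.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Do not let an ongoing cancellation cut the shutdown wait short.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return terminated;
    }
}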

@zorro786

@StefanRRichter It is mentioned in the Jira ticket that this affects versions 1.5.0, 1.4.2, and 1.6.0.
Could you please tell me why it doesn't affect 1.4.0?

@StefanRRichter
Contributor Author

It affects 1.4 as well.
