[GOBBLIN-1996] Add ability for Yarn app to terminate on finishing of temporal flow #3865
Conversation
Force-pushed from 6666bdd to f479355
  // Start the application
- this.serviceManager.startAsync().awaitHealthy();
+ try {
+   this.serviceManager.startAsync().awaitHealthy(startupTimeout.getSeconds(), TimeUnit.SECONDS);
One of the services, the FSConfigurationManager, has main-thread-interrupting behavior that causes the start call to block indefinitely.
This prevents users from calling the shutdown method, because that method is synchronized. I'm adding a configuration to make the timeout configurable, with the default being to wait indefinitely.
what does it mean for the "start call [to] last indefinitely"?
- the startAsync (it's not actually async)?
- or the awaitHealthy (e.g. the FSConfigMgr never actually becomes healthy)?

also, if awaitHealthy never returns, does that impede our ability to fail-fast when there's a legit problem, like the deployment is DOA?
In this case, startAsync is not really working as expected when using the FSConfigurationManager. The actual work done in the cluster goes through the FSConfigurationManager, which takes the config and produces a source out of the .conf file.
The awaitHealthy call here waits indefinitely and suspends the current thread in the current implementation. It does not impede our ability to fail-fast, because the call often just hangs when there's an issue. If anything, we can fail faster: a configurable timeout allows us to unblock this thread and trigger the shutdown when the Workflow finishes (successfully or unsuccessfully).
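A minimal sketch of the bounded-startup pattern being discussed here. The class shape, constructor wiring, and timeout handling are assumptions for illustration, not the PR's actual code; only the awaitHealthy-with-timeout call mirrors the diff:

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.util.concurrent.ServiceManager;

public class BoundedStartup {
  private final ServiceManager serviceManager;
  private final Duration startupTimeout; // hypothetical: read from a config key, defaulting to "indefinite"

  public BoundedStartup(ServiceManager serviceManager, Duration startupTimeout) {
    this.serviceManager = serviceManager;
    this.startupTimeout = startupTimeout;
  }

  public void start() {
    try {
      // Bounded wait: a service that never becomes healthy (like the FSConfigurationManager
      // behavior described above) can no longer pin this thread forever, so the synchronized
      // shutdown method stays reachable.
      this.serviceManager.startAsync().awaitHealthy(this.startupTimeout.getSeconds(), TimeUnit.SECONDS);
    } catch (TimeoutException te) {
      // Timing out unblocks the caller; shutdown can then be triggered when the Temporal
      // workflow finishes, instead of hanging on startup.
    }
  }
}
```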
    properties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS, Long.toString(300));
  }
- this.applicationLauncher = new ServiceBasedAppLauncher(properties, this.clusterName);
+ this.applicationLauncher = new ServiceBasedAppLauncherWithoutMetrics(properties, this.clusterName);
Metrics in these pipelines are causing noisy logging, and the metrics here are not currently used by Temporal in any meaningful way, so I've disabled them here.
Disabling metrics via the Gobblin metrics key is not desirable because we still need GTE emission to work.
  LOGGER.info("Stopping the Gobblin Cluster Manager");
  ...
  if (this.idleProcessThread != null) {
Temporal needs to run in yarn mode, so we do not need the standalone cluster / bare-metal code.
phet left a comment
nice crisp solution... I like it!
Force-pushed from ba5e03e to f1ae369
Force-pushed from f1ae369 to 87a478b
Force-pushed from 87a478b to 4d2f1c7
  public static class Factory {
-   private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder().build();
+   private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder()
+       .setStartToCloseTimeout(Duration.ofHours(24))
what are the ramifications of this? a job could still run >24 hours right? does this merely constrain the activity that attempts to send GTEs to finish performing the send within 24 hours?
Correct, the job itself can still run for >24 hours. But if for some reason Kafka is down, this operation can take an arbitrary amount of time, so let's cap it at a reasonably high number (24 hours).
There are 2 things to consider here:
- GaaS won't be able to detect that a job has finished until this activity succeeds. If Kafka is down, we want to be able to retry this task until Kafka is back.
- If the job itself is completed but this timer is unable to send the message, I think 24 hours is a reasonable amount of time to retry before giving up. We ideally don't want to have to retry the work just because the timer failed for a brief period of time (e.g. a few hours of downtime).
sure, that's fine. for this, even 8 or 12 hours should be more than sufficient. please add a clarifying comment that this is the max time to attempt the send, potentially waiting out a kafka outage
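One way the requested clarification could look in code. The setStartToCloseTimeout call matches the diff; the retry intervals are illustrative assumptions, not values from the PR:

```java
import java.time.Duration;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;

public class GteActivityOptions {
  // Max time to attempt sending the GTE, potentially waiting out a Kafka outage.
  // This bounds only the event-emission activity; the job itself may run longer.
  private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder()
      .setStartToCloseTimeout(Duration.ofHours(24))
      // Retry intervals below are assumptions for illustration.
      .setRetryOptions(RetryOptions.newBuilder()
          .setInitialInterval(Duration.ofSeconds(10))
          .setMaximumInterval(Duration.ofMinutes(10))
          .build())
      .build();
}
```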
Force-pushed from 0c53cca to 40153a4
phet left a comment
I think we're there, or at least super close!
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
In the above diagram, the Yarn application will terminate when it is done executing the submitted Temporal workflow, and the monitoring Azkaban job will terminate when it sees the Yarn application finish.
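A minimal sketch of that termination pattern, assuming a hypothetical GobblinWorkflow interface, a made-up task queue name, and a stopYarnApplication() placeholder; the real PR's wiring differs:

```java
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

public class TerminateOnWorkflowFinish {

  // Hypothetical workflow interface standing in for the PR's actual workflow.
  @WorkflowInterface
  public interface GobblinWorkflow {
    @WorkflowMethod
    void run();
  }

  public static void main(String[] args) {
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
    WorkflowClient client = WorkflowClient.newInstance(service);
    GobblinWorkflow workflow = client.newWorkflowStub(
        GobblinWorkflow.class,
        WorkflowOptions.newBuilder().setTaskQueue("gobblin-temporal-queue").build()); // queue name assumed

    try {
      workflow.run(); // blocks until the submitted workflow completes, successfully or not
    } finally {
      stopYarnApplication(); // hypothetical hook: tear down the Yarn app so Azkaban sees it finish
    }
  }

  private static void stopYarnApplication() {
    // Placeholder for the application shutdown path (e.g. stopping the launched services).
  }
}
```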
Tests
Commits