Skip to content

GOBBLIN-XXXX: GGW-emitted terminal GTE on AM failure + AM/launcher exit-code propagation#4197

Draft
pratapaditya04 wants to merge 2 commits into
apache:masterfrom
pratapaditya04:pratapaditya04/temporal-am-job-completion-gte-v2
Draft

GOBBLIN-XXXX: GGW-emitted terminal GTE on AM failure + AM/launcher exit-code propagation#4197
pratapaditya04 wants to merge 2 commits into
apache:masterfrom
pratapaditya04:pratapaditya04/temporal-am-job-completion-gte-v2

Conversation

@pratapaditya04
Copy link
Copy Markdown
Contributor

JIRA

  • My PR addresses the following Gobblin JIRA issue and references it in the PR title.
    • GOBBLIN-XXXX (follow-up to the AM shutdown-hook GTE work in commit dd6b26b70)

Description

Today GaaS relies on the GobblinTrackingEvent (GTE) stream as the source of truth for job outcomes, but two gaps make the signal unreliable for the temporal-on-yarn flow:

  1. Missing terminal GTEs on AM failure paths. The previous commit on this branch (dd6b26b70 — Emit job-completion GTE from AM shutdown hook as single source of truth) installed a JVM-shutdown-hook GTE emitter inside + "GobblinTemporalJobLauncher.emitJobCompletionGTE()" + . That covers graceful AM shutdown. It does NOT cover JVM-fatal paths — OOM, SIGKILL, container-kill before the hook registers, AM CLI parse error — where the AM JVM dies before the hook can run. From GaaS's perspective the job is stuck.
  2. Wrong exit status. Both + "GobblinTemporalApplicationMaster.main()" + and + "GobblinYarnAppLauncher.main()" + exited 0 on essentially every code path. The AM only + "System.exit(1)" + s on CLI parse failure; on actual job failure it returned cleanly through the try-with-resources and the JVM exited 0. The launcher had no + "System.exit" + at all. This masked real failures from Grid Gateway dashboards.

This PR closes both gaps:

AM-side exit code ( + "gobblin-temporal" + )

  • + "GobblinTemporalJobLauncher" + caches the terminal + "WorkflowExecutionStatus" + into a static field, populated either by + "handleLaunchFinalization" + (normal flow, while Temporal stubs are alive) or by + "emitJobCompletionGTE" + (JVM shutdown hook).
  • Exposes + "getLastTerminalStatus()" + and static + "computeExitCode(WorkflowExecutionStatus)" + so the AM main can read the cache after + "close()" + has shut down Temporal stubs.
  • + "GobblinTemporalApplicationMaster.main()" + reads the cache after + "start()" + returns and calls + "System.exit(0|1)" + , surfacing job failures as non-zero AM JVM exit codes.

GGW launcher ( + "gobblin-yarn/GobblinYarnAppLauncher" + )

  • + "handleApplicationReportArrivalEvent" + now calls a new + "handleTerminalAppStatus" + , which maps + "FinalApplicationStatus" + + "JOB_FAILED" + / + "JOB_CANCEL" + (SUCCEEDED → no synthetic GTE, since the AM hook covers it), emits with the same event names and metadata schema as the AM-side hook so + "KafkaAvroJobStatusMonitor" + consumes both uniformly, and sets the launcher's exit code to 1.
  • + "handleGetApplicationReportFailureEvent" + exhaustion → + "JOB_FAILED" + with + "failureReason = GGW_LOST_AM_VISIBILITY" + (distinct from + "GGW_OBSERVED_AM_TERMINATION" + ) so triage can tell the two apart, plus + "exitCode = 1" + .
  • + "main()" + now ends with + "System.exit(launcher.getExitCode())" + .
  • Emission is idempotency-guarded by an + "AtomicBoolean" + to handle EventBus replays and the race between the two handlers.

Scope: temporal flow only. Shared schema is reused ( + "TimingEvent.LauncherTimings.{JOB_SUCCEEDED, JOB_FAILED, JOB_CANCEL}" + , + "TimingEvent.FlowEventConstants.{FLOW_GROUP_FIELD, FLOW_NAME_FIELD, FLOW_EXECUTION_ID_FIELD, JOB_NAME_FIELD, JOB_GROUP_FIELD}" + , + "ConfigurationKeys.{FLOW_GROUP_KEY, ...}" + ).

Tests

  • My PR adds the following unit tests.

+ "GobblinTemporalJobLauncherTest" + (7 new cases):

  • + "testComputeExitCodeForCompletedReturnsZero" + / + "testComputeExitCodeForAnyNonCompletedReturnsOne" + cover the AM main's static exit-code helper for every + "WorkflowExecutionStatus" + .
  • + "testHandleLaunchFinalizationPopulatesLastTerminalStatus" + verifies normal-flow status capture.
  • + "testLastTerminalStatusSurvivesClose" + ensures the cache outlives + "close()" + shutting down Temporal stubs (this is what the AM main relies on).
  • + "testEmitJobCompletionGTEDoesNotReQueryWhenStatusAlreadyCached" + verifies the hook reuses the cached value rather than re-querying Temporal during shutdown.
  • + "testConstructorResetsStaleTerminalStatus" + guards against cross-test cache leakage (matters for any JVM that reuses launchers).

+ "GobblinYarnAppLauncherTerminalGteTest" + (new file, 16 cases): focused unit tests that avoid the heavy + "MiniYARNCluster" + + Helix integration setup used by + "GobblinYarnAppLauncherTest" + , by injecting fields with reflection on a CALLS_REAL_METHODS Mockito stand-in. Covers + "mapFinalAppStatusToEventName" + for every + "FinalApplicationStatus" + , + "handleTerminalAppStatus" + exit-code wiring + idempotency, + "handleLostAmVisibility" + exit-code + idempotency + interaction with already-emitted GTEs, and + "buildLauncherTerminalMetadata" + (flow-id population from + "Config" + , diagnostics truncation, lost-visibility variant).

All 23 + "gobblin-temporal:test" + cases pass; new + "GobblinYarnAppLauncherTerminalGteTest" + passes; + "checkstyleMain" + / + "checkstyleTest" + clean on both modules.

Commits

  • My commits all reference JIRA issues in their subject lines.

🤖 Generated with Claude Code

pratapaditya04 and others added 2 commits May 26, 2026 22:24
… source of truth

Previously, the temporal-on-yarn execution path emitted a GobblinTrackingEvent
only on the failure branch of ExecuteGobblinWorkflowImpl (JOB_FAILED inside the
worker JVM); the success path emitted nothing, and the AM-killed-mid-execute
case produced no terminal GTE at all, leaving KafkaJobStatusMonitor without a
terminal state to record.

Register a JVM shutdown hook in GobblinTemporalJobLauncher (mirroring the
existing registerCleanupShutdownHook pattern) that, on AM exit, queries
Temporal via describeWorkflowExecution and emits a single completion GTE based
on the workflow's terminal status:
  COMPLETED                       -> JOB_SUCCEEDED
  CANCELED                        -> JOB_CANCEL
  FAILED/TERMINATED/TIMED_OUT     -> JOB_FAILED
  RUNNING/CONTINUED_AS_NEW/other  -> JOB_FAILED + failureReason=AM_TERMINATED_DURING_EXECUTION

Because each yarn application runs exactly one workflow (see
handleLaunchFinalization), AM completion coincides with job completion, so the
hook is the single source of truth. Removed the workflow-internal JOB_FAILED
emission to avoid duplicate events. AtomicBoolean guard makes the hook
idempotent across multiple shutdown triggers (close() + JVM exit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…it-code propagation

The AM-side shutdown-hook GTE (from the prior commit on this branch) covers
graceful AM shutdown but does not run when the JVM dies before the hook
registers (OOM, SIGKILL, container-kill, CLI parse error). Both
GobblinTemporalApplicationMaster.main and GobblinYarnAppLauncher.main also
exited 0 even when the underlying job failed, masking failures from GaaS and
Grid Gateway dashboards.

GobblinTemporalJobLauncher now caches the terminal WorkflowExecutionStatus
into a static field, populated either by handleLaunchFinalization (normal
flow, while Temporal stubs are alive) or by emitJobCompletionGTE (JVM
shutdown). GobblinTemporalApplicationMaster.main reads that cache after
start() returns and calls System.exit accordingly, surfacing job failures as
non-zero AM exit codes.

GobblinYarnAppLauncher now emits a synthetic terminal GTE when the YARN
ApplicationReport reports FAILED/KILLED/UNDEFINED, using the same event names
(JOB_FAILED, JOB_CANCEL) and metadata schema as the AM hook so downstream
KafkaAvroJobStatusMonitor consumes them uniformly. Distinct failureReason
values (GGW_OBSERVED_AM_TERMINATION vs GGW_LOST_AM_VISIBILITY) let triage
tell apart an observed AM termination from a lost-visibility timeout on
exhausted report fetches. The launcher tracks an exit code on the same code
paths and main() now exits with it.

Emission is idempotency-guarded by an AtomicBoolean to handle EventBus
replays and the race between the two handlers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant