From 5e757477153d4ad20b8e6d49afb5a98cec951e6a Mon Sep 17 00:00:00 2001
From: pratapaditya04
Date: Tue, 19 May 2026 00:30:57 +0530
Subject: [PATCH] [GOBBLIN-2264] Key deadline reminders by dagAction only so DELETE events can unschedule them

Deadline reminder JobKeys previously embedded the lease event time, but the DagActionStore DELETE event payload omits event time, so the change monitor could not cancel a still-pending deadline reminder for a dagAction whose row had already been resolved.

Re-key deadline reminders by (flowGroup, flowName, flowExecutionId, jobName, dagActionType) only and add an unscheduleReminderJob(DagAction) overload that the DELETE handler can invoke. Replace any pre-existing deadline reminder on schedule to avoid ObjectAlreadyExistsException when the delete-then-reinsert duplicate-insert path in DagProcUtils.sendEnforce*DeadlineDagAction races with an out-of-order change event.

Retry reminders still embed event time and are intentionally not unscheduled on DELETE; their JobKey is not derivable from the DELETE payload, and orphaned retries fire harmlessly because the lease arbiter sees the row is gone.

Re-enable the previously TODO-disabled DELETE assertion in DagManagementDagActionStoreChangeMonitorTest to cover the activated code path.
--- ...gementDagActionStoreChangeMonitorTest.java | 16 ++-- .../DagActionReminderScheduler.java | 65 ++++++++++++++-- ...ManagementDagActionStoreChangeMonitor.java | 10 +-- .../DagActionReminderSchedulerTest.java | 77 +++++++++++++++++-- 4 files changed, 139 insertions(+), 29 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java index e9e71ce4cf3..0cad4e2e603 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java @@ -43,10 +43,12 @@ import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent; import org.apache.gobblin.service.monitoring.OperationType; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** @@ -100,8 +102,7 @@ public void setupMockMonitor() { @BeforeClass public void setUp() throws Exception { - doNothing().when(dagActionReminderScheduler).unscheduleReminderJob(any(), anyBoolean()); - + doNothing().when(dagActionReminderScheduler).unscheduleReminderJob(any(DagActionStore.DagAction.class)); } /** @@ -114,12 +115,11 @@ public void testProcessMessageWithDelete() throws SchedulerException { DagActionStore.DagAction dagAction = new DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, Long.parseLong(FLOW_EXECUTION_ID), JOB_NAME, DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE); mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); - /* TODO: 
skip deadline removal for now and let them fire - verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(), times(1)) - .unscheduleReminderJob(eq(dagAction), eq(true)); + // DELETE of an ENFORCE_*_DEADLINE row must unschedule the corresponding deadline reminder. Retry reminders are + // intentionally not unscheduled here (their JobKey embeds the lease event time, which the DELETE event payload + // does not carry); orphaned retries fire harmlessly via the lease arbiter. verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(), times(1)) - .unscheduleReminderJob(eq(dagAction), eq(false)); - */ + .unscheduleReminderJob(eq(dagAction)); } /** diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java index c5d997a45b7..544511dee7b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java @@ -97,6 +97,16 @@ public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long remind DagActionStore.DagAction dagAction = leaseParams.getDagAction(); JobDetail jobDetail = createReminderJobDetail(leaseParams, isDeadlineReminder); Trigger trigger = createReminderJobTrigger(leaseParams, reminderDurationMillis, System::currentTimeMillis, isDeadlineReminder); + // Deadline reminders are keyed only by the dagAction's primary key (no event time). The DagActionStore enforces + // at most one row per that key, so any pre-existing deadline reminder with the same key represents a stale entry + // from the duplicate-insert path (delete+reinsert in DagProcUtils.sendEnforce*DeadlineDagAction). 
If the change + // events arrive out of order on the consumer, the new INSERT could otherwise collide with the not-yet-unscheduled + // old reminder and throw ObjectAlreadyExistsException, causing the new deadline to be lost. Replace semantics + // are safe here because the newly-scheduled reminder is authoritative for that dagAction tuple. + if (isDeadlineReminder && quartzScheduler.checkExists(jobDetail.getKey())) { + log.info("Replacing pre-existing deadline reminder for {}", dagAction); + quartzScheduler.deleteJob(jobDetail.getKey()); + } log.info("Setting reminder for {} in {} ms, isDeadlineTrigger: {}", dagAction, reminderDurationMillis, isDeadlineReminder); quartzScheduler.scheduleJob(jobDetail, trigger); } @@ -109,16 +119,44 @@ public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams, boolea } /** - * Creates a key for the reminder job by concatenating all dagAction fields and the eventTime of the dagAction. + * Unschedule a deadline reminder identified solely by its {@link DagActionStore.DagAction}. The deadline reminder + * key intentionally omits the lease event time (see {@link #createDagActionReminderKey}), so callers that only + * have the dagAction (e.g. a DagActionStore DELETE event, whose payload has no event time) can still remove the + * reminder. + */ + public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws SchedulerException { + log.info("Unsetting deadline reminder for {}", dagAction); + if (!quartzScheduler.deleteJob(new JobKey(createDeadlineReminderKey(dagAction), DeadlineReminderKeyGroup))) { + // Expected on the normal lifecycle: the reminder fires at the deadline, the enforce-deadline task cleans up + // via deleteDagAction, and by the time the resulting DELETE event reaches this handler the Quartz job is + // already gone. Also benign on cold-start / partition-rebalance, where the reminder was scheduled on a + // different host. Logged at debug to avoid flooding warn-level operator views. 
+ log.debug("No deadline reminder to unschedule for {} (reminder already fired, or scheduled on a different host).", + dagAction); + } + } + + /** + * Creates a key for the reminder job by concatenating all dagAction fields and, for retry reminders only, the + * lease event time of the dagAction. *

- * This ensures unique keys for multiple instances of the same action on the same flow execution that originate more - * than 'epsilon' apart. {@link MultiActiveLeaseArbiter} uses the eventTime to distinguish these distinct occurrences - * of the same action. This is necessary to prevent insertion failures due to previous reminders. + * For retry reminders the event time is required to ensure unique keys for multiple instances of the same action + * on the same flow execution that originate more than 'epsilon' apart (applicable to KILL and RESUME). + * {@link MultiActiveLeaseArbiter} uses the event time to distinguish these distinct occurrences. Without it, + * subsequent reminders for the same action would fail to insert because the Quartz key already exists. *

- * Applicable only for KILL and RESUME actions; duplication for other actions is an error. + * For deadline reminders ({@code ENFORCE_JOB_START_DEADLINE}, {@code ENFORCE_FLOW_FINISH_DEADLINE}) the event time + * is omitted. The DagActionStore enforces a primary key over + * {@code (flowGroup, flowName, flowExecutionId, jobName, dagActionType)} (see + * {@code DagProcUtils.sendEnforce*DeadlineDagAction}), so at most one deadline reminder exists for that tuple at + * any time. Omitting the event time lets the change monitor unschedule the reminder when the dagAction row is + * deleted, without needing the original event time (which is not present in the DELETE event payload). */ - public static String createDagActionReminderKey(DagActionStore.LeaseParams leaseParams) { + public static String createDagActionReminderKey(DagActionStore.LeaseParams leaseParams, boolean isDeadlineReminder) { DagActionStore.DagAction dagAction = leaseParams.getDagAction(); + if (isDeadlineReminder) { + return createDeadlineReminderKey(dagAction); + } return String.join(".", dagAction.getFlowGroup(), dagAction.getFlowName(), @@ -128,16 +166,27 @@ public static String createDagActionReminderKey(DagActionStore.LeaseParams lease String.valueOf(leaseParams.getEventTimeMillis())); } + private static String createDeadlineReminderKey(DagActionStore.DagAction dagAction) { + return String.join(".", + dagAction.getFlowGroup(), + dagAction.getFlowName(), + String.valueOf(dagAction.getFlowExecutionId()), + dagAction.getJobName(), + String.valueOf(dagAction.getDagActionType())); + } + /** * Creates a JobKey object for the reminder job where the name is the DagActionReminderKey from above and the group is * the flowGroup */ public static JobKey createJobKey(DagActionStore.LeaseParams leaseParams, boolean isDeadlineReminder) { - return new JobKey(createDagActionReminderKey(leaseParams), isDeadlineReminder ? 
DeadlineReminderKeyGroup : RetryReminderKeyGroup); + return new JobKey(createDagActionReminderKey(leaseParams, isDeadlineReminder), + isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup); } private static TriggerKey createTriggerKey(DagActionStore.LeaseParams leaseParams, boolean isDeadlineReminder) { - return new TriggerKey(createDagActionReminderKey(leaseParams), isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup); + return new TriggerKey(createDagActionReminderKey(leaseParams, isDeadlineReminder), + isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup); } /** diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java index 9c03c4787d8..cd733109714 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java @@ -74,14 +74,14 @@ protected void handleDagAction(String operation, DagActionStore.DagAction dagAct break; case "DELETE": log.debug("Deleted dagAction from DagActionStore: {}", dagAction); - /* TODO: skip deadline removal for now and let them fire + // Cancel any pending deadline reminder so it does not fire for an action that has already been resolved. + // Retry reminders are not unscheduled here because their JobKey includes the lease event time, which is + // not carried in the DELETE event payload; orphaned retry reminders fire harmlessly (the lease arbiter + // sees the action is gone and drops the work). 
if (dagActionType == DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE || dagActionType == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) { - this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, true); - // clear any deadline reminders as well as any retry reminders - this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, false); + this.dagActionReminderScheduler.unscheduleReminderJob(dagAction); } - */ break; default: log.warn( diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java index 20e83a895c0..6c1fd072c9c 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java @@ -29,6 +29,7 @@ import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; +import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.Trigger; @@ -61,10 +62,18 @@ public class DagActionReminderSchedulerTest { DagActionStore.DagActionType.LAUNCH.name(), eventTimeMillis); String expectedKey2 = Joiner.on(".").join(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH.name(), eventTimeMillis2); + // Deadline reminder keys intentionally omit the lease event time, since the DagActionStore primary key already + // guarantees uniqueness over (flowGroup, flowName, flowExecutionId, jobName, dagActionType) for ENFORCE_*_DEADLINE. 
+ String expectedDeadlineKey = Joiner.on(".").join(flowGroup, flowName, flowExecutionId, jobName, + DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE.name()); DagActionStore.DagAction launchDagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); + DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = new DagActionStore.DagAction(flowGroup, flowName, + flowExecutionId, jobName, DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE); DagActionStore.LeaseParams launchLeaseParams = new DagActionStore.LeaseParams(launchDagAction, eventTimeMillis); DagActionStore.LeaseParams launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction, eventTimeMillis2); + DagActionStore.LeaseParams enforceFlowFinishDeadlineLeaseParams = + new DagActionStore.LeaseParams(enforceFlowFinishDeadlineDagAction, eventTimeMillis); DagActionReminderScheduler dagActionReminderScheduler; DagManagement dagManagement = mock(DagManagement.class); private static boolean testJobRan = false; @@ -77,8 +86,22 @@ private void setup() throws Exception { @Test public void testCreateDagActionReminderKey() { - Assert.assertEquals(expectedKey, DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams)); - Assert.assertEquals(expectedKey2, DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams2)); + // Retry reminders embed the lease event time so multiple events for the same dagAction can coexist. 
+ Assert.assertEquals(expectedKey, DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams, false)); + Assert.assertEquals(expectedKey2, DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams2, false)); + } + + @Test + public void testCreateDagActionReminderKeyForDeadlineOmitsEventTime() { + // Deadline reminders are uniquely keyed by the dagAction's primary key alone; event time is stripped so the + // DELETE-event handler (which has no event time in its payload) can still find and unschedule the reminder. + Assert.assertEquals(expectedDeadlineKey, + DagActionReminderScheduler.createDagActionReminderKey(enforceFlowFinishDeadlineLeaseParams, true)); + // The same dagAction with a different event time must produce the same key for deadline reminders. + DagActionStore.LeaseParams alternateEventTime = + new DagActionStore.LeaseParams(enforceFlowFinishDeadlineDagAction, eventTimeMillis2); + Assert.assertEquals(expectedDeadlineKey, + DagActionReminderScheduler.createDagActionReminderKey(alternateEventTime, true)); } @Test @@ -170,15 +193,53 @@ public void testReminderJobExecuteFallsBackToUnknownWhenKeyAbsent() { } /* - Add deadline reminders for multiple launches of the same flow and assert no exception is thrown and they can be - deleted as well. + Schedule retry reminders for multiple distinct events of the same dagAction (the realistic multi-KILL/RESUME case + where lease event time differentiates the requests) and assert no exception is thrown and both can be deleted. 
*/ @Test public void testRemindersForMultipleFlowExecutions() throws SchedulerException { - this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 50000, true); - this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams2, 50000, true); - this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams, true); - this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2, true); + this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 50000, false); + this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams2, 50000, false); + this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams, false); + this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2, false); + } + + /* + Verify deadline reminders are replaced (not double-registered) when the duplicate-insert path causes the same + dagAction to schedule a second time before the first reminder has been unscheduled. The pre-existing reminder is + silently removed; no ObjectAlreadyExistsException should propagate. + */ + @Test + public void testScheduleDeadlineReminderReplacesExistingEntry() throws SchedulerException { + JobKey deadlineKey = DagActionReminderScheduler.createJobKey(enforceFlowFinishDeadlineLeaseParams, true); + this.dagActionReminderScheduler.scheduleReminder(enforceFlowFinishDeadlineLeaseParams, 50000, true); + Assert.assertTrue(this.dagActionReminderScheduler.quartzScheduler.checkExists(deadlineKey)); + + // Re-schedule with a different lease event time - this models the duplicate-insert + out-of-order events case. 
+ DagActionStore.LeaseParams secondEvent = + new DagActionStore.LeaseParams(enforceFlowFinishDeadlineDagAction, eventTimeMillis2); + this.dagActionReminderScheduler.scheduleReminder(secondEvent, 50000, true); + Assert.assertTrue(this.dagActionReminderScheduler.quartzScheduler.checkExists(deadlineKey)); + + this.dagActionReminderScheduler.unscheduleReminderJob(enforceFlowFinishDeadlineDagAction); + Assert.assertFalse(this.dagActionReminderScheduler.quartzScheduler.checkExists(deadlineKey)); + } + + /* + Verify the new DagAction-only unschedule overload removes a deadline reminder when called with just the dagAction + (matching the DELETE-event path in DagManagementDagActionStoreChangeMonitor, which has no lease event time). + */ + @Test + public void testUnscheduleDeadlineReminderByDagAction() throws SchedulerException { + JobKey deadlineKey = DagActionReminderScheduler.createJobKey(enforceFlowFinishDeadlineLeaseParams, true); + this.dagActionReminderScheduler.scheduleReminder(enforceFlowFinishDeadlineLeaseParams, 50000, true); + Assert.assertTrue(this.dagActionReminderScheduler.quartzScheduler.checkExists(deadlineKey)); + + this.dagActionReminderScheduler.unscheduleReminderJob(enforceFlowFinishDeadlineDagAction); + Assert.assertFalse(this.dagActionReminderScheduler.quartzScheduler.checkExists(deadlineKey)); + + // Idempotent: a second unschedule for an already-fired/removed reminder must not throw. + this.dagActionReminderScheduler.unscheduleReminderJob(enforceFlowFinishDeadlineDagAction); } // Test multiple schedulers can co-exist and run their jobs of different types