Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@
import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
import org.apache.gobblin.service.monitoring.OperationType;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;


/**
Expand Down Expand Up @@ -100,8 +102,7 @@ public void setupMockMonitor() {

@BeforeClass
public void setUp() throws Exception {
doNothing().when(dagActionReminderScheduler).unscheduleReminderJob(any(), anyBoolean());

doNothing().when(dagActionReminderScheduler).unscheduleReminderJob(any(DagActionStore.DagAction.class));
}

/**
Expand All @@ -114,12 +115,11 @@ public void testProcessMessageWithDelete() throws SchedulerException {
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, Long.parseLong(FLOW_EXECUTION_ID), JOB_NAME,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
/* TODO: skip deadline removal for now and let them fire
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(), times(1))
.unscheduleReminderJob(eq(dagAction), eq(true));
// DELETE of an ENFORCE_*_DEADLINE row must unschedule the corresponding deadline reminder. Retry reminders are
// intentionally not unscheduled here (their JobKey embeds the lease event time, which the DELETE event payload
// does not carry); orphaned retries fire harmlessly via the lease arbiter.
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(), times(1))
.unscheduleReminderJob(eq(dagAction), eq(false));
*/
.unscheduleReminderJob(eq(dagAction));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long remind
DagActionStore.DagAction dagAction = leaseParams.getDagAction();
JobDetail jobDetail = createReminderJobDetail(leaseParams, isDeadlineReminder);
Trigger trigger = createReminderJobTrigger(leaseParams, reminderDurationMillis, System::currentTimeMillis, isDeadlineReminder);
// Deadline reminders are keyed only by the dagAction's primary key (no event time). The DagActionStore enforces
// at most one row per that key, so any pre-existing deadline reminder with the same key represents a stale entry
// from the duplicate-insert path (delete+reinsert in DagProcUtils.sendEnforce*DeadlineDagAction). If the change
// events arrive out of order on the consumer, the new INSERT could otherwise collide with the not-yet-unscheduled
// old reminder and throw ObjectAlreadyExistsException, causing the new deadline to be lost. Replace semantics
// are safe here because the newly-scheduled reminder is authoritative for that dagAction tuple.
if (isDeadlineReminder && quartzScheduler.checkExists(jobDetail.getKey())) {
log.info("Replacing pre-existing deadline reminder for {}", dagAction);
quartzScheduler.deleteJob(jobDetail.getKey());
}
log.info("Setting reminder for {} in {} ms, isDeadlineTrigger: {}", dagAction, reminderDurationMillis, isDeadlineReminder);
quartzScheduler.scheduleJob(jobDetail, trigger);
}
Expand All @@ -109,16 +119,44 @@ public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams, boolea
}

/**
* Creates a key for the reminder job by concatenating all dagAction fields and the eventTime of the dagAction.
* Unschedule a deadline reminder identified solely by its {@link DagActionStore.DagAction}. The deadline reminder
* key intentionally omits the lease event time (see {@link #createDagActionReminderKey}), so callers that only
* have the dagAction (e.g. a DagActionStore DELETE event, whose payload has no event time) can still remove the
* reminder.
*/
public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws SchedulerException {
  log.info("Unsetting deadline reminder for {}", dagAction);
  // The deadline key is built from the dagAction fields alone, so no lease event time is needed to rebuild it.
  JobKey deadlineJobKey = new JobKey(createDeadlineReminderKey(dagAction), DeadlineReminderKeyGroup);
  boolean reminderRemoved = quartzScheduler.deleteJob(deadlineJobKey);
  if (reminderRemoved) {
    return;
  }
  // Nothing was deleted. This is the expected outcome in the normal lifecycle: the reminder has already fired, the
  // enforce-deadline task removed the dagAction, and the resulting DELETE event arrives after the Quartz job is gone.
  // It is equally benign after a cold start or partition rebalance, when the reminder was scheduled on another host.
  // Kept at debug level so warn-level operator views are not flooded.
  log.debug("No deadline reminder to unschedule for {} (reminder already fired, or scheduled on a different host).",
      dagAction);
}

/**
* Creates a key for the reminder job by concatenating all dagAction fields and, for retry reminders only, the
* lease event time of the dagAction.
* <p>
* This ensures unique keys for multiple instances of the same action on the same flow execution that originate more
* than 'epsilon' apart. {@link MultiActiveLeaseArbiter} uses the eventTime to distinguish these distinct occurrences
* of the same action. This is necessary to prevent insertion failures due to previous reminders.
* For retry reminders the event time is required to ensure unique keys for multiple instances of the same action
* on the same flow execution that originate more than 'epsilon' apart (applicable to KILL and RESUME).
* {@link MultiActiveLeaseArbiter} uses the event time to distinguish these distinct occurrences. Without it,
* subsequent reminders for the same action would fail to insert because the Quartz key already exists.
* <p>
* Applicable only for KILL and RESUME actions; duplication for other actions is an error.
* For deadline reminders ({@code ENFORCE_JOB_START_DEADLINE}, {@code ENFORCE_FLOW_FINISH_DEADLINE}) the event time
* is omitted. The DagActionStore enforces a primary key over
* {@code (flowGroup, flowName, flowExecutionId, jobName, dagActionType)} (see
* {@code DagProcUtils.sendEnforce*DeadlineDagAction}), so at most one deadline reminder exists for that tuple at
* any time. Omitting the event time lets the change monitor unschedule the reminder when the dagAction row is
* deleted, without needing the original event time (which is not present in the DELETE event payload).
*/
public static String createDagActionReminderKey(DagActionStore.LeaseParams leaseParams) {
public static String createDagActionReminderKey(DagActionStore.LeaseParams leaseParams, boolean isDeadlineReminder) {
DagActionStore.DagAction dagAction = leaseParams.getDagAction();
if (isDeadlineReminder) {
return createDeadlineReminderKey(dagAction);
}
return String.join(".",
dagAction.getFlowGroup(),
dagAction.getFlowName(),
Expand All @@ -128,16 +166,27 @@ public static String createDagActionReminderKey(DagActionStore.LeaseParams lease
String.valueOf(leaseParams.getEventTimeMillis()));
}

/**
 * Builds the name portion of a deadline reminder key by joining, with ".", the dagAction's flow group, flow name,
 * flow execution id, job name and dag action type. No lease event time is included.
 *
 * @param dagAction the action whose identifying fields make up the key
 * @return the dot-separated key string
 */
private static String createDeadlineReminderKey(DagActionStore.DagAction dagAction) {
  String flowGroupPart = dagAction.getFlowGroup();
  String flowNamePart = dagAction.getFlowName();
  String flowExecutionIdPart = String.valueOf(dagAction.getFlowExecutionId());
  String jobNamePart = dagAction.getJobName();
  String dagActionTypePart = String.valueOf(dagAction.getDagActionType());
  return String.join(".", flowGroupPart, flowNamePart, flowExecutionIdPart, jobNamePart, dagActionTypePart);
}

/**
* Creates a JobKey object for the reminder job where the name is the DagActionReminderKey from above and the group is
* the deadline or retry reminder key group, selected by {@code isDeadlineReminder}
*/
public static JobKey createJobKey(DagActionStore.LeaseParams leaseParams, boolean isDeadlineReminder) {
return new JobKey(createDagActionReminderKey(leaseParams), isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
return new JobKey(createDagActionReminderKey(leaseParams, isDeadlineReminder),
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
}

private static TriggerKey createTriggerKey(DagActionStore.LeaseParams leaseParams, boolean isDeadlineReminder) {
return new TriggerKey(createDagActionReminderKey(leaseParams), isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
return new TriggerKey(createDagActionReminderKey(leaseParams, isDeadlineReminder),
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ protected void handleDagAction(String operation, DagActionStore.DagAction dagAct
break;
case "DELETE":
log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
/* TODO: skip deadline removal for now and let them fire
// Cancel any pending deadline reminder so it does not fire for an action that has already been resolved.
// Retry reminders are not unscheduled here because their JobKey includes the lease event time, which is
// not carried in the DELETE event payload; orphaned retry reminders fire harmlessly (the lease arbiter
// sees the action is gone and drops the work).
if (dagActionType == DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE
|| dagActionType == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, true);
// clear any deadline reminders as well as any retry reminders
this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, false);
this.dagActionReminderScheduler.unscheduleReminderJob(dagAction);
}
*/
break;
default:
log.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
Expand Down Expand Up @@ -61,10 +62,18 @@ public class DagActionReminderSchedulerTest {
DagActionStore.DagActionType.LAUNCH.name(), eventTimeMillis);
String expectedKey2 = Joiner.on(".").join(flowGroup, flowName, flowExecutionId, jobName,
DagActionStore.DagActionType.LAUNCH.name(), eventTimeMillis2);
// Deadline reminder keys intentionally omit the lease event time, since the DagActionStore primary key already
// guarantees uniqueness over (flowGroup, flowName, flowExecutionId, jobName, dagActionType) for ENFORCE_*_DEADLINE.
String expectedDeadlineKey = Joiner.on(".").join(flowGroup, flowName, flowExecutionId, jobName,
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE.name());
DagActionStore.DagAction launchDagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
DagActionStore.DagActionType.LAUNCH);
DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
DagActionStore.LeaseParams launchLeaseParams = new DagActionStore.LeaseParams(launchDagAction, eventTimeMillis);
DagActionStore.LeaseParams launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction, eventTimeMillis2);
DagActionStore.LeaseParams enforceFlowFinishDeadlineLeaseParams =
new DagActionStore.LeaseParams(enforceFlowFinishDeadlineDagAction, eventTimeMillis);
DagActionReminderScheduler dagActionReminderScheduler;
DagManagement dagManagement = mock(DagManagement.class);
private static boolean testJobRan = false;
Expand All @@ -77,8 +86,22 @@ private void setup() throws Exception {

@Test
public void testCreateDagActionReminderKey() {
Assert.assertEquals(expectedKey, DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams));
Assert.assertEquals(expectedKey2, DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams2));
// Retry reminders embed the lease event time so multiple events for the same dagAction can coexist.
Assert.assertEquals(expectedKey, DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams, false));
Assert.assertEquals(expectedKey2, DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams2, false));
}

@Test
public void testCreateDagActionReminderKeyForDeadlineOmitsEventTime() {
  // A deadline reminder key is derived from the dagAction fields only. The event time is left out so the
  // DELETE-event handler, whose payload carries no event time, can rebuild the key and unschedule the reminder.
  String keyForFirstEvent =
      DagActionReminderScheduler.createDagActionReminderKey(enforceFlowFinishDeadlineLeaseParams, true);
  Assert.assertEquals(expectedDeadlineKey, keyForFirstEvent);

  // Changing only the lease event time must not change the deadline key.
  DagActionStore.LeaseParams laterEventLeaseParams =
      new DagActionStore.LeaseParams(enforceFlowFinishDeadlineDagAction, eventTimeMillis2);
  String keyForLaterEvent = DagActionReminderScheduler.createDagActionReminderKey(laterEventLeaseParams, true);
  Assert.assertEquals(expectedDeadlineKey, keyForLaterEvent);
}

@Test
Expand Down Expand Up @@ -170,15 +193,53 @@ public void testReminderJobExecuteFallsBackToUnknownWhenKeyAbsent() {
}

/*
Add deadline reminders for multiple launches of the same flow and assert no exception is thrown and they can be
deleted as well.
Schedule retry reminders for multiple distinct events of the same dagAction (the realistic multi-KILL/RESUME case
where lease event time differentiates the requests) and assert no exception is thrown and both can be deleted.
*/
@Test
public void testRemindersForMultipleFlowExecutions() throws SchedulerException {
this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 50000, true);
this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams2, 50000, true);
this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams, true);
this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2, true);
this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 50000, false);
this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams2, 50000, false);
this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams, false);
this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2, false);
}

/*
Verify deadline reminders are replaced (not double-registered) when the duplicate-insert path causes the same
dagAction to schedule a second time before the first reminder has been unscheduled. The pre-existing reminder is
silently removed; no ObjectAlreadyExistsException should propagate.
*/
@Test
public void testScheduleDeadlineReminderReplacesExistingEntry() throws SchedulerException {
  JobKey expectedJobKey = DagActionReminderScheduler.createJobKey(enforceFlowFinishDeadlineLeaseParams, true);

  // First scheduling registers the deadline reminder under the event-time-free key.
  dagActionReminderScheduler.scheduleReminder(enforceFlowFinishDeadlineLeaseParams, 50000, true);
  Assert.assertTrue(dagActionReminderScheduler.quartzScheduler.checkExists(expectedJobKey));

  // Scheduling the same dagAction again with a different lease event time models the duplicate-insert path with
  // out-of-order change events. It must not throw, and a reminder must still exist under the same key.
  DagActionStore.LeaseParams duplicateInsertLeaseParams =
      new DagActionStore.LeaseParams(enforceFlowFinishDeadlineDagAction, eventTimeMillis2);
  dagActionReminderScheduler.scheduleReminder(duplicateInsertLeaseParams, 50000, true);
  Assert.assertTrue(dagActionReminderScheduler.quartzScheduler.checkExists(expectedJobKey));

  // Unscheduling by dagAction alone leaves no reminder under the key.
  dagActionReminderScheduler.unscheduleReminderJob(enforceFlowFinishDeadlineDagAction);
  Assert.assertFalse(dagActionReminderScheduler.quartzScheduler.checkExists(expectedJobKey));
}

/*
Verify the new DagAction-only unschedule overload removes a deadline reminder when called with just the dagAction
(matching the DELETE-event path in DagManagementDagActionStoreChangeMonitor, which has no lease event time).
*/
@Test
public void testUnscheduleDeadlineReminderByDagAction() throws SchedulerException {
  JobKey expectedJobKey = DagActionReminderScheduler.createJobKey(enforceFlowFinishDeadlineLeaseParams, true);

  // Schedule a deadline reminder and confirm it is registered.
  dagActionReminderScheduler.scheduleReminder(enforceFlowFinishDeadlineLeaseParams, 50000, true);
  Assert.assertTrue(dagActionReminderScheduler.quartzScheduler.checkExists(expectedJobKey));

  // The dagAction-only overload must find and remove it without any lease event time.
  dagActionReminderScheduler.unscheduleReminderJob(enforceFlowFinishDeadlineDagAction);
  Assert.assertFalse(dagActionReminderScheduler.quartzScheduler.checkExists(expectedJobKey));

  // A repeat call for a reminder that is already gone must complete without throwing.
  dagActionReminderScheduler.unscheduleReminderJob(enforceFlowFinishDeadlineDagAction);
}

// Test multiple schedulers can co-exist and run their jobs of different types
Expand Down
Loading