Skip to content

Commit

Permalink
[GOBBLIN-1920] Use db laundered timestamp for reminder event (#3788)
Browse files Browse the repository at this point in the history
* Use db laundered timestamp for reminder event

* Add more details to comment and remove extra param

* remove unused param

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
  • Loading branch information
umustafi and Urmi Mustafi committed Sep 25, 2023
1 parent a392ebd commit 028b85f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ protected Trigger createAndScheduleReminder(JobKey origJobKey, MultiActiveLeaseA
// refer to the same set of jobProperties)
String reminderSuffix = createSuffixForJobTrigger(status);
JobKey reminderJobKey = new JobKey(origJobKey.getName() + reminderSuffix, origJobKey.getGroup());
JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, reminderJobKey, status,
triggerEventTimeMillis);
JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, reminderJobKey, status);
Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey, getJobPropertiesFromJobDetail(jobDetail),
Optional.of(reminderSuffix));
log.debug("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting to schedule reminder for event {} with "
Expand Down Expand Up @@ -236,17 +235,16 @@ public static String createSuffixForJobTrigger(MultiActiveLeaseArbiter.LeasedToA
* @param originalKey
* @param reminderKey
* @param status
* @param triggerEventTimeMillis
* @return
* @throws SchedulerException
*/
protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, JobKey reminderKey,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status, long triggerEventTimeMillis)
MultiActiveLeaseArbiter.LeasedToAnotherStatus status)
throws SchedulerException {
JobDetailImpl jobDetail = (JobDetailImpl) this.schedulerService.getScheduler().getJobDetail(originalKey);
jobDetail.setKey(reminderKey);
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap = updatePropsInJobDataMap(jobDataMap, status, triggerEventTimeMillis, schedulerMaxBackoffMillis);
jobDataMap = updatePropsInJobDataMap(jobDataMap, status, schedulerMaxBackoffMillis);
jobDetail.setJobDataMap(jobDataMap);
return jobDetail;
}
Expand All @@ -260,14 +258,12 @@ public static Properties getJobPropertiesFromJobDetail(JobDetail jobDetail) {
* provided returns the updated JobDataMap to the user
* @param jobDataMap
* @param leasedToAnotherStatus
* @param triggerEventTimeMillis
* @param schedulerMaxBackoffMillis
* @return
*/
@VisibleForTesting
public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus, long triggerEventTimeMillis,
int schedulerMaxBackoffMillis) {
MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus, int schedulerMaxBackoffMillis) {
Properties prevJobProps = (Properties) jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
// Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
long delayPeriodMillis = leasedToAnotherStatus.getMinimumLingerDurationMillis()
Expand All @@ -277,8 +273,11 @@ public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
// Saves the following properties in jobProps to retrieve when the trigger fires
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
// Use the db laundered timestamp for the reminder to ensure consensus between hosts. Participant trigger timestamps
// can differ between participants and be interpreted as a reminder for a distinct flow trigger which will cause
// excess flows to be triggered by the reminder functionality.
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
String.valueOf(triggerEventTimeMillis));
String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
// Update job data map and reset it in jobDetail
jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
return jobDataMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public void testUpdatePropsInJobDataMap() {
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, originalProperties);

JobDataMap newJobDataMap = FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
eventToTrigger, schedulerBackOffMillis);
schedulerBackOffMillis);
Properties newProperties = (Properties) newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
Assert.assertNotEquals("0",
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
Assert.assertEquals(String.valueOf(eventToTrigger),
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
}

Expand All @@ -85,11 +85,11 @@ public void testSetPropsInJobDataMap() {
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, originalProperties);

JobDataMap newJobDataMap = FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
eventToTrigger, schedulerBackOffMillis);
schedulerBackOffMillis);
Properties newProperties = (Properties) newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
Assert.assertTrue(newProperties.containsKey(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
Assert.assertEquals(String.valueOf(eventToTrigger),
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
}

Expand Down

0 comments on commit 028b85f

Please sign in to comment.