diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/ScheduleType.java b/SingularityBase/src/main/java/com/hubspot/singularity/ScheduleType.java index 1adad9b3ee..4eb950c995 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/ScheduleType.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/ScheduleType.java @@ -2,6 +2,6 @@ public enum ScheduleType { - CRON, QUARTZ; + CRON, QUARTZ, RFC5545; } diff --git a/SingularityService/pom.xml b/SingularityService/pom.xml index 25a7886bed..e0306b3194 100644 --- a/SingularityService/pom.xml +++ b/SingularityService/pom.xml @@ -374,6 +374,21 @@ api-all + + org.dmfs + lib-recur + + + + org.dmfs + rfc5545-datetime + + + + joda-time + joda-time + + diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/SingularityValidator.java b/SingularityService/src/main/java/com/hubspot/singularity/data/SingularityValidator.java index 0753365e94..acf29a04e4 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/SingularityValidator.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/SingularityValidator.java @@ -1,6 +1,7 @@ package com.hubspot.singularity.data; import static com.google.common.base.Preconditions.checkNotNull; +import static com.hubspot.singularity.WebExceptions.badRequest; import static com.hubspot.singularity.WebExceptions.checkBadRequest; import java.net.URI; @@ -14,7 +15,8 @@ import javax.inject.Singleton; import org.apache.commons.lang3.StringUtils; - +import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException; +import org.dmfs.rfc5545.recur.RecurrenceRule; import org.quartz.CronExpression; import com.google.common.base.Joiner; @@ -136,22 +138,26 @@ public SingularityRequest checkSingularityRequest(SingularityRequest request, Op if (request.isScheduled()) { checkBadRequest(request.getQuartzSchedule().isPresent() || request.getSchedule().isPresent(), "Specify at least one of schedule or quartzSchedule"); - final String originalSchedule = request.getQuartzScheduleSafe(); + String originalSchedule = request.getQuartzScheduleSafe(); - if (request.getQuartzSchedule().isPresent() && !request.getSchedule().isPresent()) { - checkBadRequest(request.getScheduleType().or(ScheduleType.QUARTZ) == ScheduleType.QUARTZ, "If using quartzSchedule specify scheduleType QUARTZ or leave it blank"); - } + if (request.getScheduleType().or(ScheduleType.QUARTZ) != ScheduleType.RFC5545) { + if (request.getQuartzSchedule().isPresent() && !request.getSchedule().isPresent()) { + checkBadRequest(request.getScheduleType().or(ScheduleType.QUARTZ) == ScheduleType.QUARTZ, "If using quartzSchedule specify scheduleType QUARTZ or leave it blank"); + } - if (request.getQuartzSchedule().isPresent() || (request.getScheduleType().isPresent() && request.getScheduleType().get() == ScheduleType.QUARTZ)) { - quartzSchedule = originalSchedule; - } else { - checkBadRequest(request.getScheduleType().or(ScheduleType.CRON) == ScheduleType.CRON, "If not using quartzSchedule specify scheduleType CRON or leave it blank"); - checkBadRequest(!request.getQuartzSchedule().isPresent(), "If using schedule type CRON do not specify quartzSchedule"); + if (request.getQuartzSchedule().isPresent() || (request.getScheduleType().isPresent() && request.getScheduleType().get() == ScheduleType.QUARTZ)) { + quartzSchedule = originalSchedule; + } else { + checkBadRequest(request.getScheduleType().or(ScheduleType.CRON) == ScheduleType.CRON, "If not using quartzSchedule specify scheduleType CRON or leave it blank"); + checkBadRequest(!request.getQuartzSchedule().isPresent(), "If using schedule type CRON do not specify quartzSchedule"); - quartzSchedule = getQuartzScheduleFromCronSchedule(originalSchedule); - } + quartzSchedule = getQuartzScheduleFromCronSchedule(originalSchedule); + } - checkBadRequest(isValidCronSchedule(quartzSchedule), "Schedule %s (from: %s) was not valid", quartzSchedule, originalSchedule); + checkBadRequest(isValidCronSchedule(quartzSchedule), "Schedule %s (from: %s) was not valid", quartzSchedule, originalSchedule); + } else { + checkForValidRFC5545Schedule(request.getSchedule().get()); + } } else { checkBadRequest(!request.getQuartzSchedule().isPresent() && !request.getSchedule().isPresent(), "Non-scheduled requests can not specify a schedule"); checkBadRequest(!request.getScheduleType().isPresent(), "ScheduleType can only be set for scheduled requests"); @@ -174,6 +180,14 @@ public SingularityRequest checkSingularityRequest(SingularityRequest request, Op return request.toBuilder().setQuartzSchedule(Optional.fromNullable(quartzSchedule)).build(); } + private void checkForValidRFC5545Schedule(String schedule) { + try { + new RecurrenceRule(schedule); + } catch (InvalidRecurrenceRuleException ex) { + badRequest("Schedule %s was not a valid RFC5545 schedule, error was: %s", schedule, ex); + } + } + public SingularityWebhook checkSingularityWebhook(SingularityWebhook webhook) { checkNotNull(webhook, "Webhook is null"); checkNotNull(webhook.getUri(), "URI is null"); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/helpers/RFC5545Schedule.java b/SingularityService/src/main/java/com/hubspot/singularity/helpers/RFC5545Schedule.java new file mode 100644 index 0000000000..6a0e305658 --- /dev/null +++ b/SingularityService/src/main/java/com/hubspot/singularity/helpers/RFC5545Schedule.java @@ -0,0 +1,68 @@ +package com.hubspot.singularity.helpers; + +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException; +import org.dmfs.rfc5545.recur.RecurrenceRule; +import org.dmfs.rfc5545.DateTime; +import org.dmfs.rfc5545.recur.RecurrenceRule.Part; +import org.dmfs.rfc5545.recur.RecurrenceRuleIterator; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +public class RFC5545Schedule { + public static final int MAX_ITERATIONS = 10000; + private final RecurrenceRule recurrenceRule; + private final org.joda.time.DateTime dtStart; + + public RFC5545Schedule(String schedule) throws InvalidRecurrenceRuleException { + // DTSTART is RFC5545 but NOT in the recur string, but its a nice to have? :) + Pattern pattern = Pattern.compile("DTSTART=([0-9]{8}T[0-9]{6})"); + Matcher matcher = pattern.matcher(schedule); + + if (matcher.find()) { + DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMdd'T'HHmmss"); + if (schedule.contains("REPEAT") || schedule.contains("COUNT")) { + this.dtStart = formatter.parseDateTime(matcher.group(1)); + } else { + org.joda.time.DateTime start = formatter.parseDateTime(matcher.group(1)); + org.joda.time.DateTime now = org.joda.time.DateTime.now().withSecondOfMinute(0); + if (now.getMillis() > start.getMillis()) { + start = now; + } + this.dtStart = start; + } + this.recurrenceRule = new RecurrenceRule(matcher.replaceAll("").replace("RRULE:", "")); + } else { + this.recurrenceRule = new RecurrenceRule(schedule); + this.dtStart = org.joda.time.DateTime.now().withSecondOfMinute(0); + } + } + + public org.joda.time.DateTime getStartDateTime() { + return dtStart; + } + + public Date getNextValidTime() { + final long now = System.currentTimeMillis(); + DateTime startDateTime = new DateTime(dtStart.getYear(), (dtStart.getMonthOfYear() - 1), dtStart.getDayOfMonth(), + dtStart.getHourOfDay(), dtStart.getMinuteOfHour(), dtStart.getSecondOfMinute()); + RecurrenceRuleIterator timeIterator = recurrenceRule.iterator(startDateTime); + + int count = 0; + while (timeIterator.hasNext() && (count < MAX_ITERATIONS || (recurrenceRule.hasPart(Part.COUNT) && count < recurrenceRule.getCount()))) { + count ++; + long nextRunAtTimestamp = timeIterator.nextMillis(); + if (nextRunAtTimestamp >= now) { + return new Date(nextRunAtTimestamp); + } + } + return null; + } + + public RecurrenceRule getRecurrenceRule() { + return recurrenceRule; + } +} diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduledJobPoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduledJobPoller.java index f5b7646d15..a3d5ee60fc 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduledJobPoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduledJobPoller.java @@ -9,6 +9,7 @@ import javax.inject.Singleton; +import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException; import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +19,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.inject.Inject; +import com.hubspot.singularity.ScheduleType; import com.hubspot.singularity.SingularityDeployStatistics; import com.hubspot.singularity.SingularityRequestWithState; import com.hubspot.singularity.SingularityTaskId; @@ -25,6 +27,7 @@ import com.hubspot.singularity.data.DeployManager; import com.hubspot.singularity.data.RequestManager; import com.hubspot.singularity.data.TaskManager; +import com.hubspot.singularity.helpers.RFC5545Schedule; import com.hubspot.singularity.sentry.SingularityExceptionNotifier; import com.hubspot.singularity.smtp.SingularityMailer; @@ -110,23 +113,29 @@ private Optional getExpectedRuntime(SingularityRequestWithState request, S return deployStatistics.get().getAverageRuntimeMillis(); } - final CronExpression cronExpression; + String scheduleExpression = request.getRequest().getScheduleTypeSafe() == ScheduleType.RFC5545 ? request.getRequest().getSchedule().get() : request.getRequest().getQuartzScheduleSafe(); + Date nextRunAtDate; try { - cronExpression = new CronExpression(request.getRequest().getQuartzScheduleSafe()); - } catch (ParseException e) { - LOG.warn("Unable to parse cron for {} ({})", taskId, request.getRequest().getQuartzScheduleSafe(), e); - exceptionNotifier.notify(e, ImmutableMap.of("taskId", taskId.toString())); - return Optional.absent(); - } - - final Date startDate = new Date(taskId.getStartedAt()); - final Date nextRunAtDate = cronExpression.getNextValidTimeAfter(startDate); - - if (nextRunAtDate == null) { - String msg = String.format("No next run date found for %s (%s)", taskId, request.getRequest().getQuartzScheduleSafe()); - LOG.warn(msg); - exceptionNotifier.notify(msg, ImmutableMap.of("taskId", taskId.toString())); + if (request.getRequest().getScheduleTypeSafe() == ScheduleType.RFC5545) { + final RFC5545Schedule rfc5545Schedule = new RFC5545Schedule(scheduleExpression); + nextRunAtDate = rfc5545Schedule.getNextValidTime(); + } else { + final CronExpression cronExpression = new CronExpression(scheduleExpression); + final Date startDate = new Date(taskId.getStartedAt()); + nextRunAtDate = cronExpression.getNextValidTimeAfter(startDate); + } + + if (nextRunAtDate == null) { + String msg = String.format("No next run date found for %s (%s)", taskId, scheduleExpression); + LOG.warn(msg); + exceptionNotifier.notify(msg, ImmutableMap.of("taskId", taskId.toString())); + return Optional.absent(); + } + + } catch (ParseException|InvalidRecurrenceRuleException e) { + LOG.warn("Unable to parse schedule of type {} for expression {} (taskId: {}, err: {})", request.getRequest().getScheduleTypeSafe(), scheduleExpression, taskId, e); + exceptionNotifier.notify(e, ImmutableMap.of("taskId", taskId.toString(), "scheduleExpression", scheduleExpression, "scheduleType", request.getRequest().getScheduleTypeSafe().toString())); return Optional.absent(); } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduler.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduler.java index 9e37f1d351..d193c6a620 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduler.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduler.java @@ -14,6 +14,7 @@ import org.apache.mesos.Protos; import org.apache.mesos.Protos.TaskStatus.Reason; +import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException; import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,7 @@ import com.hubspot.singularity.MachineState; import com.hubspot.singularity.RequestState; import com.hubspot.singularity.RequestType; +import com.hubspot.singularity.ScheduleType; import com.hubspot.singularity.SingularityCreateResult; import com.hubspot.singularity.SingularityDeployMarker; import com.hubspot.singularity.SingularityDeployStatistics; @@ -65,6 +67,7 @@ import com.hubspot.singularity.data.SlaveManager; import com.hubspot.singularity.data.TaskManager; import com.hubspot.singularity.data.TaskRequestManager; +import com.hubspot.singularity.helpers.RFC5545Schedule; import com.hubspot.singularity.smtp.SingularityMailer; @Singleton @@ -672,10 +675,18 @@ private Optional getNextRunAt(SingularityRequest request, RequestState sta LOG.info("Scheduling requested immediate run of {}", request.getId()); } else { try { - final CronExpression cronExpression = new CronExpression(request.getQuartzScheduleSafe()); - - final Date scheduleFrom = new Date(now); - final Date nextRunAtDate = cronExpression.getNextValidTimeAfter(scheduleFrom); + Date nextRunAtDate = null; + Date scheduleFrom = null; + + if (request.getScheduleTypeSafe() == ScheduleType.RFC5545) { + final RFC5545Schedule rfc5545Schedule = new RFC5545Schedule(request.getSchedule().get()); + nextRunAtDate = rfc5545Schedule.getNextValidTime(); + scheduleFrom = new Date(rfc5545Schedule.getStartDateTime().getMillis()); + } else { + scheduleFrom = new Date(now); + final CronExpression cronExpression = new CronExpression(request.getQuartzScheduleSafe()); + nextRunAtDate = cronExpression.getNextValidTimeAfter(scheduleFrom); + } if (nextRunAtDate == null) { return Optional.absent(); @@ -686,7 +697,7 @@ private Optional getNextRunAt(SingularityRequest request, RequestState sta nextRunAt = Math.max(nextRunAtDate.getTime(), now); // don't create a schedule that is overdue as this is used to indicate that singularity is not fulfilling requests. LOG.trace("Scheduling next run of {} (schedule: {}) at {} (from: {})", request.getId(), request.getSchedule(), nextRunAtDate, scheduleFrom); - } catch (ParseException pe) { + } catch (ParseException | InvalidRecurrenceRuleException pe) { throw Throwables.propagate(pe); } } diff --git a/SingularityService/src/test/java/com/hubspot/singularity/helpers/RFC5545ScheduleTest.java b/SingularityService/src/test/java/com/hubspot/singularity/helpers/RFC5545ScheduleTest.java new file mode 100644 index 0000000000..f7e59efefa --- /dev/null +++ b/SingularityService/src/test/java/com/hubspot/singularity/helpers/RFC5545ScheduleTest.java @@ -0,0 +1,31 @@ +package com.hubspot.singularity.helpers; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.dmfs.rfc5545.DateTime; +import org.junit.Assert; +import org.junit.Test; + +public class RFC5545ScheduleTest { + @Test + public void testEveryWeekdayRFC5545Schdule() throws Exception { + String schedule = "FREQ=DAILY;BYDAY=MO,TU,WE,TH,FR"; + Date nextValidTime = new RFC5545Schedule(schedule).getNextValidTime(); + Assert.assertTrue(nextValidTime.after(new Date())); + Assert.assertTrue(nextValidTime.before(new Date(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3)))); + } + + @Test + public void testMaxIterations() throws Exception { + String schedule = "DTSTART=19970902T090000\nRRULE:FREQ=HOURLY;COUNT=10001"; + Assert.assertEquals(new RFC5545Schedule(schedule).getStartDateTime(), new org.joda.time.DateTime(1997, 9, 2, 9, 0, 0)); + Assert.assertEquals(new RFC5545Schedule(schedule).getNextValidTime(), null); + } + + @Test + public void testRecurInPastDoesNotGiveNext() throws Exception { + String schedule = "FREQ=YEARLY;INTERVAL=4;BYMONTH=11;BYDAY=TU;BYMONTHDAY=2,3,4,5,6,7,8;COUNT=1"; + Assert.assertEquals(new RFC5545Schedule(schedule).getNextValidTime(), null); + } +} diff --git a/pom.xml b/pom.xml index bb5e43e836..c55ac50a10 100644 --- a/pom.xml +++ b/pom.xml @@ -226,6 +226,18 @@ metrics-graphite ${dep.metrics.version} + + + org.dmfs + lib-recur + 0.9.4 + + + + org.dmfs + rfc5545-datetime + 0.2.2 +