Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

public enum ScheduleType {

CRON, QUARTZ;
CRON, QUARTZ, RFC5545;

}
15 changes: 15 additions & 0 deletions SingularityService/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,21 @@
<artifactId>api-all</artifactId>
</dependency>

<dependency>
<groupId>org.dmfs</groupId>
<artifactId>lib-recur</artifactId>
</dependency>

<dependency>
<groupId>org.dmfs</groupId>
<artifactId>rfc5545-datetime</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.hubspot.singularity.data;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.hubspot.singularity.WebExceptions.badRequest;
import static com.hubspot.singularity.WebExceptions.checkBadRequest;

import java.net.URI;
Expand All @@ -14,7 +15,8 @@
import javax.inject.Singleton;

import org.apache.commons.lang3.StringUtils;

import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException;
import org.dmfs.rfc5545.recur.RecurrenceRule;
import org.quartz.CronExpression;

import com.google.common.base.Joiner;
Expand Down Expand Up @@ -136,22 +138,26 @@ public SingularityRequest checkSingularityRequest(SingularityRequest request, Op
if (request.isScheduled()) {
checkBadRequest(request.getQuartzSchedule().isPresent() || request.getSchedule().isPresent(), "Specify at least one of schedule or quartzSchedule");

final String originalSchedule = request.getQuartzScheduleSafe();
String originalSchedule = request.getQuartzScheduleSafe();

if (request.getQuartzSchedule().isPresent() && !request.getSchedule().isPresent()) {
checkBadRequest(request.getScheduleType().or(ScheduleType.QUARTZ) == ScheduleType.QUARTZ, "If using quartzSchedule specify scheduleType QUARTZ or leave it blank");
}
if (request.getScheduleType().or(ScheduleType.QUARTZ) != ScheduleType.RFC5545) {
if (request.getQuartzSchedule().isPresent() && !request.getSchedule().isPresent()) {
checkBadRequest(request.getScheduleType().or(ScheduleType.QUARTZ) == ScheduleType.QUARTZ, "If using quartzSchedule specify scheduleType QUARTZ or leave it blank");
}

if (request.getQuartzSchedule().isPresent() || (request.getScheduleType().isPresent() && request.getScheduleType().get() == ScheduleType.QUARTZ)) {
quartzSchedule = originalSchedule;
} else {
checkBadRequest(request.getScheduleType().or(ScheduleType.CRON) == ScheduleType.CRON, "If not using quartzSchedule specify scheduleType CRON or leave it blank");
checkBadRequest(!request.getQuartzSchedule().isPresent(), "If using schedule type CRON do not specify quartzSchedule");
if (request.getQuartzSchedule().isPresent() || (request.getScheduleType().isPresent() && request.getScheduleType().get() == ScheduleType.QUARTZ)) {
quartzSchedule = originalSchedule;
} else {
checkBadRequest(request.getScheduleType().or(ScheduleType.CRON) == ScheduleType.CRON, "If not using quartzSchedule specify scheduleType CRON or leave it blank");
checkBadRequest(!request.getQuartzSchedule().isPresent(), "If using schedule type CRON do not specify quartzSchedule");

quartzSchedule = getQuartzScheduleFromCronSchedule(originalSchedule);
}
quartzSchedule = getQuartzScheduleFromCronSchedule(originalSchedule);
}

checkBadRequest(isValidCronSchedule(quartzSchedule), "Schedule %s (from: %s) was not valid", quartzSchedule, originalSchedule);
checkBadRequest(isValidCronSchedule(quartzSchedule), "Schedule %s (from: %s) was not valid", quartzSchedule, originalSchedule);
} else {
checkForValidRFC5545Schedule(request.getSchedule().get());
}
} else {
checkBadRequest(!request.getQuartzSchedule().isPresent() && !request.getSchedule().isPresent(), "Non-scheduled requests can not specify a schedule");
checkBadRequest(!request.getScheduleType().isPresent(), "ScheduleType can only be set for scheduled requests");
Expand All @@ -174,6 +180,14 @@ public SingularityRequest checkSingularityRequest(SingularityRequest request, Op
return request.toBuilder().setQuartzSchedule(Optional.fromNullable(quartzSchedule)).build();
}

private void checkForValidRFC5545Schedule(String schedule) {
try {
new RecurrenceRule(schedule);
} catch (InvalidRecurrenceRuleException ex) {
badRequest("Schedule %s was not a valid RFC5545 schedule, error was: %s", schedule, ex);
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could simplify this by just calling badRequest() in the catch block

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


public SingularityWebhook checkSingularityWebhook(SingularityWebhook webhook) {
checkNotNull(webhook, "Webhook is null");
checkNotNull(webhook.getUri(), "URI is null");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.hubspot.singularity.helpers;

import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException;
import org.dmfs.rfc5545.recur.RecurrenceRule;
import org.dmfs.rfc5545.DateTime;
import org.dmfs.rfc5545.recur.RecurrenceRule.Part;
import org.dmfs.rfc5545.recur.RecurrenceRuleIterator;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class RFC5545Schedule {
public static final int MAX_ITERATIONS = 10000;
private final RecurrenceRule recurrenceRule;
private final org.joda.time.DateTime dtStart;

public RFC5545Schedule(String schedule) throws InvalidRecurrenceRuleException {
// DTSTART is RFC5545 but NOT in the recur string, but its a nice to have? :)
Pattern pattern = Pattern.compile("DTSTART=([0-9]{8}T[0-9]{6})");
Matcher matcher = pattern.matcher(schedule);

if (matcher.find()) {
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMdd'T'HHmmss");
if (schedule.contains("REPEAT") || schedule.contains("COUNT")) {
this.dtStart = formatter.parseDateTime(matcher.group(1));
} else {
org.joda.time.DateTime start = formatter.parseDateTime(matcher.group(1));
org.joda.time.DateTime now = org.joda.time.DateTime.now().withSecondOfMinute(0);
if (now.getMillis() > start.getMillis()) {
start = now;
}
this.dtStart = start;
}
this.recurrenceRule = new RecurrenceRule(matcher.replaceAll("").replace("RRULE:", ""));
} else {
this.recurrenceRule = new RecurrenceRule(schedule);
this.dtStart = org.joda.time.DateTime.now().withSecondOfMinute(0);
}
}

public org.joda.time.DateTime getStartDateTime() {
return dtStart;
}

public Date getNextValidTime() {
final long now = System.currentTimeMillis();
DateTime startDateTime = new DateTime(dtStart.getYear(), (dtStart.getMonthOfYear() - 1), dtStart.getDayOfMonth(),
dtStart.getHourOfDay(), dtStart.getMinuteOfHour(), dtStart.getSecondOfMinute());
RecurrenceRuleIterator timeIterator = recurrenceRule.iterator(startDateTime);

int count = 0;
while (timeIterator.hasNext() && (count < MAX_ITERATIONS || (recurrenceRule.hasPart(Part.COUNT) && count < recurrenceRule.getCount()))) {
count ++;
long nextRunAtTimestamp = timeIterator.nextMillis();
if (nextRunAtTimestamp >= now) {
return new Date(nextRunAtTimestamp);
}
}
return null;
}

public RecurrenceRule getRecurrenceRule() {
return recurrenceRule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import javax.inject.Singleton;

import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,13 +19,15 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.hubspot.singularity.ScheduleType;
import com.hubspot.singularity.SingularityDeployStatistics;
import com.hubspot.singularity.SingularityRequestWithState;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
import com.hubspot.singularity.data.RequestManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.helpers.RFC5545Schedule;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;
import com.hubspot.singularity.smtp.SingularityMailer;

Expand Down Expand Up @@ -110,23 +113,29 @@ private Optional<Long> getExpectedRuntime(SingularityRequestWithState request, S
return deployStatistics.get().getAverageRuntimeMillis();
}

final CronExpression cronExpression;
String scheduleExpression = request.getRequest().getScheduleTypeSafe() == ScheduleType.RFC5545 ? request.getRequest().getSchedule().get() : request.getRequest().getQuartzScheduleSafe();
Date nextRunAtDate;

try {
cronExpression = new CronExpression(request.getRequest().getQuartzScheduleSafe());
} catch (ParseException e) {
LOG.warn("Unable to parse cron for {} ({})", taskId, request.getRequest().getQuartzScheduleSafe(), e);
exceptionNotifier.notify(e, ImmutableMap.of("taskId", taskId.toString()));
return Optional.absent();
}

final Date startDate = new Date(taskId.getStartedAt());
final Date nextRunAtDate = cronExpression.getNextValidTimeAfter(startDate);

if (nextRunAtDate == null) {
String msg = String.format("No next run date found for %s (%s)", taskId, request.getRequest().getQuartzScheduleSafe());
LOG.warn(msg);
exceptionNotifier.notify(msg, ImmutableMap.of("taskId", taskId.toString()));
if (request.getRequest().getScheduleTypeSafe() == ScheduleType.RFC5545) {
final RFC5545Schedule rfc5545Schedule = new RFC5545Schedule(scheduleExpression);
nextRunAtDate = rfc5545Schedule.getNextValidTime();
} else {
final CronExpression cronExpression = new CronExpression(scheduleExpression);
final Date startDate = new Date(taskId.getStartedAt());
nextRunAtDate = cronExpression.getNextValidTimeAfter(startDate);
}

if (nextRunAtDate == null) {
String msg = String.format("No next run date found for %s (%s)", taskId, scheduleExpression);
LOG.warn(msg);
exceptionNotifier.notify(msg, ImmutableMap.of("taskId", taskId.toString()));
return Optional.absent();
}

} catch (ParseException|InvalidRecurrenceRuleException e) {
LOG.warn("Unable to parse schedule of type {} for expression {} (taskId: {}, err: {})", request.getRequest().getScheduleTypeSafe(), scheduleExpression, taskId, e);
exceptionNotifier.notify(e, ImmutableMap.of("taskId", taskId.toString(), "scheduleExpression", scheduleExpression, "scheduleType", request.getRequest().getScheduleTypeSafe().toString()));
return Optional.absent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskStatus.Reason;
import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,6 +36,7 @@
import com.hubspot.singularity.MachineState;
import com.hubspot.singularity.RequestState;
import com.hubspot.singularity.RequestType;
import com.hubspot.singularity.ScheduleType;
import com.hubspot.singularity.SingularityCreateResult;
import com.hubspot.singularity.SingularityDeployMarker;
import com.hubspot.singularity.SingularityDeployStatistics;
Expand Down Expand Up @@ -65,6 +67,7 @@
import com.hubspot.singularity.data.SlaveManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.TaskRequestManager;
import com.hubspot.singularity.helpers.RFC5545Schedule;
import com.hubspot.singularity.smtp.SingularityMailer;

@Singleton
Expand Down Expand Up @@ -672,10 +675,18 @@ private Optional<Long> getNextRunAt(SingularityRequest request, RequestState sta
LOG.info("Scheduling requested immediate run of {}", request.getId());
} else {
try {
final CronExpression cronExpression = new CronExpression(request.getQuartzScheduleSafe());

final Date scheduleFrom = new Date(now);
final Date nextRunAtDate = cronExpression.getNextValidTimeAfter(scheduleFrom);
Date nextRunAtDate = null;
Date scheduleFrom = null;

if (request.getScheduleTypeSafe() == ScheduleType.RFC5545) {
final RFC5545Schedule rfc5545Schedule = new RFC5545Schedule(request.getSchedule().get());
nextRunAtDate = rfc5545Schedule.getNextValidTime();
scheduleFrom = new Date(rfc5545Schedule.getStartDateTime().getMillis());
} else {
scheduleFrom = new Date(now);
final CronExpression cronExpression = new CronExpression(request.getQuartzScheduleSafe());
nextRunAtDate = cronExpression.getNextValidTimeAfter(scheduleFrom);
}

if (nextRunAtDate == null) {
return Optional.absent();
Expand All @@ -686,7 +697,7 @@ private Optional<Long> getNextRunAt(SingularityRequest request, RequestState sta
nextRunAt = Math.max(nextRunAtDate.getTime(), now); // don't create a schedule that is overdue as this is used to indicate that singularity is not fulfilling requests.

LOG.trace("Scheduling next run of {} (schedule: {}) at {} (from: {})", request.getId(), request.getSchedule(), nextRunAtDate, scheduleFrom);
} catch (ParseException pe) {
} catch (ParseException | InvalidRecurrenceRuleException pe) {
throw Throwables.propagate(pe);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.hubspot.singularity.helpers;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.dmfs.rfc5545.DateTime;
import org.junit.Assert;
import org.junit.Test;

public class RFC5545ScheduleTest {
@Test
public void testEveryWeekdayRFC5545Schdule() throws Exception {
String schedule = "FREQ=DAILY;BYDAY=MO,TU,WE,TH,FR";
Date nextValidTime = new RFC5545Schedule(schedule).getNextValidTime();
Assert.assertTrue(nextValidTime.after(new Date()));
Assert.assertTrue(nextValidTime.before(new Date(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3))));
}

@Test
public void testMaxIterations() throws Exception {
String schedule = "DTSTART=19970902T090000\nRRULE:FREQ=HOURLY;COUNT=10001";
Assert.assertEquals(new RFC5545Schedule(schedule).getStartDateTime(), new org.joda.time.DateTime(1997, 9, 2, 9, 0, 0));
Assert.assertEquals(new RFC5545Schedule(schedule).getNextValidTime(), null);
}

@Test
public void testRecurInPastDoesNotGiveNext() throws Exception {
String schedule = "FREQ=YEARLY;INTERVAL=4;BYMONTH=11;BYDAY=TU;BYMONTHDAY=2,3,4,5,6,7,8;COUNT=1";
Assert.assertEquals(new RFC5545Schedule(schedule).getNextValidTime(), null);
}
}
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,18 @@
<artifactId>metrics-graphite</artifactId>
<version>${dep.metrics.version}</version><!-- TOOD: add this to HubSpot's basepom -->
</dependency>

<dependency>
<groupId>org.dmfs</groupId>
<artifactId>lib-recur</artifactId>
<version>0.9.4</version>
</dependency>

<dependency>
<groupId>org.dmfs</groupId>
<artifactId>rfc5545-datetime</artifactId>
<version>0.2.2</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down