Skip to content

Commit

Permalink
[HWKMETRICS-130] make time unit task interval and window configurable
Browse files Browse the repository at this point in the history
This is primarily to support testing. I do not think we are going to support
sub-minute intervals, but doing so makes tests easier/faster.
  • Loading branch information
John Sanda committed Jun 19, 2015
1 parent d927794 commit d24abcf
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Set;

import org.joda.time.DateTime;
import org.joda.time.Duration;

/**
* Represents a unit of work to be performed by a user supplied function. All tasks are considered repeating and are
Expand Down Expand Up @@ -49,14 +48,14 @@ public interface Task {
/**
* The frequency of the task execution, e.g., five minutes.
*/
Duration getInterval();
int getInterval();

/**
* Specifies the amount of data to be included. For an aggregation task that runs every 5 minutes with a window of
* 15 minutes, the task should query for data from the past 15 minutes using {@link #getTimeSlice() timeSlice} as
* the end time.
*/
Duration getWindow();
int getWindow();

/**
* The end time of the time slice for which the task is scheduled. If a task has an interval of an hour, then the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
import java.util.TreeSet;
import java.util.function.Consumer;

import org.hawkular.metrics.tasks.api.TaskType;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskType;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/**
* @author jsanda
Expand All @@ -41,9 +40,9 @@ class TaskContainer implements Iterable<Task> {

private Set<String> sources;

private Duration interval;
private int interval;

private Duration window;
private int window;

private DateTime timeSlice;

Expand Down Expand Up @@ -77,8 +76,10 @@ public TaskContainer(TaskType taskType, DateTime timeSlice, int segment, String
this.segment = segment;
this.target = target;
this.sources = sources;
this.interval = Duration.standardMinutes(interval);
this.window = Duration.standardMinutes(window);
// this.interval = Duration.standardMinutes(interval);
// this.window = Duration.standardMinutes(window);
this.interval = interval;
this.window = window;
this.failedTimeSlices.addAll(failedTimeSlices);
}

Expand All @@ -94,11 +95,11 @@ public Set<String> getSources() {
return sources;
}

public Duration getInterval() {
public int getInterval() {
return interval;
}

public Duration getWindow() {
public int getWindow() {
return window;
}

Expand Down Expand Up @@ -184,8 +185,10 @@ public String toString() {
.add("taskDef", taskType.getName())
.add("target", target)
.add("sources", sources)
.add("interval", interval.toStandardMinutes())
.add("window", window.toStandardMinutes())
// .add("interval", interval.toStandardMinutes())
// .add("window", window.toStandardMinutes())
.add("interval", interval)
.add("window", window)
.add("failedTimeSlices", failedTimeSlices)
.add("timeSlice", timeSlice)
.add("segment", segment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
*/
package org.hawkular.metrics.tasks.impl;

import static org.joda.time.Duration.standardMinutes;

import java.util.Objects;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import org.hawkular.metrics.tasks.api.TaskType;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskType;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/**
* @author jsanda
Expand All @@ -38,9 +35,9 @@ public class TaskImpl implements Task {

private Set<String> sources;

private Duration interval;
private int interval;

private Duration window;
private int window;

private DateTime timeSlice;

Expand All @@ -49,12 +46,24 @@ public TaskImpl(TaskType taskType, DateTime timeSlice, String target, String sou
this.timeSlice = timeSlice;
this.target = target;
this.sources = ImmutableSet.of(source);
this.interval = standardMinutes(interval);
this.window = standardMinutes(window);
// this.interval = standardMinutes(interval);
// this.window = standardMinutes(window);
this.interval = interval;
this.window = window;
}

public TaskImpl(TaskType taskType, DateTime timeSlice, String target, Set<String> sources, Duration interval,
Duration window) {
// public TaskImpl(TaskType taskType, DateTime timeSlice, String target, Set<String> sources, Duration interval,
// Duration window) {
// this.taskType = taskType;
// this.timeSlice = timeSlice;
// this.target = target;
// this.sources = sources;
// this.interval = interval;
// this.window = window;
// }

public TaskImpl(TaskType taskType, DateTime timeSlice, String target, Set<String> sources, int interval,
int window) {
this.taskType = taskType;
this.timeSlice = timeSlice;
this.target = target;
Expand All @@ -79,12 +88,12 @@ public Set<String> getSources() {
}

@Override
public Duration getInterval() {
public int getInterval() {
return interval;
}

@Override
public Duration getWindow() {
public int getWindow() {
return window;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static java.util.stream.Collectors.toSet;
import static org.joda.time.DateTime.now;
import static org.joda.time.Duration.standardHours;
import static org.joda.time.Duration.standardMinutes;
import static org.joda.time.Duration.standardSeconds;

Expand Down Expand Up @@ -51,6 +52,7 @@
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/**
* @author jsanda
Expand Down Expand Up @@ -160,6 +162,7 @@ public void start() {
public void shutdown() {
logger.info("Shutting down");

tasksExecuted.onCompleted();
leaseService.shutdown();
ticker.shutdownNow();
scheduler.shutdownNow();
Expand Down Expand Up @@ -200,32 +203,30 @@ private TaskType findTaskType(String type) {
public Observable<Task> scheduleTask(DateTime time, Task task) {
TaskType taskType = findTaskType(task.getTaskType().getName());

DateTime currentTimeSlice = dateTimeService.getTimeSlice(time, task.getInterval());
DateTime timeSlice = currentTimeSlice.plus(task.getInterval());
DateTime currentTimeSlice = dateTimeService.getTimeSlice(time, getDuration(task.getInterval()));
DateTime timeSlice = currentTimeSlice.plus(getDuration(task.getInterval()));

return scheduleTaskAt(timeSlice, task).map(scheduledTime -> new TaskImpl(task.getTaskType(), scheduledTime,
task.getTarget(), task.getSources(), task.getInterval(), task.getWindow()));
}

private Observable<TaskContainer> rescheduleTask(TaskContainer taskContainer) {
TaskType taskType = taskContainer.getTaskType();
DateTime nextTimeSlice = taskContainer.getTimeSlice().plus(taskContainer.getInterval());
DateTime nextTimeSlice = taskContainer.getTimeSlice().plus(getDuration(taskContainer.getInterval()));
int segment = Math.abs(taskContainer.getTarget().hashCode() % taskType.getSegments());
int segmentsPerOffset = taskType.getSegments() / taskType.getSegmentOffsets();
int segmentOffset = (segment / segmentsPerOffset) * segmentsPerOffset;
Observable<ResultSet> queueObservable;

if (taskContainer.getFailedTimeSlices().isEmpty()) {
queueObservable = rxSession.execute(queries.createTask.bind(taskType.getName(), nextTimeSlice.toDate(),
segment, taskContainer.getTarget(), taskContainer.getSources(),
taskContainer.getInterval().toStandardMinutes().getMinutes(),
taskContainer.getWindow().toStandardMinutes().getMinutes()));
segment, taskContainer.getTarget(), taskContainer.getSources(), taskContainer.getInterval(),
taskContainer.getWindow()));
} else {
queueObservable = rxSession.execute(queries.createTaskWithFailures.bind(taskType.getName(),
nextTimeSlice.toDate(), segment, taskContainer.getTarget(), taskContainer.getSources(),
taskContainer.getInterval().toStandardMinutes().getMinutes(),
taskContainer.getWindow().toStandardMinutes().getMinutes(), toDates(
taskContainer.getFailedTimeSlices())));
taskContainer.getInterval(), taskContainer.getWindow(),
toDates(taskContainer.getFailedTimeSlices())));
}
Observable<ResultSet> leaseObservable = rxSession.execute(queries.createLease.bind(nextTimeSlice.toDate(),
taskType.getName(), segmentOffset));
Expand All @@ -252,8 +253,8 @@ private Observable<DateTime> scheduleTaskAt(DateTime time, Task task) {
int segmentOffset = (segment / segmentsPerOffset) * segmentsPerOffset;

Observable<ResultSet> queueObservable = rxSession.execute(queries.createTask.bind(taskType.getName(),
time.toDate(), segment, task.getTarget(), task.getSources(), (int) task.getInterval()
.getStandardMinutes(), (int) task.getWindow().getStandardMinutes()));
time.toDate(), segment, task.getTarget(), task.getSources(), (int) task.getInterval(),
task.getWindow()));
Observable<ResultSet> leaseObservable = rxSession.execute(queries.createLease.bind(time.toDate(),
taskType.getName(), segmentOffset));

Expand Down Expand Up @@ -372,4 +373,13 @@ private Observable<ResultSet> deleteTaskSegment(TaskContainer taskContainer) {
return executedTasks;
};

private Duration getDuration(int duration) {
switch (timeUnit) {
case SECONDS: return standardSeconds(duration);
case MINUTES: return standardMinutes(duration);
case HOURS: return standardHours(duration);
default: throw new IllegalArgumentException(timeUnit + " is not a supported time unit");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.stream.Collectors.toSet;
import static org.joda.time.DateTime.now;
import static org.joda.time.Duration.standardMinutes;
import static org.joda.time.Duration.standardSeconds;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand All @@ -33,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -75,7 +77,7 @@ public void scheduleTask() throws Exception {
TaskServiceImpl taskService = new TaskServiceImpl(rxSession, queries, leaseService, singletonList(taskType));

DateTime expectedTimeSlice = dateTimeService.getTimeSlice(now(), standardMinutes(5)).plusMinutes(5);
taskService.scheduleTask(now(), task).toBlocking().first();
Task scheduledTask = taskService.scheduleTask(now(), task).toBlocking().first();

int segment = Math.abs(task.getTarget().hashCode() % task.getTaskType().getSegments());

Expand All @@ -85,6 +87,26 @@ public void scheduleTask() throws Exception {
assertLeasesCreated(expectedTimeSlice, newLease(taskType, segmentOffset));
}

@Test
public void scheduleTaskUsingSeconds() throws Exception {
String type = "seconds-test";
TaskType taskType = new TaskType().setName(type).setSegments(100).setSegmentOffsets(10);
int interval = 5;
int window = 15;
Task task = taskType.createTask("metric.5sec", "metric", interval, window);

TaskServiceImpl taskService = new TaskServiceImpl(rxSession, queries, leaseService, singletonList(taskType));
taskService.setTimeUnit(TimeUnit.SECONDS);

DateTime expectedTimeSlice = dateTimeService.getTimeSlice(now(), standardSeconds(1)).plusSeconds(interval);
taskService.scheduleTask(now(), task).toBlocking().first();

int segment = Math.abs(task.getTarget().hashCode() % task.getTaskType().getSegments());

assertTasksScheduled(taskType, expectedTimeSlice, segment, new TaskImpl(taskType, expectedTimeSlice,
"metric.5sec", "metric", interval, window));
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void doNotScheduleTaskHavingInvalidType() throws Exception {

Expand Down

0 comments on commit d24abcf

Please sign in to comment.