Skip to content

Commit

Permalink
[HWKMETRICS-168] add support for setting number of executions in repe…
Browse files Browse the repository at this point in the history
…ating trigger

This is generally useful feature, but I added it right now to help with
integration tests.
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent dcce78b commit bdec66d
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 39 deletions.
4 changes: 3 additions & 1 deletion schema-manager/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ CREATE TYPE ${keyspace}.trigger_def (
type int,
trigger_time bigint,
delay bigint,
interval bigint
interval bigint,
repeat_count int,
execution_count int
);

-- #
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public class RepeatingTrigger implements Trigger {

private long delay;

private Integer repeatCount;

private int executionCount;

public static class Builder {

private RepeatingTrigger trigger = new RepeatingTrigger();
Expand All @@ -48,8 +52,14 @@ public Builder withDelay(int delay, TimeUnit timeUnit) {
return this;
}

public Builder withRepeatCount(int count) {
trigger.repeatCount = count;
return this;
}

public RepeatingTrigger build() {
trigger.triggerTime = trigger.getExecutionDateTime().getMillis() + trigger.delay;
trigger.executionCount = 1;
return trigger;
}

Expand All @@ -62,13 +72,29 @@ public RepeatingTrigger(long interval) {
this.interval = interval;
}

public RepeatingTrigger(long interval, long delay, long triggerTime) {
this.interval = interval;
this.delay = delay;
this.triggerTime = triggerTime;
}

// TODO shoud this constructor be exposed in the client API?
// We need this constructor for use by the scheduler when creating a trigger from a row
// in the database.
public RepeatingTrigger(long interval, long delay, long triggerTime) {
public RepeatingTrigger(long interval, long delay, long triggerTime, int executionCount) {
this.interval = interval;
this.delay = delay;
this.triggerTime = triggerTime;
this.executionCount = executionCount;
}

public RepeatingTrigger(long interval, long delay, long triggerTime, int repeatCount, int executionCount) {
this.interval = interval;
this.delay = delay;
this.triggerTime = triggerTime;
this.executionCount = executionCount;
this.repeatCount = repeatCount == 0 ? null : repeatCount;
this.executionCount = executionCount;
}

public long getInterval() {
Expand All @@ -84,9 +110,27 @@ public long getTriggerTime() {
return triggerTime;
}

public Integer getRepeatCount() {
return repeatCount;
}

public int getExecutionCount() {
return executionCount;
}

@Override
public Trigger nextTrigger() {
return new RepeatingTrigger(interval, delay, triggerTime + interval);
if (repeatCount != null && executionCount + 1 > repeatCount) {
return null;
}
RepeatingTrigger next = new RepeatingTrigger();
next.interval = interval;
next.delay = delay;
next.triggerTime = triggerTime + interval;
next.repeatCount = repeatCount;
next.executionCount = executionCount + 1;

return next;
}

private DateTime getExecutionDateTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ public TaskSchedulerImpl(RxSession session, Queries queries) {
taskSubject = PublishSubject.create();
}

void setTickScheduler(Scheduler scheduler) {
this.tickScheduler = scheduler;
}

private class SubscriberWrapper extends Subscriber<Task2> {

private Subscriber<Task2> delegate;
Expand Down Expand Up @@ -495,6 +499,11 @@ public Observable<Task2> scheduleTask(String name, Map<String, String> parameter
* completed.
*/
private Observable<Task2Impl> rescheduleTask(Task2Impl task) {
Trigger nextTrigger = task.getTrigger().nextTrigger();
if (nextTrigger == null) {
logger.debug("There are no more executions for {}", task);
return Observable.just(task);
}
Task2Impl newTask = new Task2Impl(task.getId(), task.getShard(), task.getName(), task.getParameters(),
task.getTrigger().nextTrigger());
UDTValue triggerUDT = getTriggerValue(session, newTask.getTrigger());
Expand Down Expand Up @@ -549,7 +558,13 @@ static Trigger getTrigger(UDTValue value) {
if (type != 1) {
throw new IllegalArgumentException("Trigger type [" + type + "] is not supported");
}
return new RepeatingTrigger(value.getLong("interval"), value.getLong("delay"), value.getLong("trigger_time"));
return new RepeatingTrigger(
value.getLong("interval"),
value.getLong("delay"),
value.getLong("trigger_time"),
value.getInt("repeat_count"),
value.getInt("execution_count")
);
}

static UDTValue getTriggerValue(RxSession session, Trigger trigger) {
Expand All @@ -568,6 +583,10 @@ static UDTValue getRepeatingTriggerValue(RxSession session, RepeatingTrigger tri
if (trigger.getDelay() > 0) {
triggerUDT.setLong("delay", trigger.getDelay());
}
if (trigger.getRepeatCount() != null) {
triggerUDT.setInt("repeat_count", trigger.getRepeatCount());
triggerUDT.setInt("execution_count", trigger.getExecutionCount());
}

return triggerUDT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
Expand All @@ -45,6 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import rx.Observable;
import rx.observers.TestSubscriber;
Expand All @@ -67,6 +70,8 @@ public class TaskSchedulerImplTest extends BaseTest {

private Observable<Lease> leaseObservable;

private AtomicInteger ids = new AtomicInteger();

@BeforeClass
public void initClass() {
findTasks = session.prepare("select id, shard, name, params, trigger from tasks");
Expand All @@ -77,6 +82,11 @@ public void initClass() {
leaseObservable = scheduler.start();
}

@BeforeMethod
public void beforeTestMethod(Method method) {
logger.debug("Preparing to execute {}", method.getName());
}

// @Test
public void createTask() {
TaskSchedulerImpl scheduler = new TaskSchedulerImpl(rxSession, queries);
Expand Down Expand Up @@ -145,7 +155,7 @@ public void scheduleTask() {
"The leases for time slice [" + timeSlice + "] do not match");
}

// @Test
@Test
public void scheduleAndExecuteTaskWithNoParams() throws Exception {
String taskName = "SimpleTaskWithNoParams";
Trigger trigger = new RepeatingTrigger.Builder()
Expand Down Expand Up @@ -191,7 +201,7 @@ public void scheduleAndExecuteTaskWithNoParams() throws Exception {
assertLeasesDoNotExist(trigger.nextTrigger().getTriggerTime());
}

// @Test
@Test
public void scheduleAndExecuteTaskWithParams() throws Exception {
String taskName = "SimpleTaskWithParams";
Map<String, String> params = ImmutableMap.of("x", "1", "y", "2", "z", "3");
Expand Down Expand Up @@ -238,35 +248,20 @@ public void scheduleAndExecuteTaskWithParams() throws Exception {
assertLeasesDoNotExist(trigger.nextTrigger().getTriggerTime());
}

// @Test
@Test
public void executeMultipleTasksAcrossDifferentShards() throws Exception {
Trigger trigger = new RepeatingTrigger.Builder()
.withInterval(1, TimeUnit.SECONDS)
.withDelay(2, TimeUnit.SECONDS)
.build();
UDTValue triggerUDT = getTriggerValue(rxSession, trigger);
Date timeSlice = new Date(trigger.getTriggerTime());
Task2Impl task1 = new Task2Impl(randomUUID(), 0, "task1", emptyMap(), trigger);
Task2Impl task2 = new Task2Impl(randomUUID(), 1, "task2", emptyMap(), trigger);

TestSubscriber<ResultSet> resultSetSubscriber = new TestSubscriber<>();

TaskSubscriber taskSubscriber = new TaskSubscriber();
scheduler.subscribe(taskSubscriber);

Observable<ResultSet> resultSets = Observable.merge(
rxSession.execute(queries.insertIntoQueue.bind(timeSlice, task1.getShard(), task1.getId(),
task1.getName(), emptyMap(), triggerUDT)),
rxSession.execute(queries.insertIntoQueue.bind(timeSlice, task2.getShard(), task2.getId(),
task2.getName(), emptyMap(), triggerUDT)),
rxSession.execute(queries.createLease.bind(timeSlice, task1.getShard())),
rxSession.execute(queries.createLease.bind(timeSlice, task2.getShard()))
);

resultSets.subscribe(resultSetSubscriber);
resultSetSubscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
resultSetSubscriber.assertCompleted();
resultSetSubscriber.assertNoErrors();
setUpTasksForExecution(timeSlice, task1, task2);

TestSubscriber<Lease> leaseSubscriber = new TestSubscriber<>();
leaseObservable.take(4).observeOn(Schedulers.immediate()).subscribe(leaseSubscriber);
Expand Down Expand Up @@ -315,13 +310,10 @@ public void executeTaskThatThrowsException() {
.withInterval(1, TimeUnit.SECONDS)
.withDelay(2, TimeUnit.SECONDS)
.build();
UDTValue triggerUDT = getTriggerValue(rxSession, trigger);
Date timeSlice = new Date(trigger.getTriggerTime());
Task2Impl task1 = new Task2Impl(randomUUID(), 0, "task1", emptyMap(), trigger);
Task2Impl task2 = new Task2Impl(randomUUID(), 1, "task2", emptyMap(), trigger);

TestSubscriber<ResultSet> resultSetSubscriber = new TestSubscriber<>();

TaskSubscriber taskSubscriber = new TaskSubscriber();
taskSubscriber.setOnNext(task -> {
if (task.getName().equals(task1.getName()) && task.getTrigger().equals(trigger)) {
Expand All @@ -332,19 +324,7 @@ public void executeTaskThatThrowsException() {
});
scheduler.subscribe(taskSubscriber);

Observable<ResultSet> resultSets = Observable.merge(
rxSession.execute(queries.insertIntoQueue.bind(timeSlice, task1.getShard(), task1.getId(),
task1.getName(), emptyMap(), triggerUDT)),
rxSession.execute(queries.insertIntoQueue.bind(timeSlice, task2.getShard(), task2.getId(),
task2.getName(), emptyMap(), triggerUDT)),
rxSession.execute(queries.createLease.bind(timeSlice, task1.getShard())),
rxSession.execute(queries.createLease.bind(timeSlice, task2.getShard()))
);

resultSets.subscribe(resultSetSubscriber);
resultSetSubscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
resultSetSubscriber.assertCompleted();
resultSetSubscriber.assertNoErrors();
setUpTasksForExecution(timeSlice, task1, task2);

TestSubscriber<Lease> leaseSubscriber = new TestSubscriber<>();
leaseObservable.take(4).observeOn(Schedulers.immediate()).subscribe(leaseSubscriber);
Expand Down Expand Up @@ -377,6 +357,105 @@ public void executeTaskThatThrowsException() {
"first trigger " + trigger + " do not match expected values - ");
assertEquals(ImmutableSet.copyOf(actual.subList(1, 3)), expectedValuesFor2ndTrigger, "The tasks for the " +
"second trigger " + trigger.nextTrigger() + " do not match expected values - ");

assertQueueDoesNotExist(trigger.getTriggerTime(), task1.getShard());
assertQueueDoesNotExist(trigger.getTriggerTime(), task2.getShard());
assertQueueDoesNotExist(trigger.nextTrigger().getTriggerTime(), task1.getShard());
assertQueueDoesNotExist(trigger.nextTrigger().getTriggerTime(), task2.getShard());

assertLeasesDoNotExist(trigger.getTriggerTime());
assertLeasesDoNotExist(trigger.nextTrigger().getTriggerTime());
}

@Test
public void executeLongRunningTask() {
Trigger trigger = new RepeatingTrigger.Builder()
.withInterval(1, TimeUnit.SECONDS)
.withDelay(2, TimeUnit.SECONDS)
.withRepeatCount(2)
.build();
Date timeSlice = new Date(trigger.getTriggerTime());
Task2Impl task1 = new Task2Impl(randomUUID(), 0, "taskLongRunning-1", emptyMap(), trigger);
Task2Impl task2 = new Task2Impl(randomUUID(), 1, "taskLongRunning-2", emptyMap(), trigger);

TaskSubscriber taskSubscriber = new TaskSubscriber();
taskSubscriber.setOnNext(task -> {
if (task.getName().equals(task1.getName())) {
logger.debug("Executing long running task {}", task);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
}
});
scheduler.subscribe(taskSubscriber);

setUpTasksForExecution(timeSlice, task1, task2);

TestSubscriber<Lease> leaseSubscriber = new TestSubscriber<>();
leaseObservable.take(4).observeOn(Schedulers.immediate()).subscribe(leaseSubscriber);

leaseSubscriber.awaitTerminalEvent(10, TimeUnit.SECONDS);
leaseSubscriber.assertCompleted();

List<Lease> actualLeases = leaseSubscriber.getOnNextEvents();
assertEquals(actualLeases.size(), 4, "Expected to receive four leases");

List<Task2> onNextEvents = taskSubscriber.getOnNextEvents();
assertTrue(taskSubscriber.getOnNextEvents().size() >= 4, "Expected to receive at least four task events but " +
"received " + onNextEvents.size() + " events: " + onNextEvents);

List<Task2> actual = onNextEvents.subList(0, 4);

Set<Task2Impl> expectedValuesFor1stTrigger = ImmutableSet.of(task1, task2);
Set<Task2Impl> expectedValuesFor2ndTrigger = ImmutableSet.of(
new Task2Impl(task1.getId(), task1.getShard(), task1.getName(), task1.getParameters(),
trigger.nextTrigger()),
new Task2Impl(task2.getId(), task2.getShard(), task2.getName(), task2.getParameters(),
trigger.nextTrigger())
);

// The scheduler guarantees that tasks are executed in order with respect to time
// There are no guarantees though around execution order for tasks scheduled for
// the same execution time. The first two events/tasks should have the first trigger
// time, and the latter two tasks should have the next trigger time.
assertEquals(ImmutableSet.copyOf(actual.subList(0, 2)), expectedValuesFor1stTrigger, "The tasks for the " +
"first trigger " + trigger + " do not match expected values - ");
assertEquals(ImmutableSet.copyOf(actual.subList(2, 4)), expectedValuesFor2ndTrigger, "The tasks for the " +
"second trigger " + trigger.nextTrigger() + " do not match expected values - ");

assertQueueDoesNotExist(trigger.getTriggerTime(), task1.getShard());
assertQueueDoesNotExist(trigger.getTriggerTime(), task2.getShard());
assertQueueDoesNotExist(trigger.nextTrigger().getTriggerTime(), task1.getShard());
assertQueueDoesNotExist(trigger.nextTrigger().getTriggerTime(), task2.getShard());

assertLeasesDoNotExist(trigger.getTriggerTime());
assertLeasesDoNotExist(trigger.nextTrigger().getTriggerTime());
}

/**
* Inserts the tasks into the time slice queue and creates the leases. The method then
* blocks until all writes have completed and asserts that there are no errors.
*
* @param timeSlice The time slice for which tasks and leases will be created
* @param tasks The tasks to create
*/
private void setUpTasksForExecution(Date timeSlice, Task2Impl... tasks) {
List<Observable<ResultSet>> resultSets = new ArrayList<>();
for (Task2Impl t : tasks) {
resultSets.add(rxSession.execute(queries.insertIntoQueue.bind(timeSlice, t.getShard(), t.getId(),
t.getName(), t.getParameters(), getTriggerValue(rxSession, t.getTrigger()))));
resultSets.add(rxSession.execute(queries.createLease.bind(timeSlice, t.getShard())));
}
TestSubscriber<ResultSet> subscriber = new TestSubscriber<>();
Observable.merge(resultSets).subscribe(subscriber);
subscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
}

private int nextId() {
return ids.getAndIncrement();
}

// @Test
Expand Down

0 comments on commit bdec66d

Please sign in to comment.