Skip to content

Commit

Permalink
[HWKMETRICS-168] add test to verify execution order and fix query
Browse files Browse the repository at this point in the history
I am adding a test that simulates a long running task. Every task is supposed
execute every minute. One of the tasks takes 3 minutes to complete. Tasks are
split into two groups. The test verifies that tests are executed in the
expected order despite the long delay.

This commit also fixes a bug in the query to find available leases. The filter
function needs to also check that the owern is not set in addition to the lease
not being finished.
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent 8cdedc4 commit 4467d6f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ public Observable<Long> getFinishedTimeSlices() {
return tickSubject;
}

public Observable<Task2> getTasks() {
return taskSubject;
}

/**
* Starts the scheduler so that it starts emitting tasks for execution.
*
Expand Down Expand Up @@ -361,7 +365,7 @@ private List<Lease> findAvailableLeases(Date timeSlice) {
return session.execute(queries.findLeases.bind(timeSlice))
.flatMap(Observable::from)
.map(row -> new Lease(timeSlice.getTime(), row.getInt(0), row.getString(1), row.getBool(2)))
.filter(lease -> !lease.isFinished())
.filter(lease -> !lease.isFinished() && lease.getOwner() == null)
.toList()
.toBlocking()
.firstOrDefault(Collections.<Lease>emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.hawkular.rx.cassandra.driver.RxSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -110,10 +109,10 @@ public void initClass() {
AbstractTrigger.now = tickScheduler::now;
}

@AfterClass
public void shutdown() {
scheduler.shutdown();
}
// @AfterClass
// public void shutdown() {
// scheduler.shutdown();
// }

@BeforeMethod
public void initMethod() {
Expand Down Expand Up @@ -327,6 +326,67 @@ public void executeMultipleTasksFromMultipleGroupsInDifferentQueues() {
assertQueueDoesNotExist(trigger.getTriggerTime(), group2);
}

@Test
public void executeTasksFromMultipleGroupsMultipleTimes() {
int numTasks = 3;
final String group1 = "group-ONE";
final String group2 = "group-TWO";
Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, MINUTES)
.withInterval(1, MINUTES)
.build();

scheduler.setComputeShardFn(group -> {
switch (group) {
case group1:
return 1;
case group2:
return 2;
default:
throw new IllegalArgumentException(group + " is not a recognized group key");
}
});

TaskSubscriber taskSubscriber = new TaskSubscriber();
taskSubscriber.setOnNext(task -> {
if (task.getGroupKey().equals(group1) && task.getName().equals("task-2")) {
int count = 0;
while (count++ < 3) {
tickScheduler.advanceTimeBy(1, MINUTES);
}
}
});

Observable<Task2> group1Tasks = createTasks(numTasks, group1, trigger).cache();
Observable<Task2> group2Tasks = createTasks(numTasks, group2, trigger).cache();

setUpTasksForExecution(group1Tasks.concatWith(group2Tasks));

scheduler.getTasks().take(12).subscribe(taskSubscriber);

tickScheduler.advanceTimeBy(4, MINUTES);

taskSubscriber.awaitTerminalEvent(5, SECONDS);
taskSubscriber.assertNoErrors();
taskSubscriber.assertCompleted();

taskSubscriber.assertValueCount(numTasks * 4);

List<Task2> actualTasks = taskSubscriber.getOnNextEvents();

Observable<Task2> nextGroup1Tasks = group1Tasks.map(task -> new Task2Impl(task.getId(), group1,
task.getOrder(), task.getName(), task.getParameters(), trigger.nextTrigger()));

Observable<Task2> nextGroup2Tasks = group2Tasks.map(task -> new Task2Impl(task.getId(), group2,
task.getOrder(), task.getName(), task.getParameters(), trigger.nextTrigger()));
List<Task2> expectedTasks = Observable.concat(group1Tasks, group2Tasks, nextGroup1Tasks, nextGroup2Tasks)
.toList()
.toBlocking()
.first();

assertEquals(actualTasks, expectedTasks, "The tasks do not match");
}

@Test
public void executeMultipleTasksFromMultipleGroupsInSameQueue() {
int numTasks = 10;
Expand Down

0 comments on commit 4467d6f

Please sign in to comment.