Skip to content

Commit

Permalink
[HWKMETRICS-168] more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent 20929c7 commit 4b38e1a
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.joda.time.DateTime;
import org.joda.time.Duration;
Expand All @@ -28,6 +29,8 @@
*/
public class RepeatingTrigger implements Trigger {

public static Supplier<Long> now = System::currentTimeMillis;

private long triggerTime;

private long interval;
Expand Down Expand Up @@ -134,7 +137,7 @@ public Trigger nextTrigger() {
}

private DateTime getExecutionDateTime() {
DateTime dt = DateTime.now();
DateTime dt = new DateTime(now.get());
Duration duration = Duration.millis(interval);
Period p = duration.toPeriod();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ public Observable<Lease> start() {
logger.debug("Loading tasks for {}", lease);
CountDownLatch latch = new CountDownLatch(1);
getQueue(lease)
.observeOn(tasksScheduler)
.groupBy(Task2Impl::getGroupKey)
.flatMap(group -> group.flatMap(this::execute).map(this::rescheduleTask))
.subscribe(
Expand Down Expand Up @@ -371,15 +372,13 @@ private boolean acquire(Lease lease) {
* Loads the task queue for the specified lease. The returned observable emits tasks in
* the queue. The observable should execute on the lease scheduler.
*/
private Observable<Task2Impl> getQueue(Lease lease) {
Observable<Task2Impl> getQueue(Lease lease) {
logger.debug("Loading task queue for {}", lease);
return session.execute(queries.getTasksFromQueue.bind(new Date(lease.getTimeSlice()), lease.getShard()),
Schedulers.immediate())
.flatMap(Observable::from)
.map(row -> new Task2Impl(row.getUUID(2), row.getString(0), row.getInt(1), row.getString(3),
row.getMap(4, String.class, String.class), getTrigger(row.getUDTValue(5))))
// .subscribeOn(tasksScheduler);
.observeOn(tasksScheduler);
row.getMap(4, String.class, String.class), getTrigger(row.getUDTValue(5))));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.hawkular.metrics.tasks.impl;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
Expand All @@ -28,11 +29,15 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.google.common.collect.ImmutableMap;
import org.hawkular.metrics.tasks.BaseTest;
import org.hawkular.metrics.tasks.api.RepeatingTrigger;
import org.hawkular.metrics.tasks.api.SingleExecutionTrigger;
import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.Trigger;
Expand Down Expand Up @@ -98,6 +103,8 @@ public void initClass() {
scheduler.setTickScheduler(tickScheduler);
finishedTimeSlices = scheduler.getFinishedTimeSlices();
leaseObservable = scheduler.start();

RepeatingTrigger.now = tickScheduler::now;
}

@BeforeMethod
Expand Down Expand Up @@ -130,15 +137,98 @@ public void executeSingleTask() {
timeSlicesSubscriber.awaitTerminalEvent(5, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertTerminalEvent();
taskSubscriber.assertValueCount(1);

taskSubscriber.assertValueCount(1);
taskSubscriber.assertReceivedOnNext(singletonList(task));

// verify that the lease and queue have been deleted
assertLeasesDoNotExist(trigger.getTriggerTime());
assertQueueDoesNotExist(trigger.getTriggerTime(), group);
}

@Test
public void executeRepeatingTask() {
RepeatingTrigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, SECONDS)
.withInterval(1, SECONDS)
.build();
String group = "test-group";
int order = 10;
Map<String, String> params = ImmutableMap.of("x", "1", "y", "2", "z", "3");
Task2Impl task = new Task2Impl(randomUUID(), group, order, "task-1", params, trigger);

setUpTasksForExecution(Observable.just(task));

TestSubscriber<Long> timeSlicesSubscriber = new TestSubscriber<>();
finishedTimeSlices.take(2).observeOn(Schedulers.immediate()).subscribe(timeSlicesSubscriber);

TaskSubscriber taskSubscriber = new TaskSubscriber();
scheduler.subscribe(taskSubscriber);

tickScheduler.advanceTimeBy(3, SECONDS);

timeSlicesSubscriber.awaitTerminalEvent(5, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertTerminalEvent();

taskSubscriber.assertValueCount(2);

Trigger nextTrigger = getNthTrigger(trigger, 3);
Task2Impl nextTask = new Task2Impl(task.getId(), group, order, task.getName(), params, nextTrigger);

assertLeaseExists(nextTrigger.getTriggerTime(), group);
assertEquals(getQueue(nextTask.getTrigger().getTriggerTime(), group), singletonList(nextTask),
"The queue should does not match the expected values");

assertLeasesDoNotExist(trigger.getTriggerTime());
assertLeasesDoNotExist(trigger.nextTrigger().getTriggerTime());

assertQueueDoesNotExist(trigger.getTriggerTime(), group);
assertQueueDoesNotExist(trigger.nextTrigger().getTriggerTime(), group);
}

@Test
public void executeTaskThatRepeatsTwice() {
RepeatingTrigger trigger = new RepeatingTrigger.Builder()
.withInterval(1, SECONDS)
.withDelay(1, SECONDS)
.withRepeatCount(2)
.build();
String group = "test-group";
int order = 10;
Task2Impl task = new Task2Impl(randomUUID(), group, order, "task-1", emptyMap(), trigger);

setUpTasksForExecution(Observable.just(task));

TestSubscriber<Long> timeSlicesSubscriber = new TestSubscriber<>();
finishedTimeSlices.take(2).observeOn(Schedulers.immediate()).subscribe(timeSlicesSubscriber);

TaskSubscriber taskSubscriber = new TaskSubscriber();
scheduler.subscribe(taskSubscriber);

tickScheduler.advanceTimeBy(3, SECONDS);

timeSlicesSubscriber.awaitTerminalEvent(5, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertTerminalEvent();

taskSubscriber.assertValueCount(2);

Task2Impl nextTask = new Task2Impl(task.getId(), group, order, task.getName(), task.getParameters(),
trigger.nextTrigger());
taskSubscriber.assertReceivedOnNext(asList(task, nextTask));

assertLeasesDoNotExist(trigger.getTriggerTime());
assertLeasesDoNotExist(trigger.nextTrigger().getTriggerTime());
// make sure a 3rd execution was not scheduled
assertLeasesDoNotExist(trigger.nextTrigger().getTriggerTime() + trigger.getInterval());

assertQueueDoesNotExist(trigger.getTriggerTime(), group);
assertQueueDoesNotExist(trigger.nextTrigger().getTriggerTime(), group);
// make sure a 3rd execution was not scheduled
assertQueueDoesNotExist(trigger.nextTrigger().getTriggerTime() + trigger.getInterval(), group);
}

/**
* In this tests all tasks belong to the same group, and each task defines a unique ordering. We need to verify
* that the execution honors that ordering. A {@link SingleExecutionTrigger single execution trigger} is used for
Expand Down Expand Up @@ -325,6 +415,34 @@ private Observable<ResultSet> createLease(Task2 task) {
scheduler.computeShard(task.getGroupKey())));
}

private List<Task2Impl> getQueue(long time, String group) {
int shard = scheduler.computeShard(group);

return scheduler.getQueue(new Lease(time, shard, null, false)).toList().toBlocking().first();
}

private Trigger getNthTrigger(Trigger trigger, int n) {
Trigger next = trigger;
for (int i = 1; i < n; ++i) {
next = next.nextTrigger();
}
return next;
}

private void assertLeaseExists(long time, String group) {
Date timeSlice = new Date(time);
int shard = scheduler.computeShard(group);
ResultSet resultSet = session.execute(queries.findLeases.bind(timeSlice));
boolean found = false;
for (Row row : resultSet) {
if (row.getInt(0) == shard) {
found = true;
break;
}
}
assertTrue(found, "Expected to find lease for {time=" + time + ", shard=" + shard + ", group=" + group + "}");
}

private void assertLeasesDoNotExist(long time) {
Date timeSlice = new Date(time);
ResultSet resultSet = session.execute(queries.findLeases.bind(timeSlice));
Expand Down

0 comments on commit 4b38e1a

Please sign in to comment.