Skip to content

Commit

Permalink
[HWKMETRICS-168] emit event when scheduler is done with time slice
Browse files Browse the repository at this point in the history
The getAvailableLeases method now publishes the timestamp when all work is
done. This is basically a test hook so that we can make tests consistent and
repeatable.
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent 24517e2 commit 20929c7
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public interface TaskScheduler {

Observable<Lease> start();

Observable<Task2> createTask(String name, Map<String, String> parameters, Trigger trigger);
// Observable<Task2> createTask(String name, Map<String, String> parameters, Trigger trigger);

Observable<Task2> scheduleTask(String name, Map<String, String> parameters, Trigger trigger);
Observable<Task2> scheduleTask(String name, String groupKey, int executionOrder, Map<String, String> parameters,
Trigger trigger);

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public class TaskSchedulerImpl implements TaskScheduler {
*/
private PublishSubject<Task2> taskSubject;

private PublishSubject<Long> tickSubject;

public TaskSchedulerImpl(RxSession session, Queries queries) {
this.session = session;
this.queries = queries;
Expand All @@ -130,6 +132,7 @@ public TaskSchedulerImpl(RxSession session, Queries queries) {
leaseScheduler = Schedulers.from(leaseExecutor);

taskSubject = PublishSubject.create();
tickSubject = PublishSubject.create();
}

void setTickScheduler(Scheduler scheduler) {
Expand Down Expand Up @@ -184,6 +187,10 @@ public Subscription subscribe(Subscriber<Task2> subscriber) {
return taskSubject.subscribe(new SubscriberWrapper(subscriber));
}

Observable<Long> getFinishedTimeSlices() {
return tickSubject;
}

/**
* Starts the scheduler so that it starts emitting tasks for execution.
*
Expand Down Expand Up @@ -245,6 +252,7 @@ public Observable<Lease> start() {
// not want to try and acquire another lease until we have
// finished with the tasks for the current lease.
latch.await();
logger.debug("Done waiting!");
} catch (InterruptedException e) {
logger.warn("Interrupted waiting for task execution to complete", e);
}
Expand Down Expand Up @@ -325,8 +333,10 @@ private Observable<Lease> getAvailableLeases(Date timeSlice) {
leases = findAvailableLeases(timeSlice);
}
logger.debug("No more leases to process for {}", timeSlice);
// TODO we do not want to perform a delete if there are no leases for the time slice
session.execute(queries.deleteLeases.bind(timeSlice)).toBlocking().first();
subscriber.onCompleted();
tickSubject.onNext(timeSlice.getTime());
} catch (Exception e) {
subscriber.onError(e);
}
Expand Down Expand Up @@ -375,10 +385,10 @@ private Observable<Task2Impl> getQueue(Lease lease) {
/**
* Creates an observable that emits a task that has been executed. Task execution is
* accomplished by publishing the task. This method is somewhat of a hack because it
* is really just for side effects. We want tasks to execute in parallel. Wrapping this
* task execution in an observable over which we then flatMap is the one way I have
* managed to achieve the parallel execution. The observable should run on the tasks
* scheduler.
* is really just for side effects. We want tasks from different groups to execute in
* parallel. Execution of tasks within the same group should be serialized based on
* their specified order. If the tasks have the same order, they can be executed in
* parallel. The observable should run on the tasks scheduler.
*
* @param task The task to emit for execution
* @return An observable that emits a task once it has been executed.
Expand All @@ -395,8 +405,8 @@ private Observable<Task2Impl> execute(Task2Impl task) {
logger.debug("Finished executing {}", task);

});
// Subscribe on the same scheduler to make sure tasks within the same group execute
// in order.
// Subscribe on the same scheduler thread to make sure tasks within the same group
// execute in order.
return observable.subscribeOn(Schedulers.immediate());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.hawkular.metrics.tasks.impl.TaskSchedulerImpl.getTriggerValue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.datastax.driver.core.ResultSet;
import org.hawkular.metrics.tasks.BaseTest;
import org.hawkular.metrics.tasks.api.SingleExecutionTrigger;
import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import rx.Observable;
import rx.observers.TestSubscriber;
Expand All @@ -43,72 +52,229 @@
*/
public class TaskSchedulerITest extends BaseITest {

private TaskSchedulerImpl scheduler;
private static Logger logger = LoggerFactory.getLogger(TaskSchedulerTest.class);

private TestTaskScheduler scheduler;

private TestScheduler tickScheduler;

private long startTime;

private Observable<Lease> leaseObservable;

private Observable<Long> finishedTimeSlices;

private class TestTaskScheduler extends TaskSchedulerImpl {

private Function<String, Integer> defaultComputeShard = super::computeShard;

private Function<String, Integer> computeShard = super::computeShard;

public TestTaskScheduler(RxSession session, Queries queries) {
super(session, queries);
computeShard = defaultComputeShard;
}

public void setComputeShardFn(Function<String, Integer> computeShard) {
this.computeShard = computeShard;
}

public void resetComputeShardFn() {
computeShard = defaultComputeShard;
}

@Override
int computeShard(String key) {
return computeShard.apply(key);
}
}

@BeforeClass
public void initClass() {
startTime = System.currentTimeMillis();
scheduler = new TaskSchedulerImpl(rxSession, queries);
scheduler = new TestTaskScheduler(rxSession, queries);
tickScheduler = Schedulers.test();
tickScheduler.advanceTimeTo(startTime, TimeUnit.MILLISECONDS);
scheduler.setTickScheduler(tickScheduler);
finishedTimeSlices = scheduler.getFinishedTimeSlices();
leaseObservable = scheduler.start();
}

@BeforeMethod
public void initMethod() {
scheduler.resetComputeShardFn();
}

/**
* This test exercises the simple scenario of a task that only executes once. No other tasks are scheduled for
* execution during this time.
*/
@Test
public void executeSingleTask() {
String group = "group-1";
int order = 100;
SingleExecutionTrigger trigger = new SingleExecutionTrigger(tickScheduler.now() + TimeUnit.SECONDS.toMillis(1));
SingleExecutionTrigger trigger = new SingleExecutionTrigger(tickScheduler.now() + SECONDS.toMillis(1));
Date timeSlice = new Date(trigger.getTriggerTime());
Task2Impl task = new Task2Impl(randomUUID(), group, order, "task1", emptyMap(), trigger);

setUpTasksForExecution(timeSlice, task);

TestSubscriber<Lease> leaseSubscriber = new TestSubscriber<>();
leaseObservable.take(1).observeOn(Schedulers.immediate()).subscribe(leaseSubscriber);
TestSubscriber<Long> timeSlicesSubscriber = new TestSubscriber<>();
finishedTimeSlices.take(1).observeOn(Schedulers.immediate()).subscribe(timeSlicesSubscriber);

TaskSubscriber taskSubscriber = new TaskSubscriber();
scheduler.subscribe(taskSubscriber);

tickScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
tickScheduler.advanceTimeBy(1, SECONDS);

leaseSubscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
leaseSubscriber.assertTerminalEvent();
timeSlicesSubscriber.awaitTerminalEvent(5, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertTerminalEvent();
taskSubscriber.assertValueCount(1);

taskSubscriber.assertReceivedOnNext(singletonList(task));

// verify that the lease and queue have been deleted
assertLeasesDoNotExist(trigger.getTriggerTime());
assertQueueDoesNotExist(trigger.getTriggerTime(), group);
}

/**
* In this tests all tasks belong to the same group, and each task defines a unique ordering. We need to verify
* that the execution honors that ordering. A {@link SingleExecutionTrigger single execution trigger} is used for
* all of the tasks, which means that each task should only be emitted once. The tasks should not be rescheduled.
*/
@Test
public void executeMultipleTasksFromSameGroup() {
int numTasks = 10;
String group = "group-1";
SingleExecutionTrigger trigger = new SingleExecutionTrigger(tickScheduler.now() + TimeUnit.SECONDS.toMillis(1));
SingleExecutionTrigger trigger = new SingleExecutionTrigger(tickScheduler.now() + SECONDS.toMillis(1));

Observable<Task2> tasks = Observable.range(1, numTasks).map(i -> new Task2Impl(randomUUID(), group, i * 10,
"task-" + i, emptyMap(), trigger)).cache().cast(Task2.class);
Observable<Task2> tasks = createTasks(numTasks, group, trigger).cache();
setUpTasksForExecution(tasks);

TestSubscriber<Lease> leaseSubscriber = new TestSubscriber<>();
leaseObservable.take(1).observeOn(Schedulers.immediate()).subscribe(leaseSubscriber);
TestSubscriber<Long> timeSlicesSubscriber = new TestSubscriber<>();
finishedTimeSlices.take(1).observeOn(Schedulers.immediate()).subscribe(timeSlicesSubscriber);

TaskSubscriber taskSubscriber = new TaskSubscriber();
scheduler.subscribe(taskSubscriber);

tickScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
leaseSubscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
leaseSubscriber.assertTerminalEvent();
taskSubscriber.assertValueCount(numTasks);
tickScheduler.advanceTimeBy(1, SECONDS);
timeSlicesSubscriber.awaitTerminalEvent(5, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertCompleted();

assertEquals(taskSubscriber.getOnNextEvents().size(), numTasks);
taskSubscriber.assertValueCount(numTasks);
taskSubscriber.assertReceivedOnNext(tasks.toList().toBlocking().first());


// verify that the lease and queue have been deleted
assertLeasesDoNotExist(trigger.getTriggerTime());
assertQueueDoesNotExist(trigger.getTriggerTime(), group);
}

/**
* In this test there are two groups. Tasks define a unique ordering in each group. We need to verify that the
* execution honors that ordering within each group. Each task should only be emitted once since a
* {@link SingleExecutionTrigger} is used. Tasks should not be rescheduled.
*/
@Test
public void executeMultipleTasksFromMultipleGroupsInDifferentQueues() {
int numTasks = 10;
final String group1 = "group-one";
final String group2 = "group-two";
SingleExecutionTrigger trigger = new SingleExecutionTrigger(tickScheduler.now() + SECONDS.toMillis(1));

scheduler.setComputeShardFn(group -> {
switch (group) {
case group1:
return 1;
case group2:
return 2;
default:
throw new IllegalArgumentException(group + " is not a recognized group key");
}
});

Observable<Task2> group1Tasks = createTasks(numTasks, group1, trigger).cache();
Observable<Task2> group2Tasks = createTasks(numTasks, group2, trigger).cache();

setUpTasksForExecution(group1Tasks.concatWith(group2Tasks));

TestSubscriber<Long> timeSlicesSubscriber = new TestSubscriber<>();
finishedTimeSlices.takeUntil(time -> time > trigger.getTriggerTime() + SECONDS.toMillis(1))
.observeOn(Schedulers.immediate())
.subscribe(timeSlicesSubscriber);

TaskSubscriber taskSubscriber = new TaskSubscriber();
scheduler.subscribe(taskSubscriber);

tickScheduler.advanceTimeBy(3, SECONDS);
timeSlicesSubscriber.awaitTerminalEvent(5, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertCompleted();

taskSubscriber.assertValueCount(numTasks * 2);
List<Task2> actualGroup1Tasks = taskSubscriber.getOnNextEvents().stream()
.filter(t -> t.getGroupKey().equals(group1)).collect(toList());
List<Task2> actualGroup2Tasks = taskSubscriber.getOnNextEvents().stream()
.filter(t -> t.getGroupKey().equals(group2)).collect(toList());

assertEquals(actualGroup1Tasks, group1Tasks.toList().toBlocking().first(), group1 + " tasks do not match");
assertEquals(actualGroup2Tasks, group2Tasks.toList().toBlocking().first(), group2 + " tasks do not match");

// verify that the leases and queues have been deleted
assertLeasesDoNotExist(trigger.getTriggerTime());
assertQueueDoesNotExist(trigger.getTriggerTime(), group1);
assertQueueDoesNotExist(trigger.getTriggerTime(), group2);
}

@Test
public void executeMultipleTasksFromMultipleGroupsInSameQueue() {
int numTasks = 10;
final String group1 = "group-one";
final String group2 = "group-two";
final int shard = 1;
SingleExecutionTrigger trigger = new SingleExecutionTrigger(tickScheduler.now() + SECONDS.toMillis(1));

scheduler.setComputeShardFn(group -> shard);

Observable<Task2> group1Tasks = createTasks(numTasks, group1, trigger).cache();
Observable<Task2> group2Tasks = createTasks(numTasks, group2, trigger).cache();

setUpTasksForExecution(group1Tasks.concatWith(group2Tasks));

TestSubscriber<Long> timeSlicesSubscriber = new TestSubscriber<>();
finishedTimeSlices.takeUntil(time -> time > trigger.getTriggerTime() + SECONDS.toMillis(1))
.observeOn(Schedulers.immediate())
.subscribe(timeSlicesSubscriber);

TaskSubscriber taskSubscriber = new TaskSubscriber();
scheduler.subscribe(taskSubscriber);

tickScheduler.advanceTimeBy(3, SECONDS);
timeSlicesSubscriber.awaitTerminalEvent(5, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertCompleted();

taskSubscriber.assertValueCount(numTasks * 2);
List<Task2> actualGroup1Tasks = taskSubscriber.getOnNextEvents().stream()
.filter(t -> t.getGroupKey().equals(group1)).collect(toList());
List<Task2> actualGroup2Tasks = taskSubscriber.getOnNextEvents().stream()
.filter(t -> t.getGroupKey().equals(group2)).collect(toList());

assertEquals(actualGroup1Tasks, group1Tasks.toList().toBlocking().first(), group1 + " tasks do not match");
assertEquals(actualGroup2Tasks, group2Tasks.toList().toBlocking().first(), group2 + " tasks do not match");

// verify that the leases and queues have been deleted
assertLeasesDoNotExist(trigger.getTriggerTime());
assertQueueDoesNotExist(trigger.getTriggerTime(), group1);
assertQueueDoesNotExist(trigger.getTriggerTime(), group2);
}

private Observable<Task2> createTasks(int count, String group, Trigger trigger) {
return Observable.range(1, count).map(i -> new Task2Impl(randomUUID(), group, i * 10, "task-" + i, emptyMap(),
trigger));
}

/**
Expand All @@ -128,7 +294,7 @@ private void setUpTasksForExecution(Date timeSlice, Task2Impl... tasks) {
}
TestSubscriber<ResultSet> subscriber = new TestSubscriber<>();
Observable.merge(resultSets).subscribe(subscriber);
subscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
subscriber.awaitTerminalEvent(5, SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
}
Expand All @@ -137,7 +303,7 @@ private void setUpTasksForExecution(Observable<Task2> tasks) {
Observable<ResultSet> resultSets = tasks.flatMap(t -> Observable.concat(insertIntoQueue(t), createLease(t)));
TestSubscriber<ResultSet> subscriber = new TestSubscriber<>();
resultSets.subscribe(subscriber);
subscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
subscriber.awaitTerminalEvent(5, SECONDS);
subscriber.assertNoErrors();
subscriber.assertCompleted();
}
Expand All @@ -159,4 +325,19 @@ private Observable<ResultSet> createLease(Task2 task) {
scheduler.computeShard(task.getGroupKey())));
}

private void assertLeasesDoNotExist(long time) {
Date timeSlice = new Date(time);
ResultSet resultSet = session.execute(queries.findLeases.bind(timeSlice));
assertTrue(resultSet.isExhausted(), "Did not expect to find any leases for " + timeSlice + " but found " +
+ resultSet.all().size() + " lease(s)");
}

private void assertQueueDoesNotExist(long time, String groupKey) {
Date timeSlice = new Date(time);
int shard = scheduler.computeShard(groupKey);
ResultSet resultSet = session.execute(queries.getTasksFromQueue.bind(timeSlice, shard));
assertTrue(resultSet.isExhausted(), "Found TaskQueue{timeSlice=" + time + ", shard=" + shard + "} but " +
"did not expect it to exist");
}

}

0 comments on commit 20929c7

Please sign in to comment.