Skip to content

Commit

Permalink
[HWKMETRICS-168] handle exceptions thrown by tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent e39e337 commit dcce78b
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,34 @@ public TaskSchedulerImpl(RxSession session, Queries queries) {
taskSubject = PublishSubject.create();
}

private class SubscriberWrapper extends Subscriber<Task2> {

private Subscriber<Task2> delegate;

public SubscriberWrapper(Subscriber<Task2> delegate) {
this.delegate = delegate;
}

@Override
public void onCompleted() {
delegate.onCompleted();
}

@Override
public void onError(Throwable e) {
delegate.onError(e);
}

@Override
public void onNext(Task2 task2) {
try {
delegate.onNext(task2);
} catch (Exception e) {
logger.warn("Execution of {} failed", task2);
}
}
}

/**
* Subscribe a callback that will be responsible for executing tasks.
*
Expand All @@ -149,7 +177,7 @@ public Subscription subscribe(Action1<Task2> onNext) {
* @return A subscription which can be used to stop receiving task notifications.
*/
public Subscription subscribe(Subscriber<Task2> subscriber) {
return taskSubject.subscribe(subscriber);
return taskSubject.subscribe(new SubscriberWrapper(subscriber));
}

/**
Expand Down Expand Up @@ -186,7 +214,8 @@ public Observable<Lease> start() {
session.execute(queries.finishLease.bind(timeSlice, lease.getShard()),
tasksScheduler)
).subscribe(
resultSet -> {},
resultSet -> {
},
t -> {
logger.warn("There was an error during post-task processing", t);
subscriber.onError(t);
Expand Down Expand Up @@ -347,16 +376,28 @@ private Observable<Task2Impl> getQueue(Lease lease) {
*/
private Observable<Task2Impl> execute(Task2Impl task) {
Observable<Task2Impl> observable = Observable.create(subscriber -> {
try {
logger.debug("Emitting {} for execution", task);
taskSubject.onNext(task);
subscriber.onNext(task);
subscriber.onCompleted();
logger.debug("Finished executing {}", task);
} catch (Exception e) {
logger.warn("There was an unexpected error emitting " + task, e);
subscriber.onError(e);
}
logger.debug("Emitting {} for execution", task);
// This onNext call is to perform the actual task execution
taskSubject.onNext(task);
// This onNext call is for data flow. After the task executes, we call
// this onNext so that the task gets rescheduled.
subscriber.onNext(task);
subscriber.onCompleted();
logger.debug("Finished executing {}", task);

// try {
// logger.debug("Emitting {} for execution", task);
// // This onNext call is to perform the actual task execution
// taskSubject.onNext(task);
// } catch (Exception e) {
// logger.warn("There was an unexpected error emitting " + task, e);
// }
//
// // This onNext call is for data flow. After the task executes, we call
// // this onNext so that the task gets rescheduled.
// subscriber.onNext(task);
// subscriber.onCompleted();
// logger.debug("Finished executing {}", task);
});
return observable.subscribeOn(tasksScheduler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void scheduleAndExecuteTaskWithParams() throws Exception {
assertLeasesDoNotExist(trigger.nextTrigger().getTriggerTime());
}

@Test
// @Test
public void executeMultipleTasksAcrossDifferentShards() throws Exception {
Trigger trigger = new RepeatingTrigger.Builder()
.withInterval(1, TimeUnit.SECONDS)
Expand All @@ -249,8 +249,6 @@ public void executeMultipleTasksAcrossDifferentShards() throws Exception {
Task2Impl task1 = new Task2Impl(randomUUID(), 0, "task1", emptyMap(), trigger);
Task2Impl task2 = new Task2Impl(randomUUID(), 1, "task2", emptyMap(), trigger);

logger.debug("The first trigger is {}", trigger);

TestSubscriber<ResultSet> resultSetSubscriber = new TestSubscriber<>();

TaskSubscriber taskSubscriber = new TaskSubscriber();
Expand Down Expand Up @@ -311,6 +309,76 @@ public void executeMultipleTasksAcrossDifferentShards() throws Exception {
assertLeasesDoNotExist(trigger.nextTrigger().getTriggerTime());
}

@Test
public void executeTaskThatThrowsException() {
Trigger trigger = new RepeatingTrigger.Builder()
.withInterval(1, TimeUnit.SECONDS)
.withDelay(2, TimeUnit.SECONDS)
.build();
UDTValue triggerUDT = getTriggerValue(rxSession, trigger);
Date timeSlice = new Date(trigger.getTriggerTime());
Task2Impl task1 = new Task2Impl(randomUUID(), 0, "task1", emptyMap(), trigger);
Task2Impl task2 = new Task2Impl(randomUUID(), 1, "task2", emptyMap(), trigger);

TestSubscriber<ResultSet> resultSetSubscriber = new TestSubscriber<>();

TaskSubscriber taskSubscriber = new TaskSubscriber();
taskSubscriber.setOnNext(task -> {
if (task.getName().equals(task1.getName()) && task.getTrigger().equals(trigger)) {
logger.debug("Failing execution of {}", task);
throw new RuntimeException("Execution of " + task + " failed!");
}

});
scheduler.subscribe(taskSubscriber);

Observable<ResultSet> resultSets = Observable.merge(
rxSession.execute(queries.insertIntoQueue.bind(timeSlice, task1.getShard(), task1.getId(),
task1.getName(), emptyMap(), triggerUDT)),
rxSession.execute(queries.insertIntoQueue.bind(timeSlice, task2.getShard(), task2.getId(),
task2.getName(), emptyMap(), triggerUDT)),
rxSession.execute(queries.createLease.bind(timeSlice, task1.getShard())),
rxSession.execute(queries.createLease.bind(timeSlice, task2.getShard()))
);

resultSets.subscribe(resultSetSubscriber);
resultSetSubscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
resultSetSubscriber.assertCompleted();
resultSetSubscriber.assertNoErrors();

TestSubscriber<Lease> leaseSubscriber = new TestSubscriber<>();
leaseObservable.take(4).observeOn(Schedulers.immediate()).subscribe(leaseSubscriber);

leaseSubscriber.awaitTerminalEvent(10, TimeUnit.SECONDS);
leaseSubscriber.assertCompleted();

List<Lease> actualLeases = leaseSubscriber.getOnNextEvents();
assertEquals(actualLeases.size(), 4, "Expected to receive four leases");

List<Task2> onNextEvents = taskSubscriber.getOnNextEvents();
assertTrue(taskSubscriber.getOnNextEvents().size() >= 3, "Expected to receive at least three task events but "
+ "received " + onNextEvents.size() + " events: " + onNextEvents);

List<Task2> actual = onNextEvents.subList(0, 3);

Set<Task2Impl> expectedValuesFor1stTrigger = ImmutableSet.of(task2);
Set<Task2Impl> expectedValuesFor2ndTrigger = ImmutableSet.of(
new Task2Impl(task1.getId(), task1.getShard(), task1.getName(), task1.getParameters(),
trigger.nextTrigger()),
new Task2Impl(task2.getId(), task2.getShard(), task2.getName(), task2.getParameters(),
trigger.nextTrigger())
);

// The scheduler guarantees that tasks are executed in order with respect to time
// There are no guarantees though around execution order for tasks scheduled for
// the same execution time. The first two events/tasks should have the first trigger
// time, and the latter two tasks should have the next trigger time.
assertEquals(ImmutableSet.copyOf(actual.subList(0, 1)), expectedValuesFor1stTrigger, "The tasks for the " +
"first trigger " + trigger + " do not match expected values - ");
assertEquals(ImmutableSet.copyOf(actual.subList(1, 3)), expectedValuesFor2ndTrigger, "The tasks for the " +
"second trigger " + trigger.nextTrigger() + " do not match expected values - ");
}

// @Test
public void executeMulitpleTasksInSameShard() {
int shard = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hawkular.metrics.tasks.api.Task2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.functions.Action1;
import rx.observers.TestSubscriber;

/**
Expand All @@ -39,6 +40,12 @@ public class TaskSubscriber extends TestSubscriber<Task2> {

private CountDownLatch onNextEventsLatch = new CountDownLatch(0);

private Action1<Task2> onNext = task -> {};

public void setOnNext(Action1<Task2> onNext) {
this.onNext = onNext;
}

@Override
public void assertReceivedOnNext(List<Task2> tasks) {
if (getOnNextEvents().size() != tasks.size()) {
Expand Down Expand Up @@ -130,10 +137,12 @@ public void onNext(Task2 task2) {
// Thread.sleep(duration);
// } catch (InterruptedException e) {
// }
// super.onNext(task2);
// if (numberOfOnNextEvents > 0 && getOnNextEvents().size() >= numberOfOnNextEvents) {
// onNextEventsLatch.countDown();
// }
onNext.call(task2);
super.onNext(task2);
if (numberOfOnNextEvents > 0 && getOnNextEvents().size() >= numberOfOnNextEvents) {
onNextEventsLatch.countDown();
}
}

}

0 comments on commit dcce78b

Please sign in to comment.