Skip to content

Commit

Permalink
[HWKMETRICS-199] add an endpoint to block until scheduler finishes work
Browse files Browse the repository at this point in the history
There is a new endpoint to block for a specified duration. On the server side,
we subscribe to the task schedulers time slices observable which emits a
timestamp at the completion of each time slice. That is, a timestamp is emitted
when all of the work for the time slice is finished. A request like,

  GET /hawkulr/metrics/clock/wait?duration=5min

will block the request until the task scheduler has completed the work for five
time slices.
  • Loading branch information
John Sanda committed Aug 11, 2015
1 parent 5cd1fbf commit 9606c68
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package org.hawkular.metrics.api.jaxrs.handler;

import static java.util.concurrent.TimeUnit.SECONDS;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
Expand All @@ -27,12 +29,18 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import org.hawkular.metrics.api.jaxrs.param.Duration;
import org.hawkular.metrics.api.jaxrs.util.VirtualClock;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;


/**
Expand All @@ -45,14 +53,13 @@ public class VirtualClockHandler {

public static final String PATH = "/clock";

@Inject
private static Logger logger = LoggerFactory.getLogger(VirtualClockHandler.class);

private VirtualClock virtualClock;

@Inject
private TaskScheduler taskScheduler;

private static boolean started;

@GET
public Response getTime() {
return Response.ok(ImmutableMap.<String, Object>of("now", virtualClock.now())).build();
Expand All @@ -62,9 +69,8 @@ public Response getTime() {
public Response setTime(Map<String, Object> params) {
Long time = (Long) params.get("time");
virtualClock.advanceTimeTo(time);
if (!started) {
if (!taskScheduler.isRunning()) {
taskScheduler.start();
started = true;
}
return Response.ok().build();
}
Expand All @@ -75,4 +81,27 @@ public Response incrementTime(Duration duration) {
return Response.ok().build();
}

@GET
@Path("/wait")
public Response waitForDuration(@QueryParam("duration") Duration duration) {
int numMinutes = (int) TimeUnit.MINUTES.convert(duration.getValue(), duration.getTimeUnit());
TestSubscriber<Long> timeSlicesSubscriber = new TestSubscriber<>();
taskScheduler.getFinishedTimeSlices()
.take(numMinutes)
.observeOn(Schedulers.immediate())
.subscribe(timeSlicesSubscriber);

try {
timeSlicesSubscriber.awaitTerminalEvent(10, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertTerminalEvent();

return Response.ok().build();
} catch (Exception e) {
logger.warn("Failed to wait " + numMinutes + " minutes for task scheduler to complete work", e);
return Response.serverError().entity(ImmutableMap.of("errorMsg", Throwables.getStackTraceAsString(e)))
.build();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,9 @@ class CountersITest extends RESTTest {
assertEquals(200, response.status)

setTime(start.plusMinutes(4))
Thread.sleep(2500)

response = hawkularMetrics.get(path: "clock/wait", query: [duration: "4mn"])
assertEquals("There was an error waiting: $response.data", 200, response.status)

response = hawkularMetrics.get(
path: "counters/$counter/rate",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Observable<Task2> scheduleTask(String name, String groupKey, int executionOrder,

void shutdown();

Observable<Long> getFinishedTimeSlices();

boolean isRunning();

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ public class TaskSchedulerImpl implements TaskScheduler {

private DateTimeService dateTimeService;

private boolean shutdown;

private boolean started;
private boolean running;

/**
* A subject to broadcast tasks that are to be executed. Other task scheduling libraries
Expand Down Expand Up @@ -193,6 +191,7 @@ public Subscription subscribe(Subscriber<Task2> subscriber) {
return taskSubject.subscribe(new SubscriberWrapper(subscriber));
}

@Override
public Observable<Long> getFinishedTimeSlices() {
return tickSubject;
}
Expand Down Expand Up @@ -281,6 +280,7 @@ public Observable<Lease> start() {
// primary motivation for having this method return a hot observable.
PublishSubject<Lease> leasesSubject = PublishSubject.create();
processedLeases.subscribe(leasesSubject);
running = true;
return leasesSubject;
}

Expand All @@ -305,7 +305,7 @@ private Observable<Date> createTicks() {
// figure something out.
return Observable.timer(0, 1, TimeUnit.MINUTES, tickScheduler)
.map(tick -> currentTimeSlice())
.takeUntil(d -> shutdown)
.takeUntil(d -> !running)
.doOnNext(tick -> logger.debug("Tick {}", tick))
.observeOn(leaseScheduler);
}
Expand Down Expand Up @@ -427,7 +427,7 @@ private Observable<Task2Impl> execute(Task2Impl task) {
public void shutdown() {
try {
logger.debug("shutting down");
shutdown = true;
running = false;

if (leasesSubscription != null) {
leasesSubscription.unsubscribe();
Expand Down

0 comments on commit 9606c68

Please sign in to comment.