Skip to content

Commit

Permalink
[HWKMETRICS-683] Use the correct scheduler for the insertion work.
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefan Negrea committed Jun 29, 2017
1 parent 52a03f8 commit a68ffee
Showing 1 changed file with 2 additions and 1 deletion.
Expand Up @@ -41,6 +41,7 @@
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import rx.Observable;
import rx.schedulers.Schedulers;

@RestEndpoint(path = "/")
public class GenericInserter implements RestHandler {
Expand Down Expand Up @@ -83,6 +84,6 @@ public <T> void addData(RoutingContext ctx, MetricType<T> type) {
Observable<Metric<T>> metrics = Functions.metricToObservable(TenantFilter.getTenant(ctx), counters,
type);
Observable<Void> observable = metricsService.addDataPoints(type, metrics);
observable.subscribe(new ResultSetObserver(ctx));
observable.subscribeOn(Schedulers.io()).subscribe(new ResultSetObserver(ctx));
}
}

3 comments on commit a68ffee

@jsanda
Copy link
Contributor

@jsanda jsanda commented on a68ffee Jun 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for the subscribeOn because addDataPoints is already async. If it was a blocking call, then you would want to use subscribeOn.

@stefannegrea
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsanda, that change doubled the throughput. I am still trying to understand why.

Do you have any insights into what could be going on there?

@jsanda
Copy link
Contributor

@jsanda jsanda commented on a68ffee Jun 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure. The stuff that executes on subscription are the operators that set up the call chain. As I understand it the callbacks that get passed to operators are not executed on subscription. They get executed when the source observable has a value to emit and push downstream. All of the heavy lifting when values are pushed downstream via onNext. I will probably need to look at and run the tests myself. Can you point me to them?

Here is something else to consider. With Vert.x and the Cassandra driver we are operating in more async environment than we were with JAX-RS. In general we should not have to be scheduling work on the I/O scheduler since everything should already be async. One of the benefits of the async stack is that we should be able to do more work with fewer resources such as threads.

Please sign in to comment.