Skip to content

Commit

Permalink
#11 Subscription Query - Work In Progress
Browse files Browse the repository at this point in the history
  • Loading branch information
saratry committed Jun 5, 2018
1 parent bfe2588 commit 05a6ce0
Show file tree
Hide file tree
Showing 16 changed files with 515 additions and 779 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ public <R> Registration subscribe(String queryName, Type responseType,
public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R> queryMessage) { public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R> queryMessage) {
QueryMessage<Q, R> interceptedQuery = dispatchInterceptors.intercept(queryMessage); QueryMessage<Q, R> interceptedQuery = dispatchInterceptors.intercept(queryMessage);
CompletableFuture<QueryResponseMessage<R>> completableFuture = new CompletableFuture<>(); CompletableFuture<QueryResponseMessage<R>> completableFuture = new CompletableFuture<>();
QueryServiceGrpc.newStub(platformConnectionManager.getChannel()) queryServiceStub()
.withInterceptors(interceptors)
.query(serializer.serializeRequest(interceptedQuery, 1, .query(serializer.serializeRequest(interceptedQuery, 1,
TimeUnit.HOURS.toMillis(1), priorityCalculator.determinePriority(interceptedQuery)), TimeUnit.HOURS.toMillis(1), priorityCalculator.determinePriority(interceptedQuery)),
new StreamObserver<QueryResponse>() { new StreamObserver<QueryResponse>() {
Expand All @@ -166,12 +165,15 @@ public void onCompleted() {
return completableFuture; return completableFuture;
} }


public QueryServiceGrpc.QueryServiceStub queryServiceStub() {
return QueryServiceGrpc.newStub(platformConnectionManager.getChannel()).withInterceptors(interceptors);
}

@Override @Override
public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q, R> queryMessage, long timeout, TimeUnit timeUnit) { public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q, R> queryMessage, long timeout, TimeUnit timeUnit) {
QueryMessage<Q, R> interceptedQuery = dispatchInterceptors.intercept(queryMessage); QueryMessage<Q, R> interceptedQuery = dispatchInterceptors.intercept(queryMessage);
QueueBackedSpliterator<QueryResponseMessage<R>> resultSpliterator = new QueueBackedSpliterator<>(timeout, timeUnit); QueueBackedSpliterator<QueryResponseMessage<R>> resultSpliterator = new QueueBackedSpliterator<>(timeout, timeUnit);
QueryServiceGrpc.newStub(platformConnectionManager.getChannel()) queryServiceStub()
.withInterceptors(interceptors)
.withDeadlineAfter(timeout, timeUnit) .withDeadlineAfter(timeout, timeUnit)
.query(serializer.serializeRequest(interceptedQuery, -1, timeUnit.toMillis(timeout), .query(serializer.serializeRequest(interceptedQuery, -1, timeUnit.toMillis(timeout),
priorityCalculator.determinePriority(interceptedQuery)), priorityCalculator.determinePriority(interceptedQuery)),
Expand Down Expand Up @@ -400,7 +402,7 @@ public int hashCode() {
} }
} }


void publish(QueryProviderOutbound providerOutbound){ public void publish(QueryProviderOutbound providerOutbound){
this.queryProvider.getSubscriberObserver().onNext(providerOutbound); this.queryProvider.getSubscriberObserver().onNext(providerOutbound);
} }


Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 05a6ce0

Please sign in to comment.