Skip to content

Commit

Permalink
Removed blocking behaviour of future resolving on SimpleQueryBus.
Browse files Browse the repository at this point in the history
  • Loading branch information
m1l4n54v1c committed May 3, 2018
1 parent 2233617 commit b0c1917
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 44 deletions.
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -142,8 +143,8 @@ private boolean unsubscribe(String queryName,
public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R> query) {
MessageMonitor.MonitorCallback monitorCallback = messageMonitor.onMessageIngested(query);
QueryMessage<Q, R> interceptedQuery = intercept(query);
CompletableFuture<QueryResponseMessage<R>> completableFuture = new CompletableFuture<>();
List<MessageHandler<? super QueryMessage<?, ?>>> handlers = getHandlersForMessage(interceptedQuery);
CompletableFuture<QueryResponseMessage<R>> result = new CompletableFuture<>();
try {
if (handlers.isEmpty()) {
throw new NoHandlerForQueryException(format("No handler found for %s with response type %s",
Expand All @@ -152,14 +153,10 @@ public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R
}
Iterator<MessageHandler<? super QueryMessage<?, ?>>> handlerIterator = handlers.iterator();
boolean invocationSuccess = false;
QueryResponseMessage<R> result = null;
while (!invocationSuccess && handlerIterator.hasNext()) {
try {
DefaultUnitOfWork<QueryMessage<Q, R>> uow = DefaultUnitOfWork.startAndGet(interceptedQuery);
result = GenericQueryResponseMessage.asNullableResponseMessage(
query.getResponseType().responseMessagePayloadType(),
interceptAndInvoke(uow, handlerIterator.next())
);
result = interceptAndInvoke(uow, handlerIterator.next());
invocationSuccess = true;
} catch (NoHandlerForQueryException e) {
// Ignore this Query Handler, as we may have another one which is suitable
Expand All @@ -170,13 +167,12 @@ public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R
interceptedQuery.getQueryName(),
interceptedQuery.getResponseType()));
}
completableFuture.complete(result);
monitorCallback.reportSuccess();
} catch (Exception e) {
completableFuture.completeExceptionally(e);
result.completeExceptionally(e);
monitorCallback.reportFailure(e);
}
return completableFuture;
return result;
}

@Override
Expand All @@ -189,20 +185,23 @@ public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q, R> q
return Stream.empty();
}

long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
return handlers.stream()
.map(mh -> {
QueryResponseMessage<R> result = null;
.map(handler -> {
try {
result = interceptAndInvoke(DefaultUnitOfWork.startAndGet(interceptedQuery), mh);
long leftTimeout = deadline - System.currentTimeMillis();
leftTimeout = leftTimeout < 0 ? 0 : leftTimeout;
QueryResponseMessage<R> response =
interceptAndInvoke(DefaultUnitOfWork.startAndGet(interceptedQuery), handler)
.get(leftTimeout, TimeUnit.MILLISECONDS);
monitorCallback.reportSuccess();
return result;
return response;
} catch (Exception e) {
monitorCallback.reportFailure(e);
errorHandler.onError(e, interceptedQuery, mh);
errorHandler.onError(e, interceptedQuery, handler);
return null;
}
return result;
})
.filter(Objects::nonNull);
}).filter(Objects::nonNull);
}

@SuppressWarnings("unchecked")
Expand All @@ -228,11 +227,19 @@ public <Q, I, U> Registration subscriptionQuery(SubscriptionQueryMessage<Q, I, U
while (!invocationSuccess && handlerIterator.hasNext()) {
try {
DefaultUnitOfWork<QueryMessage<Q, I>> uow = DefaultUnitOfWork.startAndGet(interceptedQuery);
I initialResponse = interceptAndInvoke(uow, handlerIterator.next()).getPayload();

updateHandler.onInitialResult(initialResponse);

updateHandlers.put(query, updateHandler);
interceptAndInvoke(uow, handlerIterator.next())
.thenAccept(responseMessage -> {
try {
I initialResponse = responseMessage.getPayload();
updateHandler.onInitialResult(initialResponse);
updateHandlers.put(query, updateHandler);
registeringSubscriptionQueryLocks.remove(interceptedQuery);
monitorCallback.reportSuccess();
} catch (Exception e) {
monitorCallback.reportFailure(e);
updateHandler.onCompletedExceptionally(e);
}
});

invocationSuccess = true;
} catch (NoHandlerForQueryException e) {
Expand All @@ -246,16 +253,13 @@ public <Q, I, U> Registration subscriptionQuery(SubscriptionQueryMessage<Q, I, U
interceptedQuery.getResponseType(),
interceptedQuery.getUpdateResponseType()));
}
monitorCallback.reportSuccess();
} catch (Exception e) {
monitorCallback.reportFailure(e);
updateHandler.onCompletedExceptionally(e);
} finally {
rwLock.writeLock().unlock();
}

registeringSubscriptionQueryLocks.remove(interceptedQuery);

return () -> {
updateHandlers.remove(query);
return true;
Expand Down Expand Up @@ -328,21 +332,35 @@ private void registeringSubscriptionQueryReadSafe(Predicate<?> subscriptionQuery
}

@SuppressWarnings("unchecked")
private <Q, R> QueryResponseMessage<R> interceptAndInvoke(UnitOfWork<QueryMessage<Q, R>> uow,
private <Q, R> CompletableFuture<QueryResponseMessage<R>> interceptAndInvoke(UnitOfWork<QueryMessage<Q, R>> uow,
MessageHandler<? super QueryMessage<?, R>> handler)
throws Exception {
return uow.executeWithResult(() -> {
ResponseType<R> responseType = uow.getMessage().getResponseType();
Object queryResponse = new DefaultInterceptorChain<>(uow, handlerInterceptors, handler).proceed();
if (queryResponse instanceof Future) {
queryResponse = ((Future) queryResponse).get();
if (queryResponse instanceof CompletableFuture) {
return ((CompletableFuture) queryResponse).thenCompose(
result -> buildCompletableFuture(responseType, result));
} else if (queryResponse instanceof Future) {
return CompletableFuture.supplyAsync(() -> {
try {
return ((Future) queryResponse).get();
} catch (InterruptedException|ExecutionException e) {
throw new RuntimeException(e);
}
});
}
return GenericQueryResponseMessage.asNullableResponseMessage(
responseType.responseMessagePayloadType(),
responseType.convert(queryResponse));
return buildCompletableFuture(responseType, queryResponse);
});
}

private <R> CompletableFuture<QueryResponseMessage<R>> buildCompletableFuture(ResponseType<R> responseType,
Object queryResponse) {
return CompletableFuture.completedFuture(GenericQueryResponseMessage.asNullableResponseMessage(
responseType.responseMessagePayloadType(),
responseType.convert(queryResponse)));
}

@SuppressWarnings("unchecked")
private <Q, R, T extends QueryMessage<Q, R>> T intercept(T query) {
T intercepted = query;
Expand Down
Expand Up @@ -38,7 +38,13 @@ protected AbstractResponseType(Class<?> expectedResponseType) {
this.expectedResponseType = expectedResponseType;
}

protected Type unwrapFuture(Type type) {
/**
* Tries to unwrap generic type if provided {@code type} is of type {@link Future}.
*
* @param type to be unwrapped
* @return unwrapped generic, or original if provided {@code type} is not of type {@link Future}
*/
protected Type unwrapIfTypeFuture(Type type) {
Type futureType = TypeReflectionUtils.getExactSuperType(type, Future.class);
if (futureType instanceof ParameterizedType) {
Type[] actualTypeArguments = ((ParameterizedType) futureType).getActualTypeArguments();
Expand Down
Expand Up @@ -35,7 +35,7 @@ public InstanceResponseType(Class<R> expectedResponseType) {
*/
@Override
public boolean matches(Type responseType) {
Type unwrapped = unwrapFuture(responseType);
Type unwrapped = unwrapIfTypeFuture(responseType);
return isGenericAssignableFrom(unwrapped) || isAssignableFrom(unwrapped);
}

Expand Down
Expand Up @@ -56,7 +56,7 @@ public MultipleInstancesResponseType(Class<R> expectedCollectionGenericType) {
*/
@Override
public boolean matches(Type responseType) {
Type unwrapped = unwrapFuture(responseType);
Type unwrapped = unwrapIfTypeFuture(responseType);
return isIterableOfExpectedType(unwrapped) ||
isStreamOfExpectedType(unwrapped) ||
isGenericArrayOfExpectedType(unwrapped) ||
Expand Down
Expand Up @@ -27,12 +27,14 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
* Tests for different types of queries hitting query handlers with Future as a response type.
Expand All @@ -41,6 +43,8 @@
*/
public class FutureAsResponseTypeToQueryHandlersTest {

private static final int FUTURE_RESOLVING_TIMEOUT = 500;

private final SimpleQueryBus queryBus = new SimpleQueryBus();
private final MyQueryHandler myQueryHandler = new MyQueryHandler();
private final AnnotationQueryHandlerAdapter<MyQueryHandler> annotationQueryHandlerAdapter = new AnnotationQueryHandlerAdapter<>(
Expand Down Expand Up @@ -77,7 +81,7 @@ public void testScatterGatherQueryWithMultipleResponses() {
"criteria", "myQueryWithMultipleResponses", ResponseTypes.multipleInstancesOf(String.class));

List<String> response = queryBus
.scatterGather(queryMessage, 10, TimeUnit.MILLISECONDS)
.scatterGather(queryMessage, FUTURE_RESOLVING_TIMEOUT + 100, TimeUnit.MILLISECONDS)
.map(Message::getPayload)
.flatMap(Collection::stream)
.collect(Collectors.toList());
Expand All @@ -91,7 +95,7 @@ public void testScatterGatherQueryWithSingleResponse() {
"criteria", "myQueryWithSingleResponse", ResponseTypes.instanceOf(String.class));

String response = queryBus
.scatterGather(queryMessage, 10, TimeUnit.MILLISECONDS)
.scatterGather(queryMessage, FUTURE_RESOLVING_TIMEOUT + 100, TimeUnit.MILLISECONDS)
.map(Message::getPayload)
.findFirst()
.orElse(null);
Expand All @@ -100,7 +104,7 @@ public void testScatterGatherQueryWithSingleResponse() {
}

@Test
public void testSubscriptionQueryWithMultipleResponses() {
public void testSubscriptionQueryWithMultipleResponses() throws InterruptedException {
SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
"criteria",
"myQueryWithMultipleResponses",
Expand All @@ -111,13 +115,15 @@ public void testSubscriptionQueryWithMultipleResponses() {

Registration registration = queryBus.subscriptionQuery(queryMessage, updateHandler);

Thread.sleep(FUTURE_RESOLVING_TIMEOUT + 100); // wait for future to resolve

verify(updateHandler).onInitialResult(Arrays.asList("Response1", "Response2"));

registration.close();
}

@Test
public void testSubscriptionQueryWithSingleResponse() {
public void testSubscriptionQueryWithSingleResponse() throws InterruptedException {
SubscriptionQueryMessage<String, String, String> queryMessage = new GenericSubscriptionQueryMessage<>(
"criteria",
"myQueryWithSingleResponse",
Expand All @@ -128,6 +134,8 @@ public void testSubscriptionQueryWithSingleResponse() {

Registration registration = queryBus.subscriptionQuery(queryMessage, updateHandler);

Thread.sleep(FUTURE_RESOLVING_TIMEOUT + 100); // wait for future to resolve

verify(updateHandler).onInitialResult("Response");

registration.close();
Expand All @@ -136,17 +144,23 @@ public void testSubscriptionQueryWithSingleResponse() {
@SuppressWarnings("unused")
private static class MyQueryHandler {

private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

@QueryHandler(queryName = "myQueryWithMultipleResponses")
public CompletableFuture<List<String>> queryHandler1(String criteria) {
CompletableFuture<List<String>> completableFuture = new CompletableFuture<>();
completableFuture.complete(Arrays.asList("Response1", "Response2"));
executor.schedule(() -> completableFuture.complete(Arrays.asList("Response1", "Response2")),
FUTURE_RESOLVING_TIMEOUT,
TimeUnit.MILLISECONDS);
return completableFuture;
}

@QueryHandler(queryName = "myQueryWithSingleResponse")
public CompletableFuture<String> queryHandler2(String criteria) {
public Future<String> queryHandler2(String criteria) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
completableFuture.complete("Response");
executor.schedule(() -> completableFuture.complete("Response"),
FUTURE_RESOLVING_TIMEOUT,
TimeUnit.MILLISECONDS);
return completableFuture;
}
}
Expand Down

0 comments on commit b0c1917

Please sign in to comment.