Skip to content

Commit

Permalink
Changed the API for subscription queries. QueryUpdateEmitter is no lo…
Browse files Browse the repository at this point in the history
…nger injected via ParameterResolver.
  • Loading branch information
m1l4n54v1c committed Apr 2, 2018
1 parent 8afce01 commit 74b9db3
Show file tree
Hide file tree
Showing 13 changed files with 315 additions and 749 deletions.

This file was deleted.

23 changes: 2 additions & 21 deletions core/src/main/java/org/axonframework/queryhandling/QueryBus.java
Expand Up @@ -47,26 +47,6 @@ public interface QueryBus {
*/
<R> Registration subscribe(String queryName, Type responseType, MessageHandler<? super QueryMessage<?, R>> handler);

/**
* Subscribe the given {@code handler} to queries with given {@code queryName}, {@code initialResponseType} and
* {@code updateResponseType}.
* <p>
* If during emitting of incremental updates there is no {@code updateHandler}, {@link
* NoUpdateHandlerForEmitterException} will be thrown.
*
* @param queryName the name of the query request to subscribe
* @param initialResponseType the type of initial response the subscribed component answers with
* @param updateResponseType the type of incremental responses the subscribed component answers with
* @param handler a handler that implements the query
* @param <I> the type of initial response
* @param <U> the type of incremental responses
* @return a handle to un-subscribe the query handler
*/
<I, U> Registration subscribe(String queryName,
Type initialResponseType,
Type updateResponseType,
SubscriptionQueryMessageHandler<? super QueryMessage<?, I>, I, U> handler);

/**
* Dispatch the given {@code query} to a single QueryHandler subscribed to the given {@code query}'s queryName
* and responseType. This method returns all values returned by the Query Handler as a Collection. This may or may
Expand Down Expand Up @@ -118,5 +98,6 @@ <I, U> Registration subscribe(String queryName,
* @param <U> the incremental response types of the query
* @return a handle to un-subscribe {@code updateHandler}
*/
<Q, I, U> Registration subscriptionQuery(SubscriptionQueryMessage<Q, I, U> query, UpdateHandler<I, U> updateHandler);
<Q, I, U> Registration subscriptionQuery(SubscriptionQueryMessage<Q, I, U> query,
UpdateHandler<I, U> updateHandler);
}
Expand Up @@ -16,40 +16,114 @@

package org.axonframework.queryhandling;

import java.util.function.Function;
import java.util.function.Predicate;

/**
* Emitter used on query handling side in order to emit incremental updates on query side.
* Component which informs subscription queries about updates, errors and when there are no more updates.
*
* @param <U> the type of incremental updates
* @author Milan Savic
* @see UpdateHandler
* @since 3.3
*/
public interface QueryUpdateEmitter<U> {
public interface QueryUpdateEmitter {

/**
* Emits incremental update (as return value of provided update function) to subscription queries matching given
* filter.
*
* @param filter predicate on subscription query message used to filter subscription queries
* @param update function which returns incremental update
* @param <U> the type of the update
*/
<U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter,
Function<SubscriptionQueryMessage<?, ?, U>, U> update);

/**
* Emits given incremental update to subscription queries matching given filter.
*
* @param filter predicate on subscription query message used to filter subscription queries
* @param update incremental update
* @param <U> the type of the update
*/
default <U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter, U update) {
emit(filter, (Function<SubscriptionQueryMessage<?, ?, U>, U>) q -> update);
}

/**
* Emits incremental update (as return value of provided update function) to subscription queries matching given
* query type and filter.
*
* @param queryType the type of the query
* @param filter predicate on query payload used to filter subscription queries
* @param update function which returns incremental update
* @param <Q> the type of the query
* @param <U> the type of the update
*/
@SuppressWarnings("unchecked")
default <Q, U> void emit(Class<Q> queryType, Predicate<Q> filter, Function<Q, U> update) {
Predicate<SubscriptionQueryMessage<?, ?, U>> sqmFilter =
m -> m.getPayloadType().equals(queryType) && filter.test((Q) m.getPayload());

Function<SubscriptionQueryMessage<?, ?, U>, U> sqmUpdate = q -> update.apply((Q) q.getPayload());

emit(sqmFilter, sqmUpdate);
}

/**
* Emits a single update.
* Emits given incremental update to subscription queries matching given query type and filter.
*
* @param update the update
* @return {@code true} if emit was successful
* @param queryType the type of the query
* @param filter predicate on query payload used to filter subscription queries
* @param update incremental update
* @param <Q> the type of the query
* @param <U> the type of the update
*/
boolean emit(U update);
default <Q, U> void emit(Class<Q> queryType, Predicate<Q> filter, U update) {
emit(queryType, filter, q -> update);
}

/**
* Informs query side that there are no more updates.
* Completes subscription queries matching given filter.
*
* @param filter predicate on subscription query message used to filter subscription queries
*/
void complete(Predicate<SubscriptionQueryMessage<?, ?, ?>> filter);

/**
* Completes subscription queries matching given query type and filter.
*
* @param queryType the type of the query
* @param filter predicate on query payload used to filter subscription queries
* @param <Q> the type of the query
*/
void complete();
@SuppressWarnings("unchecked")
default <Q> void complete(Class<Q> queryType, Predicate<Q> filter) {
Predicate<SubscriptionQueryMessage<?, ?, ?>> sqmFilter =
m -> m.getPayloadType().equals(queryType) && filter.test((Q) m.getPayload());
complete(sqmFilter);
}

/**
* Informs query side that error occurred.
* Completes with an error subscription queries matching given filter.
*
* @param error the error
* @param filter predicate on subscription query message used to filter subscription queries
* @param cause the cause of an error
*/
void error(Throwable error);
void completeExceptionally(Predicate<SubscriptionQueryMessage<?, ?, ?>> filter, Throwable cause);

/**
* Registers a handler to be invoked when query cancels the registration.
* Completes with an error subscription queries matching given query type and filter
*
* @param r the handler to be invoked
* @param queryType the type of the query
* @param filter predicate on query payload used to filter subscription queries
* @param cause the cause of an error
* @param <Q> the type of the query
*/
void onRegistrationCanceled(Runnable r);
@SuppressWarnings("unchecked")
default <Q> void completeExceptionally(Class<Q> queryType, Predicate<Q> filter, Throwable cause) {
Predicate<SubscriptionQueryMessage<?, ?, ?>> sqmFilter =
m -> m.getPayloadType().equals(queryType) && filter.test((Q) m.getPayload());
completeExceptionally(sqmFilter, cause);
}
}

This file was deleted.

0 comments on commit 74b9db3

Please sign in to comment.