Skip to content

Commit

Permalink
Worked out emitting a null update.
Browse files Browse the repository at this point in the history
  • Loading branch information
m1l4n54v1c committed Jun 26, 2018
1 parent 35b3a07 commit 4095202
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 21 deletions.
Expand Up @@ -36,21 +36,50 @@ public class GenericSubscriptionQueryUpdateMessage<U> extends MessageDecorator<U
private static final long serialVersionUID = 5872479410321475147L; private static final long serialVersionUID = 5872479410321475147L;


/** /**
* Creates {@link GenericSubscriptionQueryUpdateMessage} from provided object which represents incremental update. * Creates {@link GenericSubscriptionQueryUpdateMessage} from provided {@code payload} which represents incremental
* update. The provided {@code payload} may not be {@code null}, for nullable payloads use {@link
* #asNullableUpdateMessage(Class, Object)}.
* *
* @param o incremental update * @param payload incremental update
* @param <T> type of the {@link GenericSubscriptionQueryUpdateMessage} * @param <T> type of the {@link GenericSubscriptionQueryUpdateMessage}
* @return created message * @return created message
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> SubscriptionQueryUpdateMessage<T> from(Object o) { public static <T> SubscriptionQueryUpdateMessage<T> asUpdateMessage(Object payload) {
if (SubscriptionQueryUpdateMessage.class.isInstance(o)) { SubscriptionQueryUpdateMessage<T> message = extractMessage(payload);

This comment has been minimized.

Copy link
@abuijze

abuijze Jun 26, 2018

Member

Why not assert that the value is non-null, and then invoke the nullable version?

This comment has been minimized.

Copy link
@m1l4n54v1c

m1l4n54v1c Jun 26, 2018

Author Member

We don't have the type of the payload?

return (SubscriptionQueryUpdateMessage<T>) o; if (message != null) {
} else if (o instanceof Message) { return message;
Message message = (Message) o; }
return new GenericSubscriptionQueryUpdateMessage<>((T) payload);
}

/**
* Creates {@link GenericSubscriptionQueryUpdateMessage} based on provided {@code declaredType} and {@code payload}
* representing the payload. The payload can be {@code null}.
*
* @param declaredType the type of the payload
* @param payload the payload - incremental update
* @param <T> the type of the payload
* @return created message
*/
@SuppressWarnings("unchecked")
public static <T> SubscriptionQueryUpdateMessage<T> asNullableUpdateMessage(Class<T> declaredType, Object payload) {
SubscriptionQueryUpdateMessage<T> message = extractMessage(payload);
if (message != null) {
return message;
}
return new GenericSubscriptionQueryUpdateMessage<>(declaredType, (T) payload);
}

@SuppressWarnings("unchecked")
private static <T> SubscriptionQueryUpdateMessage<T> extractMessage(Object payload) {
if (SubscriptionQueryUpdateMessage.class.isInstance(payload)) {
return (SubscriptionQueryUpdateMessage<T>) payload;
} else if (payload instanceof Message) {
Message message = (Message) payload;
return new GenericSubscriptionQueryUpdateMessage<>(message); return new GenericSubscriptionQueryUpdateMessage<>(message);
} }
return new GenericSubscriptionQueryUpdateMessage<>((T) o); return null;
} }


/** /**
Expand All @@ -62,6 +91,29 @@ public GenericSubscriptionQueryUpdateMessage(U payload) {
this(new GenericMessage<>(payload, MetaData.emptyInstance())); this(new GenericMessage<>(payload, MetaData.emptyInstance()));
} }


/**
* Initializes {@link GenericSubscriptionQueryUpdateMessage} with incremental update of provided {@code
* declaredType}.
*
* @param declaredType the type of the update
* @param payload the payload of the update
*/
public GenericSubscriptionQueryUpdateMessage(Class<U> declaredType, U payload) {
this(declaredType, payload, MetaData.emptyInstance());
}

/**
* Initializes {@link GenericSubscriptionQueryUpdateMessage} with incremental update of provided {@code
* declaredType} and {@code metaData}.
*
* @param declaredType the type of the update
* @param payload the payload of the update
* @param metaData the metadata of the update
*/
public GenericSubscriptionQueryUpdateMessage(Class<U> declaredType, U payload, Map<String, ?> metaData) {
super(new GenericMessage<>(declaredType, payload, metaData));
}

/** /**
* Initializes a new decorator with given {@code delegate} message. The decorator delegates to the delegate for * Initializes a new decorator with given {@code delegate} message. The decorator delegates to the delegate for
* the message's payload, metadata and identifier. * the message's payload, metadata and identifier.
Expand Down
Expand Up @@ -37,14 +37,18 @@ public interface QueryUpdateEmitter {
<U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter, SubscriptionQueryUpdateMessage<U> update); <U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter, SubscriptionQueryUpdateMessage<U> update);


/** /**
* Emits given incremental update to subscription queries matching given filter. * Emits given incremental update to subscription queries matching given filter. If an {@code update} is {@code
* null}, emit will be skipped. In order to send nullable updates, use {@link #emit(Class, Predicate,
* SubscriptionQueryUpdateMessage)} or {@link #emit(Predicate, SubscriptionQueryUpdateMessage)} methods.
* *
* @param filter predicate on subscription query message used to filter subscription queries * @param filter predicate on subscription query message used to filter subscription queries
* @param update incremental update * @param update incremental update
* @param <U> the type of the update * @param <U> the type of the update
*/ */
default <U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter, U update) { default <U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter, U update) {
emit(filter, GenericSubscriptionQueryUpdateMessage.from(update)); if (update != null) {
emit(filter, GenericSubscriptionQueryUpdateMessage.asUpdateMessage(update));
}
} }


/** /**
Expand All @@ -65,7 +69,9 @@ default <Q, U> void emit(Class<Q> queryType, Predicate<? super Q> filter,
} }


/** /**
* Emits given incremental update to subscription queries matching given query type and filter. * Emits given incremental update to subscription queries matching given query type and filter. If an {@code update}
* is {@code null}, emit will be skipped. In order to send nullable updates, use {@link #emit(Class, Predicate,
* SubscriptionQueryUpdateMessage)} or {@link #emit(Predicate, SubscriptionQueryUpdateMessage)} methods.
* *
* @param queryType the type of the query * @param queryType the type of the query
* @param filter predicate on query payload used to filter subscription queries * @param filter predicate on query payload used to filter subscription queries
Expand All @@ -74,7 +80,9 @@ default <Q, U> void emit(Class<Q> queryType, Predicate<? super Q> filter,
* @param <U> the type of the update * @param <U> the type of the update
*/ */
default <Q, U> void emit(Class<Q> queryType, Predicate<? super Q> filter, U update) { default <Q, U> void emit(Class<Q> queryType, Predicate<? super Q> filter, U update) {
emit(queryType, filter, GenericSubscriptionQueryUpdateMessage.from(update)); if (update != null) {
emit(queryType, filter, GenericSubscriptionQueryUpdateMessage.asUpdateMessage(update));
}
} }


/** /**
Expand Down
Expand Up @@ -78,7 +78,7 @@ public void testEmittingAnUpdate() {
chatQueryHandler.emitter.complete(String.class, "axonFrameworkCR"::equals); chatQueryHandler.emitter.complete(String.class, "axonFrameworkCR"::equals);
chatQueryHandler.emitter.emit(String.class, chatQueryHandler.emitter.emit(String.class,
"axonFrameworkCR"::equals, "axonFrameworkCR"::equals,
GenericSubscriptionQueryUpdateMessage.from("Update12")); GenericSubscriptionQueryUpdateMessage.asUpdateMessage("Update12"));
StepVerifier.create(result1.initialResult().map(Message::getPayload)) StepVerifier.create(result1.initialResult().map(Message::getPayload))
.expectNext(Arrays.asList("Message1", "Message2", "Message3")) .expectNext(Arrays.asList("Message1", "Message2", "Message3"))
.expectComplete() .expectComplete()
Expand All @@ -88,7 +88,9 @@ public void testEmittingAnUpdate() {
.expectComplete() .expectComplete()
.verify(); .verify();


chatQueryHandler.emitter.emit(Integer.class, m -> m == 5, GenericSubscriptionQueryUpdateMessage.from(1)); chatQueryHandler.emitter.emit(Integer.class,
m -> m == 5,
GenericSubscriptionQueryUpdateMessage.asUpdateMessage(1));
chatQueryHandler.emitter.complete(Integer.class, m -> m == 5); chatQueryHandler.emitter.complete(Integer.class, m -> m == 5);
chatQueryHandler.emitter.emit(Integer.class, m -> m == 5, 2); chatQueryHandler.emitter.emit(Integer.class, m -> m == 5, 2);
StepVerifier.create(result2.initialResult().map(Message::getPayload)) StepVerifier.create(result2.initialResult().map(Message::getPayload))
Expand All @@ -99,6 +101,31 @@ public void testEmittingAnUpdate() {
.verifyComplete(); .verifyComplete();
} }


@Test
public void testEmittingNullUpdate() {
// given
SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
"axonFrameworkCR",
"chatMessages",
ResponseTypes.multipleInstancesOf(String.class),
ResponseTypes.instanceOf(String.class));

// when
SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>> result = queryBus
.subscriptionQuery(queryMessage);

// then
chatQueryHandler.emitter.emit(String.class,
"axonFrameworkCR"::equals,
GenericSubscriptionQueryUpdateMessage
.asNullableUpdateMessage(String.class, null));
chatQueryHandler.emitter.complete(String.class, "axonFrameworkCR"::equals);

StepVerifier.create(result.updates())
.expectNextMatches(m -> m.getPayload() == null)
.verifyComplete();
}

@Test @Test
public void testCompletingSubscriptionQueryExceptionally() { public void testCompletingSubscriptionQueryExceptionally() {
// given // given
Expand Down Expand Up @@ -231,9 +258,38 @@ public void testSeveralSubscriptions() {


assertEquals(Arrays.asList("Message1", "Message2", "Message3"), initial1); assertEquals(Arrays.asList("Message1", "Message2", "Message3"), initial1);
assertEquals(Arrays.asList("Message1", "Message2", "Message3"), initial2); assertEquals(Arrays.asList("Message1", "Message2", "Message3"), initial2);
assertEquals(Arrays.asList("Update1", "Update2", "Update3", "Update4", "Update5", "Update6", "Update7", "Update8", "Update9", "Update10", "Update11"), update1); assertEquals(Arrays.asList("Update1",
assertEquals(Arrays.asList("Update1", "Update2", "Update3", "Update4", "Update5", "Update6", "Update7", "Update8", "Update9", "Update10", "Update11"), update2); "Update2",
assertEquals(Arrays.asList("Update2", "Update3", "Update4", "Update5", "Update6", "Update7", "Update8", "Update9", "Update10", "Update11"), update3); "Update3",
"Update4",
"Update5",
"Update6",
"Update7",
"Update8",
"Update9",
"Update10",
"Update11"), update1);
assertEquals(Arrays.asList("Update1",
"Update2",
"Update3",
"Update4",
"Update5",
"Update6",
"Update7",
"Update8",
"Update9",
"Update10",
"Update11"), update2);
assertEquals(Arrays.asList("Update2",
"Update3",
"Update4",
"Update5",
"Update6",
"Update7",
"Update8",
"Update9",
"Update10",
"Update11"), update3);
} }


@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
Expand All @@ -259,7 +315,9 @@ public void testBufferOverflow() {
ResponseTypes.instanceOf(String.class)); ResponseTypes.instanceOf(String.class));


SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>> result = queryBus SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>> result = queryBus
.subscriptionQuery(queryMessage, new SubscriptionQueryBackpressure(FluxSink.OverflowStrategy.ERROR), 200); .subscriptionQuery(queryMessage,
new SubscriptionQueryBackpressure(FluxSink.OverflowStrategy.ERROR),
200);


for (int i = 0; i < 201; i++) { for (int i = 0; i < 201; i++) {
chatQueryHandler.emitter.emit(String.class, "axonFrameworkCR"::equals, "Update" + i); chatQueryHandler.emitter.emit(String.class, "axonFrameworkCR"::equals, "Update" + i);
Expand Down Expand Up @@ -376,10 +434,10 @@ public String emitFirstThenReturnInitial(String criteria) throws InterruptedExce
Executors.newSingleThreadExecutor().submit(() -> { Executors.newSingleThreadExecutor().submit(() -> {
emitter.emit(String.class, emitter.emit(String.class,
"axonFrameworkCR"::equals, "axonFrameworkCR"::equals,
GenericSubscriptionQueryUpdateMessage.from("Update1")); GenericSubscriptionQueryUpdateMessage.asUpdateMessage("Update1"));
emitter.emit(String.class, emitter.emit(String.class,
"axonFrameworkCR"::equals, "axonFrameworkCR"::equals,
GenericSubscriptionQueryUpdateMessage.from("Update2")); GenericSubscriptionQueryUpdateMessage.asUpdateMessage("Update2"));
emitter.complete(String.class, "axonFrameworkCR"::equals); emitter.complete(String.class, "axonFrameworkCR"::equals);
}); });


Expand Down

0 comments on commit 4095202

Please sign in to comment.