Skip to content

Commit

Permalink
Prevent multiple subscriptions of messages with the same id.
Browse files Browse the repository at this point in the history
  • Loading branch information
m1l4n54v1c committed Jun 14, 2018
1 parent b8f6843 commit 44e4091
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 24 deletions.
Expand Up @@ -75,7 +75,7 @@ public class SimpleQueryBus implements QueryBus, QueryUpdateEmitter {
private static final Logger logger = LoggerFactory.getLogger(SimpleQueryBus.class); private static final Logger logger = LoggerFactory.getLogger(SimpleQueryBus.class);


private final ConcurrentMap<String, CopyOnWriteArrayList<QuerySubscription>> subscriptions = new ConcurrentHashMap<>(); private final ConcurrentMap<String, CopyOnWriteArrayList<QuerySubscription>> subscriptions = new ConcurrentHashMap<>();
private final ConcurrentMap<SubscriptionQueryMessage<?, ?, ?>, CopyOnWriteArrayList<FluxSinkWrapper<?>>> updateHandlers = new ConcurrentHashMap<>(); private final ConcurrentMap<SubscriptionQueryMessage<?, ?, ?>, FluxSinkWrapper<?>> updateHandlers = new ConcurrentHashMap<>();
private final MessageMonitor<? super QueryMessage<?, ?>> messageMonitor; private final MessageMonitor<? super QueryMessage<?, ?>> messageMonitor;
private final MessageMonitor<? super SubscriptionQueryUpdateMessage<?>> updateMessageMonitor; private final MessageMonitor<? super SubscriptionQueryUpdateMessage<?>> updateMessageMonitor;
private final QueryInvocationErrorHandler errorHandler; private final QueryInvocationErrorHandler errorHandler;
Expand Down Expand Up @@ -229,6 +229,12 @@ public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQu
SubscriptionQueryMessage<Q, I, U> query, SubscriptionQueryMessage<Q, I, U> query,
SubscriptionQueryBackpressure backpressure, SubscriptionQueryBackpressure backpressure,
int updateBufferSize) { int updateBufferSize) {
boolean alreadyExists = updateHandlers.keySet()
.stream()
.anyMatch(m -> m.getIdentifier().equals(query.getIdentifier()));
if(alreadyExists) {
throw new IllegalArgumentException("There is already a subscription with the same message identifier");
}


MonoWrapper<QueryResponseMessage<I>> initialResult = MonoWrapper.create(monoSink -> query(query) MonoWrapper<QueryResponseMessage<I>> initialResult = MonoWrapper.create(monoSink -> query(query)
.thenAccept(monoSink::success) .thenAccept(monoSink::success)
Expand All @@ -240,8 +246,7 @@ public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQu
EmitterProcessor<SubscriptionQueryUpdateMessage<U>> processor = EmitterProcessor.create(updateBufferSize); EmitterProcessor<SubscriptionQueryUpdateMessage<U>> processor = EmitterProcessor.create(updateBufferSize);
FluxSink<SubscriptionQueryUpdateMessage<U>> sink = processor.sink(backpressure.getOverflowStrategy()); FluxSink<SubscriptionQueryUpdateMessage<U>> sink = processor.sink(backpressure.getOverflowStrategy());
sink.onDispose(() -> updateHandlers.remove(query)); sink.onDispose(() -> updateHandlers.remove(query));
updateHandlers.computeIfAbsent(query, k -> new CopyOnWriteArrayList<>()) updateHandlers.put(query, new FluxSinkWrapper<>(sink));
.add(new FluxSinkWrapper<>(sink));


return new DefaultSubscriptionQueryResult<>(initialResult.getMono(), return new DefaultSubscriptionQueryResult<>(initialResult.getMono(),
processor.replay(updateBufferSize).autoConnect()); processor.replay(updateBufferSize).autoConnect());
Expand All @@ -255,8 +260,7 @@ public <U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter,
.stream() .stream()
.filter(sqm -> filter.test((SubscriptionQueryMessage<?, ?, U>) sqm)) .filter(sqm -> filter.test((SubscriptionQueryMessage<?, ?, U>) sqm))
.forEach(query -> Optional.ofNullable(updateHandlers.get(query)) .forEach(query -> Optional.ofNullable(updateHandlers.get(query))
.ifPresent(handlers -> handlers .ifPresent(uh -> doEmit(query, uh, update)));
.forEach(uh -> doEmit(query, uh, update))));
} }


@Override @Override
Expand All @@ -266,7 +270,6 @@ public void complete(Predicate<SubscriptionQueryMessage<?, ?, ?>> filter) {
.filter(filter) .filter(filter)
.map(updateHandlers::remove) .map(updateHandlers::remove)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.flatMap(List::stream)
.forEach(FluxSinkWrapper::complete); .forEach(FluxSinkWrapper::complete);
} }


Expand All @@ -277,7 +280,6 @@ public void completeExceptionally(Predicate<SubscriptionQueryMessage<?, ?, ?>> f
.filter(filter) .filter(filter)
.map(updateHandlers::remove) .map(updateHandlers::remove)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.flatMap(List::stream)
.forEach(uh -> uh.error(cause)); .forEach(uh -> uh.error(cause));
} }


Expand Down
Expand Up @@ -207,7 +207,7 @@ public void testSeveralSubscriptions() {
assertEquals(Arrays.asList("Update2", "Update3", "Update4", "Update5", "Update6", "Update7", "Update8", "Update9", "Update10", "Update11"), update3); assertEquals(Arrays.asList("Update2", "Update3", "Update4", "Update5", "Update6", "Update7", "Update8", "Update9", "Update10", "Update11"), update3);
} }


@Test @Test(expected = IllegalArgumentException.class)
public void testDoubleSubscriptionMessage() { public void testDoubleSubscriptionMessage() {
// given // given
SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>( SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
Expand All @@ -217,22 +217,8 @@ public void testDoubleSubscriptionMessage() {
ResponseTypes.instanceOf(String.class)); ResponseTypes.instanceOf(String.class));


// when // when
SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>> result1 = queryBus queryBus.subscriptionQuery(queryMessage);
.subscriptionQuery(queryMessage); queryBus.subscriptionQuery(queryMessage);
SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>> result2 = queryBus
.subscriptionQuery(queryMessage);
List<String> update1 = new ArrayList<>();
List<String> update2 = new ArrayList<>();
result1.updates().map(Message::getPayload).subscribe(update1::add);
result2.updates().map(Message::getPayload).subscribe(update2::add);
chatQueryHandler.emitter.emit(String.class, "axonFrameworkCR"::equals, "Update1");
chatQueryHandler.emitter.emit(String.class, "axonFrameworkCR"::equals, "Update2");
chatQueryHandler.emitter.emit(String.class, "axonFrameworkCR"::equals, "Update3");
chatQueryHandler.emitter.emit(String.class, "axonFrameworkCR"::equals, "Update4");
chatQueryHandler.emitter.complete(String.class, "axonFrameworkCR"::equals);

assertEquals(Arrays.asList("Update1", "Update2", "Update3", "Update4"), update1);
assertEquals(Arrays.asList("Update1", "Update2", "Update3", "Update4"), update2);
} }


@Test @Test
Expand Down

0 comments on commit 44e4091

Please sign in to comment.