Skip to content

Commit

Permalink
chore(atomix): fix multiple local subscriptions on same topic
Browse files Browse the repository at this point in the history
  • Loading branch information
deepthidevaki committed Apr 29, 2020
1 parent 68df381 commit 931101f
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ void remove(final InternalSubscription subscription) {
/** Internal topic. */
private class InternalTopic {
private final String topic;
private final InternalSubscriber subscribers = new InternalSubscriber();
private final InternalSubscriber localSubscribers = new InternalSubscriber();
private final Set<MemberId> subscriptions = Sets.newCopyOnWriteArraySet();

InternalTopic(final String topic) {
Expand All @@ -313,7 +313,7 @@ private class InternalTopic {
* @return the local subscriber for the topic
*/
InternalSubscriber localSubscriber() {
return subscribers;
return localSubscribers;
}

/**
Expand Down Expand Up @@ -390,11 +390,13 @@ <M> CompletableFuture<Subscription> subscribe(
*
* @param subscription the subscription to register
*/
private CompletableFuture<Subscription> addLocalSubscription(
private synchronized CompletableFuture<Subscription> addLocalSubscription(
final InternalSubscription subscription) {
subscribers.add(subscription);
if (localSubscribers.subscriptions.isEmpty()) {
messagingService.registerHandler(subscription.topic(), localSubscribers);
}
localSubscribers.add(subscription);
subscriptions.add(localMemberId);
messagingService.registerHandler(subscription.topic(), subscribers);
return updateNodes().thenApply(v -> subscription);
}

Expand All @@ -403,11 +405,11 @@ private CompletableFuture<Subscription> addLocalSubscription(
*
* @param subscription the subscription to unregister
*/
private CompletableFuture<Void> removeLocalSubscription(
private synchronized CompletableFuture<Void> removeLocalSubscription(
final InternalSubscription subscription) {
subscribers.remove(subscription);
subscriptions.remove(localMemberId);
if (subscriptions.isEmpty()) {
localSubscribers.remove(subscription);
if (localSubscribers.subscriptions.isEmpty()) {
subscriptions.remove(localMemberId);
messagingService.unregisterHandler(subscription.topic());
}
return updateNodes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,95 @@ public void shouldSubscribeMultipleTopics() throws InterruptedException {
tearDown();
}

@Test
public void shouldBroadcastToMultipleLocalSubscriptionsForSameTopic()
throws InterruptedException {
// given
this.membersDiscovered = new CountDownLatch(0);
final Collection<Node> bootstrapLocations = buildBootstrapNodes(1);
final ClusterEventService eventService1 = buildServices(1, bootstrapLocations);

final Set<Integer> events = new CopyOnWriteArraySet<>();
final CountDownLatch latch = new CountDownLatch(2);

final String topic = "test-topic1";

eventService1
.<String>subscribe(
topic,
SERIALIZER::decode,
message -> {
assertEquals("Hello world!", message);
events.add(1);
latch.countDown();
},
MoreExecutors.directExecutor())
.join();

eventService1
.<String>subscribe(
topic,
SERIALIZER::decode,
message -> {
assertEquals("Hello world!", message);
events.add(2);
latch.countDown();
},
MoreExecutors.directExecutor())
.join();

// when
eventService1.broadcast(topic, "Hello world!", SERIALIZER::encode);

// then
assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
assertEquals(2, events.size());
assertThat(events).containsExactlyInAnyOrder(1, 2);

tearDown();
}

@Test
public void shouldNotCloseOtherSubscriptions() throws InterruptedException {
// given
this.membersDiscovered = new CountDownLatch(0);
final Collection<Node> bootstrapLocations = buildBootstrapNodes(1);
final ClusterEventService eventService1 = buildServices(1, bootstrapLocations);

final Set<Integer> events = new CopyOnWriteArraySet<>();
final CountDownLatch latch = new CountDownLatch(1);

final String topic = "test-topic1";

final var subscriptionToClose =
eventService1
.<String>subscribe(
topic, SERIALIZER::decode, message -> {}, MoreExecutors.directExecutor())
.join();

eventService1
.<String>subscribe(
topic,
SERIALIZER::decode,
message -> {
assertEquals("Hello world!", message);
events.add(2);
latch.countDown();
},
MoreExecutors.directExecutor())
.join();

// when
subscriptionToClose.close().join();
eventService1.broadcast(topic, "Hello world!", SERIALIZER::encode);

// then
assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
assertEquals(1, events.size());

tearDown();
}

@Test
public void shouldBroadcastAfterRestart() throws InterruptedException {
// given
Expand Down

0 comments on commit 931101f

Please sign in to comment.