Skip to content

Commit

Permalink
MAILBOX-371 Add Concurrent Test for RabbitMQEventBus with Key Registr…
Browse files Browse the repository at this point in the history
…ation
  • Loading branch information
hoangdat authored and chibenwa committed Jan 16, 2019
1 parent 9f3b7ec commit af95d7b
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 13 deletions.
Expand Up @@ -21,16 +21,22 @@

import static com.jayway.awaitility.Awaitility.await;
import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_2;
import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_3;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.jayway.awaitility.core.ConditionFactory;

public interface EventBusConcurrentTestContract {
Expand All @@ -42,6 +48,8 @@ public interface EventBusConcurrentTestContract {
int OPERATION_COUNT = 30;
int TOTAL_DISPATCH_OPERATIONS = THREAD_COUNT * OPERATION_COUNT;

Set<RegistrationKey> ALL_KEYS = ImmutableSet.of(KEY_1, KEY_2, KEY_3);

static EventBusTestFixture.MailboxListenerCountingSuccessfulExecution newCountingListener() {
return new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution();
}
Expand Down Expand Up @@ -75,10 +83,65 @@ default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithSingleE
.of(countingListener1, countingListener2, countingListener3)))
.isEqualTo(totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS));
}

@Test
default void concurrentDispatchKeyShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
eventBus().register(countingListener1, KEY_1);
eventBus().register(countingListener2, KEY_2);
eventBus().register(countingListener3, KEY_3);

int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
int totalEventBus = 1;

ConcurrentTestRunner.builder()
.operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);

AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList
.of(countingListener1, countingListener2, countingListener3)))
.isEqualTo(totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS));
}

@Test
default void concurrentDispatchShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();

eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
eventBus().register(countingListener2, new EventBusTestFixture.GroupB());
eventBus().register(countingListener3, new EventBusTestFixture.GroupC());

int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS;

eventBus().register(countingListener1, KEY_1);
eventBus().register(countingListener2, KEY_2);
eventBus().register(countingListener3, KEY_3);
int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS;

ConcurrentTestRunner.builder()
.operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);

AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList
.of(countingListener1, countingListener2, countingListener3)))
.isEqualTo(totalEventDeliveredGlobally + totalEventDeliveredByKeys));
}
}

interface MultiEventBusConcurrentContract extends EventBusContract.MultipleEventBusContract {

EventBus eventBus3();

@Test
default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
Expand All @@ -105,5 +168,69 @@ default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithMultipl
.of(countingListener1, countingListener2, countingListener3)))
.isEqualTo(totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS));
}

@Test
default void concurrentDispatchKeyShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();

eventBus().register(countingListener1, KEY_1);
eventBus().register(countingListener2, KEY_2);
eventBus().register(countingListener3, KEY_3);

eventBus2().register(countingListener1, KEY_1);
eventBus2().register(countingListener2, KEY_2);
eventBus2().register(countingListener3, KEY_3);

int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
int totalEventBus = 2; // eventBus1 + eventBus2

ConcurrentTestRunner.builder()
.operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);

AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList
.of(countingListener1, countingListener2, countingListener3)))
.isEqualTo(totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS));
}

@Test
default void concurrentDispatchShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();

eventBus2().register(countingListener1, GROUP_A);
eventBus2().register(countingListener2, new EventBusTestFixture.GroupB());
eventBus2().register(countingListener3, new EventBusTestFixture.GroupC());
int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS;

eventBus().register(countingListener1, KEY_1);
eventBus().register(countingListener2, KEY_2);

eventBus2().register(countingListener1, KEY_1);
eventBus2().register(countingListener2, KEY_2);

eventBus3().register(countingListener3, KEY_1);
eventBus3().register(countingListener3, KEY_2);

int totalKeyListenerRegistrations = 2; // KEY1 + KEY2
int totalEventBus = 3; // eventBus1 + eventBus2 + eventBus3
int totalEventDeliveredByKeys = totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS;

ConcurrentTestRunner.builder()
.operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);

AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList
.of(countingListener1, countingListener2, countingListener3)))
.isEqualTo(totalEventDeliveredGlobally + totalEventDeliveredByKeys));
}
}
}
Expand Up @@ -92,9 +92,11 @@ class GroupC extends Group {}
int FIVE_HUNDRED_MS = 500;
MailboxId ID_1 = TEST_ID;
MailboxId ID_2 = TestId.of(24);
MailboxId ID_3 = TestId.of(36);
ImmutableSet<RegistrationKey> NO_KEYS = ImmutableSet.of();
MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(ID_1);
MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
MailboxIdRegistrationKey KEY_3 = new MailboxIdRegistrationKey(ID_3);
List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class, GroupC.class);

GroupA GROUP_A = new GroupA();
Expand Down
Expand Up @@ -251,6 +251,9 @@ default void dispatchShouldNotBlockAsynchronousListener() {
default void failingRegisteredListenersShouldNotAbortRegisteredDelivery() {
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doCallRealMethod()
.when(listener).event(any());
eventBus().register(listener, KEY_1);
Expand Down
Expand Up @@ -116,6 +116,7 @@ static void afterAll() {

private RabbitMQEventBus eventBus;
private RabbitMQEventBus eventBus2;
private RabbitMQEventBus eventBus3;
private Sender sender;
private RabbitMQConnectionFactory connectionFactory;
private EventSerializer eventSerializer;
Expand All @@ -132,15 +133,18 @@ void setUp() {

eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
eventBus2 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
eventBus.start();
eventBus2.start();
eventBus3.start();
sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
}

@AfterEach
void tearDown() {
eventBus.stop();
eventBus2.stop();
eventBus3.stop();
ALL_GROUPS.stream()
.map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
.forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
Expand All @@ -158,6 +162,18 @@ public EventBus eventBus2() {
return eventBus2;
}

@Override
public EventBus eventBus3() {
return eventBus3;
}

@Override
@Test
@Disabled("This test is failing by RabbitMQEventBus exponential backoff is not implemented at this time")
public void failingRegisteredListenersShouldNotAbortRegisteredDelivery() {

}

@Override
@Test
@Disabled("This test is failing by design as the different registration keys are handled by distinct messages")
Expand Down Expand Up @@ -304,19 +320,6 @@ void registrationsShouldNotHandleEventsAfterStop() throws Exception {
@Nested
class MultiEventBus {

private RabbitMQEventBus eventBus3;

@BeforeEach
void setUp() {
eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
eventBus3.start();
}

@AfterEach
void tearDown() {
eventBus3.stop();
}

@Test
void multipleEventBusStartShouldCreateOnlyOneEventExchange() {
assertThat(rabbitManagementAPI.listExchanges())
Expand Down

0 comments on commit af95d7b

Please sign in to comment.