diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java index 07466126961..2f6e9cb1d3d 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java @@ -21,16 +21,22 @@ import static com.jayway.awaitility.Awaitility.await; import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT; +import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1; +import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_2; +import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_3; import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS; +import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A; import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.jayway.awaitility.core.ConditionFactory; public interface EventBusConcurrentTestContract { @@ -42,6 +48,8 @@ public interface EventBusConcurrentTestContract { int OPERATION_COUNT = 30; int TOTAL_DISPATCH_OPERATIONS = THREAD_COUNT * OPERATION_COUNT; + Set ALL_KEYS = ImmutableSet.of(KEY_1, KEY_2, KEY_3); + static EventBusTestFixture.MailboxListenerCountingSuccessfulExecution newCountingListener() { return new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution(); } @@ -75,10 +83,65 @@ default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithSingleE .of(countingListener1, countingListener2, countingListener3))) .isEqualTo(totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS)); } + + @Test + default void concurrentDispatchKeyShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception { + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener(); + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener(); + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener(); + eventBus().register(countingListener1, KEY_1); + eventBus().register(countingListener2, KEY_2); + eventBus().register(countingListener3, KEY_3); + + int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3 + int totalEventBus = 1; + + ConcurrentTestRunner.builder() + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS)) + .threadCount(THREAD_COUNT) + .operationCount(OPERATION_COUNT) + .runSuccessfullyWithin(FIVE_SECONDS); + + AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList + .of(countingListener1, countingListener2, countingListener3))) + .isEqualTo(totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS)); + } + + @Test + default void concurrentDispatchShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception { + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener(); + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener(); + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener(); + + eventBus().register(countingListener1, new EventBusTestFixture.GroupA()); + eventBus().register(countingListener2, new EventBusTestFixture.GroupB()); + eventBus().register(countingListener3, new EventBusTestFixture.GroupC()); + + int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC + int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS; + + eventBus().register(countingListener1, KEY_1); + eventBus().register(countingListener2, KEY_2); + eventBus().register(countingListener3, KEY_3); + int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3 + int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS; + + ConcurrentTestRunner.builder() + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS)) + .threadCount(THREAD_COUNT) + .operationCount(OPERATION_COUNT) + .runSuccessfullyWithin(FIVE_SECONDS); + + AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList + .of(countingListener1, countingListener2, countingListener3))) + .isEqualTo(totalEventDeliveredGlobally + totalEventDeliveredByKeys)); + } } interface MultiEventBusConcurrentContract extends EventBusContract.MultipleEventBusContract { + EventBus eventBus3(); + @Test default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception { EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener(); @@ -105,5 +168,69 @@ default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithMultipl .of(countingListener1, countingListener2, countingListener3))) .isEqualTo(totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS)); } + + @Test + default void concurrentDispatchKeyShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception { + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener(); + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener(); + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener(); + + eventBus().register(countingListener1, KEY_1); + eventBus().register(countingListener2, KEY_2); + eventBus().register(countingListener3, KEY_3); + + eventBus2().register(countingListener1, KEY_1); + eventBus2().register(countingListener2, KEY_2); + eventBus2().register(countingListener3, KEY_3); + + int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3 + int totalEventBus = 2; // eventBus1 + eventBus2 + + ConcurrentTestRunner.builder() + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS)) + .threadCount(THREAD_COUNT) + .operationCount(OPERATION_COUNT) + .runSuccessfullyWithin(FIVE_SECONDS); + + AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList + .of(countingListener1, countingListener2, countingListener3))) + .isEqualTo(totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS)); + } + + @Test + default void concurrentDispatchShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception { + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener(); + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener(); + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener(); + + eventBus2().register(countingListener1, GROUP_A); + eventBus2().register(countingListener2, new EventBusTestFixture.GroupB()); + eventBus2().register(countingListener3, new EventBusTestFixture.GroupC()); + int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC + int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS; + + eventBus().register(countingListener1, KEY_1); + eventBus().register(countingListener2, KEY_2); + + eventBus2().register(countingListener1, KEY_1); + eventBus2().register(countingListener2, KEY_2); + + eventBus3().register(countingListener3, KEY_1); + eventBus3().register(countingListener3, KEY_2); + + int totalKeyListenerRegistrations = 2; // KEY1 + KEY2 + int totalEventBus = 3; // eventBus1 + eventBus2 + eventBus3 + int totalEventDeliveredByKeys = totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS; + + ConcurrentTestRunner.builder() + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS)) + .threadCount(THREAD_COUNT) + .operationCount(OPERATION_COUNT) + .runSuccessfullyWithin(FIVE_SECONDS); + + AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList + .of(countingListener1, countingListener2, countingListener3))) + .isEqualTo(totalEventDeliveredGlobally + totalEventDeliveredByKeys)); + } } } diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java index 90f5ff55c26..e6920c0acbb 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java @@ -92,9 +92,11 @@ class GroupC extends Group {} int FIVE_HUNDRED_MS = 500; MailboxId ID_1 = TEST_ID; MailboxId ID_2 = TestId.of(24); + MailboxId ID_3 = TestId.of(36); ImmutableSet NO_KEYS = ImmutableSet.of(); MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(ID_1); MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2); + MailboxIdRegistrationKey KEY_3 = new MailboxIdRegistrationKey(ID_3); List> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class, GroupC.class); GroupA GROUP_A = new GroupA(); diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java index f50a66de837..142117421ed 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java @@ -251,6 +251,9 @@ default void dispatchShouldNotBlockAsynchronousListener() { default void failingRegisteredListenersShouldNotAbortRegisteredDelivery() { EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution()); doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) .doCallRealMethod() .when(listener).event(any()); eventBus().register(listener, KEY_1); diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index aa4c5c6accb..52a6e50a199 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -116,6 +116,7 @@ static void afterAll() { private RabbitMQEventBus eventBus; private RabbitMQEventBus eventBus2; + private RabbitMQEventBus eventBus3; private Sender sender; private RabbitMQConnectionFactory connectionFactory; private EventSerializer eventSerializer; @@ -132,8 +133,10 @@ void setUp() { eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter); eventBus2 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter); + eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter); eventBus.start(); eventBus2.start(); + eventBus3.start(); sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)); } @@ -141,6 +144,7 @@ void setUp() { void tearDown() { eventBus.stop(); eventBus2.stop(); + eventBus3.stop(); ALL_GROUPS.stream() .map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString()) .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block()); @@ -158,6 +162,18 @@ public EventBus eventBus2() { return eventBus2; } + @Override + public EventBus eventBus3() { + return eventBus3; + } + + @Override + @Test + @Disabled("This test is failing by RabbitMQEventBus exponential backoff is not implemented at this time") + public void failingRegisteredListenersShouldNotAbortRegisteredDelivery() { + + } + @Override @Test @Disabled("This test is failing by design as the different registration keys are handled by distinct messages") @@ -304,19 +320,6 @@ void registrationsShouldNotHandleEventsAfterStop() throws Exception { @Nested class MultiEventBus { - private RabbitMQEventBus eventBus3; - - @BeforeEach - void setUp() { - eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter); - eventBus3.start(); - } - - @AfterEach - void tearDown() { - eventBus3.stop(); - } - @Test void multipleEventBusStartShouldCreateOnlyOneEventExchange() { assertThat(rabbitManagementAPI.listExchanges())