diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
similarity index 91%
rename from server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
rename to server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
index fae201637f4..000fef65c87 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
@@ -28,8 +28,9 @@
import org.testcontainers.containers.Network;
import com.google.common.collect.ImmutableList;
+import com.rabbitmq.client.Address;
-public class DockerClusterRabbitMQExtention implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
public static final String RABBIT_1 = "rabbit1";
public static final String RABBIT_2 = "rabbit2";
@@ -108,5 +109,12 @@ public DockerRabbitMQ getRabbitMQ2() {
public DockerRabbitMQ getRabbitMQ3() {
return rabbitMQ3;
}
+
+ public ImmutableList
getAddresses() {
+ return ImmutableList.of(
+ new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()),
+ new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()),
+ new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort()));
+ }
}
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
index f5f8fa2b391..6dd29affdae 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
@@ -30,15 +30,27 @@
public class InMemoryConsumer extends DefaultConsumer {
+ @FunctionalInterface
+ interface Operation {
+ void perform();
+ }
+
private final ConcurrentLinkedQueue messages;
+ private final Operation operation;
public InMemoryConsumer(Channel channel) {
+ this(channel, () -> {});
+ }
+
+ public InMemoryConsumer(Channel channel, Operation operation) {
super(channel);
- messages = new ConcurrentLinkedQueue<>();
+ this.operation = operation;
+ this.messages = new ConcurrentLinkedQueue<>();
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ operation.perform();
Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
messages.add(payload);
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index 4a7dd073669..d3f2cc1a499 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -30,116 +30,259 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
-import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
+import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtension.DockerRabbitMQCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.Duration;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-@ExtendWith(DockerClusterRabbitMQExtention.class)
+@ExtendWith(DockerClusterRabbitMQExtension.class)
class RabbitMQClusterTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class);
+
private static final String QUEUE = "queue";
- private Connection node1Connection;
- private Channel node1Channel;
- private Connection node2Connection;
- private Channel node2Channel;
-
- @BeforeEach
- void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
- ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
- ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
- node1Connection = node1ConnectionFactory.newConnection();
- node2Connection = node2ConnectionFactory.newConnection();
- node1Channel = node1Connection.createChannel();
- node2Channel = node2Connection.createChannel();
- }
+ @Nested
+ class ClusterSharing {
+
+ private ConnectionFactory node1ConnectionFactory;
+ private ConnectionFactory node2ConnectionFactory;
+ private Connection node1Connection;
+ private Connection node2Connection;
+ private Channel node1Channel;
+ private Channel node2Channel;
+
+ @BeforeEach
+ void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+ node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+ node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+ node1Connection = node1ConnectionFactory.newConnection();
+ node2Connection = node2ConnectionFactory.newConnection();
+ node1Channel = node1Connection.createChannel();
+ node2Channel = node2Connection.createChannel();
+ }
+
+ @AfterEach
+ void tearDown() {
+ closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
+ }
+
+ @Test
+ void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
+ String stdout = cluster.getRabbitMQ1().container()
+ .execInContainer("rabbitmqctl", "cluster_status")
+ .getStdout();
+
+ assertThat(stdout)
+ .contains(
+ DockerClusterRabbitMQExtension.RABBIT_1,
+ DockerClusterRabbitMQExtension.RABBIT_2,
+ DockerClusterRabbitMQExtension.RABBIT_3);
+ }
+
+ @Test
+ void queuesShouldBeShared() throws Exception {
+ node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+
+ InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer2);
+
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+
+ List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
+
+ @Test
+ void queuesShouldBeDeclarableOnAnotherNode() throws Exception {
+ node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+ InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer2);
+
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+
+ List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
- @AfterEach
- void tearDown() {
- closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
}
- private void closeQuietly(AutoCloseable... closeables) {
- for (AutoCloseable closeable : closeables) {
+ @Nested
+ class ClusterNodesFailure {
+
+ private ConnectionFactory node1ConnectionFactory;
+ private Connection resilientConnection;
+ private Channel resilientChannel;
+ private Connection node2Connection;
+ private Channel node2Channel;
+
+ @BeforeEach
+ void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+ node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+ resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses());
+ resilientChannel = resilientConnection.createChannel();
+ ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+ node2Connection = node2ConnectionFactory.newConnection();
+ node2Channel = node2Connection.createChannel();
+ }
+
+ @AfterEach
+ void tearDown() {
+ closeQuietly(resilientConnection, resilientChannel);
+ }
+
+ @Disabled("For some reason, we are unable to recover topology when reconnecting")
+ @Test
+ void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception {
+ resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ int nbMessages = 20;
+ int firstBatchSize = nbMessages / 2;
+ IntStream.range(0, firstBatchSize)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+ InMemoryConsumer consumer = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer);
+ awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize);
+
+ cluster.getRabbitMQ1().stop();
+
+ IntStream.range(firstBatchSize, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(this::tryPublishWithRetry);
+
+ awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
+
+ List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
+
+ private void tryPublishWithRetry(byte[] bytes) {
+ Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes));
+ }
+
+ private boolean tryPublish(byte[] bytes) {
try {
- closeable.close();
+ resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes);
+ return true;
} catch (Exception e) {
- //ignoring exception
+ LOGGER.error("failed publish", e);
+ return false;
}
}
- }
- @Test
- void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
- String stdout = cluster.getRabbitMQ1().container()
- .execInContainer("rabbitmqctl", "cluster_status")
- .getStdout();
-
- assertThat(stdout)
- .contains(
- DockerClusterRabbitMQExtention.RABBIT_1,
- DockerClusterRabbitMQExtention.RABBIT_2,
- DockerClusterRabbitMQExtention.RABBIT_3);
- }
+ @Test
+ void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception {
+ ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory();
+ cluster.getRabbitMQ3().stop();
- @Test
- void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
- node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
- node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
- node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+ try (Connection connection = node3ConnectionFactory.newConnection(cluster.getAddresses());
+ Channel channel = connection.createChannel()) {
- int nbMessages = 10;
- IntStream.range(0, nbMessages)
- .mapToObj(i -> asBytes(String.valueOf(i)))
- .forEach(Throwing.consumer(
- bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
- node2Channel.basicConsume(QUEUE, consumer2);
+ InMemoryConsumer consumer = new InMemoryConsumer(channel);
+ channel.basicConsume(QUEUE, consumer);
- awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+ awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
- List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
- assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
- }
+ List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
+ }
- @Test
- void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
- node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
- node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
- node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+ @Test
+ void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception {
+ node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
- int nbMessages = 10;
- IntStream.range(0, nbMessages)
- .mapToObj(i -> asBytes(String.valueOf(i)))
- .forEach(Throwing.consumer(
- bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
- node2Channel.basicConsume(QUEUE, consumer2);
+ AtomicInteger counter = new AtomicInteger(0);
+ InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel,
+ () -> {
+ if (counter.incrementAndGet() == nbMessages / 2) {
+ cluster.getRabbitMQ1().stop();
+ }
+ });
+ resilientChannel.basicConsume(QUEUE, consumer);
- awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+ awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
+
+ List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
- List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
- assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
+
+ private void closeQuietly(AutoCloseable... closeables) {
+ Arrays.stream(closeables).forEach(this::closeQuietly);
+ }
+
+ private void closeQuietly(AutoCloseable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ //ignore error
+ }
}
private byte[] asBytes(String message) {
return message.getBytes(StandardCharsets.UTF_8);
}
-
}