From 552e44d073580b2565d163e007b3a3144c671bcc Mon Sep 17 00:00:00 2001 From: benwa Date: Fri, 9 Feb 2018 10:50:27 +0700 Subject: [PATCH] JAMES-2334 Add node killing tests for RabbitMQ cluster --- ...va => DockerClusterRabbitMQExtension.java} | 10 +- .../queue/rabbitmq/InMemoryConsumer.java | 14 +- .../queue/rabbitmq/RabbitMQClusterTest.java | 281 +++++++++++++----- 3 files changed, 234 insertions(+), 71 deletions(-) rename server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/{DockerClusterRabbitMQExtention.java => DockerClusterRabbitMQExtension.java} (91%) diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java similarity index 91% rename from server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java rename to server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java index fae201637f4..000fef65c87 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java @@ -28,8 +28,9 @@ import org.testcontainers.containers.Network; import com.google.common.collect.ImmutableList; +import com.rabbitmq.client.Address; -public class DockerClusterRabbitMQExtention implements BeforeEachCallback, AfterEachCallback, ParameterResolver { +public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { public static final String RABBIT_1 = "rabbit1"; public static final String RABBIT_2 = "rabbit2"; @@ -108,5 +109,12 @@ public DockerRabbitMQ getRabbitMQ2() { public DockerRabbitMQ getRabbitMQ3() { return rabbitMQ3; } + + public ImmutableList
getAddresses() { + return ImmutableList.of( + new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()), + new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()), + new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort())); + } } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java index f5f8fa2b391..6dd29affdae 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java @@ -30,15 +30,27 @@ public class InMemoryConsumer extends DefaultConsumer { + @FunctionalInterface + interface Operation { + void perform(); + } + private final ConcurrentLinkedQueue messages; + private final Operation operation; public InMemoryConsumer(Channel channel) { + this(channel, () -> {}); + } + + public InMemoryConsumer(Channel channel, Operation operation) { super(channel); - messages = new ConcurrentLinkedQueue<>(); + this.operation = operation; + this.messages = new ConcurrentLinkedQueue<>(); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + operation.perform(); Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); messages.add(payload); } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java index 4a7dd073669..d3f2cc1a499 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java @@ -30,116 +30,259 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; -import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster; +import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtension.DockerRabbitMQCluster; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.Duration; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -@ExtendWith(DockerClusterRabbitMQExtention.class) +@ExtendWith(DockerClusterRabbitMQExtension.class) class RabbitMQClusterTest { + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class); + private static final String QUEUE = "queue"; - private Connection node1Connection; - private Channel node1Channel; - private Connection node2Connection; - private Channel node2Channel; - - @BeforeEach - void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { - ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); - ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); - node1Connection = node1ConnectionFactory.newConnection(); - node2Connection = node2ConnectionFactory.newConnection(); - node1Channel = node1Connection.createChannel(); - node2Channel = node2Connection.createChannel(); - } + @Nested + class ClusterSharing { + + private ConnectionFactory node1ConnectionFactory; + private ConnectionFactory node2ConnectionFactory; + private Connection node1Connection; + private Connection node2Connection; + private Channel node1Channel; + private Channel node2Channel; + + @BeforeEach + void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { + node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); + node1Connection = node1ConnectionFactory.newConnection(); + node2Connection = node2ConnectionFactory.newConnection(); + node1Channel = node1Connection.createChannel(); + node2Channel = node2Connection.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection); + } + + @Test + void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception { + String stdout = cluster.getRabbitMQ1().container() + .execInContainer("rabbitmqctl", "cluster_status") + .getStdout(); + + assertThat(stdout) + .contains( + DockerClusterRabbitMQExtension.RABBIT_1, + DockerClusterRabbitMQExtension.RABBIT_2, + DockerClusterRabbitMQExtension.RABBIT_3); + } + + @Test + void queuesShouldBeShared() throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + + List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + @Test + void queuesShouldBeDeclarableOnAnotherNode() throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + + List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } - @AfterEach - void tearDown() { - closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection); } - private void closeQuietly(AutoCloseable... closeables) { - for (AutoCloseable closeable : closeables) { + @Nested + class ClusterNodesFailure { + + private ConnectionFactory node1ConnectionFactory; + private Connection resilientConnection; + private Channel resilientChannel; + private Connection node2Connection; + private Channel node2Channel; + + @BeforeEach + void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { + node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses()); + resilientChannel = resilientConnection.createChannel(); + ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); + node2Connection = node2ConnectionFactory.newConnection(); + node2Channel = node2Connection.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly(resilientConnection, resilientChannel); + } + + @Disabled("For some reason, we are unable to recover topology when reconnecting") + @Test + void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception { + resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 20; + int firstBatchSize = nbMessages / 2; + IntStream.range(0, firstBatchSize) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer); + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize); + + cluster.getRabbitMQ1().stop(); + + IntStream.range(firstBatchSize, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(this::tryPublishWithRetry); + + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + private void tryPublishWithRetry(byte[] bytes) { + Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes)); + } + + private boolean tryPublish(byte[] bytes) { try { - closeable.close(); + resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes); + return true; } catch (Exception e) { - //ignoring exception + LOGGER.error("failed publish", e); + return false; } } - } - @Test - void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception { - String stdout = cluster.getRabbitMQ1().container() - .execInContainer("rabbitmqctl", "cluster_status") - .getStdout(); - - assertThat(stdout) - .contains( - DockerClusterRabbitMQExtention.RABBIT_1, - DockerClusterRabbitMQExtention.RABBIT_2, - DockerClusterRabbitMQExtention.RABBIT_3); - } + @Test + void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception { + ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory(); + cluster.getRabbitMQ3().stop(); - @Test - void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception { - node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + try (Connection connection = node3ConnectionFactory.newConnection(cluster.getAddresses()); + Channel channel = connection.createChannel()) { - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer2); + InMemoryConsumer consumer = new InMemoryConsumer(channel); + channel.basicConsume(QUEUE, consumer); - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); - List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList()); - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } + List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + } - @Test - void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception { - node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + @Test + void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception { + node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer2); + AtomicInteger counter = new AtomicInteger(0); + InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel, + () -> { + if (counter.incrementAndGet() == nbMessages / 2) { + cluster.getRabbitMQ1().stop(); + } + }); + resilientChannel.basicConsume(QUEUE, consumer); - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } - List expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList()); - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + private void closeQuietly(AutoCloseable... closeables) { + Arrays.stream(closeables).forEach(this::closeQuietly); + } + + private void closeQuietly(AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception e) { + //ignore error + } } private byte[] asBytes(String message) { return message.getBytes(StandardCharsets.UTF_8); } - }