Skip to content

Commit

Permalink
JAMES-2334 Add node killing tests for RabbitMQ cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored and mbaechler committed May 31, 2018
1 parent 00adfa8 commit 552e44d
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 71 deletions.
Expand Up @@ -28,8 +28,9 @@
import org.testcontainers.containers.Network;

import com.google.common.collect.ImmutableList;
import com.rabbitmq.client.Address;

public class DockerClusterRabbitMQExtention implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {

public static final String RABBIT_1 = "rabbit1";
public static final String RABBIT_2 = "rabbit2";
Expand Down Expand Up @@ -108,5 +109,12 @@ public DockerRabbitMQ getRabbitMQ2() {
public DockerRabbitMQ getRabbitMQ3() {
return rabbitMQ3;
}

public ImmutableList<Address> getAddresses() {
return ImmutableList.of(
new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()),
new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()),
new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort()));
}
}
}
Expand Up @@ -30,15 +30,27 @@

public class InMemoryConsumer extends DefaultConsumer {

@FunctionalInterface
interface Operation {
void perform();
}

private final ConcurrentLinkedQueue<Integer> messages;
private final Operation operation;

public InMemoryConsumer(Channel channel) {
this(channel, () -> {});
}

public InMemoryConsumer(Channel channel, Operation operation) {
super(channel);
messages = new ConcurrentLinkedQueue<>();
this.operation = operation;
this.messages = new ConcurrentLinkedQueue<>();
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
operation.perform();
Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
messages.add(payload);
}
Expand Down
Expand Up @@ -30,116 +30,259 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtension.DockerRabbitMQCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

@ExtendWith(DockerClusterRabbitMQExtention.class)
@ExtendWith(DockerClusterRabbitMQExtension.class)
class RabbitMQClusterTest {

private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class);

private static final String QUEUE = "queue";

private Connection node1Connection;
private Channel node1Channel;
private Connection node2Connection;
private Channel node2Channel;

@BeforeEach
void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
node1Connection = node1ConnectionFactory.newConnection();
node2Connection = node2ConnectionFactory.newConnection();
node1Channel = node1Connection.createChannel();
node2Channel = node2Connection.createChannel();
}
@Nested
class ClusterSharing {

private ConnectionFactory node1ConnectionFactory;
private ConnectionFactory node2ConnectionFactory;
private Connection node1Connection;
private Connection node2Connection;
private Channel node1Channel;
private Channel node2Channel;

@BeforeEach
void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
node1Connection = node1ConnectionFactory.newConnection();
node2Connection = node2ConnectionFactory.newConnection();
node1Channel = node1Connection.createChannel();
node2Channel = node2Connection.createChannel();
}

@AfterEach
void tearDown() {
closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
}

@Test
void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
String stdout = cluster.getRabbitMQ1().container()
.execInContainer("rabbitmqctl", "cluster_status")
.getStdout();

assertThat(stdout)
.contains(
DockerClusterRabbitMQExtension.RABBIT_1,
DockerClusterRabbitMQExtension.RABBIT_2,
DockerClusterRabbitMQExtension.RABBIT_3);
}

@Test
void queuesShouldBeShared() throws Exception {
node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);

int nbMessages = 10;
IntStream.range(0, nbMessages)
.mapToObj(i -> asBytes(String.valueOf(i)))
.forEach(Throwing.consumer(
bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));


InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
node2Channel.basicConsume(QUEUE, consumer2);

awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);

List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}

@Test
void queuesShouldBeDeclarableOnAnotherNode() throws Exception {
node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);

int nbMessages = 10;
IntStream.range(0, nbMessages)
.mapToObj(i -> asBytes(String.valueOf(i)))
.forEach(Throwing.consumer(
bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));

InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
node2Channel.basicConsume(QUEUE, consumer2);

awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);

List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}

@AfterEach
void tearDown() {
closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
}

private void closeQuietly(AutoCloseable... closeables) {
for (AutoCloseable closeable : closeables) {
@Nested
class ClusterNodesFailure {

private ConnectionFactory node1ConnectionFactory;
private Connection resilientConnection;
private Channel resilientChannel;
private Connection node2Connection;
private Channel node2Channel;

@BeforeEach
void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses());
resilientChannel = resilientConnection.createChannel();
ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
node2Connection = node2ConnectionFactory.newConnection();
node2Channel = node2Connection.createChannel();
}

@AfterEach
void tearDown() {
closeQuietly(resilientConnection, resilientChannel);
}

@Disabled("For some reason, we are unable to recover topology when reconnecting")
@Test
void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception {
resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);

int nbMessages = 20;
int firstBatchSize = nbMessages / 2;
IntStream.range(0, firstBatchSize)
.mapToObj(i -> asBytes(String.valueOf(i)))
.forEach(Throwing.consumer(
bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));

InMemoryConsumer consumer = new InMemoryConsumer(node2Channel);
node2Channel.basicConsume(QUEUE, consumer);
awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize);

cluster.getRabbitMQ1().stop();

IntStream.range(firstBatchSize, nbMessages)
.mapToObj(i -> asBytes(String.valueOf(i)))
.forEach(this::tryPublishWithRetry);

awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);

List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}

private void tryPublishWithRetry(byte[] bytes) {
Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes));
}

private boolean tryPublish(byte[] bytes) {
try {
closeable.close();
resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes);
return true;
} catch (Exception e) {
//ignoring exception
LOGGER.error("failed publish", e);
return false;
}
}
}

@Test
void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
String stdout = cluster.getRabbitMQ1().container()
.execInContainer("rabbitmqctl", "cluster_status")
.getStdout();

assertThat(stdout)
.contains(
DockerClusterRabbitMQExtention.RABBIT_1,
DockerClusterRabbitMQExtention.RABBIT_2,
DockerClusterRabbitMQExtention.RABBIT_3);
}
@Test
void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception {
ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory();
cluster.getRabbitMQ3().stop();

@Test
void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
try (Connection connection = node3ConnectionFactory.newConnection(cluster.getAddresses());
Channel channel = connection.createChannel()) {

int nbMessages = 10;
IntStream.range(0, nbMessages)
.mapToObj(i -> asBytes(String.valueOf(i)))
.forEach(Throwing.consumer(
bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);

int nbMessages = 10;
IntStream.range(0, nbMessages)
.mapToObj(i -> asBytes(String.valueOf(i)))
.forEach(Throwing.consumer(
bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));

InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
node2Channel.basicConsume(QUEUE, consumer2);
InMemoryConsumer consumer = new InMemoryConsumer(channel);
channel.basicConsume(QUEUE, consumer);

awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);

List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}
List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}
}

@Test
void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
@Test
void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception {
node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);

int nbMessages = 10;
IntStream.range(0, nbMessages)
.mapToObj(i -> asBytes(String.valueOf(i)))
.forEach(Throwing.consumer(
bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
int nbMessages = 10;
IntStream.range(0, nbMessages)
.mapToObj(i -> asBytes(String.valueOf(i)))
.forEach(Throwing.consumer(
bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));

InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
node2Channel.basicConsume(QUEUE, consumer2);
AtomicInteger counter = new AtomicInteger(0);
InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel,
() -> {
if (counter.incrementAndGet() == nbMessages / 2) {
cluster.getRabbitMQ1().stop();
}
});
resilientChannel.basicConsume(QUEUE, consumer);

awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);

List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}

List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}

private void closeQuietly(AutoCloseable... closeables) {
Arrays.stream(closeables).forEach(this::closeQuietly);
}

private void closeQuietly(AutoCloseable closeable) {
try {
closeable.close();
} catch (Exception e) {
//ignore error
}
}

private byte[] asBytes(String message) {
return message.getBytes(StandardCharsets.UTF_8);
}

}

0 comments on commit 552e44d

Please sign in to comment.