Skip to content

Commit

Permalink
JAMES-2334 Introduce a dockerized rabbitMQ cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored and mbaechler committed May 31, 2018
1 parent 8767d00 commit 4439551
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 7 deletions.
5 changes: 5 additions & 0 deletions server/queue/queue-rabbitmq/pom.xml
Expand Up @@ -53,6 +53,11 @@
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
@@ -0,0 +1,112 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.queue.rabbitmq;

import org.apache.commons.codec.digest.DigestUtils;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.testcontainers.containers.Network;

import com.google.common.collect.ImmutableList;

public class DockerClusterRabbitMQExtention implements BeforeEachCallback, AfterEachCallback, ParameterResolver {

public static final String RABBIT_1 = "rabbit1";
public static final String RABBIT_2 = "rabbit2";
public static final String RABBIT_3 = "rabbit3";
private DockerRabbitMQCluster cluster;
private Network network;

@Override
public void beforeEach(ExtensionContext context) throws Exception {
String cookie = DigestUtils.sha1Hex("secret cookie here");

network = Network.NetworkImpl.builder()
.enableIpv6(false)
.createNetworkCmdModifiers(ImmutableList.of())
.build();

DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network);
DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network);
DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network);

rabbitMQ1.start();
rabbitMQ2.start();
rabbitMQ3.start();

rabbitMQ2.join(rabbitMQ1);
rabbitMQ3.join(rabbitMQ1);

rabbitMQ2.startApp();
rabbitMQ3.startApp();

cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
}

@Override
public void afterEach(ExtensionContext context) throws Exception {
cluster.stop();
network.close();
}

@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class);
}

@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
return cluster;
}

public static class DockerRabbitMQCluster {

private final DockerRabbitMQ rabbitMQ1;
private final DockerRabbitMQ rabbitMQ2;
private final DockerRabbitMQ rabbitMQ3;

public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
this.rabbitMQ1 = rabbitMQ1;
this.rabbitMQ2 = rabbitMQ2;
this.rabbitMQ3 = rabbitMQ3;
}

public void stop() {
rabbitMQ1.stop();
rabbitMQ2.stop();
rabbitMQ3.stop();
}

public DockerRabbitMQ getRabbitMQ1() {
return rabbitMQ1;
}

public DockerRabbitMQ getRabbitMQ2() {
return rabbitMQ2;
}

public DockerRabbitMQ getRabbitMQ3() {
return rabbitMQ3;
}
}
}
Expand Up @@ -18,30 +18,50 @@
****************************************************************/
package org.apache.james.queue.rabbitmq;

import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

import com.rabbitmq.client.ConnectionFactory;

public class DockerRabbitMQ {
private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class);

private static final String DEFAULT_RABBIT_NODE = "my-rabbit";
private static final int DEFAULT_RABBITMQ_PORT = 5672;
private static final String DEFAULT_RABBITMQ_HOSTNAME = "my-rabbit";
private static final String DEFAULT_RABBITMQ_USERNAME = "guest";
private static final String DEFAULT_RABBITMQ_PASSWORD = "guest";
private static final String RABBITMQ_ERLANG_COOKIE = "RABBITMQ_ERLANG_COOKIE";
private static final String RABBITMQ_NODENAME = "RABBITMQ_NODENAME";

private final GenericContainer<?> container;
private final Optional<String> nodeName;

private GenericContainer<?> container;
public static DockerRabbitMQ withCookieAndNodeName(String hostName, String erlangCookie, String nodeName, Network network) {
return new DockerRabbitMQ(Optional.ofNullable(hostName), Optional.ofNullable(erlangCookie), Optional.ofNullable(nodeName),
Optional.of(network));
}

public static DockerRabbitMQ withoutCookie() {
return new DockerRabbitMQ(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
}

@SuppressWarnings("resource")
public DockerRabbitMQ() {
private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) {
container = new GenericContainer<>("rabbitmq:3.7.5")
.withCreateContainerCmdModifier(cmd -> cmd.withHostName(DEFAULT_RABBITMQ_HOSTNAME))
.withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse("localhost")))
.withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE)))
.withExposedPorts(DEFAULT_RABBITMQ_PORT)
.waitingFor(RabbitMQWaitStrategy.withDefaultTimeout(this))
.withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String()));
net.ifPresent(container::withNetwork);
erlangCookie.ifPresent(cookie -> container.withEnv(RABBITMQ_ERLANG_COOKIE, cookie));
nodeName.ifPresent(name -> container.withEnv(RABBITMQ_NODENAME, name));
this.nodeName = nodeName;
}

public String getHostIp() {
Expand Down Expand Up @@ -81,4 +101,29 @@ public void restart() {
DockerClientFactory.instance().client()
.restartContainerCmd(container.getContainerId());
}

public GenericContainer<?> container() {
return container;
}

public String node() {
return nodeName.get();
}

public void join(DockerRabbitMQ rabbitMQ) throws Exception {
container()
.execInContainer("rabbitmqctl", "stop_app")
.getStdout();
container()
.execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node())
.getStdout();
}

public void startApp() throws Exception {
String stdout = container()
.execInContainer("rabbitmqctl", "start_app")
.getStdout();
System.out.println(stdout);

}
}
Expand Up @@ -18,9 +18,7 @@
****************************************************************/
package org.apache.james.queue.rabbitmq;

import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
Expand All @@ -33,7 +31,7 @@ public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCal

@Override
public void beforeEach(ExtensionContext context) {
rabbitMQ = new DockerRabbitMQ();
rabbitMQ = DockerRabbitMQ.withoutCookie();
rabbitMQ.start();
}

Expand Down
@@ -0,0 +1,43 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.queue.rabbitmq;

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(DockerClusterRabbitMQExtention.class)
class RabbitMQClusterTest {

@Test
void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
String stdout = cluster.getRabbitMQ1().container()
.execInContainer("rabbitmqctl", "cluster_status")
.getStdout();

assertThat(stdout)
.contains(
DockerClusterRabbitMQExtention.RABBIT_1,
DockerClusterRabbitMQExtention.RABBIT_2,
DockerClusterRabbitMQExtention.RABBIT_3);
}

}

0 comments on commit 4439551

Please sign in to comment.