Skip to content

Commit

Permalink
Start mosquitto container via tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dimabarbul committed Nov 17, 2023
1 parent eb8755f commit a40a966
Show file tree
Hide file tree
Showing 21 changed files with 369 additions and 183 deletions.
Expand Up @@ -43,6 +43,7 @@
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.GenericMqttPublish;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.subscribe.GenericMqttSubscribe;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.subscribe.GenericMqttSubscription;
import org.eclipse.ditto.internal.utils.test.docker.mosquitto.MosquittoResource;
import org.eclipse.ditto.internal.utils.tracing.DittoTracingInitResource;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.things.model.ThingId;
Expand Down Expand Up @@ -85,9 +86,9 @@ public static Collection<MqttVersion> mqttVersions() {
public static final DittoTracingInitResource DITTO_TRACING_INIT_RESOURCE =
DittoTracingInitResource.disableDittoTracing();

// @ClassRule
// public static final MongoDbResource MONGO_RESOURCE = new MongoDbResource();
// private static DittoMongoClient mongoClient;
@ClassRule
public static final MosquittoResource MOSQUITTO_RESOURCE = new MosquittoResource("mosquitto.conf");

private static final ConnectionId CONNECTION_ID = ConnectionId.of("connection");
private static final String CLIENT_ID_DITTO = "ditto";
private static final String TOPIC_NAME = "data";
Expand All @@ -110,7 +111,6 @@ public static Collection<MqttVersion> mqttVersions() {

private ActorSystem actorSystem;
private TestProbe commandForwarderProbe;
// private MongoCollection<Document> thingsCollection;

@BeforeClass
public static void beforeClass() {
Expand Down Expand Up @@ -149,17 +149,6 @@ public void after() {
}
}

/*@AfterClass
public static void stopMongoResource() {
try {
if (mongoClient != null) {
mongoClient.close();
}
} catch (final IllegalStateException e) {
System.err.println("IllegalStateException during shutdown of MongoDB: " + e.getMessage());
}
}*/

@Test
public void testSingleTopic() {
new TestKit(actorSystem) {{
Expand Down Expand Up @@ -320,7 +309,7 @@ private static Connection getConnection(final String[]... sourcesTopics) {
return ConnectivityModelFactory.newConnectionBuilder(CONNECTION_ID,
mqttVersion.equals(MqttVersion.MQTT_3_1_1) ? ConnectionType.MQTT : ConnectionType.MQTT_5,
ConnectivityStatus.CLOSED,
"tcp://localhost:1883")
"tcp://" + MOSQUITTO_RESOURCE.getBindIp() + ":" + MOSQUITTO_RESOURCE.getPort())
.specificConfig(Map.of(
"clientId", CLIENT_ID_DITTO,
"cleanSession", "false",
Expand Down Expand Up @@ -365,7 +354,7 @@ private static void sleep(int seconds) {
}

private static GenericBlockingMqttClient getMqttClient(final String clientId) {
return GenericBlockingMqttClientBuilder.newInstance(mqttVersion, "localhost", 1883)
return GenericBlockingMqttClientBuilder.newInstance(mqttVersion, MOSQUITTO_RESOURCE.getBindIp(), MOSQUITTO_RESOURCE.getPort())
.clientIdentifier(clientId)
.build();
}
Expand Down
3 changes: 3 additions & 0 deletions connectivity/service/src/test/resources/mosquitto.conf
@@ -0,0 +1,3 @@
listener 1883
persistence true
allow_anonymous true
Expand Up @@ -28,7 +28,7 @@
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.persistence.mongo.assertions.MongoIndexAssertions;
import org.eclipse.ditto.internal.utils.test.mongo.MongoDbResource;
import org.eclipse.ditto.internal.utils.test.docker.mongo.MongoDbResource;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.operations.PersistenceOperationsConfig;
import org.eclipse.ditto.internal.utils.test.mongo.MongoDbResource;
import org.eclipse.ditto.internal.utils.test.docker.mongo.MongoDbResource;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.bson.Document;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.test.mongo.MongoDbResource;
import org.eclipse.ditto.internal.utils.test.docker.mongo.MongoDbResource;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.bson.Document;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.test.mongo.MongoDbResource;
import org.eclipse.ditto.internal.utils.test.docker.mongo.MongoDbResource;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down
Expand Up @@ -10,17 +10,19 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.test.mongo;
package org.eclipse.ditto.internal.utils.test.docker;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateContainerCmd;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.HostConfig;
Expand All @@ -33,25 +35,20 @@
/**
* Responsible for creating and configuring the mongo db docker container that should be started for tests.
*/
final class MongoContainerFactory implements Closeable {
public abstract class ContainerFactory implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoContainerFactory.class);
private static final String MONGO_IMAGE_NAME = "mongo";
private static final String DEFAULT_MONGO_VERSION = "4.2";
private static final int MONGO_INTERNAL_PORT = 27017;
private static final PortBinding MONGO_PORT_BINDING_TO_RANDOM_PORT =
new PortBinding(Ports.Binding.empty(), ExposedPort.tcp(MONGO_INTERNAL_PORT));
private static final List<String> MONGO_COMMANDS = List.of("mongod", "--storageEngine", "wiredTiger");
private static final Logger LOGGER = LoggerFactory.getLogger(ContainerFactory.class);

private static final MongoContainerFactory INSTANCE = new MongoContainerFactory(DEFAULT_MONGO_VERSION);
private static final String UNIX_DOCKER_HOST = "unix:///var/run/docker.sock";
private static final String WINDOWS_DOCKER_HOST = "npipe:////./pipe/docker_engine";

private final String mongoImageIdentifier;
private final String imageIdentifier;
private final int[] ports;
private final DockerClient dockerClient;

private MongoContainerFactory(final String mongoVersion) {
mongoImageIdentifier = MONGO_IMAGE_NAME + ":" + mongoVersion;
protected ContainerFactory(final String imageIdentifier, final int... ports) {
this.imageIdentifier = imageIdentifier;
this.ports = ports;
final String dockerHost = OsDetector.isWindows() ? WINDOWS_DOCKER_HOST : UNIX_DOCKER_HOST;
LOGGER.info("Connecting to docker daemon on <{}>.", dockerHost);
final DefaultDockerClientConfig config =
Expand All @@ -62,37 +59,32 @@ private MongoContainerFactory(final String mongoVersion) {
.build();
dockerClient = DockerClientImpl.getInstance(config, httpClient);

LOGGER.info("Checking if Mongo image <{}> needs to be pulled.", mongoImageIdentifier);
if (isMongoImageAbsent()) {
pullMongoImage();
LOGGER.info("Checking if image <{}> needs to be pulled.", imageIdentifier);
if (isImageAbsent()) {
pullImage();
}
}

/**
* @return returns the singleton instance of this factory.
*/
static MongoContainerFactory getInstance() {
return INSTANCE;
}

static MongoContainerFactory of(final String mongoVersion) {
return new MongoContainerFactory(mongoVersion);
protected CreateContainerCmd configureContainer(CreateContainerCmd createContainerCmd) {
return createContainerCmd;
}

/**
* Creates the mongo docker container with all required configuration and returns it.
* Creates the docker container with all required configuration and returns it.
* It's not started after it has been returned.
*
* @return the created {@link DockerContainer}.
*/
DockerContainer createMongoContainer() {
return getMongoImageId()
DockerContainer createContainer() {
return getImageId()
.map(imageId -> {
LOGGER.info("Creating container based on image with ID <{}>.", imageId);
return dockerClient.createContainerCmd(imageId)
.withCmd(MONGO_COMMANDS)
final var createContainerCmd = dockerClient.createContainerCmd(imageId)
.withHostConfig(
HostConfig.newHostConfig().withPortBindings(MONGO_PORT_BINDING_TO_RANDOM_PORT))
HostConfig.newHostConfig().withPortBindings(Arrays.stream(ports)
.mapToObj(p -> new PortBinding(Ports.Binding.empty(), ExposedPort.tcp(p)))
.collect(Collectors.toList())));
return configureContainer(createContainerCmd)
.exec()
.getId();
})
Expand All @@ -102,36 +94,33 @@ DockerContainer createMongoContainer() {
);
}

private boolean isMongoImageAbsent() {
final Optional<String> mongoImageId = getMongoImageId();
mongoImageId.ifPresentOrElse(imageId -> {
LOGGER.info("Mongo image <{}> is already present with ID <{}>", mongoImageIdentifier, imageId);
private boolean isImageAbsent() {
final Optional<String> imageId = getImageId();
imageId.ifPresentOrElse(id -> {
LOGGER.info("Image <{}> is already present with ID <{}>", imageIdentifier, id);
}, () -> {
LOGGER.info("Mongo image <{}> is not present, yet.", mongoImageIdentifier);
LOGGER.info("Image <{}> is not present, yet.", imageIdentifier);
});
return mongoImageId.isEmpty();
return imageId.isEmpty();
}

private Optional<String> getMongoImageId() {
private Optional<String> getImageId() {
try {
return Optional.ofNullable(dockerClient.inspectImageCmd(mongoImageIdentifier).exec().getId());
return Optional.ofNullable(dockerClient.inspectImageCmd(imageIdentifier).exec().getId());
} catch (final NotFoundException e) {
return Optional.empty();
}
}

private void pullMongoImage() {
LOGGER.info("Pulling <{}>.", mongoImageIdentifier);
private void pullImage() {
LOGGER.info("Pulling <{}>.", imageIdentifier);
final DockerImagePullHandler dockerImagePullHandler = DockerImagePullHandler.newInstance();
dockerClient.pullImageCmd(mongoImageIdentifier).exec(dockerImagePullHandler);
dockerClient.pullImageCmd(imageIdentifier).exec(dockerImagePullHandler);
dockerImagePullHandler.getImagePullFuture().join();
}

@Override
public void close() throws IOException {
// never close the default instance
if (this != INSTANCE) {
dockerClient.close();
}
dockerClient.close();
}
}
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.test.mongo;
package org.eclipse.ditto.internal.utils.test.docker;

import java.util.Arrays;
import java.util.Optional;
Expand All @@ -27,7 +27,7 @@
/**
* Provides an easy way to start, stop and remove the once created docker container.
*/
final class DockerContainer {
public final class DockerContainer {

private static final Logger LOGGER = LoggerFactory.getLogger(DockerContainer.class);
private static final Integer DOCKER_STOP_TIMEOUT_SECONDS = 5;
Expand Down
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.test.mongo;
package org.eclipse.ditto.internal.utils.test.docker;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -27,7 +27,7 @@
/**
* Allows to watch completion of image pulling process and logs its progress.
*/
final class DockerImagePullHandler implements ResultCallback<PullResponseItem> {
public final class DockerImagePullHandler implements ResultCallback<PullResponseItem> {

private static final Logger LOGGER = LoggerFactory.getLogger(DockerImagePullHandler.class);

Expand All @@ -39,7 +39,7 @@ private DockerImagePullHandler() {
imagePullFuture = new CompletableFuture<>();
}

static DockerImagePullHandler newInstance() {
public static DockerImagePullHandler newInstance() {
return new DockerImagePullHandler();
}

Expand Down
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.test.docker;

import java.io.Closeable;
import java.io.IOException;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import org.junit.rules.ExternalResource;

/**
* External docker resource for utilization within tests.
* If the environment variable "HOSTING_ENVIRONMENT" is set to "IDE", then the resource will use localhost:{PORT}
* instead of starting its own container.
*/
@NotThreadSafe
public abstract class DockerResource extends ExternalResource {

private static final String CONTAINER_ALREADY_STARTED = "Container has already been started.";
private static final String HOSTING_ENVIRONMENT_ENV_VARIABLE_NAME = "HOSTING_ENVIRONMENT";
private static final String IDE_HOSTING_ENVIRONMENT = "IDE";

private final ContainerFactory containerFactory;
@Nullable private DockerContainer container;

public DockerResource(ContainerFactory containerFactory) {
this.containerFactory = containerFactory;
}

@Override
protected void before() {
if (container != null) {
throw new IllegalStateException(CONTAINER_ALREADY_STARTED);
}
if (!IDE_HOSTING_ENVIRONMENT.equals(System.getenv(HOSTING_ENVIRONMENT_ENV_VARIABLE_NAME))) {
container = containerFactory.createContainer();
container.start();
}
}

@Override
protected void after() {
if (container != null) {
container.stop();
container.remove();
container = null;
}
try {
containerFactory.close();
} catch (final IOException e) {
throw new AssertionError(e);
}
}

/**
* @return the port on which the container listens.
*/
protected int getPort(final int port) {
if (container == null) {
return port;
}
return container.getPort(port);
}

/**
* @return the IP on which the container was bound.
*/
public String getBindIp() {
if (container == null) {
return "localhost";
}
return container.getHostname();
}

}

0 comments on commit a40a966

Please sign in to comment.