Skip to content

Commit

Permalink
Ensure that local containers used during tests are properly shutdown …
Browse files Browse the repository at this point in the history
…after the test is completed
  • Loading branch information
orpiske committed Mar 7, 2020
1 parent c2ffa8b commit fa0d98a
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 16 deletions.
Expand Up @@ -57,4 +57,10 @@ public CassandraClient getClient() {
public void initialize() {
LOG.info("Cassandra server running at address {}", getCQL3Endpoint());
}

@Override
public void shutdown() {
LOG.info("Stopping the Cassandra container");
container.stop();
}
}
Expand Up @@ -18,13 +18,14 @@
package org.apache.camel.kafkaconnector.services.cassandra;

import org.apache.camel.kafkaconnector.clients.cassandra.CassandraClient;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

/**
* Represents an endpoint to a Cassandra instnace
*/
public interface CassandraService extends BeforeAllCallback {
public interface CassandraService extends BeforeAllCallback, AfterAllCallback {

int getCQL3Port();

Expand All @@ -39,6 +40,10 @@ default String getCQL3Endpoint() {
*/
void initialize();

/**
* Shuts down the service after the test has completed
*/
void shutdown();

CassandraClient getClient();

Expand All @@ -47,4 +52,9 @@ default String getCQL3Endpoint() {
default void beforeAll(ExtensionContext extensionContext) throws Exception {
initialize();
}

@Override
default void afterAll(ExtensionContext extensionContext) throws Exception {
shutdown();
}
}
Expand Up @@ -52,4 +52,9 @@ public CassandraClient getClient() {
public void initialize() {
// NO-OP
}

@Override
public void shutdown() {
// NO-OP
}
}
Expand Up @@ -52,6 +52,12 @@ public void initialize() {
LOG.info("ElasticSearch instance running at {}", getHttpHostAddress());
}

@Override
public void shutdown() {
LOG.info("Stopping the ElasticSearch container");
container.stop();
}

@Override
public ElasticSearchClient getClient() {
return new ElasticSearchClient("localhost", container.getMappedPort(ELASTIC_SEARCH_PORT),
Expand Down
Expand Up @@ -18,10 +18,11 @@
package org.apache.camel.kafkaconnector.services.elasticsearch;

import org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public interface ElasticSearchService extends BeforeAllCallback {
public interface ElasticSearchService extends BeforeAllCallback, AfterAllCallback {

String getHttpHostAddress();

Expand All @@ -30,11 +31,20 @@ public interface ElasticSearchService extends BeforeAllCallback {
*/
void initialize();

/**
* Shuts down the service after the test has completed
*/
void shutdown();

ElasticSearchClient getClient();

@Override
default void beforeAll(ExtensionContext extensionContext) throws Exception {
initialize();
}

@Override
default void afterAll(ExtensionContext extensionContext) throws Exception {
shutdown();
}
}
Expand Up @@ -47,6 +47,11 @@ public void initialize() {
// NO-OP
}

@Override
public void shutdown() {
// NO-OP
}

@Override
public ElasticSearchClient getClient() {
return new ElasticSearchClient(getHost(), getPort(), TestCommon.DEFAULT_ELASTICSEARCH_INDEX);
Expand Down
Expand Up @@ -57,4 +57,10 @@ public String getDefaultEndpoint() {
public void initialize() {
LOG.info("JMS broker running at address {}", container.getDefaultEndpoint());
}

@Override
public void shutdown() {
LOG.info("Stopping JMS broker container");
container.stop();
}
}
Expand Up @@ -20,10 +20,11 @@
import java.util.Properties;

import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public interface JMSService extends BeforeAllCallback {
public interface JMSService extends BeforeAllCallback, AfterAllCallback {

/**
* Gets the connection properties for accessing the service
Expand All @@ -48,9 +49,19 @@ public interface JMSService extends BeforeAllCallback {
*/
void initialize();

/**
* Shuts down the service after the test has completed
*/
void shutdown();


@Override
default void beforeAll(ExtensionContext extensionContext) throws Exception {
initialize();
}

@Override
default void afterAll(ExtensionContext extensionContext) throws Exception {
shutdown();
}
}
Expand Up @@ -30,6 +30,11 @@ public void initialize() {
// NO-OP
}

@Override
public void shutdown() {
// NO-OP
}

@Override
public Properties getConnectionProperties() {
return PropertyUtils.getProperties();
Expand Down
Expand Up @@ -43,15 +43,14 @@

@Testcontainers
public class CamelSinkCassandraITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCassandraITCase.class);

@RegisterExtension
public CassandraService cassandraService = CassandraServiceFactory.createService();
public static CassandraService cassandraService = CassandraServiceFactory.createService();

private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCassandraITCase.class);

private CassandraClient cassandraClient;
private TestDataDao testDataDao;


private final int expect = 10;
private int received;

Expand Down
Expand Up @@ -47,10 +47,10 @@

@Testcontainers
public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class);

@RegisterExtension
public ElasticSearchService elasticSearch = ElasticSearchServiceFactory.createService();
public static ElasticSearchService elasticSearch = ElasticSearchServiceFactory.createService();

private static final Logger LOG = LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class);

private ElasticSearchClient client;

Expand Down
Expand Up @@ -51,10 +51,10 @@
*/
@Testcontainers
public class CamelSinkJMSITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class);

@RegisterExtension
public JMSService jmsService = JMSServiceFactory.createService();
public static JMSService jmsService = JMSServiceFactory.createService();

private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class);

private int received;
private final int expect = 10;
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.camel.kafkaconnector.services.jms.JMSService;
import org.apache.camel.kafkaconnector.services.jms.JMSServiceFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -46,10 +47,10 @@
*/
@Testcontainers
public class CamelSourceJMSITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);

@RegisterExtension
public JMSService jmsService = JMSServiceFactory.createService();
public static JMSService jmsService = JMSServiceFactory.createService();

private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);

private int received;
private final int expect = 10;
Expand Down

0 comments on commit fa0d98a

Please sign in to comment.