Skip to content

Commit

Permalink
Workaround to KafkaEmbedded memory leaks using Kafka Connect.
Browse files Browse the repository at this point in the history
  • Loading branch information
valdar committed Jun 15, 2020
1 parent fea1abb commit 3252ff7
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 12 deletions.
Expand Up @@ -70,6 +70,7 @@ public void setUp() {
@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));

if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) {
fail("Failed to delete queue");
}
Expand Down Expand Up @@ -143,12 +144,12 @@ public void testBasicSendReceiveUsingUrl() throws ExecutionException, Interrupte
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withUrl(AWSCommon.DEFAULT_SQS_QUEUE)
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.appendIfAvailable("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.appendIfAvailable("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();

runTest(connectorPropertyFactory);
}
Expand Down
Expand Up @@ -24,10 +24,13 @@
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
public abstract class AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaTest.class);

@RegisterExtension
public final KafkaService kafkaService;
Expand Down Expand Up @@ -60,7 +63,11 @@ public KafkaConnectService getKafkaConnectService() {
}

protected void deleteKafkaTopic(String topic) {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.deleteTopic(topic);
try {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.deleteTopic(topic);
} catch (Throwable t) {
LOG.warn("Topic not deleted (probably the Kafka test cluster was already shutting down?).", t);
}
}
}
Expand Up @@ -25,6 +25,7 @@
import org.apache.camel.kafkaconnector.common.PluginPathHelper;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,8 +71,10 @@ private void buildCluster() {

@Override
public String getBootstrapServers() {

return cluster.kafka().bootstrapServers();
if (started) {
return cluster.kafka().bootstrapServers();
}
return null;
}

@Override
Expand All @@ -95,6 +98,15 @@ public void shutdown() {
}
}

@Override
public void beforeTestExecution(ExtensionContext extensionContext) throws Exception {
initialize();
}

@Override
public void afterTestExecution(ExtensionContext context) throws Exception {
shutdown();
}

// WARNING: this may come uninitialized
public EmbeddedConnectCluster getCluster() {
Expand Down
Expand Up @@ -18,13 +18,15 @@
package org.apache.camel.kafkaconnector.common.services.kafka;

import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

/**
* Provides an interface for any type of Kafka service: remote instances, local container, etc
*/
public interface KafkaService extends BeforeAllCallback, AfterAllCallback {
public interface KafkaService extends BeforeAllCallback, BeforeTestExecutionCallback, AfterAllCallback, AfterTestExecutionCallback {


/**
Expand All @@ -44,14 +46,23 @@ public interface KafkaService extends BeforeAllCallback, AfterAllCallback {
*/
void shutdown();


@Override
default void beforeAll(ExtensionContext extensionContext) throws Exception {
initialize();
}

@Override
default void beforeTestExecution(ExtensionContext extensionContext) throws Exception {
//no op
}

@Override
default void afterAll(ExtensionContext extensionContext) throws Exception {
shutdown();
}

@Override
default void afterTestExecution(ExtensionContext context) throws Exception {
//no op
}
}

0 comments on commit 3252ff7

Please sign in to comment.