Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added idempotency test using header expressions #963

Merged
merged 2 commits into from Feb 3, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,6 +17,8 @@

package org.apache.camel.kafkaconnector.sjms2.sink;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -28,6 +30,7 @@
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;

import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
Expand All @@ -53,6 +56,11 @@
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
@FunctionalInterface
interface Producer {
void producerMessages();
}

@RegisterExtension
public static MessagingService jmsService = MessagingServiceBuilder
.newBuilder(DispatchRouterContainer::new)
Expand All @@ -62,6 +70,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkIdempotentJMSITCase.class);

private String topic;
private String destinationName;
private int received;
private final int expect = 10;

Expand All @@ -83,7 +92,9 @@ protected String[] getConnectorsInTest() {
public void setUp() {
LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
received = 0;
topic = TestUtils.getDefaultTestTopic(this.getClass());

topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
}

private boolean checkRecord(Message jmsMessage) {
Expand Down Expand Up @@ -111,7 +122,7 @@ private void consumeJMSMessages() {
jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
jmsClient.start();

try (MessageConsumer consumer = jmsClient.createConsumer(SJMS2Common.DEFAULT_JMS_QUEUE)) {
try (MessageConsumer consumer = jmsClient.createConsumer(destinationName)) {
// number of retries until stale
int retries = 10;

Expand Down Expand Up @@ -140,7 +151,7 @@ private void consumeJMSMessages() {
}
}

private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
private void runTest(ConnectorPropertyFactory connectorPropertyFactory, Producer producer) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);

Expand All @@ -149,14 +160,7 @@ private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws E
LOG.debug("Creating the consumer ...");
service.submit(() -> consumeJMSMessages());

KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

for (int i = 0; i < expect; i++) {
LOG.debug("Sending message 1/2");
kafkaClient.produce(topic, "Sink test message " + i);
LOG.debug("Sending message 2/2");
kafkaClient.produce(topic, "Sink test message " + i);
}
producer.producerMessages();

LOG.debug("Waiting for the messages to be processed");
service.shutdown();
Expand All @@ -168,6 +172,39 @@ private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws E
}
}

private void produceMessagesNoProperties() {
try {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

for (int i = 0; i < expect; i++) {
LOG.debug("Sending message 1/2");
kafkaClient.produce(topic, "Sink test message " + i);
LOG.debug("Sending message 2/2");
kafkaClient.produce(topic, "Sink test message " + i);
}
} catch (Exception e) {
fail(e.getMessage());
}
}

private void produceMessagesWithProperties() {
try {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

for (int i = 0; i < expect; i++) {
Map<String, String> headers = new HashMap<>();
int randomNumber = TestUtils.randomWithRange(1, 1000);

headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "MessageNumber", String.valueOf(i));

kafkaClient.produce(topic, "Sink test message " + randomNumber, headers);
kafkaClient.produce(topic, "Sink test message " + randomNumber + 1, headers);
}
} catch (Exception e) {
fail(e.getMessage());
}
}

@Test
@Timeout(90)
public void testIdempotentBodySendReceive() {
Expand All @@ -176,13 +213,36 @@ public void testIdempotentBodySendReceive() {
.basic()
.withTopics(topic)
.withConnectionProperties(connectionProperties())
.withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
.withDestinationName(destinationName)
.withIdempotency()
.withRepositoryType("memory")
.withExpressionType("body")
.end();

runTest(connectorPropertyFactory);
runTest(connectorPropertyFactory, this::produceMessagesNoProperties);

} catch (Exception e) {
LOG.error("JMS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
}

@Test
@Timeout(90)
public void testIdempotentHeaderSendReceive() {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
.withTopics(topic)
.withConnectionProperties(connectionProperties())
.withDestinationName(destinationName)
.withIdempotency()
.withRepositoryType("memory")
.withExpressionType("header")
.withExpressionHeader("MessageNumber")
.end();

runTest(connectorPropertyFactory, this::produceMessagesWithProperties);

} catch (Exception e) {
LOG.error("JMS test failed: {}", e.getMessage(), e);
Expand Down