Skip to content

Commit

Permalink
Added idempotency test for SJMS2 using header expressions
Browse files Browse the repository at this point in the history
  • Loading branch information
orpiske authored and oscerd committed Feb 3, 2021
1 parent cc371d7 commit 3ffb61a
Showing 1 changed file with 69 additions and 11 deletions.
Expand Up @@ -17,6 +17,8 @@

package org.apache.camel.kafkaconnector.sjms2.sink;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -28,6 +30,7 @@
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;

import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
Expand All @@ -53,6 +56,11 @@
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
@FunctionalInterface
interface Producer {
void producerMessages();
}

@RegisterExtension
public static MessagingService jmsService = MessagingServiceBuilder
.newBuilder(DispatchRouterContainer::new)
Expand Down Expand Up @@ -84,7 +92,8 @@ protected String[] getConnectorsInTest() {
public void setUp() {
LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
received = 0;
topic = TestUtils.getDefaultTestTopic(this.getClass());

topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
}

Expand Down Expand Up @@ -142,7 +151,7 @@ private void consumeJMSMessages() {
}
}

private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
private void runTest(ConnectorPropertyFactory connectorPropertyFactory, Producer producer) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);

Expand All @@ -151,14 +160,7 @@ private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws E
LOG.debug("Creating the consumer ...");
service.submit(() -> consumeJMSMessages());

KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

for (int i = 0; i < expect; i++) {
LOG.debug("Sending message 1/2");
kafkaClient.produce(topic, "Sink test message " + i);
LOG.debug("Sending message 2/2");
kafkaClient.produce(topic, "Sink test message " + i);
}
producer.producerMessages();

LOG.debug("Waiting for the messages to be processed");
service.shutdown();
Expand All @@ -170,6 +172,39 @@ private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws E
}
}

private void produceMessagesNoProperties() {
try {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

for (int i = 0; i < expect; i++) {
LOG.debug("Sending message 1/2");
kafkaClient.produce(topic, "Sink test message " + i);
LOG.debug("Sending message 2/2");
kafkaClient.produce(topic, "Sink test message " + i);
}
} catch (Exception e) {
fail(e.getMessage());
}
}

private void produceMessagesWithProperties() {
try {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

for (int i = 0; i < expect; i++) {
Map<String, String> headers = new HashMap<>();
int randomNumber = TestUtils.randomWithRange(1, 1000);

headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "MessageNumber", String.valueOf(i));

kafkaClient.produce(topic, "Sink test message " + randomNumber, headers);
kafkaClient.produce(topic, "Sink test message " + randomNumber + 1, headers);
}
} catch (Exception e) {
fail(e.getMessage());
}
}

@Test
@Timeout(90)
public void testIdempotentBodySendReceive() {
Expand All @@ -184,7 +219,30 @@ public void testIdempotentBodySendReceive() {
.withExpressionType("body")
.end();

runTest(connectorPropertyFactory);
runTest(connectorPropertyFactory, this::produceMessagesNoProperties);

} catch (Exception e) {
LOG.error("JMS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
}

@Test
@Timeout(90)
public void testIdempotentHeaderSendReceive() {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
.withTopics(topic)
.withConnectionProperties(connectionProperties())
.withDestinationName(destinationName)
.withIdempotency()
.withRepositoryType("memory")
.withExpressionType("header")
.withExpressionHeader("MessageNumber")
.end();

runTest(connectorPropertyFactory, this::produceMessagesWithProperties);

} catch (Exception e) {
LOG.error("JMS test failed: {}", e.getMessage(), e);
Expand Down

0 comments on commit 3ffb61a

Please sign in to comment.