Skip to content

Commit

Permalink
Merge pull request #388 from orpiske/aggregation-test
Browse files Browse the repository at this point in the history
Adds basic aggregation test (github issue #291)
  • Loading branch information
oscerd committed Aug 20, 2020
2 parents 9b5d415 + d20fcdd commit 0fa09ac
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 44 deletions.
Expand Up @@ -30,4 +30,12 @@ public T withSourceUrl(String sourceUrl) {

return (T) this;
}

public T withAggregate(String aggregate, int size, int timeout) {
withBeans("aggregate", classRef(aggregate));
getProperties().put("camel.beans.aggregation.size", size);
getProperties().put("camel.beans.aggregation.timeout", timeout);

return (T) this;
}
}
Expand Up @@ -24,6 +24,7 @@
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.kafka.clients.admin.AdminClient;
Expand Down Expand Up @@ -82,6 +83,21 @@ public KafkaClient(String bootstrapServer) {
consumer = new KafkaConsumer<>(consumerPropertyFactory.getProperties());
}

/**
* Consumes message from the given topic
*
* @param topic the topic to consume the messages from
* @param recordConsumer the a function to consume the received messages
*/
public void consumeAvailable(String topic, Consumer<ConsumerRecord<K, V>> recordConsumer) {
consumer.subscribe(Arrays.asList(topic));

ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<K, V> record : records) {
recordConsumer.accept(record);
}
}


/**
* Consumes message from the given topic until the predicate returns false
Expand Down
Expand Up @@ -34,6 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.fail;

/**
* A basic multi-protocol JMS client
*/
Expand Down Expand Up @@ -235,4 +237,56 @@ public void send(final String queue, int data) throws JMSException {
capturingClose(producer);
}
}


public static void produceMessages(JMSClient jmsProducer, String queue, int count, Function<Integer, String> supplier) {
try {
jmsProducer.start();
for (int i = 0; i < count; i++) {
jmsProducer.send(queue, supplier.apply(i));
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}

public static void produceMessages(JMSClient jmsProducer, String queue, int count, String baseText) {
try {
jmsProducer.start();
for (int i = 0; i < count; i++) {
jmsProducer.send(queue, baseText + " " + i);
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}

public static void produceMessages(JMSClient jmsProducer, String queue, int count) {
try {
jmsProducer.start();
for (int i = 0; i < count; i++) {
jmsProducer.send(queue, i);
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}
}
Expand Up @@ -19,8 +19,6 @@

import java.util.concurrent.ExecutionException;

import javax.jms.JMSException;

import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
Expand Down Expand Up @@ -55,6 +53,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {

private int received;
private final int expect = 10;
private JMSClient jmsClient;

@Override
protected String[] getConnectorsInTest() {
Expand All @@ -64,6 +63,7 @@ protected String[] getConnectorsInTest() {
@BeforeEach
public void setUp() {
received = 0;
jmsClient = jmsService.getClient();
}

private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
Expand All @@ -77,53 +77,13 @@ private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
return true;
}

private void produceMessages(String queue, String baseText) {
JMSClient jmsProducer = null;

try {
jmsProducer = jmsService.getClient();

jmsProducer.start();
for (int i = 0; i < expect; i++) {
jmsProducer.send(queue, baseText + " " + i);
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}

private void produceMessages(String queue) {
JMSClient jmsProducer = null;

try {
jmsProducer = jmsService.getClient();

jmsProducer.start();
for (int i = 0; i < expect; i++) {
jmsProducer.send(queue, i);
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}

public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);

produceMessages(SJMS2Common.DEFAULT_JMS_QUEUE, "Test string message");
JMSClient.produceMessages(jmsClient, SJMS2Common.DEFAULT_JMS_QUEUE, expect, "Test string message");

LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
Expand Down Expand Up @@ -184,7 +144,7 @@ public void testIntSendReceive() {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);

produceMessages(jmsQueueName);
JMSClient.produceMessages(jmsClient, jmsQueueName, expect);

LOG.debug("Creating the consumer ...");
KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
Expand All @@ -198,4 +158,6 @@ public void testIntSendReceive() {
}

}


}
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.sjms2.source;

import java.util.concurrent.ExecutionException;

import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.camel.kafkaconnector.sjms2.services.JMSService;
import org.apache.camel.kafkaconnector.sjms2.services.JMSServiceFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
@RegisterExtension
public static JMSService jmsService = JMSServiceFactory.createService();

private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);

private int received;
private final int sentSize = 10;
private final int expect = 1;
private JMSClient jmsClient;
private String receivedMessage = "";
private String expectedMessage = "";
private String queueName;


@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-sjms2-kafka-connector"};
}

@BeforeEach
public void setUp() {
received = 0;
jmsClient = jmsService.getClient();

for (int i = 0; i < sentSize - 1; i++) {
expectedMessage += "hello;\n";
}

expectedMessage += "hello;";
queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100);
}

private void checkRecord(ConsumerRecord<String, String> record) {
receivedMessage += record.value();
LOG.debug("Received: {}", receivedMessage);

received++;
}

private static String textToSend(Integer i) {
return "hello;";
}


public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);

JMSClient.produceMessages(jmsClient, queueName, sentSize,
CamelSourceJMSWithAggregation::textToSend);

LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.consumeAvailable(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
LOG.debug("Created the consumer ...");

assertEquals(expect, received, "Didn't process the expected amount of messages");
assertEquals(expectedMessage, receivedMessage, "The messages don't match");
}

@Test
@Timeout(90)
public void testBasicSendReceive() {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withDestinationName(queueName)
.withConnectionProperties(jmsService.getConnectionProperties())
.withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize,
1000);

runBasicStringTest(connectorPropertyFactory);
} catch (Exception e) {
LOG.error("JMS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
}
}

0 comments on commit 0fa09ac

Please sign in to comment.