Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds basic aggregation test (github issue #291) #388

Merged
merged 1 commit into from Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -30,4 +30,12 @@ public T withSourceUrl(String sourceUrl) {

return (T) this;
}

public T withAggregate(String aggregate, int size, int timeout) {
withBeans("aggregate", classRef(aggregate));
getProperties().put("camel.beans.aggregation.size", size);
getProperties().put("camel.beans.aggregation.timeout", timeout);

return (T) this;
}
}
Expand Up @@ -24,6 +24,7 @@
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.kafka.clients.admin.AdminClient;
Expand Down Expand Up @@ -82,6 +83,21 @@ public KafkaClient(String bootstrapServer) {
consumer = new KafkaConsumer<>(consumerPropertyFactory.getProperties());
}

/**
* Consumes message from the given topic
*
* @param topic the topic to consume the messages from
* @param recordConsumer the a function to consume the received messages
*/
public void consumeAvailable(String topic, Consumer<ConsumerRecord<K, V>> recordConsumer) {
consumer.subscribe(Arrays.asList(topic));

ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<K, V> record : records) {
recordConsumer.accept(record);
}
}


/**
* Consumes message from the given topic until the predicate returns false
Expand Down
Expand Up @@ -34,6 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.fail;

/**
* A basic multi-protocol JMS client
*/
Expand Down Expand Up @@ -235,4 +237,56 @@ public void send(final String queue, int data) throws JMSException {
capturingClose(producer);
}
}


public static void produceMessages(JMSClient jmsProducer, String queue, int count, Function<Integer, String> supplier) {
try {
jmsProducer.start();
for (int i = 0; i < count; i++) {
jmsProducer.send(queue, supplier.apply(i));
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}

public static void produceMessages(JMSClient jmsProducer, String queue, int count, String baseText) {
try {
jmsProducer.start();
for (int i = 0; i < count; i++) {
jmsProducer.send(queue, baseText + " " + i);
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}

public static void produceMessages(JMSClient jmsProducer, String queue, int count) {
try {
jmsProducer.start();
for (int i = 0; i < count; i++) {
jmsProducer.send(queue, i);
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}
}
Expand Up @@ -19,8 +19,6 @@

import java.util.concurrent.ExecutionException;

import javax.jms.JMSException;

import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
Expand Down Expand Up @@ -55,6 +53,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {

private int received;
private final int expect = 10;
private JMSClient jmsClient;

@Override
protected String[] getConnectorsInTest() {
Expand All @@ -64,6 +63,7 @@ protected String[] getConnectorsInTest() {
@BeforeEach
public void setUp() {
received = 0;
jmsClient = jmsService.getClient();
}

private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
Expand All @@ -77,53 +77,13 @@ private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
return true;
}

private void produceMessages(String queue, String baseText) {
JMSClient jmsProducer = null;

try {
jmsProducer = jmsService.getClient();

jmsProducer.start();
for (int i = 0; i < expect; i++) {
jmsProducer.send(queue, baseText + " " + i);
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}

private void produceMessages(String queue) {
JMSClient jmsProducer = null;

try {
jmsProducer = jmsService.getClient();

jmsProducer.start();
for (int i = 0; i < expect; i++) {
jmsProducer.send(queue, i);
}
} catch (JMSException e) {
LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} catch (Exception e) {
LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e);
fail(e.getMessage());
} finally {
jmsProducer.stop();
}
}

public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);

produceMessages(SJMS2Common.DEFAULT_JMS_QUEUE, "Test string message");
JMSClient.produceMessages(jmsClient, SJMS2Common.DEFAULT_JMS_QUEUE, expect, "Test string message");

LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
Expand Down Expand Up @@ -184,7 +144,7 @@ public void testIntSendReceive() {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);

produceMessages(jmsQueueName);
JMSClient.produceMessages(jmsClient, jmsQueueName, expect);

LOG.debug("Creating the consumer ...");
KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
Expand All @@ -198,4 +158,6 @@ public void testIntSendReceive() {
}

}


}
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.sjms2.source;

import java.util.concurrent.ExecutionException;

import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.camel.kafkaconnector.sjms2.services.JMSService;
import org.apache.camel.kafkaconnector.sjms2.services.JMSServiceFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
@RegisterExtension
public static JMSService jmsService = JMSServiceFactory.createService();

private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);

private int received;
private final int sentSize = 10;
private final int expect = 1;
private JMSClient jmsClient;
private String receivedMessage = "";
private String expectedMessage = "";
private String queueName;


@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-sjms2-kafka-connector"};
}

@BeforeEach
public void setUp() {
received = 0;
jmsClient = jmsService.getClient();

for (int i = 0; i < sentSize - 1; i++) {
expectedMessage += "hello;\n";
}

expectedMessage += "hello;";
queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100);
}

private void checkRecord(ConsumerRecord<String, String> record) {
receivedMessage += record.value();
LOG.debug("Received: {}", receivedMessage);

received++;
}

private static String textToSend(Integer i) {
return "hello;";
}


public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);

JMSClient.produceMessages(jmsClient, queueName, sentSize,
CamelSourceJMSWithAggregation::textToSend);

LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.consumeAvailable(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
LOG.debug("Created the consumer ...");

assertEquals(expect, received, "Didn't process the expected amount of messages");
assertEquals(expectedMessage, receivedMessage, "The messages don't match");
}

@Test
@Timeout(90)
public void testBasicSendReceive() {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withDestinationName(queueName)
.withConnectionProperties(jmsService.getConnectionProperties())
.withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize,
1000);

runBasicStringTest(connectorPropertyFactory);
} catch (Exception e) {
LOG.error("JMS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
}
}