Skip to content

Commit

Permalink
Converted the Timer source test case to use the reusable source base …
Browse files Browse the repository at this point in the history
…class
  • Loading branch information
orpiske committed Feb 9, 2021
1 parent dd01e50 commit 313cbd6
Showing 1 changed file with 18 additions and 35 deletions.
Expand Up @@ -19,17 +19,12 @@

import java.util.concurrent.ExecutionException;

import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand All @@ -38,11 +33,9 @@
* messages
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSourceTimerITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceTimerITCase.class);

private int received;
public class CamelSourceTimerITCase extends CamelSourceTestSupport {
private final int expect = 10;
private String topicName;

@Override
protected String[] getConnectorsInTest() {
Expand All @@ -51,53 +44,43 @@ protected String[] getConnectorsInTest() {

@BeforeEach
public void setUp() {
received = 0;
topicName = getTopicForTest(this);
}

private boolean checkRecord(ConsumerRecord<String, String> record) {
received++;

if (received == expect) {
return false;
}

return true;
@Override
protected void produceTestData() {
// NO-OP
}

private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);

LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
LOG.debug("Created the consumer ...");
@Override
protected void verifyMessages(TestMessageConsumer<?> consumer) {
int received = consumer.consumedMessages().size();

assertEquals(received, expect);
assertEquals(expect, received, "Did not receive as many messages as expected");
}

@Test
@Timeout(90)
@Timeout(30)
public void testLaunchConnector() throws ExecutionException, InterruptedException {
CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withKafkaTopic(topicName)
.withTimerName("launchTest")
.withRepeatCount(expect);

runTest(connectorPropertyFactory);
runTest(connectorPropertyFactory, topicName, expect);
}

@Test
@Timeout(90)
@Timeout(30)
public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withKafkaTopic(topicName)
.withUrl("launchTestUsingUrl")
.append("repeatCount", expect)
.buildUrl();

runTest(connectorPropertyFactory);
runTest(connectorPropertyFactory, topicName, expect);
}
}

0 comments on commit 313cbd6

Please sign in to comment.