diff --git a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java index bc02984a87..cedb12d7aa 100644 --- a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java +++ b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java @@ -19,17 +19,12 @@ import java.util.concurrent.ExecutionException; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; -import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -38,11 +33,9 @@ * messages */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSourceTimerITCase extends AbstractKafkaTest { - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceTimerITCase.class); - - private int received; +public class CamelSourceTimerITCase extends CamelSourceTestSupport { private final int expect = 10; + private String topicName; @Override protected String[] getConnectorsInTest() { @@ -51,53 +44,43 @@ protected String[] getConnectorsInTest() { @BeforeEach public void setUp() { - received = 0; + topicName = getTopicForTest(this); } - private boolean checkRecord(ConsumerRecord record) { - received++; - - if (received == expect) { - return false; - } - - return true; + @Override + protected void produceTestData() { + // NO-OP } - private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - LOG.debug("Creating the consumer ..."); - KafkaClient kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); + @Override + protected void verifyMessages(TestMessageConsumer consumer) { + int received = consumer.consumedMessages().size(); - assertEquals(received, expect); + assertEquals(expect, received, "Did not receive as many messages as expected"); } @Test - @Timeout(90) + @Timeout(30) public void testLaunchConnector() throws ExecutionException, InterruptedException { CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withTimerName("launchTest") .withRepeatCount(expect); - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } @Test - @Timeout(90) + @Timeout(30) public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException { CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withUrl("launchTestUsingUrl") .append("repeatCount", expect) .buildUrl(); - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } }