Skip to content

Commit

Permalink
Convert the Syslog tests to the new reusable sink test base class
Browse files Browse the repository at this point in the history
  • Loading branch information
orpiske committed Feb 3, 2021
1 parent 36f3f9b commit b6a1bb9
Showing 1 changed file with 39 additions and 32 deletions.
Expand Up @@ -16,19 +16,19 @@
*/
package org.apache.camel.kafkaconnector.syslog.sink;

import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
Expand All @@ -39,15 +39,14 @@
* messages
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkSyslogITCase extends AbstractKafkaTest {
public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
private static final String TEST_TXT = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";

@RegisterExtension
public static SyslogService syslogService = new SyslogService("udp", "//localhost", FREE_PORT);

private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSyslogITCase.class);

private int received;
private String topicName;
private final int expect = 1;

@Override
Expand All @@ -57,36 +56,44 @@ protected String[] getConnectorsInTest() {

@BeforeEach
public void setUp() {
received = 0;
topicName = getTopicForTest(this);
}

private void runBasicProduceTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
@Override
protected String testMessageContent(int current) {
return TEST_TXT;
}

@Override
protected Map<String, String> messageHeaders(String text, int current) {
return null;
}

LOG.debug("Creating the producer ...");
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!");
LOG.debug("Created the producer ...");
@Override
protected void consumeMessages(CountDownLatch latch) {
latch.countDown();
}

assertEquals("<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!", syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
@Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (latch.await(30, TimeUnit.SECONDS)) {
assertEquals(TEST_TXT, syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
} else {
fail("Timed out wait for data to be added to the Kafka cluster");
}
}


@Test
@Timeout(90)
public void testBasicReceive() {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
.basic()
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
.withHost("localhost")
.withPort(FREE_PORT)
.withProtocol("udp");

runBasicProduceTest(connectorPropertyFactory);
} catch (Exception e) {
LOG.error("Syslog test failed: {} {}", e.getMessage(), e);
fail(e.getMessage(), e);
}
public void testBasicReceive() throws Exception {
ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
.basic()
.withTopics(topicName)
.withHost("localhost")
.withPort(FREE_PORT)
.withProtocol("udp");

runTest(connectorPropertyFactory, topicName, expect);
}
}

0 comments on commit b6a1bb9

Please sign in to comment.