Skip to content

Commit

Permalink
fix #194 : Support both marshal and unmarshal in source and sinks.
Browse files Browse the repository at this point in the history
  • Loading branch information
valdar committed Jul 29, 2020
1 parent 3589ae5 commit 042242c
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 18 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Expand Up @@ -121,6 +121,11 @@
<artifactId>camel-hl7</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-syslog</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-debezium-common</artifactId>
Expand Down
Expand Up @@ -29,6 +29,10 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
public static final String CAMEL_SINK_MARSHAL_CONF = "camel.sink.marshal";
public static final String CAMEL_SINK_MARSHAL_DOC = "The camel dataformat name to use to marshal data to the destination";

public static final String CAMEL_SINK_UNMARSHAL_DEFAULT = null;
public static final String CAMEL_SINK_UNMARSHAL_CONF = "camel.sink.unmarshal";
public static final String CAMEL_SINK_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the topic";

public static final String CAMEL_SINK_COMPONENT_DEFAULT = null;
public static final String CAMEL_SINK_COMPONENT_CONF = "camel.sink.component";

Expand Down Expand Up @@ -61,7 +65,9 @@ public class CamelSinkConnectorConfig extends AbstractConfig {

private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
.define(TOPIC_CONF, Type.STRING, TOPIC_DEFAULT, Importance.HIGH, TOPIC_DOC)
.define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
.define(CAMEL_SINK_UNMARSHAL_CONF, Type.STRING, CAMEL_SINK_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_UNMARSHAL_DOC)
.define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
.define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC)
.define(CAMEL_SINK_AGGREGATE_CONF, Type.STRING, CAMEL_SINK_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_DOC)
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -74,9 +75,13 @@ public void start(Map<String, String> props) {

String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
List<CamelKafkaConnectDataformat> dataformats = Collections.emptyList();
final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>();
if (unmarshaller != null) {
dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
}
if (marshaller != null) {
dataformats = Collections.singletonList(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
}
final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF);
final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF);
Expand Down
Expand Up @@ -25,6 +25,14 @@
import org.apache.kafka.common.config.ConfigDef.Type;

public class CamelSourceConnectorConfig extends AbstractConfig {
public static final String CAMEL_SOURCE_UNMARSHAL_DEFAULT = null;
public static final String CAMEL_SOURCE_UNMARSHAL_CONF = "camel.source.unmarshal";
public static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";

public static final String CAMEL_SOURCE_MARSHAL_DEFAULT = null;
public static final String CAMEL_SOURCE_MARSHAL_CONF = "camel.source.marshal";
public static final String CAMEL_SOURCE_MARSHAL_DOC = "The camel dataformat name to use to unmarshal data to the topic";

public static final String CAMEL_SOURCE_URL_DEFAULT = null;
public static final String CAMEL_SOURCE_URL_CONF = "camel.source.url";

Expand All @@ -35,10 +43,6 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
public static final String CAMEL_SOURCE_URL_DOC = "The camel url to configure the source. If this is set " + CAMEL_SOURCE_COMPONENT_CONF
+ " and all the properties starting with " + CamelSourceTask.getCamelSourceEndpointConfigPrefix() + ".<" + CAMEL_SOURCE_COMPONENT_CONF + " value> are ignored.";

public static final String CAMEL_SOURCE_UNMARSHAL_DEFAULT = null;
public static final String CAMEL_SOURCE_UNMARSHAL_CONF = "camel.source.unmarshal";
public static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";

public static final String TOPIC_DEFAULT = "test";
public static final String TOPIC_CONF = "topics";
public static final String TOPIC_DOC = "A list of topics to use as output for this connector";
Expand Down Expand Up @@ -75,6 +79,7 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC)
.define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
.define(CAMEL_SOURCE_MARSHAL_CONF, Type.STRING, CAMEL_SOURCE_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_MARSHAL_DOC)
.define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
.define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -84,9 +85,13 @@ public void start(Map<String, String> props) {

String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
List<CamelKafkaConnectDataformat> dataformats = Collections.emptyList();
final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>();
if (unmarshaller != null) {
dataformats = Collections.singletonList(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
}
if (marshaller != null) {
dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
}
topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);

Expand Down
Expand Up @@ -18,17 +18,20 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.camel.component.hl7.HL7DataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
import org.apache.camel.model.dataformat.SyslogDataFormat;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -40,6 +43,7 @@ public void testDataFormatSource() {
props.put("camel.source.url", "direct://test");
props.put("topics", "mytopic");
props.put("camel.source.marshal", "syslog");
props.put("camel.source.unmarshal", "hl7");

CamelSourceTask camelsourceTask = new CamelSourceTask();
camelsourceTask.start(props);
Expand All @@ -52,13 +56,13 @@ public void testDataFormatSink() {
props.put("camel.sink.url", "direct://test");
props.put("camel.sink.kafka.topic", "mytopic");
props.put("camel.sink.unmarshal", "syslog");
props.put("camel.source.marshal", "hl7");

CamelSinkTask camelsinkTask = new CamelSinkTask();
camelsinkTask.start(props);
camelsinkTask.stop();
}


@Test
public void testDataFormatNotFound() {
Map<String, String> props = new HashMap<>();
Expand All @@ -71,15 +75,34 @@ public void testDataFormatNotFound() {
assertThrows(ConnectException.class, () -> camelsinkTask.stop());
}

// @Test
// public void testBothDataFormatConfiguredError() throws Exception {
// Map<String, String> props = new HashMap<>();
//
// dataformats = Collections.singletonList(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
//
// assertThrows(UnsupportedOperationException.class, () -> new CamelMainSupport(props, "direct://start",
// "log://test", "syslog", "syslog", 10, 500));
// }
@Test
public void testMultipleDataFormatConfigured() throws Exception {
Map<String, String> props = new HashMap<>();
props.put("camel.source.url", "direct://test");
props.put("topics", "mytopic");
props.put("camel.source.marshal", "hl7");
props.put("camel.source.unmarshal", "syslog");

List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>();
dataformats.add(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
dataformats.add(new CamelKafkaConnectDataformat("syslog", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
DefaultCamelContext dcc = new DefaultCamelContext();
CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc);

HL7DataFormat hl7Df = new HL7DataFormat();
hl7Df.setValidate(false);
dcc.getRegistry().bind("hl7", hl7Df);

SyslogDataFormat syslogDf = new SyslogDataFormat();
dcc.getRegistry().bind("syslog", syslogDf);

cms.start();
HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
assertNotNull(hl7dfLoaded);
SyslogDataFormat syslogDfLoaded = dcc.getRegistry().lookupByNameAndType("syslog", SyslogDataFormat.class);
assertNotNull(syslogDfLoaded);
cms.stop();
}

@Test
public void testDataFormatLookUpInRegistry() throws Exception {
Expand Down

0 comments on commit 042242c

Please sign in to comment.