diff --git a/core/pom.xml b/core/pom.xml
index df523c7eb3..c4f35bbee8 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -121,6 +121,11 @@
camel-hl7
test
+
+ org.apache.camel
+ camel-syslog
+ test
+
org.apache.camel
camel-debezium-common
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index b6a654e67e..bfd5bbf463 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -29,6 +29,10 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
public static final String CAMEL_SINK_MARSHAL_CONF = "camel.sink.marshal";
public static final String CAMEL_SINK_MARSHAL_DOC = "The camel dataformat name to use to marshal data to the destination";
+ public static final String CAMEL_SINK_UNMARSHAL_DEFAULT = null;
+ public static final String CAMEL_SINK_UNMARSHAL_CONF = "camel.sink.unmarshal";
+ public static final String CAMEL_SINK_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the topic";
+
public static final String CAMEL_SINK_COMPONENT_DEFAULT = null;
public static final String CAMEL_SINK_COMPONENT_CONF = "camel.sink.component";
@@ -61,7 +65,9 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
+ .define(TOPIC_CONF, Type.STRING, TOPIC_DEFAULT, Importance.HIGH, TOPIC_DOC)
.define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
+ .define(CAMEL_SINK_UNMARSHAL_CONF, Type.STRING, CAMEL_SINK_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_UNMARSHAL_DOC)
.define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
.define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC)
.define(CAMEL_SINK_AGGREGATE_CONF, Type.STRING, CAMEL_SINK_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_DOC)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index ddaba6b928..57a15bae4b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -74,9 +75,13 @@ public void start(Map props) {
String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
- List dataformats = Collections.emptyList();
+ final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
+ List dataformats = new LinkedList<>();
+ if (unmarshaller != null) {
+ dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
+ }
if (marshaller != null) {
- dataformats = Collections.singletonList(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
+ dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
}
final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF);
final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 01d9fd37dc..92878ae0df 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -25,6 +25,14 @@
import org.apache.kafka.common.config.ConfigDef.Type;
public class CamelSourceConnectorConfig extends AbstractConfig {
+ public static final String CAMEL_SOURCE_UNMARSHAL_DEFAULT = null;
+ public static final String CAMEL_SOURCE_UNMARSHAL_CONF = "camel.source.unmarshal";
+ public static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";
+
+ public static final String CAMEL_SOURCE_MARSHAL_DEFAULT = null;
+ public static final String CAMEL_SOURCE_MARSHAL_CONF = "camel.source.marshal";
+ public static final String CAMEL_SOURCE_MARSHAL_DOC = "The camel dataformat name to use to unmarshal data to the topic";
+
public static final String CAMEL_SOURCE_URL_DEFAULT = null;
public static final String CAMEL_SOURCE_URL_CONF = "camel.source.url";
@@ -35,10 +43,6 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
public static final String CAMEL_SOURCE_URL_DOC = "The camel url to configure the source. If this is set " + CAMEL_SOURCE_COMPONENT_CONF
+ " and all the properties starting with " + CamelSourceTask.getCamelSourceEndpointConfigPrefix() + ".<" + CAMEL_SOURCE_COMPONENT_CONF + " value> are ignored.";
- public static final String CAMEL_SOURCE_UNMARSHAL_DEFAULT = null;
- public static final String CAMEL_SOURCE_UNMARSHAL_CONF = "camel.source.unmarshal";
- public static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";
-
public static final String TOPIC_DEFAULT = "test";
public static final String TOPIC_CONF = "topics";
public static final String TOPIC_DOC = "A list of topics to use as output for this connector";
@@ -75,6 +79,7 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC)
.define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
+ .define(CAMEL_SOURCE_MARSHAL_CONF, Type.STRING, CAMEL_SOURCE_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_MARSHAL_DOC)
.define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
.define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index d235c3d4af..acdba59c6f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -84,9 +85,13 @@ public void start(Map props) {
String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
- List dataformats = Collections.emptyList();
+ final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
+ List dataformats = new LinkedList<>();
if (unmarshaller != null) {
- dataformats = Collections.singletonList(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
+ dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
+ }
+ if (marshaller != null) {
+ dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
}
topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index c916666478..6b7abf3bf7 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -25,10 +26,12 @@
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
+import org.apache.camel.model.dataformat.SyslogDataFormat;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -40,6 +43,7 @@ public void testDataFormatSource() {
props.put("camel.source.url", "direct://test");
props.put("topics", "mytopic");
props.put("camel.source.marshal", "syslog");
+ props.put("camel.source.unmarshal", "hl7");
CamelSourceTask camelsourceTask = new CamelSourceTask();
camelsourceTask.start(props);
@@ -52,13 +56,13 @@ public void testDataFormatSink() {
props.put("camel.sink.url", "direct://test");
props.put("camel.sink.kafka.topic", "mytopic");
props.put("camel.sink.unmarshal", "syslog");
+ props.put("camel.source.marshal", "hl7");
CamelSinkTask camelsinkTask = new CamelSinkTask();
camelsinkTask.start(props);
camelsinkTask.stop();
}
-
@Test
public void testDataFormatNotFound() {
Map props = new HashMap<>();
@@ -71,15 +75,34 @@ public void testDataFormatNotFound() {
assertThrows(ConnectException.class, () -> camelsinkTask.stop());
}
-// @Test
-// public void testBothDataFormatConfiguredError() throws Exception {
-// Map props = new HashMap<>();
-//
-// dataformats = Collections.singletonList(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
-//
-// assertThrows(UnsupportedOperationException.class, () -> new CamelMainSupport(props, "direct://start",
-// "log://test", "syslog", "syslog", 10, 500));
-// }
+ @Test
+ public void testMultipleDataFormatConfigured() throws Exception {
+ Map props = new HashMap<>();
+ props.put("camel.source.url", "direct://test");
+ props.put("topics", "mytopic");
+ props.put("camel.source.marshal", "hl7");
+ props.put("camel.source.unmarshal", "syslog");
+
+ List dataformats = new LinkedList<>();
+ dataformats.add(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
+ dataformats.add(new CamelKafkaConnectDataformat("syslog", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
+ DefaultCamelContext dcc = new DefaultCamelContext();
+ CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc);
+
+ HL7DataFormat hl7Df = new HL7DataFormat();
+ hl7Df.setValidate(false);
+ dcc.getRegistry().bind("hl7", hl7Df);
+
+ SyslogDataFormat syslogDf = new SyslogDataFormat();
+ dcc.getRegistry().bind("syslog", syslogDf);
+
+ cms.start();
+ HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+ assertNotNull(hl7dfLoaded);
+ SyslogDataFormat syslogDfLoaded = dcc.getRegistry().lookupByNameAndType("syslog", SyslogDataFormat.class);
+ assertNotNull(syslogDfLoaded);
+ cms.stop();
+ }
@Test
public void testDataFormatLookUpInRegistry() throws Exception {