Skip to content
Permalink
Browse files
Related to #423 : caonverted source and sink to use camel-kamelets.
  • Loading branch information
valdar committed Oct 25, 2021
1 parent 4e25a3f commit a7437ad504b6d39d31157db60b31b4b2d6932385
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 121 deletions.
@@ -53,12 +53,20 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<artifactId>camel-kamelet</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-languages</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-xml-jaxb</artifactId>
</dependency>

<!-- Tools -->
<dependency>
@@ -42,6 +42,8 @@
import org.slf4j.LoggerFactory;

public class CamelSinkTask extends SinkTask {
public static final String KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSink.";

public static final String KAFKA_RECORD_KEY_HEADER = "camel.kafka.connector.record.key";
public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -119,8 +121,9 @@ public void start(Map<String, String> props) {
CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
CAMEL_SINK_PATH_PROPERTIES_PREFIX);
}
actualProps.put(KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl", remoteUrl);

cms = CamelKafkaConnectMain.builder(LOCAL_URL, remoteUrl)
cms = CamelKafkaConnectMain.builder(LOCAL_URL, "kamelet:ckcSink")
.withProperties(actualProps)
.withUnmarshallDataFormat(unmarshaller)
.withMarshallDataFormat(marshaller)
@@ -49,6 +49,7 @@


public class CamelSourceTask extends SourceTask {
public static final String KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSource.";
public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";

@@ -145,8 +146,9 @@ public Integer get() {
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
}
actualProps.put(KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "fromUrl", remoteUrl);

cms = CamelKafkaConnectMain.builder(remoteUrl, localUrl)
cms = CamelKafkaConnectMain.builder("kamelet:ckcSource", localUrl)
.withProperties(actualProps)
.withUnmarshallDataFormat(unmarshaller)
.withMarshallDataFormat(marshaller)
@@ -171,6 +173,7 @@ public Integer get() {
consumer.start();

cms.start();

LOG.info("CamelSourceTask connector task started");
} catch (Exception e) {
throw new ConnectException("Failed to create and start Camel context", e);

0 comments on commit a7437ad

Please sign in to comment.