Skip to content
Permalink
Browse files
Related to #423 modularized kamelets and composed them to better auto…
…generate connectors from kamelets catalog
  • Loading branch information
valdar committed Oct 25, 2021
1 parent eaecd48 commit ca4e7843529d957e496d2eec82dce0ff2128aba6
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 101 deletions.
@@ -22,7 +22,6 @@
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
@@ -31,11 +30,8 @@
import org.apache.camel.builder.NoErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.kafkaconnector.CamelConnectorConfig;
import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.CamelSourceTask;
import org.apache.camel.main.SimpleMain;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteTemplateDefinition;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
@@ -46,6 +42,12 @@
import org.slf4j.LoggerFactory;

public class CamelKafkaConnectMain extends SimpleMain {
public static final String KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcMarshal.";
public static final String KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcUnMarshal.";
public static final String KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcAggregator.";
public static final String KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcIdempotent.";
public static final String KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcRemoveHeader.";

private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);

protected volatile ConsumerTemplate consumerTemplate;
@@ -228,36 +230,45 @@ public CamelKafkaConnectMain build(CamelContext camelContext) {
Properties camelProperties = new Properties();
camelProperties.putAll(props);

//error handler
camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
if (errorHandler != null) {
switch (errorHandler) {
case "no":
camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
break;
case "default":
camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
break;
default:
break;
}
}

//dataformats
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
camelProperties.put(KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
camelProperties.put(KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
}

//aggregator
if (!ObjectHelper.isEmpty(aggregationSize)) {
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
}
if (!ObjectHelper.isEmpty(aggregationTimeout)) {
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
}

//idempotency
if (idempotencyEnabled) {
switch (expressionType) {
case "body":
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
break;
case "header":
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
break;
default:
break;
@@ -279,117 +290,92 @@ public CamelKafkaConnectMain build(CamelContext camelContext) {

//remove headers
if (!ObjectHelper.isEmpty(headersExcludePattern)) {
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
camelProperties.put(KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
}

// log filtered properties and set initial camel properties
List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
camelMain.setInitialProperties(camelProperties);

//error handler
camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
if (errorHandler != null) {
switch (errorHandler) {
case "no":
camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
break;
case "default":
camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
break;
default:
break;
}
}

camelMain.configure().addRoutesBuilder(new RouteBuilder() {
public void configure() {

//creating source template
RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
.templateParameter("fromUrl")
.templateParameter("errorHandler", "ckcErrorHandler")

//create marshal template
routeTemplate("ckcMarshal")
.templateParameter("marshal", "dummyDataformat")
.from("kamelet:source")
.marshal("{{marshal}}")
.to("kamelet:sink");

//create unmarshal template
routeTemplate("ckcUnMarshal")
.templateParameter("unmarshal", "dummyDataformat")
.from("kamelet:source")
.marshal("{{unmarshal}}")
.to("kamelet:sink");

//TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
//create aggregator template
routeTemplate("ckcAggregator")
//TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME to ckcAggregationStrategy?
.templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
.templateParameter("aggregationSize", "1")
.templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
.from("kamelet:source")
.aggregate(constant(true))
.aggregationStrategyRef("{{aggregationStrategy}}")
.completionSize("{{aggregationSize}}")
.completionTimeout("{{aggregationTimeout}}")
.to("kamelet:sink")
.end();

//create idempotent template
routeTemplate("ckcIdempotent")
.templateParameter("idempotentExpression", "dummyExpression")
.templateParameter("idempotentRepository", "ckcIdempotentRepository")
.templateParameter("headersExcludePattern", "(?!)");


ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
.errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}");
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}");
}

if (getContext().getRegistry().lookupByName("aggregate") != null) {
AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
rdInTemplateSource = rdInTemplateSource.aggregate(s)
.constant(true)
.completionSize("{{aggregationSize}}")
.completionTimeout("{{aggregationTimeout}}");
}
.from("kamelet:source")
.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}")
.to("kamelet:sink");

if (idempotencyEnabled) {
rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
}
//create removeHeader template
routeTemplate("ckcRemoveHeader")
.templateParameter("headersExcludePattern", "(?!)")
.from("kamelet:source")
.removeHeaders("{{headersExcludePattern}}")
.to("kamelet:sink");

rdInTemplateSource.removeHeaders("{{headersExcludePattern}}")
//creating source template
routeTemplate("ckcSource")
.templateParameter("fromUrl")
.templateParameter("errorHandler", "ckcErrorHandler")
.from("{{fromUrl}}")
.errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"))
.to("kamelet:sink");

//creating sink template
RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
routeTemplate("ckcSink")
.templateParameter("toUrl")
.templateParameter("errorHandler", "ckcErrorHandler")
.templateParameter("marshal", "dummyDataformat")
.templateParameter("unmarshal", "dummyDataformat")

//TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
.templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
.templateParameter("aggregationSize", "1")
.templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))

.templateParameter("idempotentExpression", "dummyExpression")
.templateParameter("idempotentRepository", "ckcIdempotentRepository")
.templateParameter("headersExcludePattern", "(?!)");

.from("kamelet:source")
.errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"))
.to("{{toUrl}}");

ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
.errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
//creating the actual route
ProcessorDefinition<?> rd = from(from);
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
rdInTemplateSink = rdInTemplateSink.marshal("{{marshal}}");
rd = rd.kamelet("ckcMarshal");
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
rdInTemplateSink = rdInTemplateSink.unmarshal("{{unmarshal}}");
rd = rd.kamelet("ckcUnMarshal");
}

if (getContext().getRegistry().lookupByName("aggregate") != null) {
AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
rdInTemplateSink = rdInTemplateSink.aggregate(s)
.constant(true)
.completionSize("{{aggregationSize}}")
.completionTimeout("{{aggregationTimeout}}");
rd = rd.kamelet("ckcAggregator");
}

if (idempotencyEnabled) {
rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
rd = rd.kamelet("ckcIdempotent");
}

rdInTemplateSink.removeHeaders("{{headersExcludePattern}}")
.to("{{toUrl}}");

//creating the actual route
from(from).toD(to);
rd = rd.kamelet("ckcRemoveHeader");
rd.toD(to);
}
});

@@ -16,17 +16,14 @@
*/
package org.apache.camel.kafkaconnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.component.hl7.HL7DataFormat;
import org.apache.camel.component.syslog.SyslogDataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -69,11 +66,7 @@ public void testDataFormatNotFound() {
props.put("camel.sink.marshal", "missingDataformat");

CamelSinkTask camelsinkTask = new CamelSinkTask();
camelsinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42);
records.add(record);
assertThrows(ConnectException.class, () -> camelsinkTask.put(records));
assertThrows(ConnectException.class, () -> camelsinkTask.start(props));
// No need to check the stop method. The error is already thrown/caught during startup.
camelsinkTask.stop();
}

0 comments on commit ca4e784

Please sign in to comment.