Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a map Camel Properties to Kafka headers option to make the behavi… #927

Merged
merged 2 commits into from Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -43,6 +43,10 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF = "camel.remove.headers.pattern";
public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC = "The pattern of the headers we want to exclude from the exchange";

public static final Boolean CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT = true;
public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_CONF = "camel.map.properties";
public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_DOC = "If set to true, the connector will transform the exchange properties into kafka headers.";

public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0;
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries";
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be use in case of Default Error Handler";
Expand Down
Expand Up @@ -67,7 +67,8 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
.define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC);
.define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC)
.define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC);

public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
Expand Down
Expand Up @@ -59,6 +59,7 @@ public class CamelSinkTask extends SinkTask {
private ProducerTemplate producer;
private Endpoint localEndpoint;
private LoggingLevel loggingLevel = LoggingLevel.OFF;
private boolean mapProperties;

@Override
public String version() {
Expand Down Expand Up @@ -101,6 +102,7 @@ public void start(Map<String, String> props) {
final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);

CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
Expand Down Expand Up @@ -173,7 +175,9 @@ public void put(Collection<SinkRecord> sinkRecords) {
if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
} else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) {
mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
if (mapProperties) {
mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
}
}
}

Expand Down
Expand Up @@ -103,7 +103,8 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
.define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC);
.define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC)
.define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC);;

public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
Expand Down
Expand Up @@ -58,6 +58,7 @@ public class CamelSourceTask extends SourceTask {
private Long maxPollDuration;
private String camelMessageHeaderKey;
private LoggingLevel loggingLevel = LoggingLevel.OFF;
private boolean mapProperties;

@Override
public String version() {
Expand Down Expand Up @@ -101,6 +102,7 @@ public void start(Map<String, String> props) {
final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);

topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");

Expand Down Expand Up @@ -188,8 +190,11 @@ public synchronized List<SourceRecord> poll() {
if (exchange.getMessage().hasHeaders()) {
setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
}
if (exchange.hasProperties()) {
setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);

if (mapProperties) {
if (exchange.hasProperties()) {
setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
}
}

TaskHelper.logRecordContent(LOG, loggingLevel, record);
Expand Down
Expand Up @@ -37,6 +37,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -670,6 +671,64 @@ public void testOnlyBodyUsingMultipleComponentProperties() {

sinkTask.stop();
}

@Test
public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");

CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);

Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
Double myDouble = new Double("100");
int myInteger = 100;
Long myLong = new Long("100");

List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
record.headers().addBoolean(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyBoolean", true);
record.headers().addByte(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyByte", myByte);
record.headers().addFloat(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyFloat", myFloat);
record.headers().addShort(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyShort", myShort);
record.headers().addDouble(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyDouble", myDouble);
record.headers().addInt(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyInteger", myInteger);
record.headers().addLong(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyLong", myLong);
record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", myByte);
record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", myFloat);
record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", myShort);
record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble);
record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
records.add(record);
sinkTask.put(records);

ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertFalse(exchange.getProperties().containsKey("MyBoolean"));
assertFalse(exchange.getProperties().containsKey("MyByte"));
assertFalse(exchange.getProperties().containsKey("MyFloat"));
assertFalse(exchange.getProperties().containsKey("MyShort"));
assertFalse(exchange.getProperties().containsKey("MyDouble"));
assertFalse(exchange.getProperties().containsKey("MyInteger"));
assertFalse(exchange.getProperties().containsKey("MyLong"));
assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class));
assertEquals(myShort, exchange.getIn().getHeader("MyShort", Short.class));
assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));

sinkTask.stop();
}

@Test
public void testIfExchangeFailsShouldThrowConnectException() {
Expand Down
Expand Up @@ -331,6 +331,30 @@ public void testSourceByteArrayHeader() {
sourceTask.stop();
}
}

@Test
public void testSourceByteArrayProperty() {
CamelSourceTask sourceTask = new CamelSourceTask();
sourceTask.start(mapOf(
CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct",
CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false",
CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start"
));

sourceTask.getCms().getProducerTemplate().sendBodyAndProperty(DIRECT_URI, "test", "byteArray", new Byte[] {
1, 2
});

try {
List<SourceRecord> results = sourceTask.poll();
assertThat(results).hasSize(1);

assertEquals(0, results.get(0).headers().size());
} finally {
sourceTask.stop();
}
}

@Test
public void testSourceDateHeader() {
Expand Down