Merge pull request #431 from codexetreme/master
adding a key transformer to the AWS2-SQS connector so that the CamelH…
oscerd committed Sep 7, 2020
2 parents 72760e0 + cc301c2 commit f53631943940ee9bec026a888076306f8a312c29
Showing 1 changed file with 38 additions and 0 deletions.
@@ -0,0 +1,38 @@
package org.apache.camel.kafkaconnector.aws2sqs.transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

public class SQSKeySetterTransform<R extends ConnectRecord<R>> implements Transformation<R> {

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM, "Fetch the Camel.CamelAwsSqsMessageId header and set it as the key for the kafka record");

public R apply(R record) {
Headers headers = record.headers();
String key = (String) headers.lastWithName("CamelHeader.CamelAwsSqsMessageId").value();
return record.newRecord(record.topic(), record.kafkaPartition(), null, key, Schema.STRING_SCHEMA, record.value(), record.timestamp());

public ConfigDef config() {
return CONFIG_DEF;

public void close() {


public void configure(Map<String, ?> map) {


