diff --git a/kafka-connector/README.md b/kafka-connector/README.md
index 4a3abaa3..f3aa5ac5 100644
--- a/kafka-connector/README.md
+++ b/kafka-connector/README.md
@@ -91,6 +91,7 @@ Connector supports the following configs:
| kafka.partition.scheme | round_robin, hash_key, hash_value, kafka_partitioner | round_robin | The scheme for assigning a message to a partition in Kafka. The scheme "round_robin" assigns partitions in a round-robin fashion, while "hash_key" and "hash_value" find the partition by hashing the message key and message value respectively. The "kafka_partitioner" scheme delegates partitioning to the Kafka producer, which by default detects the number of partitions automatically and performs either murmur-hash-based partition mapping or round robin, depending on whether a message key is provided (see the sketch after this table).|
| gcp.credentials.file.path | String | Optional | The path to the file that stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, the explicitly provided credentials are used. Consider using the externalized secrets feature in Kafka Connect to pass the value. |
+| kafka.record.headers | Boolean | false | Use Kafka record headers to store Pub/Sub message attributes |
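
The partition schemes above can be illustrated with a rough sketch. This is not the connector's actual `selectPartition` logic, only an approximation of what each scheme implies; for `kafka_partitioner` the connector presumably leaves the record's partition unset so the Kafka producer's own partitioner decides.

```java
// Illustrative sketch only -- not the connector's implementation.
static Integer choosePartition(String scheme, Object key, Object value,
                               int numPartitions, int roundRobinCounter) {
  switch (scheme) {
    case "round_robin":
      return roundRobinCounter % numPartitions;                      // rotate across partitions
    case "hash_key":
      return key == null ? 0 : Math.floorMod(key.hashCode(), numPartitions);
    case "hash_value":
      return Math.floorMod(value.hashCode(), numPartitions);
    default:                                                         // "kafka_partitioner"
      return null;  // leave unset; the producer's partitioner picks the partition
  }
}
```
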
#### Sink Connector
@@ -105,6 +106,8 @@ Connector supports the following configs:
| maxTotalTimeoutMs | Integer | 60000| The total timeout for a call to publish (including retries) to Cloud Pub/Sub. |
| gcp.credentials.file.path | String | Optional | The path to the file that stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, the explicitly provided credentials are used. Consider using the externalized secrets feature in Kafka Connect to pass the value. |
+| metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Cloud Pub/Sub. |
+| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Cloud Pub/Sub. |
#### Schema Support and Data Model
@@ -137,6 +140,16 @@ The sink connector handles the conversion in the following way:
together into a ByteString object.
* In all cases, the Kafka key value is stored in the Pubsub message's
attributes as a string, currently "key".
+
+> **IMPORTANT NOTICE:** There are three limitations to keep in mind when using Pub/Sub
+>message attributes, as stated in the [documentation](https://cloud.google.com/pubsub/quotas#resource_limits):
+>* *"Attributes per message: 100"*
+>* *"Attribute key size: 256 bytes"*
+>* *"Attribute value size: 1024 bytes"*
+>
+>If you enable copying of Kafka headers as Pub/Sub message attributes (disabled by default), the connector
+>copies only the headers that meet these limitations and skips those that do not.
+
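
A minimal sketch of the check this implies, assuming UTF-8 byte lengths (the method name is illustrative and not part of the connector):

```java
import java.nio.charset.StandardCharsets;

// Sketch only: a Kafka header can become a Pub/Sub attribute iff it fits the documented limits.
static boolean fitsAttributeLimits(String key, String value, int attributesSoFar) {
  return attributesSoFar < 100                                    // "Attributes per message: 100"
      && key.getBytes(StandardCharsets.UTF_8).length <= 256       // "Attribute key size: 256 bytes"
      && value.getBytes(StandardCharsets.UTF_8).length <= 1024;   // "Attribute value size: 1024 bytes"
}
```
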
The source connector takes a similar approach in handling the conversion
from a Pubsub message into a SourceRecord with a relevant Schema.
diff --git a/kafka-connector/pom.xml b/kafka-connector/pom.xml
index f7641a83..57fb7840 100644
--- a/kafka-connector/pom.xml
+++ b/kafka-connector/pom.xml
@@ -59,7 +59,7 @@
org.apache.kafka
connect-api
- 0.10.2.1
+ 1.1.0
provided
diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java
index f70a6e38..0ffbc5cd 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java
@@ -52,6 +52,7 @@ public class CloudPubSubSinkConnector extends SinkConnector {
public static final String CPS_MESSAGE_BODY_NAME = "messageBodyName";
public static final String DEFAULT_MESSAGE_BODY_NAME = "cps_message_body";
public static final String PUBLISH_KAFKA_METADATA = "metadata.publish";
+ public static final String PUBLISH_KAFKA_HEADERS = "headers.publish";
private Map<String, String> props;
@Override
@@ -148,6 +149,12 @@ public ConfigDef config() {
Importance.MEDIUM,
"When true, include the Kafka topic, partition, offset, and timestamp as message "
+ "attributes when a message is published to Cloud Pub/Sub.")
+ .define(
+ PUBLISH_KAFKA_HEADERS,
+ Type.BOOLEAN,
+ false,
+ Importance.MEDIUM,
+ "When true, include any headers as attributes when a message is published to Cloud Pub/Sub.")
.define(CPS_MESSAGE_BODY_NAME,
Type.STRING,
DEFAULT_MESSAGE_BODY_NAME,
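
For reference, a minimal sketch of how the two new flags could be set in the properties handed to the connector; only the new keys are shown, and required settings such as the Cloud Pub/Sub project and topic are omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: enable Kafka metadata and header forwarding on the sink connector.
Map<String, String> props = new HashMap<>();
props.put(CloudPubSubSinkConnector.PUBLISH_KAFKA_METADATA, "true");  // "metadata.publish"
props.put(CloudPubSubSinkConnector.PUBLISH_KAFKA_HEADERS, "true");   // "headers.publish"
```
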
diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
index a7464dc1..e242a1fd 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
@@ -44,6 +44,9 @@
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
@@ -71,6 +74,7 @@ public class CloudPubSubSinkTask extends SinkTask {
private int maxTotalTimeoutMs;
private int maxShutdownTimeoutMs;
private boolean includeMetadata;
+ private boolean includeHeaders;
private ConnectorCredentialsProvider gcpCredentialsProvider;
private com.google.cloud.pubsub.v1.Publisher publisher;
@@ -117,6 +121,7 @@ public void start(Map props) {
(Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_SHUTDOWN_TIMEOUT_MS);
messageBodyName = (String) validatedProps.get(CloudPubSubSinkConnector.CPS_MESSAGE_BODY_NAME);
includeMetadata = (Boolean) validatedProps.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_METADATA);
+ includeHeaders = (Boolean) validatedProps.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_HEADERS);
gcpCredentialsProvider = new ConnectorCredentialsProvider();
String credentialsPath = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
String credentialsJson = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
@@ -143,9 +148,9 @@ public void start(Map props) {
@Override
public void put(Collection<SinkRecord> sinkRecords) {
log.debug("Received " + sinkRecords.size() + " messages to send to CPS.");
- PubsubMessage.Builder builder = PubsubMessage.newBuilder();
for (SinkRecord record : sinkRecords) {
log.trace("Received record: " + record.toString());
+ PubsubMessage.Builder builder = PubsubMessage.newBuilder();
Map<String, String> attributes = new HashMap<>();
ByteString value = handleValue(record.valueSchema(), record.value(), attributes);
if (record.key() != null) {
@@ -159,11 +164,34 @@ public void put(Collection sinkRecords) {
attributes.put(ConnectorUtils.KAFKA_OFFSET_ATTRIBUTE, Long.toString(record.kafkaOffset()));
attributes.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, record.timestamp().toString());
}
+ if (includeHeaders) {
+ for (Header header : getRecordHeaders(record)) {
+ attributes.put(header.key(), String.valueOf(header.value()));
+ }
+ }
PubsubMessage message = builder.setData(value).putAllAttributes(attributes).build();
publishMessage(record.topic(), record.kafkaPartition(), message);
}
}
+ private Iterable<? extends Header> getRecordHeaders(SinkRecord record) {
+ ConnectHeaders headers = new ConnectHeaders();
+ if (record.headers() != null) {
+ int headerCount = 0;
+ for (Header header : record.headers()) {
+ // Forward only headers that fit the Pub/Sub attribute limits
+ // (key <= 256 bytes, value <= 1024 bytes).
+ if (header.key().getBytes().length < 257 &&
+ String.valueOf(header.value()).getBytes().length < 1025) {
+ headers.add(header);
+ headerCount++;
+ }
+ // Pub/Sub allows at most 100 attributes per message.
+ if (headerCount >= 100) {
+ break;
+ }
+ }
+ }
+ return headers;
+ }
+
private ByteString handleValue(Schema schema, Object value, Map<String, String> attributes) {
if (schema == null) {
String str = value.toString();
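
To make the effect of `headers.publish` concrete: for a record with key `"k"`, value `"v"`, and a single header `trace=abc`, the task ends up publishing roughly the message sketched below. This is illustrative only; `ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE` is the same key attribute the tests assert on, and with `metadata.publish` also enabled the `ConnectorUtils.KAFKA_*_ATTRIBUTE` entries would be added as well.

```java
// Illustrative only: approximate shape of the message published for one SinkRecord.
PubsubMessage published =
    PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8("v"))                         // record value
        .putAttributes(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, "k")  // record key
        .putAttributes("trace", "abc")                                 // copied Kafka header
        .build();
```
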
diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java
index d2f68d34..3c1e7d78 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java
@@ -57,6 +57,7 @@ public class CloudPubSubSourceConnector extends SourceConnector {
public static final int DEFAULT_CPS_MAX_BATCH_SIZE = 100;
public static final int DEFAULT_KAFKA_PARTITIONS = 1;
public static final String DEFAULT_KAFKA_PARTITION_SCHEME = "round_robin";
+ public static final String USE_KAFKA_HEADERS = "kafka.record.headers";
/** Defines the accepted values for the {@link #KAFKA_PARTITION_SCHEME_CONFIG}. */
public enum PartitionScheme {
@@ -224,7 +225,13 @@ public ConfigDef config() {
Type.STRING,
null,
Importance.HIGH,
- "GCP JSON credentials");
+ "GCP JSON credentials")
+ .define(
+ USE_KAFKA_HEADERS,
+ Type.BOOLEAN,
+ false,
+ Importance.LOW,
+ "Use Kafka record headers to store Pub/Sub message attributes");
}
/**
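
When `kafka.record.headers` is enabled, the attributes land in Kafka record headers rather than in a struct value, so a downstream consumer can read them with the standard client API. A minimal sketch follows; topic subscription, deserializers, and consumer configuration are assumed.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

class ForwardedAttributePrinter {
  // Sketch only: print Pub/Sub attributes forwarded as Kafka record headers.
  static void printAttributes(ConsumerRecord<String, byte[]> record) {
    for (Header header : record.headers()) {
      System.out.println(header.key() + " = " + new String(header.value(), StandardCharsets.UTF_8));
    }
  }
}
```
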
diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java
index 08124f2a..6ed8eb20 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java
@@ -42,6 +42,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
@@ -67,12 +69,13 @@ public class CloudPubSubSourceTask extends SourceTask {
// Keeps track of the current partition to publish to if the partition scheme is round robin.
private int currentRoundRobinPartition = -1;
// Keep track of all ack ids that have not been sent correctly acked yet.
- private Set<String> deliveredAckIds = Collections.synchronizedSet(new HashSet<String>());
+ private final Set<String> deliveredAckIds = Collections.synchronizedSet(new HashSet<String>());
private Set<String> ackIds = Collections.synchronizedSet(new HashSet<String>());
private CloudPubSubSubscriber subscriber;
private Set<String> ackIdsInFlight = Collections.synchronizedSet(new HashSet<String>());
private final Set<String> standardAttributes = new HashSet<>();
private ConnectorCredentialsProvider gcpCredentialsProvider;
+ private boolean useKafkaHeaders;
public CloudPubSubSourceTask() {}
@@ -106,6 +109,7 @@ public void start(Map props) {
kafkaPartitionScheme =
PartitionScheme.getEnum(
(String) validatedProps.get(CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG));
+ useKafkaHeaders = (Boolean) validatedProps.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS);
gcpCredentialsProvider = new ConnectorCredentialsProvider();
String gcpCredentialsFilePath = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
String credentialsJson = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
@@ -155,7 +159,7 @@ public List poll() throws InterruptedException {
continue;
}
ackIds.add(ackId);
- Map<String, String> messageAttributes = message.getAttributes();
+ Map<String, String> messageAttributes = message.getAttributesMap();
String key = messageAttributes.get(kafkaMessageKeyAttribute);
Long timestamp = getLongValue(messageAttributes.get(kafkaMessageTimestampAttribute));
if (timestamp == null){
@@ -169,40 +173,11 @@ public List poll() throws InterruptedException {
Map<String, String> ack = Collections.singletonMap(cpsSubscription, ackId);
SourceRecord record = null;
if (hasCustomAttributes) {
- SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct().field(
- ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD,
- Schema.BYTES_SCHEMA);
-
- for (Entry<String, String> attribute :
- messageAttributes.entrySet()) {
- if (!attribute.getKey().equals(kafkaMessageKeyAttribute)) {
- valueSchemaBuilder.field(attribute.getKey(),
- Schema.STRING_SCHEMA);
- }
+ if (useKafkaHeaders) {
+ record = createRecordWithHeaders(messageAttributes, ack, key, messageBytes, timestamp);
+ } else {
+ record = createRecordWithStruct(messageAttributes, ack, key, messageBytes, timestamp);
}
-
- Schema valueSchema = valueSchemaBuilder.build();
- Struct value =
- new Struct(valueSchema)
- .put(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD,
- messageBytes);
- for (Field field : valueSchema.fields()) {
- if (!field.name().equals(
- ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD)) {
- value.put(field.name(), messageAttributes.get(field.name()));
- }
- }
- record =
- new SourceRecord(
- null,
- ack,
- kafkaTopic,
- selectPartition(key, value),
- Schema.OPTIONAL_STRING_SCHEMA,
- key,
- valueSchema,
- value,
- timestamp);
} else {
record =
new SourceRecord(
@@ -225,6 +200,66 @@ record =
}
}
+ private SourceRecord createRecordWithHeaders(Map<String, String> messageAttributes, Map<String, String> ack,
+ String key, byte[] messageBytes, Long timestamp) {
+ ConnectHeaders headers = new ConnectHeaders();
+ for (Entry<String, String> attribute :
+ messageAttributes.entrySet()) {
+ if (!attribute.getKey().equals(kafkaMessageKeyAttribute)) {
+ headers.addString(attribute.getKey(), attribute.getValue());
+ }
+ }
+
+ return new SourceRecord(
+ null,
+ ack,
+ kafkaTopic,
+ selectPartition(key, messageBytes),
+ Schema.OPTIONAL_STRING_SCHEMA,
+ key,
+ Schema.BYTES_SCHEMA,
+ messageBytes,
+ timestamp,
+ headers);
+ }
+
+ private SourceRecord createRecordWithStruct(Map<String, String> messageAttributes, Map<String, String> ack,
+ String key, byte[] messageBytes, Long timestamp) {
+ SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct().field(
+ ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD,
+ Schema.BYTES_SCHEMA);
+
+ for (Entry<String, String> attribute :
+ messageAttributes.entrySet()) {
+ if (!attribute.getKey().equals(kafkaMessageKeyAttribute)) {
+ valueSchemaBuilder.field(attribute.getKey(),
+ Schema.STRING_SCHEMA);
+ }
+ }
+
+ Schema valueSchema = valueSchemaBuilder.build();
+ Struct value =
+ new Struct(valueSchema)
+ .put(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD,
+ messageBytes);
+ for (Field field : valueSchema.fields()) {
+ if (!field.name().equals(
+ ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD)) {
+ value.put(field.name(), messageAttributes.get(field.name()));
+ }
+ }
+ return new SourceRecord(
+ null,
+ ack,
+ kafkaTopic,
+ selectPartition(key, value),
+ Schema.OPTIONAL_STRING_SCHEMA,
+ key,
+ valueSchema,
+ value,
+ timestamp);
+ }
+
@Override
public void commit() throws InterruptedException {
ackMessages();
diff --git a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
index 35f370ff..1400c4a2 100644
--- a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
+++ b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
@@ -19,7 +19,6 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -27,8 +26,6 @@
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.pubsub.kafka.common.ConnectorUtils;
import com.google.pubsub.v1.PubsubMessage;
@@ -385,6 +382,133 @@ public void testKafkaMetadata() {
assertEquals(requestArgs, expectedMessages);
}
+ /**
+ * Tests that when requested, Kafka headers are included in the messages published to Cloud
+ * Pub/Sub.
+ */
+ @Test
+ public void testKafkaHeaders() {
+ props.put(CloudPubSubSinkConnector.PUBLISH_KAFKA_HEADERS, "true");
+ task.start(props);
+ List<SinkRecord> records = new ArrayList<>();
+ SinkRecord record = new SinkRecord(
+ KAFKA_TOPIC,
+ 4,
+ STRING_SCHEMA,
+ KAFKA_MESSAGE_KEY,
+ BYTE_STRING_SCHEMA,
+ KAFKA_MESSAGE1,
+ 1000,
+ 50000L,
+ TimestampType.CREATE_TIME);
+ record.headers().addString("myHeader", "myValue");
+ records.add(record);
+ record = new SinkRecord(
+ KAFKA_TOPIC,
+ 4,
+ STRING_SCHEMA,
+ KAFKA_MESSAGE_KEY,
+ BYTE_STRING_SCHEMA,
+ KAFKA_MESSAGE2,
+ 1001,
+ 50001L,
+ TimestampType.CREATE_TIME);
+ record.headers().addString("yourHeader", "yourValue");
+ records.add(record);
+ task.put(records);
+ ArgumentCaptor<PubsubMessage> captor = ArgumentCaptor.forClass(PubsubMessage.class);
+ verify(publisher, times(2)).publish(captor.capture());
+ List<PubsubMessage> requestArgs = captor.getAllValues();
+
+
+ List<PubsubMessage> expectedMessages = new ArrayList<>();
+ Map<String, String> attributes1 = new HashMap<>();
+ attributes1.put(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY);
+ attributes1.put("myHeader", "myValue");
+ expectedMessages.add(
+ PubsubMessage.newBuilder().putAllAttributes(attributes1).setData(KAFKA_MESSAGE1).build());
+ Map<String, String> attributes2 = new HashMap<>();
+ attributes2.put(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY);
+ attributes2.put("yourHeader", "yourValue");
+ expectedMessages.add(
+ PubsubMessage.newBuilder().putAllAttributes(attributes2).setData(KAFKA_MESSAGE2).build());
+
+ assertEquals(expectedMessages, requestArgs);
+ }
+
+ /**
+ * Tests that when requested, Kafka headers are included in the messages published to Cloud
+ * Pub/Sub, but headers whose key is longer than 256 bytes or whose value is longer than
+ * 1024 bytes are discarded because Pub/Sub does not support such attributes.
+ */
+ @Test
+ public void testUnsupportedKafkaHeaders() {
+ props.put(CloudPubSubSinkConnector.PUBLISH_KAFKA_HEADERS, "true");
+ task.start(props);
+ String veryLongHeaderName;
+ String veryLongValue;
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 0; i < 257; i++) {
+ stringBuilder.append("-");
+ }
+ veryLongHeaderName = stringBuilder.toString();
+ stringBuilder.setLength(0);
+ for (int i = 0; i < 1025; i++) {
+ stringBuilder.append(".");
+ }
+ veryLongValue = stringBuilder.toString();
+ stringBuilder.setLength(0);
+ List<SinkRecord> records = new ArrayList<>();
+ SinkRecord record = new SinkRecord(
+ KAFKA_TOPIC,
+ 4,
+ STRING_SCHEMA,
+ KAFKA_MESSAGE_KEY,
+ BYTE_STRING_SCHEMA,
+ KAFKA_MESSAGE1,
+ 1000,
+ 50000L,
+ TimestampType.CREATE_TIME);
+ record.headers().addString("myHeader", "myValue");
+ record.headers().addString(veryLongHeaderName, "anotherValue");
+ record.headers().addString("anotherHeader", veryLongValue);
+ record.headers().addString(veryLongHeaderName, veryLongValue);
+ records.add(record);
+ record = new SinkRecord(
+ KAFKA_TOPIC,
+ 4,
+ STRING_SCHEMA,
+ KAFKA_MESSAGE_KEY,
+ BYTE_STRING_SCHEMA,
+ KAFKA_MESSAGE2,
+ 1001,
+ 50001L,
+ TimestampType.CREATE_TIME);
+ record.headers().addString("yourHeader", "yourValue");
+ records.add(record);
+ task.put(records);
+ ArgumentCaptor<PubsubMessage> captor = ArgumentCaptor.forClass(PubsubMessage.class);
+ verify(publisher, times(2)).publish(captor.capture());
+ List<PubsubMessage> requestArgs = captor.getAllValues();
+
+
+ List<PubsubMessage> expectedMessages = new ArrayList<>();
+ Map<String, String> attributes1 = new HashMap<>();
+ attributes1.put(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY);
+ attributes1.put("myHeader", "myValue");
+ expectedMessages.add(
+ PubsubMessage.newBuilder().putAllAttributes(attributes1).setData(KAFKA_MESSAGE1).build());
+ Map<String, String> attributes2 = new HashMap<>();
+ attributes2.put(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY);
+ attributes2.put("yourHeader", "yourValue");
+ expectedMessages.add(
+ PubsubMessage.newBuilder().putAllAttributes(attributes2).setData(KAFKA_MESSAGE2).build());
+
+ assertEquals(257, veryLongHeaderName.getBytes().length);
+ assertEquals(1025, veryLongValue.getBytes().length);
+ assertEquals(expectedMessages, requestArgs);
+ }
+
/**
* Tests that if a Future that is being processed in flush() failed with an exception and then a
* second Future is processed successfully in a subsequent flush, then the subsequent flush
diff --git a/kafka-connector/src/test/java/com/google/pubsub/kafka/source/CloudPubSubSourceTaskTest.java b/kafka-connector/src/test/java/com/google/pubsub/kafka/source/CloudPubSubSourceTaskTest.java
index 7b8ecd88..3d580bd2 100644
--- a/kafka-connector/src/test/java/com/google/pubsub/kafka/source/CloudPubSubSourceTaskTest.java
+++ b/kafka-connector/src/test/java/com/google/pubsub/kafka/source/CloudPubSubSourceTaskTest.java
@@ -43,6 +43,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Before;
import org.junit.Test;
@@ -257,7 +258,45 @@ public void testPollWithMessageTimestampAttribute() throws Exception{
/**
* Tests when the message retrieved from Cloud Pub/Sub has several attributes, including
- * one that matches {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}.
+ * one that matches {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}, and uses Kafka record headers to store them.
+ */
+ @Test
+ public void testPollWithMultipleAttributesAndRecordHeaders() throws Exception {
+ props.put(CloudPubSubSourceConnector.USE_KAFKA_HEADERS, "true");
+ task.start(props);
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
+ attributes.put("attribute1", "attribute_value1");
+ attributes.put("attribute2", "attribute_value2");
+ ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes);
+ PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
+ when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
+ List<SourceRecord> result = task.poll();
+ verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
+ assertEquals(1, result.size());
+
+ ConnectHeaders headers = new ConnectHeaders();
+ headers.addString("attribute1", "attribute_value1");
+ headers.addString("attribute2", "attribute_value2");
+
+ SourceRecord expected =
+ new SourceRecord(
+ null,
+ null,
+ KAFKA_TOPIC,
+ 0,
+ Schema.OPTIONAL_STRING_SCHEMA,
+ KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
+ Schema.BYTES_SCHEMA,
+ KAFKA_VALUE,
+ Long.parseLong(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE),
+ headers);
+ assertRecordsEqual(expected, result.get(0));
+ }
+
+ /**
+ * Tests when the message retrieved from Cloud Pub/Sub has several attributes, including
+ * one that matches {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}.
*/
@Test
public void testPollWithMultipleAttributes() throws Exception {