Copy Kafka headers to PubSub attributes #200

Merged · 6 commits · Jun 17, 2019
13 changes: 13 additions & 0 deletions kafka-connector/README.md
@@ -91,6 +91,7 @@ Connector supports the following configs:
| kafka.partition.scheme | round_robin, hash_key, hash_value, kafka_partitioner | round_robin | The scheme for assigning a message to a partition in Kafka. The "round_robin" scheme assigns partitions in a round-robin fashion, while "hash_key" and "hash_value" find the partition by hashing the message key and message value, respectively. The "kafka_partitioner" scheme delegates partitioning logic to the Kafka producer, which by default detects the number of partitions automatically and performs either murmur-hash-based partition mapping or round robin, depending on whether a message key is provided. |
| gcp.credentials.file.path | String | Optional | The path of a file that stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, these explicitly provided credentials are used. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| kafka.record.headers | Boolean | false | Use Kafka record headers to store Pub/Sub message attributes |
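
For example, a minimal source connector configuration enabling this option might look like the sketch below; the connector class and the `cps.*` / `kafka.topic` keys follow the connector's standard configuration, and all project, subscription, and topic names are placeholders:

```properties
name=cps-source-connector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=1
# Placeholder project, subscription, and topic names
cps.project=my-gcp-project
cps.subscription=my-subscription
kafka.topic=my-kafka-topic
# Store Pub/Sub message attributes in Kafka record headers instead of a struct-typed value
kafka.record.headers=true
```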

#### Sink Connector

@@ -105,6 +106,8 @@ Connector supports the following configs:
| maxTotalTimeoutMs | Integer | 60000 | The total timeout for a call to publish (including retries) to Cloud Pub/Sub. |
| gcp.credentials.file.path | String | Optional | The path of a file that stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, these explicitly provided credentials are used. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Cloud Pub/Sub. |
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Cloud Pub/Sub. |
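
Similarly, a minimal sink connector configuration enabling both options might look like this sketch; the `cps.project` and `cps.topic` keys follow the connector's standard configuration, and all names are placeholders:

```properties
name=cps-sink-connector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=1
topics=my-kafka-topic
# Placeholder project and topic names
cps.project=my-gcp-project
cps.topic=my-pubsub-topic
# Attach Kafka metadata and Kafka record headers as Pub/Sub message attributes
metadata.publish=true
headers.publish=true
```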

#### Schema Support and Data Model

@@ -137,6 +140,16 @@ The sink connector handles the conversion in the following way:
together into a ByteString object.
* In all cases, the Kafka key value is stored in the Pubsub message's
attributes as a string, currently "key".

> **IMPORTANT NOTICE:** There are three limitations to keep in mind when using Pub/Sub
> message attributes, as stated in the [documentation](https://cloud.google.com/pubsub/quotas#resource_limits):
>* *"Attributes per message: 100"*
>* *"Attribute key size: 256 bytes"*
>* *"Attribute value size: 1024 bytes"*
>
>If copying Kafka headers as Pub/Sub message attributes is enabled (it is disabled by default), the connector copies
>only those headers that meet these limits and skips those that do not.
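
As a rough illustration, the size checks described above amount to a predicate like the following minimal Java sketch; the constants simply mirror the documented limits, and the connector's actual filtering lives in `CloudPubSubSinkTask.getRecordHeaders`, shown further down in this diff:

```java
import java.nio.charset.StandardCharsets;

/** Sketch of the Pub/Sub attribute limits applied when copying Kafka headers. */
final class AttributeLimits {
  static final int MAX_ATTRIBUTES_PER_MESSAGE = 100; // attributes per message
  static final int MAX_KEY_BYTES = 256;              // attribute key size
  static final int MAX_VALUE_BYTES = 1024;           // attribute value size

  /** Returns true if a header key/value pair fits within the attribute limits. */
  static boolean fitsAttributeLimits(String key, String value) {
    return key.getBytes(StandardCharsets.UTF_8).length <= MAX_KEY_BYTES
        && value.getBytes(StandardCharsets.UTF_8).length <= MAX_VALUE_BYTES;
  }
}
```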


The source connector takes a similar approach in handling the conversion
from a Pubsub message into a SourceRecord with a relevant Schema.
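
When `kafka.record.headers` is enabled and a Pub/Sub message carries custom attributes, each attribute (other than the configured message-key attribute) is copied into a Kafka record header, so downstream consumers can read the attributes with the standard consumer API. A minimal sketch, assuming kafka-clients 2.x or later on the consumer side and placeholder broker, group, and topic names:

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class ReadAttributeHeaders {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
    props.put("group.id", "attribute-header-example"); // placeholder group
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-kafka-topic")); // placeholder topic
      ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<String, byte[]> record : records) {
        // Each Pub/Sub attribute copied by the source connector arrives as one header.
        for (Header header : record.headers()) {
          System.out.println(header.key() + " = "
              + new String(header.value(), StandardCharsets.UTF_8));
        }
      }
    }
  }
}
```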
2 changes: 1 addition & 1 deletion kafka-connector/pom.xml
@@ -59,7 +59,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>0.10.2.1</version>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
CloudPubSubSinkConnector.java
@@ -52,6 +52,7 @@ public class CloudPubSubSinkConnector extends SinkConnector {
public static final String CPS_MESSAGE_BODY_NAME = "messageBodyName";
public static final String DEFAULT_MESSAGE_BODY_NAME = "cps_message_body";
public static final String PUBLISH_KAFKA_METADATA = "metadata.publish";
public static final String PUBLISH_KAFKA_HEADERS = "headers.publish";
private Map<String, String> props;

@Override
@@ -148,6 +149,12 @@ public ConfigDef config() {
Importance.MEDIUM,
"When true, include the Kafka topic, partition, offset, and timestamp as message "
+ "attributes when a message is published to Cloud Pub/Sub.")
.define(
PUBLISH_KAFKA_HEADERS,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"When true, include any headers as attributes when a message is published to Cloud Pub/Sub.")
.define(CPS_MESSAGE_BODY_NAME,
Type.STRING,
DEFAULT_MESSAGE_BODY_NAME,
CloudPubSubSinkTask.java
@@ -44,6 +44,9 @@
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
@@ -71,6 +74,7 @@ public class CloudPubSubSinkTask extends SinkTask {
private int maxTotalTimeoutMs;
private int maxShutdownTimeoutMs;
private boolean includeMetadata;
private boolean includeHeaders;
private ConnectorCredentialsProvider gcpCredentialsProvider;
private com.google.cloud.pubsub.v1.Publisher publisher;

@@ -117,6 +121,7 @@ public void start(Map<String, String> props) {
(Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_SHUTDOWN_TIMEOUT_MS);
messageBodyName = (String) validatedProps.get(CloudPubSubSinkConnector.CPS_MESSAGE_BODY_NAME);
includeMetadata = (Boolean) validatedProps.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_METADATA);
includeHeaders = (Boolean) validatedProps.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_HEADERS);
gcpCredentialsProvider = new ConnectorCredentialsProvider();
String credentialsPath = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
String credentialsJson = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
@@ -143,9 +148,9 @@ public void start(Map<String, String> props) {
@Override
public void put(Collection<SinkRecord> sinkRecords) {
log.debug("Received " + sinkRecords.size() + " messages to send to CPS.");
PubsubMessage.Builder builder = PubsubMessage.newBuilder();
for (SinkRecord record : sinkRecords) {
log.trace("Received record: " + record.toString());
PubsubMessage.Builder builder = PubsubMessage.newBuilder();
Map<String, String> attributes = new HashMap<>();
ByteString value = handleValue(record.valueSchema(), record.value(), attributes);
if (record.key() != null) {
@@ -159,11 +164,34 @@ public void put(Collection<SinkRecord> sinkRecords) {
attributes.put(ConnectorUtils.KAFKA_OFFSET_ATTRIBUTE, Long.toString(record.kafkaOffset()));
attributes.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, record.timestamp().toString());
}
if (includeHeaders) {
for (Header header : getRecordHeaders(record)) {
attributes.put(header.key(), header.value().toString());
}
}
PubsubMessage message = builder.setData(value).putAllAttributes(attributes).build();
publishMessage(record.topic(), record.kafkaPartition(), message);
}
}

private Iterable<? extends Header> getRecordHeaders(SinkRecord record) {
ConnectHeaders headers = new ConnectHeaders();
if (record.headers() != null) {
int headerCount = 0;
for (Header header : record.headers()) {
// Copy only headers that fit the Pub/Sub attribute limits
// (key at most 256 bytes, value at most 1024 bytes); skip the rest.
if (header.key().getBytes().length <= 256 &&
String.valueOf(header.value()).getBytes().length <= 1024) {
headers.add(header);
headerCount++;
}
// Pub/Sub allows at most 100 attributes per message.
if (headerCount >= 100) {
break;
}
}
}
return headers;
}

private ByteString handleValue(Schema schema, Object value, Map<String, String> attributes) {
if (schema == null) {
String str = value.toString();
CloudPubSubSourceConnector.java
@@ -57,6 +57,7 @@ public class CloudPubSubSourceConnector extends SourceConnector {
public static final int DEFAULT_CPS_MAX_BATCH_SIZE = 100;
public static final int DEFAULT_KAFKA_PARTITIONS = 1;
public static final String DEFAULT_KAFKA_PARTITION_SCHEME = "round_robin";
public static final String USE_KAFKA_HEADERS = "kafka.record.headers";

/** Defines the accepted values for the {@link #KAFKA_PARTITION_SCHEME_CONFIG}. */
public enum PartitionScheme {
@@ -224,7 +225,13 @@ public ConfigDef config() {
Type.STRING,
null,
Importance.HIGH,
"GCP JSON credentials");
"GCP JSON credentials")
.define(
USE_KAFKA_HEADERS,
Type.BOOLEAN,
false,
Importance.LOW,
"Use Kafka record headers to store Pub/Sub message attributes");
}

/**
CloudPubSubSourceTask.java
@@ -42,6 +42,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
@@ -67,12 +69,13 @@ public class CloudPubSubSourceTask extends SourceTask {
// Keeps track of the current partition to publish to if the partition scheme is round robin.
private int currentRoundRobinPartition = -1;
// Keep track of all ack ids that have not yet been correctly acked.
private Set<String> deliveredAckIds = Collections.synchronizedSet(new HashSet<String>());
private final Set<String> deliveredAckIds = Collections.synchronizedSet(new HashSet<String>());
private Set<String> ackIds = Collections.synchronizedSet(new HashSet<String>());
private CloudPubSubSubscriber subscriber;
private Set<String> ackIdsInFlight = Collections.synchronizedSet(new HashSet<String>());
private final Set<String> standardAttributes = new HashSet<>();
private ConnectorCredentialsProvider gcpCredentialsProvider;
private boolean useKafkaHeaders;

public CloudPubSubSourceTask() {}

@@ -106,6 +109,7 @@ public void start(Map<String, String> props) {
kafkaPartitionScheme =
PartitionScheme.getEnum(
(String) validatedProps.get(CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG));
useKafkaHeaders = (Boolean) validatedProps.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS);
gcpCredentialsProvider = new ConnectorCredentialsProvider();
String gcpCredentialsFilePath = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
String credentialsJson = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
@@ -155,7 +159,7 @@ public List<SourceRecord> poll() throws InterruptedException {
continue;
}
ackIds.add(ackId);
Map<String, String> messageAttributes = message.getAttributes();
Map<String, String> messageAttributes = message.getAttributesMap();
String key = messageAttributes.get(kafkaMessageKeyAttribute);
Long timestamp = getLongValue(messageAttributes.get(kafkaMessageTimestampAttribute));
if (timestamp == null){
@@ -169,40 +173,11 @@ public List<SourceRecord> poll() throws InterruptedException {
Map<String,String> ack = Collections.singletonMap(cpsSubscription, ackId);
SourceRecord record = null;
if (hasCustomAttributes) {
SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct().field(
ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD,
Schema.BYTES_SCHEMA);

for (Entry<String, String> attribute :
messageAttributes.entrySet()) {
if (!attribute.getKey().equals(kafkaMessageKeyAttribute)) {
valueSchemaBuilder.field(attribute.getKey(),
Schema.STRING_SCHEMA);
}
if (useKafkaHeaders) {
record = createRecordWithHeaders(messageAttributes, ack, key, messageBytes, timestamp);
} else {
record = createRecordWithStruct(messageAttributes, ack, key, messageBytes, timestamp);
}

Schema valueSchema = valueSchemaBuilder.build();
Struct value =
new Struct(valueSchema)
.put(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD,
messageBytes);
for (Field field : valueSchema.fields()) {
if (!field.name().equals(
ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD)) {
value.put(field.name(), messageAttributes.get(field.name()));
}
}
record =
new SourceRecord(
null,
ack,
kafkaTopic,
selectPartition(key, value),
Schema.OPTIONAL_STRING_SCHEMA,
key,
valueSchema,
value,
timestamp);
} else {
record =
new SourceRecord(
@@ -225,6 +200,66 @@ record =
}
}

private SourceRecord createRecordWithHeaders(Map<String, String> messageAttributes, Map<String,String> ack,
String key, byte[] messageBytes, Long timestamp) {
ConnectHeaders headers = new ConnectHeaders();
for (Entry<String, String> attribute :
messageAttributes.entrySet()) {
if (!attribute.getKey().equals(kafkaMessageKeyAttribute)) {
headers.addString(attribute.getKey(), attribute.getValue());
}
}

return new SourceRecord(
null,
ack,
kafkaTopic,
selectPartition(key, messageBytes),
Schema.OPTIONAL_STRING_SCHEMA,
key,
Schema.BYTES_SCHEMA,
messageBytes,
timestamp,
headers);
}

private SourceRecord createRecordWithStruct(Map<String, String> messageAttributes, Map<String,String> ack,
String key, byte[] messageBytes, Long timestamp) {
SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct().field(
ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD,
Schema.BYTES_SCHEMA);

for (Entry<String, String> attribute :
messageAttributes.entrySet()) {
if (!attribute.getKey().equals(kafkaMessageKeyAttribute)) {
valueSchemaBuilder.field(attribute.getKey(),
Schema.STRING_SCHEMA);
}
}

Schema valueSchema = valueSchemaBuilder.build();
Struct value =
new Struct(valueSchema)
.put(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD,
messageBytes);
for (Field field : valueSchema.fields()) {
if (!field.name().equals(
ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD)) {
value.put(field.name(), messageAttributes.get(field.name()));
}
}
return new SourceRecord(
null,
ack,
kafkaTopic,
selectPartition(key, value),
Schema.OPTIONAL_STRING_SCHEMA,
key,
valueSchema,
value,
timestamp);
}

@Override
public void commit() throws InterruptedException {
ackMessages();