From 390e961067296edd9299d5c011a67e621f0854be Mon Sep 17 00:00:00 2001 From: Jordan Bull Date: Wed, 16 Jun 2021 15:50:23 -0700 Subject: [PATCH] Add new parameters to sink connector controlling maxOutstandingRequestBytes with blocking flow controls --- kafka-connector/README.md | 2 + .../kafka/sink/CloudPubSubSinkConnector.java | 16 ++++++++ .../kafka/sink/CloudPubSubSinkTask.java | 38 +++++++++++++++---- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/kafka-connector/README.md b/kafka-connector/README.md index 0d2a8546..8f4a6b7c 100644 --- a/kafka-connector/README.md +++ b/kafka-connector/README.md @@ -124,6 +124,8 @@ Connector supports the following configs: | cps.endpoint | String | "pubsub.googleapis.com:443" | The [Cloud Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. | | maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Cloud Pub/Sub. | | maxBufferBytes | Long | 10000000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Cloud Pub/Sub. | +| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of outstanding bytes for incomplete and unsent messages before the publisher will block further publishing. | +| maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of incomplete and unsent messages before the publisher will block further publishing. | | maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Cloud Pub/Sub. | | maxRequestTimeoutMs | Integer | 10000 | The timeout for individual publish requests to Cloud Pub/Sub. | | maxTotalTimeoutMs | Integer | 60000| The total timeout for a call to publish (including retries) to Cloud Pub/Sub. | diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java index 00c7e443..a3456650 100644 --- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java +++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java @@ -41,6 +41,8 @@ public class CloudPubSubSinkConnector extends SinkConnector { public static final String MAX_BUFFER_SIZE_CONFIG = "maxBufferSize"; public static final String MAX_BUFFER_BYTES_CONFIG = "maxBufferBytes"; + public static final String MAX_OUTSTANDING_REQUEST_BYTES = "maxOutstandingRequestBytes"; + public static final String MAX_OUTSTANDING_MESSAGES = "maxOutstandingMessages"; public static final String MAX_DELAY_THRESHOLD_MS = "delayThresholdMs"; public static final String MAX_REQUEST_TIMEOUT_MS = "maxRequestTimeoutMs"; public static final String MAX_TOTAL_TIMEOUT_MS = "maxTotalTimeoutMs"; @@ -51,6 +53,8 @@ public class CloudPubSubSinkConnector extends SinkConnector { public static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000; public static final int DEFAULT_TOTAL_TIMEOUT_MS = 60000; public static final int DEFAULT_SHUTDOWN_TIMEOUT_MS = 60000; + public static final long DEFAULT_MAX_OUTSTANDING_REQUEST_BYTES = Long.MAX_VALUE; + public static final long DEFAULT_MAX_OUTSTANDING_MESSAGES = Long.MAX_VALUE; public static final String CPS_MESSAGE_BODY_NAME = "messageBodyName"; public static final String DEFAULT_MESSAGE_BODY_NAME = "cps_message_body"; public static final String PUBLISH_KAFKA_METADATA = "metadata.publish"; @@ -163,6 +167,18 @@ public ConfigDef config() { Importance.MEDIUM, "The maximum number of bytes that can be received for the messages on a topic " + "partition before publishing the messages to Cloud Pub/Sub.") + .define(MAX_OUTSTANDING_REQUEST_BYTES, + Type.LONG, + DEFAULT_MAX_OUTSTANDING_REQUEST_BYTES, + Importance.MEDIUM, + "The maximum outstanding bytes from incomplete requests before the task blocks." + ) + .define(MAX_OUTSTANDING_MESSAGES, + Type.LONG, + DEFAULT_MAX_OUTSTANDING_MESSAGES, + Importance.MEDIUM, + "The maximum outstanding incomplete messages before the task blocks." + ) .define( MAX_DELAY_THRESHOLD_MS, Type.INT, diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java index 6a60bf89..d3421241 100644 --- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java +++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java @@ -22,6 +22,8 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.pubsub.v1.Publisher; import com.google.common.annotations.VisibleForTesting; @@ -51,7 +53,6 @@ import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Header; -import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.slf4j.Logger; @@ -74,6 +75,8 @@ public class CloudPubSubSinkTask extends SinkTask { private String messageBodyName; private long maxBufferSize; private long maxBufferBytes; + private long maxOutstandingRequestBytes; + private long maxOutstandingMessages; private int maxDelayThresholdMs; private int maxRequestTimeoutMs; private int maxTotalTimeoutMs; @@ -84,6 +87,8 @@ public class CloudPubSubSinkTask extends SinkTask { private ConnectorCredentialsProvider gcpCredentialsProvider; private com.google.cloud.pubsub.v1.Publisher publisher; + + /** Holds a list of the publishing futures that have not been processed for a single partition. */ private class OutstandingFuturesForPartition { public List> futures = new ArrayList<>(); @@ -118,6 +123,10 @@ public void start(Map props) { cpsEndpoint = validatedProps.get(ConnectorUtils.CPS_ENDPOINT).toString(); maxBufferSize = (Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG); maxBufferBytes = (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG); + maxOutstandingRequestBytes = + (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_REQUEST_BYTES); + maxOutstandingMessages = + (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_MESSAGES); maxDelayThresholdMs = (Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_DELAY_THRESHOLD_MS); maxRequestTimeoutMs = @@ -387,15 +396,24 @@ private void addPendingMessageFuture(String topic, Integer partition, ApiFuture< private void createPublisher() { ProjectTopicName fullTopic = ProjectTopicName.of(cpsProject, cpsTopic); + + BatchingSettings.Builder batchingSettings = BatchingSettings.newBuilder() + .setDelayThreshold(Duration.ofMillis(maxDelayThresholdMs)) + .setElementCountThreshold(maxBufferSize) + .setRequestByteThreshold(maxBufferBytes); + + if (useFlowControl()) { + batchingSettings.setFlowControlSettings(FlowControlSettings.newBuilder() + .setMaxOutstandingRequestBytes(maxOutstandingRequestBytes) + .setMaxOutstandingElementCount(maxOutstandingMessages) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build()); + } + com.google.cloud.pubsub.v1.Publisher.Builder builder = com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic) .setCredentialsProvider(gcpCredentialsProvider) - .setBatchingSettings( - BatchingSettings.newBuilder() - .setDelayThreshold(Duration.ofMillis(maxDelayThresholdMs)) - .setElementCountThreshold(maxBufferSize) - .setRequestByteThreshold(maxBufferBytes) - .build()) + .setBatchingSettings(batchingSettings.build()) .setRetrySettings( RetrySettings.newBuilder() // All values that are not configurable come from the defaults for the publisher @@ -420,6 +438,12 @@ private void createPublisher() { } } + private boolean useFlowControl() { + // only enable flow control if at least one flow control config has been set + return maxOutstandingRequestBytes != CloudPubSubSinkConnector.DEFAULT_MAX_OUTSTANDING_REQUEST_BYTES + || maxOutstandingRequestBytes != CloudPubSubSinkConnector.DEFAULT_MAX_OUTSTANDING_MESSAGES; + } + @Override public void stop() { log.info("Stopping CloudPubSubSinkTask");