From 6bac0c31c299355a3e7817eb185af6a1e77131b6 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 15 May 2023 14:13:24 +0200 Subject: [PATCH 1/2] DBZ-6461 Add properties `debezium.sink.pubsub.wait.message.computation.timeout.ms` and `debezium.sink.pubsublite.wait.message.computation.timeout.ms` to configure the wait time on message processing by Google PubSub --- .../server/pubsub/PubSubChangeConsumer.java | 63 ++++++++++++------- .../pubsub/PubSubLiteChangeConsumer.java | 62 ++++++++++-------- 2 files changed, 76 insertions(+), 49 deletions(-) diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java index f91bb68c..64e36c32 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java @@ -12,6 +12,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -123,6 +125,9 @@ public interface PublisherBuilder { @ConfigProperty(name = PROP_PREFIX + "retry.rpc.timeout.multiplier", defaultValue = "2.0") Double rpcTimeoutMultiplier; + @ConfigProperty(name = PROP_PREFIX + "wait.message.computation.timeout.ms", defaultValue = "5000") + Integer waitMessageComputationTimeout; + @ConfigProperty(name = PROP_PREFIX + "address") Optional address; @@ -217,49 +222,59 @@ void close() { @Override public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { + final List> deliveries = new ArrayList<>(); + for (ChangeEvent record : records) { LOGGER.trace("Received event '{}'", record); final String topicName = streamNameMapper.map(record.destination()); Publisher publisher = publishers.computeIfAbsent(topicName, (x) -> publisherBuilder.get(ProjectTopicName.of(projectId, x))); - final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder(); - - if (orderingEnabled) { - if (record.key() == null) { - pubsubMessage.setOrderingKey(nullKey); - } - else if (record.key() instanceof String) { - pubsubMessage.setOrderingKey((String) record.key()); - } - else if (record.key() instanceof byte[]) { - pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key())); - } - } - - if (record.value() instanceof String) { - pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value())); - } - else if (record.value() instanceof byte[]) { - pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value())); - } + PubsubMessage message = buildPubSubMessage(record); - pubsubMessage.putAllAttributes(convertHeaders(record)); + deliveries.add(publisher.publish(message)); - deliveries.add(publisher.publish(pubsubMessage.build())); committer.markProcessed(record); } List messageIds; try { - messageIds = ApiFutures.allAsList(deliveries).get(); + messageIds = ApiFutures.allAsList(deliveries).get(waitMessageComputationTimeout, TimeUnit.MILLISECONDS); } - catch (ExecutionException e) { + catch (ExecutionException | TimeoutException e) { throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); committer.markBatchFinished(); } + private PubsubMessage buildPubSubMessage(ChangeEvent record) { + + final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder(); + + if (orderingEnabled) { + if (record.key() == null) { + pubsubMessage.setOrderingKey(nullKey); + } + else if (record.key() instanceof String) { + pubsubMessage.setOrderingKey((String) record.key()); + } + else if (record.key() instanceof byte[]) { + pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key())); + } + } + + if (record.value() instanceof String) { + pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value())); + } + else if (record.value() instanceof byte[]) { + pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value())); + } + + pubsubMessage.putAllAttributes(convertHeaders(record)); + + return pubsubMessage.build(); + } + @Override public boolean supportsTombstoneEvents() { return false; diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java index 8427504b..0ca5d3ad 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java @@ -10,6 +10,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -69,6 +71,9 @@ public interface PublisherBuilder { @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; + @ConfigProperty(name = PROP_PREFIX + "wait.message.computation.timeout.ms", defaultValue = "5000") + Integer waitMessageComputationTimeout; + @Inject @CustomConsumerBuilder Instance customPublisherBuilder; @@ -124,43 +129,50 @@ public void handleBatch(List> records, RecordCommitt Publisher publisher = publishers.computeIfAbsent(topicName, (topic) -> publisherBuilder.get(topic)); - final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder(); - - if (orderingEnabled) { - if (record.key() == null) { - pubsubMessage.setOrderingKey(nullKey); - } - else if (record.key() instanceof String) { - pubsubMessage.setOrderingKey((String) record.key()); - } - else if (record.key() instanceof byte[]) { - pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key())); - } - } - - if (record.value() instanceof String) { - pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value())); - } - else if (record.value() instanceof byte[]) { - pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value())); - } - - pubsubMessage.putAllAttributes(convertHeaders(record)); + PubsubMessage message = buildPubSubMessage(record); - deliveries.add(publisher.publish(pubsubMessage.build())); + deliveries.add(publisher.publish(message)); committer.markProcessed(record); } List messageIds; try { - messageIds = ApiFutures.allAsList(deliveries).get(); + messageIds = ApiFutures.allAsList(deliveries).get(waitMessageComputationTimeout, TimeUnit.MILLISECONDS); } - catch (ExecutionException e) { + catch (ExecutionException | TimeoutException e) { throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); committer.markBatchFinished(); } + private PubsubMessage buildPubSubMessage(ChangeEvent record) { + + final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder(); + + if (orderingEnabled) { + if (record.key() == null) { + pubsubMessage.setOrderingKey(nullKey); + } + else if (record.key() instanceof String) { + pubsubMessage.setOrderingKey((String) record.key()); + } + else if (record.key() instanceof byte[]) { + pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key())); + } + } + + if (record.value() instanceof String) { + pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value())); + } + else if (record.value() instanceof byte[]) { + pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value())); + } + + pubsubMessage.putAllAttributes(convertHeaders(record)); + + return pubsubMessage.build(); + } + @Override public boolean supportsTombstoneEvents() { return false; From 4641ff0561c533e49bba36ed121c4ed886b45eeb Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 16 May 2023 12:38:26 +0200 Subject: [PATCH 2/2] DBZ-6461 Rename to delivery timeout; Increase default value --- .../io/debezium/server/pubsub/PubSubChangeConsumer.java | 6 +++--- .../io/debezium/server/pubsub/PubSubLiteChangeConsumer.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java index 64e36c32..63b4873f 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java @@ -125,8 +125,8 @@ public interface PublisherBuilder { @ConfigProperty(name = PROP_PREFIX + "retry.rpc.timeout.multiplier", defaultValue = "2.0") Double rpcTimeoutMultiplier; - @ConfigProperty(name = PROP_PREFIX + "wait.message.computation.timeout.ms", defaultValue = "5000") - Integer waitMessageComputationTimeout; + @ConfigProperty(name = PROP_PREFIX + "wait.message.delivery.timeout.ms", defaultValue = "30000") + Integer waitMessageDeliveryTimeout; @ConfigProperty(name = PROP_PREFIX + "address") Optional address; @@ -238,7 +238,7 @@ public void handleBatch(List> records, RecordCommitt } List messageIds; try { - messageIds = ApiFutures.allAsList(deliveries).get(waitMessageComputationTimeout, TimeUnit.MILLISECONDS); + messageIds = ApiFutures.allAsList(deliveries).get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS); } catch (ExecutionException | TimeoutException e) { throw new DebeziumException(e); diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java index 0ca5d3ad..6a07a23c 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java @@ -71,8 +71,8 @@ public interface PublisherBuilder { @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; - @ConfigProperty(name = PROP_PREFIX + "wait.message.computation.timeout.ms", defaultValue = "5000") - Integer waitMessageComputationTimeout; + @ConfigProperty(name = PROP_PREFIX + "wait.message.delivery.timeout.ms", defaultValue = "30000") + Integer waitMessageDeliveryTimeout; @Inject @CustomConsumerBuilder @@ -136,7 +136,7 @@ public void handleBatch(List> records, RecordCommitt } List messageIds; try { - messageIds = ApiFutures.allAsList(deliveries).get(waitMessageComputationTimeout, TimeUnit.MILLISECONDS); + messageIds = ApiFutures.allAsList(deliveries).get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS); } catch (ExecutionException | TimeoutException e) { throw new DebeziumException(e);