diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java index f91bb68c..63b4873f 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java @@ -12,6 +12,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -123,6 +125,9 @@ public interface PublisherBuilder { @ConfigProperty(name = PROP_PREFIX + "retry.rpc.timeout.multiplier", defaultValue = "2.0") Double rpcTimeoutMultiplier; + @ConfigProperty(name = PROP_PREFIX + "wait.message.delivery.timeout.ms", defaultValue = "30000") + Integer waitMessageDeliveryTimeout; + @ConfigProperty(name = PROP_PREFIX + "address") Optional address; @@ -217,49 +222,59 @@ void close() { @Override public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { + final List> deliveries = new ArrayList<>(); + for (ChangeEvent record : records) { LOGGER.trace("Received event '{}'", record); final String topicName = streamNameMapper.map(record.destination()); Publisher publisher = publishers.computeIfAbsent(topicName, (x) -> publisherBuilder.get(ProjectTopicName.of(projectId, x))); - final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder(); - - if (orderingEnabled) { - if (record.key() == null) { - pubsubMessage.setOrderingKey(nullKey); - } - else if (record.key() instanceof String) { - pubsubMessage.setOrderingKey((String) record.key()); - } - else if (record.key() instanceof byte[]) { - pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key())); - } - } - - if (record.value() instanceof String) { - pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value())); - } - else if (record.value() instanceof byte[]) { - pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value())); - } + PubsubMessage message = buildPubSubMessage(record); - pubsubMessage.putAllAttributes(convertHeaders(record)); + deliveries.add(publisher.publish(message)); - deliveries.add(publisher.publish(pubsubMessage.build())); committer.markProcessed(record); } List messageIds; try { - messageIds = ApiFutures.allAsList(deliveries).get(); + messageIds = ApiFutures.allAsList(deliveries).get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS); } - catch (ExecutionException e) { + catch (ExecutionException | TimeoutException e) { throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); committer.markBatchFinished(); } + private PubsubMessage buildPubSubMessage(ChangeEvent record) { + + final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder(); + + if (orderingEnabled) { + if (record.key() == null) { + pubsubMessage.setOrderingKey(nullKey); + } + else if (record.key() instanceof String) { + pubsubMessage.setOrderingKey((String) record.key()); + } + else if (record.key() instanceof byte[]) { + pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key())); + } + } + + if (record.value() instanceof String) { + pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value())); + } + else if (record.value() instanceof byte[]) { + pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value())); + } + + pubsubMessage.putAllAttributes(convertHeaders(record)); + + return pubsubMessage.build(); + } + @Override public boolean supportsTombstoneEvents() { return false; diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java index 8427504b..6a07a23c 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java @@ -10,6 +10,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -69,6 +71,9 @@ public interface PublisherBuilder { @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; + @ConfigProperty(name = PROP_PREFIX + "wait.message.delivery.timeout.ms", defaultValue = "30000") + Integer waitMessageDeliveryTimeout; + @Inject @CustomConsumerBuilder Instance customPublisherBuilder; @@ -124,43 +129,50 @@ public void handleBatch(List> records, RecordCommitt Publisher publisher = publishers.computeIfAbsent(topicName, (topic) -> publisherBuilder.get(topic)); - final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder(); - - if (orderingEnabled) { - if (record.key() == null) { - pubsubMessage.setOrderingKey(nullKey); - } - else if (record.key() instanceof String) { - pubsubMessage.setOrderingKey((String) record.key()); - } - else if (record.key() instanceof byte[]) { - pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key())); - } - } - - if (record.value() instanceof String) { - pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value())); - } - else if (record.value() instanceof byte[]) { - pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value())); - } - - pubsubMessage.putAllAttributes(convertHeaders(record)); + PubsubMessage message = buildPubSubMessage(record); - deliveries.add(publisher.publish(pubsubMessage.build())); + deliveries.add(publisher.publish(message)); committer.markProcessed(record); } List messageIds; try { - messageIds = ApiFutures.allAsList(deliveries).get(); + messageIds = ApiFutures.allAsList(deliveries).get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS); } - catch (ExecutionException e) { + catch (ExecutionException | TimeoutException e) { throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); committer.markBatchFinished(); } + private PubsubMessage buildPubSubMessage(ChangeEvent record) { + + final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder(); + + if (orderingEnabled) { + if (record.key() == null) { + pubsubMessage.setOrderingKey(nullKey); + } + else if (record.key() instanceof String) { + pubsubMessage.setOrderingKey((String) record.key()); + } + else if (record.key() instanceof byte[]) { + pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key())); + } + } + + if (record.value() instanceof String) { + pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value())); + } + else if (record.value() instanceof byte[]) { + pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value())); + } + + pubsubMessage.putAllAttributes(convertHeaders(record)); + + return pubsubMessage.build(); + } + @Override public boolean supportsTombstoneEvents() { return false;