Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-6461 Debezium Server cannot recover from Google Pub/Sub errors #23

Merged
merged 2 commits into from May 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -12,6 +12,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
Expand Down Expand Up @@ -123,6 +125,9 @@ public interface PublisherBuilder {
@ConfigProperty(name = PROP_PREFIX + "retry.rpc.timeout.multiplier", defaultValue = "2.0")
Double rpcTimeoutMultiplier;

@ConfigProperty(name = PROP_PREFIX + "wait.message.computation.timeout.ms", defaultValue = "5000")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mfvitale We are going from unlimited. It is really hard to know what is the usual time till the mesage batch is written.
Is not 5 secs value a bit too aggressive? I'd prefer to err on the safe side at least in the beginning and choose 30 or 60 secs.
Also why computation? Would not delivery be better to represent the value use?

Integer waitMessageComputationTimeout;

@ConfigProperty(name = PROP_PREFIX + "address")
Optional<String> address;

Expand Down Expand Up @@ -217,49 +222,59 @@ void close() {
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {

final List<ApiFuture<String>> deliveries = new ArrayList<>();

for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Received event '{}'", record);
final String topicName = streamNameMapper.map(record.destination());
Publisher publisher = publishers.computeIfAbsent(topicName, (x) -> publisherBuilder.get(ProjectTopicName.of(projectId, x)));

final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder();

if (orderingEnabled) {
if (record.key() == null) {
pubsubMessage.setOrderingKey(nullKey);
}
else if (record.key() instanceof String) {
pubsubMessage.setOrderingKey((String) record.key());
}
else if (record.key() instanceof byte[]) {
pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key()));
}
}

if (record.value() instanceof String) {
pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value()));
}
else if (record.value() instanceof byte[]) {
pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value()));
}
PubsubMessage message = buildPubSubMessage(record);

pubsubMessage.putAllAttributes(convertHeaders(record));
deliveries.add(publisher.publish(message));

deliveries.add(publisher.publish(pubsubMessage.build()));
committer.markProcessed(record);
}
List<String> messageIds;
try {
messageIds = ApiFutures.allAsList(deliveries).get();
messageIds = ApiFutures.allAsList(deliveries).get(waitMessageComputationTimeout, TimeUnit.MILLISECONDS);
}
catch (ExecutionException e) {
catch (ExecutionException | TimeoutException e) {
throw new DebeziumException(e);
}
LOGGER.trace("Sent messages with ids: {}", messageIds);
committer.markBatchFinished();
}

private PubsubMessage buildPubSubMessage(ChangeEvent<Object, Object> record) {

final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder();

if (orderingEnabled) {
if (record.key() == null) {
pubsubMessage.setOrderingKey(nullKey);
}
else if (record.key() instanceof String) {
pubsubMessage.setOrderingKey((String) record.key());
}
else if (record.key() instanceof byte[]) {
pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key()));
}
}

if (record.value() instanceof String) {
pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value()));
}
else if (record.value() instanceof byte[]) {
pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value()));
}

pubsubMessage.putAllAttributes(convertHeaders(record));

return pubsubMessage.build();
}

@Override
public boolean supportsTombstoneEvents() {
return false;
Expand Down
Expand Up @@ -10,6 +10,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
Expand Down Expand Up @@ -69,6 +71,9 @@ public interface PublisherBuilder {
@ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default")
String nullKey;

@ConfigProperty(name = PROP_PREFIX + "wait.message.computation.timeout.ms", defaultValue = "5000")
Integer waitMessageComputationTimeout;

@Inject
@CustomConsumerBuilder
Instance<PublisherBuilder> customPublisherBuilder;
Expand Down Expand Up @@ -124,43 +129,50 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt

Publisher publisher = publishers.computeIfAbsent(topicName, (topic) -> publisherBuilder.get(topic));

final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder();

if (orderingEnabled) {
if (record.key() == null) {
pubsubMessage.setOrderingKey(nullKey);
}
else if (record.key() instanceof String) {
pubsubMessage.setOrderingKey((String) record.key());
}
else if (record.key() instanceof byte[]) {
pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key()));
}
}

if (record.value() instanceof String) {
pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value()));
}
else if (record.value() instanceof byte[]) {
pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value()));
}

pubsubMessage.putAllAttributes(convertHeaders(record));
PubsubMessage message = buildPubSubMessage(record);

deliveries.add(publisher.publish(pubsubMessage.build()));
deliveries.add(publisher.publish(message));
committer.markProcessed(record);
}
List<String> messageIds;
try {
messageIds = ApiFutures.allAsList(deliveries).get();
messageIds = ApiFutures.allAsList(deliveries).get(waitMessageComputationTimeout, TimeUnit.MILLISECONDS);
}
catch (ExecutionException e) {
catch (ExecutionException | TimeoutException e) {
throw new DebeziumException(e);
}
LOGGER.trace("Sent messages with ids: {}", messageIds);
committer.markBatchFinished();
}

private PubsubMessage buildPubSubMessage(ChangeEvent<Object, Object> record) {

final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder();

if (orderingEnabled) {
if (record.key() == null) {
pubsubMessage.setOrderingKey(nullKey);
}
else if (record.key() instanceof String) {
pubsubMessage.setOrderingKey((String) record.key());
}
else if (record.key() instanceof byte[]) {
pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key()));
}
}

if (record.value() instanceof String) {
pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value()));
}
else if (record.value() instanceof byte[]) {
pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value()));
}

pubsubMessage.putAllAttributes(convertHeaders(record));

return pubsubMessage.build();
}

@Override
public boolean supportsTombstoneEvents() {
return false;
Expand Down