Skip to content

Commit

Permalink
DBZ-6895 Always dispatch heartbeats on commit and checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros authored and jpechane committed Sep 19, 2023
1 parent c1d2cf5 commit 3a72993
Showing 1 changed file with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class OpenLogReplicatorStreamingChangeEventSource implements StreamingCha
private OlrNetworkClient client;
private OraclePartition partition;
private OracleOffsetContext offsetContext;
private long transactionEvents = 0;
private boolean transactionEvents = false;
private Scn lastCheckpointScn = Scn.NULL;
private long lastCheckpointIndex;

Expand Down Expand Up @@ -130,12 +130,6 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
}
}

if (client.isConnected() && !context.isRunning()) {
// By the time the connector calls commitOffsets, the client will be disconnected.
// In this case, the last checkpoint SCN won't be flushable, so confirm it now.
confirmLastCheckpointScn();
}

client.disconnect();
LOGGER.info("Client disconnected.");
}
Expand Down Expand Up @@ -199,7 +193,7 @@ private void onBeginEvent(StreamingEvent event) {
offsetContext.setEventScn(event.getCheckpointScn());
offsetContext.setTransactionId(event.getXid());
offsetContext.setSourceTime(event.getTimestamp());
transactionEvents = 0;
transactionEvents = false;

streamingMetrics.setOffsetScn(offsetContext.getScn());
streamingMetrics.setActiveTransactions(1);
Expand Down Expand Up @@ -227,12 +221,21 @@ private void onCommitEvent(StreamingEvent event) throws InterruptedException {

// We may see empty transactions and in this case we don't want to emit a transaction boundary
// record for these cases. Only trigger commit when there are valid changes.
if (transactionEvents > 0) {
if (transactionEvents) {
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, event.getTimestamp());
}

// Commits have checkpoint scn/indices that are part of the current checkpoint block.
// It is safe to update these values just like we do for DML events.
//
// For situations where capture tables are changed in-frequently, enabling heartbeats
// will have a heartbeat emit at commit boundaries even if transaction metadata isn't
// enabled to guarantee checkpoint offset flushes.
updateCheckpoint(event);
dispatcher.alwaysDispatchHeartbeatEvent(partition, offsetContext);
}

private void onCheckpointEvent(StreamingEvent event) {
private void onCheckpointEvent(StreamingEvent event) throws InterruptedException {
offsetContext.setScn(event.getCheckpointScn());
offsetContext.setScnIndex(event.getCheckpointIndex());
offsetContext.setEventScn(event.getCheckpointScn());
Expand All @@ -241,6 +244,13 @@ private void onCheckpointEvent(StreamingEvent event) {

streamingMetrics.setOffsetScn(offsetContext.getScn());
streamingMetrics.setCommittedScn(offsetContext.getScn());

// For checkpoints, we do not emit any type of normal event, so while we do update
// the checkpoint details, these won't be flushed until the next commit flush.
// If the environment has low activity, enabling heartbeats will guarantee that
// checkpoint scn/indices are flushed.
updateCheckpoint(event);
dispatcher.alwaysDispatchHeartbeatEvent(partition, offsetContext);
}

private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutationEvent) throws Exception {
Expand Down Expand Up @@ -274,11 +284,6 @@ private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutatio
throw new DebeziumException("Unexpected DML event type: " + eventType);
}

if (transactionEvents == 0) {
// First data change that is of interest to the connector, emit the transaction start.
dispatcher.dispatchTransactionStartedEvent(partition, event.getXid(), offsetContext, event.getTimestamp());
}

// Update offsets
offsetContext.setScn(event.getCheckpointScn());
offsetContext.setScnIndex(event.getCheckpointIndex());
Expand All @@ -293,7 +298,11 @@ private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutatio

updateCheckpoint(event);

transactionEvents++;
if (!transactionEvents) {
// First data change that is of interest to the connector, emit the transaction start.
dispatcher.dispatchTransactionStartedEvent(partition, event.getXid(), offsetContext, event.getTimestamp());
transactionEvents = true;
}

final Object[] oldValues = toColumnValuesArray(table, mutationEvent.getBefore());
final Object[] newValues = toColumnValuesArray(table, mutationEvent.getAfter());
Expand Down Expand Up @@ -476,6 +485,8 @@ private void processTruncateEvent(StreamingEvent event, SchemaChangeEvent ddlEve
offsetContext.setTransactionId(event.getXid());
offsetContext.tableEvent(tableId, event.getTimestamp());

updateCheckpoint(event);

LOGGER.trace("Dispatching {} (SCN {}) for table {}", Operation.TRUNCATE, event.getScn(), tableId);
dispatcher.dispatchDataChangeEvent(
partition,
Expand Down

0 comments on commit 3a72993

Please sign in to comment.