Skip to content

Commit

Permalink
add explanatory comments for cdc (#3496)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed May 20, 2021
1 parent 4205f09 commit a7a398b
Showing 1 changed file with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* this signal either when the publisher had not produced a new record for a long time or when it
* has processed at least all of the records that were present in the database when the source was
* started. Because the publisher might publish more records between the consumer sending this
* signal and the publisher acutally shutting down, the consumer must stay alive as long as the
* signal and the publisher actually shutting down, the consumer must stay alive as long as the
* publisher is not closed or if there are any new records for it to process (even if the publisher
* is closed).
*/
Expand Down Expand Up @@ -78,9 +78,11 @@ public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> q

@Override
protected ChangeEvent<String, String> computeNext() {
// keep trying until the publisher is closed or until the queue is empty. the latter case is
// possible when the publisher has shutdown but the consumer has not yet processed all messages it
// emitted.
/*
* keep trying until the publisher is closed and the queue is empty. a closed publisher with a
* non-empty queue is possible when the publisher has shut down but the consumer has not yet
* processed all messages it emitted.
*/
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
final ChangeEvent<String, String> next;
try {
Expand All @@ -89,15 +91,19 @@ protected ChangeEvent<String, String> computeNext() {
throw new RuntimeException(e);
}

// if within the timeout, the consumer could not get a record, it is time to tell the producer to
// shutdown.
// if within the allotted time the consumer could not get a record, tell the producer to shut down.
if (next == null) {
requestClose();
LOGGER.info("no record found. polling again.");
continue;
}

// if the last record matches the target lsn, it is time to tell the producer to shutdown.
/*
* if the last record matches the target LSN, it is time to tell the producer to shut down. note
* that it is possible for the producer to emit more events after the shutdown is signaled. we
* guarantee we get up to a certain LSN but we don't necessarily stop exactly at it. we can go past
* it a little bit.
*/
if (shouldSignalClose(next)) {
requestClose();
}
Expand All @@ -112,6 +118,20 @@ public void close() throws Exception {
requestClose.call();
}

/**
* Determine whether the given event is at or above the LSN we are looking to stop at. The logic
* here is a little nuanced. When running in "snapshot" mode, the LSN in all of the events is the
* LSN at the time that Debezium ran the query to get the records (not the LSN of when the record
* was last updated). So we need to handle records emitted as part of a snapshot specially.
* Therefore the logic is, if the LSN is below the target LSN then we should keep going (this is
* easy; same for snapshot and non-snapshot). If the LSN is greater than or equal to the target we
* check to see if the record is a snapshot record. If it is not a snapshot record we should stop.
* If it is a snapshot record (and it is not the last snapshot record) then we should keep going. If
* it is the last snapshot record, then we should stop.
*
* @param event - event with LSN to check.
* @return whether or not the event is at or above the LSN we are looking for.
*/
private boolean shouldSignalClose(ChangeEvent<String, String> event) {
final PgLsn eventLsn = extractLsn(event);

Expand All @@ -126,6 +146,12 @@ private boolean shouldSignalClose(ChangeEvent<String, String> event) {

private SnapshotMetadata getSnapshotMetadata(ChangeEvent<String, String> event) {
try {
/*
* Debezium emits EmbeddedEngineChangeEvent, but that class is not public and it is hidden behind
* the ChangeEvent interface. The EmbeddedEngineChangeEvent contains the information about whether the
* record was emitted in snapshot mode or not, which we need to determine whether to stop producing
* records or not. Thus we use reflection to access that hidden information.
*/
final Method sourceRecordMethod = event.getClass().getMethod("sourceRecord");
sourceRecordMethod.setAccessible(true);
final SourceRecord sourceRecord = (SourceRecord) sourceRecordMethod.invoke(event);
Expand Down

0 comments on commit a7a398b

Please sign in to comment.