From a7a398ba597da35ed405fb92aee6e0ee15827ca1 Mon Sep 17 00:00:00 2001 From: Charles Date: Thu, 20 May 2021 14:28:32 -0700 Subject: [PATCH] add explanatory comments for cdc (#3496) --- .../postgres/DebeziumRecordIterator.java | 40 +++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordIterator.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordIterator.java index 4b33ab655b66..2507e2faec63 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordIterator.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordIterator.java @@ -49,7 +49,7 @@ * this signal either when the publisher had not produced a new record for a long time or when it * has processed at least all of the records that were present in the database when the source was * started. Because the publisher might publish more records between the consumer sending this - * signal and the publisher acutally shutting down, the consumer must stay alive as long as the + * signal and the publisher actually shutting down, the consumer must stay alive as long as the * publisher is not closed or if there are any new records for it to process (even if the publisher * is closed). */ @@ -78,9 +78,11 @@ public DebeziumRecordIterator(LinkedBlockingQueue> q @Override protected ChangeEvent computeNext() { - // keep trying until the publisher is closed or until the queue is empty. the latter case is - // possible when the publisher has shutdown but the consumer has not yet processed all messages it - // emitted. + /* + * keep trying until the publisher is closed or until the queue is empty. the latter case is + * possible when the publisher has shutdown but the consumer has not yet processed all messages it + * emitted. + */ while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) { final ChangeEvent next; try { @@ -89,15 +91,19 @@ protected ChangeEvent computeNext() { throw new RuntimeException(e); } - // if within the timeout, the consumer could not get a record, it is time to tell the producer to - // shutdown. + // if within the allotted time the consumer could not get a record, tell the producer to shutdown. if (next == null) { requestClose(); LOGGER.info("no record found. polling again."); continue; } - // if the last record matches the target lsn, it is time to tell the producer to shutdown. + /* + * if the last record matches the target LSN, it is time to tell the producer to shutdown. note: + * that it is possible for the producer to emit more events after the shutdown is signaled. we + * guarantee we get up to a certain LSN but we don't necessarily stop exactly at it. we can go past + * it a little bit. + */ if (shouldSignalClose(next)) { requestClose(); } @@ -112,6 +118,20 @@ public void close() throws Exception { requestClose.call(); } + /** + * Determine whether the given event is at or above the LSN we are looking to stop at. The logic + * here is a little nuanced. When running in "snapshot" mode, the LSN in all of the events is the + * LSN at the time that Debezium ran the query to get the records (not the LSN of when the record + * was last updated). So we need to handle records emitted from a snapshot record specially. + * Therefore the logic is, if the LSN is below the target LSN then we should keep going (this is + * easy; same for snapshot and non-snapshot). If the LSN is greater than or equal to the target we + * check to see if the record is a snapshot record. If it is not a snapshot record we should stop. + * If it is a snapshot record (and it is not the last snapshot record) then we should keep going. If + * it is the last snapshot record, then we should stop. + * + * @param event - event with LSN to check. + * @return whether or not the event is at or above the LSN we are looking for. + */ private boolean shouldSignalClose(ChangeEvent event) { final PgLsn eventLsn = extractLsn(event); @@ -126,6 +146,12 @@ private boolean shouldSignalClose(ChangeEvent event) { private SnapshotMetadata getSnapshotMetadata(ChangeEvent event) { try { + /* + * Debezium emits EmbeddedEngineChangeEvent, but that class is not public and it is hidden behind + * the ChangeEvent iface. The EmbeddedEngineChangeEvent contains the information about whether the + * record was emitted in snapshot mode or not, which we need to determine whether to stop producing + * records or not. Thus we use reflection to access that hidden information. + */ final Method sourceRecordMethod = event.getClass().getMethod("sourceRecord"); sourceRecordMethod.setAccessible(true); final SourceRecord sourceRecord = (SourceRecord) sourceRecordMethod.invoke(event);