Skip to content

Commit

Permalink
throw exception if we close engine before snapshot is complete + incr…
Browse files Browse the repository at this point in the history
…ease timeout for subsequent records (#4730)

* throw exception if we close engine before snapshot is complete + increase timeout for subsequent records

* add comment + bump postgres version to use new changes
  • Loading branch information
subodh1810 committed Jul 13, 2021
1 parent 7decb12 commit 9b1aa3c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.3.6",
"dockerImageTag": "0.3.7",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres",
"icon": "postgresql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.6
dockerImageTag: 0.3.7
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
icon: postgresql.svg
- sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -54,13 +55,14 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);

private static final WaitTime FIRST_RECORD_WAIT_TIME_MINUTES = new WaitTime(5, TimeUnit.MINUTES);
private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(5, TimeUnit.SECONDS);
private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(1, TimeUnit.MINUTES);

private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
private final CdcTargetPosition targetPosition;
private final Supplier<Boolean> publisherStatusSupplier;
private final VoidCallable requestClose;
private boolean receivedFirstRecord;
private boolean hasSnapshotFinished;

public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> queue,
CdcTargetPosition targetPosition,
Expand All @@ -71,6 +73,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> q
this.publisherStatusSupplier = publisherStatusSupplier;
this.requestClose = requestClose;
this.receivedFirstRecord = false;
this.hasSnapshotFinished = true;
}

@Override
Expand All @@ -90,13 +93,17 @@ protected ChangeEvent<String, String> computeNext() {
// if within the timeout, the consumer could not get a record, it is time to tell the producer to
// shutdown.
if (next == null) {
LOGGER.info("Closing cause next is returned as null");
requestClose();
LOGGER.info("no record found. polling again.");
continue;
}

JsonNode eventAsJson = Jsons.deserialize(next.value());
hasSnapshotFinished = hasSnapshotFinished(eventAsJson);

// if the last record matches the target file position, it is time to tell the producer to shutdown.
if (shouldSignalClose(next)) {
if (shouldSignalClose(eventAsJson)) {
requestClose();
}
receivedFirstRecord = true;
Expand All @@ -105,14 +112,35 @@ protected ChangeEvent<String, String> computeNext() {
return endOfData();
}

private boolean hasSnapshotFinished(JsonNode eventAsJson) {
SnapshotMetadata snapshot = SnapshotMetadata.valueOf(eventAsJson.get("source").get("snapshot").asText().toUpperCase());
return SnapshotMetadata.TRUE != snapshot;
}

/**
* Debezium was built as an ever running process which keeps on listening for new changes on DB and
* immediately processing them. Airbyte needs debezium to work as a start stop mechanism. In order
* to determine when to stop debezium engine we rely on few factors 1. TargetPosition logic. At the
* beginning of the sync we define a target position in the logs of the DB. This can be an LSN or
* anything specific to the DB which can help us identify that we have reached a specific position
* in the log based replication When we start processing records from debezium, we extract the the
* log position from the metadata of the record and compare it with our target that we defined at
* the beginning of the sync. If we have reached the target position, we shutdown the debezium
* engine 2. The TargetPosition logic might not always work and in order to tackle that we have
* another logic where if we do not receive records from debezium for a given duration, we ask
* debezium engine to shutdown 3. We also take the Snapshot into consideration, when a connector is
* running for the first time, we let it complete the snapshot and only after the completion of
* snapshot we should shutdown the engine. If we are closing the engine before completion of
* snapshot, we throw an exception
*/
@Override
public void close() throws Exception {
requestClose.call();
throwExceptionIfSnapshotNotFinished();
}

private boolean shouldSignalClose(ChangeEvent<String, String> event) {

return targetPosition.reachedTargetPosition(Jsons.deserialize(event.value()));
private boolean shouldSignalClose(JsonNode eventAsJson) {
return targetPosition.reachedTargetPosition(eventAsJson);
}

private void requestClose() {
Expand All @@ -121,6 +149,13 @@ private void requestClose() {
} catch (Exception e) {
throw new RuntimeException(e);
}
throwExceptionIfSnapshotNotFinished();
}

private void throwExceptionIfSnapshotNotFinished() {
if (!hasSnapshotFinished) {
throw new RuntimeException("Closing down debezium engine but snapshot has not finished");
}
}

private static class WaitTime {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.6
LABEL io.airbyte.version=0.3.7
LABEL io.airbyte.name=airbyte/source-postgres

0 comments on commit 9b1aa3c

Please sign in to comment.