Skip to content

Commit

Permalink
[source-postgres] State counter on postgres (#34724)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
Co-authored-by: Joe Bell <joseph.bell@airbyte.io>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: SatishChGit <satishchinthanippu@gmail.com>
Co-authored-by: evantahler <evan@airbyte.io>
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
Co-authored-by: Joe Reuter <joe@airbyte.io>
Co-authored-by: Catherine Noll <clnoll@users.noreply.github.com>
Co-authored-by: Anton Karpets <anton.karpets@globallogic.com>
Co-authored-by: Christo Grabowski <108154848+ChristoGrab@users.noreply.github.com>
Co-authored-by: Akash Kulkarni <akash@airbyte.io>
Co-authored-by: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com>
Co-authored-by: Gireesh Sreepathi <gisripa@gmail.com>
Co-authored-by: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com>
Co-authored-by: Ella Rohm-Ensing <erohmensing@gmail.com>
Co-authored-by: Daryna Ishchenko <80129833+darynaishchenko@users.noreply.github.com>
Co-authored-by: Baz <oleksandr.bazarnov@globallogic.com>
Co-authored-by: Patrick Nilan <nilan.patrick@gmail.com>
Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Marius Posta <marius@airbyte.io>
Co-authored-by: Chandler Prall <chandler.prall@gmail.com>
Co-authored-by: pmossman <pmossman@users.noreply.github.com>
Co-authored-by: Anatolii Yatsuk <35109939+tolik0@users.noreply.github.com>
Co-authored-by: Cole Snodgrass <cole@airbyte.io>
Co-authored-by: Anatolii Yatsuk <tolikyatsuk@gmail.com>
Co-authored-by: bgroff <bgroff@users.noreply.github.com>
Co-authored-by: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com>
Co-authored-by: Ryan Waskewich <156025126+rwask@users.noreply.github.com>
Co-authored-by: maxi297 <maxi297@users.noreply.github.com>
Co-authored-by: Pedro S. Lopez <pedroslopez@me.com>
Co-authored-by: Håkon Åmdal <hakon@aamdal.com>
Co-authored-by: Roman Yermilov [GL] <86300758+roman-yermilov-gl@users.noreply.github.com>
Co-authored-by: Augustin <augustin@airbyte.io>
Co-authored-by: Lake Mossman <lake@airbyte.io>
Co-authored-by: lmossman <lmossman@users.noreply.github.com>
Co-authored-by: Brian Lai <51336873+brianjlai@users.noreply.github.com>
Co-authored-by: Subodh Kant Chaturvedi <subodh1810@gmail.com>
Co-authored-by: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com>
Co-authored-by: Sajarin <sajarindider@gmail.com>
Co-authored-by: Edward Gao <edward.gao@airbyte.io>
Co-authored-by: nguyenaiden <duy@airbyte.io>
Co-authored-by: Natalie Kwong <38087517+nataliekwong@users.noreply.github.com>
Co-authored-by: terencecho <3916587+terencecho@users.noreply.github.com>
Co-authored-by: Alex Birdsall <ambirdsall@gmail.com>
  • Loading branch information
Show file tree
Hide file tree
Showing 16 changed files with 206 additions and 225 deletions.
@@ -1 +1 @@
version=0.23.16
version=0.23.17
Expand Up @@ -11,6 +11,8 @@ public class DebeziumIteratorConstants {
public static final String SYNC_CHECKPOINT_DURATION_PROPERTY = "sync_checkpoint_seconds";
public static final String SYNC_CHECKPOINT_RECORDS_PROPERTY = "sync_checkpoint_records";

// TODO: Move these variables to a separate class IteratorConstants, as they will be used in state
// iterators for non debezium cases too.
public static final Duration SYNC_CHECKPOINT_DURATION = Duration.ofMinutes(15);
public static final Integer SYNC_CHECKPOINT_RECORDS = 10_000;

Expand Down
Expand Up @@ -141,6 +141,13 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {

protected abstract void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages);

// TODO: this assertion should be added into test cases in this class, we will need to implement
// corresponding iterator for other connectors before
// doing so.
protected void assertExpectedStateMessageCountMatches(final List<AirbyteStateMessage> stateMessages, long totalCount) {
// Do nothing.
}

@BeforeEach
protected void setup() {
testdb = createTestDatabase();
Expand Down Expand Up @@ -350,6 +357,7 @@ void testExistingData() throws Exception {

assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages);
assertExpectedStateMessages(stateMessages);
assertExpectedStateMessageCountMatches(stateMessages, MODEL_RECORDS.size());
}

protected void compareTargetPositionFromTheRecordsWithTargetPostionGeneratedBeforeSync(final CdcTargetPosition targetPosition,
Expand Down Expand Up @@ -377,6 +385,7 @@ public void testDelete() throws Exception {
extractRecordMessages(actualRecords2));
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertExpectedStateMessagesFromIncrementalSync(stateMessages2);
assertExpectedStateMessageCountMatches(stateMessages2, 1);
assertEquals(1, recordMessages2.size());
assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt());
assertCdcMetaData(recordMessages2.get(0).getData(), false);
Expand Down Expand Up @@ -411,6 +420,7 @@ public void testUpdate() throws Exception {
assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt());
assertEquals(updatedModel, recordMessages2.get(0).getData().get(COL_MODEL).asText());
assertCdcMetaData(recordMessages2.get(0).getData(), true);
assertExpectedStateMessageCountMatches(stateMessages2, 1);
}

@SuppressWarnings({"BusyWait", "CodeBlock2Expr"})
Expand Down Expand Up @@ -534,6 +544,8 @@ public void testCdcAndFullRefreshInSameSync() throws Exception {
final HashSet<String> names = new HashSet<>(STREAM_NAMES);
names.add(MODELS_STREAM_NAME + "_2");
assertExpectedStateMessages(stateMessages1);
// Full refresh does not get any state messages.
assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS_2.size());
assertExpectedRecords(Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
.collect(Collectors.toSet()),
recordMessages1,
Expand All @@ -554,6 +566,7 @@ public void testCdcAndFullRefreshInSameSync() throws Exception {
final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertExpectedStateMessagesFromIncrementalSync(stateMessages2);
assertExpectedStateMessageCountMatches(stateMessages2, 1);
assertExpectedRecords(
Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
.collect(Collectors.toSet()),
Expand All @@ -576,6 +589,7 @@ public void testNoData() throws Exception {
final List<AirbyteStateMessage> stateMessages = extractStateMessages(actualRecords);
assertExpectedRecords(Collections.emptySet(), recordMessages);
assertExpectedStateMessagesForNoData(stateMessages);
assertExpectedStateMessageCountMatches(stateMessages, 0);
}

protected void assertExpectedStateMessagesForNoData(final List<AirbyteStateMessage> stateMessages) {
Expand All @@ -600,6 +614,7 @@ public void testNoDataOnSecondSync() throws Exception {

assertExpectedRecords(Collections.emptySet(), recordMessages2);
assertExpectedStateMessagesFromIncrementalSync(stateMessages2);
assertExpectedStateMessageCountMatches(stateMessages2, 0);
}

@Test
Expand Down Expand Up @@ -630,6 +645,7 @@ public void newTableSnapshotTest() throws Exception {
dataFromFirstBatch);
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
assertExpectedStateMessages(stateAfterFirstBatch);
assertExpectedStateMessageCountMatches(stateAfterFirstBatch, MODEL_RECORDS.size());

final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion = stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterFirstSyncCompletion.getType());
Expand Down
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.16'
cdkVersionRequired = '0.23.17'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.14
dockerImageTag: 3.3.15
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down

This file was deleted.

Expand Up @@ -5,18 +5,32 @@
package io.airbyte.integrations.source.postgres.ctid;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CtidStateManager {
public abstract class CtidStateManager implements SourceStateMessageProducer<AirbyteMessageWithCtid> {

private static final Logger LOGGER = LoggerFactory.getLogger(CtidStateManager.class);

public static final long CTID_STATUS_VERSION = 2;
public static final String STATE_TYPE_KEY = "state_type";

protected final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus;
private Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;

private String lastCtid;
private FileNodeHandler fileNodeHandler;

protected CtidStateManager(final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus) {
this.pairToCtidStatus = pairToCtidStatus;
Expand All @@ -41,4 +55,65 @@ public static boolean validateRelationFileNode(final CtidStatus ctidstatus,

public abstract AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun);

public void setStreamStateIteratorFields(Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier,
FileNodeHandler fileNodeHandler) {
this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier;
this.fileNodeHandler = fileNodeHandler;
}

@Override
public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(),
stream.getStream().getNamespace());
final Long fileNode = fileNodeHandler.getFileNode(pair);
assert fileNode != null;
final CtidStatus ctidStatus = new CtidStatus()
.withVersion(CTID_STATUS_VERSION)
.withStateType(StateType.CTID)
.withCtid(lastCtid)
.withIncrementalState(getStreamState(pair))
.withRelationFilenode(fileNode);
LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus);
return createCtidStateMessage(pair, ctidStatus);
}

/**
* Stores the latest CTID.
*/
@Override
public AirbyteMessage processRecordMessage(final ConfiguredAirbyteStream stream, AirbyteMessageWithCtid message) {
if (Objects.nonNull(message.ctid())) {
this.lastCtid = message.ctid();
}
return message.recordMessage();
}

/**
* Creates a final state message for the stream.
*/
@Override
public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream stream) {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(),
stream.getStream().getNamespace());

final AirbyteStateMessage finalStateMessage = createFinalStateMessage(pair, getStreamState(pair));
LOGGER.info("Finished initial sync of stream {}, Emitting final state, state is {}", pair, finalStateMessage);
return finalStateMessage;
}

/**
* Extra criteria(besides checking frequency) to check if we should emit state message.
*/
@Override
public boolean shouldEmitStateMessage(final ConfiguredAirbyteStream stream) {
return Objects.nonNull(lastCtid)
&& StringUtils.isNotBlank(lastCtid);
}

private JsonNode getStreamState(final AirbyteStreamNameNamespacePair pair) {
final CtidStatus currentCtidStatus = getCtidStatus(pair);
return (currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair)
: currentCtidStatus.getIncrementalState();
}

}
Expand Up @@ -10,15 +10,17 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.postgres.PostgresQueryUtils.TableBlockSize;
import io.airbyte.integrations.source.postgres.PostgresType;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations.RowDataWithCtid;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -109,7 +111,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getInitialSyncCtidIterator(
tablesMaxTuple.orElseGet(() -> Map.of(pair, -1)).get(pair));
final AutoCloseableIterator<AirbyteMessageWithCtid> recordIterator =
getRecordIterator(queryStream, streamName, namespace, emmitedAt.toEpochMilli());
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, pair);
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream);
final AutoCloseableIterator<AirbyteMessage> logAugmented = augmentWithLogs(recordAndMessageIterator, pair, streamName);
iteratorList.add(logAugmented);

Expand Down Expand Up @@ -165,21 +167,20 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithLogs(final AutoCloseabl
}

private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseableIterator<AirbyteMessageWithCtid> recordIterator,
final AirbyteStreamNameNamespacePair pair) {
final ConfiguredAirbyteStream airbyteStream) {

final CtidStatus currentCtidStatus = ctidStateManager.getCtidStatus(pair);
final JsonNode incrementalState =
(currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair)
: currentCtidStatus.getIncrementalState();
final Duration syncCheckpointDuration =
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: CtidStateIterator.SYNC_CHECKPOINT_DURATION;
: DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: CtidStateIterator.SYNC_CHECKPOINT_RECORDS;
: DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS;

ctidStateManager.setStreamStateIteratorFields(streamStateForIncrementalRunSupplier, fileNodeHandler);

final AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
return AutoCloseableIterators.transformIterator(
r -> new CtidStateIterator(r, pair, fileNodeHandler, ctidStateManager, incrementalState,
syncCheckpointDuration, syncCheckpointRecords),
r -> new SourceStateIterator(r, airbyteStream, ctidStateManager, new StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)),
recordIterator, pair);
}

Expand Down

0 comments on commit 1bb7a1c

Please sign in to comment.