diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java b/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java index 26152d9624797..5a66d803eae83 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java @@ -174,6 +174,12 @@ public static AutoCloseableIterator transform(final Function(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close, airbyteStream); } + public static AutoCloseableIterator transformIterator(final Function, Iterator> iteratorCreator, + final AutoCloseableIterator autoCloseableIterator, + final AirbyteStreamNameNamespacePair airbyteStream) { + return new DefaultAutoCloseableIterator(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close, airbyteStream); + } + @SafeVarargs public static CompositeIterator concatWithEagerClose(final Consumer airbyteStreamStatusConsumer, final AutoCloseableIterator... 
iterators) { diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index ca362e246613e..8219fcbf540af 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -57,3 +57,6 @@ jsonSchema2Pojo { includeSetters = true } + + + diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java index 7043037bd70fd..03ed5d942d48c 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java @@ -4,16 +4,22 @@ package io.airbyte.integrations.source.postgres; +import static io.airbyte.integrations.source.postgres.xmin.XminStateManager.XMIN_STATE_VERSION; import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType; import io.airbyte.integrations.source.postgres.internal.models.XminStatus; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,36 +95,67 @@ public 
static XminStatus getXminStatus(final JdbcDatabase database) throws SQLEx return new XminStatus() .withNumWraparound(result.get(NUM_WRAPAROUND_COL).asLong()) .withXminXidValue(result.get(XMIN_XID_VALUE_COL).asLong()) - .withXminRawValue(result.get(XMIN_RAW_VALUE_COL).asLong()); + .withXminRawValue(result.get(XMIN_RAW_VALUE_COL).asLong()) + .withVersion(XMIN_STATE_VERSION) + .withStateType(StateType.XMIN); } - public static void logFullVacuumStatus(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final String quoteString) { - catalog.getStreams().forEach(stream -> { + static Map fileNodeForStreams(final JdbcDatabase database, + final List streams, + final String quoteString) { + final Map fileNodes = new HashMap<>(); + streams.forEach(stream -> { + final AirbyteStreamNameNamespacePair namespacePair = + new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()); + final long l = fileNodeForStreams(database, namespacePair, quoteString); + fileNodes.put(namespacePair, l); + }); + return fileNodes; + } + + public static long fileNodeForStreams(final JdbcDatabase database, final AirbyteStreamNameNamespacePair stream, final String quoteString) { + try { + final String streamName = stream.getName(); + final String schemaName = stream.getNamespace(); + final String fullTableName = + getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString); + final List jsonNodes = database.bufferedResultSetQuery( + conn -> conn.prepareStatement(CTID_FULL_VACUUM_REL_FILENODE_QUERY.formatted(fullTableName)).executeQuery(), + resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); + Preconditions.checkState(jsonNodes.size() == 1); + final long relationFilenode = jsonNodes.get(0).get("pg_relation_filenode").asLong(); + LOGGER.info("Relation filenode is for stream {} is {}", fullTableName, relationFilenode); + return relationFilenode; + } catch (SQLException e) { + throw new RuntimeException(e); + } 
+ } + + public static List streamsUnderVacuum(final JdbcDatabase database, + final List streams, + final String quoteString) { + final List streamsUnderVacuuming = new ArrayList<>(); + streams.forEach(stream -> { final String streamName = stream.getStream().getName(); final String schemaName = stream.getStream().getNamespace(); final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString); - LOGGER.info("Full Vacuum information for {}", fullTableName); try { - List jsonNodes = database.bufferedResultSetQuery( - conn -> conn.prepareStatement(CTID_FULL_VACUUM_REL_FILENODE_QUERY.formatted(fullTableName)).executeQuery(), + final List jsonNodes = database.bufferedResultSetQuery( + conn -> conn.prepareStatement(CTID_FULL_VACUUM_IN_PROGRESS_QUERY.formatted(fullTableName)).executeQuery(), resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); - Preconditions.checkState(jsonNodes.size() == 1); - LOGGER.info("Relation filenode is {}", jsonNodes.get(0).get("pg_relation_filenode")); - - jsonNodes = - database.bufferedResultSetQuery(conn -> conn.prepareStatement(CTID_FULL_VACUUM_IN_PROGRESS_QUERY.formatted(fullTableName)).executeQuery(), - resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); - if (jsonNodes.size() == 0) { - LOGGER.info("No full vacuum currently in progress"); - } else { + if (jsonNodes.size() != 0) { Preconditions.checkState(jsonNodes.size() == 1); - LOGGER.info("Full Vacuum currently in progress in {} phase", jsonNodes.get(0).get("phase")); + LOGGER.warn("Full Vacuum currently in progress for table {} in {} phase, the table will be skipped from syncing data", fullTableName, + jsonNodes.get(0).get("phase")); + streamsUnderVacuuming.add(io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(stream)); } } catch (SQLException e) { - LOGGER.warn("Failed to log full vacuum in progress. 
This warning shouldn't affect the sync and can be ignored", e); + // Assume it's safe to progress and skip relation node and vaccuum validation + LOGGER.warn("Failed to fetch vacuum for table {} info. Going to move ahead with the sync assuming it's safe", fullTableName, e); } }); + return streamsUnderVacuuming; } } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 3df2d3f3ba6fa..60a00ff6203b9 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -20,7 +20,9 @@ import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum; import static io.airbyte.integrations.source.postgres.PostgresUtils.isIncrementalSyncMode; +import static io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.categoriseStreams; import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting; import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE; import static java.util.stream.Collectors.toList; @@ -56,8 +58,12 @@ import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils; import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode; import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; +import 
io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations; +import io.airbyte.integrations.source.postgres.ctid.CtidStateManager; +import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler; import io.airbyte.integrations.source.postgres.internal.models.XminStatus; import io.airbyte.integrations.source.postgres.xmin.PostgresXminHandler; +import io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.StreamsCategorised; import io.airbyte.integrations.source.postgres.xmin.XminStateManager; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.integrations.source.relationaldb.state.StateManager; @@ -83,6 +89,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -92,6 +99,7 @@ import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,7 +224,6 @@ protected Set getExcludedViews() { protected void logPreSyncDebugData(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog) throws SQLException { super.logPreSyncDebugData(database, catalog); - PostgresQueryUtils.logFullVacuumStatus(database, catalog, getQuoteString()); for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { final String streamName = stream.getStream().getName(); final String schemaName = stream.getStream().getNamespace(); @@ -452,9 +459,50 @@ public List> getIncrementalIterators(final AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null))); } else if (PostgresUtils.isXmin(sourceConfig) && isIncrementalSyncMode(catalog)) { - final XminStateManager xminStateManager = new XminStateManager(stateManager.getRawStateMessages()); - final PostgresXminHandler handler = new PostgresXminHandler(database, 
sourceOperations, getQuoteString(), xminStatus, xminStateManager); - return handler.getIncrementalIterators(catalog, tableNameToTable, emittedAt); + final StreamsCategorised streamsCategorised = categoriseStreams(stateManager, catalog, xminStatus); + + final List> ctidIterator = new ArrayList<>(); + final List> xminIterator = new ArrayList<>(); + + if (!streamsCategorised.ctidStreams().streamsForCtidSync().isEmpty()) { + final List streamsUnderVacuum = streamsUnderVacuum(database, + streamsCategorised.ctidStreams().streamsForCtidSync(), getQuoteString()); + + final List finalListOfStreamsToBeSyncedViaCtid = + streamsUnderVacuum.isEmpty() ? streamsCategorised.ctidStreams().streamsForCtidSync() + : streamsCategorised.ctidStreams().streamsForCtidSync().stream() + .filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c))) + .toList(); + LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size()); + final Map fileNodes = PostgresQueryUtils.fileNodeForStreams(database, + finalListOfStreamsToBeSyncedViaCtid, + getQuoteString()); + final CtidStateManager ctidStateManager = new CtidStateManager(streamsCategorised.ctidStreams().statesFromCtidSync(), fileNodes); + final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(sourceConfig, database, new CtidPostgresSourceOperations(), getQuoteString(), + fileNodes, ctidStateManager, + namespacePair -> Jsons.jsonNode(xminStatus), + (namespacePair, jsonState) -> XminStateManager.getAirbyteStateMessage(namespacePair, Jsons.object(jsonState, XminStatus.class))); + ctidIterator.addAll(ctidHandler.getIncrementalIterators( + new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt)); + } else { + LOGGER.info("No Streams will be synced via ctid."); + } + + if (!streamsCategorised.xminStreams().streamsForXminSync().isEmpty()) { + LOGGER.info("Streams to be synced via xmin : {}", 
streamsCategorised.xminStreams().streamsForXminSync().size()); + final XminStateManager xminStateManager = new XminStateManager(streamsCategorised.xminStreams().statesFromXminSync()); + final PostgresXminHandler xminHandler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager); + + xminIterator.addAll(xminHandler.getIncrementalIterators( + new ConfiguredAirbyteCatalog().withStreams(streamsCategorised.xminStreams().streamsForXminSync()), tableNameToTable, emittedAt)); + } else { + LOGGER.info("No Streams will be synced via xmin."); + } + + return Stream + .of(ctidIterator, xminIterator) + .flatMap(Collection::stream) + .collect(Collectors.toList()); } else { return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/AirbyteMessageWithCtid.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/AirbyteMessageWithCtid.java new file mode 100644 index 0000000000000..cbadc1af7044d --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/AirbyteMessageWithCtid.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.postgres.ctid; + +import io.airbyte.protocol.models.v0.AirbyteMessage; + +/** + * ctid of rows is queried as part of our sync and is used to checkpoint to be able to restart + * failed sync from a known last point. Since we never want to emit a ctid it is kept in a different + * field, to save us an expensive JsonNode.remove() operation. 
+ * + * @param recordMessage row fields to emit + * @param ctid ctid + */ +public record AirbyteMessageWithCtid(AirbyteMessage recordMessage, String ctid) { + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPostgresSourceOperations.java new file mode 100644 index 0000000000000..62047de57e23d --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPostgresSourceOperations.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.postgres.ctid; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.postgres.PostgresSourceOperations; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.Collections; +import java.util.Objects; + +public class CtidPostgresSourceOperations extends PostgresSourceOperations { + + private static final String CTID = "ctid"; + + public RowDataWithCtid recordWithCtid(final ResultSet queryContext) throws SQLException { + // the first call communicates with the database. after that the result is cached. + final ResultSetMetaData metadata = queryContext.getMetaData(); + final int columnCount = metadata.getColumnCount(); + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + String ctid = null; + for (int i = 1; i <= columnCount; i++) { + final String columnName = metadata.getColumnName(i); + if (columnName.equalsIgnoreCase(CTID)) { + ctid = queryContext.getString(i); + continue; + } + + // convert to java types that will convert into reasonable json. 
+ copyToJsonField(queryContext, i, jsonNode); + } + + assert Objects.nonNull(ctid); + return new RowDataWithCtid(jsonNode, ctid); + } + + public record RowDataWithCtid(JsonNode data, String ctid) { + + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateIterator.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateIterator.java new file mode 100644 index 0000000000000..4c4c440200aed --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateIterator.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.postgres.ctid; + +import static io.airbyte.integrations.source.postgres.ctid.CtidStateManager.CTID_STATUS_VERSION; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.AbstractIterator; +import io.airbyte.integrations.source.postgres.internal.models.CtidStatus; +import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import java.time.Duration; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.util.Iterator; +import java.util.Objects; +import java.util.function.BiFunction; +import javax.annotation.CheckForNull; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CtidStateIterator extends AbstractIterator implements Iterator { + + private static final Logger LOGGER = LoggerFactory.getLogger(CtidStateIterator.class); + public static final Duration SYNC_CHECKPOINT_DURATION = 
Duration.ofMinutes(15); + public static final Integer SYNC_CHECKPOINT_RECORDS = 10_000; + + private final Iterator messageIterator; + private final AirbyteStreamNameNamespacePair pair; + private boolean hasEmittedFinalState; + private String lastCtid; + private final JsonNode streamStateForIncrementalRun; + private final long relationFileNode; + private final BiFunction finalStateMessageSupplier; + private long recordCount = 0L; + private Instant lastCheckpoint = Instant.now(); + private final Duration syncCheckpointDuration; + private final Long syncCheckpointRecords; + + public CtidStateIterator(final Iterator messageIterator, + final AirbyteStreamNameNamespacePair pair, + final long relationFileNode, + final JsonNode streamStateForIncrementalRun, + final BiFunction finalStateMessageSupplier, + final Duration checkpointDuration, + final Long checkpointRecords) { + this.messageIterator = messageIterator; + this.pair = pair; + this.relationFileNode = relationFileNode; + this.streamStateForIncrementalRun = streamStateForIncrementalRun; + this.finalStateMessageSupplier = finalStateMessageSupplier; + this.syncCheckpointDuration = checkpointDuration; + this.syncCheckpointRecords = checkpointRecords; + } + + @CheckForNull + @Override + protected AirbyteMessage computeNext() { + if (messageIterator.hasNext()) { + if ((recordCount >= syncCheckpointRecords || Duration.between(lastCheckpoint, OffsetDateTime.now()).compareTo(syncCheckpointDuration) > 0) + && Objects.nonNull(lastCtid) + && StringUtils.isNotBlank(lastCtid)) { + final CtidStatus ctidStatus = new CtidStatus() + .withVersion(CTID_STATUS_VERSION) + .withStateType(StateType.CTID) + .withCtid(lastCtid) + .withIncrementalState(streamStateForIncrementalRun) + .withRelationFilenode(relationFileNode); + LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus); + recordCount = 0L; + lastCheckpoint = Instant.now(); + return CtidStateManager.createPerStreamStateMessage(pair, ctidStatus); + } + // Use 
try-catch to catch Exception that could occur when connection to the database fails + try { + final AirbyteMessageWithCtid message = messageIterator.next(); + if (Objects.nonNull(message.ctid())) { + this.lastCtid = message.ctid(); + } + recordCount++; + return message.recordMessage(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } else if (!hasEmittedFinalState) { + hasEmittedFinalState = true; + final AirbyteStateMessage finalStateMessage = finalStateMessageSupplier.apply(pair, streamStateForIncrementalRun); + LOGGER.info("Emitting final state for stream {}, state is {}", pair, finalStateMessage.getStream().getStreamState()); + return new AirbyteMessage() + .withType(Type.STATE) + .withState(finalStateMessage); + } else { + return endOfData(); + } + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java new file mode 100644 index 0000000000000..c6aecfbd71aba --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.postgres.ctid; + +import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.integrations.source.postgres.internal.models.CtidStatus; +import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.Jsons; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.v0.AirbyteStreamState; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CtidStateManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(CtidStateManager.class); + public static final long CTID_STATUS_VERSION = 2; + private final Map pairToCtidStatus; + private final static AirbyteStateMessage EMPTY_STATE = new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState()); + + public CtidStateManager(final List stateMessages, final Map fileNodes) { + this.pairToCtidStatus = createPairToCtidStatusMap(stateMessages, fileNodes); + } + + private static Map createPairToCtidStatusMap(final List stateMessages, + final Map fileNodes) { + final Map localMap = new HashMap<>(); + if (stateMessages != null) { + for (final AirbyteStateMessage stateMessage : stateMessages) { + if (stateMessage.getType() == AirbyteStateType.STREAM && !stateMessage.equals(EMPTY_STATE)) { + LOGGER.info("State message: " + stateMessage); + final StreamDescriptor streamDescriptor = stateMessage.getStream().getStreamDescriptor(); + final AirbyteStreamNameNamespacePair pair = new 
AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace()); + final CtidStatus ctidStatus; + try { + ctidStatus = Jsons.object(stateMessage.getStream().getStreamState(), CtidStatus.class); + assert (ctidStatus.getVersion() == CTID_STATUS_VERSION); + assert (ctidStatus.getStateType().equals(StateType.CTID)); + } catch (final IllegalArgumentException e) { + throw new ConfigErrorException("Invalid per-stream state"); + } + if (validateRelationFileNode(ctidStatus, pair, fileNodes)) { + localMap.put(pair, ctidStatus); + } else { + LOGGER.warn( + "The relation file node for table in source db {} is not equal to the saved ctid state, a full sync from scratch will be triggered.", + pair); + } + } + } + } + return localMap; + } + + private static boolean validateRelationFileNode(final CtidStatus ctidstatus, + final AirbyteStreamNameNamespacePair pair, + final Map fileNodes) { + if (fileNodes.containsKey(pair)) { + final Long fileNode = fileNodes.get(pair); + return Objects.equals(ctidstatus.getRelationFilenode(), fileNode); + } + return true; + } + + public CtidStatus getCtidStatus(final AirbyteStreamNameNamespacePair pair) { + return pairToCtidStatus.get(pair); + } + + // TODO : We will need a similar method to generate a GLOBAL state message for CDC + public static AirbyteMessage createPerStreamStateMessage(final AirbyteStreamNameNamespacePair pair, final CtidStatus ctidStatus) { + final AirbyteStreamState airbyteStreamState = + new AirbyteStreamState() + .withStreamDescriptor( + new StreamDescriptor() + .withName(pair.getName()) + .withNamespace(pair.getNamespace())) + .withStreamState(Jsons.jsonNode(ctidStatus)); + + final AirbyteStateMessage stateMessage = + new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + .withStream(airbyteStreamState); + + return new AirbyteMessage() + .withType(Type.STATE) + .withState(stateMessage); + } + +} diff --git 
a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/PostgresCtidHandler.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/PostgresCtidHandler.java new file mode 100644 index 0000000000000..5b9410b2796fb --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/PostgresCtidHandler.java @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.postgres.ctid; + +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.stream.AirbyteStreamUtils; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.source.postgres.PostgresType; +import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations.RowDataWithCtid; +import io.airbyte.integrations.source.postgres.internal.models.CtidStatus; +import io.airbyte.integrations.source.relationaldb.DbSourceDiscoverUtil; +import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils; +import io.airbyte.integrations.source.relationaldb.TableInfo; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.CommonField; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.CatalogHelpers; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import 
io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.SyncMode; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PostgresCtidHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCtidHandler.class); + + private final JsonNode config; + private final JdbcDatabase database; + private final CtidPostgresSourceOperations sourceOperations; + private final String quoteString; + private final CtidStateManager ctidStateManager; + private final Map fileNodes; + private final Function streamStateForIncrementalRunSupplier; + private final BiFunction finalStateMessageSupplier; + + public PostgresCtidHandler(final JsonNode config, + final JdbcDatabase database, + final CtidPostgresSourceOperations sourceOperations, + final String quoteString, + final Map fileNodes, + final CtidStateManager ctidStateManager, + final Function streamStateForIncrementalRunSupplier, + final BiFunction finalStateMessageSupplier) { + this.config = config; + this.database = database; + this.sourceOperations = sourceOperations; + this.quoteString = quoteString; + this.fileNodes = fileNodes; + this.ctidStateManager = ctidStateManager; + this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier; + this.finalStateMessageSupplier = finalStateMessageSupplier; + } + + public List> getIncrementalIterators( + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final Instant emmitedAt) { + final List> iteratorList = new ArrayList<>(); + for (final ConfiguredAirbyteStream airbyteStream : 
catalog.getStreams()) { + final AirbyteStream stream = airbyteStream.getStream(); + final String streamName = stream.getName(); + final String namespace = stream.getNamespace(); + final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace); + final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(namespace, streamName); + if (!tableNameToTable.containsKey(fullyQualifiedTableName)) { + LOGGER.info("Skipping stream {} because it is not in the source", fullyQualifiedTableName); + continue; + } + if (airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL)) { + // Grab the selected fields to sync + final TableInfo> table = tableNameToTable + .get(fullyQualifiedTableName); + final List selectedDatabaseFields = table.getFields() + .stream() + .map(CommonField::getName) + .filter(CatalogHelpers.getTopLevelFieldNames(airbyteStream)::contains) + .toList(); + final AutoCloseableIterator queryStream = queryTableCtid(selectedDatabaseFields, table.getNameSpace(), table.getName()); + final AutoCloseableIterator recordIterator = + getRecordIterator(queryStream, streamName, namespace, emmitedAt.toEpochMilli()); + final AutoCloseableIterator recordAndMessageIterator = augmentWithState(recordIterator, pair); + final AutoCloseableIterator logAugmented = augmentWithLogs(recordAndMessageIterator, pair, streamName); + iteratorList.add(logAugmented); + + } + } + return iteratorList; + } + + private AutoCloseableIterator queryTableCtid( + final List columnNames, + final String schemaName, + final String tableName) { + + LOGGER.info("Queueing query for table: {}", tableName); + final AirbyteStreamNameNamespacePair airbyteStream = + AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName); + return AutoCloseableIterators.lazyIterator(() -> { + try { + final Stream stream = database.unsafeQuery( + connection -> createCtidQueryStatement(connection, columnNames, schemaName, tableName, airbyteStream), 
sourceOperations::recordWithCtid); + return AutoCloseableIterators.fromStream(stream, airbyteStream); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + }, airbyteStream); + } + + private PreparedStatement createCtidQueryStatement( + final Connection connection, + final List columnNames, + final String schemaName, + final String tableName, + final AirbyteStreamNameNamespacePair airbyteStream) { + try { + LOGGER.info("Preparing query for table: {}", tableName); + final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName, + quoteString); + + final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString); + final String sql = "SELECT ctid, %s FROM %s WHERE ctid > ?::tid".formatted(wrappedColumnNames, fullTableName); + final PreparedStatement preparedStatement = connection.prepareStatement(sql); + final CtidStatus currentCtidStatus = ctidStateManager.getCtidStatus(airbyteStream); + if (currentCtidStatus != null) { + preparedStatement.setObject(1, currentCtidStatus.getCtid()); + } else { + preparedStatement.setObject(1, "(0,0)"); + } + LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement); + return preparedStatement; + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + // Transforms the given iterator to create an {@link AirbyteRecordMessage} + private AutoCloseableIterator getRecordIterator( + final AutoCloseableIterator recordIterator, + final String streamName, + final String namespace, + final long emittedAt) { + return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessageWithCtid(new AirbyteMessage() + .withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(streamName) + .withNamespace(namespace) + .withEmittedAt(emittedAt) + .withData(r.data())), + r.ctid())); + } + + // Augments the given iterator with record count logs. 
+ private AutoCloseableIterator augmentWithLogs(final AutoCloseableIterator iterator, + final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair, + final String streamName) { + final AtomicLong recordCount = new AtomicLong(); + return AutoCloseableIterators.transform(iterator, + AirbyteStreamUtils.convertFromNameAndNamespace(pair.getName(), pair.getNamespace()), + r -> { + final long count = recordCount.incrementAndGet(); + if (count % 1_000_000 == 0) { + LOGGER.info("Reading stream {}. Records read: {}", streamName, count); + } + return r; + }); + } + + private AutoCloseableIterator augmentWithState(final AutoCloseableIterator recordIterator, + final AirbyteStreamNameNamespacePair pair) { + + final CtidStatus currentCtidStatus = ctidStateManager.getCtidStatus(pair); + final JsonNode incrementalState = + (currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair) + : currentCtidStatus.getIncrementalState(); + final Long latestFileNode = fileNodes.get(pair); + assert latestFileNode != null; + + final Duration syncCheckpointDuration = + config.get("sync_checkpoint_seconds") != null ? Duration.ofSeconds(config.get("sync_checkpoint_seconds").asLong()) + : CtidStateIterator.SYNC_CHECKPOINT_DURATION; + final Long syncCheckpointRecords = config.get("sync_checkpoint_records") != null ? 
config.get("sync_checkpoint_records").asLong() + : CtidStateIterator.SYNC_CHECKPOINT_RECORDS; + + return AutoCloseableIterators.transformIterator( + r -> new CtidStateIterator(r, pair, latestFileNode, incrementalState, finalStateMessageSupplier, + syncCheckpointDuration, syncCheckpointRecords), + recordIterator, pair); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/PostgresXminHandler.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/PostgresXminHandler.java index eac9cdd380b1d..3ec0e3692ffd6 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/PostgresXminHandler.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/PostgresXminHandler.java @@ -7,6 +7,7 @@ import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.stream.AirbyteStreamUtils; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -45,7 +46,7 @@ public class PostgresXminHandler { private final JdbcCompatibleSourceOperations sourceOperations; private final JdbcDatabase database; private final String quoteString; - private final XminStatus xminStatus; + private final XminStatus currentXminStatus; private final XminStateManager xminStateManager; private static final Logger LOGGER = LoggerFactory.getLogger(PostgresXminHandler.class); @@ -58,7 +59,7 @@ public PostgresXminHandler(final JdbcDatabase database, this.database = database; this.sourceOperations = sourceOperations; this.quoteString = quoteString; - this.xminStatus = xminStatus; + this.currentXminStatus = xminStatus; 
this.xminStateManager = xminStateManager; } @@ -138,27 +139,64 @@ private PreparedStatement createXminQueryStatement( quoteString); final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString); + + // Get the xmin status associated with the previous run + final XminStatus previousRunXminStatus = xminStateManager.getXminStatus(airbyteStream); + final PreparedStatement xminPreparedStatement = + getXminPreparedStatement(connection, wrappedColumnNames, fullTableName, previousRunXminStatus, currentXminStatus); + LOGGER.info("Executing query for table {}: {}", tableName, xminPreparedStatement); + return xminPreparedStatement; + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + private PreparedStatement getXminPreparedStatement(final Connection connection, + final String wrappedColumnNames, + final String fullTableName, + final XminStatus prevRunXminStatus, + final XminStatus currentXminStatus) + throws SQLException { + + if (prevRunXminStatus == null) { + throw new RuntimeException("XminStatus not found for table " + fullTableName + ", should have triggered a full sync via ctid path"); + } else if (isSingleWraparound(prevRunXminStatus, currentXminStatus)) { + // The xmin state that we save represents the lowest XID that is still in progress. To make sure we + // don't miss data associated with the current transaction, we have to issue an >=. Because of the + // wraparound, the changes prior to the + // end xmin xid value must also be captured. + LOGGER.info("Detect a single wraparound for {}", fullTableName); + final String sql = String.format("SELECT %s FROM %s WHERE xmin::text::bigint >= ? 
OR xmin::text::bigint < ?", + wrappedColumnNames, fullTableName); + final PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setLong(1, prevRunXminStatus.getXminXidValue()); + preparedStatement.setLong(2, currentXminStatus.getXminXidValue()); + + return preparedStatement; + } else { // The xmin state that we save represents the lowest XID that is still in progress. To make sure we - // don't miss - // data associated with the current transaction, we have to issue an >= + // don't miss data associated with the current transaction, we have to issue an >= final String sql = String.format("SELECT %s FROM %s WHERE xmin::text::bigint >= ?", wrappedColumnNames, fullTableName); final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); + preparedStatement.setLong(1, prevRunXminStatus.getXminXidValue()); - final XminStatus currentStreamXminStatus = xminStateManager.getXminStatus(airbyteStream); - if (currentStreamXminStatus != null) { - preparedStatement.setLong(1, currentStreamXminStatus.getXminXidValue()); - } else { - preparedStatement.setLong(1, 0L); - } - LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement); return preparedStatement; - } catch (final SQLException e) { - throw new RuntimeException(e); } } + @VisibleForTesting + static boolean isSingleWraparound(final XminStatus prevRunXminStatus, final XminStatus currentXminStatus) { + // Detect whether the source Postgres DB has undergone a single wraparound event. + return currentXminStatus.getNumWraparound() - prevRunXminStatus.getNumWraparound() == 1; + } + + static boolean shouldPerformFullSync(final XminStatus currentXminStatus, final JsonNode streamState) { + // Detects whether source Postgres DB has undergone multiple wraparound events between syncs. 
+ return streamState.has("num_wraparound") && (currentXminStatus.getNumWraparound() - streamState.get("num_wraparound").asLong() >= 2); + } + // Transforms the given iterator to create an {@link AirbyteRecordMessage} private static AutoCloseableIterator getRecordIterator( final AutoCloseableIterator recordIterator, @@ -196,7 +234,7 @@ private AutoCloseableIterator augmentWithState(final AutoCloseab autoCloseableIterator -> new XminStateIterator( autoCloseableIterator, pair, - xminStatus), + currentXminStatus), recordIterator, AirbyteStreamUtils.convertFromNameAndNamespace(pair.getName(), pair.getNamespace())); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java new file mode 100644 index 0000000000000..1eef23b6616d9 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.postgres.xmin; + +import static io.airbyte.integrations.source.postgres.xmin.PostgresXminHandler.shouldPerformFullSync; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Sets; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.postgres.internal.models.XminStatus; +import io.airbyte.integrations.source.relationaldb.state.StateManager; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The class mainly categorises the streams based on the state type into two categories: 1. Streams + * that need to be synced via ctid iterator: These are streams that are either newly added or did + * not complete their initial sync. 2. Streams that need to be synced via xmin iterator: These are + * streams that have completed their initial sync and are now syncing data incrementally. 
+ */ +public class XminCtidUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(XminCtidUtils.class); + + public static StreamsCategorised categoriseStreams(final StateManager stateManager, + final ConfiguredAirbyteCatalog fullCatalog, + final XminStatus currentXminStatus) { + final List rawStateMessages = stateManager.getRawStateMessages(); + final List statesFromCtidSync = new ArrayList<>(); + final List statesFromXminSync = new ArrayList<>(); + + final Set alreadySeenStreams = new HashSet<>(); + final Set streamsStillInCtidSync = new HashSet<>(); + + if (rawStateMessages != null) { + rawStateMessages.forEach(stateMessage -> { + final JsonNode streamState = stateMessage.getStream().getStreamState(); + final StreamDescriptor streamDescriptor = stateMessage.getStream().getStreamDescriptor(); + if (streamState == null || streamDescriptor == null) { + return; + } + + if (streamState.has("state_type")) { + if (streamState.get("state_type").asText().equalsIgnoreCase("ctid")) { + statesFromCtidSync.add(stateMessage); + streamsStillInCtidSync.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace())); + } else if (streamState.get("state_type").asText().equalsIgnoreCase("xmin")) { + if (shouldPerformFullSync(currentXminStatus, streamState)) { + final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), + streamDescriptor.getNamespace()); + LOGGER.info("Detected multiple wraparounds. 
Will perform a full sync for {}", pair); + streamsStillInCtidSync.add(pair); + } else { + statesFromXminSync.add(stateMessage); + } + } else { + throw new RuntimeException("Unknown state type: " + streamState.get("state_type").asText()); + } + } else { + throw new RuntimeException("State type not present"); + } + alreadySeenStreams.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace())); + }); + } + + final List newlyAddedStreams = identifyNewlyAddedStreams(fullCatalog, alreadySeenStreams); + final List streamsForCtidSync = new ArrayList<>(); + fullCatalog.getStreams().stream() + .filter(stream -> streamsStillInCtidSync.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))) + .map(Jsons::clone) + .forEach(streamsForCtidSync::add); + + streamsForCtidSync.addAll(newlyAddedStreams); + + final List streamsForXminSync = fullCatalog.getStreams().stream() + .filter(stream -> !streamsForCtidSync.contains(stream)) + .map(Jsons::clone) + .toList(); + + return new StreamsCategorised(new CtidStreams(streamsForCtidSync, statesFromCtidSync), new XminStreams(streamsForXminSync, statesFromXminSync)); + } + + private static List identifyNewlyAddedStreams(final ConfiguredAirbyteCatalog fullCatalog, + final Set alreadySeenStreams) { + final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(fullCatalog); + + final Set newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySeenStreams)); + + return fullCatalog.getStreams().stream() + .filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))) + .map(Jsons::clone) + .collect(Collectors.toList()); + } + + public record StreamsCategorised(CtidStreams ctidStreams, + XminStreams xminStreams) { + + } + + public record CtidStreams(List streamsForCtidSync, + List statesFromCtidSync) { + + } + + public record XminStreams(List streamsForXminSync, + List statesFromXminSync) { + + } + +} 
diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminStateManager.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminStateManager.java index 5459027251a6c..14cd2cef09568 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminStateManager.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminStateManager.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.source.postgres.xmin; -import com.fasterxml.jackson.databind.ObjectMapper; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.postgres.internal.models.XminStatus; @@ -27,6 +26,7 @@ public class XminStateManager { private static final Logger LOGGER = LoggerFactory.getLogger(XminStateManager.class); + public static final long XMIN_STATE_VERSION = 2L; private final Map pairToXminStatus; @@ -73,23 +73,28 @@ public XminStatus getXminStatus(final AirbyteStreamNameNamespacePair pair) { * @return AirbyteMessage which includes information on state of records read so far */ public static AirbyteMessage createStateMessage(final AirbyteStreamNameNamespacePair pair, final XminStatus xminStatus) { + final AirbyteStateMessage stateMessage = getAirbyteStateMessage(pair, xminStatus); + + return new AirbyteMessage() + .withType(Type.STATE) + .withState(stateMessage); + } + + public static AirbyteStateMessage getAirbyteStateMessage(final AirbyteStreamNameNamespacePair pair, final XminStatus xminStatus) { final AirbyteStreamState airbyteStreamState = new AirbyteStreamState() .withStreamDescriptor( new StreamDescriptor() .withName(pair.getName()) .withNamespace(pair.getNamespace())) - .withStreamState(new ObjectMapper().valueToTree(xminStatus)); + 
.withStreamState(Jsons.jsonNode(xminStatus)); // Set state final AirbyteStateMessage stateMessage = new AirbyteStateMessage() .withType(AirbyteStateType.STREAM) .withStream(airbyteStreamState); - - return new AirbyteMessage() - .withType(Type.STATE) - .withState(stateMessage); + return stateMessage; } } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/resources/internal_models/internal_models.yaml b/airbyte-integrations/connectors/source-postgres/src/main/resources/internal_models/internal_models.yaml index 8455adb4dead7..a79f6ecda22fe 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/resources/internal_models/internal_models.yaml +++ b/airbyte-integrations/connectors/source-postgres/src/main/resources/internal_models/internal_models.yaml @@ -4,12 +4,27 @@ title: Postgres Models type: object description: Postgres Models properties: - state: + state_type: + "$ref": "#/definitions/StateType" + xmin_state: "$ref": "#/definitions/XminStatus" + ctid_state: + "$ref": "#/definitions/CtidStatus" definitions: + StateType: + description: Enum to define the sync mode of state. + type: string + enum: + - xmin + - ctid XminStatus: type: object properties: + version: + description: Version of state. + type: integer + state_type: + "$ref": "#/definitions/StateType" num_wraparound: description: Number of times the Xmin value has wrapped around. type: integer @@ -19,3 +34,20 @@ definitions: xmin_raw_value: description: The raw value of the xmin snapshot. If no wraparound has occurred, this should be the same as 2. type: integer + CtidStatus: + type: object + properties: + version: + description: Version of state. 
+ type: integer + state_type: + "$ref": "#/definitions/StateType" + ctid: + description: ctid bookmark + type: string + incremental_state: + description: State to switch to after completion of ctid initial sync + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + relation_filenode: + type: integer diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/XminPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/XminPostgresSourceTest.java index ee1ec29ae5fd7..f87edae0bf2da 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/XminPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/XminPostgresSourceTest.java @@ -9,12 +9,12 @@ import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; @@ -38,9 +38,12 @@ import io.airbyte.protocol.models.v0.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import 
org.jooq.DSLContext; import org.jooq.SQLDialect; @@ -92,12 +95,12 @@ class XminPostgresSourceTest { private static final ConfiguredAirbyteCatalog CONFIGURED_XMIN_CATALOG = toConfiguredXminCatalog(CATALOG); - private static final Set INITIAL_RECORD_MESSAGES = Sets.newHashSet( + private static final List INITIAL_RECORD_MESSAGES = Arrays.asList( createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null)), createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)), createRecord(STREAM_NAME, SCHEMA_NAME, map("id", null, "name", "piccolo", "power", null))); - private static final Set NEXT_RECORD_MESSAGES = Sets.newHashSet( + private static final List NEXT_RECORD_MESSAGES = Arrays.asList( createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("3.0"), "name", "gohan", "power", 222.1))); private static PostgreSQLContainer PSQL_DB; @@ -128,7 +131,7 @@ void setup() throws Exception { "CREATE TABLE id_and_name(id NUMERIC(20, 10) NOT NULL, name VARCHAR(200) NOT NULL, power double precision NOT NULL, PRIMARY KEY (id));"); ctx.fetch("CREATE INDEX i1 ON id_and_name (id);"); ctx.fetch( - "INSERT INTO id_and_name (id, name, power) VALUES (2, 'vegeta', 9000.1), (1,'goku', 'Infinity'), ('NaN', 'piccolo', '-Infinity');"); + "INSERT INTO id_and_name (id, name, power) VALUES (1,'goku', 'Infinity'), (2, 'vegeta', 9000.1), ('NaN', 'piccolo', '-Infinity');"); ctx.fetch("CREATE TABLE id_and_name2(id NUMERIC(20, 10) NOT NULL, name VARCHAR(200) NOT NULL, power double precision NOT NULL);"); ctx.fetch( @@ -169,6 +172,7 @@ private JsonNode getXminConfig(final PostgreSQLContainer psqlDb, final String .put(JdbcUtils.PASSWORD_KEY, psqlDb.getPassword()) .put(JdbcUtils.SSL_KEY, false) .put("replication_method", getReplicationMethod()) + .put("sync_checkpoint_records", 1) .build()); } @@ -202,30 +206,88 @@ void testReadSuccess() throws Exception { CONFIGURED_XMIN_CATALOG 
.withStreams(CONFIGURED_XMIN_CATALOG.getStreams().stream().filter(s -> s.getStream().getName().equals(STREAM_NAME)).collect( Collectors.toList())); - final List actualMessages = + final List recordsFromFirstSync = MoreIterators.toList(new PostgresSource().read(getXminConfig(PSQL_DB, dbName), configuredCatalog, null)); - setEmittedAtToNull(actualMessages); - assertThat(filterRecords(actualMessages)).containsExactlyInAnyOrderElementsOf(INITIAL_RECORD_MESSAGES); + setEmittedAtToNull(recordsFromFirstSync); + assertThat(filterRecords(recordsFromFirstSync)).containsExactlyElementsOf(INITIAL_RECORD_MESSAGES); // Extract the state message and assert that it exists. It contains the xmin value, so validating // the actual value isn't useful right now. - final List stateAfterFirstBatch = extractStateMessage(actualMessages); - assertThat(stateAfterFirstBatch.size()).isEqualTo(1); - JsonNode state = Jsons.jsonNode(stateAfterFirstBatch); + final List stateAfterFirstBatch = extractStateMessage(recordsFromFirstSync); + // We should have 3 state messages because we have set state emission frequency after each record in + // the test + assertEquals(3, stateAfterFirstBatch.size()); + + final AirbyteStateMessage firstStateMessage = stateAfterFirstBatch.get(0); + final String stateTypeFromFirstStateMessage = firstStateMessage.getStream().getStreamState().get("state_type").asText(); + final String ctidFromFirstStateMessage = firstStateMessage.getStream().getStreamState().get("ctid").asText(); + final JsonNode incrementalStateFromFirstStateMessage = firstStateMessage.getStream().getStreamState().get("incremental_state"); + + final AirbyteStateMessage secondStateMessage = stateAfterFirstBatch.get(1); + final String stateTypeFromSecondStateMessage = secondStateMessage.getStream().getStreamState().get("state_type").asText(); + final String ctidFromSecondStateMessage = secondStateMessage.getStream().getStreamState().get("ctid").asText(); + final JsonNode 
incrementalStateFromSecondStateMessage = secondStateMessage.getStream().getStreamState().get("incremental_state"); + + final AirbyteStateMessage thirdStateMessage = stateAfterFirstBatch.get(2); + final String stateTypeFromThirdStateMessage = thirdStateMessage.getStream().getStreamState().get("state_type").asText(); + + // First two state messages should be of ctid type + assertEquals("ctid", stateTypeFromFirstStateMessage); + assertEquals("ctid", stateTypeFromSecondStateMessage); + + // Since the third state message would be the final, it should be of xmin type + assertEquals("xmin", stateTypeFromThirdStateMessage); + + // The ctid value from second state message should be bigger than first state message + assertEquals(1, ctidFromSecondStateMessage.compareTo(ctidFromFirstStateMessage)); + + // The incremental state value from first and second state message should be the same + assertNotNull(incrementalStateFromFirstStateMessage); + assertNotNull(incrementalStateFromSecondStateMessage); + assertEquals(incrementalStateFromFirstStateMessage, incrementalStateFromSecondStateMessage); + + // The third state message should be equal to incremental_state of first two state messages + assertEquals(incrementalStateFromFirstStateMessage, thirdStateMessage.getStream().getStreamState()); // Assert that the last message in the sequence is a state message - assertMessageSequence(actualMessages); + assertMessageSequence(recordsFromFirstSync); + + // Sync should work with a ctid state + final List recordsFromSyncRunningWithACtidState = + MoreIterators.toList(new PostgresSource().read(getXminConfig(PSQL_DB, dbName), configuredCatalog, + Jsons.jsonNode(Collections.singletonList(firstStateMessage)))); + setEmittedAtToNull(recordsFromSyncRunningWithACtidState); + final List expectedDataFromSyncUsingFirstCtidState = new ArrayList<>(2); + final AtomicBoolean skippedFirstRecord = new AtomicBoolean(false); + INITIAL_RECORD_MESSAGES.forEach(c -> { + if (!skippedFirstRecord.get()) { + 
skippedFirstRecord.set(true); + return; + } + expectedDataFromSyncUsingFirstCtidState.add(c); + }); + assertThat(filterRecords(recordsFromSyncRunningWithACtidState)).containsExactlyElementsOf(expectedDataFromSyncUsingFirstCtidState); + + final List stateAfterSyncWithCtidState = extractStateMessage(recordsFromSyncRunningWithACtidState); + // Since only 2 records should be emitted, 2 state messages are expected + assertEquals(2, stateAfterSyncWithCtidState.size()); + assertEquals(secondStateMessage, stateAfterSyncWithCtidState.get(0)); + assertEquals(thirdStateMessage, stateAfterSyncWithCtidState.get(1)); - // Second read, should return no data - final List nextMessages = - MoreIterators.toList(new PostgresSource().read(getXminConfig(PSQL_DB, dbName), configuredCatalog, state)); - setEmittedAtToNull(nextMessages); - assertThat(filterRecords(nextMessages)).isEmpty(); + assertMessageSequence(recordsFromSyncRunningWithACtidState); + + // Read with the final xmin state message should return no data + final List syncWithXminStateType = + MoreIterators.toList(new PostgresSource().read(getXminConfig(PSQL_DB, dbName), configuredCatalog, + Jsons.jsonNode(Collections.singletonList(thirdStateMessage)))); + setEmittedAtToNull(syncWithXminStateType); + assertEquals(0, filterRecords(syncWithXminStateType).size()); // Even though no records were emitted, a state message is still expected - final List stateAfterSecondBatch = extractStateMessage(nextMessages); - assertThat(stateAfterFirstBatch.size()).isEqualTo(1); - state = Jsons.jsonNode(stateAfterSecondBatch); + final List stateAfterXminSync = extractStateMessage(syncWithXminStateType); + assertEquals(1, stateAfterXminSync.size()); + // Since no records were returned, the state should be the same as before + assertEquals(thirdStateMessage, stateAfterXminSync.get(0)); // We add some data and perform a third read. 
We should verify that (i) a delete is not captured and // (ii) the new record that is inserted into the @@ -239,21 +301,27 @@ void testReadSuccess() throws Exception { }); } - final List lastMessages = - MoreIterators.toList(new PostgresSource().read(getXminConfig(PSQL_DB, dbName), configuredCatalog, state)); - setEmittedAtToNull(lastMessages); - assertThat(filterRecords(lastMessages)).containsExactlyInAnyOrderElementsOf(NEXT_RECORD_MESSAGES); - assertMessageSequence(lastMessages); + final List recordsAfterLastSync = + MoreIterators.toList(new PostgresSource().read(getXminConfig(PSQL_DB, dbName), configuredCatalog, + Jsons.jsonNode(Collections.singletonList(stateAfterXminSync.get(0))))); + setEmittedAtToNull(recordsAfterLastSync); + assertThat(filterRecords(recordsAfterLastSync)).containsExactlyElementsOf(NEXT_RECORD_MESSAGES); + assertMessageSequence(recordsAfterLastSync); + final List stateAfterLastSync = extractStateMessage(recordsAfterLastSync); + assertEquals(1, stateAfterLastSync.size()); + + final AirbyteStateMessage finalStateMesssage = stateAfterLastSync.get(0); + final String stateTypeFromFinalStateMessage = finalStateMesssage.getStream().getStreamState().get("state_type").asText(); + assertEquals("xmin", stateTypeFromFinalStateMessage); + assertTrue(finalStateMesssage.getStream().getStreamState().get("xmin_xid_value").asLong() > thirdStateMessage.getStream().getStreamState() + .get("xmin_xid_value").asLong()); + assertTrue(finalStateMesssage.getStream().getStreamState().get("xmin_raw_value").asLong() > thirdStateMessage.getStream().getStreamState() + .get("xmin_raw_value").asLong()); } // Assert that the state message is the last message to be emitted. 
private static void assertMessageSequence(final List messages) { - for (int i = 0; i < messages.size(); i++) { - final AirbyteMessage message = messages.get(i); - if (message.getType().equals(Type.STATE)) { - assertThat(i).isEqualTo(messages.size() - 1); - } - } + assertEquals(Type.STATE, messages.get(messages.size() - 1).getType()); } private static List extractStateMessage(final List messages) { diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/PostgresXminHandlerTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/PostgresXminHandlerTest.java new file mode 100644 index 0000000000000..25c2aad834a8d --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/PostgresXminHandlerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.postgres.xmin; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.postgres.internal.models.XminStatus; +import org.junit.jupiter.api.Test; + +public class PostgresXminHandlerTest { + + @Test + void testWraparound() { + final XminStatus initialStatus = + new XminStatus() + .withNumWraparound(0L) + .withXminRawValue(5555L) + .withXminRawValue(5555L); + final JsonNode initialStatusAsJson = Jsons.jsonNode(initialStatus); + + final XminStatus noWrapAroundStatus = + new XminStatus() + .withNumWraparound(0L) + .withXminRawValue(5588L) + .withXminRawValue(5588L); + assertFalse(PostgresXminHandler.isSingleWraparound(initialStatus, noWrapAroundStatus)); + assertFalse(PostgresXminHandler.shouldPerformFullSync(noWrapAroundStatus, initialStatusAsJson)); + + final XminStatus 
singleWrapAroundStatus = + new XminStatus() + .withNumWraparound(1L) + .withXminRawValue(5588L) + .withXminRawValue(4294972884L); + + assertTrue(PostgresXminHandler.isSingleWraparound(initialStatus, singleWrapAroundStatus)); + assertFalse(PostgresXminHandler.shouldPerformFullSync(singleWrapAroundStatus, initialStatusAsJson)); + + final XminStatus doubleWrapAroundStatus = + new XminStatus() + .withNumWraparound(2L) + .withXminRawValue(5588L) + .withXminRawValue(8589940180L); + + assertFalse(PostgresXminHandler.isSingleWraparound(initialStatus, doubleWrapAroundStatus)); + assertTrue(PostgresXminHandler.shouldPerformFullSync(doubleWrapAroundStatus, initialStatusAsJson)); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtilsTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtilsTest.java new file mode 100644 index 0000000000000..6a5ceb13cb1d7 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtilsTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */
+
+package io.airbyte.integrations.source.postgres.xmin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
+import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
+import io.airbyte.integrations.source.postgres.internal.models.XminStatus;
+import io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.StreamsCategorised;
+import io.airbyte.integrations.source.relationaldb.state.StreamStateManager;
+import io.airbyte.protocol.models.Field;
+import io.airbyte.protocol.models.JsonSchemaType;
+import io.airbyte.protocol.models.v0.AirbyteStateMessage;
+import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
+import io.airbyte.protocol.models.v0.AirbyteStreamState;
+import io.airbyte.protocol.models.v0.CatalogHelpers;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.v0.StreamDescriptor;
+import io.airbyte.protocol.models.v0.SyncMode;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+
+/** Checks the xmin/ctid categorisation that {@code XminCtidUtils.categoriseStreams} returns for a two-stream catalog. */
+public class XminCtidUtilsTest {
+
+  private static final ConfiguredAirbyteStream MODELS_STREAM = modelsStream("MODELS_STREAM_NAME");
+  private static final ConfiguredAirbyteStream MODELS_STREAM_2 = modelsStream("MODELS_STREAM_NAME_2");
+
+  /** Without any saved state, both streams are expected in the ctid category and none in the xmin category. */
+  @Test
+  public void emptyStateTest() {
+    final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Arrays.asList(MODELS_STREAM, MODELS_STREAM_2));
+    final StreamStateManager stateManager = new StreamStateManager(Collections.emptyList(), catalog);
+    final StreamsCategorised categorised = XminCtidUtils.categoriseStreams(stateManager, catalog, currentXminStatus());
+
+    assertTrue(categorised.xminStreams().streamsForXminSync().isEmpty());
+    assertTrue(categorised.xminStreams().statesFromXminSync().isEmpty());
+    assertEquals(2, categorised.ctidStreams().streamsForCtidSync().size());
+    assertThat(categorised.ctidStreams().streamsForCtidSync()).containsExactlyInAnyOrder(MODELS_STREAM, MODELS_STREAM_2);
+    assertTrue(categorised.ctidStreams().statesFromCtidSync().isEmpty());
+  }
+
+  /** A stream with a saved xmin state is expected in the xmin category, a stream with a saved ctid state in the ctid category. */
+  @Test
+  public void correctCategorisationTest() {
+    final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Arrays.asList(MODELS_STREAM, MODELS_STREAM_2));
+    final XminStatus xminStatus = currentXminStatus();
+    final JsonNode xminJson = Jsons.jsonNode(xminStatus);
+    final CtidStatus ctidStatus = new CtidStatus().withStateType(StateType.CTID).withVersion(2L).withCtid("123")
+        .withRelationFilenode(456L).withIncrementalState(xminJson);
+    final AirbyteStateMessage xminState = generateStateMessage(xminJson, descriptorOf(MODELS_STREAM));
+    final AirbyteStateMessage ctidState = generateStateMessage(Jsons.jsonNode(ctidStatus), descriptorOf(MODELS_STREAM_2));
+    final StreamStateManager stateManager = new StreamStateManager(Arrays.asList(xminState, ctidState), catalog);
+    final StreamsCategorised categorised = XminCtidUtils.categoriseStreams(stateManager, catalog, xminStatus);
+
+    assertEquals(1, categorised.xminStreams().streamsForXminSync().size());
+    assertEquals(MODELS_STREAM, categorised.xminStreams().streamsForXminSync().get(0));
+    assertEquals(1, categorised.xminStreams().statesFromXminSync().size());
+    assertEquals(xminState, categorised.xminStreams().statesFromXminSync().get(0));
+    assertEquals(1, categorised.ctidStreams().streamsForCtidSync().size());
+    assertEquals(MODELS_STREAM_2, categorised.ctidStreams().streamsForCtidSync().get(0));
+    assertEquals(1, categorised.ctidStreams().statesFromCtidSync().size());
+    assertEquals(ctidState, categorised.ctidStreams().statesFromCtidSync().get(0));
+  }
+
+  /** Builds a default configured stream in the MODELS_SCHEMA namespace with the three model columns and COL_ID as primary key. */
+  private static ConfiguredAirbyteStream modelsStream(final String streamName) {
+    return CatalogHelpers.toDefaultConfiguredStream(CatalogHelpers.createAirbyteStream(
+        streamName, "MODELS_SCHEMA",
+        Field.of("COL_ID", JsonSchemaType.INTEGER),
+        Field.of("COL_MAKE_ID", JsonSchemaType.INTEGER),
+        Field.of("COL_MODEL", JsonSchemaType.STRING))
+        .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
+        .withSourceDefinedPrimaryKey(List.of(List.of("COL_ID"))));
+  }
+
+  /** Creates the xmin status that both tests pass to the categorisation as the current status. */
+  private static XminStatus currentXminStatus() {
+    return new XminStatus().withStateType(StateType.XMIN).withVersion(2L).withXminXidValue(9L).withXminRawValue(9L).withNumWraparound(1L);
+  }
+
+  /** Creates a stream descriptor carrying the name and namespace of the given configured stream. */
+  private static StreamDescriptor descriptorOf(final ConfiguredAirbyteStream stream) {
+    return new StreamDescriptor().withName(stream.getStream().getName()).withNamespace(stream.getStream().getNamespace());
+  }
+
+  /** Wraps the given state payload and stream descriptor in a STREAM-typed state message. */
+  private AirbyteStateMessage generateStateMessage(final JsonNode stateData, final StreamDescriptor streamDescriptor) {
+    final AirbyteStreamState streamState = new AirbyteStreamState().withStreamDescriptor(streamDescriptor).withStreamState(stateData);
+    return new AirbyteStateMessage().withType(AirbyteStateType.STREAM).withStream(streamState);
+  }
+
+}