Skip to content

Commit

Permalink
postgres-source: complete implementation for for ctid and xmin sync (#…
Browse files Browse the repository at this point in the history
…27302)

* initial ctid for testing

* initial ctid for testing

* initial ctid for testing

* Automated Commit - Format and Process Resources Changes

* add version and state type to xmin status

* add logic to swtich between xmin and ctid sync

* npe fixes

* use enum

* refactor

* add relation node logic + validation for vacuuming + more refactor

* refine test + make PR ready for review

* remove un-wanted changes

* missed this one

* remove irrelevant comments

* add more assertions

* remove jdbc log

* address review comments

* 🤖 Auto format source-postgres code [skip ci]

* skip streams under vacuum

* 🤖 Auto format source-postgres code [skip ci]

* update log message

* 🤖 Auto format source-postgres code [skip ci]

* comment

* latest round of review comments

* missed this file

* 🤖 Auto format source-postgres code [skip ci]

* source-postgres : Add logic to handle xmin wraparound (#27466)

* Initial logic for xmin wraparound

* Add tests

* Address comments

* add xmin-wraparound check

* address review comments

* 🤖 Auto format source-postgres code [skip ci]

* missed this

* 🤖 Auto format source-postgres code [skip ci]

---------

Co-authored-by: Akash Kulkarni <akash@airbyte.io>
Co-authored-by: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com>
Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com>

---------

Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
Co-authored-by: rodireich <rodireich@users.noreply.github.com>
Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Akash Kulkarni <akash@airbyte.io>
Co-authored-by: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com>
  • Loading branch information
6 people committed Jun 22, 2023
1 parent 3bd340a commit 8c8f041
Show file tree
Hide file tree
Showing 16 changed files with 1,073 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ public static <T> AutoCloseableIterator<T> transform(final Function<AutoCloseabl
return new DefaultAutoCloseableIterator<>(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close, airbyteStream);
}

public static <T, F> AutoCloseableIterator<F> transformIterator(final Function<AutoCloseableIterator<T>, Iterator<F>> iteratorCreator,
final AutoCloseableIterator<T> autoCloseableIterator,
final AirbyteStreamNameNamespacePair airbyteStream) {
return new DefaultAutoCloseableIterator<F>(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close, airbyteStream);
}

@SafeVarargs
public static <T> CompositeIterator<T> concatWithEagerClose(final Consumer<AirbyteStreamStatusHolder> airbyteStreamStatusConsumer,
final AutoCloseableIterator<T>... iterators) {
Expand Down
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/source-postgres/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,6 @@ jsonSchema2Pojo {
includeSetters = true
}




Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.source.postgres.xmin.XminStateManager.XMIN_STATE_VERSION;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
import io.airbyte.integrations.source.postgres.internal.models.XminStatus;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,36 +95,67 @@ public static XminStatus getXminStatus(final JdbcDatabase database) throws SQLEx
return new XminStatus()
.withNumWraparound(result.get(NUM_WRAPAROUND_COL).asLong())
.withXminXidValue(result.get(XMIN_XID_VALUE_COL).asLong())
.withXminRawValue(result.get(XMIN_RAW_VALUE_COL).asLong());
.withXminRawValue(result.get(XMIN_RAW_VALUE_COL).asLong())
.withVersion(XMIN_STATE_VERSION)
.withStateType(StateType.XMIN);
}

public static void logFullVacuumStatus(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final String quoteString) {
catalog.getStreams().forEach(stream -> {
static Map<AirbyteStreamNameNamespacePair, Long> fileNodeForStreams(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> streams,
final String quoteString) {
final Map<AirbyteStreamNameNamespacePair, Long> fileNodes = new HashMap<>();
streams.forEach(stream -> {
final AirbyteStreamNameNamespacePair namespacePair =
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
final long l = fileNodeForStreams(database, namespacePair, quoteString);
fileNodes.put(namespacePair, l);
});
return fileNodes;
}

public static long fileNodeForStreams(final JdbcDatabase database, final AirbyteStreamNameNamespacePair stream, final String quoteString) {
try {
final String streamName = stream.getName();
final String schemaName = stream.getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
conn -> conn.prepareStatement(CTID_FULL_VACUUM_REL_FILENODE_QUERY.formatted(fullTableName)).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
final long relationFilenode = jsonNodes.get(0).get("pg_relation_filenode").asLong();
LOGGER.info("Relation filenode is for stream {} is {}", fullTableName, relationFilenode);
return relationFilenode;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

public static List<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair> streamsUnderVacuum(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> streams,
final String quoteString) {
final List<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair> streamsUnderVacuuming = new ArrayList<>();
streams.forEach(stream -> {
final String streamName = stream.getStream().getName();
final String schemaName = stream.getStream().getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString);
LOGGER.info("Full Vacuum information for {}", fullTableName);
try {
List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
conn -> conn.prepareStatement(CTID_FULL_VACUUM_REL_FILENODE_QUERY.formatted(fullTableName)).executeQuery(),
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
conn -> conn.prepareStatement(CTID_FULL_VACUUM_IN_PROGRESS_QUERY.formatted(fullTableName)).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
LOGGER.info("Relation filenode is {}", jsonNodes.get(0).get("pg_relation_filenode"));

jsonNodes =
database.bufferedResultSetQuery(conn -> conn.prepareStatement(CTID_FULL_VACUUM_IN_PROGRESS_QUERY.formatted(fullTableName)).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
if (jsonNodes.size() == 0) {
LOGGER.info("No full vacuum currently in progress");
} else {
if (jsonNodes.size() != 0) {
Preconditions.checkState(jsonNodes.size() == 1);
LOGGER.info("Full Vacuum currently in progress in {} phase", jsonNodes.get(0).get("phase"));
LOGGER.warn("Full Vacuum currently in progress for table {} in {} phase, the table will be skipped from syncing data", fullTableName,
jsonNodes.get(0).get("phase"));
streamsUnderVacuuming.add(io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(stream));
}
} catch (SQLException e) {
LOGGER.warn("Failed to log full vacuum in progress. This warning shouldn't affect the sync and can be ignored", e);
// Assume it's safe to progress and skip relation node and vaccuum validation
LOGGER.warn("Failed to fetch vacuum for table {} info. Going to move ahead with the sync assuming it's safe", fullTableName, e);
}
});
return streamsUnderVacuuming;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
import static io.airbyte.integrations.source.postgres.PostgresUtils.isIncrementalSyncMode;
import static io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.categoriseStreams;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -56,8 +58,12 @@
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler;
import io.airbyte.integrations.source.postgres.internal.models.XminStatus;
import io.airbyte.integrations.source.postgres.xmin.PostgresXminHandler;
import io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.StreamsCategorised;
import io.airbyte.integrations.source.postgres.xmin.XminStateManager;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
Expand All @@ -83,6 +89,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -92,6 +99,7 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -216,7 +224,6 @@ protected Set<String> getExcludedViews() {
protected void logPreSyncDebugData(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog)
throws SQLException {
super.logPreSyncDebugData(database, catalog);
PostgresQueryUtils.logFullVacuumStatus(database, catalog, getQuoteString());
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getStream().getName();
final String schemaName = stream.getStream().getNamespace();
Expand Down Expand Up @@ -452,9 +459,50 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)));

} else if (PostgresUtils.isXmin(sourceConfig) && isIncrementalSyncMode(catalog)) {
final XminStateManager xminStateManager = new XminStateManager(stateManager.getRawStateMessages());
final PostgresXminHandler handler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);
return handler.getIncrementalIterators(catalog, tableNameToTable, emittedAt);
final StreamsCategorised streamsCategorised = categoriseStreams(stateManager, catalog, xminStatus);

final List<AutoCloseableIterator<AirbyteMessage>> ctidIterator = new ArrayList<>();
final List<AutoCloseableIterator<AirbyteMessage>> xminIterator = new ArrayList<>();

if (!streamsCategorised.ctidStreams().streamsForCtidSync().isEmpty()) {
final List<AirbyteStreamNameNamespacePair> streamsUnderVacuum = streamsUnderVacuum(database,
streamsCategorised.ctidStreams().streamsForCtidSync(), getQuoteString());

final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid =
streamsUnderVacuum.isEmpty() ? streamsCategorised.ctidStreams().streamsForCtidSync()
: streamsCategorised.ctidStreams().streamsForCtidSync().stream()
.filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c)))
.toList();
LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size());
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, Long> fileNodes = PostgresQueryUtils.fileNodeForStreams(database,
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());
final CtidStateManager ctidStateManager = new CtidStateManager(streamsCategorised.ctidStreams().statesFromCtidSync(), fileNodes);
final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(sourceConfig, database, new CtidPostgresSourceOperations(), getQuoteString(),
fileNodes, ctidStateManager,
namespacePair -> Jsons.jsonNode(xminStatus),
(namespacePair, jsonState) -> XminStateManager.getAirbyteStateMessage(namespacePair, Jsons.object(jsonState, XminStatus.class)));
ctidIterator.addAll(ctidHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt));
} else {
LOGGER.info("No Streams will be synced via ctid.");
}

if (!streamsCategorised.xminStreams().streamsForXminSync().isEmpty()) {
LOGGER.info("Streams to be synced via xmin : {}", streamsCategorised.xminStreams().streamsForXminSync().size());
final XminStateManager xminStateManager = new XminStateManager(streamsCategorised.xminStreams().statesFromXminSync());
final PostgresXminHandler xminHandler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);

xminIterator.addAll(xminHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(streamsCategorised.xminStreams().streamsForXminSync()), tableNameToTable, emittedAt));
} else {
LOGGER.info("No Streams will be synced via xmin.");
}

return Stream
.of(ctidIterator, xminIterator)
.flatMap(Collection::stream)
.collect(Collectors.toList());
} else {
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres.ctid;

import io.airbyte.protocol.models.v0.AirbyteMessage;

/**
* ctid of rows is queried as part of our sync and is used to checkpoint to be able to restart
* failed sync from a known last point. Since we never want to emit a ctid it is kept in a different
* field, to save us an expensive JsonNode.remove() operation.
*
* @param recordMessage row fields to emit
* @param ctid ctid
*/
public record AirbyteMessageWithCtid(AirbyteMessage recordMessage, String ctid) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres.ctid;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresSourceOperations;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Objects;

public class CtidPostgresSourceOperations extends PostgresSourceOperations {

private static final String CTID = "ctid";

public RowDataWithCtid recordWithCtid(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
final ResultSetMetaData metadata = queryContext.getMetaData();
final int columnCount = metadata.getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
String ctid = null;
for (int i = 1; i <= columnCount; i++) {
final String columnName = metadata.getColumnName(i);
if (columnName.equalsIgnoreCase(CTID)) {
ctid = queryContext.getString(i);
continue;
}

// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode);
}

assert Objects.nonNull(ctid);
return new RowDataWithCtid(jsonNode, ctid);
}

public record RowDataWithCtid(JsonNode data, String ctid) {

}

}
Loading

0 comments on commit 8c8f041

Please sign in to comment.