postgres-source: complete implementation for ctid and xmin sync #27302

Merged · 37 commits · Jun 22, 2023 (changes shown from 36 commits)

Commits
194a62c
initial ctid for testing
rodireich Jun 10, 2023
ec7198c
initial ctid for testing
rodireich Jun 11, 2023
1459acb
initial ctid for testing
rodireich Jun 13, 2023
8681c8c
Automated Commit - Format and Process Resources Changes
rodireich Jun 13, 2023
14371ea
add version and state type to xmin status
subodh1810 Jun 13, 2023
a4ecc7e
Merge remote-tracking branch 'origin/26486-initial-sync-using-ctid' i…
subodh1810 Jun 13, 2023
eb7255c
add logic to switch between xmin and ctid sync
subodh1810 Jun 13, 2023
69e6989
Merge branch 'master' into 26486-initial-sync-using-ctid
subodh1810 Jun 13, 2023
a796a81
Merge branch '26486-initial-sync-using-ctid' into state-structure-cha…
subodh1810 Jun 13, 2023
2215f64
npe fixes
subodh1810 Jun 13, 2023
52d09a0
use enum
subodh1810 Jun 13, 2023
c6bebb5
refactor
subodh1810 Jun 15, 2023
286c5cd
add relation node logic + validation for vacuuming + more refactor
subodh1810 Jun 16, 2023
7950be8
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 18, 2023
7ef08e3
refine test + make PR ready for review
subodh1810 Jun 18, 2023
cba94db
remove un-wanted changes
subodh1810 Jun 18, 2023
378357a
missed this one
subodh1810 Jun 18, 2023
cb5bd39
remove irrelevant comments
subodh1810 Jun 18, 2023
0689f30
add more assertions
subodh1810 Jun 19, 2023
7ea3fdf
remove jdbc log
subodh1810 Jun 19, 2023
3ab062c
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 20, 2023
f7e9939
address review comments
subodh1810 Jun 20, 2023
06f3ea7
🤖 Auto format source-postgres code [skip ci]
octavia-squidington-iii Jun 20, 2023
643aff7
skip streams under vacuum
subodh1810 Jun 20, 2023
0756579
🤖 Auto format source-postgres code [skip ci]
octavia-squidington-iii Jun 20, 2023
134637c
update log message
subodh1810 Jun 20, 2023
1f67fac
Merge branch 'state-structure-change-xmin-ctid' of https://github.com…
subodh1810 Jun 20, 2023
1e57a69
🤖 Auto format source-postgres code [skip ci]
octavia-squidington-iii Jun 20, 2023
f7dddd8
comment
rodireich Jun 20, 2023
06dcae4
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 21, 2023
854b414
latest round of review comments
subodh1810 Jun 21, 2023
65f4711
missed this file
subodh1810 Jun 21, 2023
63a14f7
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 21, 2023
52eb61d
🤖 Auto format source-postgres code [skip ci]
octavia-squidington-iii Jun 21, 2023
27fb1a2
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 21, 2023
f0a1dc4
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 22, 2023
97aa8ff
source-postgres : Add logic to handle xmin wraparound (#27466)
subodh1810 Jun 22, 2023
@@ -174,6 +174,12 @@ public static <T> AutoCloseableIterator<T> transform(final Function<AutoCloseabl
return new DefaultAutoCloseableIterator<>(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close, airbyteStream);
}

public static <T, F> AutoCloseableIterator<F> transformIterator(final Function<AutoCloseableIterator<T>, Iterator<F>> iteratorCreator,
final AutoCloseableIterator<T> autoCloseableIterator,
final AirbyteStreamNameNamespacePair airbyteStream) {
return new DefaultAutoCloseableIterator<F>(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close, airbyteStream);
}

@SafeVarargs
public static <T> CompositeIterator<T> concatWithEagerClose(final Consumer<AirbyteStreamStatusHolder> airbyteStreamStatusConsumer,
final AutoCloseableIterator<T>... iterators) {
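A hedged usage sketch of the new helper (editorial, not from the diff): unlike transform, the mapping function passed to transformIterator may return a plain Iterator<F> while the wrapper still closes the underlying source. The fromIterator factory, the io.airbyte.commons.util package, and the null stream pair are assumptions based on the surrounding class.

import com.google.common.collect.Iterators;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import java.util.List;

public class TransformIteratorDemo {

  public static void main(final String[] args) throws Exception {
    final AutoCloseableIterator<Integer> source =
        AutoCloseableIterators.fromIterator(List.of(1, 2, 3).iterator());
    // The mapper returns a plain Iterator<String>; closing `mapped` still closes `source`.
    final AutoCloseableIterator<String> mapped =
        AutoCloseableIterators.transformIterator(src -> Iterators.transform(src, i -> "row-" + i), source, null);
    mapped.forEachRemaining(System.out::println); // row-1, row-2, row-3
    mapped.close();
  }

}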
@@ -57,3 +57,4 @@ jsonSchema2Pojo {
includeSetters = true
}


@@ -4,16 +4,22 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.source.postgres.xmin.XminStateManager.XMIN_STATE_VERSION;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
import io.airbyte.integrations.source.postgres.internal.models.XminStatus;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -89,36 +95,67 @@ public static XminStatus getXminStatus(final JdbcDatabase database) throws SQLEx
return new XminStatus()
.withNumWraparound(result.get(NUM_WRAPAROUND_COL).asLong())
.withXminXidValue(result.get(XMIN_XID_VALUE_COL).asLong())
- .withXminRawValue(result.get(XMIN_RAW_VALUE_COL).asLong());
.withXminRawValue(result.get(XMIN_RAW_VALUE_COL).asLong())
.withVersion(XMIN_STATE_VERSION)
.withStateType(StateType.XMIN);
}
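Editorial aside: a minimal sketch of the per-stream state this change produces, written as it would appear inside this class. The serialized field names are assumptions based on the generated jsonSchema2Pojo model, as is the value of the version constant.

// Sketch of the xmin state after this change. The version and state_type fields
// are what later lets the connector tell a checkpointed ctid (initial load)
// state apart from an xmin (incremental) state.
final XminStatus status = new XminStatus()
    .withNumWraparound(0L)
    .withXminXidValue(5555L)
    .withXminRawValue(5555L)
    .withVersion(XMIN_STATE_VERSION) // constant defined in XminStateManager
    .withStateType(StateType.XMIN);
// Roughly serialized as (field names assumed from the generated model):
// {"version": 2, "state_type": "xmin", "num_wraparound": 0, "xmin_xid_value": 5555, "xmin_raw_value": 5555}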

- public static void logFullVacuumStatus(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final String quoteString) {
- catalog.getStreams().forEach(stream -> {
static Map<AirbyteStreamNameNamespacePair, Long> fileNodeForStreams(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> streams,
final String quoteString) {
final Map<AirbyteStreamNameNamespacePair, Long> fileNodes = new HashMap<>();
streams.forEach(stream -> {
final AirbyteStreamNameNamespacePair namespacePair =
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
final long l = fileNodeForStreams(database, namespacePair, quoteString);
fileNodes.put(namespacePair, l);
});
return fileNodes;
}

public static long fileNodeForStreams(final JdbcDatabase database, final AirbyteStreamNameNamespacePair stream, final String quoteString) {
try {
final String streamName = stream.getName();
final String schemaName = stream.getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
conn -> conn.prepareStatement(CTID_FULL_VACUUM_REL_FILENODE_QUERY.formatted(fullTableName)).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
final long relationFilenode = jsonNodes.get(0).get("pg_relation_filenode").asLong();
LOGGER.info("Relation filenode is for stream {} is {}", fullTableName, relationFilenode);
return relationFilenode;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

public static List<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair> streamsUnderVacuum(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> streams,
final String quoteString) {
final List<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair> streamsUnderVacuuming = new ArrayList<>();
streams.forEach(stream -> {
final String streamName = stream.getStream().getName();
final String schemaName = stream.getStream().getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString);
LOGGER.info("Full Vacuum information for {}", fullTableName);
try {
- List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
- conn -> conn.prepareStatement(CTID_FULL_VACUUM_REL_FILENODE_QUERY.formatted(fullTableName)).executeQuery(),
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
conn -> conn.prepareStatement(CTID_FULL_VACUUM_IN_PROGRESS_QUERY.formatted(fullTableName)).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
- Preconditions.checkState(jsonNodes.size() == 1);
- LOGGER.info("Relation filenode is {}", jsonNodes.get(0).get("pg_relation_filenode"));
-
- jsonNodes =
- database.bufferedResultSetQuery(conn -> conn.prepareStatement(CTID_FULL_VACUUM_IN_PROGRESS_QUERY.formatted(fullTableName)).executeQuery(),
- resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
- if (jsonNodes.size() == 0) {
- LOGGER.info("No full vacuum currently in progress");
- } else {
if (jsonNodes.size() != 0) {
Preconditions.checkState(jsonNodes.size() == 1);
LOGGER.info("Full Vacuum currently in progress in {} phase", jsonNodes.get(0).get("phase"));
LOGGER.warn("Full Vacuum currently in progress for table {} in {} phase, the table will be skipped from syncing data", fullTableName,
jsonNodes.get(0).get("phase"));
streamsUnderVacuuming.add(io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(stream));
}
} catch (SQLException e) {
LOGGER.warn("Failed to log full vacuum in progress. This warning shouldn't affect the sync and can be ignored", e);
// Assume it's safe to progress and skip relation node and vaccuum validation
LOGGER.warn("Failed to fetch vacuum for table {} info. Going to move ahead with the sync assuming it's safe", fullTableName, e);
}
});
return streamsUnderVacuuming;
}

Review thread on the streamsUnderVacuum signature:

Contributor: We never actually get here in case a full vacuum is running. Since a full vacuum takes an exclusive lock on the table, it will block any SELECT until the vacuum is done. For example, a call to getXminStatus() in PostgresSource.java#246 will block:

this.xminStatus = PostgresQueryUtils.getXminStatus(database);

Contributor (author): Hmm, I'm not sure I understand. The xmin query doesn't query any specific table; it queries the xmin value from the database. If a table is undergoing a vacuum, how would that stop us from getting the xmin value?

Contributor: I gave this line as an example, but even the queries before and after it hang if a full vacuum is going on. I'd leave our check in just in case, but it is largely dead code.

Review thread on the fully qualified io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair usages:

Contributor: nit: static import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair

Contributor (author): The problem is that there is already an import of the class without the v0 (import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;), so it won't let me import this one as well.

}
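Editorial sketch of the two checks above (not from the diff). The exact CTID_FULL_VACUUM_REL_FILENODE_QUERY and CTID_FULL_VACUUM_IN_PROGRESS_QUERY constants are defined elsewhere in PostgresQueryUtils, so the query shapes here are assumptions; connection details and the table name are hypothetical.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VacuumCheckDemo {

  public static void main(final String[] args) throws Exception {
    final String table = "public.users"; // hypothetical table
    try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/db", "user", "pass");
         Statement stmt = conn.createStatement()) {
      // pg_relation_filenode() changes when VACUUM FULL rewrites the table, which
      // is what invalidates a checkpointed ctid between sync attempts.
      try (ResultSet rs = stmt.executeQuery("SELECT pg_relation_filenode('%s')".formatted(table))) {
        rs.next();
        System.out.println("filenode = " + rs.getLong(1));
      }
      // VACUUM FULL reports its progress through pg_stat_progress_cluster; a row
      // here means the table is being rewritten and should be skipped this sync.
      try (ResultSet rs = stmt.executeQuery(
          "SELECT phase FROM pg_stat_progress_cluster WHERE relid = '%s'::regclass".formatted(table))) {
        System.out.println(rs.next() ? "VACUUM FULL in progress: " + rs.getString("phase") : "no full vacuum");
      }
    }
  }

}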
@@ -20,7 +20,9 @@
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
import static io.airbyte.integrations.source.postgres.PostgresUtils.isIncrementalSyncMode;
import static io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.categoriseStreams;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
import static java.util.stream.Collectors.toList;
@@ -56,8 +58,12 @@
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler;
import io.airbyte.integrations.source.postgres.internal.models.XminStatus;
import io.airbyte.integrations.source.postgres.xmin.PostgresXminHandler;
import io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.StreamsCategorised;
import io.airbyte.integrations.source.postgres.xmin.XminStateManager;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
Expand All @@ -83,6 +89,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -92,6 +99,7 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -216,7 +224,6 @@ protected Set<String> getExcludedViews() {
protected void logPreSyncDebugData(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog)
throws SQLException {
super.logPreSyncDebugData(database, catalog);
- PostgresQueryUtils.logFullVacuumStatus(database, catalog, getQuoteString());
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getStream().getName();
final String schemaName = stream.getStream().getNamespace();
@@ -452,9 +459,50 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)));

} else if (PostgresUtils.isXmin(sourceConfig) && isIncrementalSyncMode(catalog)) {
- final XminStateManager xminStateManager = new XminStateManager(stateManager.getRawStateMessages());
- final PostgresXminHandler handler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);
- return handler.getIncrementalIterators(catalog, tableNameToTable, emittedAt);
final StreamsCategorised streamsCategorised = categoriseStreams(stateManager, catalog);

final List<AutoCloseableIterator<AirbyteMessage>> ctidIterator = new ArrayList<>();
final List<AutoCloseableIterator<AirbyteMessage>> xminIterator = new ArrayList<>();

if (!streamsCategorised.ctidStreams().streamsForCtidSync().isEmpty()) {
final List<AirbyteStreamNameNamespacePair> streamsUnderVacuum = streamsUnderVacuum(database,
streamsCategorised.ctidStreams().streamsForCtidSync(), getQuoteString());

final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid =
streamsUnderVacuum.isEmpty() ? streamsCategorised.ctidStreams().streamsForCtidSync()
: streamsCategorised.ctidStreams().streamsForCtidSync().stream()
.filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c)))
.toList();
LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size());
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, Long> fileNodes = PostgresQueryUtils.fileNodeForStreams(database,
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());
final CtidStateManager ctidStateManager = new CtidStateManager(streamsCategorised.ctidStreams().statesFromCtidSync(), fileNodes);
final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(sourceConfig, database, new CtidPostgresSourceOperations(), getQuoteString(),
fileNodes, ctidStateManager,
namespacePair -> Jsons.jsonNode(xminStatus),
(namespacePair, jsonState) -> XminStateManager.getAirbyteStateMessage(namespacePair, Jsons.object(jsonState, XminStatus.class)));
ctidIterator.addAll(ctidHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt));
} else {
LOGGER.info("No Streams will be synced via ctid.");
}

if (!streamsCategorised.xminStreams().streamsForXminSync().isEmpty()) {
LOGGER.info("Streams to be synced via xmin : {}", streamsCategorised.xminStreams().streamsForXminSync().size());
final XminStateManager xminStateManager = new XminStateManager(streamsCategorised.xminStreams().statesFromXminSync());
final PostgresXminHandler xminHandler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);

xminIterator.addAll(xminHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(streamsCategorised.xminStreams().streamsForXminSync()), tableNameToTable, emittedAt));
} else {
LOGGER.info("No Streams will be synced via xmin.");
}

return Stream
.of(ctidIterator, xminIterator)
.flatMap(Collection::stream)
.collect(Collectors.toList());
} else {
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
}

Review comment on the Stream.of(ctidIterator, xminIterator) concatenation above:

Contributor: nit, more of an extension of the discussion we had in the AM. Maybe not for this PR, but this is the issue I was talking about: https://github.com/airbytehq/airbyte/issues/27115, where ideally we want to kick off an incremental xmin sync right after the initial successful ctid sync. One way to do this is to slightly augment what you have here: even for streams that belong to streamsForCtidSync, we can build iterators for the xmin query and merge them so that the xmin iterator runs after the ctid iterator. When that happens, we'll have to make sure that the handlers can merge streams based on stream name.
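Editorial aside: a minimal sketch of the per-stream routing that categoriseStreams implies. The real implementation lives in XminCtidUtils (not shown in this diff) and also handles state versioning; the state_type values here are assumptions consistent with the StateType enum used above.

import com.fasterxml.jackson.databind.JsonNode;

public class StreamRouting {

  enum Route { CTID, XMIN }

  // A stream with no prior state, or one checkpointed mid initial load ("ctid"),
  // goes to the ctid iterator; a stream whose initial load already completed
  // ("xmin") goes straight to the incremental xmin iterator.
  static Route route(final JsonNode streamState) {
    if (streamState == null || streamState.get("state_type") == null) {
      return Route.CTID;
    }
    return "ctid".equalsIgnoreCase(streamState.get("state_type").asText()) ? Route.CTID : Route.XMIN;
  }

}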
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres.ctid;

import io.airbyte.protocol.models.v0.AirbyteMessage;

/**
* The ctid of each row is queried as part of our sync and used as a checkpoint so that a failed
* sync can restart from a known point. Since we never want to emit the ctid, it is kept in a
* separate field, saving us an expensive JsonNode.remove() operation.
*
* @param recordMessage row fields to emit
* @param ctid ctid
*/
public record AirbyteMessageWithCtid(AirbyteMessage recordMessage, String ctid) {

}
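A hedged sketch of the intended flow (editorial, not from the PR): the emitter sends recordMessage() downstream untouched and reads ctid() separately for checkpointing. The protocol-model setter names are assumed from the airbyte-protocol library.

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.ctid.AirbyteMessageWithCtid;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.util.Map;

public class CtidRecordDemo {

  public static void main(final String[] args) {
    final AirbyteMessage record = new AirbyteMessage()
        .withType(Type.RECORD)
        .withRecord(new AirbyteRecordMessage().withData(Jsons.jsonNode(Map.of("id", 1))));
    final AirbyteMessageWithCtid withCtid = new AirbyteMessageWithCtid(record, "(0,1)");
    System.out.println(withCtid.recordMessage()); // emitted downstream exactly as built
    System.out.println(withCtid.ctid());          // checkpoint cursor, no JsonNode.remove() needed
  }

}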
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres.ctid;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresSourceOperations;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Objects;

public class CtidPostgresSourceOperations extends PostgresSourceOperations {

private static final String CTID = "ctid";

public RowDataWithCtid recordWithCtid(final ResultSet queryContext) throws SQLException {
// The first call communicates with the database; subsequent calls use a cached result.
final ResultSetMetaData metadata = queryContext.getMetaData();
final int columnCount = metadata.getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
String ctid = null;
for (int i = 1; i <= columnCount; i++) {
final String columnName = metadata.getColumnName(i);
if (columnName.equalsIgnoreCase(CTID)) {
ctid = queryContext.getString(i);
continue;
}

// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode);
}

assert Objects.nonNull(ctid);
return new RowDataWithCtid(jsonNode, ctid);
}

public record RowDataWithCtid(JsonNode data, String ctid) {

}

}
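A hedged sketch of how recordWithCtid might be driven (editorial): page the table in ctid order and checkpoint the last tid seen. The actual chunking in PostgresCtidHandler may differ; connection details and the table name are hypothetical.

import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations.RowDataWithCtid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CtidScanDemo {

  public static void main(final String[] args) throws Exception {
    String lastCtid = "(0,0)"; // checkpointed position; "(0,0)" starts from the top
    try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/db", "user", "pass");
         PreparedStatement ps = conn.prepareStatement(
             "SELECT ctid, * FROM public.users WHERE ctid > ?::tid ORDER BY ctid LIMIT 1000")) {
      ps.setString(1, lastCtid);
      try (ResultSet rs = ps.executeQuery()) {
        final CtidPostgresSourceOperations ops = new CtidPostgresSourceOperations();
        while (rs.next()) {
          final RowDataWithCtid row = ops.recordWithCtid(rs); // splits data from ctid
          lastCtid = row.ctid(); // remember for the next checkpoint
          System.out.println(row.data());
        }
      }
    }
  }

}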