Skip to content

Commit

Permalink
Implement core xmin sync logic (#25280)
Browse files Browse the repository at this point in the history
Core xmin sync logic

Co-authored-by: akashkulk <akashkulk@users.noreply.github.com>
Co-authored-by: subodh <subodh1810@gmail.com>
  • Loading branch information
3 people committed May 30, 2023
1 parent bcc6d12 commit e88eb48
Show file tree
Hide file tree
Showing 19 changed files with 984 additions and 25 deletions.
17 changes: 17 additions & 0 deletions airbyte-integrations/connectors/source-postgres/build.gradle
@@ -1,9 +1,12 @@
import org.jsonschema2pojo.SourceType

// Gradle plugins applied to the source-postgres connector build.
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
id 'airbyte-performance-test-java'
id 'airbyte-connector-acceptance-test'
// Supplies the jsonSchema2Pojo extension that is configured further down in this script.
id "org.jsonschema2pojo" version "1.2.1"
}

application {
Expand Down Expand Up @@ -39,3 +42,17 @@ dependencies {
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
performanceTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}

// Generates Java model classes (e.g. XminStatus) from the schema files shipped in the
// connector's resources.
jsonSchema2Pojo {
// The schemas are written in YAML and read from the `internal_models` folder of the
// main source set's resources output directory.
sourceType = SourceType.YAMLSCHEMA
source = files("${sourceSets.main.output.resourcesDir}/internal_models")
// Generated sources are written under the project build directory.
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
// Per the plugin documentation, this empties the target directory before generating.
removeOldOutput = true

// Package that the connector code imports the generated models from.
targetPackage = 'io.airbyte.integrations.source.postgres.internal.models'

// Represent schema `integer` properties as long/Long rather than int/Integer.
useLongIntegers = true
// Emit fluent `withXxx(...)` methods in addition to the setters enabled below.
generateBuilders = true
includeConstructors = false
includeSetters = true
}
Expand Up @@ -23,11 +23,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PostgresCdcCatalogHelper {
public final class PostgresCatalogHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcCatalogHelper.class);
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCatalogHelper.class);

private PostgresCdcCatalogHelper() {}
private PostgresCatalogHelper() {}

/*
* It isn't possible to recreate the state of the original database unless we include extra
Expand Down
Expand Up @@ -8,6 +8,7 @@
import com.google.common.base.Preconditions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.postgres.internal.models.XminStatus;
import java.sql.SQLException;
import java.util.List;
import org.slf4j.Logger;
Expand Down Expand Up @@ -68,14 +69,16 @@ public class PostgresQueryUtils {
* value of the xmin snapshot (which is a combination of 1 and 2). If no wraparound has occurred,
* this should be the same as 2.
*/
public static void logXminStatus(final JdbcDatabase database) throws SQLException {
public static XminStatus getXminStatus(final JdbcDatabase database) throws SQLException {
LOGGER.debug("xmin status query: {}", XMIN_STATUS_QUERY);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(XMIN_STATUS_QUERY).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
final JsonNode result = jsonNodes.get(0);
LOGGER.info(String.format("Xmin Status : {Number of wraparounds: %s, Xmin Transaction Value: %s, Xmin Raw Value: %s",
result.get(NUM_WRAPAROUND_COL), result.get(XMIN_XID_VALUE_COL), result.get(XMIN_RAW_VALUE_COL)));
return new XminStatus()
.withNumWraparound(result.get(NUM_WRAPAROUND_COL).asLong())
.withXminXidValue(result.get(XMIN_XID_VALUE_COL).asLong())
.withXminRawValue(result.get(XMIN_RAW_VALUE_COL).asLong());
}

}
Expand Up @@ -20,6 +20,7 @@
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL;
import static io.airbyte.integrations.source.postgres.PostgresUtils.isIncrementalSyncMode;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
Expand Down Expand Up @@ -56,6 +57,9 @@
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.postgres.internal.models.XminStatus;
import io.airbyte.integrations.source.postgres.xmin.PostgresXminHandler;
import io.airbyte.integrations.source.postgres.xmin.XminStateManager;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
Expand Down Expand Up @@ -114,6 +118,7 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");
private int stateEmissionFrequency;
private XminStatus xminStatus;

public static Source sshWrappedSource() {
return new SshWrappedSource(new PostgresSource(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY, "security");
Expand Down Expand Up @@ -228,7 +233,12 @@ protected void logPreSyncDebugData(final JdbcDatabase database, final Configured
}
indexInfo.close();
}
PostgresQueryUtils.logXminStatus(database);

// Log and save the xmin status
this.xminStatus = PostgresQueryUtils.getXminStatus(database);
LOGGER.info(String.format("Xmin Status : {Number of wraparounds: %s, Xmin Transaction Value: %s, Xmin Raw Value: %s",
xminStatus.getNumWraparound(), xminStatus.getXminXidValue(), xminStatus.getXminRawValue()));

}

@Override
Expand All @@ -238,13 +248,20 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {

if (PostgresUtils.isCdc(config)) {
final List<AirbyteStream> streams = catalog.getStreams().stream()
.map(PostgresCdcCatalogHelper::overrideSyncModes)
.map(PostgresCdcCatalogHelper::removeIncrementalWithoutPk)
.map(PostgresCdcCatalogHelper::setIncrementalToSourceDefined)
.map(PostgresCdcCatalogHelper::addCdcMetadataColumns)
.map(PostgresCatalogHelper::overrideSyncModes)
.map(PostgresCatalogHelper::removeIncrementalWithoutPk)
.map(PostgresCatalogHelper::setIncrementalToSourceDefined)
.map(PostgresCatalogHelper::addCdcMetadataColumns)
// If we're in CDC mode and a stream is not in the publication, the user should only be able to sync
// this in FULL_REFRESH mode
.map(stream -> PostgresCdcCatalogHelper.setFullRefreshForNonPublicationStreams(stream, publicizedTablesInCdc))
.map(stream -> PostgresCatalogHelper.setFullRefreshForNonPublicationStreams(stream, publicizedTablesInCdc))
.collect(toList());

catalog.setStreams(streams);
} else if (PostgresUtils.isXmin(config)) {
// Xmin replication has a source-defined cursor (the xmin column). This is done to prevent the user from being able to pick their own cursor.
final List<AirbyteStream> streams = catalog.getStreams().stream()
.map(PostgresCatalogHelper::setIncrementalToSourceDefined)
.collect(toList());

catalog.setStreams(streams);
Expand All @@ -256,7 +273,7 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
@Override
public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
final JdbcDatabase database = super.createDatabase(config);
this.publicizedTablesInCdc = PostgresCdcCatalogHelper.getPublicizedTables(database);
this.publicizedTablesInCdc = PostgresCatalogHelper.getPublicizedTables(database);
return database;
}

Expand Down Expand Up @@ -432,6 +449,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
AutoCloseableIterators.concatWithEagerClose(AirbyteTraceMessageUtility::emitStreamStatusTrace, snapshotIterator,
AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)));

} else if (PostgresUtils.isXmin(sourceConfig) && isIncrementalSyncMode(catalog)) {
final XminStateManager xminStateManager = new XminStateManager(stateManager.getRawStateMessages());
final PostgresXminHandler handler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);
return handler.getIncrementalIterators(catalog, tableNameToTable, emittedAt);
} else {
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
}
Expand Down
Expand Up @@ -23,6 +23,9 @@
import static io.airbyte.integrations.source.postgres.PostgresType.VARCHAR;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.time.Duration;
import java.util.Optional;
import java.util.OptionalInt;
Expand Down Expand Up @@ -155,4 +158,15 @@ public static Duration getFirstRecordWaitTime(final JsonNode config) {
return firstRecordWaitTime;
}

/**
 * Reports whether the connector config selects xmin-based replication, i.e. whether
 * {@code replication_method.method} is exactly {@code "Xmin"}.
 *
 * <p>
 * The lookup is null-safe: a config whose {@code replication_method} is absent, null, not an
 * object, or lacks a {@code method} entry yields {@code false} instead of throwing.
 *
 * @param config the source connector configuration
 * @return {@code true} only when the configured replication method is {@code "Xmin"}
 */
public static boolean isXmin(final JsonNode config) {
  // path() returns a "missing" node (whose text is empty) instead of null, so every step of
  // the navigation is safe even for scalar or incomplete replication_method values.
  final String method = config.path("replication_method").path("method").asText();
  final boolean isXmin = "Xmin".equals(method);
  LOGGER.info("using Xmin: {}", isXmin);
  return isXmin;
}

/**
 * Tells whether the configured catalog contains at least one stream whose sync mode is
 * {@link SyncMode#INCREMENTAL}.
 *
 * @param catalog the configured catalog to inspect
 * @return {@code true} as soon as one incremental stream is found, {@code false} otherwise
 *         (including for a catalog without streams)
 */
public static boolean isIncrementalSyncMode(final ConfiguredAirbyteCatalog catalog) {
  for (final ConfiguredAirbyteStream configuredStream : catalog.getStreams()) {
    if (SyncMode.INCREMENTAL == configuredStream.getSyncMode()) {
      return true;
    }
  }
  return false;
}
}
@@ -0,0 +1,201 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres.xmin;

import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.postgres.PostgresType;
import io.airbyte.integrations.source.postgres.internal.models.XminStatus;
import io.airbyte.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds the record iterators used for an xmin-based incremental sync of a Postgres source.
 *
 * <p>
 * For every incremental stream in the catalog that also exists in the source, the handler
 * queues a lazily executed query that selects the rows whose {@code xmin} is greater than or
 * equal to the xmin value saved for that stream (or {@code 0} when nothing was saved), wraps
 * each row in an {@link AirbyteRecordMessage}, and decorates the result with an
 * {@link XminStateIterator} and periodic record-count logging.
 */
public class PostgresXminHandler {

  private static final Logger LOGGER = LoggerFactory.getLogger(PostgresXminHandler.class);

  private final JdbcCompatibleSourceOperations sourceOperations;
  private final JdbcDatabase database;
  private final String quoteString;
  private final XminStatus xminStatus;
  private final XminStateManager xminStateManager;

  /**
   * @param database database the xmin queries are executed against
   * @param sourceOperations converts result set rows to JSON
   * @param quoteString identifier quote string used when building the SQL
   * @param xminStatus xmin status handed to every {@link XminStateIterator} created by this handler
   * @param xminStateManager provides the previously saved xmin status per stream
   */
  public PostgresXminHandler(final JdbcDatabase database,
                             final JdbcCompatibleSourceOperations sourceOperations,
                             final String quoteString,
                             final XminStatus xminStatus,
                             final XminStateManager xminStateManager) {
    this.database = database;
    this.sourceOperations = sourceOperations;
    this.quoteString = quoteString;
    this.xminStatus = xminStatus;
    this.xminStateManager = xminStateManager;
  }

  /**
   * Creates one message iterator per incremental stream of the catalog.
   *
   * <p>
   * Streams that are not present in {@code tableNameToTable} are skipped with a log line, and
   * streams whose sync mode is not {@link SyncMode#INCREMENTAL} are ignored.
   *
   * @param catalog configured catalog listing the streams to sync
   * @param tableNameToTable discovered tables keyed by fully qualified table name
   * @param emittedAt timestamp stamped on every emitted record
   * @return the iterators, in catalog order; empty when no stream qualifies
   */
  public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
                                                                             final ConfiguredAirbyteCatalog catalog,
                                                                             final Map<String, TableInfo<CommonField<PostgresType>>> tableNameToTable,
                                                                             final Instant emittedAt) {

    final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();
    for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
      final AirbyteStream stream = airbyteStream.getStream();
      final String streamName = stream.getName();
      final String namespace = stream.getNamespace();

      // Skip syncing the stream if it doesn't exist in the source.
      final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(namespace, streamName);
      if (!tableNameToTable.containsKey(fullyQualifiedTableName)) {
        LOGGER.info("Skipping stream {} because it is not in the source", fullyQualifiedTableName);
        continue;
      }

      // Identity comparison is null-safe: a stream without a sync mode is simply not incremental.
      if (airbyteStream.getSyncMode() != SyncMode.INCREMENTAL) {
        continue;
      }

      // Only query the columns that are both present in the table and selected in the catalog.
      final TableInfo<CommonField<PostgresType>> table = tableNameToTable.get(fullyQualifiedTableName);
      final List<String> selectedDatabaseFields = table.getFields()
          .stream()
          .map(CommonField::getName)
          .filter(CatalogHelpers.getTopLevelFieldNames(airbyteStream)::contains)
          .collect(Collectors.toList());

      final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace);
      final AutoCloseableIterator<JsonNode> queryStream = queryTableXmin(selectedDatabaseFields, table.getNameSpace(), table.getName());
      final AutoCloseableIterator<AirbyteMessage> recordIterator =
          getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
      final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, pair);

      iteratorList.add(augmentWithLogs(recordAndMessageIterator, pair, streamName));
    }

    return iteratorList;
  }

  /**
   * Returns a lazy iterator over the rows of the given table; the query is only issued once
   * the iterator is first consumed.
   */
  private AutoCloseableIterator<JsonNode> queryTableXmin(
                                                         final List<String> columnNames,
                                                         final String schemaName,
                                                         final String tableName) {
    LOGGER.info("Queueing query for table: {}", tableName);
    final AirbyteStreamNameNamespacePair airbyteStream =
        AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
    return AutoCloseableIterators.lazyIterator(() -> {
      try {
        final Stream<JsonNode> stream = database.unsafeQuery(
            connection -> createXminQueryStatement(connection, columnNames, schemaName, tableName, airbyteStream),
            sourceOperations::rowToJson);
        return AutoCloseableIterators.fromStream(stream, airbyteStream);
      } catch (final SQLException e) {
        throw new RuntimeException(e);
      }
    }, airbyteStream);
  }

  /**
   * Prepares the xmin-filtered SELECT for one table and binds its lower bound: the xmin value
   * saved for the stream, or {@code 0} when the state manager has no status for it.
   *
   * @throws RuntimeException wrapping any {@link SQLException} raised while preparing or binding
   */
  private PreparedStatement createXminQueryStatement(
                                                     final Connection connection,
                                                     final List<String> columnNames,
                                                     final String schemaName,
                                                     final String tableName,
                                                     final AirbyteStreamNameNamespacePair airbyteStream) {
    try {
      LOGGER.info("Preparing query for table: {}", tableName);
      final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
          quoteString);

      final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString);
      // The xmin state that we save represents the lowest XID that is still in progress. To make sure we
      // don't miss data associated with the current transaction, we have to issue an >=
      final String sql = String.format("SELECT %s FROM %s WHERE xmin::text::bigint >= ?",
          wrappedColumnNames, fullTableName);

      // Resolve the lower bound before allocating the statement so that only the bind call can
      // fail while the statement is open.
      final XminStatus currentStreamXminStatus = xminStateManager.getXminStatus(airbyteStream);
      final long lowerBound = currentStreamXminStatus != null ? currentStreamXminStatus.getXminXidValue() : 0L;

      final PreparedStatement preparedStatement = connection.prepareStatement(sql);
      try {
        preparedStatement.setLong(1, lowerBound);
      } catch (final SQLException e) {
        // Nobody else holds a reference yet, so release the statement here instead of leaking it.
        try {
          preparedStatement.close();
        } catch (final SQLException closeFailure) {
          e.addSuppressed(closeFailure);
        }
        throw e;
      }
      LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement);
      return preparedStatement;
    } catch (final SQLException e) {
      throw new RuntimeException(e);
    }
  }

  // Transforms the given iterator to create an {@link AirbyteRecordMessage}
  private static AutoCloseableIterator<AirbyteMessage> getRecordIterator(
                                                                         final AutoCloseableIterator<JsonNode> recordIterator,
                                                                         final String streamName,
                                                                         final String namespace,
                                                                         final long emittedAt) {
    return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage()
        .withType(Type.RECORD)
        .withRecord(new AirbyteRecordMessage()
            .withStream(streamName)
            .withNamespace(namespace)
            .withEmittedAt(emittedAt)
            .withData(r)));
  }

  // Augments the given iterator with record count logs (one line every 10000 messages).
  private AutoCloseableIterator<AirbyteMessage> augmentWithLogs(final AutoCloseableIterator<AirbyteMessage> iterator,
                                                                final AirbyteStreamNameNamespacePair pair,
                                                                final String streamName) {
    final AtomicLong recordCount = new AtomicLong();
    return AutoCloseableIterators.transform(iterator,
        AirbyteStreamUtils.convertFromNameAndNamespace(pair.getName(), pair.getNamespace()),
        r -> {
          final long count = recordCount.incrementAndGet();
          if (count % 10000 == 0) {
            LOGGER.info("Reading stream {}. Records read: {}", streamName, count);
          }
          return r;
        });
  }

  // Wraps the record iterator in an XminStateIterator built from the stream pair and the
  // handler-wide xmin status.
  private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseableIterator<AirbyteMessage> recordIterator,
                                                                 final AirbyteStreamNameNamespacePair pair) {
    return AutoCloseableIterators.transform(
        autoCloseableIterator -> new XminStateIterator(
            autoCloseableIterator,
            pair,
            xminStatus),
        recordIterator,
        AirbyteStreamUtils.convertFromNameAndNamespace(pair.getName(), pair.getNamespace()));
  }

}

0 comments on commit e88eb48

Please sign in to comment.