Skip to content

Commit

Permalink
Remove incremental estimate query from Postgres (#26810)
Browse files Browse the repository at this point in the history
* Remove incremental estimate query from Postgres

* Bump dockerfile + documentation
  • Loading branch information
akashkulk committed May 31, 2023
1 parent 7089869 commit a8ef771
Show file tree
Hide file tree
Showing 7 changed files with 5 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.31
LABEL io.airbyte.version=2.0.32
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ data:
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
maxSecondsBetweenMessages: 7200
dockerImageTag: 2.0.31
dockerImageTag: 2.0.32
dockerRepository: airbyte/source-postgres-strict-encrypt
githubIssueLabel: source-postgres
icon: postgresql.svg
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.31
LABEL io.airbyte.version=2.0.32
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 2.0.31
dockerImageTag: 2.0.32
maxSecondsBetweenMessages: 7200
dockerRepository: airbyte/source-postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL;
import static io.airbyte.integrations.source.postgres.PostgresUtils.isIncrementalSyncMode;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
Expand Down Expand Up @@ -60,7 +59,6 @@
import io.airbyte.integrations.source.postgres.internal.models.XminStatus;
import io.airbyte.integrations.source.postgres.xmin.PostgresXminHandler;
import io.airbyte.integrations.source.postgres.xmin.XminStateManager;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.integrations.util.HostPortResolver;
Expand Down Expand Up @@ -632,47 +630,6 @@ protected void estimateFullRefreshSyncSize(final JdbcDatabase database,
}
}

/**
 * Estimates how many rows and bytes an incremental sync of the given stream will read and emits
 * the result as an estimate trace message.
 *
 * <p>
 * The estimate combines the whole-table row/byte figures returned by
 * {@code getFullTableEstimate} with the number of rows selected by the cursor predicate. Any
 * {@link SQLException} raised while estimating is logged as a warning and otherwise ignored, so a
 * failed estimate does not fail the caller.
 *
 * @param database database used to run the estimate queries
 * @param configuredAirbyteStream stream whose namespace and name identify the table
 * @param cursorInfo cursor state used to count the rows that still have to be synced
 * @param cursorFieldType type of the cursor column
 */
@Override
protected void estimateIncrementalSyncSize(final JdbcDatabase database,
                                           final ConfiguredAirbyteStream configuredAirbyteStream,
                                           final CursorInfo cursorInfo,
                                           final PostgresType cursorFieldType) {
  try {
    final String schemaName = configuredAirbyteStream.getStream().getNamespace();
    final String tableName = configuredAirbyteStream.getStream().getName();
    final String fullTableName =
        getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());

    final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName, schemaName, tableName);

    final long tableRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong();
    final long tableByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong();

    // The fast count query can return negative or otherwise invalid results for small tables. In
    // that case we skip emitting an estimate trace altogether, since the sync will likely complete
    // quickly. This guard also protects the division below from a zero row count.
    if (tableRowCount <= 0) {
      return;
    }

    final long syncRowCount = getIncrementalTableRowCount(database, fullTableName, cursorInfo, cursorFieldType);
    // Average bytes per row (integer division, so the average is truncated) times the rows to sync.
    final long syncByteCount = (tableByteCount / tableRowCount) * syncRowCount;

    // The byte estimate is scaled by PLATFORM_DATA_INCREASE_FACTOR to account for serialization.
    // A better approach might be to read a row and stringify it to measure the volume actually
    // sent over the wire, but that would not account for differing row sizes.
    AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName);
    // total_table_bytes must report tableByteCount; it previously repeated tableRowCount.
    LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}",
        fullTableName, syncRowCount, syncByteCount, tableRowCount, tableByteCount));
  } catch (final SQLException e) {
    LOGGER.warn("Error occurred while attempting to estimate sync size", e);
  }
}

private List<JsonNode> getFullTableEstimate(final JdbcDatabase database,
final String fullTableName,
final String schemaName,
Expand All @@ -687,46 +644,4 @@ private List<JsonNode> getFullTableEstimate(final JdbcDatabase database,
Preconditions.checkState(jsonNodes.size() == 1);
return jsonNodes;
}

/**
 * Counts the rows of {@code fullTableName} whose cursor column lies beyond the saved cursor value.
 *
 * <p>
 * The comparison is strict ({@code >}) when the saved cursor record count is not positive, or when
 * it matches the count currently returned by {@code getActualCursorRecordCount}. Otherwise the
 * comparison is inclusive ({@code >=}).
 *
 * @param database database used to run the count query
 * @param fullTableName quoted, fully qualified table name
 * @param cursorInfo cursor field, cursor value and expected cursor record count
 * @param cursorFieldType type used to bind the cursor value into the prepared statement
 * @return value of the {@code count} column of the single result row
 * @throws SQLException if the query fails
 */
private long getIncrementalTableRowCount(final JdbcDatabase database,
                                         final String fullTableName,
                                         final CursorInfo cursorInfo,
                                         final PostgresType cursorFieldType)
    throws SQLException {
  final String cursorColumn = getIdentifierWithQuoting(cursorInfo.getCursorField(), getQuoteString());

  final List<JsonNode> countRows = database.queryJsons(
      connection -> {
        LOGGER.info("Preparing query for table: {}", fullTableName);

        // Start from the strict comparison; widen it only when the cursor record counts disagree.
        String comparison = ">";
        if (cursorInfo.getCursorRecordCount() > 0L) {
          final long observedCount = getActualCursorRecordCount(
              connection, fullTableName, cursorColumn, cursorFieldType, cursorInfo.getCursor());
          LOGGER.info("Table {} cursor count: expected {}, actual {}", fullTableName, cursorInfo.getCursorRecordCount(), observedCount);
          if (observedCount != cursorInfo.getCursorRecordCount()) {
            comparison = ">=";
          }
        }

        final String countSql = String.format("SELECT COUNT(*) FROM %s WHERE %s %s ?",
            fullTableName,
            cursorColumn,
            comparison);

        final PreparedStatement countStatement = connection.prepareStatement(countSql);
        LOGGER.info("Executing query for table {}: {}", fullTableName, countStatement);
        // Bind the saved cursor value to the single placeholder.
        sourceOperations.setCursorField(countStatement, 1, cursorFieldType, cursorInfo.getCursor());
        return countStatement;
      },
      resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));

  // A COUNT(*) query is expected to yield exactly one row.
  Preconditions.checkState(countRows.size() == 1);
  final JsonNode onlyRow = countRows.get(0);
  return onlyRow.get("count").asLong();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -250,19 +250,6 @@ protected void estimateFullRefreshSyncSize(final Database database,
/* no-op */
}

/**
* Estimates the total volume (rows and bytes) to sync and emits a
* {@link AirbyteEstimateTraceMessage} associated with an incremental stream.
* <p>
* This base implementation intentionally does nothing. The method is protected and non-final,
* so a subclass can presumably override it to emit a real estimate (confirm against the
* concrete sources).
*
* @param database database
* @param configuredAirbyteStream the configured stream the estimate is for
* @param cursorInfo cursor information for the stream
* @param dataType type of the stream's cursor field
*/
protected void estimateIncrementalSyncSize(final Database database,
final ConfiguredAirbyteStream configuredAirbyteStream,
final CursorInfo cursorInfo,
final DataType dataType) {
/* no-op: the default behaviour is to emit no estimate */
}

private List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(
final Database database)
throws Exception {
Expand Down Expand Up @@ -466,7 +453,6 @@ private AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Databas
table.getFields().stream().anyMatch(f -> f.getName().equals(cursorField)),
String.format("Could not find cursor field %s in table %s", cursorField, table.getName()));

estimateIncrementalSyncSize(database, airbyteStream, cursorInfo, cursorType);
final AutoCloseableIterator<JsonNode> queryIterator = queryTableIncremental(
database,
selectedDatabaseFields,
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ Some larger tables may encounter an error related to the temporary file size lim

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.32 | 2023-05-31 | [26810](https://github.com/airbytehq/airbyte/pull/26810) | Remove incremental sync estimate from Postgres to increase performance. |
| 2.0.31 | 2023-05-25 | [26633](https://github.com/airbytehq/airbyte/pull/26633) | Collect and log information related to full vacuum operation in db |
| 2.0.30 | 2023-05-25 | [26473](https://github.com/airbytehq/airbyte/pull/26473) | CDC : Limit queue size |
| 2.0.29 | 2023-05-18 | [25898](https://github.com/airbytehq/airbyte/pull/25898) | Translate Numeric values without decimal, e.g: NUMERIC(38,0), as BigInt instead of Double |
Expand Down

0 comments on commit a8ef771

Please sign in to comment.