Skip to content

Commit

Permalink
Revert "source-postgres: connect with adaptiveFetch=true" (#38365)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed May 20, 2024
1 parent 471c8b3 commit 0db33b8
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.6
dockerImageTag: 3.4.7
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@
import static io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.reclassifyCategorisedCtidStreams;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.postgresql.PGProperty.ADAPTIVE_FETCH;
import static org.postgresql.PGProperty.CURRENT_SCHEMA;
import static org.postgresql.PGProperty.DEFAULT_ROW_FETCH_SIZE;
import static org.postgresql.PGProperty.MAX_RESULT_BUFFER;
import static org.postgresql.PGProperty.PREPARE_THRESHOLD;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -57,6 +52,7 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.cdk.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.Source;
Expand Down Expand Up @@ -126,7 +122,6 @@
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.postgresql.PGProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -151,14 +146,6 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
public static final String SSL_MODE_DISABLE = "disable";
public static final String SSL_MODE_REQUIRE = "require";

public static final Map<PGProperty, String> JDBC_CONNECTION_PARAMS = ImmutableMap.of(
// Initialize parameters with prepareThreshold=0 to mitigate pgbouncer errors
// https://github.com/airbytehq/airbyte/issues/24796
PREPARE_THRESHOLD, "0",
DEFAULT_ROW_FETCH_SIZE, "1",
ADAPTIVE_FETCH, "true",
MAX_RESULT_BUFFER, "10percent");

private List<String> schemas;

private Set<AirbyteStreamNameNamespacePair> publicizedTablesInCdc;
Expand All @@ -170,7 +157,7 @@ public static Source sshWrappedSource(PostgresSource source) {
}

PostgresSource() {
super(DRIVER_CLASS, PostgresStreamingQueryConfig::new, new PostgresSourceOperations());
super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new PostgresSourceOperations());
this.stateEmissionFrequency = INTERMEDIATE_STATE_EMISSION_FREQUENCY;
}

Expand All @@ -189,9 +176,9 @@ public ConnectorSpecification spec() throws Exception {
@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final List<String> additionalParameters = new ArrayList<>();
for (var e : JDBC_CONNECTION_PARAMS.entrySet()) {
additionalParameters.add(e.getKey().getName() + EQUALS + e.getValue());
}
// Initialize parameters with prepareThreshold=0 to mitigate pgbouncer errors
// https://github.com/airbytehq/airbyte/issues/24796
additionalParameters.add("prepareThreshold=0");

final String encodedDatabaseName = URLEncoder.encode(config.get(JdbcUtils.DATABASE_KEY).asText(), StandardCharsets.UTF_8);

Expand All @@ -201,7 +188,7 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
encodedDatabaseName));

if (config.get(JdbcUtils.JDBC_URL_PARAMS_KEY) != null && !config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText().isEmpty()) {
additionalParameters.add(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
jdbcUrl.append(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText()).append(AMPERSAND);
}

final Map<String, String> sslParameters = parseSSLConfig(config);
Expand All @@ -219,10 +206,12 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
}

if (schemas != null && !schemas.isEmpty()) {
additionalParameters.add(CURRENT_SCHEMA.getName() + EQUALS + String.join(",", schemas));
additionalParameters.add("currentSchema=" + String.join(",", schemas));
}
additionalParameters.addAll(toJDBCQueryParams(sslParameters));
jdbcUrl.append(String.join(AMPERSAND, additionalParameters));

additionalParameters.forEach(x -> jdbcUrl.append(x).append("&"));

jdbcUrl.append(toJDBCQueryParams(sslParameters));
LOGGER.debug("jdbc url: {}", jdbcUrl);
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
Expand All @@ -236,9 +225,8 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
return Jsons.jsonNode(configBuilder.build());
}

public List<String> toJDBCQueryParams(final Map<String, String> sslParams) {
return Objects.isNull(sslParams)
? List.of()
public String toJDBCQueryParams(final Map<String, String> sslParams) {
return Objects.isNull(sslParams) ? ""
: sslParams.entrySet()
.stream()
.map((entry) -> {
Expand All @@ -255,7 +243,7 @@ public List<String> toJDBCQueryParams(final Map<String, String> sslParams) {
}
})
.filter(s -> Objects.nonNull(s) && !s.isEmpty())
.toList();
.collect(Collectors.joining(JdbcUtils.AMPERSAND));
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -815,21 +815,10 @@ private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database
@Test
void testJdbcUrlWithEscapedDatabaseName() {
final JsonNode jdbcConfig = source().toDatabaseConfig(buildConfigEscapingNeeded());
assertEquals("jdbc:postgresql://localhost:1111/db%2Ffoo?" + EXPECTED_DEFAULT_PARAMS,
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText());
assertEquals(EXPECTED_JDBC_ESCAPED_URL, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText());
}

@Test
void testJdbcUrlWithSchemas() {
final JsonNode sourceConfig = buildConfigEscapingNeeded();
((ObjectNode) sourceConfig).set("schemas", Jsons.arrayNode().add("bar").add("baz"));
final JsonNode jdbcConfig = source().toDatabaseConfig(sourceConfig);
assertEquals("jdbc:postgresql://localhost:1111/db%2Ffoo?" + EXPECTED_DEFAULT_PARAMS + "&currentSchema=bar,baz",
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText());
}

private static final String EXPECTED_DEFAULT_PARAMS =
"prepareThreshold=0&defaultRowFetchSize=1&adaptiveFetch=true&maxResultBuffer=10percent";
private static final String EXPECTED_JDBC_ESCAPED_URL = "jdbc:postgresql://localhost:1111/db%2Ffoo?prepareThreshold=0&";

private JsonNode buildConfigEscapingNeeded() {
return Jsons.jsonNode(ImmutableMap.of(
Expand Down

0 comments on commit 0db33b8

Please sign in to comment.