Skip to content

Commit

Permalink
Stop sync on a null value in a cursor column (#19889)
Browse files Browse the repository at this point in the history
* Stop sync on a null value in a cursor column

* Fix quoting in query to solve a failing test

* Fix another escaping issue in query

* Fix failing test

* Fix failing tests

* Test view with null value cursor

* Improve error message

* bump dockerfile version and update note

* bump version to 1.0.31

* Fix failing test

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
rodireich and octavia-squidington-iii committed Dec 6, 2022
1 parent 293075e commit b5c08ce
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.30
dockerImageTag: 1.0.31
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11351,7 +11351,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.30"
- dockerImage: "airbyte/source-postgres:1.0.31"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -11600,7 +11600,7 @@
enum:
- "pgoutput"
- "wal2json"
const: "pgoutput"
default: "pgoutput"
order: 2
replication_slot:
type: "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,5 +621,4 @@ protected List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final Configur
.map(Jsons::clone)
.collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.30

LABEL io.airbyte.version=1.0.31
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
3 changes: 1 addition & 2 deletions airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.30

LABEL io.airbyte.version=1.0.31
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
Expand Down Expand Up @@ -70,6 +71,7 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -90,6 +92,20 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
public static final String NULL_CURSOR_VALUE_WITH_SCHEMA =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from %s.\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
""";
public static final String NULL_CURSOR_VALUE_NO_SCHEMA =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
""";
private List<String> schemas;
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");
Expand Down Expand Up @@ -468,7 +484,7 @@ public static void main(final String[] args) throws Exception {
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
if (PostgresUtils.isCdc(config)) {
if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)) {
String sslModeValue = config.get(SSL_MODE).get(MODE).asText();
final String sslModeValue = config.get(SSL_MODE).get(MODE).asText();
if (INVALID_CDC_SSL_MODES.contains(sslModeValue)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
Expand Down Expand Up @@ -500,4 +516,24 @@ protected static String toSslJdbcParamInternal(final SslMode sslMode) {
return result;
}

@Override
protected boolean verifyCursorColumnValues(final JdbcDatabase database, final String schema, final String tableName, final String columnName) throws SQLException {
final String query;
final String resultColName = "nullValue";
// Query: Only if cursor column allows null values, query whether it contains one
if (StringUtils.isNotBlank(schema)) {
query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA,
schema, tableName, columnName, schema, tableName, columnName, resultColName);
} else {
query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA,
tableName, columnName, tableName, columnName, resultColName);
}
LOGGER.debug("null value query: {}", query);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(query),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
final boolean nullValExist = jsonNodes.get(0).get(resultColName.toLowerCase()).booleanValue(); // For some reason value in node is lowercase
LOGGER.debug("null value exist: {}", nullValExist);
return !nullValExist;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@
@ExtendWith(SystemStubsExtension.class)
public abstract class AbstractPostgresSourceSSLCertificateAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "testview";
private static final String SCHEMA_NAME = "public";
@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";

private PostgreSQLContainer<?> container;
private JsonNode config;
protected static final String PASSWORD = "Passw0rd";
Expand Down Expand Up @@ -127,28 +126,31 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME_MATERIALIZED_VIEW,
STREAM_NAME_MATERIALIZED_VIEW, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
@ExtendWith(SystemStubsExtension.class)
public abstract class AbstractSshPostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String SCHEMA_NAME = "public";
@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final Network network = Network.newNetwork();
private static JsonNode config;
private final SshBastionContainer bastion = new SshBastionContainer();
Expand Down Expand Up @@ -128,19 +128,21 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@

@ExtendWith(SystemStubsExtension.class)
public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "testview";
private static final String SCHEMA_NAME = "public";
@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";
public static final String LIMIT_PERMISSION_SCHEMA = "limit_perm_schema";
public static final String LIMIT_PERMISSION_ROLE = "limit_perm_role";
public static final String LIMIT_PERMISSION_ROLE_PASSWORD = "test";
Expand All @@ -66,9 +65,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc

container = new PostgreSQLContainer<>("postgres:13-alpine");
container.start();
String username = container.getUsername();
String password = container.getPassword();
List<String> schemas = List.of("public");
final String username = container.getUsername();
final String password = container.getPassword();
final List<String> schemas = List.of("public");
config = getConfig(username, password, schemas);
try (final DSLContext dslContext = DSLContextFactory.create(
config.get(JdbcUtils.USERNAME_KEY).asText(),
Expand All @@ -93,7 +92,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
}
}

private JsonNode getConfig(String username, String password, List<String> schemas) {
private JsonNode getConfig(final String username, final String password, final List<String> schemas) {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "Standard")
.build());
Expand Down Expand Up @@ -170,19 +169,19 @@ public void testDiscoverWithRevokingSchemaPermissions() throws Exception {
config = getConfig(LIMIT_PERMISSION_ROLE, LIMIT_PERMISSION_ROLE_PASSWORD, List.of(LIMIT_PERMISSION_SCHEMA));

runDiscover();
AirbyteCatalog lastPersistedCatalogSecond = getLastPersistedCatalog();
final AirbyteCatalog lastPersistedCatalogSecond = getLastPersistedCatalog();
final String assertionMessageWithoutPermission = "Expected no streams after discover for user without schema permissions";
assertTrue(lastPersistedCatalogSecond.getStreams().isEmpty(), assertionMessageWithoutPermission);
}

private void revokeSchemaPermissions(Database database) throws SQLException {
private void revokeSchemaPermissions(final Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch(String.format("REVOKE USAGE ON schema %s FROM %s;", LIMIT_PERMISSION_SCHEMA, LIMIT_PERMISSION_ROLE));
return null;
});
}

private void prepareEnvForUserWithoutPermissions(Database database) throws SQLException {
private void prepareEnvForUserWithoutPermissions(final Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch(String.format("CREATE ROLE %s WITH LOGIN PASSWORD '%s';", LIMIT_PERMISSION_ROLE, LIMIT_PERMISSION_ROLE_PASSWORD));
ctx.fetch(String.format("CREATE SCHEMA %s;", LIMIT_PERMISSION_SCHEMA));
Expand All @@ -202,28 +201,31 @@ private ConfiguredAirbyteCatalog getCommonConfigCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME_MATERIALIZED_VIEW,
STREAM_NAME_MATERIALIZED_VIEW, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

private ConfiguredAirbyteCatalog getLimitPermissionConfiguredCatalog() {
Expand All @@ -233,7 +235,7 @@ private ConfiguredAirbyteCatalog getLimitPermissionConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
LIMIT_PERMISSION_SCHEMA + "." + "id_and_name",
"id_and_name", LIMIT_PERMISSION_SCHEMA,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.util.HashMap;
import java.util.List;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -45,12 +46,11 @@
@ExtendWith(SystemStubsExtension.class)
public class PostgresSourceStrictEncryptAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String SCHEMA_NAME = "public";
@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";

private PostgreSQLContainer<?> container;
private JsonNode config;

Expand Down Expand Up @@ -133,19 +133,21 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Loading

0 comments on commit b5c08ce

Please sign in to comment.