Skip to content

Commit

Permalink
Destination postgres: upgrade cdk (#35528)
Browse files Browse the repository at this point in the history
Signed-off-by: Gireesh Sreepathi <gisripa@gmail.com>
Co-authored-by: Gireesh Sreepathi <gisripa@gmail.com>
  • Loading branch information
edgao and gisripa committed Mar 5, 2024
1 parent 16c00da commit 8b83f14
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 81 deletions.
@@ -1,9 +1,10 @@
plugins {
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.2'
cdkVersionRequired = '0.23.11'
features = ['db-destinations', 'typing-deduping', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.2
dockerImageTag: 2.0.3
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
@@ -1,9 +1,10 @@
plugins {
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.2'
cdkVersionRequired = '0.23.11'
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.2
dockerImageTag: 2.0.3
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Expand Up @@ -26,8 +26,9 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator;
import java.io.UnsupportedEncodingException;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresState;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -99,12 +100,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {

String encodedDatabase = config.get(JdbcUtils.DATABASE_KEY).asText();
if (encodedDatabase != null) {
try {
encodedDatabase = URLEncoder.encode(encodedDatabase, "UTF-8");
} catch (final UnsupportedEncodingException e) {
// Should never happen
e.printStackTrace();
}
encodedDatabase = URLEncoder.encode(encodedDatabase, StandardCharsets.UTF_8);
}
final String jdbcUrl = String.format("jdbc:postgresql://%s:%s/%s?",
config.get(JdbcUtils.HOST_KEY).asText(),
Expand Down Expand Up @@ -133,8 +129,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
return new PostgresDestinationHandler(databaseName, database);
protected JdbcDestinationHandler<PostgresState> getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
return new PostgresDestinationHandler(databaseName, database, rawTableSchema);
}

@Override
Expand Down
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.postgres.typing_deduping;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
Expand All @@ -12,11 +13,12 @@
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
import org.jooq.SQLDialect;

public class PostgresDestinationHandler extends JdbcDestinationHandler {
public class PostgresDestinationHandler extends JdbcDestinationHandler<PostgresState> {

public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) {
super(databaseName, jdbcDatabase);
public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase, String rawTableSchema) {
super(databaseName, jdbcDatabase, rawTableSchema, SQLDialect.POSTGRES);
}

@Override
Expand All @@ -33,6 +35,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) {
};
}

@Override
protected PostgresState toDestinationState(JsonNode json) {
return new PostgresState(
json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "varchar";
Expand Down
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.postgres.typing_deduping

import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

data class PostgresState(val needsSoftReset: Boolean) : MinimumDestinationState {
override fun needsSoftReset(): Boolean {
return needsSoftReset
}

override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
return copy(needsSoftReset = needsSoftReset) as T
}
}
Expand Up @@ -16,7 +16,7 @@
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.destination.postgres.PostgresDestination;
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
Expand All @@ -31,7 +31,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class PostgresSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest {
public class PostgresSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest<PostgresState> {

private static PostgresTestDatabase testContainer;
private static String databaseName;
Expand Down Expand Up @@ -75,8 +75,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected DestinationHandler getDestinationHandler() {
return new PostgresDestinationHandler(databaseName, database);
protected DestinationHandler<PostgresState> getDestinationHandler() {
return new PostgresDestinationHandler(databaseName, database, namespace);
}

@Override
Expand All @@ -95,11 +95,11 @@ public void testCreateTableIncremental() throws Exception {
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
destinationHandler.execute(sql);

List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStates.size());
final DestinationInitialState initialState = initialStates.getFirst();
assertTrue(initialState.isFinalTablePresent());
assertFalse(initialState.isSchemaMismatch());
List<DestinationInitialStatus<PostgresState>> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStatuses.size());
final DestinationInitialStatus<PostgresState> initialStatus = initialStatuses.getFirst();
assertTrue(initialStatus.isFinalTablePresent());
assertFalse(initialStatus.isSchemaMismatch());
}

}
Expand Up @@ -26,7 +26,7 @@ protected PostgreSQLContainer<?> createNewContainer(DockerImageName imageName) {
/**
* Apply the postgresql.conf file that we've packaged as a resource.
*/
public void withConf(PostgreSQLContainer<?> container) {
public static void withConf(PostgreSQLContainer<?> container) {
container
.withCopyFileToContainer(
MountableFile.forClasspathResource("postgresql.conf"),
Expand All @@ -37,21 +37,14 @@ public void withConf(PostgreSQLContainer<?> container) {
/**
* Create a new network and bind it to the container.
*/
public void withNetwork(PostgreSQLContainer<?> container) {
public static void withNetwork(PostgreSQLContainer<?> container) {
container.withNetwork(Network.newNetwork());
}

/**
* Configure postgres with wal_level=logical.
*/
public void withWalLevelLogical(PostgreSQLContainer<?> container) {
container.withCommand("postgres -c wal_level=logical");
}

/**
* Generate SSL certificates and tell postgres to enable SSL and use them.
*/
public void withCert(PostgreSQLContainer<?> container) {
public static void withCert(PostgreSQLContainer<?> container) {
container.start();
String[] commands = {
"psql -U test -c \"CREATE USER postgres WITH PASSWORD 'postgres';\"",
Expand Down Expand Up @@ -97,7 +90,7 @@ public void withCert(PostgreSQLContainer<?> container) {
/**
* Tell postgres to enable SSL.
*/
public void withSSL(PostgreSQLContainer<?> container) {
public static void withSSL(PostgreSQLContainer<?> container) {
container.withCommand("postgres " +
"-c ssl=on " +
"-c ssl_cert_file=/var/lib/postgresql/server.crt " +
Expand All @@ -107,7 +100,7 @@ public void withSSL(PostgreSQLContainer<?> container) {
/**
* Configure postgres with client_encoding=sql_ascii.
*/
public void withASCII(PostgreSQLContainer<?> container) {
public static void withASCII(PostgreSQLContainer<?> container) {
container.withCommand("postgres -c client_encoding=sql_ascii");
}

Expand Down
Expand Up @@ -7,11 +7,13 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.testutils.ContainerFactory.NamedContainerModifier;
import io.airbyte.cdk.testutils.TestDatabase;
import io.airbyte.commons.json.Jsons;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.jooq.SQLDialect;
import org.testcontainers.containers.PostgreSQLContainer;
Expand Down Expand Up @@ -39,27 +41,30 @@ private BaseImage(String reference) {

}

public static enum ContainerModifier {
public enum ContainerModifier implements NamedContainerModifier<PostgreSQLContainer<?>> {

ASCII("withASCII"),
CONF("withConf"),
NETWORK("withNetwork"),
SSL("withSSL"),
WAL_LEVEL_LOGICAL("withWalLevelLogical"),
CERT("withCert"),
ASCII(PostgresContainerFactory::withASCII),
CONF(PostgresContainerFactory::withConf),
NETWORK(PostgresContainerFactory::withNetwork),
SSL(PostgresContainerFactory::withSSL),
CERT(PostgresContainerFactory::withCert),
;

private String methodName;
private Consumer<PostgreSQLContainer<?>> modifer;

private ContainerModifier(String methodName) {
this.methodName = methodName;
private ContainerModifier(final Consumer<PostgreSQLContainer<?>> modifer) {
this.modifer = modifer;
}

@Override
public Consumer<PostgreSQLContainer<?>> modifier() {
return modifer;
}

}

static public PostgresTestDatabase in(BaseImage baseImage, ContainerModifier... modifiers) {
String[] methodNames = Stream.of(modifiers).map(im -> im.methodName).toList().toArray(new String[0]);
final var container = new PostgresContainerFactory().shared(baseImage.reference, methodNames);
final var container = new PostgresContainerFactory().shared(baseImage.reference, modifiers);
return new PostgresTestDatabase(container).initialized();
}

Expand Down

0 comments on commit 8b83f14

Please sign in to comment.