Commit
Destination Postgres: CDK T+D initial state gathering (#35385)
Signed-off-by: Gireesh Sreepathi <gisripa@gmail.com>
gisripa committed Feb 23, 2024
1 parent 5d99614 commit a13bd80
Showing 13 changed files with 80 additions and 75 deletions.
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -166,6 +166,8 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.2 | 2024-02-22 | [\#35385](https://github.com/airbytehq/airbyte/pull/35385) | Fix inverted logic of the disableTypeDedupe flag |
| 0.23.1 | 2024-02-22 | [\#35527](https://github.com/airbytehq/airbyte/pull/35527) | Reduce shutdown timeouts |
| 0.23.0 | 2024-02-22 | [\#35342](https://github.com/airbytehq/airbyte/pull/35342) | Consolidate and perform upfront gathering of DB metadata state |
| 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB |
| 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs |
@@ -1 +1 @@
version=0.23.0
version=0.23.2
@@ -317,7 +317,7 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database);
final boolean disableTypeDedupe = !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
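For reference, the 0.23.2 bugfix above flips an inverted boolean: the old expression treated a missing disableTypeDedupe option as "disable", while the new one only disables typing & deduping when the flag is present and explicitly true. A minimal, self-contained sketch of that difference (the JSON key string "disable_type_dedupe" is an assumption for illustration; the diff above only shows the DISABLE_TYPE_DEDUPE constant):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative only: shows why the old expression inverted the flag's meaning.
public class DisableTypeDedupeFlagSketch {

  // Hypothetical key string for illustration; only the DISABLE_TYPE_DEDUPE constant is visible in the diff.
  private static final String DISABLE_TYPE_DEDUPE = "disable_type_dedupe";

  static boolean oldLogic(final JsonNode config) {
    // Old: an absent flag made the left side true, silently disabling typing & deduping.
    return !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
  }

  static boolean newLogic(final JsonNode config) {
    // New: typing & deduping stays enabled unless the flag is present and explicitly true.
    return config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
  }

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    final JsonNode absent = mapper.readTree("{}");
    final JsonNode optedOut = mapper.readTree("{\"disable_type_dedupe\": true}");

    System.out.println(oldLogic(absent));   // true  -> T+D wrongly disabled by default
    System.out.println(newLogic(absent));   // false -> T+D enabled by default (intended)
    System.out.println(newLogic(optedOut)); // true  -> user explicitly opted out
  }
}
```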
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.4'
cdkVersionRequired = '0.23.2'
features = ['db-destinations', 'typing-deduping', 'datastore-postgres']
useLocalCdk = false
}
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.0
dockerImageTag: 2.0.1
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.4'
cdkVersionRequired = '0.23.2'
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.0
dockerImageTag: 2.0.1
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
@@ -14,13 +14,16 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -127,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() {
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
return new PostgresDestinationHandler(databaseName, database);
}

@Override
public boolean isV2Destination() {
return true;
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.postgres.typing_deduping;

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;

public class PostgresDestinationHandler extends JdbcDestinationHandler {

public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) {
super(databaseName, jdbcDatabase);
}

@Override
protected String toJdbcTypeName(AirbyteType airbyteType) {
// This is mostly identical to the redshift implementation, but swaps super to jsonb
if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) {
return toJdbcTypeName(airbyteProtocolType);
}
return switch (airbyteType.getTypeName()) {
case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "jsonb";
// No nested Unions supported so this will definitely not result in infinite recursion.
case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType());
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType);
};
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "varchar";
case NUMBER -> "numeric";
case INTEGER -> "int8";
case BOOLEAN -> "bool";
case TIMESTAMP_WITH_TIMEZONE -> "timestamptz";
case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp";
case TIME_WITH_TIMEZONE -> "timetz";
case TIME_WITHOUT_TIMEZONE -> "time";
case DATE -> "date";
case UNKNOWN -> "jsonb";
};
}

}
@@ -20,10 +20,7 @@
import static org.jooq.impl.DSL.rowNumber;
import static org.jooq.impl.DSL.val;

import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
@@ -37,7 +34,6 @@
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -54,13 +50,6 @@ public class PostgresSqlGenerator extends JdbcSqlGenerator {

public static final DataType<?> JSONB_TYPE = new DefaultDataType<>(null, Object.class, "jsonb");

private static final Map<String, String> POSTGRES_TYPE_NAME_TO_JDBC_TYPE = ImmutableMap.of(
"numeric", "decimal",
"int8", "bigint",
"bool", "boolean",
"timestamptz", "timestamp with time zone",
"timetz", "time with time zone");

public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer) {
super(namingTransformer);
}
@@ -309,29 +298,6 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
.orderBy(orderedFields).as(ROW_NUMBER_COLUMN_NAME);
}

@Override
public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
// Check that the columns match, with special handling for the metadata columns.
// This is mostly identical to the redshift implementation, but swaps super to jsonb
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()),
LinkedHashMap::putAll);
final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream()
.filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
.noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey())))
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey(), jdbcTypeNameFromPostgresTypeName(column.getValue().type())),
LinkedHashMap::putAll);

final boolean sameColumns = actualColumns.equals(intendedColumns)
&& "varchar".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type())
&& "timestamptz".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type())
&& "jsonb".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META).type());

return sameColumns;
}

/**
* Extract a raw field, leaving it as jsonb
*/
@@ -343,8 +309,4 @@ private Field<String> jsonTypeof(final Field<?> field) {
return function("JSONB_TYPEOF", SQLDataType.VARCHAR, field);
}

private static String jdbcTypeNameFromPostgresTypeName(final String redshiftType) {
return POSTGRES_TYPE_NAME_TO_JDBC_TYPE.getOrDefault(redshiftType, redshiftType);
}

}
@@ -5,24 +5,23 @@
package io.airbyte.integrations.destination.postgres.typing_deduping;

import static io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.JSONB_TYPE;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.destination.postgres.PostgresDestination;
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase;
import java.util.Optional;
import java.util.List;
import javax.sql.DataSource;
import org.jooq.DataType;
import org.jooq.Field;
@@ -76,8 +75,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected DestinationHandler<TableDefinition> getDestinationHandler() {
return new JdbcDestinationHandler(databaseName, database);
protected DestinationHandler getDestinationHandler() {
return new PostgresDestinationHandler(databaseName, database);
}

@Override
@@ -96,29 +95,11 @@ public void testCreateTableIncremental() throws Exception {
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
destinationHandler.execute(sql);

final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id());

assertTrue(existingTable.isPresent());
assertAll(
() -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("_airbyte_meta").type()),
() -> assertEquals("int8", existingTable.get().columns().get("id1").type()),
() -> assertEquals("int8", existingTable.get().columns().get("id2").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("struct").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("array").type()),
() -> assertEquals("varchar", existingTable.get().columns().get("string").type()),
() -> assertEquals("numeric", existingTable.get().columns().get("number").type()),
() -> assertEquals("int8", existingTable.get().columns().get("integer").type()),
() -> assertEquals("bool", existingTable.get().columns().get("boolean").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()),
() -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()),
() -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()),
() -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()),
() -> assertEquals("date", existingTable.get().columns().get("date").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("unknown").type()));
// TODO assert on table indexing, etc.
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStates.size());
final DestinationInitialState initialState = initialStates.getFirst();
assertTrue(initialState.isFinalTablePresent());
assertFalse(initialState.isSchemaMismatch());
}

}
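The rewritten test above is the connector-side view of the CDK's new upfront state gathering (0.23.0): instead of fetching and asserting on a TableDefinition per stream, it asks the destination handler for a DestinationInitialState and checks final-table presence and schema match. As a rough, non-authoritative sketch of how a caller could branch on that state, reusing only the calls visible in this diff (the helper class and method below are invented for illustration):

```java
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import java.util.List;

// Hypothetical helper, not part of the CDK or this connector.
final class InitialStateSketch {

  // Ensure a stream's final table exists, branching on the state gathered upfront.
  static void ensureFinalTable(final DestinationHandler destinationHandler,
                               final JdbcSqlGenerator generator,
                               final StreamConfig stream) throws Exception {
    final List<DestinationInitialState> states = destinationHandler.gatherInitialState(List.of(stream));
    final DestinationInitialState state = states.getFirst();

    if (!state.isFinalTablePresent()) {
      // No final table yet: create it, as testCreateTableIncremental does.
      final Sql createTable = generator.createTable(stream, "", false);
      destinationHandler.execute(createTable);
    } else if (state.isSchemaMismatch()) {
      // A real typer-deduper would alter or soft-reset the existing final table here.
    }
  }
}
```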
@@ -44,7 +44,7 @@ private String generateBigString() {
}

@Override
protected SqlGenerator<?> getSqlGenerator() {
protected SqlGenerator getSqlGenerator() {
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
}

3 changes: 2 additions & 1 deletion docs/integrations/destinations/postgres.md
@@ -193,6 +193,7 @@ Now that you have set up the Postgres destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
| 2.0.1 | 2024-02-22 | [35385](https://github.com/airbytehq/airbyte/pull/35385) | Upgrade CDK to 0.23.2; gather required initial state upfront |
| 2.0.0 | 2024-02-09 | [35042](https://github.com/airbytehq/airbyte/pull/35042) | GA release V2 destinations format. |
| 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults |
| 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib |
@@ -220,4 +221,4 @@ Now that you have set up the Postgres destination connector, check out the follo
| 0.3.13 | 2021-12-01 | [\#8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key |
| 0.3.12 | 2021-11-08 | [\#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
| 0.3.11 | 2021-09-07 | [\#5743](https://github.com/airbytehq/airbyte/pull/5743) | Add SSH Tunnel support |
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |
