Skip to content

Commit

Permalink
DV2 destinations: Build DestinationState / Migration framework (#35303)
Browse files Browse the repository at this point in the history
Signed-off-by: Gireesh Sreepathi <gisripa@gmail.com>
Co-authored-by: Gireesh Sreepathi <gisripa@gmail.com>
  • Loading branch information
edgao and gisripa committed Mar 1, 2024
1 parent 7277fc5 commit 4efc065
Show file tree
Hide file tree
Showing 24 changed files with 908 additions and 222 deletions.
5 changes: 3 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.9 | 2024-03-01 | [\#35720](https://github.com/airbytehq/airbyte/pull/35720) | various improvements for tests TestDataHolder |
| 0.23.10 | 2024-03-01 | [\#35303](https://github.com/airbytehq/airbyte/pull/35303) | various improvements for tests TestDataHolder |
| 0.23.9 | 2024-03-01 | [\#35720](https://github.com/airbytehq/airbyte/pull/35720) | various improvements for tests TestDataHolder |
| 0.23.8 | 2024-02-28 | [\#35529](https://github.com/airbytehq/airbyte/pull/35529) | Refactor on state iterators |
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Extract typereduper migrations to separte method |
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Extract typereduper migrations to separte method |
| 0.23.6 | 2024-02-26 | [\#35647](https://github.com/airbytehq/airbyte/pull/35647) | Add a getNamespace into TestDataHolder |
| 0.23.5 | 2024-02-26 | [\#35512](https://github.com/airbytehq/airbyte/pull/35512) | Remove @DisplayName from all CDK tests. |
| 0.23.4 | 2024-02-26 | [\#35507](https://github.com/airbytehq/airbyte/pull/35507) | Add more logs into TestDatabase. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.23.9
version=0.23.10
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.integrations.destination.jdbc;

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.cdk.integrations.util.ConfiguredCatalogUtilKt.addDefaultNamespaceToStreams;

Expand All @@ -17,7 +18,6 @@
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
Expand All @@ -37,6 +37,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand All @@ -45,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import javax.sql.DataSource;
Expand Down Expand Up @@ -93,7 +95,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
attemptTableOperations(outputSchema, database, namingResolver, sqlOperations, false);
if (TypingAndDedupingFlag.isDestinationV2()) {
final var v2RawSchema = namingResolver.getIdentifier(TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false);
destinationSpecificTableOperations(database);
}
Expand Down Expand Up @@ -252,7 +254,9 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri

protected abstract JdbcSqlGenerator getSqlGenerator();

protected abstract JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database);
protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
final String rawTableSchema);

/**
* "database" key at root of the config json, for any other variants in config, override this
Expand Down Expand Up @@ -309,21 +313,23 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
*/
private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JdbcDatabase database) {
final JdbcSqlGenerator sqlGenerator = getSqlGenerator();
final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE);
final ParsedCatalog parsedCatalog = rawNamespaceOverride
.map(override -> new CatalogParser(sqlGenerator, override))
.orElse(new CatalogParser(sqlGenerator))
.parseCatalog(catalog);
final String databaseName = getDatabaseName(config);
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database);
final DestinationHandler<? extends MinimumDestinationState> destinationHandler =
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
} else {
typerDeduper =
new DefaultTyperDeduper(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
}
return typerDeduper;
}
Expand Down
Loading

0 comments on commit 4efc065

Please sign in to comment.