diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/Destination.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/Destination.java index 20a0c2464bb70..130f6ce016f3c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/Destination.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/Destination.java @@ -162,4 +162,11 @@ public void setType(final AirbyteMessage.Type type) { } + /** + * Denotes if the destination fully supports Destinations V2. + */ + default Boolean isV2Destination() { + return false; + } + } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/DestinationConfig.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/DestinationConfig.java index 280658683f7b5..9e78520c4f9a6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/DestinationConfig.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/DestinationConfig.java @@ -20,18 +20,27 @@ public class DestinationConfig { private static DestinationConfig config; + // whether the destination fully supports Destinations V2 + private Boolean isV2Destination; + @VisibleForTesting protected JsonNode root; private DestinationConfig() {} + @VisibleForTesting public static void initialize(final JsonNode root) { + initialize(root, false); + } + + public static void initialize(final JsonNode root, final Boolean isV2Destination) { if (config == null) { if (root == null) { throw new IllegalArgumentException("Cannot create DestinationConfig from null."); } config = new DestinationConfig(); config.root = root; + config.isV2Destination = isV2Destination; } else { LOGGER.warn("Singleton was already initialized."); } @@ -44,6 +53,7 @@ public static DestinationConfig getInstance() { return config; } + @VisibleForTesting public static void clearInstance() { config = null; } @@ -76,4 +86,8 @@ public Boolean getBooleanValue(final String key) { return node.asBoolean(); } + public Boolean getIsV2Destination() { + return isV2Destination; + } + } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java index 5887466c126de..8da1ff8588aa8 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java @@ -140,7 +140,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { case CHECK -> { final JsonNode config = parseConfig(parsed.getConfigPath()); if (integration instanceof Destination) { - DestinationConfig.initialize(config); + DestinationConfig.initialize(config, ((Destination) integration).isV2Destination()); } try { validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK"); @@ -183,7 +183,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { final JsonNode config = parseConfig(parsed.getConfigPath()); validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); // save config to singleton - DestinationConfig.initialize(config); + DestinationConfig.initialize(config, ((Destination) integration).isV2Destination()); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/TypingAndDedupingFlag.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/TypingAndDedupingFlag.java index 27ba7a1f8227b..8820b1d7017f9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/TypingAndDedupingFlag.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/TypingAndDedupingFlag.java @@ -9,12 +9,12 @@ public class TypingAndDedupingFlag { public static boolean isDestinationV2() { - return DestinationConfig.getInstance().getBooleanValue("is_v2_destination") + return DestinationConfig.getInstance().getIsV2Destination() || DestinationConfig.getInstance().getBooleanValue("use_1s1t_format"); } - public static Optional getRawNamespaceOverride(String option) { - String rawOverride = DestinationConfig.getInstance().getTextValue(option); + public static Optional getRawNamespaceOverride(final String option) { + final String rawOverride = DestinationConfig.getInstance().getTextValue(option); if (rawOverride == null || rawOverride.isEmpty()) { return Optional.empty(); } else { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.java index 878eef089be03..81d508b0dd2c7 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.java @@ -4,14 +4,9 @@ package io.airbyte.cdk.integrations.base.adaptive; -import io.airbyte.cdk.integrations.base.Command; import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.cdk.integrations.base.DestinationConfig; -import io.airbyte.cdk.integrations.base.IntegrationCliParser; -import io.airbyte.cdk.integrations.base.IntegrationConfig; import io.airbyte.cdk.integrations.base.IntegrationRunner; import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.commons.json.Jsons; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,15 +83,6 @@ private Destination getDestination() { } public void run(final String[] args) throws Exception { - // getDestination() sometimes depends on the singleton being initialized. - // Parse the CLI args just so we can accomplish that. - IntegrationConfig parsedArgs = new IntegrationCliParser().parse(args); - if (parsedArgs.getCommand() != Command.SPEC) { - DestinationConfig.initialize(IntegrationRunner.parseConfig(parsedArgs.getConfigPath())); - } else { - DestinationConfig.initialize(Jsons.emptyObject()); - } - final Destination destination = getDestination(); LOGGER.info("Starting destination: {}", destination.getClass().getName()); new IntegrationRunner(destination).run(args); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index b18dfa7feb695..a4868348559c6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.13.2 +version=0.13.3 diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/DestinationConfigTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/DestinationConfigTest.java index 2d06503baf20d..68044162bb724 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/DestinationConfigTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/DestinationConfigTest.java @@ -33,18 +33,21 @@ public void testInitialization() { assertThrows(IllegalStateException.class, DestinationConfig::getInstance); // good initialization - DestinationConfig.initialize(NODE); + DestinationConfig.initialize(NODE, true); assertNotNull(DestinationConfig.getInstance()); assertEquals(NODE, DestinationConfig.getInstance().root); + assertEquals(true, DestinationConfig.getInstance().getIsV2Destination()); // initializing again doesn't change the config final JsonNode nodeUnused = Jsons.deserialize("{}"); - DestinationConfig.initialize(nodeUnused); + DestinationConfig.initialize(nodeUnused, false); assertEquals(NODE, DestinationConfig.getInstance().root); + assertEquals(true, DestinationConfig.getInstance().getIsV2Destination()); } @Test public void testValues() { + DestinationConfig.clearInstance(); DestinationConfig.initialize(NODE); assertEquals("bar", DestinationConfig.getInstance().getTextValue("foo")); @@ -60,6 +63,8 @@ public void testValues() { assertEquals(Jsons.deserialize("\"bar\""), DestinationConfig.getInstance().getNodeValue("foo")); assertEquals(Jsons.deserialize("true"), DestinationConfig.getInstance().getNodeValue("baz")); assertNull(DestinationConfig.getInstance().getNodeValue("blah")); + + assertEquals(false, DestinationConfig.getInstance().getIsV2Destination()); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 9d4c49a4163df..3fd4efd26e548 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -4,9 +4,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.12.0' + cdkVersionRequired = '0.13.3' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = false + useLocalCdk = true } airbyteJavaConnector.addCdkDependencies() diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 3ad145a95972b..08ac15d3df87c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.3.31 + dockerImageTag: 2.3.32 dockerRepository: airbyte/destination-bigquery documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery githubIssueLabel: destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index edec86e609917..00a96e5e2b566 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -457,6 +457,11 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator, } + @Override + public Boolean isV2Destination() { + return true; + } + public static void main(final String[] args) throws Exception { AirbyteExceptionHandler.addThrowableForDeinterpolation(BigQueryException.class); final Destination destination = new BigQueryDestination(); diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index aa75211ebf3e7..ad655f4579d93 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -4,9 +4,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.13.0' + cdkVersionRequired = '0.13.3' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = false + useLocalCdk = true } java { diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java index d10cabd45730b..d1c9c0c5949fc 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java @@ -69,6 +69,11 @@ public ConnectorSpecification spec() throws Exception { return originalSpec; } + @Override + public Boolean isV2Destination() { + return true; + } + public static void main(final String[] args) throws Exception { final Destination destination = new RedshiftDestination(); LOGGER.info("starting destination: {}", RedshiftDestination.class); diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index 33668701900d4..55cb60c52a62f 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -249,14 +249,6 @@ } ] }, - "is_v2_destination": { - "type": "boolean", - "description": "(For internal use.) Declares this is a V2 destination.", - "title": "(Internal) Is V2 Destination", - "group": "connection", - "airbyte_hidden": true, - "default": true - }, "raw_data_schema": { "type": "string", "description": "The schema to write raw tables into", diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 4ce8698d75d8f..b0085155b8b10 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -4,9 +4,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.12.0' + cdkVersionRequired = '0.13.3' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = false + useLocalCdk = true } airbyteJavaConnector.addCdkDependencies() diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index b5d7311d68df2..aea48a80e0025 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.4.22 + dockerImageTag: 3.4.23 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index 217ed51abb78a..0eeefde5e19d9 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -40,4 +40,9 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector); } + @Override + public Boolean isV2Destination() { + return true; + } + } diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index b30ba594a0cb8..a919b4bc27bc4 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -210,10 +210,11 @@ tutorials: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 2.3.31 | 2024-01-22 | [34023](https://github.com/airbytehq/airbyte/pull/34023) | Combine DDL operations into a single execution | -| 2.3.30 | 2024-01-12 | [34226](https://github.com/airbytehq/airbyte/pull/34226) | Upgrade CDK to 0.12.0; Cleanup dependencies | -| 2.3.29 | 2024-01-09 | [34003](https://github.com/airbytehq/airbyte/pull/34003) | Fix loading credentials from GCP Env | -| 2.3.28 | 2024-01-08 | [34021](https://github.com/airbytehq/airbyte/pull/34021) | Add idempotency ids in dummy insert for check call | +| 2.3.32 | 2024-01-22 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Internal code changes for Destinations V2 | +| 2.3.31 | 2024-01-22 | [\#34023](https://github.com/airbytehq/airbyte/pull/34023) | Combine DDL operations into a single execution | +| 2.3.30 | 2024-01-12 | [\#34226](https://github.com/airbytehq/airbyte/pull/34226) | Upgrade CDK to 0.12.0; Cleanup dependencies | +| 2.3.29 | 2024-01-09 | [\#34003](https://github.com/airbytehq/airbyte/pull/34003) | Fix loading credentials from GCP Env | +| 2.3.28 | 2024-01-08 | [\#34021](https://github.com/airbytehq/airbyte/pull/34021) | Add idempotency ids in dummy insert for check call | | 2.3.27 | 2024-01-05 | [\#33948](https://github.com/airbytehq/airbyte/pull/33948) | Skip retrieving initial table state when setup fails | | 2.3.26 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | Internal code structure changes | | 2.3.25 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Update to java CDK 0.10.0 (no changes) | diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 21acbb28f9fc8..43c24993bb581 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -237,7 +237,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 2.0.0 | 2024-01-19 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Destinations V2 | +| 2.0.0 | 2024-01-22 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Destinations V2 | | 0.8.0 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.0 | | 0.7.15 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Update check method with svv_table_info permission check, fix bug where s3 staging files were not being deleted. | | 0.7.14 | 2024-01-08 | [\#34014](https://github.com/airbytehq/airbyte/pull/34014) | Update order of options in spec | diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index aad1612b10c82..f838722dfcc44 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -246,8 +246,9 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.4.22 | 2024-01-12 | [34227](https://github.com/airbytehq/airbyte/pull/34227) | Upgrade CDK to 0.12.0; Cleanup unused dependencies | -| 3.4.21 | 2024-01-10 | [\#34083](https://github.com/airbytehq/airbte/pull/34083) | Emit destination stats as part of the state message | +| 3.4.23 | 2024-01-22 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Internal code changes for Destinations V2 | +| 3.4.22 | 2024-01-12 | [\#34227](https://github.com/airbytehq/airbyte/pull/34227) | Upgrade CDK to 0.12.0; Cleanup unused dependencies | +| 3.4.21 | 2024-01-10 | [\#34083](https://github.com/airbytehq/airbyte/pull/34083) | Emit destination stats as part of the state message | | 3.4.20 | 2024-01-05 | [\#33948](https://github.com/airbytehq/airbyte/pull/33948) | Skip retrieving initial table state when setup fails | | 3.4.19 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | Internal code structure changes | | 3.4.18 | 2024-01-02 | [\#33728](https://github.com/airbytehq/airbyte/pull/33728) | Add option to only type and dedupe at the end of the sync |