Skip to content

Commit

Permalink
set isV2Destination in Destination classes
Browse files Browse the repository at this point in the history
  • Loading branch information
cynthiaxyin committed Jan 23, 2024
1 parent 6176d03 commit 68ed3cd
Show file tree
Hide file tree
Showing 19 changed files with 66 additions and 45 deletions.
Expand Up @@ -162,4 +162,11 @@ public void setType(final AirbyteMessage.Type type) {

}

/**
* Denotes if the destination fully supports Destinations V2.
*/
default Boolean isV2Destination() {
return false;
}

}
Expand Up @@ -20,18 +20,27 @@ public class DestinationConfig {

private static DestinationConfig config;

// whether the destination fully supports Destinations V2
private Boolean isV2Destination;

@VisibleForTesting
protected JsonNode root;

private DestinationConfig() {}

@VisibleForTesting
public static void initialize(final JsonNode root) {
initialize(root, false);
}

public static void initialize(final JsonNode root, final Boolean isV2Destination) {
if (config == null) {
if (root == null) {
throw new IllegalArgumentException("Cannot create DestinationConfig from null.");
}
config = new DestinationConfig();
config.root = root;
config.isV2Destination = isV2Destination;
} else {
LOGGER.warn("Singleton was already initialized.");
}
Expand All @@ -44,6 +53,7 @@ public static DestinationConfig getInstance() {
return config;
}

@VisibleForTesting
public static void clearInstance() {
config = null;
}
Expand Down Expand Up @@ -76,4 +86,8 @@ public Boolean getBooleanValue(final String key) {
return node.asBoolean();
}

public Boolean getIsV2Destination() {
return isV2Destination;
}

}
Expand Up @@ -140,7 +140,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
case CHECK -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
if (integration instanceof Destination) {
DestinationConfig.initialize(config);
DestinationConfig.initialize(config, ((Destination) integration).isV2Destination());
}
try {
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
Expand Down Expand Up @@ -183,7 +183,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
// save config to singleton
DestinationConfig.initialize(config);
DestinationConfig.initialize(config, ((Destination) integration).isV2Destination());
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);

try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) {
Expand Down
Expand Up @@ -9,12 +9,12 @@
public class TypingAndDedupingFlag {

public static boolean isDestinationV2() {
return DestinationConfig.getInstance().getBooleanValue("is_v2_destination")
return DestinationConfig.getInstance().getIsV2Destination()
|| DestinationConfig.getInstance().getBooleanValue("use_1s1t_format");
}

public static Optional<String> getRawNamespaceOverride(String option) {
String rawOverride = DestinationConfig.getInstance().getTextValue(option);
public static Optional<String> getRawNamespaceOverride(final String option) {
final String rawOverride = DestinationConfig.getInstance().getTextValue(option);
if (rawOverride == null || rawOverride.isEmpty()) {
return Optional.empty();
} else {
Expand Down
Expand Up @@ -4,14 +4,9 @@

package io.airbyte.cdk.integrations.base.adaptive;

import io.airbyte.cdk.integrations.base.Command;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.DestinationConfig;
import io.airbyte.cdk.integrations.base.IntegrationCliParser;
import io.airbyte.cdk.integrations.base.IntegrationConfig;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,15 +83,6 @@ private Destination getDestination() {
}

public void run(final String[] args) throws Exception {
// getDestination() sometimes depends on the singleton being initialized.
// Parse the CLI args just so we can accomplish that.
IntegrationConfig parsedArgs = new IntegrationCliParser().parse(args);
if (parsedArgs.getCommand() != Command.SPEC) {
DestinationConfig.initialize(IntegrationRunner.parseConfig(parsedArgs.getConfigPath()));
} else {
DestinationConfig.initialize(Jsons.emptyObject());
}

final Destination destination = getDestination();
LOGGER.info("Starting destination: {}", destination.getClass().getName());
new IntegrationRunner(destination).run(args);
Expand Down
@@ -1 +1 @@
version=0.13.2
version=0.13.3
Expand Up @@ -33,18 +33,21 @@ public void testInitialization() {
assertThrows(IllegalStateException.class, DestinationConfig::getInstance);

// good initialization
DestinationConfig.initialize(NODE);
DestinationConfig.initialize(NODE, true);
assertNotNull(DestinationConfig.getInstance());
assertEquals(NODE, DestinationConfig.getInstance().root);
assertEquals(true, DestinationConfig.getInstance().getIsV2Destination());

// initializing again doesn't change the config
final JsonNode nodeUnused = Jsons.deserialize("{}");
DestinationConfig.initialize(nodeUnused);
DestinationConfig.initialize(nodeUnused, false);
assertEquals(NODE, DestinationConfig.getInstance().root);
assertEquals(true, DestinationConfig.getInstance().getIsV2Destination());
}

@Test
public void testValues() {
DestinationConfig.clearInstance();
DestinationConfig.initialize(NODE);

assertEquals("bar", DestinationConfig.getInstance().getTextValue("foo"));
Expand All @@ -60,6 +63,8 @@ public void testValues() {
assertEquals(Jsons.deserialize("\"bar\""), DestinationConfig.getInstance().getNodeValue("foo"));
assertEquals(Jsons.deserialize("true"), DestinationConfig.getInstance().getNodeValue("baz"));
assertNull(DestinationConfig.getInstance().getNodeValue("blah"));

assertEquals(false, DestinationConfig.getInstance().getIsV2Destination());
}

}
Expand Up @@ -4,9 +4,9 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.12.0'
cdkVersionRequired = '0.13.3'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

airbyteJavaConnector.addCdkDependencies()
Expand Down
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.3.31
dockerImageTag: 2.3.32
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
Expand Down
Expand Up @@ -457,6 +457,11 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,

}

@Override
public Boolean isV2Destination() {
return true;
}

public static void main(final String[] args) throws Exception {
AirbyteExceptionHandler.addThrowableForDeinterpolation(BigQueryException.class);
final Destination destination = new BigQueryDestination();
Expand Down
Expand Up @@ -4,9 +4,9 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.13.0'
cdkVersionRequired = '0.13.3'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
Expand Up @@ -69,6 +69,11 @@ public ConnectorSpecification spec() throws Exception {
return originalSpec;
}

@Override
public Boolean isV2Destination() {
return true;
}

public static void main(final String[] args) throws Exception {
final Destination destination = new RedshiftDestination();
LOGGER.info("starting destination: {}", RedshiftDestination.class);
Expand Down
Expand Up @@ -249,14 +249,6 @@
}
]
},
"is_v2_destination": {
"type": "boolean",
"description": "(For internal use.) Declares this is a V2 destination.",
"title": "(Internal) Is V2 Destination",
"group": "connection",
"airbyte_hidden": true,
"default": true
},
"raw_data_schema": {
"type": "string",
"description": "The schema to write raw tables into",
Expand Down
Expand Up @@ -4,9 +4,9 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.12.0'
cdkVersionRequired = '0.13.3'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

airbyteJavaConnector.addCdkDependencies()
Expand Down
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.4.22
dockerImageTag: 3.4.23
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
Expand Down
Expand Up @@ -40,4 +40,9 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector);
}

@Override
public Boolean isV2Destination() {
return true;
}

}
9 changes: 5 additions & 4 deletions docs/integrations/destinations/bigquery.md
Expand Up @@ -210,10 +210,11 @@ tutorials:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.3.31 | 2024-01-22 | [34023](https://github.com/airbytehq/airbyte/pull/34023) | Combine DDL operations into a single execution |
| 2.3.30 | 2024-01-12 | [34226](https://github.com/airbytehq/airbyte/pull/34226) | Upgrade CDK to 0.12.0; Cleanup dependencies |
| 2.3.29 | 2024-01-09 | [34003](https://github.com/airbytehq/airbyte/pull/34003) | Fix loading credentials from GCP Env |
| 2.3.28 | 2024-01-08 | [34021](https://github.com/airbytehq/airbyte/pull/34021) | Add idempotency ids in dummy insert for check call |
| 2.3.32 | 2024-01-22 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Internal code changes for Destinations V2 |
| 2.3.31 | 2024-01-22 | [\#34023](https://github.com/airbytehq/airbyte/pull/34023) | Combine DDL operations into a single execution |
| 2.3.30 | 2024-01-12 | [\#34226](https://github.com/airbytehq/airbyte/pull/34226) | Upgrade CDK to 0.12.0; Cleanup dependencies |
| 2.3.29 | 2024-01-09 | [\#34003](https://github.com/airbytehq/airbyte/pull/34003) | Fix loading credentials from GCP Env |
| 2.3.28 | 2024-01-08 | [\#34021](https://github.com/airbytehq/airbyte/pull/34021) | Add idempotency ids in dummy insert for check call |
| 2.3.27 | 2024-01-05 | [\#33948](https://github.com/airbytehq/airbyte/pull/33948) | Skip retrieving initial table state when setup fails |
| 2.3.26 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | Internal code structure changes |
| 2.3.25 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Update to java CDK 0.10.0 (no changes) |
Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/destinations/redshift.md
Expand Up @@ -237,7 +237,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.0 | 2024-01-19 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Destinations V2 |
| 2.0.0 | 2024-01-22 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Destinations V2 |
| 0.8.0 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.0 |
| 0.7.15 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Update check method with svv_table_info permission check, fix bug where s3 staging files were not being deleted. |
| 0.7.14 | 2024-01-08 | [\#34014](https://github.com/airbytehq/airbyte/pull/34014) | Update order of options in spec |
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/destinations/snowflake.md
Expand Up @@ -246,8 +246,9 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.22 | 2024-01-12 | [34227](https://github.com/airbytehq/airbyte/pull/34227) | Upgrade CDK to 0.12.0; Cleanup unused dependencies |
| 3.4.21 | 2024-01-10 | [\#34083](https://github.com/airbytehq/airbte/pull/34083) | Emit destination stats as part of the state message |
| 3.4.23 | 2024-01-22 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Internal code changes for Destinations V2 |
| 3.4.22 | 2024-01-12 | [\#34227](https://github.com/airbytehq/airbyte/pull/34227) | Upgrade CDK to 0.12.0; Cleanup unused dependencies |
| 3.4.21 | 2024-01-10 | [\#34083](https://github.com/airbytehq/airbyte/pull/34083) | Emit destination stats as part of the state message |
| 3.4.20 | 2024-01-05 | [\#33948](https://github.com/airbytehq/airbyte/pull/33948) | Skip retrieving initial table state when setup fails |
| 3.4.19 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | Internal code structure changes |
| 3.4.18 | 2024-01-02 | [\#33728](https://github.com/airbytehq/airbyte/pull/33728) | Add option to only type and dedupe at the end of the sync |
Expand Down

0 comments on commit 68ed3cd

Please sign in to comment.