Skip to content

Commit

Permalink
🚨 Destination Snowflake: Remove GCS/S3 Staging. (#29236)
Browse files Browse the repository at this point in the history
As title, remove the GCS/S3 staging methods.

There isn't much usage so we can remove this. Internal Staging is also recommended by Snowflake, so using that is both cheaper and faster.

Co-authored-by: davinchia <davinchia@users.noreply.github.com>
Co-authored-by: Evan Tahler <evan@airbyte.io>
Co-authored-by: Pedro S. Lopez <pedroslopez@me.com>
  • Loading branch information
4 people committed Aug 18, 2023
1 parent 6ba7c03 commit ec0f83b
Show file tree
Hide file tree
Showing 23 changed files with 21 additions and 1,359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ AirbyteMessageConsumer getConsumer(JsonNode config,

/**
* Default implementation allows us to not have to touch existing destinations while avoiding a lot
* of conditional statements in {@link IntegrationRunner}.
* of conditional statements in {@link IntegrationRunner}. This is preferred over #getConsumer and
* is the default Async Framework method.
*
* @param config config
* @param catalog catalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
ENV ENABLE_SENTRY true


LABEL io.airbyte.version=1.3.3
LABEL io.airbyte.version=2.0.0
LABEL io.airbyte.name=airbyte/destination-snowflake

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 1.3.3
dockerImageTag: 2.0.0
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
Expand All @@ -28,6 +28,11 @@ data:
supportsDbt: true
tags:
- language:java
releases:
breakingChanges:
2.0.0:
message: "Remove GCS/S3 loading method support."
upgradeDeadline: "2023-08-31"
ab_internal:
sl: 200
ql: 400
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,19 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

// TODO: Remove the Switching Destination from this class as part of code cleanup.
@Slf4j
public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestination.DestinationType> {

public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
private final String airbyteEnvironment;

enum DestinationType {
COPY_S3,
COPY_GCS,
INTERNAL_STAGING
}

Expand All @@ -40,29 +35,8 @@ public SnowflakeDestination(final String airbyteEnvironment) {
@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
log.info("destination class: {}", getClass());
final var useAsyncSnowflake = useAsyncSnowflake(config);
log.info("using async snowflake: {}", useAsyncSnowflake);
if (useAsyncSnowflake) {
return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector);
} else {
return new ShimToSerializedAirbyteMessageConsumer(getConsumer(config, catalog, outputRecordCollector));
}

}

public static boolean useAsyncSnowflake(final JsonNode config) {
final Set<String> stagingLoadingMethods = Set.of("internal staging", "internal-staging", "internal_staging");

return Optional.of(config)
.map(node -> node.get("loading_method"))
.map(node -> node.get("method"))
.map(JsonNode::asText)
.map(String::toLowerCase)
.map(loadingMethod -> stagingLoadingMethods.contains(loadingMethod))
.orElse(false);
final Consumer<AirbyteMessage> outputRecordCollector) {
return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,13 @@
public class SnowflakeDestinationResolver {

public static DestinationType getTypeFromConfig(final JsonNode config) {
if (isS3Copy(config)) {
return DestinationType.COPY_S3;
} else if (isGcsCopy(config)) {
return DestinationType.COPY_GCS;
} else {
return DestinationType.INTERNAL_STAGING;
}
}

public static boolean isS3Copy(final JsonNode config) {
return config.has("loading_method") && config.get("loading_method").isObject() && config.get("loading_method").has("s3_bucket_name");
}

public static boolean isGcsCopy(final JsonNode config) {
return config.has("loading_method") && config.get("loading_method").isObject() && config.get("loading_method").has("project_id");
}

public static int getNumberOfFileBuffers(final JsonNode config) {
int numOfFileBuffers = FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER;
if (config.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) {
numOfFileBuffers = Math.min(config.get(FileBuffer.FILE_BUFFER_COUNT_KEY).asInt(), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
}
// Only allows for values 10 <= numOfFileBuffers <= 50
return Math.max(numOfFileBuffers, FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
}

public static Map<DestinationType, Destination> getTypeToDestination(
final String airbyteEnvironment) {
final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination(airbyteEnvironment);
final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination(airbyteEnvironment);
public static Map<DestinationType, Destination> getTypeToDestination(final String airbyteEnvironment) {
final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination(airbyteEnvironment);

return ImmutableMap.of(
DestinationType.COPY_S3, s3StagingDestination,
DestinationType.COPY_GCS, gcsStagingDestination,
DestinationType.INTERNAL_STAGING, internalStagingDestination);
return ImmutableMap.of(DestinationType.INTERNAL_STAGING, internalStagingDestination);
}

}

This file was deleted.

0 comments on commit ec0f83b

Please sign in to comment.