diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json index 3baf3b976f7f4..5fd2be6837584 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json @@ -2,7 +2,7 @@ "destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba", "name": "Snowflake", "dockerRepository": "airbyte/destination-snowflake", - "dockerImageTag": "0.3.16", + "dockerImageTag": "0.3.18", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake", "icon": "snowflake.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 59f2e9abd4be1..bb618fa6433c7 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -151,7 +151,7 @@ - name: Snowflake destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.3.17 + dockerImageTag: 0.3.18 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake icon: snowflake.svg - name: MariaDB ColumnStore diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index f16bb4e1e3945..b3b37017fb992 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -15,5 +15,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.17 +LABEL io.airbyte.version=0.3.18 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index c84c58aed30d1..d3d18f927641f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -20,7 +20,8 @@ public class SnowflakeDestination extends SwitchingDestination getTypeToDestination() { final SnowflakeInsertDestination insertDestination = new SnowflakeInsertDestination(); final SnowflakeCopyS3Destination copyS3Destination = new SnowflakeCopyS3Destination(); final SnowflakeCopyGcsDestination copyGcsDestination = new SnowflakeCopyGcsDestination(); + final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination(); return ImmutableMap.of( DestinationType.INSERT, insertDestination, DestinationType.COPY_S3, copyS3Destination, - DestinationType.COPY_GCS, copyGcsDestination); + DestinationType.COPY_GCS, copyGcsDestination, + DestinationType.INTERNAL_STAGING, internalStagingDestination); } public static void 
main(final String[] args) throws Exception { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java new file mode 100644 index 0000000000000..55dda9ef63d31 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.snowflake; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.destination.NamingConventionTransformer; +import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; +import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction; +import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction; +import io.airbyte.integrations.destination.buffered_stream_consumer.RecordWriter; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.WriteConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Snowflake Internal Staging consists of 4 main parts + * + * CREATE STAGE @TEMP_STAGE_NAME -- Creates a new named internal stage to use for loading data from + * files into Snowflake tables and unloading data from tables into files + * PUT file://local/ @TEMP_STAGE_NAME. --JDBC Driver will upload the files into stage + * COPY FROM @TEMP_STAGE_NAME -- Loads data from staged files to an existing table. 
+ * DROP @TEMP_STAGE_NAME -- Drop temporary stage after sync + */ +public class SnowflakeInternalStagingConsumerFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class); + + private static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mb + + public static AirbyteMessageConsumer create(final Consumer outputRecordCollector, + final JdbcDatabase database, + final SnowflakeStagingSqlOperations sqlOperations, + final SnowflakeSQLNameTransformer namingResolver, + final JsonNode config, + final ConfiguredAirbyteCatalog catalog) { + final List writeConfigs = createWriteConfigs(namingResolver, config, catalog); + + return new BufferedStreamConsumer( + outputRecordCollector, + onStartFunction(database, sqlOperations, writeConfigs, namingResolver), + recordWriterFunction(database, sqlOperations, writeConfigs, catalog, namingResolver), + onCloseFunction(database, sqlOperations, writeConfigs, namingResolver), + catalog, + sqlOperations::isValidData, + MAX_BATCH_SIZE_BYTES); + } + + private static List createWriteConfigs(final NamingConventionTransformer namingResolver, + final JsonNode config, + final ConfiguredAirbyteCatalog catalog) { + + return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(Collectors.toList()); + } + + private static Function toWriteConfig( + final NamingConventionTransformer namingResolver, + final JsonNode config) { + return stream -> { + Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode"); + final AirbyteStream abStream = stream.getStream(); + + final String outputSchema = getOutputSchema(abStream, namingResolver.getIdentifier(config.get("schema").asText())); + + final String streamName = abStream.getName(); + final String tableName = namingResolver.getRawTableName(streamName); + final String tmpTableName = namingResolver.getTmpTableName(streamName); + final DestinationSyncMode syncMode = stream.getDestinationSyncMode(); + + final WriteConfig writeConfig = new WriteConfig(streamName, abStream.getNamespace(), outputSchema, tmpTableName, tableName, syncMode); + LOGGER.info("Write config: {}", writeConfig); + + return writeConfig; + }; + } + + private static String getOutputSchema(final AirbyteStream stream, final String defaultDestSchema) { + final String sourceSchema = stream.getNamespace(); + if (sourceSchema != null) { + return sourceSchema; + } + return defaultDestSchema; + } + + private static OnStartFunction onStartFunction(final JdbcDatabase database, + final SnowflakeStagingSqlOperations snowflakeSqlOperations, + final List writeConfigs, + final SnowflakeSQLNameTransformer namingResolver) { + return () -> { + LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size()); + for (final WriteConfig writeConfig : writeConfigs) { + final String schemaName = writeConfig.getOutputSchemaName(); + final String tmpTableName = writeConfig.getTmpTableName(); + LOGGER.info("Preparing tmp table in destination started for stream {}. schema: {}, tmp table name: {}", writeConfig.getStreamName(), + schemaName, tmpTableName); + final String outputTableName = writeConfig.getOutputTableName(); + final String stageName = namingResolver.getStageName(schemaName, outputTableName); + LOGGER.info("Preparing stage in destination started for stream {}. 
schema: {}, stage: {}", writeConfig.getStreamName(), + schemaName, stageName); + + snowflakeSqlOperations.createSchemaIfNotExists(database, schemaName); + snowflakeSqlOperations.createTableIfNotExists(database, schemaName, tmpTableName); + snowflakeSqlOperations.createStageIfNotExists(database, stageName); + LOGGER.info("Preparing stages in destination completed " + stageName); + + } + LOGGER.info("Preparing tables in destination completed."); + }; + } + + private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) { + return new AirbyteStreamNameNamespacePair(config.getStreamName(), config.getNamespace()); + } + + private static RecordWriter recordWriterFunction(final JdbcDatabase database, + final SqlOperations snowflakeSqlOperations, + final List writeConfigs, + final ConfiguredAirbyteCatalog catalog, + final SnowflakeSQLNameTransformer namingResolver) { + final Map pairToWriteConfig = + writeConfigs.stream() + .collect(Collectors.toUnmodifiableMap( + SnowflakeInternalStagingConsumerFactory::toNameNamespacePair, Function.identity())); + + return (pair, records) -> { + if (!pairToWriteConfig.containsKey(pair)) { + throw new IllegalArgumentException( + String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s", Jsons.serialize(catalog))); + } + + final WriteConfig writeConfig = pairToWriteConfig.get(pair); + final String schemaName = writeConfig.getOutputSchemaName(); + final String tableName = writeConfig.getOutputTableName(); + final String stageName = namingResolver.getStageName(schemaName, tableName); + + snowflakeSqlOperations.insertRecords(database, records, schemaName, stageName); + }; + } + + private static OnCloseFunction onCloseFunction(final JdbcDatabase database, + final SnowflakeStagingSqlOperations sqlOperations, + final List writeConfigs, + final SnowflakeSQLNameTransformer namingResolver) { + return (hasFailed) -> { + if (!hasFailed) { + final List queryList = new ArrayList<>(); + LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size()); + for (final WriteConfig writeConfig : writeConfigs) { + final String schemaName = writeConfig.getOutputSchemaName(); + final String srcTableName = writeConfig.getTmpTableName(); + final String dstTableName = writeConfig.getOutputTableName(); + LOGGER.info("Finalizing stream {}. schema {}, tmp table {}, final table {}", writeConfig.getStreamName(), schemaName, srcTableName, + dstTableName); + + final String stageName = namingResolver.getStageName(schemaName, dstTableName); + sqlOperations.copyIntoTmpTableFromStage(database, stageName, srcTableName, schemaName); + LOGGER.info("Uploading data from stage: stream {}. 
schema {}, tmp table {}, stage {}", writeConfig.getStreamName(), schemaName, + srcTableName, + stageName); + sqlOperations.createTableIfNotExists(database, schemaName, dstTableName); + switch (writeConfig.getSyncMode()) { + case OVERWRITE -> queryList.add(sqlOperations.truncateTableQuery(database, schemaName, dstTableName)); + case APPEND, APPEND_DEDUP -> {} + default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode()); + } + queryList.add(sqlOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName)); + } + + LOGGER.info("Executing finalization of tables."); + sqlOperations.executeTransaction(database, queryList); + LOGGER.info("Finalizing tables in destination completed."); + } + LOGGER.info("Cleaning tmp tables in destination started for {} streams", writeConfigs.size()); + for (final WriteConfig writeConfig : writeConfigs) { + final String schemaName = writeConfig.getOutputSchemaName(); + final String tmpTableName = writeConfig.getTmpTableName(); + LOGGER.info("Cleaning tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(), schemaName, + tmpTableName); + + sqlOperations.dropTableIfExists(database, schemaName, tmpTableName); + final String outputTableName = writeConfig.getOutputTableName(); + final String stageName = namingResolver.getStageName(schemaName, outputTableName); + LOGGER.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName, + stageName); + sqlOperations.dropStageIfExists(database, stageName); + } + LOGGER.info("Cleaning tmp tables and stages in destination completed."); + }; + } + +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java new file mode 100644 index 0000000000000..bf32c15e8c2f5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.snowflake; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; + +public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination { + + public SnowflakeInternalStagingDestination() { + super("", new SnowflakeSQLNameTransformer(), new SnowflakeStagingSqlOperations()); + } + + @Override + protected JdbcDatabase getDatabase(final JsonNode config) { + return SnowflakeDatabase.getDatabase(config); + } + + // this is a no op since we override getDatabase. 
+ @Override + public JsonNode toJdbcConfig(final JsonNode config) { + return Jsons.emptyObject(); + } + + @Override + public AirbyteMessageConsumer getConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { + return SnowflakeInternalStagingConsumerFactory.create(outputRecordCollector, getDatabase(config), + new SnowflakeStagingSqlOperations(), new SnowflakeSQLNameTransformer(), config, catalog); + } + +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java index f2fefcd7c1294..420d03e709412 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java @@ -13,4 +13,8 @@ protected String applyDefaultCase(final String input) { return input.toUpperCase(); } + public String getStageName(String schemaName, String outputTableName) { + return schemaName.concat(outputTableName).replaceAll("-", "_").toUpperCase(); + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java new file mode 100644 index 0000000000000..50ca8c57d8e8f --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.snowflake; + +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.io.File; +import java.nio.file.Files; +import java.sql.SQLException; +import java.util.List; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnowflakeStagingSqlOperations extends JdbcSqlOperations implements SqlOperations { + + private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class); + + @Override + protected void insertRecordsInternal(JdbcDatabase database, List records, String schemaName, String stage) throws Exception { + LOGGER.info("actual size of batch for staging: {}", records.size()); + + if (records.isEmpty()) { + return; + } + try { + loadDataIntoStage(database, stage, records); + } catch (Exception e) { + LOGGER.error("Failed to upload records into stage {}", stage, e); + throw new RuntimeException(e); + } + } + + private void loadDataIntoStage(JdbcDatabase database, String stage, List partition) throws Exception { + final File tempFile = Files.createTempFile(UUID.randomUUID().toString(), ".csv").toFile(); + writeBatchToFile(tempFile, partition); + database.execute(String.format("PUT file://%s @%s PARALLEL = %d", tempFile.getAbsolutePath(), stage, Runtime.getRuntime().availableProcessors())); + Files.delete(tempFile.toPath()); + } + + public void createStageIfNotExists(final JdbcDatabase database, final String stageName) throws SQLException { + database.execute(String.format("CREATE STAGE IF NOT EXISTS %s encryption = (type = 'SNOWFLAKE_SSE')" + + " copy_options = (on_error='skip_file');", stageName)); + } + + public void copyIntoTmpTableFromStage(JdbcDatabase database, String stageName, String dstTableName, String schemaName) throws SQLException { + database.execute(String.format("COPY INTO %s.%s FROM @%s file_format = " + + "(type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')", + schemaName, + dstTableName, + stageName)); + + } + + public void dropStageIfExists(final JdbcDatabase database, final String stageName) throws SQLException { + database.execute(String.format("DROP STAGE IF EXISTS %s;", stageName)); + } + + @Override + public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException { + database.execute(createTableQuery(database, schemaName, tableName)); + } + + @Override + public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) { + return String.format( + "CREATE TABLE IF NOT EXISTS %s.%s ( \n" + + "%s VARCHAR PRIMARY KEY,\n" + + "%s VARIANT,\n" + + "%s TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp()\n" + + ") data_retention_time_in_days = 0;", + schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + } + +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json index 77ffaf50f1b52..5369c198774a0 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json
@@ -75,15 +75,15 @@
         "order": 7,
         "oneOf": [
           {
-            "title": "Standard Inserts",
+            "title": "[Recommended] Internal Staging",
             "additionalProperties": false,
-            "description": "Uses INSERT statements to send batches of records to Snowflake. Easiest (no setup) but not recommended for large production workloads due to slow speed.",
+            "description": "Writes large batches of records to a file, uploads the file to a Snowflake internal stage, then uses COPY INTO table to load the staged data. Recommended for large production workloads for better speed and scalability.",
             "required": ["method"],
             "properties": {
               "method": {
                 "type": "string",
-                "enum": ["Standard"],
-                "default": "Standard"
+                "enum": ["Internal Staging"],
+                "default": "Internal Staging"
               }
             }
           },
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinatiomAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinatiomAcceptanceTest.java
new file mode 100644
index 0000000000000..ec01f9ada001f
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinatiomAcceptanceTest.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.snowflake;
+
+import static io.airbyte.integrations.destination.snowflake.SnowflakeDestination.isInternalStaging;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import io.airbyte.commons.io.IOs;
+import io.airbyte.commons.json.Jsons;
+import java.nio.file.Path;
+
+public class SnowflakeInternalStagingDestinatiomAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
+
+  public JsonNode getStaticConfig() {
+    final JsonNode internalStagingConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/internal_staging_config.json")));
+    Preconditions.checkArgument(!SnowflakeDestination.isS3Copy(internalStagingConfig));
+    Preconditions.checkArgument(!SnowflakeDestination.isGcsCopy(internalStagingConfig));
+    Preconditions.checkArgument(isInternalStaging(internalStagingConfig));
+    return internalStagingConfig;
+  }
+
+}
diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md
index 41a633cc2763a..c1134e421811a 100644
--- a/docs/integrations/destinations/snowflake.md
+++ b/docs/integrations/destinations/snowflake.md
@@ -148,6 +148,10 @@ Therefore, Airbyte Snowflake destination will create tables and schemas using th
 
 By default, Airbyte uses batches of `INSERT` commands to add data to a temporary table before copying it over to the final table in Snowflake. This is too slow for larger/multi-GB replications. For those larger replications we recommend configuring using cloud storage to allow batch writes and loading.
 
+### Internal Staging
+
+Internal named stages are storage location objects within a Snowflake database/schema. Because they are database objects, the same security permissions apply as with any other database objects. No additional properties need to be provided for internal staging.
+
 ### AWS S3
 
 For AWS S3, you will need to create a bucket and provide credentials to access the bucket. We recommend creating a bucket that is only used for Airbyte to stage data to Snowflake. Airbyte needs read/write access to interact with this bucket.
@@ -190,6 +194,7 @@ Finally, you need to add read/write permissions to your bucket with that email.
 | Version | Date | Pull Request | Subject |
 | :------ | :-------- | :----- | :------ |
+| 0.3.18 | 2021-11-26 | [#8253](https://github.com/airbytehq/airbyte/pull/8253) | Snowflake Internal Staging Support |
 | 0.3.17 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
 | 0.3.15 | 2021-10-11 | [#6949](https://github.com/airbytehq/airbyte/pull/6949) | Each stream was split into files of 10,000 records each for copying using S3 or GCS |
 | 0.3.14 | 2021-09-08 | [#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 staging COPY writing records from different tables into the same raw table |
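
For reference, the per-stream SQL sequence behind the new loading method looks roughly like the sketch below, assembled from the statements issued by `SnowflakeStagingSqlOperations` and orchestrated by `SnowflakeInternalStagingConsumerFactory`. The stage, schema, and table names and the `PARALLEL` value are illustrative placeholders, not values the connector is guaranteed to generate.

```sql
-- 1. Create a named internal stage for the stream (server-side encryption, skip bad files on load).
CREATE STAGE IF NOT EXISTS PUBLIC_AIRBYTE_RAW_USERS
  encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error = 'skip_file');

-- 2. Upload the locally buffered CSV batch; the Snowflake JDBC driver performs the file transfer.
PUT file:///tmp/airbyte_batch_0001.csv @PUBLIC_AIRBYTE_RAW_USERS PARALLEL = 4;

-- 3. Load the staged files into the tmp table that is later copied into the final raw table.
COPY INTO PUBLIC._AIRBYTE_TMP_USERS FROM @PUBLIC_AIRBYTE_RAW_USERS
  file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '"');

-- 4. Drop the stage once the sync has finished.
DROP STAGE IF EXISTS PUBLIC_AIRBYTE_RAW_USERS;
```

Because the stage and the tmp/raw tables are ordinary database objects in the configured schema, the existing Snowflake credentials are sufficient and no extra loading properties are required.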