From 744e0d5f1305fe5069ed85a54ee9d1e5e27eab9b Mon Sep 17 00:00:00 2001
From: Christophe Duong
Date: Fri, 11 Mar 2022 15:29:12 +0100
Subject: [PATCH] Refactor Snowflake internal Staging as a base class for other staging classes (#10865)

* Refactor Snowflake internal Staging as a model to share staging abilities in JDBC destinations
---
 .../destination/ExtendedNameTransformer.java | 4 -
 .../NamingConventionTransformer.java | 4 +
 .../destination/StandardNameTransformer.java | 8 +-
 .../destination.py.hbs | 12 +--
 .../destination-python/setup.py | 4 +-
 .../ClickhouseSQLNameTransformer.java | 2 +-
 .../databricks/DatabricksNameTransformer.java | 2 +-
 .../destination/jdbc/WriteConfig.java | 34 +++++++
 .../staging/StagingConsumerFactory.java} | 92 +++++++++----------
 .../staging/StagingOperations.java | 43 +++++++++
 .../MariadbColumnstoreNameTransformer.java | 2 +-
 .../mongodb/MongodbNameTransformer.java | 2 +-
 .../mssql/MSSQLNameTransformer.java | 2 +-
 .../mysql/MySQLNameTransformer.java | 2 +-
 .../oracle/OracleNameTransformer.java | 2 +-
 .../postgres/PostgresSQLNameTransformer.java | 2 +-
 .../rockset/RocksetSQLNameTransformer.java | 2 +-
 .../destination/s3/S3DestinationConfig.java | 30 +++++-
 .../s3/avro/AvroNameTransformer.java | 2 +-
 .../destination.py | 12 +--
 .../setup.py | 4 +-
 .../destination-snowflake/README.md | 2 +-
 .../SnowflakeInternalStagingDestination.java | 28 +++---
 ...nowflakeInternalStagingSqlOperations.java} | 40 +++++++-
 .../SnowflakeSQLNameTransformer.java | 10 +-
 .../snowflake/SnowflakeDestinationTest.java | 12 ++-
 .../source-hubspot/source_hubspot/streams.py | 2 +-
 .../connectors/source-recurly/.python-version | 1 -
 .../integration_tests/configured_catalog.json | 2 +-
 .../source_recurly/schemas/accounts.json | 82 ++++++++---------
 .../source_recurly/schemas/subscriptions.json | 4 +-
 .../source_recurly/schemas/transactions.json | 6 +-
 .../source-recurly/source_recurly/spec.json | 1 -
 .../integration_tests/state.json | 2 +-
 .../source-shopify/source_shopify/source.py | 4 +-
 .../SnowflakeSourceOperations.java | 1 -
 .../workers/DefaultReplicationWorker.java | 5 +-
 37 files changed, 298 insertions(+), 171 deletions(-)
 rename airbyte-integrations/connectors/{destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java => destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java} (71%)
 create mode 100644 airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingOperations.java
 rename airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/{SnowflakeStagingSqlOperations.java => SnowflakeInternalStagingSqlOperations.java} (65%)
 delete mode 100644 airbyte-integrations/connectors/source-recurly/.python-version

diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/ExtendedNameTransformer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/ExtendedNameTransformer.java
index b278af6e5a0eae..6af8c0bb97fae7 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/ExtendedNameTransformer.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/ExtendedNameTransformer.java
@@ -27,10 +27,6 @@ protected String disabled_convertStreamName(final String input) {
 }
 }

- protected String applyDefaultCase(final
String input) { - return input; - } - protected boolean useExtendedIdentifiers(final String input) { boolean result = false; if (input.matches("[^\\p{Alpha}_].*")) { diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/NamingConventionTransformer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/NamingConventionTransformer.java index 636bafe86a5cd5..1069f6f4c2b249 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/NamingConventionTransformer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/NamingConventionTransformer.java @@ -45,4 +45,8 @@ public interface NamingConventionTransformer { @Deprecated String getTmpTableName(String name); + String convertStreamName(final String input); + + String applyDefaultCase(final String input); + } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/StandardNameTransformer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/StandardNameTransformer.java index 41659b5415c6ce..9175ee5e11b2a3 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/StandardNameTransformer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/StandardNameTransformer.java @@ -32,10 +32,16 @@ public String getTmpTableName(final String streamName) { return convertStreamName(Strings.addRandomSuffix("_airbyte_tmp", "_", 3) + "_" + streamName); } - protected String convertStreamName(final String input) { + @Override + public String convertStreamName(final String input) { return Names.toAlphanumericAndUnderscore(input); } + @Override + public String applyDefaultCase(final String input) { + return input; + } + /** * Rebuild a JsonNode adding sanitized property names (a subset of special characters replaced by * underscores) while keeping original property names too. 
This is needed by some destinations as diff --git a/airbyte-integrations/connector-templates/destination-python/destination_{{snakeCase name}}/destination.py.hbs b/airbyte-integrations/connector-templates/destination-python/destination_{{snakeCase name}}/destination.py.hbs index 0a12fa316e9e08..fc7a7f95566d98 100644 --- a/airbyte-integrations/connector-templates/destination-python/destination_{{snakeCase name}}/destination.py.hbs +++ b/airbyte-integrations/connector-templates/destination-python/destination_{{snakeCase name}}/destination.py.hbs @@ -3,19 +3,16 @@ # -from typing import Mapping, Any, Iterable +from typing import Any, Iterable, Mapping from airbyte_cdk import AirbyteLogger from airbyte_cdk.destinations import Destination -from airbyte_cdk.models import AirbyteConnectionStatus, ConfiguredAirbyteCatalog, AirbyteMessage, Status +from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status class Destination{{properCase name}}(Destination): def write( - self, - config: Mapping[str, Any], - configured_catalog: ConfiguredAirbyteCatalog, - input_messages: Iterable[AirbyteMessage] + self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage] ) -> Iterable[AirbyteMessage]: """ @@ -54,6 +51,3 @@ class Destination{{properCase name}}(Destination): return AirbyteConnectionStatus(status=Status.SUCCEEDED) except Exception as e: return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}") - - - diff --git a/airbyte-integrations/connector-templates/destination-python/setup.py b/airbyte-integrations/connector-templates/destination-python/setup.py index 95d9de4b70ba1b..7dbabcc48bcb6a 100644 --- a/airbyte-integrations/connector-templates/destination-python/setup.py +++ b/airbyte-integrations/connector-templates/destination-python/setup.py @@ -9,9 +9,7 @@ "airbyte-cdk", ] -TEST_REQUIREMENTS = [ - "pytest~=6.1" -] +TEST_REQUIREMENTS = ["pytest~=6.1"] setup( name="destination_{{snakeCase name}}", diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSQLNameTransformer.java b/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSQLNameTransformer.java index fca0dc91e4131a..c5e277c5e28e97 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSQLNameTransformer.java @@ -9,7 +9,7 @@ public class ClickhouseSQLNameTransformer extends ExtendedNameTransformer { @Override - protected String applyDefaultCase(final String input) { + public String applyDefaultCase(final String input) { return input.toLowerCase(); } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java index dd901f986916d5..a648c0f77ad96c 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java +++ 
b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java
@@ -29,7 +29,7 @@ public String getRawTableName(final String streamName) {
 }

 @Override
- protected String applyDefaultCase(final String input) {
+ public String applyDefaultCase(final String input) {
 return input.toLowerCase();
 }

diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/WriteConfig.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/WriteConfig.java
index 4bbe64bc51329f..ef0bbfbf37157b 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/WriteConfig.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/WriteConfig.java
@@ -5,6 +5,10 @@
 package io.airbyte.integrations.destination.jdbc;

 import io.airbyte.protocol.models.DestinationSyncMode;
+import java.util.ArrayList;
+import java.util.List;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;

 /**
 * Write configuration POJO for all destinations extending {@link AbstractJdbcDestination}.
@@ -19,6 +23,8 @@ public class WriteConfig {
 private final String tmpTableName;
 private final String outputTableName;
 private final DestinationSyncMode syncMode;
+ private final DateTime writeDatetime;
+ private final List<String> stagedFiles;

 public WriteConfig(final String streamName,
 final String namespace,
@@ -26,12 +32,24 @@ public WriteConfig(final String streamName,
 final String tmpTableName,
 final String outputTableName,
 final DestinationSyncMode syncMode) {
+ this(streamName, namespace, outputSchemaName, tmpTableName, outputTableName, syncMode, DateTime.now(DateTimeZone.UTC));
+ }
+
+ public WriteConfig(final String streamName,
+ final String namespace,
+ final String outputSchemaName,
+ final String tmpTableName,
+ final String outputTableName,
+ final DestinationSyncMode syncMode,
+ final DateTime writeDatetime) {
 this.streamName = streamName;
 this.namespace = namespace;
 this.outputSchemaName = outputSchemaName;
 this.tmpTableName = tmpTableName;
 this.outputTableName = outputTableName;
 this.syncMode = syncMode;
+ this.stagedFiles = new ArrayList<>();
+ this.writeDatetime = writeDatetime;
 }

 public String getStreamName() {
@@ -58,6 +76,22 @@ public DestinationSyncMode getSyncMode() {
 return syncMode;
 }

+ public DateTime getWriteDatetime() {
+ return writeDatetime;
+ }
+
+ public List<String> getStagedFiles() {
+ return stagedFiles;
+ }
+
+ public void addStagedFile(final String file) {
+ stagedFiles.add(file);
+ }
+
+ public void clearStagedFiles() {
+ stagedFiles.clear();
+ }
+
 @Override
 public String toString() {
 return "WriteConfig{" +

diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java
similarity index 71%
rename from airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java
rename to airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java
index 79df935ad7128a..ebdc0c0d23564e 100644
---
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java
@@ -2,7 +2,7 @@
 * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.integrations.destination.snowflake;
+package io.airbyte.integrations.destination.staging;

 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
@@ -16,7 +16,6 @@
 import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction;
 import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction;
 import io.airbyte.integrations.destination.buffered_stream_consumer.RecordWriter;
-import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.WriteConfig;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.AirbyteStream;
@@ -30,38 +29,40 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-/**
- * Snowflake Internal Staging consists of 4 main parts
- *
- * CREATE STAGE @TEMP_STAGE_NAME -- Creates a new named internal stage to use for loading data from
- * files into Snowflake tables and unloading data from tables into files PUT
- * file://local/ @TEMP_STAGE_NAME. --JDBC Driver will upload the files into stage
- * COPY FROM @TEMP_STAGE_NAME -- Loads data from staged files to an existing table.
- * DROP @TEMP_STAGE_NAME -- Drop temporary stage after sync
- */
-public class SnowflakeInternalStagingConsumerFactory {
+public class StagingConsumerFactory {

- private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(StagingConsumerFactory.class);

 private static final long MAX_BATCH_SIZE_BYTES = 128 * 1024 * 1024; // 128mb
- private final String CURRENT_SYNC_PATH = UUID.randomUUID().toString();
+ private final DateTime CURRENT_SYNC_PATH = DateTime.now(DateTimeZone.UTC);
+ // Using a random string as a placeholder for the moment. This avoids mixing data in the staging
+ // area between different syncs, especially when they manipulate streams with similar names.
+ // If we replaced the random id with the actual connection_id, we would gain the opportunity to
+ // reuse data that was uploaded to the stage in a previous attempt but failed to load into the
+ // warehouse (because the sync was interrupted, for example). It would also allow other
+ // programs/scripts to drop files (or reload backups?) into the connection's staging area to be
+ // loaded at the next sync.
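+ // Illustrative example, not behavior added by this patch: combined with the path format in
+ // SnowflakeInternalStagingSqlOperations.getStagingPath() below, a sync of stream "users" in
+ // schema "public" started on 2022-03-11 at 14:00 UTC would stage its files under a prefix like
+ // PUBLIC_USERS/2022/03/11/14/<RANDOM_CONNECTION_ID>/
+ // so two syncs running at once can never pick up each other's staged files in the COPY step.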
+ private final String RANDOM_CONNECTION_ID = UUID.randomUUID().toString();

 public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
 final JdbcDatabase database,
- final SnowflakeStagingSqlOperations sqlOperations,
- final SnowflakeSQLNameTransformer namingResolver,
+ final StagingOperations sqlOperations,
+ final NamingConventionTransformer namingResolver,
 final JsonNode config,
 final ConfiguredAirbyteCatalog catalog) {
 final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog);

 return new BufferedStreamConsumer(
 outputRecordCollector,
- onStartFunction(database, sqlOperations, writeConfigs, namingResolver),
- recordWriterFunction(database, sqlOperations, writeConfigs, catalog, namingResolver),
- onCloseFunction(database, sqlOperations, writeConfigs, namingResolver),
+ onStartFunction(database, sqlOperations, writeConfigs),
+ recordWriterFunction(database, sqlOperations, writeConfigs, catalog),
+ onCloseFunction(database, sqlOperations, writeConfigs),
 catalog,
 sqlOperations::isValidData,
 MAX_BATCH_SIZE_BYTES);
@@ -74,8 +75,7 @@ private static List<WriteConfig> createWriteConfigs(final NamingConventionTransf
 return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(Collectors.toList());
 }

- private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
- final NamingConventionTransformer namingResolver,
+ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(final NamingConventionTransformer namingResolver,
 final JsonNode config) {
 return stream -> {
 Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode");
@@ -104,9 +104,8 @@ private static String getOutputSchema(final AirbyteStream stream,
 }

 private static OnStartFunction onStartFunction(final JdbcDatabase database,
- final SnowflakeStagingSqlOperations snowflakeSqlOperations,
- final List<WriteConfig> writeConfigs,
- final SnowflakeSQLNameTransformer namingResolver) {
+ final StagingOperations stagingOperations,
+ final List<WriteConfig> writeConfigs) {
 return () -> {
 LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size());
@@ -114,16 +113,16 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
 final String schema = writeConfig.getOutputSchemaName();
 final String stream = writeConfig.getStreamName();
 final String tmpTable = writeConfig.getTmpTableName();
- final String stage = namingResolver.getStageName(schema, writeConfig.getOutputTableName());
+ final String stage = stagingOperations.getStageName(schema, writeConfig.getOutputTableName());

 LOGGER.info("Preparing stage in destination started for schema {} stream {}: tmp table: {}, stage: {}",
 schema, stream, tmpTable, stage);

 AirbyteSentry.executeWithTracing("PrepareStreamStage",
 () -> {
- snowflakeSqlOperations.createSchemaIfNotExists(database, schema);
- snowflakeSqlOperations.createTableIfNotExists(database, schema, tmpTable);
- snowflakeSqlOperations.createStageIfNotExists(database, stage);
+ stagingOperations.createSchemaIfNotExists(database, schema);
+ stagingOperations.createTableIfNotExists(database, schema, tmpTable);
+ stagingOperations.createStageIfNotExists(database, stage);
 },
 Map.of("schema", schema, "stream", stream, "tmpTable", tmpTable, "stage", stage));
@@ -139,14 +138,13 @@ private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteCon
 }

 private RecordWriter recordWriterFunction(final JdbcDatabase database,
- final SqlOperations snowflakeSqlOperations,
+ final StagingOperations stagingOperations,
 final List<WriteConfig> writeConfigs,
- final ConfiguredAirbyteCatalog
catalog,
- final SnowflakeSQLNameTransformer namingResolver) {
+ final ConfiguredAirbyteCatalog catalog) {
 final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig = writeConfigs.stream()
 .collect(Collectors.toUnmodifiableMap(
- SnowflakeInternalStagingConsumerFactory::toNameNamespacePair, Function.identity()));
+ StagingConsumerFactory::toNameNamespacePair, Function.identity()));

 return (pair, records) -> {
 if (!pairToWriteConfig.containsKey(pair)) {
@@ -157,16 +155,14 @@ private RecordWriter recordWriterFunction(final JdbcDatabase database,
 final WriteConfig writeConfig = pairToWriteConfig.get(pair);
 final String schemaName = writeConfig.getOutputSchemaName();
 final String tableName = writeConfig.getOutputTableName();
- final String path = namingResolver.getStagingPath(schemaName, tableName, CURRENT_SYNC_PATH);
-
- snowflakeSqlOperations.insertRecords(database, records, schemaName, path);
+ final String path = stagingOperations.getStagingPath(RANDOM_CONNECTION_ID, schemaName, tableName, CURRENT_SYNC_PATH);
+ stagingOperations.insertRecords(database, records, schemaName, path);
 };
 }

 private OnCloseFunction onCloseFunction(final JdbcDatabase database,
- final SnowflakeStagingSqlOperations sqlOperations,
- final List<WriteConfig> writeConfigs,
- final SnowflakeSQLNameTransformer namingResolver) {
+ final StagingOperations stagingOperations,
+ final List<WriteConfig> writeConfigs) {
 return (hasFailed) -> {
 if (!hasFailed) {
 final List<String> queryList = new ArrayList<>();
@@ -177,29 +173,29 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
 final String streamName = writeConfig.getStreamName();
 final String srcTableName = writeConfig.getTmpTableName();
 final String dstTableName = writeConfig.getOutputTableName();
- final String path = namingResolver.getStagingPath(schemaName, dstTableName, CURRENT_SYNC_PATH);
+ final String path = stagingOperations.getStagingPath(RANDOM_CONNECTION_ID, schemaName, dstTableName, CURRENT_SYNC_PATH);
 LOGGER.info("Finalizing stream {}.
schema {}, tmp table {}, final table {}, stage path {}", streamName, schemaName, srcTableName, dstTableName, path); try { - sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName); + stagingOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName); } catch (final Exception e) { - sqlOperations.cleanUpStage(database, path); + stagingOperations.cleanUpStage(database, path); LOGGER.info("Cleaning stage path {}", path); throw new RuntimeException("Failed to upload data from stage " + path, e); } - sqlOperations.createTableIfNotExists(database, schemaName, dstTableName); + stagingOperations.createTableIfNotExists(database, schemaName, dstTableName); switch (writeConfig.getSyncMode()) { - case OVERWRITE -> queryList.add(sqlOperations.truncateTableQuery(database, schemaName, dstTableName)); + case OVERWRITE -> queryList.add(stagingOperations.truncateTableQuery(database, schemaName, dstTableName)); case APPEND, APPEND_DEDUP -> {} default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode()); } - queryList.add(sqlOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName)); + queryList.add(stagingOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName)); } LOGGER.info("Executing finalization of tables."); - sqlOperations.executeTransaction(database, queryList); + stagingOperations.executeTransaction(database, queryList); LOGGER.info("Finalizing tables in destination completed."); } LOGGER.info("Cleaning tmp tables in destination started for {} streams", writeConfigs.size()); @@ -209,12 +205,12 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database, LOGGER.info("Cleaning tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(), schemaName, tmpTableName); - sqlOperations.dropTableIfExists(database, schemaName, tmpTableName); + stagingOperations.dropTableIfExists(database, schemaName, tmpTableName); final String outputTableName = writeConfig.getOutputTableName(); - final String stageName = namingResolver.getStageName(schemaName, outputTableName); + final String stageName = stagingOperations.getStageName(schemaName, outputTableName); LOGGER.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName, stageName); - sqlOperations.dropStageIfExists(database, stageName); + stagingOperations.dropStageIfExists(database, stageName); } LOGGER.info("Cleaning tmp tables and stages in destination completed."); }; diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingOperations.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingOperations.java new file mode 100644 index 00000000000000..f0ee658a7a76e2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingOperations.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.staging; + +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import java.io.File; +import org.joda.time.DateTime; + +public interface StagingOperations extends SqlOperations { + + String getStageName(String schemaName, String tableName); + + String getStagingPath(String connectionId, String schemaName, String tableName, DateTime writeDatetime); + + /** + * Create a staging folder where to upload temporary files before loading into the final destination + */ + void createStageIfNotExists(JdbcDatabase database, String stage) throws Exception; + + /** + * Upload the data file into the stage area.* + */ + void uploadRecordsToStage(JdbcDatabase database, File dataFile, String schemaName, String path) throws Exception; + + /** + * Load the data stored in the stage area into a temporary table in the destination + */ + void copyIntoTmpTableFromStage(JdbcDatabase database, String path, String srcTableName, String schemaName) throws Exception; + + /** + * Remove files that were just staged + */ + void cleanUpStage(JdbcDatabase database, String path) throws Exception; + + /** + * Delete the stage area and all staged files that was in it + */ + void dropStageIfExists(JdbcDatabase database, String stageName) throws Exception; + +} diff --git a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreNameTransformer.java b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreNameTransformer.java index ca75718e9548e7..c6f058e4f9ffa5 100644 --- a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreNameTransformer.java +++ b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreNameTransformer.java @@ -14,7 +14,7 @@ public String getIdentifier(final String name) { } @Override - protected String applyDefaultCase(final String input) { + public String applyDefaultCase(final String input) { return input.toLowerCase(); } diff --git a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbNameTransformer.java b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbNameTransformer.java index ba9e8e8967df79..e2caaa5c6d2c59 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbNameTransformer.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbNameTransformer.java @@ -11,7 +11,7 @@ public class MongodbNameTransformer extends ExtendedNameTransformer { @Override - protected String applyDefaultCase(final String input) { + public String applyDefaultCase(final String input) { return input.toLowerCase(); } diff --git a/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLNameTransformer.java b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLNameTransformer.java index e62facf6d07fd1..2702b7ce057bdb 100644 --- 
a/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLNameTransformer.java @@ -9,7 +9,7 @@ public class MSSQLNameTransformer extends ExtendedNameTransformer { @Override - protected String applyDefaultCase(final String input) { + public String applyDefaultCase(final String input) { return input.toUpperCase(); } diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java index 5ae0f9a9a9d927..9f9edbbf6043f5 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java @@ -57,7 +57,7 @@ static String truncateName(final String name, final int maxLength) { } @Override - protected String applyDefaultCase(final String input) { + public String applyDefaultCase(final String input) { return input.toLowerCase(); } diff --git a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleNameTransformer.java b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleNameTransformer.java index b567548ee5c344..7f43f613c2c464 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleNameTransformer.java +++ b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleNameTransformer.java @@ -12,7 +12,7 @@ public class OracleNameTransformer extends ExtendedNameTransformer { @Override - protected String applyDefaultCase(final String input) { + public String applyDefaultCase(final String input) { return input.toUpperCase(); } diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSQLNameTransformer.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSQLNameTransformer.java index 2dad4cad969255..27d71218f152b5 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSQLNameTransformer.java @@ -9,7 +9,7 @@ public class PostgresSQLNameTransformer extends ExtendedNameTransformer { @Override - protected String applyDefaultCase(final String input) { + public String applyDefaultCase(final String input) { return input.toLowerCase(); } diff --git a/airbyte-integrations/connectors/destination-rockset/src/main/java/io/airbyte/integrations/destination/rockset/RocksetSQLNameTransformer.java b/airbyte-integrations/connectors/destination-rockset/src/main/java/io/airbyte/integrations/destination/rockset/RocksetSQLNameTransformer.java index 206d14549d964b..2edf60143c6aae 100644 --- 
a/airbyte-integrations/connectors/destination-rockset/src/main/java/io/airbyte/integrations/destination/rockset/RocksetSQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-rockset/src/main/java/io/airbyte/integrations/destination/rockset/RocksetSQLNameTransformer.java @@ -14,7 +14,7 @@ public String convertStreamName(String input) { } @Override - protected String applyDefaultCase(String input) { + public String applyDefaultCase(String input) { return input.toLowerCase(); } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index 626aa9eb638694..82d8c9c4a1fa83 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -14,6 +14,8 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.fasterxml.jackson.databind.JsonNode; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An S3 configuration. Typical usage sets at most one of {@code bucketPath} (necessary for more @@ -22,6 +24,8 @@ */ public class S3DestinationConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(S3DestinationConfig.class); + // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit. // WARNING: Too large a part size can cause potential OOM errors. @@ -36,6 +40,9 @@ public class S3DestinationConfig { private final Integer partSize; private final S3FormatConfig formatConfig; + private final Object lock = new Object(); + private AmazonS3 s3Client = null; + /** * The part size should not matter in any use case that depends on this constructor. So the default * 10 MB is used. 
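A hedged usage sketch for the lazily created client added above (it assumes the existing getS3DestinationConfig(JsonNode) factory; the credential-rotation trigger is an assumed scenario, not something this patch wires up):

 final S3DestinationConfig s3Config = S3DestinationConfig.getS3DestinationConfig(config);
 final AmazonS3 client = s3Config.getS3Client(); // created once, under the lock, on first call
 // ... use the client; if credentials are rotated mid-run, recycle it:
 s3Config.resetS3Client(); // shuts down the stale client and builds a fresh one
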
@@ -127,6 +134,27 @@ public S3FormatConfig getFormatConfig() { } public AmazonS3 getS3Client() { + synchronized (lock) { + if (s3Client == null) { + return resetS3Client(); + } + return s3Client; + } + } + + public AmazonS3 resetS3Client() { + synchronized (lock) { + if (s3Client != null) { + s3Client.shutdown(); + } + s3Client = createS3Client(); + return s3Client; + } + } + + protected AmazonS3 createS3Client() { + LOGGER.info("Creating S3 client..."); + final AWSCredentials awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); if (accessKeyId.isEmpty() && !secretAccessKey.isEmpty() @@ -134,7 +162,7 @@ public AmazonS3 getS3Client() { throw new RuntimeException("Either both accessKeyId and secretAccessKey should be provided, or neither"); } - if (accessKeyId.isEmpty() && secretAccessKey.isEmpty()) { + if (accessKeyId.isEmpty()) { return AmazonS3ClientBuilder.standard() .withCredentials(new InstanceProfileCredentialsProvider(false)) .build(); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformer.java index c39152b1a4fb41..d356e177eaa3b5 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformer.java @@ -9,7 +9,7 @@ public class AvroNameTransformer extends ExtendedNameTransformer { @Override - protected String applyDefaultCase(final String input) { + public String applyDefaultCase(final String input) { return super.convertStreamName(input).toLowerCase(); } diff --git a/airbyte-integrations/connectors/destination-scaffold-destination-python/destination_scaffold_destination_python/destination.py b/airbyte-integrations/connectors/destination-scaffold-destination-python/destination_scaffold_destination_python/destination.py index 276f9861bbd385..0705ec76e08162 100644 --- a/airbyte-integrations/connectors/destination-scaffold-destination-python/destination_scaffold_destination_python/destination.py +++ b/airbyte-integrations/connectors/destination-scaffold-destination-python/destination_scaffold_destination_python/destination.py @@ -3,19 +3,16 @@ # -from typing import Mapping, Any, Iterable +from typing import Any, Iterable, Mapping from airbyte_cdk import AirbyteLogger from airbyte_cdk.destinations import Destination -from airbyte_cdk.models import AirbyteConnectionStatus, ConfiguredAirbyteCatalog, AirbyteMessage, Status +from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status class DestinationScaffoldDestinationPython(Destination): def write( - self, - config: Mapping[str, Any], - configured_catalog: ConfiguredAirbyteCatalog, - input_messages: Iterable[AirbyteMessage] + self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage] ) -> Iterable[AirbyteMessage]: """ @@ -54,6 +51,3 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn return AirbyteConnectionStatus(status=Status.SUCCEEDED) except Exception as e: return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}") - - - diff --git a/airbyte-integrations/connectors/destination-scaffold-destination-python/setup.py 
b/airbyte-integrations/connectors/destination-scaffold-destination-python/setup.py index b51d89f7f091ad..4e2dbe001f9547 100644 --- a/airbyte-integrations/connectors/destination-scaffold-destination-python/setup.py +++ b/airbyte-integrations/connectors/destination-scaffold-destination-python/setup.py @@ -9,9 +9,7 @@ "airbyte-cdk", ] -TEST_REQUIREMENTS = [ - "pytest~=6.1" -] +TEST_REQUIREMENTS = ["pytest~=6.1"] setup( name="destination_scaffold_destination_python", diff --git a/airbyte-integrations/connectors/destination-snowflake/README.md b/airbyte-integrations/connectors/destination-snowflake/README.md index e48eaa79fc85ff..b46d0d13ddee9e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/README.md +++ b/airbyte-integrations/connectors/destination-snowflake/README.md @@ -19,7 +19,7 @@ ``` ## For Airbyte employees -Put the contents of the `Snowflake Integration Test Config` secret on Rippling under the `Engineering` folder into `secrets/config.json` to be able to run integration tests locally. +Put the contents of the `Snowflake Integration Test Config` secret on LastPass under the `Engineering` folder into `secrets/config.json` to be able to run integration tests locally. 1. Put the contents of the `destination snowflake - insert test creds` LastPass secret into `secrets/insert_config.json`. 1. Put the contents of the `destination snowflake - insert staging test creds` secret into `internal_staging_config.json`. diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index 33897c854f362a..f0ee2f98978cf6 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -10,7 +10,9 @@ import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.sentry.AirbyteSentry; +import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -26,19 +28,23 @@ public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class); public SnowflakeInternalStagingDestination() { - super("", new SnowflakeSQLNameTransformer(), new SnowflakeStagingSqlOperations()); + this(new SnowflakeSQLNameTransformer()); + } + + public SnowflakeInternalStagingDestination(final NamingConventionTransformer nameTransformer) { + super("", nameTransformer, new SnowflakeInternalStagingSqlOperations(nameTransformer)); } @Override public AirbyteConnectionStatus check(final JsonNode config) { - final SnowflakeSQLNameTransformer nameTransformer = new SnowflakeSQLNameTransformer(); - final SnowflakeStagingSqlOperations snowflakeStagingSqlOperations = new 
SnowflakeStagingSqlOperations();
+ final NamingConventionTransformer nameTransformer = getNamingResolver();
+ final SnowflakeInternalStagingSqlOperations snowflakeInternalStagingSqlOperations = new SnowflakeInternalStagingSqlOperations(nameTransformer);
 try (final JdbcDatabase database = getDatabase(config)) {
- final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
+ final String outputSchema = nameTransformer.getIdentifier(config.get("schema").asText());
 AirbyteSentry.executeWithTracing("CreateAndDropTable",
- () -> attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeStagingSqlOperations));
+ () -> attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations));
 AirbyteSentry.executeWithTracing("CreateAndDropStage",
- () -> attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeStagingSqlOperations));
+ () -> attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations));
 return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
 } catch (final Exception e) {
 LOGGER.error("Exception while checking connection: ", e);
@@ -50,13 +56,13 @@ public AirbyteConnectionStatus check(final JsonNode config) {

 private static void attemptSQLCreateAndDropStages(final String outputSchema,
 final JdbcDatabase database,
- final SnowflakeSQLNameTransformer namingResolver,
- final SnowflakeStagingSqlOperations sqlOperations)
+ final NamingConventionTransformer namingResolver,
+ final SnowflakeInternalStagingSqlOperations sqlOperations)
 throws Exception {

 // verify we have permissions to create/drop stage
 final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
- final String stageName = namingResolver.getStageName(outputSchema, outputTableName);
+ final String stageName = sqlOperations.getStageName(outputSchema, outputTableName);
 sqlOperations.createStageIfNotExists(database, stageName);
 sqlOperations.dropStageIfExists(database, stageName);
 }
@@ -81,8 +87,8 @@ public JsonNode toJdbcConfig(final JsonNode config) {
 public AirbyteMessageConsumer getConsumer(final JsonNode config,
 final ConfiguredAirbyteCatalog catalog,
 final Consumer<AirbyteMessage> outputRecordCollector) {
- return new SnowflakeInternalStagingConsumerFactory().create(outputRecordCollector, getDatabase(config),
- new SnowflakeStagingSqlOperations(), new SnowflakeSQLNameTransformer(), config, catalog);
+ return new StagingConsumerFactory().create(outputRecordCollector, getDatabase(config),
+ new SnowflakeInternalStagingSqlOperations(getNamingResolver()), getNamingResolver(), config, catalog);
 }

}
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java
similarity index 65%
rename from airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java
rename to airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java
index 6ef030b42c5b6f..b9344077ec5dff 100644
---
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java @@ -6,7 +6,8 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.sentry.AirbyteSentry; -import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.NamingConventionTransformer; +import io.airbyte.integrations.destination.staging.StagingOperations; import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.File; import java.nio.file.Files; @@ -14,12 +15,43 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.commons.lang3.NotImplementedException; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SnowflakeStagingSqlOperations extends SnowflakeSqlOperations implements SqlOperations { +public class SnowflakeInternalStagingSqlOperations extends SnowflakeSqlOperations implements StagingOperations { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class); + private final NamingConventionTransformer nameTransformer; + + public SnowflakeInternalStagingSqlOperations(final NamingConventionTransformer nameTransformer) { + this.nameTransformer = nameTransformer; + } + + @Override + public String getStageName(final String namespace, final String streamName) { + return nameTransformer.applyDefaultCase(String.join("_", + nameTransformer.convertStreamName(namespace), + nameTransformer.convertStreamName(streamName))); + } + + @Override + public String getStagingPath(final String connectionId, final String namespace, final String streamName, final DateTime writeDatetime) { + // see https://docs.snowflake.com/en/user-guide/data-load-considerations-stage.html + return nameTransformer.applyDefaultCase(String.format("%s/%s/%02d/%02d/%02d/%s/", + getStageName(namespace, streamName), + writeDatetime.year().get(), + writeDatetime.monthOfYear().get(), + writeDatetime.dayOfMonth().get(), + writeDatetime.hourOfDay().get(), + connectionId)); + } + + @Override + public void uploadRecordsToStage(final JdbcDatabase database, final File dataFile, final String schemaName, final String path) throws Exception { + throw new NotImplementedException("placeholder function is not implemented yet"); + } @Override public void insertRecordsInternal(final JdbcDatabase database, @@ -46,6 +78,7 @@ private void loadDataIntoStage(final JdbcDatabase database, final String stage, Files.delete(tempFile.toPath()); } + @Override public void createStageIfNotExists(final JdbcDatabase database, final String stageName) throws SQLException { final String query = "CREATE STAGE IF NOT EXISTS %s encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');"; AirbyteSentry.executeWithTracing("CreateStageIfNotExists", @@ -53,6 +86,7 @@ public void createStageIfNotExists(final JdbcDatabase database, final String sta Map.of("stage", stageName)); } + @Override public void copyIntoTmpTableFromStage(final JdbcDatabase database, final String stageName, final String dstTableName, final String schemaName) throws SQLException { final String query = "COPY INTO %s.%s FROM @%s file_format = " + @@ -62,12 +96,14 @@ public void copyIntoTmpTableFromStage(final JdbcDatabase database, final String Map.of("schema", schemaName, "stage", 
stageName, "table", dstTableName)); } + @Override public void dropStageIfExists(final JdbcDatabase database, final String stageName) throws SQLException { AirbyteSentry.executeWithTracing("DropStageIfExists", () -> database.execute(String.format("DROP STAGE IF EXISTS %s;", stageName)), Map.of("stage", stageName)); } + @Override public void cleanUpStage(final JdbcDatabase database, final String path) throws SQLException { AirbyteSentry.executeWithTracing("CleanStage", () -> database.execute(String.format("REMOVE @%s;", path)), diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java index 373c3aa0998308..cea4bf0b88ea59 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java @@ -9,16 +9,8 @@ public class SnowflakeSQLNameTransformer extends ExtendedNameTransformer { @Override - protected String applyDefaultCase(final String input) { + public String applyDefaultCase(final String input) { return input.toUpperCase(); } - public String getStageName(String schemaName, String outputTableName) { - return schemaName.concat(outputTableName).replaceAll("-", "_").toUpperCase(); - } - - public String getStagingPath(String schemaName, String tableName, String currentSyncPath) { - return (getStageName(schemaName, tableName) + "/staged/" + currentSyncPath).toUpperCase(); - } - } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java index ecf3b2d688f73f..74497672f5a09a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java @@ -13,6 +13,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -23,6 +24,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.CatalogHelpers; @@ -82,17 +84,19 @@ public void useInsertStrategyTest() { @Test public void testCleanupStageOnFailure() throws Exception { - JdbcDatabase mockDb = mock(JdbcDatabase.class); - SnowflakeStagingSqlOperations sqlOperations = mock(SnowflakeStagingSqlOperations.class); + final JdbcDatabase mockDb = mock(JdbcDatabase.class); + final SnowflakeInternalStagingSqlOperations sqlOperations = mock(SnowflakeInternalStagingSqlOperations.class); + 
when(sqlOperations.getStageName(anyString(), anyString())).thenReturn("stage_name"); + when(sqlOperations.getStagingPath(anyString(), anyString(), anyString(), any())).thenReturn("staging_path"); final var testMessages = generateTestMessages(); final JsonNode config = Jsons.deserialize(MoreResources.readResource("insert_config.json"), JsonNode.class); - AirbyteMessageConsumer airbyteMessageConsumer = new SnowflakeInternalStagingConsumerFactory() + final AirbyteMessageConsumer airbyteMessageConsumer = new StagingConsumerFactory() .create(Destination::defaultOutputRecordCollector, mockDb, sqlOperations, new SnowflakeSQLNameTransformer(), config, getCatalog()); doThrow(SQLException.class).when(sqlOperations).copyIntoTmpTableFromStage(any(), anyString(), anyString(), anyString()); airbyteMessageConsumer.start(); - for (AirbyteMessage m : testMessages) { + for (final AirbyteMessage m : testMessages) { airbyteMessageConsumer.accept(m); } assertThrows(RuntimeException.class, airbyteMessageConsumer::close); diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index b1a5c281163773..4a0e1ebf1f30f5 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -848,7 +848,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, payload["after"] = int(response["paging"]["next"]["after"]) return {"params": params, "payload": payload} - + def stream_slices( self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]: diff --git a/airbyte-integrations/connectors/source-recurly/.python-version b/airbyte-integrations/connectors/source-recurly/.python-version deleted file mode 100644 index b13d4f55c6812d..00000000000000 --- a/airbyte-integrations/connectors/source-recurly/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.9.7/envs/airbyte-recurly diff --git a/airbyte-integrations/connectors/source-recurly/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-recurly/integration_tests/configured_catalog.json index aeedc323d89889..75bcfeaf58adb2 100644 --- a/airbyte-integrations/connectors/source-recurly/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-recurly/integration_tests/configured_catalog.json @@ -84,7 +84,7 @@ "sync_mode": "incremental", "destination_sync_mode": "overwrite" }, - { + { "stream": { "name": "export_dates", "json_schema": {}, diff --git a/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/accounts.json b/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/accounts.json index dec8c3de5f14b2..013509268cf468 100644 --- a/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/accounts.json +++ b/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/accounts.json @@ -62,48 +62,48 @@ "maxLength": 30 }, "address": { - "type": "object", - "properties": { - "phone": { - "type": "string", - "title": "Phone number", - "maxLength": 256 - }, - "street1": { - "type": "string", - "title": "Street 1", - "maxLength": 256 - }, - "street2": { - "type": "string", - "title": "Street 2", - "maxLength": 256 - }, - "city": { - "type": "string", - "title": "City", - "maxLength": 256 - }, - "region": { - "type": "string", - "title": "State/Province", - 
"description": "State or province.", - "maxLength": 256 - }, - "postal_code": { - "type": "string", - "title": "Zip/Postal code", - "description": "Zip or postal code.", - "maxLength": 256 - }, - "country": { - "type": "string", - "title": "Country", - "description": "Country, 2-letter ISO 3166-1 alpha-2 code.", - "maxLength": 2 - } - } + "type": "object", + "properties": { + "phone": { + "type": "string", + "title": "Phone number", + "maxLength": 256 }, + "street1": { + "type": "string", + "title": "Street 1", + "maxLength": 256 + }, + "street2": { + "type": "string", + "title": "Street 2", + "maxLength": 256 + }, + "city": { + "type": "string", + "title": "City", + "maxLength": 256 + }, + "region": { + "type": "string", + "title": "State/Province", + "description": "State or province.", + "maxLength": 256 + }, + "postal_code": { + "type": "string", + "title": "Zip/Postal code", + "description": "Zip or postal code.", + "maxLength": 256 + }, + "country": { + "type": "string", + "title": "Country", + "description": "Country, 2-letter ISO 3166-1 alpha-2 code.", + "maxLength": 2 + } + } + }, "custom_fields": { "type": ["null", "array"] }, diff --git a/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/subscriptions.json b/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/subscriptions.json index 071d937eee1132..eeed47064a7cc4 100644 --- a/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/subscriptions.json +++ b/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/subscriptions.json @@ -213,13 +213,13 @@ "type": "string", "title": "Type", "description": "Provides the tax type as \"vat\" for EU VAT, \"usst\" for U.S. Sales Tax, or the 2 letter country code for country level tax types like Canada, Australia, New Zealand, Israel, and all non-EU European countries.", - "maxLength": 256 + "maxLength": 256 }, "region": { "type": "string", "title": "Region", "description": "Provides the tax region applied on an invoice. For U.S. Sales Tax, this will be the 2 letter state code. For EU VAT this will be the 2 letter country code. 
For all country level tax types, this will display the regional tax, like VAT, GST, or PST.", - "maxLength": 256 + "maxLength": 256 }, "rate": { "type": "number", diff --git a/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/transactions.json b/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/transactions.json index ba53386cc2ba16..660e28842f2f95 100644 --- a/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/transactions.json +++ b/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/transactions.json @@ -190,15 +190,15 @@ }, "ip_address_v4": { "type": ["null", "string"], - "maxLength": 256 + "maxLength": 256 }, "ip_address_country": { "type": ["null", "string"], - "maxLength": 256 + "maxLength": 256 }, "status_code": { "type": ["null", "string"], - "maxLength": 256 + "maxLength": 256 }, "status_message": { "type": ["null", "string"], diff --git a/airbyte-integrations/connectors/source-recurly/source_recurly/spec.json b/airbyte-integrations/connectors/source-recurly/source_recurly/spec.json index 18a30248383eb6..02a427f7378b45 100644 --- a/airbyte-integrations/connectors/source-recurly/source_recurly/spec.json +++ b/airbyte-integrations/connectors/source-recurly/source_recurly/spec.json @@ -31,4 +31,3 @@ } } } - diff --git a/airbyte-integrations/connectors/source-shopify/integration_tests/state.json b/airbyte-integrations/connectors/source-shopify/integration_tests/state.json index 82d854aa6c9d71..263c4268de836c 100644 --- a/airbyte-integrations/connectors/source-shopify/integration_tests/state.json +++ b/airbyte-integrations/connectors/source-shopify/integration_tests/state.json @@ -78,4 +78,4 @@ "updated_at": "2022-03-03T03:47:46-08:00" } } -} \ No newline at end of file +} diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/source.py b/airbyte-integrations/connectors/source-shopify/source_shopify/source.py index 0e691ac67fb6aa..1e804a1dc14d2e 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/source.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/source.py @@ -189,8 +189,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite Reading the parent stream for slices with structure: EXAMPLE: for given nested_record as `id` of Orders, - Outputs: - [ + Outputs: + [ {slice_key: 123}, {slice_key: 456}, {...}, diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java index ad9da09a969eaa..c5be4ffb929e9d 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java @@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.db.jdbc.JdbcSourceOperations; - import java.math.BigDecimal; import java.sql.PreparedStatement; import java.sql.ResultSet; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java index 66aee63c62df35..8f644d8b6a5fca 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java +++ 
b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -297,11 +298,11 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, recordsRead += 1; if (recordsRead % 1000 == 0) { - LOGGER.info("Records read: {}", recordsRead); + LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); } } } - LOGGER.info("Total records read: {}", recordsRead); + LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); try { destination.notifyEndOfStream(); } catch (final Exception e) {