From b45014da6b315b87bc151b8bee633816820b2904 Mon Sep 17 00:00:00 2001 From: Alexander Tsukanov Date: Fri, 10 Jun 2022 14:47:03 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Destination=20Redshift:=20Add=20?= =?UTF-8?q?"Loading=20Method"=20option=20to=20Redshift=20Destination=20spe?= =?UTF-8?q?c=20and=20UI=20(#13415)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * airbyte-12709 Add "Loading Method" option to Redshift Destination spec. * airbyte-12709: Fixed unit tests. * airbyte-12709: Updated README.md. * airbyte-12709: change the number of PR in redshift.md. * airbyte-12709: Added backward compatibility for old json schema. * airbyte-12709: Fix PR comments. * airbyte-12709: Removed throwing an exception. Fixed PR comments. * airbyte-12709: Bump the airbyte version. * auto-bump connector version Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 205 ++++++++++-------- .../destination-redshift/Dockerfile | 2 +- .../connectors/destination-redshift/README.md | 4 + .../redshift/RedshiftDestination.java | 49 ++--- .../RedshiftStagingS3Destination.java | 9 +- .../RedshiftDestinationConstants.java | 12 + .../redshift/validator/RedshiftUtil.java | 30 +++ .../src/main/resources/spec.json | 166 ++++++++------ ...dshiftInsertDestinationAcceptanceTest.java | 15 +- ...tagingInsertDestinationAcceptanceTest.java | 2 +- ...iftStagingS3DestinationAcceptanceTest.java | 5 +- .../redshift/RedshiftDestinationTest.java | 46 +++- docs/integrations/destinations/redshift.md | 51 ++--- 14 files changed, 353 insertions(+), 245 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/constants/RedshiftDestinationConstants.java create mode 100644 airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index ca4326fe335233..f266a149952f12 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -225,7 +225,7 @@ - name: Redshift destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc dockerRepository: airbyte/destination-redshift - dockerImageTag: 0.3.37 + dockerImageTag: 0.3.39 documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift icon: redshift.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 1c91c36da49f37..f572379bc13035 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3678,7 +3678,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-redshift:0.3.37" +- dockerImage: "airbyte/destination-redshift:0.3.39" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift" connectionSpecification: @@ -3730,94 +3730,121 @@ - "public" default: "public" title: "Default Schema" - s3_bucket_name: - title: "S3 Bucket Name (Optional)" - type: "string" - description: "The name of the staging S3 bucket to use if utilising a COPY\ - \ 
strategy. COPY is recommended for production workloads for better speed\ - \ and scalability. See AWS docs for more details." - examples: - - "airbyte.staging" - s3_bucket_path: - title: "S3 Bucket Path (Optional)" - type: "string" - description: "The directory under the S3 bucket where data will be written.\ - \ If not provided, then defaults to the root directory. See path's name recommendations for more details." - examples: - - "data_sync/test" - s3_bucket_region: - title: "S3 Bucket Region (Optional)" - type: "string" - default: "" - description: "The region of the S3 staging bucket to use if utilising a\ - \ COPY strategy. See AWS docs for details." - enum: - - "" - - "us-east-1" - - "us-east-2" - - "us-west-1" - - "us-west-2" - - "af-south-1" - - "ap-east-1" - - "ap-south-1" - - "ap-northeast-1" - - "ap-northeast-2" - - "ap-northeast-3" - - "ap-southeast-1" - - "ap-southeast-2" - - "ca-central-1" - - "cn-north-1" - - "cn-northwest-1" - - "eu-central-1" - - "eu-north-1" - - "eu-south-1" - - "eu-west-1" - - "eu-west-2" - - "eu-west-3" - - "sa-east-1" - - "me-south-1" - access_key_id: - type: "string" - description: "This ID grants access to the above S3 staging bucket. Airbyte\ - \ requires Read and Write permissions to the given bucket. See AWS docs on how to generate an access key ID and secret access key." - title: "S3 Key Id (Optional)" - airbyte_secret: true - secret_access_key: - type: "string" - description: "The corresponding secret to the above access key id. See AWS docs on how to generate an access key ID and secret access key." - title: "S3 Access Key (Optional)" - airbyte_secret: true - part_size: - type: "integer" - minimum: 10 - maximum: 100 - examples: - - "10" - description: "Increase this if syncing tables larger than 100GB. Only relevant\ - \ for COPY. Files are streamed to S3 in parts. This determines the size\ - \ of each part, in MBs. As S3 has a limit of 10,000 parts per file, part\ - \ size affects the table size. This is 10MB by default, resulting in a\ - \ default limit of 100GB tables. Note: a larger part size will result\ - \ in larger memory requirements. A rule of thumb is to multiply the part\ - \ size by 10 to get the memory requirement. Modify this with care. See\ - \ docs for details." - title: "Stream Part Size (Optional)" - purge_staging_data: - title: "Purge Staging Files and Tables (Optional)" - type: "boolean" - description: "Whether to delete the staging files from S3 after completing\ - \ the sync. See docs for details." - default: true + uploading_method: + title: "Uploading Method" + type: "object" + description: "The method how the data will be uploaded to the database." + oneOf: + - title: "Standard" + additionalProperties: false + required: + - "method" + properties: + method: + type: "string" + const: "Standard" + - title: "S3 Staging" + additionalProperties: false + required: + - "method" + - "s3_bucket_name" + - "s3_bucket_region" + - "access_key_id" + - "secret_access_key" + properties: + method: + type: "string" + const: "S3 Staging" + s3_bucket_name: + title: "S3 Bucket Name" + type: "string" + description: "The name of the staging S3 bucket to use if utilising\ + \ a COPY strategy. COPY is recommended for production workloads\ + \ for better speed and scalability. See AWS docs for more details." + examples: + - "airbyte.staging" + s3_bucket_path: + title: "S3 Bucket Path (Optional)" + type: "string" + description: "The directory under the S3 bucket where data will be\ + \ written. 
If not provided, then defaults to the root directory.\ + \ See path's name recommendations for more details." + examples: + - "data_sync/test" + s3_bucket_region: + title: "S3 Bucket Region" + type: "string" + default: "" + description: "The region of the S3 staging bucket to use if utilising\ + \ a COPY strategy. See AWS docs for details." + enum: + - "" + - "us-east-1" + - "us-east-2" + - "us-west-1" + - "us-west-2" + - "af-south-1" + - "ap-east-1" + - "ap-south-1" + - "ap-northeast-1" + - "ap-northeast-2" + - "ap-northeast-3" + - "ap-southeast-1" + - "ap-southeast-2" + - "ca-central-1" + - "cn-north-1" + - "cn-northwest-1" + - "eu-central-1" + - "eu-north-1" + - "eu-south-1" + - "eu-west-1" + - "eu-west-2" + - "eu-west-3" + - "sa-east-1" + - "me-south-1" + access_key_id: + type: "string" + description: "This ID grants access to the above S3 staging bucket.\ + \ Airbyte requires Read and Write permissions to the given bucket.\ + \ See AWS docs on how to generate an access key ID and secret access\ + \ key." + title: "S3 Key Id" + airbyte_secret: true + secret_access_key: + type: "string" + description: "The corresponding secret to the above access key id.\ + \ See AWS docs on how to generate an access key ID and secret access\ + \ key." + title: "S3 Access Key" + airbyte_secret: true + part_size: + type: "integer" + minimum: 10 + maximum: 100 + examples: + - "10" + description: "Increase this if syncing tables larger than 100GB. Only\ + \ relevant for COPY. Files are streamed to S3 in parts. This determines\ + \ the size of each part, in MBs. As S3 has a limit of 10,000 parts\ + \ per file, part size affects the table size. This is 10MB by default,\ + \ resulting in a default limit of 100GB tables. Note: a larger part\ + \ size will result in larger memory requirements. A rule of thumb\ + \ is to multiply the part size by 10 to get the memory requirement.\ + \ Modify this with care. See docs for details." + title: "Stream Part Size (Optional)" + purge_staging_data: + title: "Purge Staging Files and Tables (Optional)" + type: "boolean" + description: "Whether to delete the staging files from S3 after completing\ + \ the sync. See docs for details." + default: true supportsIncremental: true supportsNormalization: true supportsDBT: true diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index a6bf71cbe1ebd5..3e1528f9eaff1a 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.37 +LABEL io.airbyte.version=0.3.39 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/README.md b/airbyte-integrations/connectors/destination-redshift/README.md index a24d4a65117efa..0c7c6b73cc4717 100644 --- a/airbyte-integrations/connectors/destination-redshift/README.md +++ b/airbyte-integrations/connectors/destination-redshift/README.md @@ -17,4 +17,8 @@ redshift.connString= redshift.user= redshift.pass= ``` +## Actual secrets +The actual secrets for integration tests could be found in Google Secrets Manager. It could be found by next labels: +- SECRET_DESTINATION-REDSHIFT__CREDS - used for Standard tests. (__config.json__) +- SECRET_DESTINATION-REDSHIFT_STAGING__CREDS - used for S3 Staging tests. 
(__config_staging.json__)
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java
index d13742cb05319e..c52884b61c94ee 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java
@@ -4,6 +4,9 @@
 
 package io.airbyte.integrations.destination.redshift;
 
+import static io.airbyte.integrations.destination.redshift.validator.RedshiftUtil.findS3Options;
+import static io.airbyte.integrations.destination.redshift.validator.RedshiftUtil.anyOfS3FieldsAreNullOrEmpty;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.integrations.base.Destination;
 import io.airbyte.integrations.base.IntegrationRunner;
@@ -13,17 +16,20 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * The Redshift Destination offers two replication strategies. The first inserts via a typical SQL
- * Insert statement. Although less efficient, this requires less user set up. See
- * {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to
- * an S3 bucket, and Cop-ing the date into Redshift. This is more efficient, and recommended for
- * production workloads, but does require users to set up an S3 bucket and pass in additional
- * credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspect the
- * given arguments to determine which strategy to use.
+ * The Redshift Destination offers two replication strategies. The first inserts via a typical SQL Insert statement. Although less efficient, this requires less user setup. See {@link
+ * RedshiftInsertDestination} for more detail. The second inserts via streaming the data to an S3 bucket and COPYing the data into Redshift. This is more efficient and recommended for production
+ * workloads, but does require users to set up an S3 bucket and pass in additional credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspects the given arguments to
+ * determine which strategy to use.
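+ * <p>
+ * For example, a config containing {@code "uploading_method": {"method": "S3 Staging", ...}} resolves to the S3 COPY strategy, while a config with no
+ * {@code uploading_method} object is treated as a legacy flat config whose S3 fields, if any, are read from the root node.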
*/ public class RedshiftDestination extends SwitchingDestination { private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestination.class); + private static final String METHOD = "method"; + + private static final Map destinationMap = Map.of( + DestinationType.STANDARD, new RedshiftInsertDestination(), + DestinationType.COPY_S3, new RedshiftStagingS3Destination() + ); enum DestinationType { STANDARD, @@ -31,36 +37,22 @@ enum DestinationType { } public RedshiftDestination() { - super(DestinationType.class, RedshiftDestination::getTypeFromConfig, getTypeToDestination()); + super(DestinationType.class, RedshiftDestination::getTypeFromConfig, destinationMap); } - public static DestinationType getTypeFromConfig(final JsonNode config) { + private static DestinationType getTypeFromConfig(final JsonNode config) { return determineUploadMode(config); } - public static Map getTypeToDestination() { - return Map.of( - DestinationType.STANDARD, new RedshiftInsertDestination(), - DestinationType.COPY_S3, new RedshiftStagingS3Destination()); - } - public static DestinationType determineUploadMode(final JsonNode config) { - final var bucketNode = config.get("s3_bucket_name"); - final var regionNode = config.get("s3_bucket_region"); - final var accessKeyIdNode = config.get("access_key_id"); - final var secretAccessKeyNode = config.get("secret_access_key"); - if (isNullOrEmpty(bucketNode) && isNullOrEmpty(regionNode) && isNullOrEmpty(accessKeyIdNode) - && isNullOrEmpty(secretAccessKeyNode)) { + final JsonNode jsonNode = findS3Options(config); + + if (anyOfS3FieldsAreNullOrEmpty(jsonNode)) { LOGGER.warn("The \"standard\" upload mode is not performant, and is not recommended for production. " + "Please use the Amazon S3 upload mode if you are syncing a large amount of data."); return DestinationType.STANDARD; } - - if (isNullOrEmpty(bucketNode) && isNullOrEmpty(regionNode) && isNullOrEmpty(accessKeyIdNode) - && isNullOrEmpty(secretAccessKeyNode)) { - throw new RuntimeException("Error: Partially missing S3 Configuration."); - } return DestinationType.COPY_S3; } @@ -70,9 +62,4 @@ public static void main(final String[] args) throws Exception { new IntegrationRunner(destination).run(args); LOGGER.info("completed destination: {}", RedshiftDestination.class); } - - private static boolean isNullOrEmpty(JsonNode jsonNode) { - return jsonNode == null || jsonNode.asText().equals(""); - } - } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index 63a30f615a399a..91609c5019dc88 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -9,6 +9,7 @@ import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS; import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.USERNAME; import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcConfig; +import static io.airbyte.integrations.destination.redshift.validator.RedshiftUtil.findS3Options; import static 
io.airbyte.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig; import com.fasterxml.jackson.databind.JsonNode; @@ -48,7 +49,7 @@ public RedshiftStagingS3Destination() { @Override public AirbyteConnectionStatus check(final JsonNode config) { - final S3DestinationConfig s3Config = getS3DestinationConfig(config); + final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config)); S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, ""); final NamingConventionTransformer nameTransformer = getNamingResolver(); @@ -104,9 +105,9 @@ public JsonNode toJdbcConfig(final JsonNode config) { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { - final S3DestinationConfig s3Config = getS3DestinationConfig(config); + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { + final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config)); return new StagingConsumerFactory().create( outputRecordCollector, getDatabase(getDataSource(config)), diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/constants/RedshiftDestinationConstants.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/constants/RedshiftDestinationConstants.java new file mode 100644 index 00000000000000..9fbe512f0acc22 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/constants/RedshiftDestinationConstants.java @@ -0,0 +1,12 @@ +package io.airbyte.integrations.destination.redshift.constants; + +/** + * Constant holder for Redshift Destination + */ +public class RedshiftDestinationConstants { + + private RedshiftDestinationConstants() { + } + + public static final String UPLOADING_METHOD = "uploading_method"; +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java new file mode 100644 index 00000000000000..29f52847e1d590 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java @@ -0,0 +1,30 @@ +package io.airbyte.integrations.destination.redshift.validator; + +import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD; + +import com.fasterxml.jackson.databind.JsonNode; + +/** + * Helper class for Destination Redshift connector. + */ +public class RedshiftUtil { + + private RedshiftUtil() { + } + + // We check whether config located in root of node. (This check is done for Backward compatibility) + public static JsonNode findS3Options(final JsonNode config) { + return config.has(UPLOADING_METHOD) ? 
config.get(UPLOADING_METHOD) : config; + } + + public static boolean anyOfS3FieldsAreNullOrEmpty(final JsonNode jsonNode) { + return isNullOrEmpty(jsonNode.get("s3_bucket_name")) + && isNullOrEmpty(jsonNode.get("s3_bucket_region")) + && isNullOrEmpty(jsonNode.get("access_key_id")) + && isNullOrEmpty(jsonNode.get("secret_access_key")); + } + + private static boolean isNullOrEmpty(final JsonNode jsonNode) { + return null == jsonNode || "".equals(jsonNode.asText()); + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index 243259955ddf86..d70c27665cc7b7 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -48,76 +48,106 @@ "default": "public", "title": "Default Schema" }, - "s3_bucket_name": { - "title": "S3 Bucket Name (Optional)", - "type": "string", - "description": "The name of the staging S3 bucket to use if utilising a COPY strategy. COPY is recommended for production workloads for better speed and scalability. See AWS docs for more details.", - "examples": ["airbyte.staging"] - }, - "s3_bucket_path": { - "title": "S3 Bucket Path (Optional)", - "type": "string", - "description": "The directory under the S3 bucket where data will be written. If not provided, then defaults to the root directory. See path's name recommendations for more details.", - "examples": ["data_sync/test"] - }, - "s3_bucket_region": { - "title": "S3 Bucket Region (Optional)", - "type": "string", - "default": "", - "description": "The region of the S3 staging bucket to use if utilising a COPY strategy. See AWS docs for details.", - "enum": [ - "", - "us-east-1", - "us-east-2", - "us-west-1", - "us-west-2", - "af-south-1", - "ap-east-1", - "ap-south-1", - "ap-northeast-1", - "ap-northeast-2", - "ap-northeast-3", - "ap-southeast-1", - "ap-southeast-2", - "ca-central-1", - "cn-north-1", - "cn-northwest-1", - "eu-central-1", - "eu-north-1", - "eu-south-1", - "eu-west-1", - "eu-west-2", - "eu-west-3", - "sa-east-1", - "me-south-1" + "uploading_method": { + "title": "Uploading Method", + "type": "object", + "description": "The method how the data will be uploaded to the database.", + "oneOf": [ + { + "title": "Standard", + "additionalProperties": false, + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "Standard" + } + } + }, + { + "title": "S3 Staging", + "additionalProperties": false, + "required": ["method", "s3_bucket_name", "s3_bucket_region", "access_key_id", "secret_access_key"], + "properties": { + "method": { + "type": "string", + "const": "S3 Staging" + }, + "s3_bucket_name": { + "title": "S3 Bucket Name", + "type": "string", + "description": "The name of the staging S3 bucket to use if utilising a COPY strategy. COPY is recommended for production workloads for better speed and scalability. See AWS docs for more details.", + "examples": ["airbyte.staging"] + }, + "s3_bucket_path": { + "title": "S3 Bucket Path (Optional)", + "type": "string", + "description": "The directory under the S3 bucket where data will be written. If not provided, then defaults to the root directory. 
See path's name recommendations for more details.", + "examples": ["data_sync/test"] + }, + "s3_bucket_region": { + "title": "S3 Bucket Region", + "type": "string", + "default": "", + "description": "The region of the S3 staging bucket to use if utilising a COPY strategy. See AWS docs for details.", + "enum": [ + "", + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "cn-north-1", + "cn-northwest-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "me-south-1" + ] + }, + "access_key_id": { + "type": "string", + "description": "This ID grants access to the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket. See AWS docs on how to generate an access key ID and secret access key.", + "title": "S3 Key Id", + "airbyte_secret": true + }, + "secret_access_key": { + "type": "string", + "description": "The corresponding secret to the above access key id. See AWS docs on how to generate an access key ID and secret access key.", + "title": "S3 Access Key", + "airbyte_secret": true + }, + "part_size": { + "type": "integer", + "minimum": 10, + "maximum": 100, + "examples": ["10"], + "description": "Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note: a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care. See docs for details.", + "title": "Stream Part Size (Optional)" + }, + "purge_staging_data": { + "title": "Purge Staging Files and Tables (Optional)", + "type": "boolean", + "description": "Whether to delete the staging files from S3 after completing the sync. See docs for details.", + "default": true + } + } + } ] - }, - "access_key_id": { - "type": "string", - "description": "This ID grants access to the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket. See AWS docs on how to generate an access key ID and secret access key.", - "title": "S3 Key Id (Optional)", - "airbyte_secret": true - }, - "secret_access_key": { - "type": "string", - "description": "The corresponding secret to the above access key id. See AWS docs on how to generate an access key ID and secret access key.", - "title": "S3 Access Key (Optional)", - "airbyte_secret": true - }, - "part_size": { - "type": "integer", - "minimum": 10, - "maximum": 100, - "examples": ["10"], - "description": "Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note: a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care. 
See docs for details.", - "title": "Stream Part Size (Optional)" - }, - "purge_staging_data": { - "title": "Purge Staging Files and Tables (Optional)", - "type": "boolean", - "description": "Whether to delete the staging files from S3 after completing the sync. See docs for details.", - "default": true } } } } + diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java index 10bdf80c1fd4d1..c699438ce8b539 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java @@ -25,6 +25,8 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.DestinationSyncMode; import io.airbyte.protocol.models.JsonSchemaType; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; import java.time.Instant; @@ -63,17 +65,8 @@ class RedshiftInsertDestinationAcceptanceTest extends RedshiftStagingS3Destinati private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE) .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); - public JsonNode getStaticConfig() { - return removeStagingConfigurationFromRedshift(Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")))); - } - - public static JsonNode removeStagingConfigurationFromRedshift(final JsonNode config) { - final var original = (ObjectNode) Jsons.clone(config); - original.remove("s3_bucket_name"); - original.remove("s3_bucket_region"); - original.remove("access_key_id"); - original.remove("secret_access_key"); - return original; + public JsonNode getStaticConfig() throws IOException { + return Jsons.deserialize(Files.readString(Path.of("secrets/config.json"))); } void setup() { diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java index 673b4a9574011d..41589fd55cd631 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java @@ -59,7 +59,7 @@ public class RedshiftS3StagingInsertDestinationAcceptanceTest extends RedshiftSt .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); public JsonNode getStaticConfig() { - return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json"))); + return Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json"))); } void setup() 
{ diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java index 5bda7cfec80c26..cada9007db361e 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java @@ -17,6 +17,7 @@ import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; +import java.io.IOException; import java.nio.file.Path; import java.sql.SQLException; import java.util.List; @@ -52,8 +53,8 @@ protected JsonNode getConfig() { return config; } - public JsonNode getStaticConfig() { - return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json"))); + public JsonNode getStaticConfig() throws IOException { + return Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json"))); } @Override diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java index 58d1e53fc9f6d4..700e8c7d0f37ee 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java @@ -18,22 +18,44 @@ public class RedshiftDestinationTest { private static final ObjectMapper mapper = MoreMappers.initMapper(); @Test - @DisplayName("When given S3 credentials should use COPY with SUPER Datatype") - public void useS3Staging() { - final var stubConfig = mapper.createObjectNode(); - stubConfig.put("s3_bucket_name", "fake-bucket"); - stubConfig.put("s3_bucket_region", "fake-region"); - stubConfig.put("access_key_id", "test"); - stubConfig.put("secret_access_key", "test key"); + @DisplayName("When not given S3 credentials should use INSERT") + public void useStandardInsert() { + final var standardInsertConfigStub = mapper.createObjectNode(); + standardInsertConfigStub.put("method", "Standard"); + final var uploadingMethodStub = mapper.createObjectNode(); + uploadingMethodStub.set("uploading_method", standardInsertConfigStub); + assertEquals(DestinationType.STANDARD, RedshiftDestination.determineUploadMode(uploadingMethodStub)); + } - assertEquals(DestinationType.COPY_S3, RedshiftDestination.determineUploadMode(stubConfig)); + @Test + @DisplayName("When given standard backward compatibility test") + public void useStandardInsertBackwardCompatibility() { + final var standardInsertConfigStub = mapper.createObjectNode(); + assertEquals(DestinationType.STANDARD, RedshiftDestination.determineUploadMode(standardInsertConfigStub)); } @Test - @DisplayName("When not given S3 credentials should use INSERT with 
SUPER Datatype") - public void useStandardInsert() { - final var stubConfig = mapper.createObjectNode(); - assertEquals(DestinationType.STANDARD, RedshiftDestination.determineUploadMode(stubConfig)); + @DisplayName("When given S3 credentials should use COPY") + public void useS3Staging() { + final var s3StagingStub = mapper.createObjectNode(); + final var uploadingMethodStub = mapper.createObjectNode(); + s3StagingStub.put("s3_bucket_name", "fake-bucket"); + s3StagingStub.put("s3_bucket_region", "fake-region"); + s3StagingStub.put("access_key_id", "test"); + s3StagingStub.put("secret_access_key", "test key"); + s3StagingStub.put("method", "S3 Staging"); + uploadingMethodStub.set("uploading_method", s3StagingStub); + assertEquals(DestinationType.COPY_S3, RedshiftDestination.determineUploadMode(uploadingMethodStub)); } + @Test + @DisplayName("When given S3 backward compatibility test") + public void useS3StagingBackwardCompatibility() { + final var s3StagingStub = mapper.createObjectNode(); + s3StagingStub.put("s3_bucket_name", "fake-bucket"); + s3StagingStub.put("s3_bucket_region", "fake-region"); + s3StagingStub.put("access_key_id", "test"); + s3StagingStub.put("secret_access_key", "test key"); + assertEquals(DestinationType.COPY_S3, RedshiftDestination.determineUploadMode(s3StagingStub)); + } } diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index c3d67c16d0ef3b..b53603ffb341f9 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -136,30 +136,31 @@ Each stream will be output into its own raw table in Redshift. Each table will c ## Changelog -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. | -| 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance | -| 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type | -| 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check | -| 0.3.33 | 2022-05-04 | [12601](https://github.com/airbytehq/airbyte/pull/12601) | Apply buffering strategy for S3 staging | -| 0.3.32 | 2022-04-20 | [12085](https://github.com/airbytehq/airbyte/pull/12085) | Fixed bug with switching between INSERT and COPY config | -| 0.3.31 | 2022-04-19 | [\#12064](https://github.com/airbytehq/airbyte/pull/12064) | Added option to support SUPER datatype in _airbyte_raw_** table | -| 0.3.29 | 2022-04-05 | [11729](https://github.com/airbytehq/airbyte/pull/11729) | Fixed bug with dashes in schema name | | -| 0.3.28 | 2022-03-18 | [\#11254](https://github.com/airbytehq/airbyte/pull/11254) | Fixed missing records during S3 staging | -| 0.3.27 | 2022-02-25 | [10421](https://github.com/airbytehq/airbyte/pull/10421) | Refactor JDBC parameters handling | -| 0.3.25 | 2022-02-14 | [#9920](https://github.com/airbytehq/airbyte/pull/9920) | Updated the size of staging files for S3 staging. Also, added closure of S3 writers to staging files when data has been written to an staging file. 
| -| 0.3.24 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option | -| 0.3.23 | 2021-12-16 | [\#8855](https://github.com/airbytehq/airbyte/pull/8855) | Add `purgeStagingData` option to enable/disable deleting the staging data | -| 0.3.22 | 2021-12-15 | [#8607](https://github.com/airbytehq/airbyte/pull/8607) | Accept a path for the staging data | -| 0.3.21 | 2021-12-10 | [#8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management | -| 0.3.20 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count | -| 0.3.19 | 2021-10-21 | [7234](https://github.com/airbytehq/airbyte/pull/7234) | Allow SSL traffic only | -| 0.3.17 | 2021-10-12 | [6965](https://github.com/airbytehq/airbyte/pull/6965) | Added SSL Support | -| 0.3.16 | 2021-10-11 | [6949](https://github.com/airbytehq/airbyte/pull/6949) | Each stream was split into files of 10,000 records each for copying using S3 or GCS | -| 0.3.14 | 2021-10-08 | [5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table | -| 0.3.13 | 2021-09-02 | [5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance | -| 0.3.12 | 2021-07-21 | [3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs | -| 0.3.11 | 2021-07-20 | [4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec | +| Version | Date | Pull Request | Subject | +|:--------|:------------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method.
**PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**; you will need to reconfigure the method to use **S3 Staging** again. |
| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed `redshiftDataTmpTableMode`. Some refactoring. |
| 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type |
| 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check |
| 0.3.33 | 2022-05-04 | [12601](https://github.com/airbytehq/airbyte/pull/12601) | Apply buffering strategy for S3 staging |
| 0.3.32 | 2022-04-20 | [12085](https://github.com/airbytehq/airbyte/pull/12085) | Fixed bug with switching between INSERT and COPY config |
| 0.3.31 | 2022-04-19 | [12064](https://github.com/airbytehq/airbyte/pull/12064) | Added option to support SUPER datatype in `_airbyte_raw_**` table |
| 0.3.29 | 2022-04-05 | [11729](https://github.com/airbytehq/airbyte/pull/11729) | Fixed bug with dashes in schema name |
| 0.3.28 | 2022-03-18 | [11254](https://github.com/airbytehq/airbyte/pull/11254) | Fixed missing records during S3 staging |
| 0.3.27 | 2022-02-25 | [10421](https://github.com/airbytehq/airbyte/pull/10421) | Refactor JDBC parameters handling |
| 0.3.25 | 2022-02-14 | [9920](https://github.com/airbytehq/airbyte/pull/9920) | Updated the size of staging files for S3 staging. Also, added closure of S3 writers to staging files when data has been written to a staging file. |
| 0.3.24 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.3.23 | 2021-12-16 | [8855](https://github.com/airbytehq/airbyte/pull/8855) | Add `purgeStagingData` option to enable/disable deleting the staging data |
| 0.3.22 | 2021-12-15 | [8607](https://github.com/airbytehq/airbyte/pull/8607) | Accept a path for the staging data |
| 0.3.21 | 2021-12-10 | [8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management |
| 0.3.20 | 2021-11-08 | [7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
| 0.3.19 | 2021-10-21 | [7234](https://github.com/airbytehq/airbyte/pull/7234) | Allow SSL traffic only |
| 0.3.17 | 2021-10-12 | [6965](https://github.com/airbytehq/airbyte/pull/6965) | Added SSL Support |
| 0.3.16 | 2021-10-11 | [6949](https://github.com/airbytehq/airbyte/pull/6949) | Each stream was split into files of 10,000 records each for copying using S3 or GCS |
| 0.3.14 | 2021-10-08 | [5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 staging COPY writing records from a different table into the same raw table |
| 0.3.13 | 2021-09-02 | [5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance |
| 0.3.12 | 2021-07-21 | [3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs |
| 0.3.11 | 2021-07-20 | [4874](https://github.com/airbytehq/airbyte/pull/4874) | Allow `additionalProperties` in connector spec |
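For reviewers who want to sanity-check the backward-compatibility path end to end, here is a minimal throwaway sketch (not part of this patch; the class name `UploadingMethodCompatSketch` is invented for illustration). It assumes the patched connector sources are on the classpath and sits in the connector's package, since the `DestinationType` enum is package-private. The stub field values mirror the ones used in `RedshiftDestinationTest` above.

```java
package io.airbyte.integrations.destination.redshift;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.redshift.validator.RedshiftUtil;

public class UploadingMethodCompatSketch {

  public static void main(final String[] args) {
    // Pre-0.3.39 layout: the S3 staging fields sit at the root of the config.
    final JsonNode legacyConfig = Jsons.deserialize(
        "{\"s3_bucket_name\": \"fake-bucket\", \"s3_bucket_region\": \"fake-region\", "
            + "\"access_key_id\": \"test\", \"secret_access_key\": \"test key\"}");

    // 0.3.39 layout: the same fields are nested under the "uploading_method" oneOf.
    final JsonNode newConfig = Jsons.deserialize(
        "{\"uploading_method\": {\"method\": \"S3 Staging\", \"s3_bucket_name\": \"fake-bucket\", "
            + "\"s3_bucket_region\": \"fake-region\", \"access_key_id\": \"test\", "
            + "\"secret_access_key\": \"test key\"}}");

    // findS3Options returns the nested node when "uploading_method" is present and
    // falls back to the root node otherwise, so both layouts yield the same S3
    // options and both resolve to the COPY_S3 strategy.
    System.out.println(RedshiftUtil.findS3Options(legacyConfig).get("s3_bucket_name")); // "fake-bucket"
    System.out.println(RedshiftUtil.findS3Options(newConfig).get("s3_bucket_name"));    // "fake-bucket"
    System.out.println(RedshiftDestination.determineUploadMode(legacyConfig));          // COPY_S3
    System.out.println(RedshiftDestination.determineUploadMode(newConfig));             // COPY_S3
  }
}
```

Because `findS3Options` falls back to the root node, configs created before this release keep selecting the COPY strategy without migration; only the spec and UI moved the fields under `uploading_method`.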