Deprecate PART_SIZE_MB in connectors using S3/GCS storage (#13753)
* Removed part_size from connectors that use StreamTransferManager

* fixed S3DestinationConfigTest

* fixed S3JsonlFormatConfigTest

* update changelog and bump version

* auto-bump connector version

* auto-bump connector version

* auto-bump connector version

* auto-bump connector version

* update changelog and bump version for Redshift and Snowflake destinations

* auto-bump connector version

* fix GCS staging test

* fix GCS staging test

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
VitaliiMaltsev and octavia-squidington-iii committed Jun 20, 2022
1 parent c5783aa commit c283d9d
Showing 58 changed files with 118 additions and 349 deletions.
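For context on the change itself: the deprecated `part_size_mb` / `part_size` options were forwarded to the StreamTransferManager that streams staging files to S3 or GCS as multipart uploads, and this commit stops exposing that knob so the upload manager's own sizing applies. Below is a minimal sketch of the kind of call site affected, assuming the alex.mojaki s3-stream-upload library behind StreamTransferManager; the class name, helper signature, bucket/key parameters, and stream/thread/queue values are illustrative, not the connectors' actual code.

```java
import java.util.List;

import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;

public class StagingUploadSketch {

  // Opens one multipart output stream for a staging object. Before this commit a
  // user-supplied part size was typically passed via .partSize(...); afterwards the
  // manager is created without it and the library-managed default applies.
  static MultiPartOutputStream openStagingStream(final AmazonS3 s3Client,
                                                 final String bucket,
                                                 final String objectKey) {
    final StreamTransferManager manager = new StreamTransferManager(bucket, objectKey, s3Client)
        .numStreams(1)        // illustrative values, not the connectors' defaults
        .numUploadThreads(1)
        .queueCapacity(1);
        // .partSize(partSizeMb)  <-- the kind of call this commit removes

    final List<MultiPartOutputStream> streams = manager.getMultiPartOutputStreams();
    return streams.get(0);    // caller writes records, closes the stream, then calls manager.complete()
  }
}
```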
@@ -27,7 +27,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 1.1.8
dockerImageTag: 1.1.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
@@ -40,7 +40,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 1.1.8
dockerImageTag: 1.1.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
@@ -100,7 +100,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.2.7
dockerImageTag: 0.2.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
resourceRequirements:
@@ -225,7 +225,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.39
dockerImageTag: 0.3.40
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
@@ -244,7 +244,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.3.7
dockerImageTag: 0.3.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
@@ -264,7 +264,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.28
dockerImageTag: 0.4.29
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
resourceRequirements:
107 changes: 10 additions & 97 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -285,7 +285,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-bigquery:1.1.8"
- dockerImage: "airbyte/destination-bigquery:1.1.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
@@ -438,19 +438,6 @@
examples:
- "data_sync/test"
order: 3
part_size_mb:
title: "Block Size (MB) for GCS Multipart Upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes more\
\ memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
minimum: 5
maximum: 525
examples:
- 5
order: 4
keep_files_in_gcs-bucket:
type: "string"
description: "This upload method is supposed to temporary store records\
@@ -462,7 +449,7 @@
enum:
- "Delete all tmp files from GCS"
- "Keep all tmp files in GCS"
order: 5
order: 4
credentials_json:
type: "string"
description: "The contents of the JSON service account key. Check out the\
@@ -510,7 +497,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.1.8"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.1.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
@@ -627,19 +614,6 @@
examples:
- "data_sync/test"
order: 3
part_size_mb:
title: "Block Size (MB) for GCS Multipart Upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes more\
\ memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
minimum: 5
maximum: 525
examples:
- 5
order: 4
keep_files_in_gcs-bucket:
type: "string"
description: "This upload method is supposed to temporary store records\
@@ -651,7 +625,7 @@
enum:
- "Delete all tmp files from GCS"
- "Keep all tmp files in GCS"
order: 5
order: 4
credentials_json:
type: "string"
description: "The contents of the JSON service account key. Check out the\
@@ -1486,7 +1460,7 @@
- "overwrite"
- "append"
supportsNamespaces: true
- dockerImage: "airbyte/destination-gcs:0.2.7"
- dockerImage: "airbyte/destination-gcs:0.2.8"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
connectionSpecification:
@@ -1720,16 +1694,6 @@
enum:
- "snappy"
default: "snappy"
part_size_mb:
title: "Block Size (MB) for GCS multipart upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes9\
\ more memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
examples:
- 5
- title: "CSV: Comma-Separated Values"
required:
- "format_type"
@@ -1748,16 +1712,6 @@
enum:
- "No flattening"
- "Root level flattening"
part_size_mb:
title: "Block Size (MB) for GCS multipart upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes9\
\ more memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
examples:
- 5
compression:
title: "Compression"
type: "object"
@@ -1792,16 +1746,6 @@
enum:
- "JSONL"
default: "JSONL"
part_size_mb:
title: "Block Size (MB) for GCS multipart upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes9\
\ more memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
examples:
- 5
compression:
title: "Compression"
type: "object"
@@ -3678,7 +3622,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.39"
- dockerImage: "airbyte/destination-redshift:0.3.40"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
@@ -3822,22 +3766,6 @@
\ key."
title: "S3 Access Key"
airbyte_secret: true
part_size:
type: "integer"
minimum: 10
maximum: 100
examples:
- "10"
description: "Increase this if syncing tables larger than 100GB. Only\
\ relevant for COPY. Files are streamed to S3 in parts. This determines\
\ the size of each part, in MBs. As S3 has a limit of 10,000 parts\
\ per file, part size affects the table size. This is 10MB by default,\
\ resulting in a default limit of 100GB tables. Note: a larger part\
\ size will result in larger memory requirements. A rule of thumb\
\ is to multiply the part size by 10 to get the memory requirement.\
\ Modify this with care. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=above%20key%20id.-,Part%20Size,-Affects%20the%20size\"\
,> docs</a> for details."
title: "Stream Part Size (Optional)"
purge_staging_data:
title: "Purge Staging Files and Tables (Optional)"
type: "boolean"
@@ -3895,7 +3823,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.7"
- dockerImage: "airbyte/destination-s3:0.3.8"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
@@ -4314,7 +4242,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.28"
- dockerImage: "airbyte/destination-snowflake:0.4.29"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
@@ -4546,36 +4474,21 @@
title: "S3 Access Key"
airbyte_secret: true
order: 4
part_size:
type: "integer"
default: 5
examples:
- 5
description: "Optional. Increase this if syncing tables larger than\
\ 100GB. Only relevant for COPY. Files are streamed to S3 in parts.\
\ This determines the size of each part, in MBs. As S3 has a limit\
\ of 10,000 parts per file, part size affects the table size. This\
\ is 10MB by default, resulting in a default limit of 100GB tables.\
\ Note, a larger part size will result in larger memory requirements.\
\ A rule of thumb is to multiply the part size by 10 to get the\
\ memory requirement. Modify this with care."
title: "Stream Part Size"
order: 5
purge_staging_data:
title: "Purge Staging Files and Tables"
type: "boolean"
description: "Whether to delete the staging files from S3 after completing\
\ the sync. See the docs for details. Only relevant for COPY. Defaults\
\ to true."
default: true
order: 6
order: 5
encryption:
title: "Encryption"
type: "object"
description: "How to encrypt the staging data"
default:
encryption_type: "none"
order: 7
order: 6
oneOf:
- title: "No encryption"
description: "Staging data will be stored in plaintext."
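The "Stream Part Size" description removed from the Redshift and Snowflake specs above packed two constraints into one paragraph: S3 allows at most 10,000 parts per multipart upload, so the part size caps how large a staged file can grow, and buffered parts dominate memory (the removed text's rule of thumb was roughly ten times the part size per stream). A small back-of-the-envelope sketch of that arithmetic, using the old 10 MB default; the class and constants are illustrative only.

```java
public class PartSizeMath {

  // S3's documented limit on the number of parts in a single multipart upload.
  private static final long S3_MAX_PARTS_PER_UPLOAD = 10_000;

  public static void main(String[] args) {
    final long partSizeMb = 10;                                    // old default for Redshift/Snowflake COPY staging
    final long maxObjectMb = partSizeMb * S3_MAX_PARTS_PER_UPLOAD; // 100,000 MB, the "~100GB" ceiling in the removed docs
    final long approxMemoryMb = partSizeMb * 10;                   // rule of thumb quoted in the removed description

    System.out.printf("part=%d MB -> max staged object ≈ %d MB, memory ≈ %d MB per stream%n",
        partSizeMb, maxObjectMb, approxMemoryMb);
  }
}
```

With the option gone, users no longer tune this trade-off by hand; the connectors rely on their internal defaults instead.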
@@ -112,16 +112,6 @@
"examples": ["data_sync/test"],
"order": 3
},
"part_size_mb": {
"title": "Block Size (MB) for GCS Multipart Upload (Optional)",
"description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes more memory. Allowed values: min=5MB, max=525MB Default: 5MB.",
"type": "integer",
"default": 5,
"minimum": 5,
"maximum": 525,
"examples": [5],
"order": 4
},
"keep_files_in_gcs-bucket": {
"type": "string",
"description": "This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.",
@@ -131,7 +121,7 @@
"Delete all tmp files from GCS",
"Keep all tmp files in GCS"
],
"order": 5
"order": 4
}
}
}
@@ -37,7 +37,7 @@ protected JsonNode createConfig() throws IOException {
.put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING)
.put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME))
.put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis())
.put(BigQueryConsts.PART_SIZE, gcsConfigFromSecretFile.get(BigQueryConsts.PART_SIZE))
.put(BigQueryConsts.CREDENTIAL, credential)
.build());

@@ -23,7 +23,6 @@ public class BigQueryConsts {
public static final String FORMAT = "format";
public static final String KEEP_GCS_FILES = "keep_files_in_gcs-bucket";
public static final String KEEP_GCS_FILES_VAL = "Keep all tmp files in GCS";
public static final String PART_SIZE = "part_size_mb";

public static final String NAMESPACE_PREFIX = "n";

@@ -143,8 +143,7 @@ public static JsonNode getGcsJsonNodeConfig(final JsonNode config) {
.put(BigQueryConsts.CREDENTIAL, loadingMethod.get(BigQueryConsts.CREDENTIAL))
.put(BigQueryConsts.FORMAT, Jsons.deserialize("{\n"
+ " \"format_type\": \"CSV\",\n"
+ " \"flattening\": \"No flattening\",\n"
+ " \"part_size_mb\": \"" + loadingMethod.get(BigQueryConsts.PART_SIZE) + "\"\n"
+ " \"flattening\": \"No flattening\"\n"
+ "}"))
.build());

@@ -165,8 +164,7 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) {
.put(BigQueryConsts.CREDENTIAL, loadingMethod.get(BigQueryConsts.CREDENTIAL))
.put(BigQueryConsts.FORMAT, Jsons.deserialize("{\n"
+ " \"format_type\": \"AVRO\",\n"
+ " \"flattening\": \"No flattening\",\n"
+ " \"part_size_mb\": \"" + loadingMethod.get(BigQueryConsts.PART_SIZE) + "\"\n"
+ " \"flattening\": \"No flattening\"\n"
+ "}"))
.build());

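After the change above, getGcsJsonNodeConfig and getGcsAvroJsonNodeConfig assemble the GCS staging format node with only format_type and flattening. The snippet below is a quick, hypothetical way to sanity-check that shape with plain Jackson; it is not part of the connector or its tests.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class GcsFormatNodeCheck {

  public static void main(String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();

    // Shape emitted for CSV staging after this commit: no part_size_mb key.
    final JsonNode format = mapper.readTree(
        "{\"format_type\": \"CSV\", \"flattening\": \"No flattening\"}");

    if (format.has("part_size_mb")) {
      throw new IllegalStateException("part_size_mb should no longer appear in the format config");
    }
    System.out.println(format.toPrettyString());
  }
}
```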
@@ -149,16 +149,6 @@
"examples": ["data_sync/test"],
"order": 3
},
"part_size_mb": {
"title": "Block Size (MB) for GCS Multipart Upload (Optional)",
"description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes more memory. Allowed values: min=5MB, max=525MB Default: 5MB.",
"type": "integer",
"default": 5,
"minimum": 5,
"maximum": 525,
"examples": [5],
"order": 4
},
"keep_files_in_gcs-bucket": {
"type": "string",
"description": "This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.",
@@ -168,7 +158,7 @@
"Delete all tmp files from GCS",
"Keep all tmp files in GCS"
],
"order": 5
"order": 4
}
}
}
@@ -181,7 +181,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return Streams.stream(queryResults.iterateAll())
return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
}

@@ -45,7 +45,6 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
.put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING)
.put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME))
.put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis())
.put(BigQueryConsts.PART_SIZE, gcsConfigFromSecretFile.get(BigQueryConsts.PART_SIZE))
.put(BigQueryConsts.CREDENTIAL, credential)
.build());

@@ -107,4 +107,5 @@ protected void compareObjects(JsonNode expectedObject, JsonNode actualObject) {
JsonNode actualJsonNode = (actualObject.isTextual() ? Jsons.deserialize(actualObject.textValue()) : actualObject);
super.compareObjects(expectedObject, actualJsonNode);
}

}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/destination-gcs
(diffs for the remaining changed files are not shown here)