From 18043c8448615ba8007724fc8f8a73399816ab68 Mon Sep 17 00:00:00 2001
From: Christophe Duong
Date: Mon, 4 Apr 2022 14:18:14 +0200
Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Make=20s3=20file=20name=20backwa?=
 =?UTF-8?q?rd=20compatible=20with=20<0.2.12=20(#11666)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix s3 file name
---
 .../seed/destination_definitions.yaml         |  2 +-
 .../resources/seed/destination_specs.yaml     |  4 +-
 .../connectors/destination-s3/Dockerfile      |  2 +-
 .../destination/s3/S3ConsumerFactory.java     | 12 +++---
 .../s3/S3DestinationConstants.java            |  2 +-
 .../destination/s3/S3StorageOperations.java   | 26 +++++++++---
 .../destination/s3/WriteConfig.java           |  8 ++++
 .../src/main/resources/spec.json              |  4 +-
 .../s3/S3DestinationAcceptanceTest.java       |  3 +-
 .../s3/avro/AvroSerializedBufferTest.java     |  2 +-
 docs/integrations/destinations/s3.md          | 40 +++++++++++++++----
 11 files changed, 77 insertions(+), 28 deletions(-)

diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index 1991011d72877..899f81358839e 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -203,7 +203,7 @@
 - name: S3
   destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
   dockerRepository: airbyte/destination-s3
-  dockerImageTag: 0.2.13
+  dockerImageTag: 0.3.0
   documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
   icon: s3.svg
   resourceRequirements:
diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
index 226a1a0ccb668..1304dbb58f8df 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -3469,7 +3469,7 @@
     supported_destination_sync_modes:
     - "append"
     - "overwrite"
-- dockerImage: "airbyte/destination-s3:0.2.13"
+- dockerImage: "airbyte/destination-s3:0.3.0"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
     connectionSpecification:
@@ -3510,7 +3510,7 @@
           \ bucket directory"
         type: "string"
         examples:
-        - "${NAMESPACE}/${STREAM_NAME}/"
+        - "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_"
         order: 3
       s3_bucket_region:
         title: "S3 Bucket Region"
diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile
index 852934d1f0680..560538d143e5e 100644
--- a/airbyte-integrations/connectors/destination-s3/Dockerfile
+++ b/airbyte-integrations/connectors/destination-s3/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-s3

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.2.13
+LABEL io.airbyte.version=0.3.0
 LABEL io.airbyte.name=airbyte/destination-s3
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java
index c709e2ed547c2..4cc0f693866e5 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java
+++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java
@@ -80,14 +80,14 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
       final AirbyteStream abStream = stream.getStream();
       final String namespace = abStream.getNamespace();
       final String streamName = abStream.getName();
-      final String bucketPath = config.get(BUCKET_PATH_FIELD).asText();
+      final String outputBucketPath = config.get(BUCKET_PATH_FIELD).asText();
       final String customOutputFormat = String.join("/",
-          bucketPath,
+          outputBucketPath,
           config.has(PATH_FORMAT_FIELD) && !config.get(PATH_FORMAT_FIELD).asText().isBlank() ? config.get(PATH_FORMAT_FIELD).asText()
               : S3DestinationConstants.DEFAULT_PATH_FORMAT);
-      final String outputBucketPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
+      final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
       final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
-      final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, syncMode);
+      final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, fullOutputPath, syncMode);
       LOGGER.info("Write config: {}", writeConfig);
       return writeConfig;
     };
@@ -139,7 +139,7 @@ private CheckedBiConsumer storedFiles;

   public WriteConfig(final String namespace,
                      final String streamName,
                      final String outputBucketPath,
+                     final String fullOutputPath,
                      final DestinationSyncMode syncMode) {
     this.namespace = namespace;
     this.streamName = streamName;
     this.outputBucketPath = outputBucketPath;
+    this.fullOutputPath = fullOutputPath;
     this.syncMode = syncMode;
     this.storedFiles = new ArrayList<>();
   }
@@ -42,6 +45,10 @@ public String getOutputBucketPath() {
     return outputBucketPath;
   }

+  public String getFullOutputPath() {
+    return fullOutputPath;
+  }
+
   public DestinationSyncMode getSyncMode() {
     return syncMode;
   }
@@ -64,6 +71,7 @@ public String toString() {
         "streamName=" + streamName +
         ", namespace=" + namespace +
         ", outputBucketPath=" + outputBucketPath +
+        ", fullOutputPath=" + fullOutputPath +
         ", syncMode=" + syncMode +
         '}';
   }
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json
index e1bd828c2ab5e..5ad2611b98fb1 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json
+++ b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json
@@ -40,7 +40,9 @@
     "s3_path_format": {
       "description": "Format string on how data will be organized inside the S3 bucket directory",
       "type": "string",
-      "examples": ["${NAMESPACE}/${STREAM_NAME}/"],
+      "examples": [
+        "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_"
+      ],
       "order": 3
     },
     "s3_bucket_region": {
diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java
index 200f1ed9dd39d..e0f52d48934a5 100644
--- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java
@@ -100,8 +100,9 @@ protected List<S3ObjectSummary> getAllSyncedObjects(final String streamName, fin
         streamNameStr,
         DateTime.now(DateTimeZone.UTC),
         S3DestinationConstants.DEFAULT_PATH_FORMAT);
+    final String parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1);
     final List<S3ObjectSummary> objectSummaries = s3Client
-        .listObjects(config.getBucketName(), outputPrefix)
+        .listObjects(config.getBucketName(), parentFolder)
         .getObjectSummaries()
         .stream()
         .filter(o -> o.getKey().contains(streamNameStr + "/"))
diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java
index 745aecd94d3d3..16694a4110ee8 100644
--- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java
+++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java
@@ -55,7 +55,7 @@ public class AvroSerializedBufferTest {
   public void testSnappyAvroWriter() throws Exception {
     final S3AvroFormatConfig config = new S3AvroFormatConfig(Jsons.jsonNode(Map.of("compression_codec", Map.of(
         "codec", "snappy"))));
-    runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 970L, 980L, config, getExpectedString());
+    runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 965L, 980L, config, getExpectedString());
   }

   @Test
diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md
index b96b2ab208cf6..8c4dc9f46c499 100644
--- a/docs/integrations/destinations/s3.md
+++ b/docs/integrations/destinations/s3.md
@@ -22,7 +22,7 @@ Check out common troubleshooting issues for the S3 destination connector on our
 | S3 Endpoint | string | URL to S3, If using AWS S3 just leave blank. |
 | S3 Bucket Name | string | Name of the bucket to sync data into. |
 | S3 Bucket Path | string | Subdirectory under the above bucket to sync the data into. |
-| S3 Bucket Format | string | Additional subdirectories format under S3 Bucket Path. Default value is `${NAMESPACE}/${STREAM_NAME}/` and this can be further customized with variables such as `${YEAR}, ${MONTH}, ${DAY}, ${HOUR} etc` referring to the writing datetime. |
+| S3 Bucket Format | string | Additional format string controlling how data is stored under the S3 Bucket Path. The default value is `${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_`. |
 | S3 Region | string | See [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions) for all region codes. |
 | Access Key ID | string | AWS/Minio credential. |
 | Secret Access Key | string | AWS/Minio credential. |
@@ -30,20 +30,20 @@ Check out common troubleshooting issues for the S3 destination connector on our

 ⚠️ Please note that under "Full Refresh Sync" mode, data in the configured bucket and path will be wiped out before each sync. We recommend you to provision a dedicated S3 resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️

-The full path of the output data with S3 path format `${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}` is:
+The full path of the output data with the default S3 path format `${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_` is:

 ```text
-<bucket_name>/<source_namespace_if_exists>/<stream_name>/<upload_date>/<uuid>.<format_extension>
+<bucket_name>/<source_namespace_if_exists>/<stream_name>/<upload_date>_<epoch>_<partition_id>.<format_extension>
 ```

 For example:

 ```text
-testing_bucket/data_output_path/public/users/2021_01_01/123e4567-e89b-12d3-a456-426614174000.csv.gz
-↑              ↑                ↑      ↑     ↑          ↑                                    ↑
-|              |                |      |     |          |                                    format extension
-|              |                |      |     |          |
-|              |                |      |     |          uuid
+testing_bucket/data_output_path/public/users/2021_01_01_1234567890_0.csv.gz
+↑              ↑                ↑      ↑     ↑          ↑          ↑ ↑
+|              |                |      |     |          |          | format extension
+|              |                |      |     |          |          unique incremental part id
+|              |                |      |     |          milliseconds since epoch
 |              |                |      |     upload date in YYYY_MM_DD
 |              |                |      stream name
 |              |                source namespace (if it exists)
 |              bucket path
 bucket name
 ```

+The rationale behind this naming pattern is:
+
+1. Each stream has its own directory.
+2. The data output files can be sorted by upload time.
+3. The upload time is composed of a date part and a milliseconds part so that it is both readable and unique.
+
+You can customize the path further by using the following variables in the path format:
+
+- `${NAMESPACE}`: Namespace where the stream comes from, or configured by the connection namespace fields.
+- `${STREAM_NAME}`: Name of the stream.
+- `${YEAR}`: Year in which the sync wrote the output data.
+- `${MONTH}`: Month in which the sync wrote the output data.
+- `${DAY}`: Day on which the sync wrote the output data.
+- `${HOUR}`: Hour in which the sync wrote the output data.
+- `${MINUTE}`: Minute in which the sync wrote the output data.
+- `${SECOND}`: Second in which the sync wrote the output data.
+- `${MILLISECOND}`: Millisecond in which the sync wrote the output data.
+- `${EPOCH}`: Milliseconds since epoch at which the sync wrote the output data.
+- `${UUID}`: Random UUID string.
+
+Note:
+
+- Multiple `/` characters in the S3 path are collapsed into a single `/` character.
+- The part ID is normally a sequential ID; if the output bucket contains too many files, a `UUID` is used instead.
+
 Please note that the stream name may contain a prefix, if it is configured on the connection.

 A data sync may create multiple files as the output files can be partitioned by size (targeting a size of 200MB compressed or lower).
@@ -228,6 +251,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

 | Version | Date | Pull Request | Subject |
 |:--------| :--- | :--- |:---------------------------------------------------------------------------------------------------------------------------|
+| 0.3.0 | 2022-04-04 | [\#11666](https://github.com/airbytehq/airbyte/pull/11666) | 0.2.12 introduced a breaking change because files are now compressed by default; this release also makes the file naming more compatible with older versions. |
 | 0.2.13 | 2022-03-29 | [\#11496](https://github.com/airbytehq/airbyte/pull/11496) | Fix S3 bucket path to be included with S3 bucket format |
 | 0.2.12 | 2022-03-28 | [\#11294](https://github.com/airbytehq/airbyte/pull/11294) | Change to serialized buffering strategy to reduce memory consumption |
 | 0.2.11 | 2022-03-23 | [\#11173](https://github.com/airbytehq/airbyte/pull/11173) | Added support for AWS Glue crawler |
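
For readers following the renaming logic in the docs changes above, the sketch below illustrates how the documented `${...}` path-format variables could be resolved for a given sync datetime. It is a minimal, hypothetical stand-in (the class and method names here are invented for illustration), not the connector's actual `S3StorageOperations.getBucketObjectPath` implementation:

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.UUID;

public class PathFormatSketch {

  // Hypothetical helper: substitutes the documented ${...} placeholders
  // for a given stream and sync datetime.
  static String resolvePathFormat(final String format,
                                  final String namespace,
                                  final String streamName,
                                  final ZonedDateTime syncDatetime) {
    final String resolved = format
        .replace("${NAMESPACE}", namespace == null ? "" : namespace)
        .replace("${STREAM_NAME}", streamName)
        .replace("${YEAR}", String.format("%04d", syncDatetime.getYear()))
        .replace("${MONTH}", String.format("%02d", syncDatetime.getMonthValue()))
        .replace("${DAY}", String.format("%02d", syncDatetime.getDayOfMonth()))
        .replace("${HOUR}", String.format("%02d", syncDatetime.getHour()))
        .replace("${MINUTE}", String.format("%02d", syncDatetime.getMinute()))
        .replace("${SECOND}", String.format("%02d", syncDatetime.getSecond()))
        .replace("${MILLISECOND}", String.format("%03d", syncDatetime.getNano() / 1_000_000))
        .replace("${EPOCH}", String.valueOf(syncDatetime.toInstant().toEpochMilli()))
        .replace("${UUID}", UUID.randomUUID().toString());
    // Collapse duplicate '/' characters, mirroring the note in the docs above.
    return resolved.replaceAll("/+", "/");
  }

  public static void main(final String[] args) {
    final ZonedDateTime sync = ZonedDateTime.of(2021, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);

    // Default path format, prefixed with a bucket path:
    System.out.println(resolvePathFormat(
        "data_output_path/${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_",
        "public", "users", sync));
    // -> data_output_path/public/users/2021_01_01_1609459200000_

    // An empty namespace would produce "//"; duplicate slashes are collapsed:
    System.out.println(resolvePathFormat(
        "data_output_path/${NAMESPACE}/${STREAM_NAME}/", "", "users", sync));
    // -> data_output_path/users/
  }
}
```

Under this reading, the connector then appends the part ID and format extension to the resolved prefix, which is how keys like `testing_bucket/data_output_path/public/users/2021_01_01_1234567890_0.csv.gz` in the example above are produced.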