🐛 Make s3 file name backward compatible with <0.2.12 (#11666)
* fix s3 file name
ChristopheDuong committed Apr 4, 2022
1 parent 291a86e commit 18043c8
Showing 11 changed files with 77 additions and 28 deletions.
@@ -203,7 +203,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.2.13
dockerImageTag: 0.3.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
@@ -3469,7 +3469,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.2.13"
- dockerImage: "airbyte/destination-s3:0.3.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
@@ -3510,7 +3510,7 @@
\ bucket directory"
type: "string"
examples:
- "${NAMESPACE}/${STREAM_NAME}/"
- "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_"
order: 3
s3_bucket_region:
title: "S3 Bucket Region"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-s3/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-s3

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.13
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/destination-s3
@@ -80,14 +80,14 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
final AirbyteStream abStream = stream.getStream();
final String namespace = abStream.getNamespace();
final String streamName = abStream.getName();
final String bucketPath = config.get(BUCKET_PATH_FIELD).asText();
final String outputBucketPath = config.get(BUCKET_PATH_FIELD).asText();
final String customOutputFormat = String.join("/",
bucketPath,
outputBucketPath,
config.has(PATH_FORMAT_FIELD) && !config.get(PATH_FORMAT_FIELD).asText().isBlank() ? config.get(PATH_FORMAT_FIELD).asText()
: S3DestinationConstants.DEFAULT_PATH_FORMAT);
final String outputBucketPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, syncMode);
final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, fullOutputPath, syncMode);
LOGGER.info("Write config: {}", writeConfig);
return writeConfig;
};
@@ -139,7 +139,7 @@ private CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Ex
writer,
writeConfig.getNamespace(),
writeConfig.getStreamName(),
writeConfig.getOutputBucketPath()));
writeConfig.getFullOutputPath()));
} catch (final Exception e) {
LOGGER.error("Failed to flush and upload buffer to storage:", e);
throw new RuntimeException("Failed to upload buffer to storage", e);
@@ -153,7 +153,7 @@ private OnCloseFunction onCloseFunction(final BlobStorageOperations storageOpera
if (hasFailed) {
LOGGER.info("Cleaning up destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
storageOperations.cleanUpBucketObject(writeConfig.getOutputBucketPath(), writeConfig.getStoredFiles());
storageOperations.cleanUpBucketObject(writeConfig.getFullOutputPath(), writeConfig.getStoredFiles());
writeConfig.clearStoredFiles();
}
LOGGER.info("Cleaning up destination completed.");
@@ -12,7 +12,7 @@ public final class S3DestinationConstants {
public static final S3NameTransformer NAME_TRANSFORMER = new S3NameTransformer();
public static final String PART_SIZE_MB_ARG_NAME = "part_size_mb";
public static final int DEFAULT_PART_SIZE_MB = 10;
public static final String DEFAULT_PATH_FORMAT = "${NAMESPACE}/${STREAM_NAME}/";
public static final String DEFAULT_PATH_FORMAT = "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_";

private S3DestinationConstants() {}

@@ -17,10 +17,10 @@
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.regex.Pattern;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -59,6 +59,9 @@ public String getBucketObjectPath(final String namespace, final String streamNam
.replaceAll(Pattern.quote("${HOUR}"), String.format("%02d", writeDatetime.hourOfDay().get()))
.replaceAll(Pattern.quote("${MINUTE}"), String.format("%02d", writeDatetime.minuteOfHour().get()))
.replaceAll(Pattern.quote("${SECOND}"), String.format("%02d", writeDatetime.secondOfMinute().get()))
.replaceAll(Pattern.quote("${MILLISECOND}"), String.format("%02d", writeDatetime.millisOfSecond().get()))
.replaceAll(Pattern.quote("${EPOCH}"), String.format("%d", writeDatetime.getMillis()))
.replaceAll(Pattern.quote("${UUID}"), String.format("%s", UUID.randomUUID()))
.replaceAll("/+", "/"));
}

@@ -102,12 +105,12 @@ public String uploadRecordsToBucket(final SerializableBuffer recordsData, final
return recordsData.getFilename();
}

private void loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {
private void loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) {
final long partSize = s3Config.getFormatConfig() != null ? s3Config.getFormatConfig().getPartSize() : DEFAULT_PART_SIZE;
final String bucket = s3Config.getBucketName();
final String objectKey = String.format("%s%s", objectPath, recordsData.getFilename());
final String objectKeyWithPartId = String.format("%s%s", objectPath, getPartId(objectPath));
final StreamTransferManager uploadManager = StreamTransferManagerHelper
.getDefault(bucket, objectKey, s3Client, partSize)
.getDefault(bucket, objectKeyWithPartId, s3Client, partSize)
.checkIntegrity(true)
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
.queueCapacity(DEFAULT_QUEUE_CAPACITY);
@@ -126,12 +129,23 @@ private void loadDataIntoBucket(final String objectPath, final SerializableBuffe
uploadManager.complete();
}
}
if (!s3Client.doesObjectExist(bucket, objectKey)) {
LOGGER.error("Failed to upload data into storage, object {} not found", objectKey);
if (!s3Client.doesObjectExist(bucket, objectKeyWithPartId)) {
LOGGER.error("Failed to upload data into storage, object {} not found", objectKeyWithPartId);
throw new RuntimeException("Upload failed");
}
}

private String getPartId(final String objectPath) {
final String bucket = s3Config.getBucketName();
final ObjectListing objects = s3Client.listObjects(bucket, objectPath);
if (objects.isTruncated()) {
// bucket contains too many objects, use an uuid instead
return UUID.randomUUID().toString();
} else {
return Integer.toString(objects.getObjectSummaries().size());
}
}

@Override
public void dropBucketObject(final String objectPath) {
cleanUpBucketObject(objectPath, List.of());
@@ -16,16 +16,19 @@ public class WriteConfig {
private final String namespace;
private final String streamName;
private final String outputBucketPath;
private final String fullOutputPath;
private final DestinationSyncMode syncMode;
private final List<String> storedFiles;

public WriteConfig(final String namespace,
final String streamName,
final String outputBucketPath,
final String fullOutputPath,
final DestinationSyncMode syncMode) {
this.namespace = namespace;
this.streamName = streamName;
this.outputBucketPath = outputBucketPath;
this.fullOutputPath = fullOutputPath;
this.syncMode = syncMode;
this.storedFiles = new ArrayList<>();
}
@@ -42,6 +45,10 @@ public String getOutputBucketPath() {
return outputBucketPath;
}

public String getFullOutputPath() {
return fullOutputPath;
}

public DestinationSyncMode getSyncMode() {
return syncMode;
}
@@ -64,6 +71,7 @@ public String toString() {
"streamName=" + streamName +
", namespace=" + namespace +
", outputBucketPath=" + outputBucketPath +
", fullOutputPath=" + fullOutputPath +
", syncMode=" + syncMode +
'}';
}
@@ -40,7 +40,9 @@
"s3_path_format": {
"description": "Format string on how data will be organized inside the S3 bucket directory",
"type": "string",
"examples": ["${NAMESPACE}/${STREAM_NAME}/"],
"examples": [
"${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_"
],
"order": 3
},
"s3_bucket_region": {
@@ -100,8 +100,9 @@ protected List<S3ObjectSummary> getAllSyncedObjects(final String streamName, fin
streamNameStr,
DateTime.now(DateTimeZone.UTC),
S3DestinationConstants.DEFAULT_PATH_FORMAT);
final String parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1);
final List<S3ObjectSummary> objectSummaries = s3Client
.listObjects(config.getBucketName(), outputPrefix)
.listObjects(config.getBucketName(), parentFolder)
.getObjectSummaries()
.stream()
.filter(o -> o.getKey().contains(streamNameStr + "/"))
@@ -55,7 +55,7 @@ public class AvroSerializedBufferTest {
public void testSnappyAvroWriter() throws Exception {
final S3AvroFormatConfig config = new S3AvroFormatConfig(Jsons.jsonNode(Map.of("compression_codec", Map.of(
"codec", "snappy"))));
runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 970L, 980L, config, getExpectedString());
runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 965L, 980L, config, getExpectedString());
}

@Test
40 changes: 32 additions & 8 deletions docs/integrations/destinations/s3.md
@@ -22,35 +22,58 @@ Check out common troubleshooting issues for the S3 destination connector on our
| S3 Endpoint | string | URL to S3. If using AWS S3, just leave blank. |
| S3 Bucket Name | string | Name of the bucket to sync data into. |
| S3 Bucket Path | string | Subdirectory under the above bucket to sync the data into. |
| S3 Bucket Format | string | Additional subdirectories format under S3 Bucket Path. Default value is `${NAMESPACE}/${STREAM_NAME}/` and this can be further customized with variables such as `${YEAR}, ${MONTH}, ${DAY}, ${HOUR} etc` referring to the writing datetime. |
| S3 Bucket Format | string | Additional string format on how to store data under S3 Bucket Path. Default value is `${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_`. |
| S3 Region | string | See [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions) for all region codes. |
| Access Key ID | string | AWS/Minio credential. |
| Secret Access Key | string | AWS/Minio credential. |
| Format | object | Format specific configuration. See the [spec](/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json) for details. |
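
For orientation, a test-style configuration covering these options might be built the way the connector's tests build theirs (`Jsons.jsonNode`, visible in the test diff further down). This is only a sketch: every property name except `s3_path_format` and `s3_bucket_region`, which appear in the spec changes in this commit, is an assumption inferred from the labels above, so check `spec.json` for the authoritative keys.

```java
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;

// Sketch of a possible destination configuration. Property names other than
// "s3_path_format" and "s3_bucket_region" are assumptions inferred from the
// table above; consult spec.json for the authoritative keys.
public class S3ConfigSketch {

  public static JsonNode exampleConfig() {
    return Jsons.jsonNode(Map.of(
        "s3_bucket_name", "testing_bucket",
        "s3_bucket_path", "data_output_path",
        "s3_path_format", "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_",
        "s3_bucket_region", "us-east-1",
        "access_key_id", "<aws-access-key-id>",
        "secret_access_key", "<aws-secret-access-key>",
        "format", Map.of("format_type", "CSV")));
  }
}
```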

⚠️ Please note that under "Full Refresh Sync" mode, data in the configured bucket and path will be wiped out before each sync. We recommend provisioning a dedicated S3 resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️

The full path of the output data with S3 path format `${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}` is:
The full path of the output data with the default S3 path format `${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_` is:

```text
<bucket-name>/<source-namespace-if-exists>/<stream-name>/<upload-date>/<partition-uuid>.<format-extension>
<bucket-name>/<source-namespace-if-exists>/<stream-name>/<upload-date>_<epoch>_<partition-id>.<format-extension>
```

For example:

```text
testing_bucket/data_output_path/public/users/2021_01_01/123e4567-e89b-12d3-a456-426614174000.csv.gz
testing_bucket/data_output_path/public/users/2021_01_01_1234567890_0.csv.gz
↑              ↑                ↑      ↑     ↑          ↑          ↑ ↑
|              |                |      |     |          |          | format extension
|              |                |      |     |          |          unique incremental part id
|              |                |      |     |          milliseconds since epoch
|              |                |      |     upload date in YYYY_MM_DD
|              |                |      stream name
|              |                source namespace (if it exists)
|              bucket path
bucket name
```
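
To make the composition concrete, here is a minimal, self-contained Java sketch of how such an object key could be assembled. It mirrors the substitution logic added to `S3StorageOperations.getBucketObjectPath` in this commit, but the class and method names are illustrative only, and the part id is hard-coded rather than derived from a bucket listing; the bucket name itself is not part of the key.

```java
import java.util.regex.Pattern;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

// Illustrative sketch only: mirrors the path-format substitution in this commit,
// but is not the connector's actual implementation.
public class S3PathFormatSketch {

  private static final String DEFAULT_PATH_FORMAT =
      "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_";

  static String formatPrefix(final String bucketPath,
                             final String namespace,
                             final String streamName,
                             final DateTime writeDatetime) {
    // Join the bucket path with the path format, then substitute the variables.
    return String.join("/", bucketPath, DEFAULT_PATH_FORMAT)
        .replaceAll(Pattern.quote("${NAMESPACE}"), namespace)
        .replaceAll(Pattern.quote("${STREAM_NAME}"), streamName)
        .replaceAll(Pattern.quote("${YEAR}"), String.format("%04d", writeDatetime.year().get()))
        .replaceAll(Pattern.quote("${MONTH}"), String.format("%02d", writeDatetime.monthOfYear().get()))
        .replaceAll(Pattern.quote("${DAY}"), String.format("%02d", writeDatetime.dayOfMonth().get()))
        .replaceAll(Pattern.quote("${EPOCH}"), String.valueOf(writeDatetime.getMillis()))
        // Collapse any duplicate slashes, as the connector does.
        .replaceAll("/+", "/");
  }

  public static void main(final String[] args) {
    final DateTime syncTime = new DateTime(2021, 1, 1, 0, 0, DateTimeZone.UTC);
    // Part id 0 for the first file of the sync, followed by the format extension.
    final String objectKey = formatPrefix("data_output_path", "public", "users", syncTime) + "0.csv.gz";
    // Prints: data_output_path/public/users/2021_01_01_1609459200000_0.csv.gz
    System.out.println(objectKey);
  }
}
```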

The rationale behind this naming pattern is:

1. Each stream has its own directory.
2. The data output files can be sorted by upload time.
3. The upload time is composed of a date part and a milliseconds part, so it is both readable and unique.

It is also possible to customize the path further using the following variables (a worked sketch follows the notes below):
- `${NAMESPACE}`: Namespace of the stream, or the namespace configured on the connection.
- `${STREAM_NAME}`: Name of the stream.
- `${YEAR}`: Year in which the sync wrote the output data.
- `${MONTH}`: Month in which the sync wrote the output data.
- `${DAY}`: Day in which the sync wrote the output data.
- `${HOUR}`: Hour in which the sync wrote the output data.
- `${MINUTE}`: Minute in which the sync wrote the output data.
- `${SECOND}`: Second in which the sync wrote the output data.
- `${MILLISECOND}`: Millisecond in which the sync wrote the output data.
- `${EPOCH}`: Milliseconds since the Unix epoch at which the sync wrote the output data.
- `${UUID}`: A random UUID string.

Note:
- Multiple `/` characters in the S3 path are collapsed into a single `/` character.
- The part id is a sequential number by default; if the output path already contains too many objects to list, a random `UUID` is used instead.
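
As a rough, hypothetical illustration of the variables and notes above, the snippet below expands a customized path format for a stream that has no namespace, showing how the resulting duplicate `/` gets collapsed. It follows the same `replaceAll(Pattern.quote(...))` approach as the connector code in this commit; the format string, names, and values are examples only.

```java
import java.util.UUID;
import java.util.regex.Pattern;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

// Hypothetical example of a customized S3 path format; not the connector's API.
public class CustomPathFormatExample {

  public static void main(final String[] args) {
    final String customFormat = "${NAMESPACE}/${STREAM_NAME}/${YEAR}/${MONTH}/${DAY}/${HOUR}_${UUID}_";
    final DateTime now = DateTime.now(DateTimeZone.UTC);

    // Join the bucket path and the path format, then substitute the variables.
    // The stream namespace is empty here, which produces a double slash.
    final String path = String.join("/", "data_output_path", customFormat)
        .replaceAll(Pattern.quote("${NAMESPACE}"), "")
        .replaceAll(Pattern.quote("${STREAM_NAME}"), "users")
        .replaceAll(Pattern.quote("${YEAR}"), String.format("%04d", now.year().get()))
        .replaceAll(Pattern.quote("${MONTH}"), String.format("%02d", now.monthOfYear().get()))
        .replaceAll(Pattern.quote("${DAY}"), String.format("%02d", now.dayOfMonth().get()))
        .replaceAll(Pattern.quote("${HOUR}"), String.format("%02d", now.hourOfDay().get()))
        .replaceAll(Pattern.quote("${UUID}"), UUID.randomUUID().toString())
        // Collapse the duplicate slashes left by the empty namespace.
        .replaceAll("/+", "/");

    // e.g. data_output_path/users/2022/04/04/12_<random-uuid>_
    System.out.println(path);
  }
}
```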

Please note that the stream name may contain a prefix if one is configured on the connection.
A data sync may create multiple files, as the output is partitioned by size (targeting a compressed size of 200MB or lower).

@@ -228,6 +251,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- |:---------------------------------------------------------------------------------------------------------------------------|
| 0.3.0 | 2022-04-04 | [\#11666](https://github.com/airbytehq/airbyte/pull/11666) | 0.2.12 actually has breaking changes since files are compressed by default; this PR also fixes the naming to be more compatible with older versions. |
| 0.2.13 | 2022-03-29 | [\#11496](https://github.com/airbytehq/airbyte/pull/11496) | Fix S3 bucket path to be included with S3 bucket format |
| 0.2.12 | 2022-03-28 | [\#11294](https://github.com/airbytehq/airbyte/pull/11294) | Change to serialized buffering strategy to reduce memory consumption |
| 0.2.11 | 2022-03-23 | [\#11173](https://github.com/airbytehq/airbyte/pull/11173) | Added support for AWS Glue crawler |