diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/adapter/v1/AwsSdkV1Adapters.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/adapter/v1/AwsSdkV1Adapters.java new file mode 100644 index 0000000000000..d4cd7a96f8a23 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/adapter/v1/AwsSdkV1Adapters.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common.adapter.v1; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.fs.s3.common.model.FlinkCompleteMultipartUploadResult; +import org.apache.flink.fs.s3.common.model.FlinkObjectMetadata; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; +import org.apache.flink.fs.s3.common.model.FlinkPutObjectResult; +import org.apache.flink.fs.s3.common.model.FlinkUploadPartResult; + +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.UploadPartResult; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Adapters to convert between Flink SDK-agnostic types and AWS SDK v1 types. + * + *
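<p>For illustration only (a hypothetical usage sketch, not part of this change), a round trip
+ * through the v1 adapters might look like:
+ *
+ * <pre>{@code
+ * PartETag awsTag = new PartETag(1, "etag-1");
+ * FlinkPartETag flinkTag = AwsSdkV1Adapters.toFlinkPartETag(awsTag);
+ * PartETag roundTripped = AwsSdkV1Adapters.toAwsPartETag(flinkTag);
+ * }</pre>
+ *
+ *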
<p>
This class provides static utility methods for converting between Flink's SDK-agnostic model + * classes and AWS SDK v1 model classes. These adapters enable the S3 filesystem implementation to + * use AWS SDK v1 (e.g., for Presto-based implementations) while working with SDK-agnostic + * interfaces. + */ +@Internal +public final class AwsSdkV1Adapters { + + private AwsSdkV1Adapters() { + // Utility class, no instantiation + } + + // ========== PartETag Conversions ========== + + /** + * Converts an AWS SDK v1 PartETag to a Flink PartETag. + * + * @param partETag the AWS SDK v1 PartETag + * @return the Flink PartETag + */ + public static FlinkPartETag toFlinkPartETag(PartETag partETag) { + return new FlinkPartETag(partETag.getPartNumber(), partETag.getETag()); + } + + /** + * Converts a Flink PartETag to an AWS SDK v1 PartETag. + * + * @param flinkPartETag the Flink PartETag + * @return the AWS SDK v1 PartETag + */ + public static PartETag toAwsPartETag(FlinkPartETag flinkPartETag) { + return new PartETag(flinkPartETag.getPartNumber(), flinkPartETag.getETag()); + } + + /** + * Converts a list of AWS SDK v1 PartETags to Flink PartETags. + * + * @param partETags the list of AWS SDK v1 PartETags + * @return the list of Flink PartETags + */ + public static List toFlinkPartETags(List partETags) { + return partETags.stream() + .map(AwsSdkV1Adapters::toFlinkPartETag) + .collect(Collectors.toList()); + } + + /** + * Converts a list of Flink PartETags to AWS SDK v1 PartETags. + * + * @param flinkPartETags the list of Flink PartETags + * @return the list of AWS SDK v1 PartETags + */ + public static List toAwsPartETags(List flinkPartETags) { + return flinkPartETags.stream() + .map(AwsSdkV1Adapters::toAwsPartETag) + .collect(Collectors.toList()); + } + + // ========== UploadPartResult Conversions ========== + + /** + * Converts an AWS SDK v1 UploadPartResult to a Flink UploadPartResult. + * + * @param result the AWS SDK v1 UploadPartResult + * @return the Flink UploadPartResult + */ + public static FlinkUploadPartResult toFlinkUploadPartResult(UploadPartResult result) { + return new FlinkUploadPartResult(result.getPartNumber(), result.getETag()); + } + + // ========== PutObjectResult Conversions ========== + + /** + * Converts an AWS SDK v1 PutObjectResult to a Flink PutObjectResult. + * + * @param result the AWS SDK v1 PutObjectResult + * @return the Flink PutObjectResult + */ + public static FlinkPutObjectResult toFlinkPutObjectResult(PutObjectResult result) { + return new FlinkPutObjectResult(result.getETag(), result.getVersionId()); + } + + // ========== CompleteMultipartUploadResult Conversions ========== + + /** + * Converts an AWS SDK v1 CompleteMultipartUploadResult to a Flink + * CompleteMultipartUploadResult. + * + * @param result the AWS SDK v1 CompleteMultipartUploadResult + * @return the Flink CompleteMultipartUploadResult + */ + public static FlinkCompleteMultipartUploadResult toFlinkCompleteMultipartUploadResult( + CompleteMultipartUploadResult result) { + return FlinkCompleteMultipartUploadResult.builder() + .bucket(result.getBucketName()) + .key(result.getKey()) + .eTag(result.getETag()) + .location(result.getLocation()) + .versionId(result.getVersionId()) + .build(); + } + + // ========== ObjectMetadata Conversions ========== + + /** + * Converts an AWS SDK v1 ObjectMetadata to a Flink ObjectMetadata. 
+ * + * @param metadata the AWS SDK v1 ObjectMetadata + * @return the Flink ObjectMetadata + */ + public static FlinkObjectMetadata toFlinkObjectMetadata(ObjectMetadata metadata) { + return FlinkObjectMetadata.builder() + .contentLength(metadata.getContentLength()) + .contentType(metadata.getContentType()) + .eTag(metadata.getETag()) + .lastModified(metadata.getLastModified()) + .userMetadata(metadata.getUserMetadata()) + .build(); + } +} diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/adapter/v2/AwsSdkV2Adapters.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/adapter/v2/AwsSdkV2Adapters.java new file mode 100644 index 0000000000000..63610e3a631b6 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/adapter/v2/AwsSdkV2Adapters.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common.adapter.v2; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.fs.s3.common.model.FlinkCompleteMultipartUploadResult; +import org.apache.flink.fs.s3.common.model.FlinkObjectMetadata; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; +import org.apache.flink.fs.s3.common.model.FlinkPutObjectResult; +import org.apache.flink.fs.s3.common.model.FlinkUploadPartResult; + +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; + +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Adapters to convert between Flink SDK-agnostic types and AWS SDK v2 types. + * + *
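<p>For illustration only (a hypothetical usage sketch, not part of this change), adapting a
+ * completed part for SDK v2 might look like:
+ *
+ * <pre>{@code
+ * CompletedPart part = CompletedPart.builder().partNumber(1).eTag("etag-1").build();
+ * FlinkPartETag flinkTag = AwsSdkV2Adapters.toFlinkPartETag(part);
+ * CompletedMultipartUpload upload =
+ *         AwsSdkV2Adapters.toCompletedMultipartUpload(Collections.singletonList(flinkTag));
+ * }</pre>
+ *
+ *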
<p>
This class provides static utility methods for converting between Flink's SDK-agnostic model + * classes and AWS SDK v2 model classes. These adapters enable the S3 filesystem implementation to + * use AWS SDK v2 (e.g., for Hadoop 3.4+ based implementations) while working with SDK-agnostic + * interfaces. + */ +@Internal +public final class AwsSdkV2Adapters { + + private AwsSdkV2Adapters() { + // Utility class, no instantiation + } + + // ========== CompletedPart Conversions ========== + + /** + * Converts an AWS SDK v2 CompletedPart to a Flink PartETag. + * + * @param completedPart the AWS SDK v2 CompletedPart + * @return the Flink PartETag + */ + public static FlinkPartETag toFlinkPartETag(CompletedPart completedPart) { + return new FlinkPartETag(completedPart.partNumber(), completedPart.eTag()); + } + + /** + * Converts a Flink PartETag to an AWS SDK v2 CompletedPart. + * + * @param flinkPartETag the Flink PartETag + * @return the AWS SDK v2 CompletedPart + */ + public static CompletedPart toAwsCompletedPart(FlinkPartETag flinkPartETag) { + return CompletedPart.builder() + .partNumber(flinkPartETag.getPartNumber()) + .eTag(flinkPartETag.getETag()) + .build(); + } + + /** + * Converts a list of AWS SDK v2 CompletedParts to Flink PartETags. + * + * @param completedParts the list of AWS SDK v2 CompletedParts + * @return the list of Flink PartETags + */ + public static List toFlinkPartETags(List completedParts) { + return completedParts.stream() + .map(AwsSdkV2Adapters::toFlinkPartETag) + .collect(Collectors.toList()); + } + + /** + * Converts a list of Flink PartETags to AWS SDK v2 CompletedParts. + * + * @param flinkPartETags the list of Flink PartETags + * @return the list of AWS SDK v2 CompletedParts + */ + public static List toAwsCompletedParts(List flinkPartETags) { + return flinkPartETags.stream() + .map(AwsSdkV2Adapters::toAwsCompletedPart) + .collect(Collectors.toList()); + } + + /** + * Creates an AWS SDK v2 CompletedMultipartUpload from a list of Flink PartETags. + * + * @param flinkPartETags the list of Flink PartETags + * @return the AWS SDK v2 CompletedMultipartUpload + */ + public static CompletedMultipartUpload toCompletedMultipartUpload( + List flinkPartETags) { + return CompletedMultipartUpload.builder() + .parts(toAwsCompletedParts(flinkPartETags)) + .build(); + } + + // ========== UploadPartResponse Conversions ========== + + /** + * Converts an AWS SDK v2 UploadPartResponse to a Flink UploadPartResult. + * + *
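<p>For illustration only (hypothetical values): {@code
+ * AwsSdkV2Adapters.toFlinkUploadPartResult(response, 1)} pairs the response with the part number
+ * that the caller used in the corresponding request.
+ *
+ *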
<p>
Note: AWS SDK v2 does not return the part number in the response, so the part number must + * be tracked separately by the caller and provided here. + * + * @param response the AWS SDK v2 UploadPartResponse + * @param partNumber the part number (tracked by caller) + * @return the Flink UploadPartResult + */ + public static FlinkUploadPartResult toFlinkUploadPartResult( + UploadPartResponse response, int partNumber) { + return new FlinkUploadPartResult(partNumber, response.eTag()); + } + + // ========== PutObjectResponse Conversions ========== + + /** + * Converts an AWS SDK v2 PutObjectResponse to a Flink PutObjectResult. + * + * @param response the AWS SDK v2 PutObjectResponse + * @return the Flink PutObjectResult + */ + public static FlinkPutObjectResult toFlinkPutObjectResult(PutObjectResponse response) { + return new FlinkPutObjectResult(response.eTag(), response.versionId()); + } + + // ========== CompleteMultipartUploadResponse Conversions ========== + + /** + * Converts an AWS SDK v2 CompleteMultipartUploadResponse to a Flink + * CompleteMultipartUploadResult. + * + * @param response the AWS SDK v2 CompleteMultipartUploadResponse + * @return the Flink CompleteMultipartUploadResult + */ + public static FlinkCompleteMultipartUploadResult toFlinkCompleteMultipartUploadResult( + CompleteMultipartUploadResponse response) { + return FlinkCompleteMultipartUploadResult.builder() + .bucket(response.bucket()) + .key(response.key()) + .eTag(response.eTag()) + .location(response.location()) + .versionId(response.versionId()) + .build(); + } + + // ========== HeadObjectResponse Conversions ========== + + /** + * Converts an AWS SDK v2 HeadObjectResponse to a Flink ObjectMetadata. + * + * @param response the AWS SDK v2 HeadObjectResponse + * @return the Flink ObjectMetadata + */ + public static FlinkObjectMetadata toFlinkObjectMetadata(HeadObjectResponse response) { + return FlinkObjectMetadata.builder() + .contentLength(response.contentLength()) + .contentType(response.contentType()) + .eTag(response.eTag()) + .lastModified( + response.lastModified() != null ? Date.from(response.lastModified()) : null) + .userMetadata(response.metadata()) + .build(); + } + + // ========== GetObjectResponse Conversions ========== + + /** + * Converts an AWS SDK v2 GetObjectResponse to a Flink ObjectMetadata. + * + * @param response the AWS SDK v2 GetObjectResponse + * @return the Flink ObjectMetadata + */ + public static FlinkObjectMetadata toFlinkObjectMetadata(GetObjectResponse response) { + return FlinkObjectMetadata.builder() + .contentLength(response.contentLength()) + .contentType(response.contentType()) + .eTag(response.eTag()) + .lastModified( + response.lastModified() != null ? Date.from(response.lastModified()) : null) + .userMetadata(response.metadata()) + .build(); + } +} diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkCompleteMultipartUploadResult.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkCompleteMultipartUploadResult.java new file mode 100644 index 0000000000000..a2caa17ff11ca --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkCompleteMultipartUploadResult.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common.model; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * SDK-agnostic representation of the result of completing a multipart upload. + * + *
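<p>For illustration only (hypothetical values), a result is assembled via the builder:
+ *
+ * <pre>{@code
+ * FlinkCompleteMultipartUploadResult result =
+ *         FlinkCompleteMultipartUploadResult.builder()
+ *                 .bucket("my-bucket")
+ *                 .key("path/to/object")
+ *                 .eTag("etag-1")
+ *                 .build();
+ * }</pre>
+ *
+ *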
<p>
This class maintains compatibility with both AWS SDK v1 {@code CompleteMultipartUploadResult} + * and AWS SDK v2 {@code CompleteMultipartUploadResponse}, allowing the base S3 filesystem + * implementation to work with either SDK version. + */ +@Internal +public final class FlinkCompleteMultipartUploadResult { + + @Nullable private final String bucket; + @Nullable private final String key; + @Nullable private final String eTag; + @Nullable private final String location; + @Nullable private final String versionId; + + private FlinkCompleteMultipartUploadResult(Builder builder) { + this.bucket = builder.bucket; + this.key = builder.key; + this.eTag = builder.eTag; + this.location = builder.location; + this.versionId = builder.versionId; + } + + /** Returns the bucket name. */ + @Nullable + public String getBucket() { + return bucket; + } + + /** Returns the object key. */ + @Nullable + public String getKey() { + return key; + } + + /** Returns the ETag of the completed object. */ + @Nullable + public String getETag() { + return eTag; + } + + /** Returns the location (URL) of the completed object. */ + @Nullable + public String getLocation() { + return location; + } + + /** Returns the version ID of the object (if versioning is enabled). */ + @Nullable + public String getVersionId() { + return versionId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FlinkCompleteMultipartUploadResult that = (FlinkCompleteMultipartUploadResult) o; + return Objects.equals(bucket, that.bucket) + && Objects.equals(key, that.key) + && Objects.equals(eTag, that.eTag) + && Objects.equals(location, that.location) + && Objects.equals(versionId, that.versionId); + } + + @Override + public int hashCode() { + return Objects.hash(bucket, key, eTag, location, versionId); + } + + @Override + public String toString() { + return "FlinkCompleteMultipartUploadResult{" + + "bucket='" + + bucket + + '\'' + + ", key='" + + key + + '\'' + + ", eTag='" + + eTag + + '\'' + + ", location='" + + location + + '\'' + + ", versionId='" + + versionId + + '\'' + + '}'; + } + + /** Creates a new builder. */ + public static Builder builder() { + return new Builder(); + } + + /** Builder for {@link FlinkCompleteMultipartUploadResult}. 
*/ + public static class Builder { + private String bucket; + private String key; + private String eTag; + private String location; + private String versionId; + + private Builder() {} + + public Builder bucket(String bucket) { + this.bucket = bucket; + return this; + } + + public Builder key(String key) { + this.key = key; + return this; + } + + public Builder eTag(String eTag) { + this.eTag = eTag; + return this; + } + + public Builder location(String location) { + this.location = location; + return this; + } + + public Builder versionId(String versionId) { + this.versionId = versionId; + return this; + } + + public FlinkCompleteMultipartUploadResult build() { + return new FlinkCompleteMultipartUploadResult(this); + } + } +} diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkObjectMetadata.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkObjectMetadata.java new file mode 100644 index 0000000000000..b6c2b0d523cca --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkObjectMetadata.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common.model; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * SDK-agnostic representation of S3 object metadata. + * + *
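<p>For illustration only (hypothetical values):
+ *
+ * <pre>{@code
+ * FlinkObjectMetadata metadata =
+ *         FlinkObjectMetadata.builder()
+ *                 .contentLength(1024L)
+ *                 .contentType("application/octet-stream")
+ *                 .eTag("etag-1")
+ *                 .build();
+ * }</pre>
+ *
+ *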
<p>
This class maintains compatibility with both AWS SDK v1 {@code ObjectMetadata} and AWS SDK v2 + * {@code HeadObjectResponse}/{@code GetObjectResponse}, allowing the base S3 filesystem + * implementation to work with either SDK version. + */ +@Internal +public final class FlinkObjectMetadata { + + private final long contentLength; + @Nullable private final String contentType; + @Nullable private final String eTag; + @Nullable private final Date lastModified; + private final Map userMetadata; + + private FlinkObjectMetadata(Builder builder) { + this.contentLength = builder.contentLength; + this.contentType = builder.contentType; + this.eTag = builder.eTag; + this.lastModified = builder.lastModified; + this.userMetadata = + builder.userMetadata != null + ? Collections.unmodifiableMap(new HashMap<>(builder.userMetadata)) + : Collections.emptyMap(); + } + + /** Returns the size of the object in bytes. */ + public long getContentLength() { + return contentLength; + } + + /** Returns the content type of the object. */ + @Nullable + public String getContentType() { + return contentType; + } + + /** Returns the ETag of the object. */ + @Nullable + public String getETag() { + return eTag; + } + + /** Returns the last modified date of the object. */ + @Nullable + public Date getLastModified() { + return lastModified; + } + + /** Returns the user-defined metadata for the object. */ + public Map getUserMetadata() { + return userMetadata; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FlinkObjectMetadata that = (FlinkObjectMetadata) o; + return contentLength == that.contentLength + && Objects.equals(contentType, that.contentType) + && Objects.equals(eTag, that.eTag) + && Objects.equals(lastModified, that.lastModified) + && userMetadata.equals(that.userMetadata); + } + + @Override + public int hashCode() { + return Objects.hash(contentLength, contentType, eTag, lastModified, userMetadata); + } + + @Override + public String toString() { + return "FlinkObjectMetadata{" + + "contentLength=" + + contentLength + + ", contentType='" + + contentType + + '\'' + + ", eTag='" + + eTag + + '\'' + + ", lastModified=" + + lastModified + + ", userMetadata=" + + userMetadata + + '}'; + } + + /** Creates a new builder. */ + public static Builder builder() { + return new Builder(); + } + + /** Builder for {@link FlinkObjectMetadata}. 
*/ + public static class Builder { + private long contentLength; + private String contentType; + private String eTag; + private Date lastModified; + private Map userMetadata; + + private Builder() {} + + public Builder contentLength(long contentLength) { + this.contentLength = contentLength; + return this; + } + + public Builder contentType(String contentType) { + this.contentType = contentType; + return this; + } + + public Builder eTag(String eTag) { + this.eTag = eTag; + return this; + } + + public Builder lastModified(Date lastModified) { + this.lastModified = lastModified; + return this; + } + + public Builder userMetadata(Map userMetadata) { + this.userMetadata = userMetadata; + return this; + } + + public FlinkObjectMetadata build() { + return new FlinkObjectMetadata(this); + } + } +} diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkPartETag.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkPartETag.java new file mode 100644 index 0000000000000..f6a23f9635800 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkPartETag.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common.model; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; +import java.util.Objects; + +/** + * SDK-agnostic representation of a part uploaded in a multipart upload. + * + *
<p>
This class maintains compatibility with both AWS SDK v1 {@code PartETag} and AWS SDK v2 {@code + * CompletedPart}, allowing the base S3 filesystem implementation to work with either SDK version. + * + *
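<p>For illustration only (hypothetical values): {@code new FlinkPartETag(1, "etag-1")} carries
+ * only a part number and an ETag string, independent of any AWS SDK class.
+ *
+ *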
<p>
This class is serializable to support checkpoint recovery. The serialization format is + * independent of any AWS SDK version, ensuring backward compatibility across SDK upgrades. + */ +@Internal +public final class FlinkPartETag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final int partNumber; + private final String eTag; + + /** + * Creates a new FlinkPartETag. + * + * @param partNumber the part number (must be between 1 and 10000) + * @param eTag the ETag of the uploaded part + */ + public FlinkPartETag(int partNumber, String eTag) { + this.partNumber = partNumber; + this.eTag = Objects.requireNonNull(eTag, "eTag must not be null"); + } + + /** + * Returns the part number of the uploaded part. + * + * @return the part number + */ + public int getPartNumber() { + return partNumber; + } + + /** + * Returns the ETag of the uploaded part. + * + * @return the ETag + */ + public String getETag() { + return eTag; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FlinkPartETag that = (FlinkPartETag) o; + return partNumber == that.partNumber && eTag.equals(that.eTag); + } + + @Override + public int hashCode() { + return Objects.hash(partNumber, eTag); + } + + @Override + public String toString() { + return "FlinkPartETag{partNumber=" + partNumber + ", eTag='" + eTag + "'}"; + } +} diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkPutObjectResult.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkPutObjectResult.java new file mode 100644 index 0000000000000..62359826cf066 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkPutObjectResult.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common.model; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * SDK-agnostic representation of the result of putting an object to S3. + * + *
<p>
This class maintains compatibility with both AWS SDK v1 {@code PutObjectResult} and AWS SDK v2 + * {@code PutObjectResponse}, allowing the base S3 filesystem implementation to work with either SDK + * version. + */ +@Internal +public final class FlinkPutObjectResult { + + @Nullable private final String eTag; + @Nullable private final String versionId; + + /** + * Creates a new FlinkPutObjectResult. + * + * @param eTag the ETag of the uploaded object + * @param versionId the version ID of the object (if versioning is enabled) + */ + public FlinkPutObjectResult(@Nullable String eTag, @Nullable String versionId) { + this.eTag = eTag; + this.versionId = versionId; + } + + /** + * Returns the ETag of the uploaded object. + * + * @return the ETag + */ + @Nullable + public String getETag() { + return eTag; + } + + /** + * Returns the version ID of the object (if versioning is enabled). + * + * @return the version ID, or null if versioning is not enabled + */ + @Nullable + public String getVersionId() { + return versionId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FlinkPutObjectResult that = (FlinkPutObjectResult) o; + return Objects.equals(eTag, that.eTag) && Objects.equals(versionId, that.versionId); + } + + @Override + public int hashCode() { + return Objects.hash(eTag, versionId); + } + + @Override + public String toString() { + return "FlinkPutObjectResult{eTag='" + eTag + "', versionId='" + versionId + "'}"; + } +} diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkUploadPartResult.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkUploadPartResult.java new file mode 100644 index 0000000000000..a12a0604dd9d2 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/model/FlinkUploadPartResult.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common.model; + +import org.apache.flink.annotation.Internal; + +import java.util.Objects; + +/** + * SDK-agnostic representation of the result of uploading a part in a multipart upload. + * + *
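<p>For illustration only (hypothetical values), an upload result converts directly to the part
+ * ETag used at commit time:
+ *
+ * <pre>{@code
+ * FlinkPartETag tag = new FlinkUploadPartResult(1, "etag-1").toPartETag();
+ * }</pre>
+ *
+ *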
<p>
This class maintains compatibility with both AWS SDK v1 {@code UploadPartResult} and AWS SDK + * v2 {@code UploadPartResponse}, allowing the base S3 filesystem implementation to work with either + * SDK version. + */ +@Internal +public final class FlinkUploadPartResult { + + private final int partNumber; + private final String eTag; + + /** + * Creates a new FlinkUploadPartResult. + * + * @param partNumber the part number of the uploaded part + * @param eTag the ETag of the uploaded part + */ + public FlinkUploadPartResult(int partNumber, String eTag) { + this.partNumber = partNumber; + this.eTag = Objects.requireNonNull(eTag, "eTag must not be null"); + } + + /** + * Returns the part number of the uploaded part. + * + * @return the part number + */ + public int getPartNumber() { + return partNumber; + } + + /** + * Returns the ETag of the uploaded part. + * + * @return the ETag + */ + public String getETag() { + return eTag; + } + + /** + * Converts this result to a {@link FlinkPartETag} for use in multipart upload completion. + * + * @return a FlinkPartETag with the same part number and ETag + */ + public FlinkPartETag toPartETag() { + return new FlinkPartETag(partNumber, eTag); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FlinkUploadPartResult that = (FlinkUploadPartResult) o; + return partNumber == that.partNumber && eTag.equals(that.eTag); + } + + @Override + public int hashCode() { + return Objects.hash(partNumber, eTag); + } + + @Override + public String toString() { + return "FlinkUploadPartResult{partNumber=" + partNumber + ", eTag='" + eTag + "'}"; + } +} diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/MultiPartUploadInfo.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/MultiPartUploadInfo.java index d76d288951275..60518aea16f95 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/MultiPartUploadInfo.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/MultiPartUploadInfo.java @@ -19,8 +19,7 @@ package org.apache.flink.fs.s3.common.writer; import org.apache.flink.annotation.Internal; - -import com.amazonaws.services.s3.model.PartETag; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; import java.io.File; import java.util.ArrayList; @@ -38,7 +37,7 @@ final class MultiPartUploadInfo { private final String uploadId; - private final List completeParts; + private final List completeParts; private final Optional incompletePart; @@ -57,7 +56,7 @@ final class MultiPartUploadInfo { MultiPartUploadInfo( final String objectName, final String uploadId, - final List completeParts, + final List completeParts, final long numBytes, final Optional incompletePart) { @@ -92,7 +91,7 @@ Optional getIncompletePart() { return incompletePart; } - List getCopyOfEtagsOfCompleteParts() { + List getCopyOfEtagsOfCompleteParts() { return new ArrayList<>(completeParts); } @@ -101,7 +100,7 @@ void registerNewPart(long length) { this.numberOfRegisteredParts++; } - void registerCompletePart(PartETag eTag) { + void registerCompletePart(FlinkPartETag eTag) { completeParts.add(eTag); } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java index ddc6ceb83a6b5..b2647a7adab20 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java @@ -21,9 +21,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.RefCountedFSOutputStream; - -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.UploadPartResult; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; +import org.apache.flink.fs.s3.common.model.FlinkUploadPartResult; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -61,7 +60,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload private final Executor uploadThreadPool; - private final Deque> uploadsInProgress; + private final Deque> uploadsInProgress; private final String namePrefixForTempObjects; @@ -74,7 +73,7 @@ private RecoverableMultiPartUploadImpl( Executor uploadThreadPool, String uploadId, String objectName, - List partsSoFar, + List partsSoFar, long numBytes, Optional incompletePart) { checkArgument(numBytes >= 0L); @@ -102,7 +101,7 @@ public void uploadPart(RefCountedFSOutputStream file) throws IOException { // writing to the file we are uploading. checkState(file.isClosed()); - final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture future = new CompletableFuture<>(); uploadsInProgress.add(future); final long partLength = file.getPos(); @@ -153,7 +152,8 @@ public S3Recoverable snapshotAndGetRecoverable( final String objectName = currentUploadInfo.getObjectName(); final String uploadId = currentUploadInfo.getUploadId(); - final List completedParts = currentUploadInfo.getCopyOfEtagsOfCompleteParts(); + final List completedParts = + currentUploadInfo.getCopyOfEtagsOfCompleteParts(); final long sizeInBytes = currentUploadInfo.getExpectedSizeInBytes(); if (incompletePartObjectName == null) { @@ -219,16 +219,16 @@ private void awaitPendingPartsUpload() throws IOException { checkState(currentUploadInfo.getRemainingParts() == uploadsInProgress.size()); while (currentUploadInfo.getRemainingParts() > 0) { - CompletableFuture next = uploadsInProgress.peekFirst(); - PartETag nextPart = awaitPendingPartUploadToComplete(next); + CompletableFuture next = uploadsInProgress.peekFirst(); + FlinkPartETag nextPart = awaitPendingPartUploadToComplete(next); currentUploadInfo.registerCompletePart(nextPart); uploadsInProgress.removeFirst(); } } - private PartETag awaitPendingPartUploadToComplete(CompletableFuture upload) + private FlinkPartETag awaitPendingPartUploadToComplete(CompletableFuture upload) throws IOException { - final PartETag completedUploadEtag; + final FlinkPartETag completedUploadEtag; try { completedUploadEtag = upload.get(); } catch (InterruptedException e) { @@ -267,7 +267,7 @@ public static RecoverableMultiPartUploadImpl recoverUpload( final Executor uploadThreadPool, final String multipartUploadId, final String objectName, - final List partsSoFar, + final List partsSoFar, final long numBytesSoFar, final Optional incompletePart) { @@ -297,13 +297,13 @@ private static class UploadTask implements Runnable { private final RefCountedFSOutputStream file; - private final 
CompletableFuture future; + private final CompletableFuture future; UploadTask( final S3AccessHelper s3AccessHelper, final MultiPartUploadInfo currentUpload, final RefCountedFSOutputStream file, - final CompletableFuture future) { + final CompletableFuture future) { checkNotNull(currentUpload); @@ -322,14 +322,14 @@ private static class UploadTask implements Runnable { @Override public void run() { try { - final UploadPartResult result = + final FlinkUploadPartResult result = s3AccessHelper.uploadPart( objectName, uploadId, partNumber, file.getInputFile(), file.getPos()); - future.complete(new PartETag(result.getPartNumber(), result.getETag())); + future.complete(result.toPartETag()); file.release(); } catch (Throwable t) { future.completeExceptionally(t); diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java index 13f73ee4c9818..5c8c5ed68cf15 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java @@ -19,12 +19,11 @@ package org.apache.flink.fs.s3.common.writer; import org.apache.flink.annotation.Internal; - -import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.PutObjectResult; -import com.amazonaws.services.s3.model.UploadPartResult; +import org.apache.flink.fs.s3.common.model.FlinkCompleteMultipartUploadResult; +import org.apache.flink.fs.s3.common.model.FlinkObjectMetadata; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; +import org.apache.flink.fs.s3.common.model.FlinkPutObjectResult; +import org.apache.flink.fs.s3.common.model.FlinkUploadPartResult; import java.io.File; import java.io.IOException; @@ -33,12 +32,15 @@ /** * An interface that abstracts away the Multi-Part Upload (MPU) functionality offered by S3, from - * the specific implementation of the file system. This is needed so that we can accommodate both - * Hadoop S3 and Presto. + * both the specific implementation of the file system and the AWS SDK version. This is needed so + * that we can accommodate both Hadoop S3 (using AWS SDK v2) and Presto (using AWS SDK v1). * - *
<p>
Multipart uploads are convenient for large object. These will be uploaded in multiple parts - * and the mutli-part upload is the equivalent of a transaction, where the upload with all its parts + *
<p>
Multipart uploads are convenient for large objects. These will be uploaded in multiple parts + * and the multipart upload is the equivalent of a transaction, where the upload with all its parts * will be either committed or discarded. + * + *
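<p>For illustration only (a hypothetical sketch; names and values are made up), a complete MPU
+ * against this interface might look like:
+ *
+ * <pre>{@code
+ * String uploadId = helper.startMultiPartUpload(key);
+ * FlinkUploadPartResult part = helper.uploadPart(key, uploadId, 1, partFile, partFile.length());
+ * helper.commitMultiPartUpload(
+ *         key,
+ *         uploadId,
+ *         Collections.singletonList(part.toPartETag()),
+ *         partFile.length(),
+ *         new AtomicInteger());
+ * }</pre>
+ *
+ *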
<p>
This interface uses SDK-agnostic types from {@link org.apache.flink.fs.s3.common.model} + * package, allowing implementations to work with either AWS SDK v1 or v2. */ @Internal public interface S3AccessHelper { @@ -61,10 +63,10 @@ public interface S3AccessHelper { * @param partNumber the number of the part being uploaded (has to be in [1 ... 10000]). * @param inputFile the (local) file holding the part to be uploaded. * @param length the length of the part. - * @return The {@link UploadPartResult result} of the attempt to upload the part. + * @return The {@link FlinkUploadPartResult result} of the attempt to upload the part. * @throws IOException */ - UploadPartResult uploadPart( + FlinkUploadPartResult uploadPart( String key, String uploadId, int partNumber, File inputFile, long length) throws IOException; @@ -75,26 +77,27 @@ UploadPartResult uploadPart( * * @param key the key used to identify this part. * @param inputFile the (local) file holding the data to be uploaded. - * @return The {@link PutObjectResult result} of the attempt to stage the incomplete part. + * @return The {@link FlinkPutObjectResult result} of the attempt to stage the incomplete part. * @throws IOException */ - PutObjectResult putObject(String key, File inputFile) throws IOException; + FlinkPutObjectResult putObject(String key, File inputFile) throws IOException; /** * Finalizes a Multi-Part Upload. * * @param key the key identifying the object we finished uploading. * @param uploadId the id of the MPU. - * @param partETags the list of {@link PartETag ETags} associated with this MPU. + * @param partETags the list of {@link FlinkPartETag ETags} associated with this MPU. * @param length the size of the uploaded object. * @param errorCount a counter that will be used to count any failed attempts to commit the MPU. - * @return The {@link CompleteMultipartUploadResult result} of the attempt to finalize the MPU. + * @return The {@link FlinkCompleteMultipartUploadResult result} of the attempt to finalize the + * MPU. * @throws IOException */ - CompleteMultipartUploadResult commitMultiPartUpload( + FlinkCompleteMultipartUploadResult commitMultiPartUpload( String key, String uploadId, - List partETags, + List partETags, long length, AtomicInteger errorCount) throws IOException; @@ -124,8 +127,8 @@ CompleteMultipartUploadResult commitMultiPartUpload( * Fetches the metadata associated with a given key on S3. * * @param key the key. - * @return The associated {@link ObjectMetadata}. + * @return The associated {@link FlinkObjectMetadata}. 
* @throws IOException */ - ObjectMetadata getObjectMetadata(String key) throws IOException; + FlinkObjectMetadata getObjectMetadata(String key) throws IOException; } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java index 6cea633b30b05..d4a08f92007b2 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java @@ -20,9 +20,9 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.s3.common.model.FlinkObjectMetadata; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +44,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe private final String objectName; - private final List parts; + private final List parts; private final long totalLength; @@ -52,7 +52,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe S3AccessHelper s3AccessHelper, String objectName, String uploadId, - List parts, + List parts, long totalLength) { this.s3AccessHelper = checkNotNull(s3AccessHelper); this.objectName = checkNotNull(objectName); @@ -101,7 +101,7 @@ public void commitAfterRecovery() throws IOException { LOG.trace("Exception when committing:", e); try { - ObjectMetadata metadata = s3AccessHelper.getObjectMetadata(objectName); + FlinkObjectMetadata metadata = s3AccessHelper.getObjectMetadata(objectName); if (totalLength != metadata.getContentLength()) { String message = String.format( diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Recoverable.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Recoverable.java index cbe9e159b07fc..94cc3c9dc312e 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Recoverable.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Recoverable.java @@ -19,8 +19,7 @@ package org.apache.flink.fs.s3.common.writer; import org.apache.flink.core.fs.RecoverableWriter; - -import com.amazonaws.services.s3.model.PartETag; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; import javax.annotation.Nullable; @@ -36,7 +35,7 @@ public final class S3Recoverable implements RecoverableWriter.ResumeRecoverable private final String objectName; - private final List parts; + private final List parts; @Nullable private final String lastPartObject; @@ -44,14 +43,15 @@ public final class S3Recoverable implements RecoverableWriter.ResumeRecoverable private long lastPartObjectLength; - S3Recoverable(String objectName, String uploadId, List parts, long numBytesInParts) { + S3Recoverable( + String objectName, String uploadId, List parts, long numBytesInParts) { this(objectName, uploadId, parts, numBytesInParts, null, -1L); } S3Recoverable( String objectName, String uploadId, - List parts, + List parts, long numBytesInParts, @Nullable String lastPartObject, long lastPartObjectLength) { @@ -77,7 +77,7 @@ public String getObjectName() { return objectName; } - 
public List parts() { + public List parts() { return parts; } @@ -105,7 +105,7 @@ public String toString() { buf.append(", bytesInParts=").append(numBytesInParts); buf.append(", parts=["); int num = 0; - for (PartETag part : parts) { + for (FlinkPartETag part : parts) { if (0 != num++) { buf.append(", "); } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializer.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializer.java index a72e99be831d1..b8c917c846728 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializer.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializer.java @@ -20,8 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; - -import com.amazonaws.services.s3.model.PartETag; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; import java.io.IOException; import java.nio.ByteBuffer; @@ -51,8 +50,8 @@ public int getVersion() { @Override public byte[] serialize(S3Recoverable obj) throws IOException { - final List partList = obj.parts(); - final PartETag[] parts = partList.toArray(new PartETag[partList.size()]); + final List partList = obj.parts(); + final FlinkPartETag[] parts = partList.toArray(new FlinkPartETag[partList.size()]); final byte[] keyBytes = obj.getObjectName().getBytes(CHARSET); final byte[] uploadIdBytes = obj.uploadId().getBytes(CHARSET); @@ -93,7 +92,7 @@ public byte[] serialize(S3Recoverable obj) throws IOException { bb.putInt(etags.length); for (int i = 0; i < parts.length; i++) { - PartETag pe = parts[i]; + FlinkPartETag pe = parts[i]; bb.putInt(pe.getPartNumber()); bb.putInt(etags[i].length); bb.put(etags[i]); @@ -137,12 +136,12 @@ private static S3Recoverable deserializeV1(byte[] serialized) throws IOException bb.get(uploadIdBytes); final int numParts = bb.getInt(); - final ArrayList parts = new ArrayList<>(numParts); + final ArrayList parts = new ArrayList<>(numParts); for (int i = 0; i < numParts; i++) { final int partNum = bb.getInt(); final byte[] buffer = new byte[bb.getInt()]; bb.get(buffer); - parts.add(new PartETag(partNum, new String(buffer, CHARSET))); + parts.add(new FlinkPartETag(partNum, new String(buffer, CHARSET))); } final long numBytes = bb.getLong(); diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java index 7ec0101d2e95e..c1a0bf36b5097 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java @@ -20,14 +20,14 @@ import org.apache.flink.core.fs.RefCountedBufferingFileStream; import org.apache.flink.core.fs.RefCountedFileWithStream; +import org.apache.flink.fs.s3.common.model.FlinkCompleteMultipartUploadResult; +import org.apache.flink.fs.s3.common.model.FlinkObjectMetadata; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; +import org.apache.flink.fs.s3.common.model.FlinkPutObjectResult; +import org.apache.flink.fs.s3.common.model.FlinkUploadPartResult; import org.apache.flink.util.IOUtils; import 
org.apache.flink.util.MathUtils; -import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.PutObjectResult; -import com.amazonaws.services.s3.model.UploadPartResult; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -168,8 +168,9 @@ private static void assertThatIsEqualTo( assertThat(actualRecoverable.incompleteObjectLength()) .isEqualTo(expectedRecoverable.incompleteObjectLength()); - assertThat(actualRecoverable.parts().stream().map(PartETag::getETag).toArray()) - .isEqualTo(expectedRecoverable.parts().stream().map(PartETag::getETag).toArray()); + assertThat(actualRecoverable.parts().stream().map(FlinkPartETag::getETag).toArray()) + .isEqualTo( + expectedRecoverable.parts().stream().map(FlinkPartETag::getETag).toArray()); } // ---------------------------------- Test Methods ------------------------------------------- @@ -180,12 +181,12 @@ private static byte[] bytesOf(String str) { private static S3Recoverable createS3Recoverable( byte[] incompletePart, byte[]... completeParts) { - final List eTags = new ArrayList<>(); + final List eTags = new ArrayList<>(); int index = 1; long bytesInPart = 0L; for (byte[] part : completeParts) { - eTags.add(new PartETag(index, createETag(TEST_OBJECT_NAME, index))); + eTags.add(new FlinkPartETag(index, createETag(TEST_OBJECT_NAME, index))); bytesInPart += part.length; index++; } @@ -201,21 +202,14 @@ private static S3Recoverable createS3Recoverable( private static RecoverableMultiPartUploadImplTest.TestPutObjectResult createPutObjectResult( String key, byte[] content) { - final RecoverableMultiPartUploadImplTest.TestPutObjectResult result = - new RecoverableMultiPartUploadImplTest.TestPutObjectResult(); - result.setETag(createETag(key, -1)); - result.setContent(content); - return result; + return new RecoverableMultiPartUploadImplTest.TestPutObjectResult( + createETag(key, -1), content); } private static RecoverableMultiPartUploadImplTest.TestUploadPartResult createUploadPartResult( String key, int number, byte[] payload) { - final RecoverableMultiPartUploadImplTest.TestUploadPartResult result = - new RecoverableMultiPartUploadImplTest.TestUploadPartResult(); - result.setETag(createETag(key, number)); - result.setPartNumber(number); - result.setContent(payload); - return result; + return new RecoverableMultiPartUploadImplTest.TestUploadPartResult( + number, createETag(key, number), payload); } private static String createMPUploadId(String key) { @@ -297,7 +291,7 @@ public String startMultiPartUpload(String key) throws IOException { } @Override - public UploadPartResult uploadPart( + public FlinkUploadPartResult uploadPart( String key, String uploadId, int partNumber, File inputFile, long length) throws IOException { final byte[] content = @@ -306,7 +300,7 @@ public UploadPartResult uploadPart( } @Override - public PutObjectResult putObject(String key, File inputFile) throws IOException { + public FlinkPutObjectResult putObject(String key, File inputFile) throws IOException { final byte[] content = getFileContentBytes(inputFile, MathUtils.checkedDownCast(inputFile.length())); return storeAndGetPutObjectResult(key, content); @@ -323,10 +317,10 @@ public long getObject(String key, File targetLocation) throws IOException { } @Override - public CompleteMultipartUploadResult commitMultiPartUpload( + public 
FlinkCompleteMultipartUploadResult commitMultiPartUpload( String key, String uploadId, - List partETags, + List partETags, long length, AtomicInteger errorCount) throws IOException { @@ -334,7 +328,7 @@ public CompleteMultipartUploadResult commitMultiPartUpload( } @Override - public ObjectMetadata getObjectMetadata(String key) throws IOException { + public FlinkObjectMetadata getObjectMetadata(String key) throws IOException { throw new UnsupportedOperationException(); } @@ -344,37 +338,47 @@ private byte[] getFileContentBytes(File file, int length) throws IOException { return content; } - private RecoverableMultiPartUploadImplTest.TestUploadPartResult storeAndGetUploadPartResult( + private FlinkUploadPartResult storeAndGetUploadPartResult( String key, int number, byte[] payload) { final RecoverableMultiPartUploadImplTest.TestUploadPartResult result = createUploadPartResult(key, number, payload); completePartsUploaded.add(result); - return result; + return result.toFlinkUploadPartResult(); } - private RecoverableMultiPartUploadImplTest.TestPutObjectResult storeAndGetPutObjectResult( - String key, byte[] payload) { + private FlinkPutObjectResult storeAndGetPutObjectResult(String key, byte[] payload) { final RecoverableMultiPartUploadImplTest.TestPutObjectResult result = createPutObjectResult(key, payload); incompletePartsUploaded.add(result); - return result; + return result.toFlinkPutObjectResult(); } } - /** A {@link PutObjectResult} that also contains the actual content of the uploaded part. */ - private static class TestPutObjectResult extends PutObjectResult { - private static final long serialVersionUID = 1L; + /** + * A wrapper for {@link FlinkPutObjectResult} that also contains the actual content of the + * uploaded part. + */ + private static class TestPutObjectResult { + private final FlinkPutObjectResult result; + private final byte[] content; - private byte[] content; + TestPutObjectResult(String eTag, byte[] content) { + this.result = new FlinkPutObjectResult(eTag, null); + this.content = content; + } - void setContent(byte[] payload) { - this.content = payload; + public String getETag() { + return result.getETag(); } public byte[] getContent() { return content; } + public FlinkPutObjectResult toFlinkPutObjectResult() { + return result; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -400,21 +404,35 @@ public String toString() { } } - /** A {@link UploadPartResult} that also contains the actual content of the uploaded part. */ - private static class TestUploadPartResult extends UploadPartResult { + /** + * A wrapper for {@link FlinkUploadPartResult} that also contains the actual content of the + * uploaded part. 
+ */ + private static class TestUploadPartResult { + private final FlinkUploadPartResult result; + private final byte[] content; - private static final long serialVersionUID = 1L; + TestUploadPartResult(int partNumber, String eTag, byte[] content) { + this.result = new FlinkUploadPartResult(partNumber, eTag); + this.content = content; + } - private byte[] content; + public String getETag() { + return result.getETag(); + } - void setContent(byte[] content) { - this.content = content; + public int getPartNumber() { + return result.getPartNumber(); } public byte[] getContent() { return content; } + public FlinkUploadPartResult toFlinkUploadPartResult() { + return result; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializerTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializerTest.java index 58f7697f1bdb1..6e5789bc5a4e8 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializerTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializerTest.java @@ -18,7 +18,8 @@ package org.apache.flink.fs.s3.common.writer; -import com.amazonaws.services.s3.model.PartETag; +import org.apache.flink.fs.s3.common.model.FlinkPartETag; + import org.junit.jupiter.api.Test; import java.io.IOException; @@ -94,15 +95,16 @@ private static void assertThatIsEqualTo( .isEqualTo(expectedRecoverable.incompleteObjectName()); assertThat(actualRecoverable.incompleteObjectLength()) .isEqualTo(expectedRecoverable.incompleteObjectLength()); - assertThat(actualRecoverable.parts().stream().map(PartETag::getETag).toArray()) - .isEqualTo(expectedRecoverable.parts().stream().map(PartETag::getETag).toArray()); + assertThat(actualRecoverable.parts().stream().map(FlinkPartETag::getETag).toArray()) + .isEqualTo( + expectedRecoverable.parts().stream().map(FlinkPartETag::getETag).toArray()); } // --------------------------------- Test Utils --------------------------------- private static S3Recoverable createTestS3Recoverable( boolean withIncompletePart, int... 
-        List<PartETag> etags = new ArrayList<>();
+        List<FlinkPartETag> etags = new ArrayList<>();
        for (int i : partNumbers) {
            etags.add(createEtag(i));
        }
@@ -120,7 +122,7 @@ private static S3Recoverable createTestS3Recoverable(
        }
    }

-    private static PartETag createEtag(int partNumber) {
-        return new PartETag(partNumber, ETAG_PREFIX + partNumber);
+    private static FlinkPartETag createEtag(int partNumber) {
+        return new FlinkPartETag(partNumber, ETAG_PREFIX + partNumber);
    }
}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index c93dfb6de4985..3d5311fbe1970 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -18,111 +18,181 @@
 package org.apache.flink.fs.s3hadoop;

+import org.apache.flink.fs.s3.common.adapter.v2.AwsSdkV2Adapters;
+import org.apache.flink.fs.s3.common.model.FlinkCompleteMultipartUploadResult;
+import org.apache.flink.fs.s3.common.model.FlinkObjectMetadata;
+import org.apache.flink.fs.s3.common.model.FlinkPartETag;
+import org.apache.flink.fs.s3.common.model.FlinkPutObjectResult;
+import org.apache.flink.fs.s3.common.model.FlinkUploadPartResult;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
-import org.apache.flink.util.MathUtils;
-
-import com.amazonaws.SdkBaseException;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
-import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
-import org.apache.hadoop.fs.store.audit.AuditSpan;
-import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;

 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;

 import static org.apache.flink.util.Preconditions.checkNotNull;

-/** An implementation of the {@link S3AccessHelper} for the Hadoop S3A filesystem. */
+/**
+ * An implementation of the {@link S3AccessHelper} for the Hadoop S3A filesystem.
+ *
+ * <p>This implementation uses the AWS SDK v2 S3Client from Hadoop's S3AFileSystem to perform
+ * low-level S3 operations required for Flink's recoverable writers. The S3Client is accessed via
+ * {@link org.apache.hadoop.fs.s3a.S3AInternals#getAmazonS3Client(String)}, which is the
+ * recommended approach for operations not available through the standard FileSystem API.
+ */
 public class HadoopS3AccessHelper implements S3AccessHelper {

    private final S3AFileSystem s3a;

-    private final InternalWriteOperationHelper s3accessHelper;
+    private final S3Client s3Client;
+
+    private final String bucket;
+
+    private final WriteOperationHelper writeHelper;
+
+    private final PutObjectOptions putOptions;

    public HadoopS3AccessHelper(S3AFileSystem s3a, Configuration conf) {
-        checkNotNull(s3a);
-        this.s3accessHelper =
-                new InternalWriteOperationHelper(
-                        s3a,
-                        checkNotNull(conf),
-                        s3a.createStoreContext().getInstrumentation(),
-                        s3a.getAuditSpanSource(),
-                        s3a.getActiveAuditSpan());
-        this.s3a = s3a;
+        this.s3a = checkNotNull(s3a);
+        // Use the S3AInternals API to get the S3Client for low-level operations.
+        // This bypasses some S3A operations (like auditing) but is necessary for
+        // multipart upload operations that aren't exposed through the FileSystem API.
+        this.s3Client =
+                s3a.getS3AInternals()
+                        .getAmazonS3Client("Flink recoverable writer multipart upload operations");
+        this.bucket = s3a.getBucket();
+        this.writeHelper = s3a.getWriteOperationHelper();
+        this.putOptions = PutObjectOptions.defaultOptions();
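+        // defaultOptions() requests no special S3A put semantics; it is kept only to
+        // satisfy the Hadoop 3.4 completeMPUwithRetries signature used in
+        // commitMultiPartUpload below.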
    }

    @Override
    public String startMultiPartUpload(String key) throws IOException {
-        return s3accessHelper.initiateMultiPartUpload(key);
+        try {
+            CreateMultipartUploadRequest request =
+                    CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build();

+            CreateMultipartUploadResponse response = s3Client.createMultipartUpload(request);
+            return response.uploadId();
+        } catch (SdkException e) {
+            throw new IOException("Failed to start multipart upload for key: " + key, e);
+        }
    }

    @Override
-    public UploadPartResult uploadPart(
+    public FlinkUploadPartResult uploadPart(
            String key, String uploadId, int partNumber, File inputFile, long length)
            throws IOException {
-        final UploadPartRequest uploadRequest =
-                s3accessHelper.newUploadPartRequest(
-                        key,
-                        uploadId,
-                        partNumber,
-                        MathUtils.checkedDownCast(length),
-                        null,
-                        inputFile,
-                        0L);
-        return s3accessHelper.uploadPart(uploadRequest);
+        try {
+            UploadPartRequest request =
+                    UploadPartRequest.builder()
+                            .bucket(bucket)
+                            .key(key)
+                            .uploadId(uploadId)
+                            .partNumber(partNumber)
+                            .contentLength(length)
+                            .build();
+
+            RequestBody requestBody = RequestBody.fromFile(Paths.get(inputFile.getAbsolutePath()));
+            UploadPartResponse response = s3Client.uploadPart(request, requestBody);
+
+            return AwsSdkV2Adapters.toFlinkUploadPartResult(response, partNumber);
+        } catch (SdkException e) {
+            throw new IOException("Failed to upload part " + partNumber + " for key: " + key, e);
+        }
    }

    @Override
-    public PutObjectResult putObject(String key, File inputFile) throws IOException {
-        final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputFile);
-        return s3accessHelper.putObject(putRequest);
+    public FlinkPutObjectResult putObject(String key, File inputFile) throws IOException {
+        try {
+            PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(key).build();
+
+            RequestBody requestBody = RequestBody.fromFile(Paths.get(inputFile.getAbsolutePath()));
+            PutObjectResponse response = s3Client.putObject(request, requestBody);
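+
+            // Translate the SDK v2 response into Flink's SDK-agnostic result type,
+            // keeping callers independent of the underlying AWS SDK version.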
+            return AwsSdkV2Adapters.toFlinkPutObjectResult(response);
+        } catch (SdkException e) {
+            throw new IOException("Failed to put object for key: " + key, e);
+        }
    }

    @Override
-    public CompleteMultipartUploadResult commitMultiPartUpload(
+    public FlinkCompleteMultipartUploadResult commitMultiPartUpload(
            String destKey,
            String uploadId,
-            List<PartETag> partETags,
+            List<FlinkPartETag> partETags,
            long length,
            AtomicInteger errorCount)
            throws IOException {
-        return s3accessHelper.completeMPUwithRetries(
-                destKey, uploadId, partETags, length, errorCount);
+        // Convert Flink part ETags to AWS SDK v2 CompletedPart
+        final List<CompletedPart> completedParts =
+                partETags.stream()
+                        .map(AwsSdkV2Adapters::toAwsCompletedPart)
+                        .collect(Collectors.toList());
+
+        // Use Hadoop's WriteOperationHelper which provides retry logic, error handling,
+        // and integration with S3A statistics and auditing
+        CompleteMultipartUploadResponse response =
+                writeHelper.completeMPUwithRetries(
+                        destKey, uploadId, completedParts, length, errorCount, putOptions);
+
+        return AwsSdkV2Adapters.toFlinkCompleteMultipartUploadResult(response);
    }

    @Override
    public boolean deleteObject(String key) throws IOException {
-        return s3a.delete(new org.apache.hadoop.fs.Path('/' + key), false);
+        try {
+            DeleteObjectRequest request =
+                    DeleteObjectRequest.builder().bucket(bucket).key(key).build();
+            s3Client.deleteObject(request);
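+            // S3's DeleteObject succeeds even when the key does not exist, so this
+            // reports true whenever the call completes without an SdkException.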
+            return true;
+        } catch (SdkException e) {
+            throw new IOException("Failed to delete object for key: " + key, e);
+        }
    }

    @Override
    public long getObject(String key, File targetLocation) throws IOException {
        long numBytes = 0L;
-        try (final OutputStream outStream = new FileOutputStream(targetLocation);
-                final org.apache.hadoop.fs.FSDataInputStream inStream =
-                        s3a.open(new org.apache.hadoop.fs.Path('/' + key))) {
-            final byte[] buffer = new byte[32 * 1024];
-
-            int numRead;
-            while ((numRead = inStream.read(buffer)) != -1) {
-                outStream.write(buffer, 0, numRead);
-                numBytes += numRead;
+        try (final OutputStream outStream = new FileOutputStream(targetLocation)) {
+            GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();
+
+            try (InputStream inStream = s3Client.getObject(request)) {
+                final byte[] buffer = new byte[32 * 1024];
+                int numRead;
+                while ((numRead = inStream.read(buffer)) != -1) {
+                    outStream.write(buffer, 0, numRead);
+                    numBytes += numRead;
+                }
            }
+        } catch (SdkException e) {
+            throw new IOException("Failed to get object for key: " + key, e);
        }

        // some sanity checks
@@ -139,27 +209,13 @@ public long getObject(String key, File targetLocation) throws IOException {
    }

    @Override
-    public ObjectMetadata getObjectMetadata(String key) throws IOException {
+    public FlinkObjectMetadata getObjectMetadata(String key) throws IOException {
        try {
-            return s3a.getObjectMetadata(new Path('/' + key));
-        } catch (SdkBaseException e) {
+            HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
+            HeadObjectResponse response = s3Client.headObject(request);
+            return AwsSdkV2Adapters.toFlinkObjectMetadata(response);
+        } catch (SdkException e) {
            throw S3AUtils.translateException("getObjectMetadata", key, e);
        }
    }
-
-    /**
-     * Internal {@link WriteOperationHelper} that is wrapped so that it only exposes the
-     * functionality we need for the {@link S3AccessHelper}.
-     */
-    private static final class InternalWriteOperationHelper extends WriteOperationHelper {
-
-        InternalWriteOperationHelper(
-                S3AFileSystem owner,
-                Configuration conf,
-                S3AStatisticsContext statisticsContext,
-                AuditSpanSource auditSpanSource,
-                AuditSpan auditSpan) {
-            super(owner, conf, statisticsContext, auditSpanSource, auditSpan);
-        }
-    }
 }
diff --git a/flink-filesystems/pom.xml b/flink-filesystems/pom.xml
index 25e4c6907ae4d..32e5f52c0dae7 100644
--- a/flink-filesystems/pom.xml
+++ b/flink-filesystems/pom.xml
@@ -34,7 +34,7 @@ under the License.
 	<packaging>pom</packaging>

 	<properties>
-		<fs.hadoopshaded.version>3.3.4</fs.hadoopshaded.version>
+		<fs.hadoopshaded.version>3.4.2</fs.hadoopshaded.version>