From fd5f65afd5447204241a7f0bc215eb7c7e999bf0 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Wed, 29 Apr 2026 16:48:52 +0200 Subject: [PATCH 01/10] [FLINK-39113][s3] Fix s3.sse.kms.encryption-context config in native s3 connector --- .../flink-s3-fs-native/README.md | 5 +- .../s3native/NativeS3FileSystemFactory.java | 18 +- .../flink/fs/s3native/S3ClientProvider.java | 1 + .../flink/fs/s3native/S3EncryptionConfig.java | 13 +- .../fs/s3native/S3BlockLocationTest.java | 46 +++++ .../fs/s3native/S3EncryptionConfigTest.java | 193 ++++++++++++++++++ .../fs/s3native/S3ExceptionUtilsTest.java | 153 ++++++++++++++ .../flink/fs/s3native/S3FileStatusTest.java | 77 +++++++ 8 files changed, 496 insertions(+), 10 deletions(-) create mode 100644 flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java create mode 100644 flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java create mode 100644 flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java create mode 100644 flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java diff --git a/flink-filesystems/flink-s3-fs-native/README.md b/flink-filesystems/flink-s3-fs-native/README.md index 7f6cfc56bedb6..6dcf2c74ff020 100644 --- a/flink-filesystems/flink-s3-fs-native/README.md +++ b/flink-filesystems/flink-s3-fs-native/README.md @@ -79,6 +79,7 @@ input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"), |-----|---------|-------------| | s3.sse.type | none | Encryption type: `none`, `sse-s3` (AES256), `sse-kms` (AWS KMS) | | s3.sse.kms.key-id | (none) | KMS key ID/ARN/alias for SSE-KMS (uses default aws/s3 key if not specified) | +| s3.sse.kms.encryption-context | (none) | Encryption context key-value pairs for SSE-KMS. Format: `key1:value1,key2:value2`. Keys/values containing `:` must be quoted. | ### IAM Assume Role @@ -128,9 +129,7 @@ This enables IAM policies to restrict access based on context values: ```yaml s3.sse.type: sse-kms s3.sse.kms.key-id: alias/my-key -# Configure encryption context as key-value pairs -s3.sse.kms.encryption-context.department: finance -s3.sse.kms.encryption-context.project: budget-reports +s3.sse.kms.encryption-context: {"aws:s3:arn": "arn:aws:s3:::my-bucket/my-file"} ``` With encryption context, you can create IAM policies like: diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java index 21f0dc0319ef9..b7c0de3acfc06 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java @@ -34,6 +34,8 @@ import java.io.IOException; import java.net.URI; import java.time.Duration; +import java.util.Collections; +import java.util.Map; /** * Factory for creating Native S3 FileSystem instances. @@ -190,6 +192,16 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { + "Example: 'arn:aws:kms:us-east-1:123456789:key/12345678-1234-1234-1234-123456789abc' " + "or 'alias/my-s3-key'"); + public static final ConfigOption> SSE_KMS_ENCRYPTION_CONTEXT = + ConfigOptions.key("s3.sse.kms.encryption-context") + .mapType() + .noDefaultValue() + .withDescription( + "Encryption context key-value pairs for SSE-KMS. " + + "Provides additional authenticated data and enables " + + "fine-grained IAM policy conditions. " + + "Format: 'key1:value1,key2:value2'"); + // IAM Assume Role Configuration public static final ConfigOption ASSUME_ROLE_ARN = ConfigOptions.key("s3.assume-role.arn") @@ -312,7 +324,11 @@ public FileSystem create(URI fsUri) throws IOException { boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS); S3EncryptionConfig encryptionConfig = - S3EncryptionConfig.fromConfig(config.get(SSE_TYPE), config.get(SSE_KMS_KEY_ID)); + S3EncryptionConfig.fromConfig( + config.get(SSE_TYPE), + config.get(SSE_KMS_KEY_ID), + config.getOptional(SSE_KMS_ENCRYPTION_CONTEXT) + .orElse(Collections.emptyMap())); String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION); int numEntropyChars = -1; if (entropyInjectionKey != null) { diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java index 656a3f567dae4..635b490eab3e5 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java @@ -135,6 +135,7 @@ public S3EncryptionConfig getEncryptionConfig() { } @VisibleForTesting + @Nullable AwsCredentialsProvider getCredentialsProvider() { return credentialsProvider; } diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java index 67e869ab99269..8462bacf552a8 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java @@ -26,7 +26,6 @@ import java.io.Serializable; import java.util.Collections; -import java.util.HashMap; import java.util.Map; /** @@ -80,9 +79,7 @@ private S3EncryptionConfig( this.encryptionType = encryptionType; this.kmsKeyId = kmsKeyId; this.encryptionContext = - encryptionContext != null - ? Collections.unmodifiableMap(new HashMap<>(encryptionContext)) - : Collections.emptyMap(); + encryptionContext != null ? Map.copyOf(encryptionContext) : Collections.emptyMap(); } /** Creates a config with no encryption. */ @@ -148,7 +145,9 @@ public static S3EncryptionConfig sseKms( * @throws IllegalArgumentException if the encryption type is invalid */ public static S3EncryptionConfig fromConfig( - @Nullable String encryptionTypeStr, @Nullable String kmsKeyId) { + @Nullable String encryptionTypeStr, + @Nullable String kmsKeyId, + Map encryptionContext) { if (encryptionTypeStr == null || encryptionTypeStr.isEmpty() || "none".equalsIgnoreCase(encryptionTypeStr)) { @@ -163,7 +162,9 @@ public static S3EncryptionConfig fromConfig( return sseS3(); case "SSE_KMS": case "AWS_KMS": - return kmsKeyId != null && !kmsKeyId.isEmpty() ? sseKms(kmsKeyId) : sseKms(); + return kmsKeyId != null && !kmsKeyId.isEmpty() + ? sseKms(kmsKeyId, encryptionContext) + : sseKms(); default: throw new IllegalArgumentException( "Unknown encryption type: " diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java new file mode 100644 index 0000000000000..3564a3a3015c4 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link S3BlockLocation}. */ +class S3BlockLocationTest { + + @Test + void testAccessors() { + S3BlockLocation loc = new S3BlockLocation(new String[] {"localhost"}, 100L, 512L); + + assertThat(loc.getHosts()).containsExactly("localhost"); + assertThat(loc.getOffset()).isEqualTo(100L); + assertThat(loc.getLength()).isEqualTo(512L); + } + + @Test + void testCompareToOrdersByOffset() { + S3BlockLocation first = new S3BlockLocation(new String[] {"localhost"}, 0L, 100L); + S3BlockLocation second = new S3BlockLocation(new String[] {"localhost"}, 100L, 100L); + + assertThat(first.compareTo(second)).isNegative(); + assertThat(second.compareTo(first)).isPositive(); + assertThat(first.compareTo(first)).isZero(); + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java new file mode 100644 index 0000000000000..b9aedfe4e8e13 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.services.s3.model.ServerSideEncryption; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.NONE; +import static org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_KMS; +import static org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_S3; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link S3EncryptionConfig}. */ +class S3EncryptionConfigTest { + + @ParameterizedTest + @MethodSource + void noArgFactories_encryptionTypeCorrect( + S3EncryptionConfig config, + S3EncryptionConfig.EncryptionType expectedType, + boolean expectedEnabled, + ServerSideEncryption expectedSse) { + assertThat(config.getEncryptionType()).isEqualTo(expectedType); + assertThat(config.isEnabled()).isEqualTo(expectedEnabled); + assertThat(config.getKmsKeyId()).isNull(); + assertThat(config.getServerSideEncryption()).isEqualTo(expectedSse); + } + + static Stream noArgFactories_encryptionTypeCorrect() { + return Stream.of( + Arguments.of(S3EncryptionConfig.none(), NONE, false, null), + Arguments.of( + S3EncryptionConfig.sseS3(), SSE_S3, true, ServerSideEncryption.AES256), + Arguments.of( + S3EncryptionConfig.sseKms(), SSE_KMS, true, ServerSideEncryption.AWS_KMS)); + } + + @Test + void sseKms_withKeyId_keyIdStoredAndEnabled() { + S3EncryptionConfig c = S3EncryptionConfig.sseKms("arn:aws:kms:us-east-1:123:key/abc"); + + assertThat(c.getKmsKeyId()).isEqualTo("arn:aws:kms:us-east-1:123:key/abc"); + assertThat(c.isEnabled()).isTrue(); + } + + @Test + void sseKms_withContext_contextStoredDefensively() { + Map ctx = new HashMap<>(Map.of("dept", "finance")); + S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", ctx); + ctx.put("extra", "value"); + + assertThat(c.getEncryptionContext()).isEqualTo(Map.of("dept", "finance")); + assertThat(c.hasEncryptionContext()).isTrue(); + } + + @Test + void sseKms_nullContext_contextIsEmpty() { + S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", null); + + assertThat(c.getEncryptionContext()).isEmpty(); + assertThat(c.hasEncryptionContext()).isFalse(); + } + + @ParameterizedTest + @MethodSource + void fromConfig_typeVariants_returnExpectedType( + String input, S3EncryptionConfig.EncryptionType expected) { + assertThat( + S3EncryptionConfig.fromConfig(input, null, Collections.emptyMap()) + .getEncryptionType()) + .isEqualTo(expected); + } + + static Stream fromConfig_typeVariants_returnExpectedType() { + return Stream.of( + Arguments.of(null, NONE), + Arguments.of("", NONE), + Arguments.of("none", NONE), + Arguments.of("NONE", NONE), + Arguments.of("sse-s3", SSE_S3), + Arguments.of("AES256", SSE_S3), + Arguments.of("sse-kms", SSE_KMS), + Arguments.of("aws:kms", SSE_KMS)); + } + + @Test + void fromConfig_sseKmsWithKeyAndContext_keyAndContextPreserved() { + S3EncryptionConfig result = + S3EncryptionConfig.fromConfig("sse-kms", "my-key", Map.of("env", "prod")); + + assertThat(result.getKmsKeyId()).isEqualTo("my-key"); + assertThat(result.getEncryptionContext()).isEqualTo(Map.of("env", "prod")); + } + + @ParameterizedTest + @MethodSource + void fromConfig_sseKmsWithNoKeyId_keyIdIsNull(String kmsKeyId) { + assertThat( + S3EncryptionConfig.fromConfig("sse-kms", kmsKeyId, Collections.emptyMap()) + .getKmsKeyId()) + .isNull(); + } + + static Stream fromConfig_sseKmsWithNoKeyId_keyIdIsNull() { + return Stream.of(Arguments.of((Object) null), Arguments.of("")); + } + + @Test + void fromConfig_sseKmsNoKeyIdWithContext_contextIgnored() { + assertThat( + S3EncryptionConfig.fromConfig("sse-kms", null, Map.of("env", "prod")) + .hasEncryptionContext()) + .isFalse(); + } + + @Test + void fromConfig_unknownType_throwsIllegalArgument() { + assertThatThrownBy( + () -> + S3EncryptionConfig.fromConfig( + "invalid-type", null, Collections.emptyMap())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("invalid-type"); + } + + @Test + void toString_noKeyOrContext_containsTypeOnly() { + S3EncryptionConfig c = S3EncryptionConfig.none(); + + assertThat(c.toString()).contains("NONE"); + assertThat(c.toString()).doesNotContain("kmsKeyId"); + assertThat(c.toString()).doesNotContain("encryptionContext"); + } + + @Test + void toString_withKeyId_includesKeyId() { + S3EncryptionConfig c = S3EncryptionConfig.sseKms("my-key"); + + assertThat(c.toString()).contains("my-key"); + } + + @Test + void toString_withContext_includesContextKeys() { + S3EncryptionConfig c = S3EncryptionConfig.sseKms("k", Map.of("dept", "finance")); + + assertThat(c.toString()).contains("dept"); + } + + @Test + void serialization_roundTrip_preservesAllFields() throws Exception { + S3EncryptionConfig original = S3EncryptionConfig.sseKms("key-id", Map.of("k", "v")); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + new ObjectOutputStream(bos).writeObject(original); + S3EncryptionConfig copy = + (S3EncryptionConfig) + new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())) + .readObject(); + + assertThat(copy.getEncryptionType()).isEqualTo(original.getEncryptionType()); + assertThat(copy.getKmsKeyId()).isEqualTo(original.getKmsKeyId()); + assertThat(copy.getEncryptionContext()).isEqualTo(original.getEncryptionContext()); + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java new file mode 100644 index 0000000000000..02e6727e4e18d --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.services.s3.model.S3Exception; + +import java.io.IOException; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link S3ExceptionUtils}. */ +class S3ExceptionUtilsTest { + + static Stream toIoExceptionCases() { + return Stream.of( + Arguments.of( + s3Exception( + 404, + AwsErrorDetails.builder() + .errorMessage("key not found") + .build()), + "delete object", + "delete object (HTTP 404: key not found)"), + Arguments.of( + s3Exception(500, "internal error"), + "put object", + "put object (HTTP 500: internal error")); + } + + @ParameterizedTest + @MethodSource("toIoExceptionCases") + void toIOException_contextAndStatusCode_formatsMessageAndPreservesCause( + S3Exception cause, String context, String expectedMessage) { + IOException result = S3ExceptionUtils.toIOException(context, cause); + + assertThat(result.getMessage()).contains(expectedMessage); + assertThat(result.getCause()).isSameAs(cause); + } + + @Test + void errorMessage_detailsHasMessage_returnsDetailsMessage() { + S3Exception e = + s3Exception( + 404, + AwsErrorDetails.builder() + .errorMessage("The specified key does not exist.") + .build()); + + assertThat(S3ExceptionUtils.errorMessage(e)) + .isEqualTo("The specified key does not exist."); + } + + static Stream errorMessageFallbackCases() { + return Stream.of( + Arguments.of(s3Exception(500, "connection timeout"), "connection timeout"), + Arguments.of( + s3ExceptionWithMessageAndDetails( + 500, + "fallback message", + AwsErrorDetails.builder().errorCode("InternalError").build()), + "fallback message")); + } + + @ParameterizedTest + @MethodSource("errorMessageFallbackCases") + void errorMessage_noDetailsMessage_fallsBackToExceptionMessage( + S3Exception e, String expectedMessage) { + assertThat(S3ExceptionUtils.errorMessage(e)).contains(expectedMessage); + } + + @Test + void errorMessage_noMessageAvailable_returnsUnknownS3Error() { + S3Exception e = s3ExceptionStatusOnly(500); + + assertThat(S3ExceptionUtils.errorMessage(e)).isEqualTo("Unknown S3 error"); + } + + @Test + void errorCode_detailsHasCode_returnsErrorCode() { + S3Exception e = + s3Exception(404, AwsErrorDetails.builder().errorCode("NoSuchKey").build()); + + assertThat(S3ExceptionUtils.errorCode(e)).isEqualTo("NoSuchKey"); + } + + static Stream errorCodeUnknownCases() { + return Stream.of( + Arguments.of(s3Exception(500, "some error")), + Arguments.of( + s3Exception( + 500, + AwsErrorDetails.builder() + .errorMessage("Something went wrong") + .build()))); + } + + @ParameterizedTest + @MethodSource("errorCodeUnknownCases") + void errorCode_noCodeAvailable_returnsUnknown(S3Exception e) { + assertThat(S3ExceptionUtils.errorCode(e)).isEqualTo("Unknown"); + } + + private static S3Exception s3Exception(int statusCode, AwsErrorDetails details) { + S3Exception.Builder b = S3Exception.builder(); + b.statusCode(statusCode); + b.awsErrorDetails(details); + return (S3Exception) b.build(); + } + + private static S3Exception s3Exception(int statusCode, String message) { + S3Exception.Builder b = S3Exception.builder(); + b.statusCode(statusCode); + b.message(message); + return (S3Exception) b.build(); + } + + private static S3Exception s3ExceptionWithMessageAndDetails( + int statusCode, String message, AwsErrorDetails details) { + S3Exception.Builder b = S3Exception.builder(); + b.statusCode(statusCode); + b.message(message); + b.awsErrorDetails(details); + return (S3Exception) b.build(); + } + + private static S3Exception s3ExceptionStatusOnly(int statusCode) { + S3Exception.Builder b = S3Exception.builder(); + b.statusCode(statusCode); + return (S3Exception) b.build(); + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java new file mode 100644 index 0000000000000..aab9625898920 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.core.fs.Path; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link S3FileStatus}. */ +class S3FileStatusTest { + + private static final Path PATH = new Path("s3://bucket/key"); + + static Stream fileSizeAndModTime() { + return Stream.of( + Arguments.of(0L, 0L), + Arguments.of(1024L, 987654321L), + Arguments.of(Long.MAX_VALUE, Long.MAX_VALUE)); + } + + @ParameterizedTest + @MethodSource("fileSizeAndModTime") + void withFile_variousSizes_allFieldsSetCorrectly(long size, long modTime) { + S3FileStatus s = S3FileStatus.withFile(size, modTime, PATH); + + assertThat(s.getLen()).isEqualTo(size); + assertThat(s.getBlockSize()).isEqualTo(size); + assertThat(s.getModificationTime()).isEqualTo(modTime); + assertThat(s.getAccessTime()).isEqualTo(0L); + assertThat(s.isDir()).isFalse(); + assertThat(s.getPath()).isEqualTo(PATH); + assertThat(s.getReplication()).isEqualTo((short) 1); + } + + @Test + void withDirectory_anyPath_allFieldsSetCorrectly() { + S3FileStatus s = S3FileStatus.withDirectory(PATH); + + assertThat(s.getLen()).isEqualTo(0L); + assertThat(s.getBlockSize()).isEqualTo(0L); + assertThat(s.getModificationTime()).isEqualTo(0L); + assertThat(s.getAccessTime()).isEqualTo(0L); + assertThat(s.isDir()).isTrue(); + assertThat(s.getPath()).isEqualTo(PATH); + assertThat(s.getReplication()).isEqualTo((short) 1); + } + + @Test + void constructor_nonZeroAccessTime_accessTimeIsPreserved() { + S3FileStatus s = new S3FileStatus(1024L, 1024L, 100L, 999L, false, PATH); + + assertThat(s.getAccessTime()).isEqualTo(999L); + } +} From 69d4381e7ebe18641030da1e53a96c2ec3f6b36e Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 30 Apr 2026 14:33:51 +0200 Subject: [PATCH 02/10] [FLINK-39113][s3] Address review feedback for native s3 connector encryption context - Switch fromConfig() normalization to toLowerCase(Locale.ROOT), removing the SSE_KMS placeholder case - Add sseKms(Map) factory overload so encryption context is preserved when using the default AWS-managed key - Revert Map.copyOf() to null-tolerant unmodifiableMap(new HashMap<>()) --- .../flink/fs/s3native/S3EncryptionConfig.java | 31 +++++++++++++------ .../fs/s3native/S3EncryptionConfigTest.java | 7 ++--- .../fs/s3native/S3ExceptionUtilsTest.java | 10 ++---- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java index 8462bacf552a8..88d7d42baa70b 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java @@ -19,6 +19,7 @@ package org.apache.flink.fs.s3native; import org.apache.flink.annotation.Internal; +import org.apache.flink.util.StringUtils; import software.amazon.awssdk.services.s3.model.ServerSideEncryption; @@ -26,6 +27,8 @@ import java.io.Serializable; import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; import java.util.Map; /** @@ -79,7 +82,9 @@ private S3EncryptionConfig( this.encryptionType = encryptionType; this.kmsKeyId = kmsKeyId; this.encryptionContext = - encryptionContext != null ? Map.copyOf(encryptionContext) : Collections.emptyMap(); + encryptionContext != null + ? Collections.unmodifiableMap(new HashMap<>(encryptionContext)) + : Collections.emptyMap(); } /** Creates a config with no encryption. */ @@ -102,8 +107,15 @@ public static S3EncryptionConfig sseKms() { } /** - * Creates a config for SSE-KMS encryption with a specific KMS key. + * Creates a config for SSE-KMS encryption with the default KMS key and an encryption context. * + * @param encryptionContext The encryption context key-value pairs + */ + public static S3EncryptionConfig sseKms(Map encryptionContext) { + return new S3EncryptionConfig(EncryptionType.SSE_KMS, null, encryptionContext); + } + + /** * @param kmsKeyId The KMS key ID, ARN, or alias (e.g., "arn:aws:kms:region:account:key/key-id" * or "alias/my-key") */ @@ -148,23 +160,22 @@ public static S3EncryptionConfig fromConfig( @Nullable String encryptionTypeStr, @Nullable String kmsKeyId, Map encryptionContext) { - if (encryptionTypeStr == null - || encryptionTypeStr.isEmpty() + if (StringUtils.isNullOrWhitespaceOnly(encryptionTypeStr) || "none".equalsIgnoreCase(encryptionTypeStr)) { return none(); } - String normalizedType = encryptionTypeStr.toUpperCase().replace("-", "_").replace(":", "_"); + String normalizedType = encryptionTypeStr.toLowerCase(Locale.ROOT); switch (normalizedType) { - case "SSE_S3": - case "AES256": + case "sse-s3": + case "aes256": return sseS3(); - case "SSE_KMS": - case "AWS_KMS": + case "sse-kms": + case "aws:kms": return kmsKeyId != null && !kmsKeyId.isEmpty() ? sseKms(kmsKeyId, encryptionContext) - : sseKms(); + : sseKms(encryptionContext); default: throw new IllegalArgumentException( "Unknown encryption type: " diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java index b9aedfe4e8e13..7aff29a37bde1 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java @@ -58,8 +58,7 @@ void noArgFactories_encryptionTypeCorrect( static Stream noArgFactories_encryptionTypeCorrect() { return Stream.of( Arguments.of(S3EncryptionConfig.none(), NONE, false, null), - Arguments.of( - S3EncryptionConfig.sseS3(), SSE_S3, true, ServerSideEncryption.AES256), + Arguments.of(S3EncryptionConfig.sseS3(), SSE_S3, true, ServerSideEncryption.AES256), Arguments.of( S3EncryptionConfig.sseKms(), SSE_KMS, true, ServerSideEncryption.AWS_KMS)); } @@ -135,11 +134,11 @@ static Stream fromConfig_sseKmsWithNoKeyId_keyIdIsNull() { } @Test - void fromConfig_sseKmsNoKeyIdWithContext_contextIgnored() { + void fromConfig_sseKmsDefaultKeyWithContext_contextPreserved() { assertThat( S3EncryptionConfig.fromConfig("sse-kms", null, Map.of("env", "prod")) .hasEncryptionContext()) - .isFalse(); + .isTrue(); } @Test diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java index 02e6727e4e18d..0f114affd4231 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java @@ -38,9 +38,7 @@ static Stream toIoExceptionCases() { Arguments.of( s3Exception( 404, - AwsErrorDetails.builder() - .errorMessage("key not found") - .build()), + AwsErrorDetails.builder().errorMessage("key not found").build()), "delete object", "delete object (HTTP 404: key not found)"), Arguments.of( @@ -68,8 +66,7 @@ void errorMessage_detailsHasMessage_returnsDetailsMessage() { .errorMessage("The specified key does not exist.") .build()); - assertThat(S3ExceptionUtils.errorMessage(e)) - .isEqualTo("The specified key does not exist."); + assertThat(S3ExceptionUtils.errorMessage(e)).isEqualTo("The specified key does not exist."); } static Stream errorMessageFallbackCases() { @@ -99,8 +96,7 @@ void errorMessage_noMessageAvailable_returnsUnknownS3Error() { @Test void errorCode_detailsHasCode_returnsErrorCode() { - S3Exception e = - s3Exception(404, AwsErrorDetails.builder().errorCode("NoSuchKey").build()); + S3Exception e = s3Exception(404, AwsErrorDetails.builder().errorCode("NoSuchKey").build()); assertThat(S3ExceptionUtils.errorCode(e)).isEqualTo("NoSuchKey"); } From 6ecf80aa8e7c33eb14fabefb6460a0e88dc069ed Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 30 Apr 2026 14:42:53 +0200 Subject: [PATCH 03/10] [FLINK-39113][s3] Fix missing encryption context in NativeS3OutputStream write path Move serializeEncryptionContext to S3EncryptionConfig and apply it in NativeS3OutputStream.uploadToS3(), which previously dropped the KMS encryption context for small-file (non-multipart) writes. --- .../fs/s3native/NativeS3OutputStream.java | 11 ++++-- .../flink/fs/s3native/S3EncryptionConfig.java | 23 ++++++++++++ .../writer/NativeS3ObjectOperations.java | 35 +++---------------- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java index d125d05fd2715..ac3562f5e7252 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java @@ -195,9 +195,14 @@ private void uploadToS3() throws IOException { if (encryptionConfig.isEnabled()) { putRequestBuilder.serverSideEncryption(encryptionConfig.getServerSideEncryption()); if (encryptionConfig.getEncryptionType() - == S3EncryptionConfig.EncryptionType.SSE_KMS - && encryptionConfig.getKmsKeyId() != null) { - putRequestBuilder.ssekmsKeyId(encryptionConfig.getKmsKeyId()); + == S3EncryptionConfig.EncryptionType.SSE_KMS) { + if (encryptionConfig.getKmsKeyId() != null) { + putRequestBuilder.ssekmsKeyId(encryptionConfig.getKmsKeyId()); + } + if (encryptionConfig.hasEncryptionContext()) { + putRequestBuilder.ssekmsEncryptionContext( + encryptionConfig.serializeEncryptionContext()); + } } } diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java index 88d7d42baa70b..7b7b54f70dac0 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.Locale; @@ -232,6 +233,28 @@ public ServerSideEncryption getServerSideEncryption() { } } + public String serializeEncryptionContext() { + StringBuilder json = new StringBuilder("{"); + boolean first = true; + for (Map.Entry entry : encryptionContext.entrySet()) { + if (!first) { + json.append(","); + } + json.append("\"") + .append(escapeJson(entry.getKey())) + .append("\":\"") + .append(escapeJson(entry.getValue())) + .append("\""); + first = false; + } + json.append("}"); + return Base64.getEncoder().encodeToString(json.toString().getBytes()); + } + + private String escapeJson(String value) { + return value.replace("\\", "\\\\").replace("\"", "\\\""); + } + @Override public String toString() { StringBuilder sb = new StringBuilder("S3EncryptionConfig{type=").append(encryptionType); diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java index 01bc660f3ac79..14ff1eeab12b8 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java @@ -85,7 +85,6 @@ *
    *
  • SSE-C (customer-provided keys) via a KeyProvider interface *
  • Client-side encryption via an EncryptionHandler interface - *
  • Encryption context for SSE-KMS (see HADOOP-19197) *
* *

S3 URI Handling: The {@link #extractKey(Path)} and {@link #extractBucketName(Path)} @@ -155,7 +154,7 @@ private void applyEncryption(CreateMultipartUploadRequest.Builder requestBuilder } if (encryptionConfig.hasEncryptionContext()) { requestBuilder.ssekmsEncryptionContext( - serializeEncryptionContext(encryptionConfig.getEncryptionContext())); + encryptionConfig.serializeEncryptionContext()); } } } @@ -171,36 +170,11 @@ private void applyEncryption(PutObjectRequest.Builder requestBuilder) { } if (encryptionConfig.hasEncryptionContext()) { requestBuilder.ssekmsEncryptionContext( - serializeEncryptionContext(encryptionConfig.getEncryptionContext())); + encryptionConfig.serializeEncryptionContext()); } } } - /** - * Serializes the encryption context map to a Base64-encoded JSON string as required by S3 API. - */ - private String serializeEncryptionContext(java.util.Map context) { - StringBuilder json = new StringBuilder("{"); - boolean first = true; - for (java.util.Map.Entry entry : context.entrySet()) { - if (!first) { - json.append(","); - } - json.append("\"") - .append(escapeJson(entry.getKey())) - .append("\":\"") - .append(escapeJson(entry.getValue())) - .append("\""); - first = false; - } - json.append("}"); - return java.util.Base64.getEncoder().encodeToString(json.toString().getBytes()); - } - - private String escapeJson(String value) { - return value.replace("\\", "\\\\").replace("\"", "\\\""); - } - public UploadPartResult uploadPart( String key, String uploadId, int partNumber, File inputFile, long length) throws IOException { @@ -275,9 +249,8 @@ private PutObjectResult putObjectViaTransferManager(String key, File inputFile) } if (encryptionConfig.hasEncryptionContext()) { req.ssekmsEncryptionContext( - serializeEncryptionContext( - encryptionConfig - .getEncryptionContext())); + encryptionConfig + .serializeEncryptionContext()); } } } From 675c6ade87ba756f330156df9eb54fbd6c7bab00 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 30 Apr 2026 14:44:41 +0200 Subject: [PATCH 04/10] [FLINK-39113][s3] Fix stale Javadoc on fromConfig after normalization change --- .../java/org/apache/flink/fs/s3native/S3EncryptionConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java index 7b7b54f70dac0..2e34d89694303 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java @@ -151,8 +151,8 @@ public static S3EncryptionConfig sseKms( /** * Creates an encryption config from configuration strings. * - * @param encryptionTypeStr The encryption type: "none", "sse-s3", "sse-kms", or "SSE_S3", - * "SSE_KMS" + * @param encryptionTypeStr The encryption type: "none", "sse-s3", "sse-kms", "aws:kms", + * "aes256" (case-insensitive) * @param kmsKeyId The KMS key ID (required for SSE-KMS, ignored for other types) * @return The encryption configuration * @throws IllegalArgumentException if the encryption type is invalid From 98dd5a7a24eb827f5e84e239b03ccf2af34f2642 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 30 Apr 2026 16:32:34 +0200 Subject: [PATCH 05/10] Test simplification and restructuring --- .../fs/s3native/S3EncryptionConfigTest.java | 145 ++++++++++++------ .../fs/s3native/S3ExceptionUtilsTest.java | 59 ++++--- 2 files changed, 123 insertions(+), 81 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java index 7aff29a37bde1..0c505da476433 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java @@ -28,6 +28,7 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -42,35 +43,6 @@ /** Tests for {@link S3EncryptionConfig}. */ class S3EncryptionConfigTest { - @ParameterizedTest - @MethodSource - void noArgFactories_encryptionTypeCorrect( - S3EncryptionConfig config, - S3EncryptionConfig.EncryptionType expectedType, - boolean expectedEnabled, - ServerSideEncryption expectedSse) { - assertThat(config.getEncryptionType()).isEqualTo(expectedType); - assertThat(config.isEnabled()).isEqualTo(expectedEnabled); - assertThat(config.getKmsKeyId()).isNull(); - assertThat(config.getServerSideEncryption()).isEqualTo(expectedSse); - } - - static Stream noArgFactories_encryptionTypeCorrect() { - return Stream.of( - Arguments.of(S3EncryptionConfig.none(), NONE, false, null), - Arguments.of(S3EncryptionConfig.sseS3(), SSE_S3, true, ServerSideEncryption.AES256), - Arguments.of( - S3EncryptionConfig.sseKms(), SSE_KMS, true, ServerSideEncryption.AWS_KMS)); - } - - @Test - void sseKms_withKeyId_keyIdStoredAndEnabled() { - S3EncryptionConfig c = S3EncryptionConfig.sseKms("arn:aws:kms:us-east-1:123:key/abc"); - - assertThat(c.getKmsKeyId()).isEqualTo("arn:aws:kms:us-east-1:123:key/abc"); - assertThat(c.isEnabled()).isTrue(); - } - @Test void sseKms_withContext_contextStoredDefensively() { Map ctx = new HashMap<>(Map.of("dept", "finance")); @@ -89,6 +61,46 @@ void sseKms_nullContext_contextIsEmpty() { assertThat(c.hasEncryptionContext()).isFalse(); } + @ParameterizedTest + @MethodSource + void sseKms_contextOnlyFactory_absentContext_hasEncryptionContextFalse( + Map context) { + S3EncryptionConfig c = S3EncryptionConfig.sseKms(context); + + assertThat(c.getEncryptionContext()).isEmpty(); + assertThat(c.hasEncryptionContext()).isFalse(); + } + + static Stream sseKms_contextOnlyFactory_absentContext_hasEncryptionContextFalse() { + return Stream.of(Arguments.of(Collections.emptyMap()), Arguments.of((Object) null)); + } + + @Test + void sseKms_contextOnlyFactory_contextMutatedAfterCreation_contextUnchanged() { + Map ctx = new HashMap<>(Map.of("dept", "finance")); + S3EncryptionConfig c = S3EncryptionConfig.sseKms(ctx); + ctx.put("extra", "value"); + + assertThat(c.getEncryptionContext()).isEqualTo(Map.of("dept", "finance")); + } + + @ParameterizedTest + @MethodSource + void getServerSideEncryption_allTypes_returnsCorrectSseValue( + String configType, ServerSideEncryption expected) { + S3EncryptionConfig c = + S3EncryptionConfig.fromConfig(configType, null, Collections.emptyMap()); + + assertThat(c.getServerSideEncryption()).isEqualTo(expected); + } + + static Stream getServerSideEncryption_allTypes_returnsCorrectSseValue() { + return Stream.of( + Arguments.of(null, null), + Arguments.of("sse-s3", ServerSideEncryption.AES256), + Arguments.of("sse-kms", ServerSideEncryption.AWS_KMS)); + } + @ParameterizedTest @MethodSource void fromConfig_typeVariants_returnExpectedType( @@ -105,6 +117,7 @@ static Stream fromConfig_typeVariants_returnExpectedType() { Arguments.of("", NONE), Arguments.of("none", NONE), Arguments.of("NONE", NONE), + Arguments.of(" ", NONE), Arguments.of("sse-s3", SSE_S3), Arguments.of("AES256", SSE_S3), Arguments.of("sse-kms", SSE_KMS), @@ -141,6 +154,15 @@ void fromConfig_sseKmsDefaultKeyWithContext_contextPreserved() { .isTrue(); } + @Test + void fromConfig_sseS3WithContext_contextIgnored() { + S3EncryptionConfig c = + S3EncryptionConfig.fromConfig("sse-s3", null, Map.of("dept", "finance")); + + assertThat(c.getEncryptionType()).isEqualTo(SSE_S3); + assertThat(c.getEncryptionContext()).isEmpty(); + } + @Test void fromConfig_unknownType_throwsIllegalArgument() { assertThatThrownBy( @@ -151,42 +173,67 @@ void fromConfig_unknownType_throwsIllegalArgument() { .hasMessageContaining("invalid-type"); } - @Test - void toString_noKeyOrContext_containsTypeOnly() { - S3EncryptionConfig c = S3EncryptionConfig.none(); + @ParameterizedTest + @MethodSource + void serializeEncryptionContext_exactOutput_correctBase64Json( + Map context, String expectedDecoded) { + S3EncryptionConfig c = S3EncryptionConfig.sseKms(context); + String decoded = new String(Base64.getDecoder().decode(c.serializeEncryptionContext())); - assertThat(c.toString()).contains("NONE"); - assertThat(c.toString()).doesNotContain("kmsKeyId"); - assertThat(c.toString()).doesNotContain("encryptionContext"); + assertThat(decoded).isEqualTo(expectedDecoded); + } + + static Stream serializeEncryptionContext_exactOutput_correctBase64Json() { + return Stream.of( + Arguments.of(Collections.emptyMap(), "{}"), + Arguments.of(Map.of("k", "v"), "{\"k\":\"v\"}")); } @Test - void toString_withKeyId_includesKeyId() { - S3EncryptionConfig c = S3EncryptionConfig.sseKms("my-key"); + void serializeEncryptionContext_multipleEntries_allEntriesPresent() { + S3EncryptionConfig c = S3EncryptionConfig.sseKms(Map.of("k1", "v1", "k2", "v2")); + String decoded = new String(Base64.getDecoder().decode(c.serializeEncryptionContext())); - assertThat(c.toString()).contains("my-key"); + assertThat(decoded).contains("\"k1\":\"v1\"", "\"k2\":\"v2\""); } - @Test - void toString_withContext_includesContextKeys() { - S3EncryptionConfig c = S3EncryptionConfig.sseKms("k", Map.of("dept", "finance")); + @ParameterizedTest + @MethodSource + void serializeEncryptionContext_jsonSpecialChars_escapedCorrectly( + String key, String value, String expectedFragment) { + S3EncryptionConfig c = S3EncryptionConfig.sseKms(Map.of(key, value)); + String decoded = new String(Base64.getDecoder().decode(c.serializeEncryptionContext())); - assertThat(c.toString()).contains("dept"); + assertThat(decoded).contains(expectedFragment); } - @Test - void serialization_roundTrip_preservesAllFields() throws Exception { - S3EncryptionConfig original = S3EncryptionConfig.sseKms("key-id", Map.of("k", "v")); + static Stream serializeEncryptionContext_jsonSpecialChars_escapedCorrectly() { + return Stream.of( + Arguments.of("k", "val\"ue", "\"k\":\"val\\\"ue\""), + Arguments.of("k", "val\\ue", "\"k\":\"val\\\\ue\""), + Arguments.of("k\"ey", "v", "\"k\\\"ey\":\"v\"")); + } + @ParameterizedTest + @MethodSource + void serialization_roundTrip_preservesAllFields(S3EncryptionConfig config) throws Exception { ByteArrayOutputStream bos = new ByteArrayOutputStream(); - new ObjectOutputStream(bos).writeObject(original); + new ObjectOutputStream(bos).writeObject(config); S3EncryptionConfig copy = (S3EncryptionConfig) new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())) .readObject(); - assertThat(copy.getEncryptionType()).isEqualTo(original.getEncryptionType()); - assertThat(copy.getKmsKeyId()).isEqualTo(original.getKmsKeyId()); - assertThat(copy.getEncryptionContext()).isEqualTo(original.getEncryptionContext()); + assertThat(copy.getEncryptionType()).isEqualTo(config.getEncryptionType()); + assertThat(copy.getKmsKeyId()).isEqualTo(config.getKmsKeyId()); + assertThat(copy.getEncryptionContext()).isEqualTo(config.getEncryptionContext()); + } + + static Stream serialization_roundTrip_preservesAllFields() { + return Stream.of( + Arguments.of(S3EncryptionConfig.sseKms("key-id", Map.of("k", "v"))), + Arguments.of(S3EncryptionConfig.none()), + Arguments.of(S3EncryptionConfig.sseS3()), + Arguments.of(S3EncryptionConfig.sseKms())); } } diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java index 0f114affd4231..699f85318fa3e 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java @@ -18,7 +18,6 @@ package org.apache.flink.fs.s3native; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -44,7 +43,7 @@ static Stream toIoExceptionCases() { Arguments.of( s3Exception(500, "internal error"), "put object", - "put object (HTTP 500: internal error")); + "put object (HTTP 500: internal error)")); } @ParameterizedTest @@ -57,16 +56,22 @@ void toIOException_contextAndStatusCode_formatsMessageAndPreservesCause( assertThat(result.getCause()).isSameAs(cause); } - @Test - void errorMessage_detailsHasMessage_returnsDetailsMessage() { - S3Exception e = - s3Exception( - 404, - AwsErrorDetails.builder() - .errorMessage("The specified key does not exist.") - .build()); + static Stream errorMessageExactCases() { + return Stream.of( + Arguments.of( + s3Exception( + 404, + AwsErrorDetails.builder() + .errorMessage("The specified key does not exist.") + .build()), + "The specified key does not exist."), + Arguments.of(s3ExceptionStatusOnly(500), "Unknown S3 error")); + } - assertThat(S3ExceptionUtils.errorMessage(e)).isEqualTo("The specified key does not exist."); + @ParameterizedTest + @MethodSource("errorMessageExactCases") + void errorMessage_exactReturn_matchesExpected(S3Exception e, String expected) { + assertThat(S3ExceptionUtils.errorMessage(e)).isEqualTo(expected); } static Stream errorMessageFallbackCases() { @@ -82,40 +87,30 @@ static Stream errorMessageFallbackCases() { @ParameterizedTest @MethodSource("errorMessageFallbackCases") - void errorMessage_noDetailsMessage_fallsBackToExceptionMessage( + void errorMessage_fallbackToExceptionMessage_containsExpected( S3Exception e, String expectedMessage) { assertThat(S3ExceptionUtils.errorMessage(e)).contains(expectedMessage); } - @Test - void errorMessage_noMessageAvailable_returnsUnknownS3Error() { - S3Exception e = s3ExceptionStatusOnly(500); - - assertThat(S3ExceptionUtils.errorMessage(e)).isEqualTo("Unknown S3 error"); - } - - @Test - void errorCode_detailsHasCode_returnsErrorCode() { - S3Exception e = s3Exception(404, AwsErrorDetails.builder().errorCode("NoSuchKey").build()); - - assertThat(S3ExceptionUtils.errorCode(e)).isEqualTo("NoSuchKey"); - } - - static Stream errorCodeUnknownCases() { + static Stream errorCodeAllCases() { return Stream.of( - Arguments.of(s3Exception(500, "some error")), + Arguments.of( + s3Exception(404, AwsErrorDetails.builder().errorCode("NoSuchKey").build()), + "NoSuchKey"), + Arguments.of(s3Exception(500, "some error"), "Unknown"), Arguments.of( s3Exception( 500, AwsErrorDetails.builder() .errorMessage("Something went wrong") - .build()))); + .build()), + "Unknown")); } @ParameterizedTest - @MethodSource("errorCodeUnknownCases") - void errorCode_noCodeAvailable_returnsUnknown(S3Exception e) { - assertThat(S3ExceptionUtils.errorCode(e)).isEqualTo("Unknown"); + @MethodSource("errorCodeAllCases") + void errorCode_allCases_returnsExpected(S3Exception e, String expected) { + assertThat(S3ExceptionUtils.errorCode(e)).isEqualTo(expected); } private static S3Exception s3Exception(int statusCode, AwsErrorDetails details) { From 5872ce893a4a2f9880754eda8b5f4b2ab426a82b Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 4 May 2026 12:51:59 +0200 Subject: [PATCH 06/10] Test fixes --- .../fs/s3native/S3ExceptionUtilsTest.java | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java index 699f85318fa3e..c60fd422c6e9d 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java @@ -22,6 +22,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.s3.model.S3Exception; import java.io.IOException; @@ -113,32 +114,24 @@ void errorCode_allCases_returnsExpected(S3Exception e, String expected) { assertThat(S3ExceptionUtils.errorCode(e)).isEqualTo(expected); } - private static S3Exception s3Exception(int statusCode, AwsErrorDetails details) { - S3Exception.Builder b = S3Exception.builder(); - b.statusCode(statusCode); - b.awsErrorDetails(details); - return (S3Exception) b.build(); + private static AwsServiceException s3Exception(int statusCode, AwsErrorDetails details) { + return S3Exception.builder().statusCode(statusCode).awsErrorDetails(details).build(); } - private static S3Exception s3Exception(int statusCode, String message) { - S3Exception.Builder b = S3Exception.builder(); - b.statusCode(statusCode); - b.message(message); - return (S3Exception) b.build(); + private static AwsServiceException s3Exception(int statusCode, String message) { + return S3Exception.builder().statusCode(statusCode).message(message).build(); } - private static S3Exception s3ExceptionWithMessageAndDetails( + private static AwsServiceException s3ExceptionWithMessageAndDetails( int statusCode, String message, AwsErrorDetails details) { - S3Exception.Builder b = S3Exception.builder(); - b.statusCode(statusCode); - b.message(message); - b.awsErrorDetails(details); - return (S3Exception) b.build(); + return S3Exception.builder() + .statusCode(statusCode) + .message(message) + .awsErrorDetails(details) + .build(); } - private static S3Exception s3ExceptionStatusOnly(int statusCode) { - S3Exception.Builder b = S3Exception.builder(); - b.statusCode(statusCode); - return (S3Exception) b.build(); + private static AwsServiceException s3ExceptionStatusOnly(int statusCode) { + return S3Exception.builder().statusCode(statusCode).build(); } } From b9957e00352a8cf3ff10879d52055acc5876d2bc Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 4 May 2026 14:14:41 +0200 Subject: [PATCH 07/10] Single sseKms function --- .../flink/fs/s3native/S3EncryptionConfig.java | 55 +++----------- .../fs/s3native/S3EncryptionConfigTest.java | 72 +++++++++++++------ 2 files changed, 59 insertions(+), 68 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java index 2e34d89694303..3e729905a8b05 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java @@ -99,53 +99,20 @@ public static S3EncryptionConfig sseS3() { } /** - * Creates a config for SSE-KMS encryption with the default KMS key. + * Creates a config for SSE-KMS encryption. * - *

Uses the AWS-managed KMS key (aws/s3) for the S3 bucket. - */ - public static S3EncryptionConfig sseKms() { - return new S3EncryptionConfig(EncryptionType.SSE_KMS, null); - } - - /** - * Creates a config for SSE-KMS encryption with the default KMS key and an encryption context. - * - * @param encryptionContext The encryption context key-value pairs - */ - public static S3EncryptionConfig sseKms(Map encryptionContext) { - return new S3EncryptionConfig(EncryptionType.SSE_KMS, null, encryptionContext); - } - - /** - * @param kmsKeyId The KMS key ID, ARN, or alias (e.g., "arn:aws:kms:region:account:key/key-id" - * or "alias/my-key") - */ - public static S3EncryptionConfig sseKms(String kmsKeyId) { - return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId); - } - - /** - * Creates a config for SSE-KMS encryption with a specific KMS key and encryption context. - * - *

The encryption context is a set of key-value pairs that: - * - *

    - *
  • Provides additional authenticated data (AAD) for encryption - *
  • Can be used in IAM policy conditions for fine-grained access control - *
  • Is logged in AWS CloudTrail for auditing - *
- * - *

Example: You might include context like {"department": "finance", "project": "budget"} to - * restrict which principals can encrypt/decrypt based on these values. - * - * @param kmsKeyId The KMS key ID, ARN, or alias - * @param encryptionContext The encryption context key-value pairs + * @param kmsKeyId The KMS key ID, ARN, or alias; null uses the AWS-managed default key + * @param encryptionContext Optional key-value pairs for IAM policy conditions and CloudTrail + * auditing; null or empty means no context * @see AWS * KMS Encryption Context */ public static S3EncryptionConfig sseKms( - String kmsKeyId, Map encryptionContext) { - return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId, encryptionContext); + @Nullable String kmsKeyId, @Nullable Map encryptionContext) { + return new S3EncryptionConfig( + EncryptionType.SSE_KMS, + StringUtils.isNullOrWhitespaceOnly(kmsKeyId) ? null : kmsKeyId, + encryptionContext); } /** @@ -174,9 +141,7 @@ public static S3EncryptionConfig fromConfig( return sseS3(); case "sse-kms": case "aws:kms": - return kmsKeyId != null && !kmsKeyId.isEmpty() - ? sseKms(kmsKeyId, encryptionContext) - : sseKms(encryptionContext); + return sseKms(kmsKeyId, encryptionContext); default: throw new IllegalArgumentException( "Unknown encryption type: " diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java index 0c505da476433..f0f1744f4aa24 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java @@ -54,34 +54,31 @@ void sseKms_withContext_contextStoredDefensively() { } @Test - void sseKms_nullContext_contextIsEmpty() { - S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", null); + void sseKms_nullKeyId_keyIdIsNull() { + S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, Collections.emptyMap()); - assertThat(c.getEncryptionContext()).isEmpty(); - assertThat(c.hasEncryptionContext()).isFalse(); + assertThat(c.getKmsKeyId()).isNull(); + } + + @Test + void sseKms_returnedContext_isUnmodifiable() { + S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", Map.of("dept", "finance")); + + assertThatThrownBy(() -> c.getEncryptionContext().put("x", "y")) + .isInstanceOf(UnsupportedOperationException.class); } @ParameterizedTest @MethodSource - void sseKms_contextOnlyFactory_absentContext_hasEncryptionContextFalse( - Map context) { - S3EncryptionConfig c = S3EncryptionConfig.sseKms(context); + void sseKms_contextAbsent_contextIsEmpty(Map context) { + S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", context); assertThat(c.getEncryptionContext()).isEmpty(); assertThat(c.hasEncryptionContext()).isFalse(); } - static Stream sseKms_contextOnlyFactory_absentContext_hasEncryptionContextFalse() { - return Stream.of(Arguments.of(Collections.emptyMap()), Arguments.of((Object) null)); - } - - @Test - void sseKms_contextOnlyFactory_contextMutatedAfterCreation_contextUnchanged() { - Map ctx = new HashMap<>(Map.of("dept", "finance")); - S3EncryptionConfig c = S3EncryptionConfig.sseKms(ctx); - ctx.put("extra", "value"); - - assertThat(c.getEncryptionContext()).isEqualTo(Map.of("dept", "finance")); + static Stream sseKms_contextAbsent_contextIsEmpty() { + return Stream.of(Arguments.of((Object) null), Arguments.of(Collections.emptyMap())); } @ParameterizedTest @@ -163,6 +160,14 @@ void fromConfig_sseS3WithContext_contextIgnored() { assertThat(c.getEncryptionContext()).isEmpty(); } + @Test + void fromConfig_sseS3_kmsKeyIdIgnored() { + S3EncryptionConfig c = + S3EncryptionConfig.fromConfig("sse-s3", "some-key", Collections.emptyMap()); + + assertThat(c.getKmsKeyId()).isNull(); + } + @Test void fromConfig_unknownType_throwsIllegalArgument() { assertThatThrownBy( @@ -173,11 +178,30 @@ void fromConfig_unknownType_throwsIllegalArgument() { .hasMessageContaining("invalid-type"); } + @ParameterizedTest + @MethodSource + void isEnabled_encryptionType_returnsCorrectState( + S3EncryptionConfig config, boolean expected) { + assertThat(config.isEnabled()).isEqualTo(expected); + } + + static Stream isEnabled_encryptionType_returnsCorrectState() { + return Stream.of( + Arguments.of( + S3EncryptionConfig.fromConfig(null, null, Collections.emptyMap()), false), + Arguments.of( + S3EncryptionConfig.fromConfig("sse-s3", null, Collections.emptyMap()), + true), + Arguments.of( + S3EncryptionConfig.fromConfig("sse-kms", null, Collections.emptyMap()), + true)); + } + @ParameterizedTest @MethodSource void serializeEncryptionContext_exactOutput_correctBase64Json( Map context, String expectedDecoded) { - S3EncryptionConfig c = S3EncryptionConfig.sseKms(context); + S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, context); String decoded = new String(Base64.getDecoder().decode(c.serializeEncryptionContext())); assertThat(decoded).isEqualTo(expectedDecoded); @@ -191,7 +215,7 @@ static Stream serializeEncryptionContext_exactOutput_correctBase64Jso @Test void serializeEncryptionContext_multipleEntries_allEntriesPresent() { - S3EncryptionConfig c = S3EncryptionConfig.sseKms(Map.of("k1", "v1", "k2", "v2")); + S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, Map.of("k1", "v1", "k2", "v2")); String decoded = new String(Base64.getDecoder().decode(c.serializeEncryptionContext())); assertThat(decoded).contains("\"k1\":\"v1\"", "\"k2\":\"v2\""); @@ -201,7 +225,7 @@ void serializeEncryptionContext_multipleEntries_allEntriesPresent() { @MethodSource void serializeEncryptionContext_jsonSpecialChars_escapedCorrectly( String key, String value, String expectedFragment) { - S3EncryptionConfig c = S3EncryptionConfig.sseKms(Map.of(key, value)); + S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, Map.of(key, value)); String decoded = new String(Base64.getDecoder().decode(c.serializeEncryptionContext())); assertThat(decoded).contains(expectedFragment); @@ -211,7 +235,9 @@ static Stream serializeEncryptionContext_jsonSpecialChars_escapedCorr return Stream.of( Arguments.of("k", "val\"ue", "\"k\":\"val\\\"ue\""), Arguments.of("k", "val\\ue", "\"k\":\"val\\\\ue\""), - Arguments.of("k\"ey", "v", "\"k\\\"ey\":\"v\"")); + Arguments.of("k\"ey", "v", "\"k\\\"ey\":\"v\""), + Arguments.of("k\\ey", "v", "\"k\\\\ey\":\"v\""), + Arguments.of("k", "\\\"", "\"k\":\"\\\\\\\"\"")); } @ParameterizedTest @@ -234,6 +260,6 @@ static Stream serialization_roundTrip_preservesAllFields() { Arguments.of(S3EncryptionConfig.sseKms("key-id", Map.of("k", "v"))), Arguments.of(S3EncryptionConfig.none()), Arguments.of(S3EncryptionConfig.sseS3()), - Arguments.of(S3EncryptionConfig.sseKms())); + Arguments.of(S3EncryptionConfig.sseKms(null, null))); } } From 99ba7e6490e8e5ffa82705fb08ee819b000ff772 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 4 May 2026 14:38:51 +0200 Subject: [PATCH 08/10] Make nullables explicit --- .../flink/fs/s3native/S3ClientProvider.java | 29 ++++++++++++------- .../flink/fs/s3native/S3EncryptionConfig.java | 6 ++-- .../fs/s3native/S3EncryptionConfigTest.java | 3 +- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java index 635b490eab3e5..46d089ec23ed8 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java @@ -54,6 +54,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,7 +75,7 @@ class S3ClientProvider implements AutoCloseableAsync { private final S3Client s3Client; private final S3TransferManager transferManager; private final S3EncryptionConfig encryptionConfig; - @Nullable private final AwsCredentialsProvider credentialsProvider; + private final AwsCredentialsProvider credentialsProvider; @Nullable private final StsClient stsClient; private final Duration clientCloseTimeout; private final Duration connectionTimeout; @@ -90,8 +91,8 @@ class S3ClientProvider implements AutoCloseableAsync { private S3ClientProvider( S3Client s3Client, S3TransferManager transferManager, - S3EncryptionConfig encryptionConfig, - @Nullable AwsCredentialsProvider credentialsProvider, + @Nullable S3EncryptionConfig encryptionConfig, + AwsCredentialsProvider credentialsProvider, @Nullable StsClient stsClient, Duration clientCloseTimeout, Duration connectionTimeout, @@ -102,16 +103,23 @@ private S3ClientProvider( boolean checksumValidation, int maxConnections, int maxRetries) { - this.s3Client = s3Client; - this.transferManager = transferManager; + this.s3Client = Objects.requireNonNull(s3Client, "s3Client must not be null"); + this.transferManager = + Objects.requireNonNull(transferManager, "transferManager must not be null"); this.encryptionConfig = encryptionConfig != null ? encryptionConfig : S3EncryptionConfig.none(); - this.credentialsProvider = credentialsProvider; + this.credentialsProvider = + Objects.requireNonNull(credentialsProvider, "credentialsProvider must not be null"); this.stsClient = stsClient; - this.clientCloseTimeout = clientCloseTimeout; - this.connectionTimeout = connectionTimeout; - this.socketTimeout = socketTimeout; - this.connectionMaxIdleTime = connectionMaxIdleTime; + this.clientCloseTimeout = + Objects.requireNonNull(clientCloseTimeout, "clientCloseTimeout must not be null"); + this.connectionTimeout = + Objects.requireNonNull(connectionTimeout, "connectionTimeout must not be null"); + this.socketTimeout = + Objects.requireNonNull(socketTimeout, "socketTimeout must not be null"); + this.connectionMaxIdleTime = + Objects.requireNonNull( + connectionMaxIdleTime, "connectionMaxIdleTime must not be null"); this.pathStyleAccess = pathStyleAccess; this.chunkedEncoding = chunkedEncoding; this.checksumValidation = checksumValidation; @@ -135,7 +143,6 @@ public S3EncryptionConfig getEncryptionConfig() { } @VisibleForTesting - @Nullable AwsCredentialsProvider getCredentialsProvider() { return credentialsProvider; } diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java index 3e729905a8b05..8bf8b5f921fde 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Objects; /** * Configuration for S3 server-side encryption (SSE). @@ -79,8 +80,9 @@ private S3EncryptionConfig(EncryptionType encryptionType, @Nullable String kmsKe private S3EncryptionConfig( EncryptionType encryptionType, @Nullable String kmsKeyId, - Map encryptionContext) { - this.encryptionType = encryptionType; + @Nullable Map encryptionContext) { + this.encryptionType = + Objects.requireNonNull(encryptionType, "encryptionType must not be null"); this.kmsKeyId = kmsKeyId; this.encryptionContext = encryptionContext != null diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java index f0f1744f4aa24..5bf7a04d444de 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java @@ -180,8 +180,7 @@ void fromConfig_unknownType_throwsIllegalArgument() { @ParameterizedTest @MethodSource - void isEnabled_encryptionType_returnsCorrectState( - S3EncryptionConfig config, boolean expected) { + void isEnabled_encryptionType_returnsCorrectState(S3EncryptionConfig config, boolean expected) { assertThat(config.isEnabled()).isEqualTo(expected); } From dbd4474c81af5f0a80a404590c481be33036c02a Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 4 May 2026 14:50:04 +0200 Subject: [PATCH 09/10] Use realistic encryption context examples in tests --- .../fs/s3native/S3EncryptionConfigTest.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java index 5bf7a04d444de..cfda10a14527f 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java @@ -45,11 +45,13 @@ class S3EncryptionConfigTest { @Test void sseKms_withContext_contextStoredDefensively() { - Map ctx = new HashMap<>(Map.of("dept", "finance")); + Map ctx = + new HashMap<>(Map.of("aws:s3:arn", "arn:aws:s3:::my-bucket/my-file")); S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", ctx); ctx.put("extra", "value"); - assertThat(c.getEncryptionContext()).isEqualTo(Map.of("dept", "finance")); + assertThat(c.getEncryptionContext()) + .isEqualTo(Map.of("aws:s3:arn", "arn:aws:s3:::my-bucket/my-file")); assertThat(c.hasEncryptionContext()).isTrue(); } @@ -62,7 +64,9 @@ void sseKms_nullKeyId_keyIdIsNull() { @Test void sseKms_returnedContext_isUnmodifiable() { - S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", Map.of("dept", "finance")); + S3EncryptionConfig c = + S3EncryptionConfig.sseKms( + "key-id", Map.of("aws:s3:arn", "arn:aws:s3:::my-bucket/my-file")); assertThatThrownBy(() -> c.getEncryptionContext().put("x", "y")) .isInstanceOf(UnsupportedOperationException.class); @@ -124,10 +128,14 @@ static Stream fromConfig_typeVariants_returnExpectedType() { @Test void fromConfig_sseKmsWithKeyAndContext_keyAndContextPreserved() { S3EncryptionConfig result = - S3EncryptionConfig.fromConfig("sse-kms", "my-key", Map.of("env", "prod")); + S3EncryptionConfig.fromConfig( + "sse-kms", + "my-key", + Map.of("aws:s3:arn", "arn:aws:s3:::my-bucket/my-file")); assertThat(result.getKmsKeyId()).isEqualTo("my-key"); - assertThat(result.getEncryptionContext()).isEqualTo(Map.of("env", "prod")); + assertThat(result.getEncryptionContext()) + .isEqualTo(Map.of("aws:s3:arn", "arn:aws:s3:::my-bucket/my-file")); } @ParameterizedTest @@ -146,7 +154,10 @@ static Stream fromConfig_sseKmsWithNoKeyId_keyIdIsNull() { @Test void fromConfig_sseKmsDefaultKeyWithContext_contextPreserved() { assertThat( - S3EncryptionConfig.fromConfig("sse-kms", null, Map.of("env", "prod")) + S3EncryptionConfig.fromConfig( + "sse-kms", + null, + Map.of("aws:s3:arn", "arn:aws:s3:::my-bucket/my-file")) .hasEncryptionContext()) .isTrue(); } @@ -154,7 +165,8 @@ void fromConfig_sseKmsDefaultKeyWithContext_contextPreserved() { @Test void fromConfig_sseS3WithContext_contextIgnored() { S3EncryptionConfig c = - S3EncryptionConfig.fromConfig("sse-s3", null, Map.of("dept", "finance")); + S3EncryptionConfig.fromConfig( + "sse-s3", null, Map.of("aws:s3:arn", "arn:aws:s3:::my-bucket/my-file")); assertThat(c.getEncryptionType()).isEqualTo(SSE_S3); assertThat(c.getEncryptionContext()).isEmpty(); From dfbdb4062d0866481851b72f75a94d56ed9e2372 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 4 May 2026 18:02:36 +0200 Subject: [PATCH 10/10] getBytes fix --- .../java/org/apache/flink/fs/s3native/S3EncryptionConfig.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java index 8bf8b5f921fde..deae79113d8c3 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Collections; import java.util.HashMap; @@ -215,7 +216,7 @@ public String serializeEncryptionContext() { first = false; } json.append("}"); - return Base64.getEncoder().encodeToString(json.toString().getBytes()); + return Base64.getEncoder().encodeToString(json.toString().getBytes(StandardCharsets.UTF_8)); } private String escapeJson(String value) {