From 3b2083c7d942f4cce066465a6b64afbb9aaebecf Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 23 Apr 2026 15:15:41 +0200 Subject: [PATCH] [FLINK-39530][s3] Replace S3 compatible auto-detection with explicit chunked-encoding and checksum-validation config options --- .../s3native/NativeS3FileSystemFactory.java | 28 +- .../flink/fs/s3native/S3ClientProvider.java | 87 ++- .../NativeS3FileSystemFactoryTest.java | 686 ++++++------------ 3 files changed, 330 insertions(+), 471 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java index 81d92dc762bea..e8cc953fc1e2a 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java @@ -88,7 +88,27 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { .booleanType() .defaultValue(false) .withFallbackKeys("s3.path.style.access") - .withDescription("Use path-style access for S3 (for S3-compatible storage)"); + .withDescription( + "Use path-style access for S3. Required for most S3-compatible servers " + + "(e.g. MinIO, Ceph) which do not support virtual-hosted style."); + + public static final ConfigOption CHUNKED_ENCODING_ENABLED = + ConfigOptions.key("s3.chunked-encoding.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Enable AWS chunked transfer encoding for S3 requests. " + + "Most S3-compatible servers do not support this AWS-specific " + + "extension and require it to be disabled."); + + public static final ConfigOption CHECKSUM_VALIDATION_ENABLED = + ConfigOptions.key("s3.checksum-validation.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Enable checksum validation for S3 requests. " + + "Most S3-compatible servers do not support AWS checksum " + + "headers and require it to be disabled."); public static final ConfigOption PART_UPLOAD_MIN_SIZE = ConfigOptions.key("s3.upload.min.part.size") @@ -295,10 +315,6 @@ public FileSystem create(URI fsUri) throws IOException { String endpoint = config.get(ENDPOINT); boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS); - if (endpoint != null && !pathStyleAccess) { - pathStyleAccess = true; - } - S3EncryptionConfig encryptionConfig = S3EncryptionConfig.fromConfig(config.get(SSE_TYPE), config.get(SSE_KMS_KEY_ID)); String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION); @@ -371,6 +387,8 @@ public FileSystem create(URI fsUri) throws IOException { .region(region) .endpoint(endpoint) .pathStyleAccess(pathStyleAccess) + .chunkedEncoding(config.get(CHUNKED_ENCODING_ENABLED)) + .checksumValidation(config.get(CHECKSUM_VALIDATION_ENABLED)) .maxConnections(maxConnections) .connectionTimeout(config.get(CONNECTION_TIMEOUT)) .socketTimeout(config.get(SOCKET_TIMEOUT)) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java index daccd6247cdb8..656a3f567dae4 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java @@ -80,6 +80,11 @@ class S3ClientProvider implements AutoCloseableAsync { private final Duration connectionTimeout; private final Duration socketTimeout; private final Duration connectionMaxIdleTime; + private final boolean pathStyleAccess; + private final boolean chunkedEncoding; + private final boolean checksumValidation; + private final int maxConnections; + private final int maxRetries; private final AtomicBoolean closed = new AtomicBoolean(false); private S3ClientProvider( @@ -91,7 +96,12 @@ private S3ClientProvider( Duration clientCloseTimeout, Duration connectionTimeout, Duration socketTimeout, - Duration connectionMaxIdleTime) { + Duration connectionMaxIdleTime, + boolean pathStyleAccess, + boolean chunkedEncoding, + boolean checksumValidation, + int maxConnections, + int maxRetries) { this.s3Client = s3Client; this.transferManager = transferManager; this.encryptionConfig = @@ -102,6 +112,11 @@ private S3ClientProvider( this.connectionTimeout = connectionTimeout; this.socketTimeout = socketTimeout; this.connectionMaxIdleTime = connectionMaxIdleTime; + this.pathStyleAccess = pathStyleAccess; + this.chunkedEncoding = chunkedEncoding; + this.checksumValidation = checksumValidation; + this.maxConnections = maxConnections; + this.maxRetries = maxRetries; } public S3Client getS3Client() { @@ -144,6 +159,31 @@ Duration getConnectionMaxIdleTime() { return connectionMaxIdleTime; } + @VisibleForTesting + boolean isPathStyleAccess() { + return pathStyleAccess; + } + + @VisibleForTesting + boolean isChunkedEncoding() { + return chunkedEncoding; + } + + @VisibleForTesting + boolean isChecksumValidation() { + return checksumValidation; + } + + @VisibleForTesting + int getMaxConnections() { + return maxConnections; + } + + @VisibleForTesting + int getMaxRetries() { + return maxRetries; + } + @Override public CompletableFuture closeAsync() { if (!closed.compareAndSet(false, true)) { @@ -204,11 +244,12 @@ public static class Builder { private String region; private String endpoint; private boolean pathStyleAccess = false; + private boolean chunkedEncoding = true; + private boolean checksumValidation = true; private int maxConnections = 50; private Duration connectionTimeout = Duration.ofSeconds(60); private Duration socketTimeout = Duration.ofSeconds(60); private Duration connectionMaxIdleTime = Duration.ofSeconds(60); - private boolean disableCertCheck = false; private int maxRetries = 3; private Duration clientCloseTimeout = Duration.ofSeconds(30); @@ -249,6 +290,16 @@ public Builder pathStyleAccess(boolean pathStyleAccess) { return this; } + public Builder chunkedEncoding(boolean chunkedEncoding) { + this.chunkedEncoding = chunkedEncoding; + return this; + } + + public Builder checksumValidation(boolean checksumValidation) { + this.checksumValidation = checksumValidation; + return this; + } + public Builder maxConnections(int maxConnections) { this.maxConnections = maxConnections; return this; @@ -274,11 +325,6 @@ public Builder clientCloseTimeout(Duration clientCloseTimeout) { return this; } - public Builder disableCertCheck(boolean disableCertCheck) { - this.disableCertCheck = disableCertCheck; - return this; - } - public Builder maxRetries(int maxRetries) { this.maxRetries = maxRetries; return this; @@ -326,14 +372,6 @@ public S3ClientProvider build() { } URI endpointUri = (endpoint != null) ? URI.create(endpoint) : null; - boolean isS3Compatible = endpointUri != null; - - if (isS3Compatible && !pathStyleAccess) { - pathStyleAccess = true; - } - if (isS3Compatible && "http".equalsIgnoreCase(endpointUri.getScheme())) { - disableCertCheck = true; - } Region awsRegion = resolveRegion(region); StsClient stsClient = null; @@ -347,12 +385,12 @@ public S3ClientProvider build() { credentialsProvider = baseProvider; } - S3Configuration.Builder s3ConfigBuilder = - S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccess); - if (isS3Compatible) { - s3ConfigBuilder.chunkedEncodingEnabled(false).checksumValidationEnabled(false); - } - S3Configuration s3Config = s3ConfigBuilder.build(); + S3Configuration s3Config = + S3Configuration.builder() + .pathStyleAccessEnabled(pathStyleAccess) + .chunkedEncodingEnabled(chunkedEncoding) + .checksumValidationEnabled(checksumValidation) + .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() @@ -406,7 +444,12 @@ public S3ClientProvider build() { clientCloseTimeout, connectionTimeout, socketTimeout, - connectionMaxIdleTime); + connectionMaxIdleTime, + pathStyleAccess, + chunkedEncoding, + checksumValidation, + maxConnections, + maxRetries); } private AwsCredentialsProvider buildBaseCredentialsProvider() { diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java index f9e57b55858be..21d863012335d 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.FileSystem; import org.junit.jupiter.api.Test; @@ -33,565 +32,364 @@ /** Tests for {@link NativeS3FileSystemFactory}. */ class NativeS3FileSystemFactoryTest { - @Test - void testSchemeReturnsS3() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - assertThat(factory.getScheme()).isEqualTo("s3"); - } - - @Test - void testConfigureAcceptsConfiguration() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-key"); - config.setString("s3.secret-key", "test-secret"); - - // Should not throw - factory.configure(config); - } - - @Test - void testCreateFileSystemWithMinimalConfiguration() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); + private static Configuration baseConfig() { Configuration config = new Configuration(); config.setString("s3.access-key", "test-access-key"); config.setString("s3.secret-key", "test-secret-key"); config.setString("s3.region", "us-east-1"); config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = new URI("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); - - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); - assertThat(fs.getUri()).isEqualTo(fsUri); + return config; } - @Test - void testCreateFileSystemWithCustomEndpoint() throws Exception { + private static NativeS3FileSystem createFs(Configuration config) throws Exception { NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.endpoint", "http://localhost:9000"); - config.setString("s3.region", "us-east-1"); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - factory.configure(config); - - URI fsUri = new URI("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); - - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); + return (NativeS3FileSystem) factory.create(URI.create("s3://test-bucket/")); } - @Test - void testPartSizeTooSmallThrowsException() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 1024L); // Too small (< 5MB) - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - + private static NativeS3FileSystem createS3aFs(Configuration config) throws Exception { + NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory(); factory.configure(config); - - URI fsUri = URI.create("s3://test-bucket/"); - assertThatThrownBy(() -> factory.create(fsUri)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("must be at least"); + return (NativeS3FileSystem) factory.create(URI.create("s3a://test-bucket/")); } @Test - void testPartSizeTooLargeThrowsException() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.set( - NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 6L * 1024 * 1024 * 1024); // > 5GB - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3://test-bucket/"); - assertThatThrownBy(() -> factory.create(fsUri)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("must not exceed 5GB"); + void testSchemeReturnsS3() { + assertThat(new NativeS3FileSystemFactory().getScheme()).isEqualTo("s3"); } @Test - void testInvalidMaxConcurrentUploadsThrowsException() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.set(NativeS3FileSystemFactory.MAX_CONCURRENT_UPLOADS, 0); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3://test-bucket/"); - assertThatThrownBy(() -> factory.create(fsUri)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("must be positive"); + void testS3ASchemeReturnsS3A() { + assertThat(new NativeS3AFileSystemFactory().getScheme()).isEqualTo("s3a"); } @Test - void testInvalidEntropyKeyThrowsException() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.setString("s3.entropy.key", "__INVALID#KEY__"); // Contains # - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3://test-bucket/"); - assertThatThrownBy(() -> factory.create(fsUri)) - .isInstanceOf(IllegalConfigurationException.class) - .hasMessageContaining("Invalid character"); + void testCreateFileSystemWithMinimalConfiguration() throws Exception { + NativeS3FileSystem fs = createFs(baseConfig()); + assertThat(fs.getUri()).isEqualTo(URI.create("s3://test-bucket/")); } @Test - void testInvalidEntropyLengthThrowsException() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.setString("s3.entropy.key", "__ENTROPY__"); - config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 0); // Invalid - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); + void testCustomEndpointDoesNotTriggerS3CompatibleMode() throws Exception { + Configuration config = baseConfig(); + config.setString("s3.endpoint", "http://localhost:9000"); + S3ClientProvider provider = createFs(config).getClientProvider(); + assertThat(provider.isPathStyleAccess()).isFalse(); + assertThat(provider.isChunkedEncoding()).isTrue(); + assertThat(provider.isChecksumValidation()).isTrue(); + } - URI fsUri = URI.create("s3://test-bucket/"); - assertThatThrownBy(() -> factory.create(fsUri)) - .isInstanceOf(IllegalConfigurationException.class) - .hasMessageContaining("must be > 0"); + @Test + void testS3CompatibleServerExplicitConfiguration() throws Exception { + Configuration config = baseConfig(); + config.setString("s3.endpoint", "http://localhost:9000"); + config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true); + config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false); + config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED, false); + S3ClientProvider provider = createFs(config).getClientProvider(); + assertThat(provider.isPathStyleAccess()).isTrue(); + assertThat(provider.isChunkedEncoding()).isFalse(); + assertThat(provider.isChecksumValidation()).isFalse(); } @Test - void testEntropyInjectionWithValidConfiguration() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.setString("s3.entropy.key", "__ENTROPY__"); - config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 4); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); + void testS3ACreateFileSystemWithMinimalConfiguration() throws Exception { + NativeS3FileSystem fs = createS3aFs(baseConfig()); + assertThat(fs.getUri()).isEqualTo(URI.create("s3a://test-bucket/")); + } - factory.configure(config); + @Test + void testS3ACreateFileSystemWithCustomEndpoint() throws Exception { + Configuration config = baseConfig(); + config.setString("s3.endpoint", "http://localhost:9000"); + assertThat(createS3aFs(config)).isNotNull(); + } - URI fsUri = URI.create("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); + // --- Path-style access --- - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); + @Test + void testPathStyleAccessDefaultIsFalse() throws Exception { + assertThat(createFs(baseConfig()).getClientProvider().isPathStyleAccess()).isFalse(); } @Test - void testPathStyleAccessAutoEnabledForCustomEndpoint() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.endpoint", "http://minio:9000"); - config.setString("s3.region", "us-east-1"); - config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, false); // Explicitly set to false - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); + void testPathStyleAccessExplicitlyEnabled() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true); + assertThat(createFs(config).getClientProvider().isPathStyleAccess()).isTrue(); + } - URI fsUri = URI.create("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); + // --- Chunked encoding --- - // Should succeed - path-style access is auto-enabled - assertThat(fs).isNotNull(); + @Test + void testChunkedEncodingDefaultIsTrue() throws Exception { + assertThat(createFs(baseConfig()).getClientProvider().isChunkedEncoding()).isTrue(); } @Test - void testBulkCopyConfiguration() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true); - config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); + void testChunkedEncodingExplicitlyDisabled() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false); + assertThat(createFs(config).getClientProvider().isChunkedEncoding()).isFalse(); + } - URI fsUri = URI.create("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); + // --- Checksum validation --- - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); + @Test + void testChecksumValidationDefaultIsTrue() throws Exception { + assertThat(createFs(baseConfig()).getClientProvider().isChecksumValidation()).isTrue(); } @Test - void testExplicitRegionConfiguration() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "eu-west-1"); // Explicit non-default region - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); + void testChecksumValidationExplicitlyDisabled() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED, false); + assertThat(createFs(config).getClientProvider().isChecksumValidation()).isFalse(); + } - URI fsUri = URI.create("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); + // --- Max connections --- - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); + @Test + void testMaxConnectionsDefault() throws Exception { + assertThat(createFs(baseConfig()).getClientProvider().getMaxConnections()).isEqualTo(50); } @Test - void testExplicitRegionTakesPriorityOverAutodiscovery() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "ap-southeast-1"); // Explicit region - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); + void testMaxConnectionsExplicitlyConfigured() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 100); + assertThat(createFs(config).getClientProvider().getMaxConnections()).isEqualTo(100); + } - factory.configure(config); + @Test + void testInvalidMaxConnectionsThrowsException() { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0); + assertThatThrownBy(() -> createFs(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("s3.connection.max") + .hasMessageContaining("must be a positive integer"); + } - URI fsUri = URI.create("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); + // --- Max retries --- - // Should succeed with explicit region regardless of environment - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); + @Test + void testMaxRetriesDefault() throws Exception { + assertThat(createFs(baseConfig()).getClientProvider().getMaxRetries()).isEqualTo(3); } @Test - void testRegionAutodiscoveryWithoutExplicitConfig() throws Exception { - // This test verifies that region autodiscovery works when no explicit region is set. - // The test will either succeed (if AWS region can be auto-detected from environment) - // or fail with a helpful error message. - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - // Intentionally not setting s3.region to test autodiscovery - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); + void testMaxRetriesExplicitlyConfigured() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.MAX_RETRIES, 5); + assertThat(createFs(config).getClientProvider().getMaxRetries()).isEqualTo(5); + } - factory.configure(config); + // --- Timeouts --- - URI fsUri = URI.create("s3://test-bucket/"); + @Test + void testCustomTimeoutConfiguration() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.CONNECTION_TIMEOUT, Duration.ofSeconds(30)); + config.set(NativeS3FileSystemFactory.SOCKET_TIMEOUT, Duration.ofSeconds(45)); + config.set(NativeS3FileSystemFactory.CONNECTION_MAX_IDLE_TIME, Duration.ofMinutes(2)); + config.set(NativeS3FileSystemFactory.FS_CLOSE_TIMEOUT, Duration.ofSeconds(90)); + config.set(NativeS3FileSystemFactory.CLIENT_CLOSE_TIMEOUT, Duration.ofSeconds(15)); - try { - FileSystem fs = factory.create(fsUri); - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); - } catch (IllegalArgumentException e) { - // If no region can be auto-detected, verify the error message is helpful - assertThat(e.getMessage()).contains("AWS region could not be determined"); - assertThat(e.getMessage()).contains("s3.region"); - assertThat(e.getMessage()).contains("AWS_REGION"); - } + NativeS3FileSystem fs = createFs(config); + assertThat(fs.getFsCloseTimeout()).isEqualTo(Duration.ofSeconds(90)); + + S3ClientProvider clientProvider = fs.getClientProvider(); + assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30)); + assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofSeconds(45)); + assertThat(clientProvider.getConnectionMaxIdleTime()).isEqualTo(Duration.ofMinutes(2)); + assertThat(clientProvider.getClientCloseTimeout()).isEqualTo(Duration.ofSeconds(15)); } @Test - void testRegionAutodiscoveryWithCustomEndpoint() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.endpoint", "http://localhost:9000"); - // Intentionally not setting s3.region with custom endpoint - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); + void testTimeoutConfigurationWithStringDuration() throws Exception { + Configuration config = baseConfig(); + config.setString("s3.connection.timeout", "30 s"); + config.setString("s3.socket.timeout", "2 min"); + config.setString("s3.close.timeout", "1 min"); - URI fsUri = URI.create("s3://test-bucket/"); + NativeS3FileSystem fs = createFs(config); + assertThat(fs.getFsCloseTimeout()).isEqualTo(Duration.ofMinutes(1)); - try { - FileSystem fs = factory.create(fsUri); - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage()).contains("AWS region could not be determined"); - assertThat(e.getMessage()).contains("AWS_REGION"); - assertThat(e.getMessage()).contains("~/.aws/config"); - } + S3ClientProvider clientProvider = fs.getClientProvider(); + assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30)); + assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofMinutes(2)); } @Test - void testEmptyRegionFallsBackToAutodiscovery() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", ""); // Empty region should trigger autodiscovery - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); + void testInvalidTimeoutConfigurationThrowsException() { + Configuration config = baseConfig(); + config.setString("s3.connection.timeout", "not-a-duration"); + assertThatThrownBy(() -> createFs(config)).isInstanceOf(IllegalArgumentException.class); + } - URI fsUri = URI.create("s3://test-bucket/"); + // --- Part upload size --- - try { - FileSystem fs = factory.create(fsUri); - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); - } catch (IllegalArgumentException e) { - // If no region can be auto-detected, verify the error message is helpful - assertThat(e.getMessage()).contains("AWS region could not be determined"); - } + @Test + void testPartSizeTooSmallThrowsException() { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 1024L); + assertThatThrownBy(() -> createFs(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be at least"); } @Test - void testInvalidMaxConnectionsThrowsException() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); + void testPartSizeTooLargeThrowsException() { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 6L * 1024 * 1024 * 1024); + assertThatThrownBy(() -> createFs(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not exceed 5GB"); + } - factory.configure(config); + // --- Max concurrent uploads --- - URI fsUri = URI.create("s3://test-bucket/"); - assertThatThrownBy(() -> factory.create(fsUri)) + @Test + void testInvalidMaxConcurrentUploadsThrowsException() { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.MAX_CONCURRENT_UPLOADS, 0); + assertThatThrownBy(() -> createFs(config)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("s3.connection.max") - .hasMessageContaining("must be a positive integer"); + .hasMessageContaining("must be positive"); } + // --- Entropy injection --- + @Test - void testInvalidBulkCopyMaxConcurrentThrowsException() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); + void testInvalidEntropyKeyThrowsException() { + Configuration config = baseConfig(); + config.setString("s3.entropy.key", "__INVALID#KEY__"); + assertThatThrownBy(() -> createFs(config)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("Invalid character"); + } - factory.configure(config); + @Test + void testInvalidEntropyLengthThrowsException() { + Configuration config = baseConfig(); + config.setString("s3.entropy.key", "__ENTROPY__"); + config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 0); + assertThatThrownBy(() -> createFs(config)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("must be > 0"); + } - URI fsUri = URI.create("s3://test-bucket/"); - assertThatThrownBy(() -> factory.create(fsUri)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("s3.bulk-copy.max-concurrent") - .hasMessageContaining("must be a positive integer"); + @Test + void testEntropyInjectionWithValidConfiguration() throws Exception { + Configuration config = baseConfig(); + config.setString("s3.entropy.key", "__ENTROPY__"); + config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 4); + assertThat(createFs(config)).isNotNull(); } + // --- Bulk copy --- + @Test void testBulkCopyMaxConcurrentClampedToMaxConnections() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); + Configuration config = baseConfig(); config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true); config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32); config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 10); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - URI fsUri = URI.create("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); - - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); - NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; - assertThat(nativeFs.getBulkCopyHelper()).isNotNull(); - assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10); + NativeS3FileSystem fs = createFs(config); + assertThat(fs.getBulkCopyHelper()).isNotNull(); + assertThat(fs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10); } @Test void testBulkCopyMaxConcurrentPreservedWithinMaxConnections() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); + Configuration config = baseConfig(); config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true); config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 10); config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 50); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - URI fsUri = URI.create("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); - - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); - NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; - assertThat(nativeFs.getBulkCopyHelper()).isNotNull(); - assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10); + NativeS3FileSystem fs = createFs(config); + assertThat(fs.getBulkCopyHelper()).isNotNull(); + assertThat(fs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10); } @Test - void testS3ASchemeReturnsS3A() { - NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory(); - assertThat(factory.getScheme()).isEqualTo("s3a"); + void testInvalidBulkCopyMaxConcurrentThrowsException() { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0); + assertThatThrownBy(() -> createFs(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("s3.bulk-copy.max-concurrent") + .hasMessageContaining("must be a positive integer"); } - @Test - void testS3ACreateFileSystemWithMinimalConfiguration() throws Exception { - NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3a://test-bucket/"); - FileSystem fs = factory.create(fsUri); - - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); - } + // --- Region --- @Test - void testS3ACreateFileSystemWithCustomEndpoint() throws Exception { - NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.endpoint", "http://localhost:9000"); - config.setString("s3.region", "us-east-1"); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3a://test-bucket/path/to/file"); - FileSystem fs = factory.create(fsUri); - - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); - } - - @Test - void testS3AInheritsAllS3Configuration() throws Exception { - NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); + void testExplicitRegionConfiguration() throws Exception { + Configuration config = baseConfig(); config.setString("s3.region", "eu-west-1"); - config.setString("s3.entropy.key", "__ENTROPY__"); - config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 6); - config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true); - config.set(NativeS3FileSystemFactory.USE_ASYNC_OPERATIONS, true); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3a://test-bucket/"); - FileSystem fs = factory.create(fsUri); - - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); + assertThat(createFs(config)).isNotNull(); } @Test - void testS3AWithSSEConfiguration() throws Exception { - NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.setString("s3.sse.type", "sse-s3"); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3a://test-bucket/"); - FileSystem fs = factory.create(fsUri); - - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); + void testRegionAutodiscoveryWithoutExplicitConfig() throws Exception { + Configuration config = baseConfig(); + config.removeConfig(NativeS3FileSystemFactory.REGION); + try { + assertThat(createFs(config)).isNotNull(); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage()).contains("AWS region could not be determined"); + assertThat(e.getMessage()).contains("s3.region"); + assertThat(e.getMessage()).contains("AWS_REGION"); + } } @Test - void testCustomTimeoutConfiguration() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.set(NativeS3FileSystemFactory.CONNECTION_TIMEOUT, Duration.ofSeconds(30)); - config.set(NativeS3FileSystemFactory.SOCKET_TIMEOUT, Duration.ofSeconds(45)); - config.set(NativeS3FileSystemFactory.CONNECTION_MAX_IDLE_TIME, Duration.ofMinutes(2)); - config.set(NativeS3FileSystemFactory.FS_CLOSE_TIMEOUT, Duration.ofSeconds(90)); - config.set(NativeS3FileSystemFactory.CLIENT_CLOSE_TIMEOUT, Duration.ofSeconds(15)); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); - - assertThat(fs).isNotNull(); - assertThat(fs).isInstanceOf(NativeS3FileSystem.class); - NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; - assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofSeconds(90)); - - S3ClientProvider clientProvider = nativeFs.getClientProvider(); - assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30)); - assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofSeconds(45)); - assertThat(clientProvider.getConnectionMaxIdleTime()).isEqualTo(Duration.ofMinutes(2)); - assertThat(clientProvider.getClientCloseTimeout()).isEqualTo(Duration.ofSeconds(15)); + void testRegionAutodiscoveryWithCustomEndpoint() throws Exception { + Configuration config = baseConfig(); + config.setString("s3.endpoint", "http://localhost:9000"); + config.removeConfig(NativeS3FileSystemFactory.REGION); + try { + assertThat(createFs(config)).isNotNull(); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage()).contains("AWS region could not be determined"); + assertThat(e.getMessage()).contains("AWS_REGION"); + assertThat(e.getMessage()).contains("~/.aws/config"); + } } @Test - void testTimeoutConfigurationWithStringDuration() throws Exception { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.setString("s3.connection.timeout", "30 s"); - config.setString("s3.socket.timeout", "2 min"); - config.setString("s3.close.timeout", "1 min"); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3://test-bucket/"); - FileSystem fs = factory.create(fsUri); + void testEmptyRegionFallsBackToAutodiscovery() throws Exception { + Configuration config = baseConfig(); + config.setString("s3.region", ""); + try { + assertThat(createFs(config)).isNotNull(); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage()).contains("AWS region could not be determined"); + } + } - assertThat(fs).isNotNull(); - NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; - assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofMinutes(1)); + // --- s3a scheme --- - S3ClientProvider clientProvider = nativeFs.getClientProvider(); - assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30)); - assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofMinutes(2)); + @Test + void testS3AWithSSEConfiguration() throws Exception { + Configuration config = baseConfig(); + config.setString("s3.sse.type", "sse-s3"); + assertThat(createS3aFs(config)).isNotNull(); } @Test - void testInvalidTimeoutConfigurationThrowsException() { - NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); - Configuration config = new Configuration(); - config.setString("s3.access-key", "test-access-key"); - config.setString("s3.secret-key", "test-secret-key"); - config.setString("s3.region", "us-east-1"); - config.setString("s3.connection.timeout", "not-a-duration"); - config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); - - factory.configure(config); - - URI fsUri = URI.create("s3://test-bucket/"); - assertThatThrownBy(() -> factory.create(fsUri)) - .isInstanceOf(IllegalArgumentException.class); + void testS3AInheritsS3Configuration() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true); + config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false); + config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED, false); + + NativeS3FileSystem fs = createS3aFs(config); + assertThat(fs.getClientProvider().isPathStyleAccess()).isTrue(); + assertThat(fs.getClientProvider().isChunkedEncoding()).isFalse(); + assertThat(fs.getClientProvider().isChecksumValidation()).isFalse(); } }