From b0ea378c64d5c0d787029c0330e3acc022d93f51 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Sun, 19 Apr 2026 21:08:02 +0530 Subject: [PATCH 1/3] [FLINK-39482][filesystem] Support configurable maxConnections in S3ClientProvider --- .../fs/s3native/NativeS3BulkCopyHelper.java | 99 +++++++++++++++++-- .../flink/fs/s3native/NativeS3FileSystem.java | 6 ++ .../s3native/NativeS3FileSystemFactory.java | 15 ++- .../flink/fs/s3native/S3ClientProvider.java | 4 +- .../s3native/NativeS3BulkCopyHelperTest.java | 45 ++++++++- .../NativeS3FileSystemFactoryTest.java | 81 +++++++++++++++ 6 files changed, 237 insertions(+), 13 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java index 2dc8fb10d19b8..f57368ab3f532 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java @@ -19,6 +19,7 @@ package org.apache.flink.fs.s3native; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.ICloseableRegistry; import org.apache.flink.core.fs.PathsCopyingFileSystem; @@ -37,15 +38,18 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * Helper class for performing bulk S3 to local file system copies using S3TransferManager. * *

Concurrency Model: Uses batch-based concurrency control with {@code - * maxConcurrentCopies} to limit parallel downloads. The current implementation waits for each batch - * to complete before starting the next batch. A future enhancement could use a bounded thread pool - * (e.g., {@link java.util.concurrent.Semaphore} or bounded executor) to allow continuous submission - * of new downloads as slots become available, which would provide better throughput by avoiding the - * "slowest task in batch" bottleneck. + * maxConcurrentCopies} to limit parallel downloads. The effective concurrency is clamped to the + * HTTP connection pool size ({@code maxConnections}) to prevent connection pool exhaustion. The + * current implementation waits for each batch to complete before starting the next batch. A future + * enhancement could use a bounded thread pool (e.g., {@link java.util.concurrent.Semaphore} or + * bounded executor) to allow continuous submission of new downloads as slots become available, + * which would provide better throughput by avoiding the "slowest task in batch" bottleneck. * *

Retry Handling: Relies on the S3TransferManager's built-in retry mechanism for * transient failures. If a download fails after retries: @@ -70,10 +74,37 @@ class NativeS3BulkCopyHelper { private final S3TransferManager transferManager; private final int maxConcurrentCopies; + private final int maxConnections; - public NativeS3BulkCopyHelper(S3TransferManager transferManager, int maxConcurrentCopies) { + /** + * Creates a new bulk copy helper. + * + * @param transferManager the S3 transfer manager for async downloads + * @param maxConcurrentCopies the requested maximum number of concurrent copy operations + * @param maxConnections the HTTP connection pool size; if {@code maxConcurrentCopies} exceeds + * this value, it is clamped down to prevent connection pool exhaustion + */ + NativeS3BulkCopyHelper( + S3TransferManager transferManager, int maxConcurrentCopies, int maxConnections) { + checkArgument(maxConcurrentCopies > 0, "maxConcurrentCopies must be positive"); + checkArgument(maxConnections > 0, "maxConnections must be positive"); this.transferManager = transferManager; - this.maxConcurrentCopies = maxConcurrentCopies; + this.maxConnections = maxConnections; + if (maxConcurrentCopies > maxConnections) { + LOG.warn( + "s3.bulk-copy.max-concurrent ({}) exceeds s3.connection.max ({}). 
" + + "Clamping concurrent copies to {} to prevent connection pool exhaustion.", + maxConcurrentCopies, + maxConnections, + maxConnections); + this.maxConcurrentCopies = maxConnections; + } else { + this.maxConcurrentCopies = maxConcurrentCopies; + } + } + + int getEffectiveMaxConcurrentCopies() { + return maxConcurrentCopies; } /** @@ -97,9 +128,17 @@ public void copyFiles( return; } - LOG.info("Starting bulk copy of {} files using S3TransferManager", requests.size()); + int totalFiles = requests.size(); + int totalBatches = (totalFiles + maxConcurrentCopies - 1) / maxConcurrentCopies; + LOG.info( + "Starting bulk copy of {} files using S3TransferManager " + + "(batch size: {}, total batches: {})", + totalFiles, + maxConcurrentCopies, + totalBatches); List> copyFutures = new ArrayList<>(); + int batchNumber = 0; try { for (int i = 0; i < requests.size(); i++) { @@ -113,12 +152,18 @@ public void copyFiles( } if (copyFutures.size() >= maxConcurrentCopies || i == requests.size() - 1) { + batchNumber++; + LOG.debug( + "Waiting for batch {}/{} ({} files)", + batchNumber, + totalBatches, + copyFutures.size()); waitForCopies(copyFutures); copyFutures.clear(); } } - LOG.info("Completed bulk copy of {} files", requests.size()); + LOG.info("Completed bulk copy of {} files", totalFiles); } catch (Exception e) { if (!copyFutures.isEmpty()) { LOG.warn( @@ -181,8 +226,42 @@ private void waitForCopies(List> futures) throw Thread.currentThread().interrupt(); throw new IOException("Bulk copy interrupted", e); } catch (ExecutionException e) { - throw new IOException("Bulk copy failed", e.getCause()); + Throwable cause = e.getCause(); + if (isConnectionPoolExhausted(cause)) { + throw new IOException( + String.format( + "S3 connection pool exhausted during bulk copy. " + + "The configured connection pool size (%d) could not serve " + + "the concurrent download requests (%d). 
" + + "Consider reducing '%s' or increasing '%s'.", + maxConnections, + maxConcurrentCopies, + "s3.bulk-copy.max-concurrent", + "s3.connection.max"), + cause); + } + throw new IOException("Bulk copy failed", cause); + } + } + + /** + * Checks whether a failure was caused by HTTP connection pool exhaustion. + * + *

Walks the causal chain looking for the SDK's characteristic message about connection + * acquire timeouts. This detection is deliberately broad (substring match on the message) to + * remain resilient to minor SDK wording changes across versions. + */ + @VisibleForTesting + static boolean isConnectionPoolExhausted(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + String message = current.getMessage(); + if (message != null && message.contains("Acquire operation took longer than")) { + return true; + } + current = current.getCause(); } + return false; } /** diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java index 40e88d8ae5a13..64795cc8c76fe 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java @@ -170,6 +170,12 @@ S3ClientProvider getClientProvider() { return clientProvider; } + @VisibleForTesting + @Nullable + NativeS3BulkCopyHelper getBulkCopyHelper() { + return bulkCopyHelper; + } + @Override public URI getUri() { return uri; diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java index 60b151035cbb0..5a4b0b6b865d5 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java @@ -122,6 +122,15 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { .defaultValue(true) .withDescription("Enable bulk copy operations using 
S3TransferManager"); + public static final ConfigOption MAX_CONNECTIONS = + ConfigOptions.key("s3.connection.max") + .intType() + .defaultValue(50) + .withDescription( + "Maximum number of HTTP connections in the S3 client connection pool. " + + "Applies to both the sync client (Apache HTTP) and the async client (Netty). " + + "Must be at least as large as 's3.bulk-copy.max-concurrent'."); + public static final ConfigOption BULK_COPY_MAX_CONCURRENT = ConfigOptions.key("s3.bulk-copy.max-concurrent") .intType() @@ -348,6 +357,8 @@ public FileSystem create(URI fsUri) throws IOException { readBufferSize); } + final int maxConnections = config.get(MAX_CONNECTIONS); + S3ClientProvider clientProvider = S3ClientProvider.builder() .accessKey(accessKey) @@ -355,6 +366,7 @@ public FileSystem create(URI fsUri) throws IOException { .region(region) .endpoint(endpoint) .pathStyleAccess(pathStyleAccess) + .maxConnections(maxConnections) .connectionTimeout(config.get(CONNECTION_TIMEOUT)) .socketTimeout(config.get(SOCKET_TIMEOUT)) .connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME)) @@ -374,7 +386,8 @@ public FileSystem create(URI fsUri) throws IOException { bulkCopyHelper = new NativeS3BulkCopyHelper( clientProvider.getTransferManager(), - config.get(BULK_COPY_MAX_CONCURRENT)); + config.get(BULK_COPY_MAX_CONCURRENT), + maxConnections); } return new NativeS3FileSystem( diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java index cee92d1ce5d50..daccd6247cdb8 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java @@ -389,7 +389,9 @@ public S3ClientProvider build() { NettyNioAsyncHttpClient.builder() .maxConcurrency(maxConnections) 
.connectionTimeout(connectionTimeout) - .readTimeout(socketTimeout)) + .readTimeout(socketTimeout) + .connectionAcquisitionTimeout( + connectionTimeout)) .overrideConfiguration(overrideConfig) .endpointOverride(endpointUri) .build()) diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java index 37b35f88acae8..a92d28fd16a83 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java @@ -21,6 +21,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import software.amazon.awssdk.core.exception.SdkClientException; + +import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNullPointerException; @@ -28,7 +31,9 @@ /** Tests for {@link NativeS3BulkCopyHelper}. 
*/ class NativeS3BulkCopyHelperTest { - private static final NativeS3BulkCopyHelper helper = new NativeS3BulkCopyHelper(null, 1); + private static final NativeS3BulkCopyHelper helper = new NativeS3BulkCopyHelper(null, 1, 1); + + // --- URI parsing tests --- @ParameterizedTest @CsvSource({ @@ -105,4 +110,42 @@ void testExtractKeyVeryLongPath() { path.append("file.txt"); assertThat(helper.extractKey("s3://bucket/" + path)).isEqualTo(path.toString()); } + + @Test + void testConnectionPoolExhaustedDetection() { + assertThat( + NativeS3BulkCopyHelper.isConnectionPoolExhausted( + SdkClientException.builder() + .message( + "Unable to execute HTTP request: " + + "Acquire operation took longer than the configured maximum time.") + .build())) + .isTrue(); + assertThat( + NativeS3BulkCopyHelper.isConnectionPoolExhausted( + SdkClientException.builder() + .message("Unable to execute HTTP request") + .cause( + new RuntimeException( + "channel acquisition failed", + new java.util.concurrent.TimeoutException( + "Acquire operation took longer than 10000 milliseconds."))) + .build())) + .isTrue(); + assertThat( + NativeS3BulkCopyHelper.isConnectionPoolExhausted( + SdkClientException.builder().message("Access Denied").build())) + .isFalse(); + assertThat( + NativeS3BulkCopyHelper.isConnectionPoolExhausted( + new RuntimeException((String) null))) + .isFalse(); + assertThat(NativeS3BulkCopyHelper.isConnectionPoolExhausted(null)).isFalse(); + } + + @Test + void testEmptyRequestListIsNoOp() throws Exception { + NativeS3BulkCopyHelper noOpHelper = new NativeS3BulkCopyHelper(null, 16, 50); + noOpHelper.copyFiles(Collections.emptyList(), null); + } } diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java index 31e6814ab1b0e..95f7d4aba0641 100644 --- 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java @@ -353,6 +353,87 @@ void testEmptyRegionFallsBackToAutodiscovery() throws Exception { } } + @Test + void testInvalidMaxConnectionsThrowsException() { + NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); + Configuration config = new Configuration(); + config.setString("s3.access-key", "test-access-key"); + config.setString("s3.secret-key", "test-secret-key"); + config.setString("s3.region", "us-east-1"); + config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0); + config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); + + factory.configure(config); + + URI fsUri = URI.create("s3://test-bucket/"); + assertThatThrownBy(() -> factory.create(fsUri)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testInvalidBulkCopyMaxConcurrentThrowsException() { + NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); + Configuration config = new Configuration(); + config.setString("s3.access-key", "test-access-key"); + config.setString("s3.secret-key", "test-secret-key"); + config.setString("s3.region", "us-east-1"); + config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0); + config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); + + factory.configure(config); + + URI fsUri = URI.create("s3://test-bucket/"); + assertThatThrownBy(() -> factory.create(fsUri)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("maxConcurrentCopies must be positive"); + } + + @Test + void testBulkCopyMaxConcurrentClampedToMaxConnections() throws Exception { + NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); + Configuration config = new Configuration(); + config.setString("s3.access-key", "test-access-key"); + 
config.setString("s3.secret-key", "test-secret-key"); + config.setString("s3.region", "us-east-1"); + config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true); + config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32); + config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 10); + config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); + + factory.configure(config); + + URI fsUri = URI.create("s3://test-bucket/"); + FileSystem fs = factory.create(fsUri); + + assertThat(fs).isInstanceOf(NativeS3FileSystem.class); + NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; + assertThat(nativeFs.getBulkCopyHelper()).isNotNull(); + assertThat(nativeFs.getBulkCopyHelper().getEffectiveMaxConcurrentCopies()).isEqualTo(10); + } + + @Test + void testBulkCopyMaxConcurrentPreservedWithinMaxConnections() throws Exception { + NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); + Configuration config = new Configuration(); + config.setString("s3.access-key", "test-access-key"); + config.setString("s3.secret-key", "test-secret-key"); + config.setString("s3.region", "us-east-1"); + config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true); + config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 10); + config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 50); + config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); + + factory.configure(config); + + URI fsUri = URI.create("s3://test-bucket/"); + FileSystem fs = factory.create(fsUri); + + assertThat(fs).isInstanceOf(NativeS3FileSystem.class); + NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; + assertThat(nativeFs.getBulkCopyHelper()).isNotNull(); + assertThat(nativeFs.getBulkCopyHelper().getEffectiveMaxConcurrentCopies()).isEqualTo(10); + } + @Test void testS3ASchemeReturnsS3A() { NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory(); From 28ef8d92ca5fd31c3fc3de57f0fe1e1a78c2574c Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Mon, 20 
Apr 2026 18:23:57 +0530 Subject: [PATCH 2/3] [FLINK-39482][filesystem] Address review comments: parameterized test, rename method and use key --- .../fs/s3native/NativeS3BulkCopyHelper.java | 10 +-- .../s3native/NativeS3FileSystemFactory.java | 5 ++ .../s3native/NativeS3BulkCopyHelperTest.java | 68 +++++++++++-------- .../NativeS3FileSystemFactoryTest.java | 8 ++- 4 files changed, 54 insertions(+), 37 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java index f57368ab3f532..5fde18815d391 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java @@ -92,9 +92,11 @@ class NativeS3BulkCopyHelper { this.maxConnections = maxConnections; if (maxConcurrentCopies > maxConnections) { LOG.warn( - "s3.bulk-copy.max-concurrent ({}) exceeds s3.connection.max ({}). " + "{} ({}) exceeds {} ({}). 
" + "Clamping concurrent copies to {} to prevent connection pool exhaustion.", + NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(), maxConcurrentCopies, + NativeS3FileSystemFactory.MAX_CONNECTIONS.key(), maxConnections, maxConnections); this.maxConcurrentCopies = maxConnections; @@ -103,7 +105,7 @@ class NativeS3BulkCopyHelper { } } - int getEffectiveMaxConcurrentCopies() { + int getMaxConcurrentCopies() { return maxConcurrentCopies; } @@ -236,8 +238,8 @@ private void waitForCopies(List> futures) throw + "Consider reducing '%s' or increasing '%s'.", maxConnections, maxConcurrentCopies, - "s3.bulk-copy.max-concurrent", - "s3.connection.max"), + NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(), + NativeS3FileSystemFactory.MAX_CONNECTIONS.key()), cause); } throw new IOException("Bulk copy failed", cause); diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java index 5a4b0b6b865d5..e96ca2fc1b314 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java @@ -358,6 +358,11 @@ public FileSystem create(URI fsUri) throws IOException { } final int maxConnections = config.get(MAX_CONNECTIONS); + Preconditions.checkArgument( + maxConnections > 0, + "'%s' must be a positive integer, but was %d", + MAX_CONNECTIONS.key(), + maxConnections); S3ClientProvider clientProvider = S3ClientProvider.builder() diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java index a92d28fd16a83..5b244850dcae6 100644 --- 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java @@ -20,10 +20,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; import software.amazon.awssdk.core.exception.SdkClientException; import java.util.Collections; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNullPointerException; @@ -111,36 +115,40 @@ void testExtractKeyVeryLongPath() { assertThat(helper.extractKey("s3://bucket/" + path)).isEqualTo(path.toString()); } - @Test - void testConnectionPoolExhaustedDetection() { - assertThat( - NativeS3BulkCopyHelper.isConnectionPoolExhausted( - SdkClientException.builder() - .message( - "Unable to execute HTTP request: " - + "Acquire operation took longer than the configured maximum time.") - .build())) - .isTrue(); - assertThat( - NativeS3BulkCopyHelper.isConnectionPoolExhausted( - SdkClientException.builder() - .message("Unable to execute HTTP request") - .cause( - new RuntimeException( - "channel acquisition failed", - new java.util.concurrent.TimeoutException( - "Acquire operation took longer than 10000 milliseconds."))) - .build())) - .isTrue(); - assertThat( - NativeS3BulkCopyHelper.isConnectionPoolExhausted( - SdkClientException.builder().message("Access Denied").build())) - .isFalse(); - assertThat( - NativeS3BulkCopyHelper.isConnectionPoolExhausted( - new RuntimeException((String) null))) - .isFalse(); - assertThat(NativeS3BulkCopyHelper.isConnectionPoolExhausted(null)).isFalse(); + private static Stream connectionPoolExhaustedCases() { + 
return Stream.of( + Arguments.of( + "direct message match", + SdkClientException.builder() + .message( + "Unable to execute HTTP request: " + + "Acquire operation took longer than the configured maximum time.") + .build(), + true), + Arguments.of( + "nested causal chain", + SdkClientException.builder() + .message("Unable to execute HTTP request") + .cause( + new RuntimeException( + "channel acquisition failed", + new TimeoutException( + "Acquire operation took longer than 10000 milliseconds."))) + .build(), + true), + Arguments.of( + "unrelated error", + SdkClientException.builder().message("Access Denied").build(), + false), + Arguments.of("null message", new RuntimeException((String) null), false), + Arguments.of("null throwable", null, false)); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("connectionPoolExhaustedCases") + void testConnectionPoolExhaustedDetection( + String description, Throwable throwable, boolean expected) { + assertThat(NativeS3BulkCopyHelper.isConnectionPoolExhausted(throwable)).isEqualTo(expected); } @Test diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java index 95f7d4aba0641..d9bd091e8c0a5 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java @@ -367,7 +367,9 @@ void testInvalidMaxConnectionsThrowsException() { URI fsUri = URI.create("s3://test-bucket/"); assertThatThrownBy(() -> factory.create(fsUri)) - .isInstanceOf(IllegalArgumentException.class); + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("s3.connection.max") + .hasMessageContaining("must be a positive integer"); } @Test @@ -408,7 +410,7 @@ void 
testBulkCopyMaxConcurrentClampedToMaxConnections() throws Exception { assertThat(fs).isInstanceOf(NativeS3FileSystem.class); NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; assertThat(nativeFs.getBulkCopyHelper()).isNotNull(); - assertThat(nativeFs.getBulkCopyHelper().getEffectiveMaxConcurrentCopies()).isEqualTo(10); + assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10); } @Test @@ -431,7 +433,7 @@ void testBulkCopyMaxConcurrentPreservedWithinMaxConnections() throws Exception { assertThat(fs).isInstanceOf(NativeS3FileSystem.class); NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; assertThat(nativeFs.getBulkCopyHelper()).isNotNull(); - assertThat(nativeFs.getBulkCopyHelper().getEffectiveMaxConcurrentCopies()).isEqualTo(10); + assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10); } @Test From d3429001461b2d924dc89672b100ba684c3701ab Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Mon, 20 Apr 2026 22:32:18 +0530 Subject: [PATCH 3/3] [FLINK-39482][filesystem] Address review comments: nits and suggestions --- .../flink/fs/s3native/NativeS3FileSystemFactory.java | 10 ++++++++-- .../fs/s3native/NativeS3FileSystemFactoryTest.java | 3 ++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java index e96ca2fc1b314..81d92dc762bea 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java @@ -360,7 +360,7 @@ public FileSystem create(URI fsUri) throws IOException { final int maxConnections = config.get(MAX_CONNECTIONS); Preconditions.checkArgument( maxConnections > 0, - "'%s' must be a positive integer, 
but was %d", + "'%s' must be a positive integer, but was %s", MAX_CONNECTIONS.key(), maxConnections); @@ -388,10 +388,16 @@ public FileSystem create(URI fsUri) throws IOException { NativeS3BulkCopyHelper bulkCopyHelper = null; if (config.get(BULK_COPY_ENABLED)) { + final int bulkCopyMaxConcurrent = config.get(BULK_COPY_MAX_CONCURRENT); + Preconditions.checkArgument( + bulkCopyMaxConcurrent > 0, + "'%s' must be a positive integer, but was %s", + BULK_COPY_MAX_CONCURRENT.key(), + bulkCopyMaxConcurrent); bulkCopyHelper = new NativeS3BulkCopyHelper( clientProvider.getTransferManager(), - config.get(BULK_COPY_MAX_CONCURRENT), + bulkCopyMaxConcurrent, maxConnections); } diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java index d9bd091e8c0a5..f9e57b55858be 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java @@ -387,7 +387,8 @@ void testInvalidBulkCopyMaxConcurrentThrowsException() { URI fsUri = URI.create("s3://test-bucket/"); assertThatThrownBy(() -> factory.create(fsUri)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("maxConcurrentCopies must be positive"); + .hasMessageContaining("s3.bulk-copy.max-concurrent") + .hasMessageContaining("must be a positive integer"); } @Test