[#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use future and solve hang problem#9282
Conversation
Replaced Awaitility-based polling with CompletableFuture for FileSystem retrieval using a dedicated ThreadPoolExecutor. This improves performance and simplifies timeout handling by leveraging async execution and modern concurrency utilities. Added cancellation handling and clearer exception propagation for robustness.
|
Gravitino still hangs on, although this has nothing to do with file system initialization, the following is the stack: Let me solve it by the way. |
There was a problem hiding this comment.
Pull request overview
This PR refactors FileSystem retrieval to use CompletableFuture instead of Awaitility-based polling to address a hang issue where provider.getFileSystem(path, config) could hang indefinitely. The changes include adding a dedicated ThreadPoolExecutor for async FileSystem retrieval with proper timeout and cancellation handling, plus adding default timeout configurations to various FileSystem providers (HDFS, GCS, Azure, S3, OSS) to speed up test failures.
- Replaces Awaitility polling with
Future.get()with timeout for more robust timeout handling - Introduces a static
ThreadPoolExecutorfor async FileSystem retrieval operations - Adds default connection timeout and retry configurations across all FileSystem providers
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| FilesetCatalogOperations.java | Replaces Awaitility with Future-based timeout mechanism using a new ThreadPoolExecutor |
| HDFSFileSystemProvider.java | Adds default HDFS connection timeout and ping configurations |
| GCSFileSystemProvider.java | Adds default GCS HTTP connect timeout and retry limit configurations |
| AzureFileSystemProvider.java | Adds default Azure retry limit configuration |
| S3FileSystemProvider.java | Adds default S3 retry limits and connection timeout configurations |
| OSSFileSystemProvider.java | Adds default OSS connection timeout and retry limit configurations |
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
...doop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
Reduced thread pool keep-alive time from 50ms to 5s for better resource management. Added proper shutdown for the executor and improved logging for filesystem retrieval timeouts to enhance debugging and reliability.
future and solve hang problem
bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
Show resolved
Hide resolved
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Show resolved
Hide resolved
...doop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
Show resolved
Hide resolved
bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
Show resolved
Hide resolved
bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
Show resolved
Hide resolved
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
Show resolved
Hide resolved
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
Show resolved
Hide resolved
|
|
||
| private final ThreadPoolExecutor fileSystemExecutor = | ||
| new ThreadPoolExecutor( | ||
| Math.max(2, Runtime.getRuntime().availableProcessors() * 2), |
There was a problem hiding this comment.
This will create too many threads for server if they're more than 32 cores.
There was a problem hiding this comment.
I limit the max thread number to 24
...fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
...fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
...fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
...fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
...doop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
Outdated
Show resolved
Hide resolved
...doop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
Show resolved
Hide resolved
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
Show resolved
Hide resolved
...log-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
Outdated
Show resolved
Hide resolved
| // Test the following method should finish with 10s | ||
| long now = System.currentTimeMillis(); | ||
| try { | ||
| filesetCatalogOperations.getFileSystem(new Path("file:///tmp"), ImmutableMap.of()); | ||
| } catch (IOException e) { | ||
| long timeTake = System.currentTimeMillis() - now; | ||
| Assertions.assertTrue(timeTake <= 10000); | ||
| } |
There was a problem hiding this comment.
The test doesn't verify that the exception is actually thrown - it silently catches IOException and only checks the timing. If no exception is thrown, the test will pass incorrectly. Consider using Assertions.assertThrows() to ensure an IOException is thrown, then verify the timing within that context.
| // Test the following method should finish with 10s | |
| long now = System.currentTimeMillis(); | |
| try { | |
| filesetCatalogOperations.getFileSystem(new Path("file:///tmp"), ImmutableMap.of()); | |
| } catch (IOException e) { | |
| long timeTake = System.currentTimeMillis() - now; | |
| Assertions.assertTrue(timeTake <= 10000); | |
| } | |
| // Test the following method should finish with 10s and throw IOException | |
| Assertions.assertThrows( | |
| IOException.class, | |
| () -> { | |
| long now = System.currentTimeMillis(); | |
| try { | |
| filesetCatalogOperations.getFileSystem(new Path("file:///tmp"), ImmutableMap.of()); | |
| } finally { | |
| long timeTake = System.currentTimeMillis() - now; | |
| Assertions.assertTrue(timeTake <= 10000, "Timeout should occur within 10 seconds"); | |
| } | |
| }); |
bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
Outdated
Show resolved
Hide resolved
| private final ThreadPoolExecutor fileSystemExecutor = | ||
| new ThreadPoolExecutor( | ||
| Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2, 16)), | ||
| Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2, 32)), | ||
| 5L, | ||
| TimeUnit.SECONDS, | ||
| new ArrayBlockingQueue<>(1000), | ||
| new ThreadFactoryBuilder() | ||
| .setDaemon(true) | ||
| .setNameFormat("fileset-filesystem-getter-pool-%d") | ||
| .build(), | ||
| new ThreadPoolExecutor.AbortPolicy()) { | ||
| { | ||
| allowCoreThreadTimeOut(true); | ||
| } | ||
| }; |
There was a problem hiding this comment.
The ThreadPoolExecutor is initialized as a final instance field, but the close() method calls shutdownNow() without checking if it has already been shut down. If close() is called multiple times, this could throw an exception or cause issues. Consider adding a guard to check the executor's state before shutting it down, or use a safer shutdown pattern.
...fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
Show resolved
Hide resolved
| try { | ||
| filesetCatalogOperations.getFileSystem(new Path("file:///tmp"), ImmutableMap.of()); | ||
| } catch (IOException e) { | ||
| long timeTake = System.currentTimeMillis() - now; | ||
| Assertions.assertTrue(timeTake <= 10000); | ||
| } |
There was a problem hiding this comment.
The test expects an IOException to be thrown but doesn't verify the exception message or type. Consider adding assertions to verify that the correct exception is thrown with an appropriate error message about the timeout, to ensure the error handling path works as expected.
|
|
||
| private final ThreadPoolExecutor fileSystemExecutor = | ||
| new ThreadPoolExecutor( | ||
| Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2, 16)), |
There was a problem hiding this comment.
Do we need to keep such threads active for use? My feeling is that most of the threads can be swept when idle to save resources.
There was a problem hiding this comment.
I have allowed core poll timeout, and the threads will stop if there is no task assigned to the pool.
| e); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new IOException("Interrupted while waiting for FileSystem", e); |
There was a problem hiding this comment.
I think this is expected, shall we throw an exception here?
There was a problem hiding this comment.
Normally, a TimeoutException will occur if it hangs for a long time. and only when we interrupt it deliberately will it throw InterruptedException.
There was a problem hiding this comment.
I think you don't understand what I mean. InterruptedException often happens when closing or shutting down. This is expected and should not throw an IOException instead.
...t/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserAuthenticationIT.java
Outdated
Show resolved
Hide resolved
| filesetCatalogOperations.getFileSystem(new Path("file:///tmp"), ImmutableMap.of()); | ||
| } catch (IOException e) { | ||
| long timeTake = System.currentTimeMillis() - now; | ||
| Assertions.assertTrue(timeTake <= 10000); |
There was a problem hiding this comment.
I guess this will be very flaky if the test machine is under heavy load.
There was a problem hiding this comment.
I have used annotation Timeout(15) to replace it.
| } | ||
|
|
||
| if (!configs.containsKey(HDFS_IPC_PING_KEY)) { | ||
| additionalConfigs.put(HDFS_IPC_PING_KEY, "true"); |
There was a problem hiding this comment.
I found that you have several customized conf values here and above, you'd also define constants for these.
What changes were proposed in this pull request?
Replaced Awaitility-based polling with CompletableFuture for FileSystem retrieval using a dedicated ThreadPoolExecutor. This improves performance and simplifies timeout handling by leveraging async execution and modern concurrency utilities. Added cancellation handling and clearer exception propagation for robustness.
Why are the changes needed?
provider.getFileSystem(path, config)may hang and will never return a value, which will cause theAwaitilitymechanism does not work as expected.Fix: #9280
Does this PR introduce any user-facing change?
N/A
How was this patch tested?
Test locally and existing tests.