refactor(inkless): improve thread pool lifecycle management#475
Conversation
There was a problem hiding this comment.
Pull request overview
This pull request fixes thread pool resource leaks in three Inkless storage components by ensuring proper shutdown of all ExecutorService instances during broker lifecycle operations.
Key Changes:
- Added missing shutdown call for
executorServiceCacheStorein FileCommitter, preventing leak of 8 threads per broker shutdown - Implemented Closeable interface for DeleteRecordsInterceptor with proper ExecutorService shutdown using 5-second timeout
- Improved FetchOffsetHandler shutdown from basic
shutdown()toshutdownExecutorServiceQuietly()with 5-second timeout
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java | Added missing executorServiceCacheStore.shutdown() call and documentation about thread pool lifecycle |
| storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java | Added verification for executorServiceCacheStore shutdown |
| storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java | Implemented Closeable, changed Executor to ExecutorService, added InklessThreadFactory for thread naming, and proper shutdown with timeout |
| storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java | Added close() calls to all tests and new test verifying shutdown behavior |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchOffsetHandler.java | Changed to shutdownExecutorServiceQuietly with timeout, refactored constructor for testability |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchOffsetHandlerTest.java | Added test verifying proper executor shutdown with timeout |
| storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java | Added documentation about thread pool lifecycle management |
| storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java | Added documentation about thread pool lifecycle management |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Added close() call for inklessDeleteRecordsInterceptor in shutdown sequence |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java
Outdated
Show resolved
Hide resolved
2458994 to
89571b9
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
89571b9 to
d6f8803
Compare
43da310 to
e871ebb
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- **FileCommitter**: Add missing `executorServiceCacheStore` shutdown and implement ordered shutdown sequence (reject → cancel cache → await commits → force-stop uploads) to prevent orphaned files in object storage - **DeleteRecordsInterceptor**: Implement `Closeable`, narrow dependencies from `SharedState` to `ControlPlane` + `MetadataView`, add `InklessThreadFactory` for consistent thread naming, and register in `ReplicaManager.close()` - **FetchOffsetHandler**: Use `shutdownExecutorServiceQuietly()` with 5s timeout, extract constructor for testability - **Reader, Writer**: Add Javadoc documenting thread pool lifecycle
e871ebb to
36a9c65
Compare
viktorsomogyi
left a comment
There was a problem hiding this comment.
I think overall this is good, had one minor comment. I won't merge yet to give @Mwea a chance to add comments if needed.
storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java
Outdated
Show resolved
Hide resolved
- **FileCommitter**: Add missing `executorServiceCacheStore` shutdown and implement ordered shutdown sequence (reject → cancel cache → await commits → force-stop uploads) to prevent orphaned files in object storage - **DeleteRecordsInterceptor**: Implement `Closeable`, narrow dependencies from `SharedState` to `ControlPlane` + `MetadataView`, add `InklessThreadFactory` for consistent thread naming, and register in `ReplicaManager.close()` - **FetchOffsetHandler**: Use `shutdownExecutorServiceQuietly()` with 5s timeout, extract constructor for testability - **Reader, Writer**: Add Javadoc documenting thread pool lifecycle (cherry picked from commit 3daf634)
- **FileCommitter**: Add missing `executorServiceCacheStore` shutdown and implement ordered shutdown sequence (reject → cancel cache → await commits → force-stop uploads) to prevent orphaned files in object storage - **DeleteRecordsInterceptor**: Implement `Closeable`, narrow dependencies from `SharedState` to `ControlPlane` + `MetadataView`, add `InklessThreadFactory` for consistent thread naming, and register in `ReplicaManager.close()` - **FetchOffsetHandler**: Use `shutdownExecutorServiceQuietly()` with 5s timeout, extract constructor for testability - **Reader, Writer**: Add Javadoc documenting thread pool lifecycle
- **FileCommitter**: Add missing `executorServiceCacheStore` shutdown and implement ordered shutdown sequence (reject → cancel cache → await commits → force-stop uploads) to prevent orphaned files in object storage - **DeleteRecordsInterceptor**: Implement `Closeable`, narrow dependencies from `SharedState` to `ControlPlane` + `MetadataView`, add `InklessThreadFactory` for consistent thread naming, and register in `ReplicaManager.close()` - **FetchOffsetHandler**: Use `shutdownExecutorServiceQuietly()` with 5s timeout, extract constructor for testability - **Reader, Writer**: Add Javadoc documenting thread pool lifecycle (cherry picked from commit 3daf634)
executorServiceCacheStoreshutdown and implement ordered shutdown sequence (reject → cancel cache → await commits → force-stop uploads) to prevent orphaned files in object storageCloseable, narrow dependencies fromSharedStatetoControlPlane+MetadataView, addInklessThreadFactoryfor consistent thread naming, and register inReplicaManager.close()shutdownExecutorServiceQuietly()with 5s timeout, extract constructor for testabilityTest plan