-
Notifications
You must be signed in to change notification settings - Fork 566
Description
What happened
Currently, when querying MQ messages from multiple Pulsar clusters, there are several issues:
-
No thread pool management: Message queries are executed without proper thread pool management, which may lead to resource exhaustion under high concurrency.
-
No graceful handling for task rejection: When too many concurrent requests come in, the system doesn't properly handle the
RejectedExecutionExceptionand doesn't provide a user-friendly error response. -
No task cancellation mechanism: When task submission fails, previously submitted tasks continue to run unnecessarily.
-
No interruption support: Long-running IO operations cannot be cancelled when the request is aborted.
What you expected to happen
-
Add a dedicated thread pool: Use
ThreadPoolTaskExecutorwith configurable core/max pool size and queue capacity for message query tasks. -
Implement task cancellation: When
RejectedExecutionExceptionoccurs, cancel all previously submitted tasks to free up resources. -
Add interruption checks: Check
Thread.currentThread().isInterrupted()before and after IO operations to support task cancellation.
How to reproduce
If you frequently submit tasks to a multi-cluster Pulsar setup, you will find that later-submitted tasks time out during submission. However, these tasks remain queued and processed in the code, even though they are useless and should be discarded.
Environment
No response
InLong version
master
InLong Component
InLong Manager
Are you willing to submit PR?
- Yes, I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct