CAMEL-23495: Replace Thread.sleep() with Awaitility in camel-core and camel-management tests#23180
CAMEL-23495: Replace Thread.sleep() with Awaitility in camel-core and camel-management tests#23180gnodet wants to merge 4 commits into
Conversation
… camel-management tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| // even though we wait for the queues to empty, there is a race condition where the consumer | ||
| // may still process messages while it's being suspended due to asynchronous message handling. | ||
| // as a result, we need to wait a bit longer to ensure that the seda consumer is suspended before | ||
| // sending the next message. | ||
| Thread.sleep(1000L); | ||
|
|
There was a problem hiding this comment.
there is no await replacement then this thread;sleep. This could only be worse than it currently is
There was a problem hiding this comment.
What about the pollDelay(1, TimeUnit.SECONDS) ?
There was a problem hiding this comment.
Fixed in ef24703 — now split into two await() calls: first checks queues are empty, then uses pollDelay(1s) to give the consumer thread time to idle after the queues empty. The second await also asserts the consumer status is Suspended.
Claude Code on behalf of Guillaume Nodet
| // may still process messages while it's being suspended due to asynchronous message handling. | ||
| // as a result, we need to wait a bit longer to ensure that the seda consumer is suspended before | ||
| // sending the next message. | ||
| Thread.sleep(1000L); |
There was a problem hiding this comment.
there isn't a wait condition anymore ensuring that the message is suspended
There was a problem hiding this comment.
What about the pollDelay(1, TimeUnit.SECONDS) ?
There was a problem hiding this comment.
Fixed in ef24703 — now split into two await() calls: first checks context.isSuspended(), then uses pollDelay(1s) to give the seda consumer thread time to complete its poll cycle before sending the test message.
Claude Code on behalf of Guillaume Nodet
| // may still process messages while it's being suspended due to asynchronous message handling. | ||
| // as a result, we need to wait a bit longer to ensure that the seda consumer is suspended before | ||
| // sending the next message. | ||
| Thread.sleep(1000L); |
There was a problem hiding this comment.
there is no await replacement then this thread;sleep. This could only be worse than it currently is
There was a problem hiding this comment.
What about the pollDelay(1, TimeUnit.SECONDS) ?
There was a problem hiding this comment.
The poll Delay is before checking for for the suspension and the big comment is explaining that we need to wait after this point due to async behavior
There was a problem hiding this comment.
You're right, the pollDelay was before the condition check, but the original Thread.sleep was after the queue-empty await. Fixed in ef24703 — now split into two steps:
await().until(queue.isEmpty())— checks the condition firstawait().pollDelay(1s).untilAsserted(...)— then gives the consumer thread time to idle
This preserves the original semantics: wait for the queue to empty, then wait for the consumer to stop its poll loop.
Claude Code on behalf of Guillaume Nodet
|
🌟 Thank you for your contribution to the Apache Camel project! 🌟 🐫 Apache Camel Committers, please review the following items:
|
Split single pollDelay-based await into two steps: 1. First await checks the queue-empty condition 2. Second await with pollDelay gives the consumer thread time to idle after the queue empties Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Thanks for the review! You're right — the
Claude Code on behalf of Guillaume Nodet |
Use pollDelay(500ms) and atMost(2s) to match the original Thread.sleep(500) timing and stay within the StopWatch 5-second budget. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… diverges Move the inflight count, exchange ID, and duration assertions into untilAsserted so Awaitility polls until the oldest exchange actually changes, avoiding a race where durations are captured at the same value. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
🧪 CI tested the following changed modules:
Build reactor — dependencies compiled but only changed modules were tested (2 modules)
|
|
Commits cherry-picked into #23178 which consolidates all flaky test fixes. |
CAMEL-23495
Replace
Thread.sleep()synchronization calls with Awaitility in 15 test files across camel-core and camel-management to reduce flakiness under CI load.Changes by category
Suspend/resume tests (4 files)
RouteSedaSuspendResumeTest,DefaultCamelContextSuspendResumeRouteTest,TwoRouteSuspendResumeTest,SedaConsumerSuspendResumeTestThread.sleep(1000L)+ existingawait()with a singleawait().pollDelay(1s).atMost(5s)to give SEDA consumer thread time to idle after route suspensionManagement tests (2 files)
ManagedInflightStatisticsTest— replacedThread.sleep(250)andThread.sleep(200)withawait().until()checking JMX inflight exchange countDefaultExecutorServiceManagerTest— replacedThread.sleep(3000)with astartedCountDownLatch +await()to detect when the pool task beginsAsync/scheduler tests (2 files)
SchedulerMulticastParallelGreedyTest— replacedThread.sleep(50)withMockEndpoint.setAssertPeriod(200)to verify no extra messages arriveThreadsRejectedExecutionWithDeadLetterTest— replacedThread.sleep(100)withawait().until()checking the rejected message arrived at dead letterAggregation tests (3 files)
AggregateGroupedExchangeBatchSizeTest— replacedThread.sleep(1000)withawait()for the second batch via completion timeoutAggregateDiscardOnTimeoutTest— replacedThread.sleep(250)withMockEndpoint.setAssertPeriod(500)AggregateClosedCorrelationKeyTest— replacedThread.sleep(200)withawait().until()checking all aggregated results arrivedRedelivery/shutdown tests (3 files)
NotAllowRedeliveryWhileStoppingTest,NotAllowRedeliveryWhileStoppingDeadLetterChannelTest— replacedThread.sleep(500)withawait()checking inflight repositoryRedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest— replacedThread.sleep(500)withawait().until(() -> counter.get() >= 20)Resequencer test (1 file)
ResequenceStreamRejectOldExchangesTest— replacedThread.sleep(100)withawait().until()checking first message delivered before sending the restOut of scope
The following
Thread.sleep()uses were intentionally left unchanged:Processor.process()orRunnable.run()simulating slow I/OClaude Code on behalf of Guillaume Nodet