From 66d5e9350993b1b7197146118bcaa9d484758fef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20TANNEUX?= Date: Wed, 25 Feb 2026 12:34:11 +0100 Subject: [PATCH 1/3] feat: Add receiver to stop the worker when queue is empty for a among of time --- src/Console/Command/ConsumeCommand.php | 5 + .../Receiver/Builder/ReceiverBuilder.php | 19 +++- .../Receiver/LimitTimeWhenEmptyReceiver.php | 92 +++++++++++++++++++ .../LimitTimeWhenEmptyReceiverTest.php | 71 ++++++++++++++ 4 files changed, 185 insertions(+), 2 deletions(-) create mode 100644 src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php create mode 100644 tests/Consumer/Receiver/LimitTimeWhenEmptyReceiverTest.php diff --git a/src/Console/Command/ConsumeCommand.php b/src/Console/Command/ConsumeCommand.php index c402dd3..5654ca2 100644 --- a/src/Console/Command/ConsumeCommand.php +++ b/src/Console/Command/ConsumeCommand.php @@ -83,6 +83,7 @@ protected function configure(): void ->addOption('save', null, InputOption::VALUE_NONE, 'Save failed job.') ->addOption('max', null, InputOption::VALUE_REQUIRED, 'The max number of jobs.') ->addOption('expire', null, InputOption::VALUE_REQUIRED, 'The worker duration in seconds.') + ->addOption('expireWhenEmpty', null, InputOption::VALUE_REQUIRED, 'Stop the worker when the queues are empty for a among of time in seconds.') ->addOption('stopWhenEmpty', null, InputOption::VALUE_NONE, 'Stop the worker if the queues are empty.') ->addOption('stopOnError', null, InputOption::VALUE_NONE, 'Stop the worker if error occurs.') ->addOption('logger', null, InputOption::VALUE_REQUIRED, 'The logger to use "stdout", "null", or "default".', "default") @@ -159,6 +160,10 @@ protected function createExtension(InputInterface $input, OutputInterface $outpu $builder->expire((int)$input->getOption('expire')); } + if ($input->getOption('expireWhenEmpty')) { + $builder->expireWhenEmpty((int)$input->getOption('expireWhenEmpty')); + } + $memory = $this->convertToBytes($input->getOption('memory')); if ($memory > 0) { $builder->memory($memory); diff --git a/src/Consumer/Receiver/Builder/ReceiverBuilder.php b/src/Consumer/Receiver/Builder/ReceiverBuilder.php index 282a2e0..e5ba8ea 100644 --- a/src/Consumer/Receiver/Builder/ReceiverBuilder.php +++ b/src/Consumer/Receiver/Builder/ReceiverBuilder.php @@ -7,6 +7,7 @@ use Bdf\Queue\Consumer\Receiver\Binder\BinderInterface; use Bdf\Queue\Consumer\Receiver\Binder\BinderReceiver; use Bdf\Queue\Consumer\Receiver\Binder\ClassNameBinder; +use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver; use Bdf\Queue\Consumer\Receiver\MemoryLimiterReceiver; use Bdf\Queue\Consumer\Receiver\MessageCountLimiterReceiver; use Bdf\Queue\Consumer\Receiver\MessageLoggerReceiver; @@ -219,17 +220,31 @@ public function max(int $number): ReceiverBuilder * Limit the number of received message * When the limit is reached, the consumer is stopped * - * @param int $seconds Number of messages + * @param int $seconds Time in seconds * * @return $this * - * @see MessageCountLimiterReceiver + * @see TimeLimiterReceiver */ public function expire(int $seconds): ReceiverBuilder { return $this->add(new TimeLimiterReceiver($seconds, $this->logger)); } + /** + * Stops consumption when the queues are empty for a among of time + * + * @param int $seconds Time in seconds + * + * @return $this + * + * @see LimitTimeWhenEmptyReceiver + */ + public function expireWhenEmpty(int $seconds): ReceiverBuilder + { + return $this->add(new LimitTimeWhenEmptyReceiver($seconds, $this->logger)); + } + /** * Limit the total memory usage of the current runtime * When the limit is reached, the consumer is stopped diff --git a/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php b/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php new file mode 100644 index 0000000..b4ba24f --- /dev/null +++ b/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php @@ -0,0 +1,92 @@ +delegate = $args[0]; + ++$index; + } + + $this->endTime = null; + $this->limit = $args[$index++]; + $this->logger = $args[$index] ?? null; + } + + /** + * {@inheritdoc} + */ + public function receive($message, ConsumerInterface $consumer): void + { + $this->endTime = null; + + $next = $this->delegate ?? $consumer; + $next->receive($message, $consumer); + } + + /** + * {@inheritdoc} + */ + public function receiveTimeout(ConsumerInterface $consumer): void + { + $next = $this->delegate ?? $consumer; + $next->receiveTimeout($consumer); + + if (null === $this->endTime) { + $this->endTime = $this->limit + time(); + } + + if ($this->endTime >= time()) { + return; + } + + $consumer->stop(); + + if (null !== $this->logger) { + $this->logger->info('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => $this->limit]); + } + } +} diff --git a/tests/Consumer/Receiver/LimitTimeWhenEmptyReceiverTest.php b/tests/Consumer/Receiver/LimitTimeWhenEmptyReceiverTest.php new file mode 100644 index 0000000..f8444ca --- /dev/null +++ b/tests/Consumer/Receiver/LimitTimeWhenEmptyReceiverTest.php @@ -0,0 +1,71 @@ +createMock(NextInterface::class); + $next->expects($this->never())->method('stop'); + + $extension = new LimitTimeWhenEmptyReceiver(2); + $extension->receiveTimeout($next); + $extension->receiveTimeout($next); + sleep(2); + $extension->receive('message', $next); + $extension->receiveTimeout($next); + } + + /** + * @group time-sensitive + */ + public function test_receiver_stops_when_time_limit_is_reached() + { + $next = $this->createMock(NextInterface::class); + $next->expects($this->once())->method('stop'); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => 1]); + + $extension = new LimitTimeWhenEmptyReceiver(1, $logger); + $extension->receiveTimeout($next); + sleep(2); + $extension->receiveTimeout($next); + } + + /** + * @group time-sensitive + */ + public function test_receiver_stops_when_time_limit_is_reached_legacy() + { + $decorated = $this->createMock(ReceiverInterface::class); + $decorated->expects($this->any())->method('receiveTimeout'); + + $next = $this->createMock(ConsumerInterface::class); + $next->expects($this->once())->method('stop'); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => 1]); + + $extension = new LimitTimeWhenEmptyReceiver($decorated, 1, $logger); + $extension->receiveTimeout($next); + sleep(2); + $extension->receiveTimeout($next); + } +} From 88a3cee21350967f59de8e09429693486fec614a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20TANNEUX?= Date: Wed, 25 Feb 2026 15:18:48 +0100 Subject: [PATCH 2/3] fix: Fix wrong sentence and add type on new receiver --- src/Console/Command/ConsumeCommand.php | 2 +- .../Receiver/LimitTimeWhenEmptyReceiver.php | 21 +++---------------- .../Receiver/Builder/ReceiverBuilderTest.php | 17 +++++++++++++++ 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/Console/Command/ConsumeCommand.php b/src/Console/Command/ConsumeCommand.php index 5654ca2..5c8adb8 100644 --- a/src/Console/Command/ConsumeCommand.php +++ b/src/Console/Command/ConsumeCommand.php @@ -83,7 +83,7 @@ protected function configure(): void ->addOption('save', null, InputOption::VALUE_NONE, 'Save failed job.') ->addOption('max', null, InputOption::VALUE_REQUIRED, 'The max number of jobs.') ->addOption('expire', null, InputOption::VALUE_REQUIRED, 'The worker duration in seconds.') - ->addOption('expireWhenEmpty', null, InputOption::VALUE_REQUIRED, 'Stop the worker when the queues are empty for a among of time in seconds.') + ->addOption('expireWhenEmpty', null, InputOption::VALUE_REQUIRED, 'Stop the worker when the queues are empty for an among of time in seconds.') ->addOption('stopWhenEmpty', null, InputOption::VALUE_NONE, 'Stop the worker if the queues are empty.') ->addOption('stopOnError', null, InputOption::VALUE_NONE, 'Stop the worker if error occurs.') ->addOption('logger', null, InputOption::VALUE_REQUIRED, 'The logger to use "stdout", "null", or "default".', "default") diff --git a/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php b/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php index b4ba24f..9070d8a 100644 --- a/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php +++ b/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php @@ -14,24 +14,9 @@ class LimitTimeWhenEmptyReceiver implements ReceiverInterface { use DelegateHelper; - /** - * Time limit in second - * - * @var int - */ - private $limit; - - /** - * The end time - * - * @var int - */ - private $endTime; - - /** - * @var LoggerInterface - */ - private $logger; + private int $limit; + private ?int $endTime; + private ?LoggerInterface $logger; /** * TimeLimiterMiddlewareReceiver constructor. diff --git a/tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php b/tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php index f761fdb..4e49f6e 100644 --- a/tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php +++ b/tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php @@ -9,6 +9,7 @@ use Bdf\Queue\Consumer\Receiver\BenchReceiver; use Bdf\Queue\Consumer\Receiver\Binder\BinderReceiver; use Bdf\Queue\Consumer\Receiver\Binder\ClassNameBinder; +use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver; use Bdf\Queue\Consumer\Receiver\MemoryLimiterReceiver; use Bdf\Queue\Consumer\Receiver\MessageCountLimiterReceiver; use Bdf\Queue\Consumer\Receiver\MessageLoggerReceiver; @@ -573,6 +574,22 @@ public function test_expire() ); } + /** + * + */ + public function test_expire_when_empty() + { + $this->builder->expireWhenEmpty(10); + + $this->assertEquals( + new ReceiverPipeline([ + new LimitTimeWhenEmptyReceiver(10, new LoggerProxy(new NullLogger())), + new ProcessorReceiver(new JobHintProcessorResolver($this->container->get(InstantiatorInterface::class))), + ]), + $this->builder->build() + ); + } + /** * */ From b0f5789784b36df99b09f24bdfd712573114dbad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20TANNEUX?= Date: Thu, 26 Feb 2026 09:30:37 +0100 Subject: [PATCH 3/3] doc: Fix wrong word --- src/Console/Command/ConsumeCommand.php | 2 +- src/Consumer/Receiver/Builder/ReceiverBuilder.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Console/Command/ConsumeCommand.php b/src/Console/Command/ConsumeCommand.php index 5c8adb8..17b3b0a 100644 --- a/src/Console/Command/ConsumeCommand.php +++ b/src/Console/Command/ConsumeCommand.php @@ -83,7 +83,7 @@ protected function configure(): void ->addOption('save', null, InputOption::VALUE_NONE, 'Save failed job.') ->addOption('max', null, InputOption::VALUE_REQUIRED, 'The max number of jobs.') ->addOption('expire', null, InputOption::VALUE_REQUIRED, 'The worker duration in seconds.') - ->addOption('expireWhenEmpty', null, InputOption::VALUE_REQUIRED, 'Stop the worker when the queues are empty for an among of time in seconds.') + ->addOption('expireWhenEmpty', null, InputOption::VALUE_REQUIRED, 'Stop the worker when the queues are empty for an amount of time in seconds.') ->addOption('stopWhenEmpty', null, InputOption::VALUE_NONE, 'Stop the worker if the queues are empty.') ->addOption('stopOnError', null, InputOption::VALUE_NONE, 'Stop the worker if error occurs.') ->addOption('logger', null, InputOption::VALUE_REQUIRED, 'The logger to use "stdout", "null", or "default".', "default") diff --git a/src/Consumer/Receiver/Builder/ReceiverBuilder.php b/src/Consumer/Receiver/Builder/ReceiverBuilder.php index e5ba8ea..731566e 100644 --- a/src/Consumer/Receiver/Builder/ReceiverBuilder.php +++ b/src/Consumer/Receiver/Builder/ReceiverBuilder.php @@ -232,7 +232,7 @@ public function expire(int $seconds): ReceiverBuilder } /** - * Stops consumption when the queues are empty for a among of time + * Stops consumption when the queues are empty for an amount of time * * @param int $seconds Time in seconds *