diff --git a/src/Console/Command/ConsumeCommand.php b/src/Console/Command/ConsumeCommand.php index c402dd3..17b3b0a 100644 --- a/src/Console/Command/ConsumeCommand.php +++ b/src/Console/Command/ConsumeCommand.php @@ -83,6 +83,7 @@ protected function configure(): void ->addOption('save', null, InputOption::VALUE_NONE, 'Save failed job.') ->addOption('max', null, InputOption::VALUE_REQUIRED, 'The max number of jobs.') ->addOption('expire', null, InputOption::VALUE_REQUIRED, 'The worker duration in seconds.') + ->addOption('expireWhenEmpty', null, InputOption::VALUE_REQUIRED, 'Stop the worker when the queues are empty for an amount of time in seconds.') ->addOption('stopWhenEmpty', null, InputOption::VALUE_NONE, 'Stop the worker if the queues are empty.') ->addOption('stopOnError', null, InputOption::VALUE_NONE, 'Stop the worker if error occurs.') ->addOption('logger', null, InputOption::VALUE_REQUIRED, 'The logger to use "stdout", "null", or "default".', "default") @@ -159,6 +160,10 @@ protected function createExtension(InputInterface $input, OutputInterface $outpu $builder->expire((int)$input->getOption('expire')); } + if ($input->getOption('expireWhenEmpty')) { + $builder->expireWhenEmpty((int)$input->getOption('expireWhenEmpty')); + } + $memory = $this->convertToBytes($input->getOption('memory')); if ($memory > 0) { $builder->memory($memory); diff --git a/src/Consumer/Receiver/Builder/ReceiverBuilder.php b/src/Consumer/Receiver/Builder/ReceiverBuilder.php index 282a2e0..731566e 100644 --- a/src/Consumer/Receiver/Builder/ReceiverBuilder.php +++ b/src/Consumer/Receiver/Builder/ReceiverBuilder.php @@ -7,6 +7,7 @@ use Bdf\Queue\Consumer\Receiver\Binder\BinderInterface; use Bdf\Queue\Consumer\Receiver\Binder\BinderReceiver; use Bdf\Queue\Consumer\Receiver\Binder\ClassNameBinder; +use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver; use Bdf\Queue\Consumer\Receiver\MemoryLimiterReceiver; use Bdf\Queue\Consumer\Receiver\MessageCountLimiterReceiver; use Bdf\Queue\Consumer\Receiver\MessageLoggerReceiver; @@ -219,17 +220,31 @@ public function max(int $number): ReceiverBuilder * Limit the number of received message * When the limit is reached, the consumer is stopped * - * @param int $seconds Number of messages + * @param int $seconds Time in seconds * * @return $this * - * @see MessageCountLimiterReceiver + * @see TimeLimiterReceiver */ public function expire(int $seconds): ReceiverBuilder { return $this->add(new TimeLimiterReceiver($seconds, $this->logger)); } + /** + * Stops consumption when the queues are empty for an amount of time + * + * @param int $seconds Time in seconds + * + * @return $this + * + * @see LimitTimeWhenEmptyReceiver + */ + public function expireWhenEmpty(int $seconds): ReceiverBuilder + { + return $this->add(new LimitTimeWhenEmptyReceiver($seconds, $this->logger)); + } + /** * Limit the total memory usage of the current runtime * When the limit is reached, the consumer is stopped diff --git a/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php b/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php new file mode 100644 index 0000000..9070d8a --- /dev/null +++ b/src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php @@ -0,0 +1,77 @@ +delegate = $args[0]; + ++$index; + } + + $this->endTime = null; + $this->limit = $args[$index++]; + $this->logger = $args[$index] ?? null; + } + + /** + * {@inheritdoc} + */ + public function receive($message, ConsumerInterface $consumer): void + { + $this->endTime = null; + + $next = $this->delegate ?? $consumer; + $next->receive($message, $consumer); + } + + /** + * {@inheritdoc} + */ + public function receiveTimeout(ConsumerInterface $consumer): void + { + $next = $this->delegate ?? $consumer; + $next->receiveTimeout($consumer); + + if (null === $this->endTime) { + $this->endTime = $this->limit + time(); + } + + if ($this->endTime >= time()) { + return; + } + + $consumer->stop(); + + if (null !== $this->logger) { + $this->logger->info('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => $this->limit]); + } + } +} diff --git a/tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php b/tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php index f761fdb..4e49f6e 100644 --- a/tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php +++ b/tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php @@ -9,6 +9,7 @@ use Bdf\Queue\Consumer\Receiver\BenchReceiver; use Bdf\Queue\Consumer\Receiver\Binder\BinderReceiver; use Bdf\Queue\Consumer\Receiver\Binder\ClassNameBinder; +use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver; use Bdf\Queue\Consumer\Receiver\MemoryLimiterReceiver; use Bdf\Queue\Consumer\Receiver\MessageCountLimiterReceiver; use Bdf\Queue\Consumer\Receiver\MessageLoggerReceiver; @@ -573,6 +574,22 @@ public function test_expire() ); } + /** + * + */ + public function test_expire_when_empty() + { + $this->builder->expireWhenEmpty(10); + + $this->assertEquals( + new ReceiverPipeline([ + new LimitTimeWhenEmptyReceiver(10, new LoggerProxy(new NullLogger())), + new ProcessorReceiver(new JobHintProcessorResolver($this->container->get(InstantiatorInterface::class))), + ]), + $this->builder->build() + ); + } + /** * */ diff --git a/tests/Consumer/Receiver/LimitTimeWhenEmptyReceiverTest.php b/tests/Consumer/Receiver/LimitTimeWhenEmptyReceiverTest.php new file mode 100644 index 0000000..f8444ca --- /dev/null +++ b/tests/Consumer/Receiver/LimitTimeWhenEmptyReceiverTest.php @@ -0,0 +1,71 @@ +createMock(NextInterface::class); + $next->expects($this->never())->method('stop'); + + $extension = new LimitTimeWhenEmptyReceiver(2); + $extension->receiveTimeout($next); + $extension->receiveTimeout($next); + sleep(2); + $extension->receive('message', $next); + $extension->receiveTimeout($next); + } + + /** + * @group time-sensitive + */ + public function test_receiver_stops_when_time_limit_is_reached() + { + $next = $this->createMock(NextInterface::class); + $next->expects($this->once())->method('stop'); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => 1]); + + $extension = new LimitTimeWhenEmptyReceiver(1, $logger); + $extension->receiveTimeout($next); + sleep(2); + $extension->receiveTimeout($next); + } + + /** + * @group time-sensitive + */ + public function test_receiver_stops_when_time_limit_is_reached_legacy() + { + $decorated = $this->createMock(ReceiverInterface::class); + $decorated->expects($this->any())->method('receiveTimeout'); + + $next = $this->createMock(ConsumerInterface::class); + $next->expects($this->once())->method('stop'); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => 1]); + + $extension = new LimitTimeWhenEmptyReceiver($decorated, 1, $logger); + $extension->receiveTimeout($next); + sleep(2); + $extension->receiveTimeout($next); + } +}