Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Console/Command/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ protected function configure(): void
->addOption('save', null, InputOption::VALUE_NONE, 'Save failed job.')
->addOption('max', null, InputOption::VALUE_REQUIRED, 'The max number of jobs.')
->addOption('expire', null, InputOption::VALUE_REQUIRED, 'The worker duration in seconds.')
->addOption('expireWhenEmpty', null, InputOption::VALUE_REQUIRED, 'Stop the worker when the queues are empty for an amount of time in seconds.')
->addOption('stopWhenEmpty', null, InputOption::VALUE_NONE, 'Stop the worker if the queues are empty.')
->addOption('stopOnError', null, InputOption::VALUE_NONE, 'Stop the worker if error occurs.')
->addOption('logger', null, InputOption::VALUE_REQUIRED, 'The logger to use "stdout", "null", or "default".', "default")
Expand Down Expand Up @@ -159,6 +160,10 @@ protected function createExtension(InputInterface $input, OutputInterface $outpu
$builder->expire((int)$input->getOption('expire'));
}

if ($input->getOption('expireWhenEmpty')) {
$builder->expireWhenEmpty((int)$input->getOption('expireWhenEmpty'));
}

$memory = $this->convertToBytes($input->getOption('memory'));
if ($memory > 0) {
$builder->memory($memory);
Expand Down
19 changes: 17 additions & 2 deletions src/Consumer/Receiver/Builder/ReceiverBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Bdf\Queue\Consumer\Receiver\Binder\BinderInterface;
use Bdf\Queue\Consumer\Receiver\Binder\BinderReceiver;
use Bdf\Queue\Consumer\Receiver\Binder\ClassNameBinder;
use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver;
use Bdf\Queue\Consumer\Receiver\MemoryLimiterReceiver;
use Bdf\Queue\Consumer\Receiver\MessageCountLimiterReceiver;
use Bdf\Queue\Consumer\Receiver\MessageLoggerReceiver;
Expand Down Expand Up @@ -219,17 +220,31 @@ public function max(int $number): ReceiverBuilder
* Limit the number of received message
* When the limit is reached, the consumer is stopped
*
* @param int $seconds Number of messages
* @param int $seconds Time in seconds
*
* @return $this
*
* @see MessageCountLimiterReceiver
* @see TimeLimiterReceiver
*/
public function expire(int $seconds): ReceiverBuilder
{
return $this->add(new TimeLimiterReceiver($seconds, $this->logger));
}

/**
* Stops consumption when the queues are empty for an amount of time
*
* @param int $seconds Time in seconds
*
* @return $this
*
* @see LimitTimeWhenEmptyReceiver
*/
public function expireWhenEmpty(int $seconds): ReceiverBuilder
{
return $this->add(new LimitTimeWhenEmptyReceiver($seconds, $this->logger));
}
Comment thread
Johnmeurt marked this conversation as resolved.

/**
* Limit the total memory usage of the current runtime
* When the limit is reached, the consumer is stopped
Expand Down
77 changes: 77 additions & 0 deletions src/Consumer/Receiver/LimitTimeWhenEmptyReceiver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

namespace Bdf\Queue\Consumer\Receiver;

use Bdf\Queue\Consumer\ConsumerInterface;
use Bdf\Queue\Consumer\DelegateHelper;
use Bdf\Queue\Consumer\ReceiverInterface;
use Psr\Log\LoggerInterface;

/**
*
*/
class LimitTimeWhenEmptyReceiver implements ReceiverInterface
{
use DelegateHelper;

private int $limit;
private ?int $endTime;
private ?LoggerInterface $logger;

/**
* TimeLimiterMiddlewareReceiver constructor.
*
* @param ReceiverInterface $delegate
* @param int $limit Time limit in second
* @param LoggerInterface|null $logger
*/
public function __construct(/*int $limit, LoggerInterface $logger = null*/)
{
$args = func_get_args();
$index = 0;

if ($args[0] instanceof ReceiverInterface) {
@trigger_error('Passing delegate in constructor of receiver is deprecated since 1.4', E_USER_DEPRECATED);
$this->delegate = $args[0];
++$index;
}

$this->endTime = null;
$this->limit = $args[$index++];
$this->logger = $args[$index] ?? null;
}

/**
* {@inheritdoc}
*/
public function receive($message, ConsumerInterface $consumer): void
{
$this->endTime = null;

$next = $this->delegate ?? $consumer;
$next->receive($message, $consumer);
}

/**
* {@inheritdoc}
*/
public function receiveTimeout(ConsumerInterface $consumer): void
{
$next = $this->delegate ?? $consumer;
$next->receiveTimeout($consumer);

if (null === $this->endTime) {
$this->endTime = $this->limit + time();
}

if ($this->endTime >= time()) {
return;
}

$consumer->stop();

if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => $this->limit]);
}
}
}
17 changes: 17 additions & 0 deletions tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Bdf\Queue\Consumer\Receiver\BenchReceiver;
use Bdf\Queue\Consumer\Receiver\Binder\BinderReceiver;
use Bdf\Queue\Consumer\Receiver\Binder\ClassNameBinder;
use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver;
use Bdf\Queue\Consumer\Receiver\MemoryLimiterReceiver;
use Bdf\Queue\Consumer\Receiver\MessageCountLimiterReceiver;
use Bdf\Queue\Consumer\Receiver\MessageLoggerReceiver;
Expand Down Expand Up @@ -573,6 +574,22 @@ public function test_expire()
);
}

/**
*
*/
public function test_expire_when_empty()
{
$this->builder->expireWhenEmpty(10);

$this->assertEquals(
new ReceiverPipeline([
new LimitTimeWhenEmptyReceiver(10, new LoggerProxy(new NullLogger())),
new ProcessorReceiver(new JobHintProcessorResolver($this->container->get(InstantiatorInterface::class))),
]),
$this->builder->build()
);
}

/**
*
*/
Expand Down
71 changes: 71 additions & 0 deletions tests/Consumer/Receiver/LimitTimeWhenEmptyReceiverTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

namespace Bdf\Queue\Consumer\Receiver\Tests;

use Bdf\Queue\Consumer\ConsumerInterface;
use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver;
use Bdf\Queue\Consumer\Receiver\NextInterface;
use Bdf\Queue\Consumer\ReceiverInterface;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;

/**
*
*/
class LimitTimeWhenEmptyReceiverTest extends TestCase
{
/**
* @group time-sensitive
*/
public function test_receiver_never_stop_stops()
{
$next = $this->createMock(NextInterface::class);
$next->expects($this->never())->method('stop');

$extension = new LimitTimeWhenEmptyReceiver(2);
$extension->receiveTimeout($next);
$extension->receiveTimeout($next);
sleep(2);
$extension->receive('message', $next);
$extension->receiveTimeout($next);
}

/**
* @group time-sensitive
*/
public function test_receiver_stops_when_time_limit_is_reached()
{
$next = $this->createMock(NextInterface::class);
$next->expects($this->once())->method('stop');

$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => 1]);

$extension = new LimitTimeWhenEmptyReceiver(1, $logger);
$extension->receiveTimeout($next);
sleep(2);
$extension->receiveTimeout($next);
}

/**
* @group time-sensitive
*/
public function test_receiver_stops_when_time_limit_is_reached_legacy()
{
$decorated = $this->createMock(ReceiverInterface::class);
$decorated->expects($this->any())->method('receiveTimeout');

$next = $this->createMock(ConsumerInterface::class);
$next->expects($this->once())->method('stop');

$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => 1]);

$extension = new LimitTimeWhenEmptyReceiver($decorated, 1, $logger);
$extension->receiveTimeout($next);
sleep(2);
$extension->receiveTimeout($next);
}
}