Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added ConsumerMessageInterface::getNums() to change the number of amqp consumer by dynamically. #4909

Merged
merged 5 commits into from Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Expand Up @@ -52,6 +52,7 @@ composer analyse
- [#4852](https://github.com/hyperf/hyperf/pull/4852) Added `NullDisableEventDispatcher` to disable event dispatcher by default.
- [#4866](https://github.com/hyperf/hyperf/pull/4866) [#4869](https://github.com/hyperf/hyperf/pull/4869) Added Annotation `Scene` which use scene in FormRequest easily.
- [#4908](https://github.com/hyperf/hyperf/pull/4908) Added `Db::beforeExecuting()` to register a hook which to be run just before a database query is executed.
- [#4909](https://github.com/hyperf/hyperf/pull/4909) Added `ConsumerMessageInterface::getNums()` to change the number of amqp consumer by dynamically.

## Optimized

Expand Down
2 changes: 1 addition & 1 deletion src/amqp/src/Annotation/Consumer.php
Expand Up @@ -22,7 +22,7 @@ public function __construct(
public string $routingKey = '',
public string $queue = '',
public string $name = 'Consumer',
public int $nums = 1,
public ?int $nums = null,
public ?bool $enable = null,
public int $maxConsumption = 0
) {
Expand Down
4 changes: 2 additions & 2 deletions src/amqp/src/ConsumerManager.php
Expand Up @@ -43,9 +43,9 @@ public function run()
! is_null($annotation->enable) && $instance->setEnable($annotation->enable);
property_exists($instance, 'container') && $instance->container = $this->container;
$annotation->maxConsumption && $instance->setMaxConsumption($annotation->maxConsumption);
$nums = $annotation->nums;
! is_null($annotation->nums) && $instance->setNums($annotation->nums);
$process = $this->createProcess($instance);
$process->nums = (int) $nums;
$process->nums = $instance->getNums();
$process->name = $annotation->name . '-' . $instance->getQueue();
ProcessManager::register($process);
}
Expand Down
32 changes: 18 additions & 14 deletions src/amqp/src/Message/ConsumerMessage.php
Expand Up @@ -35,20 +35,13 @@ abstract class ConsumerMessage extends Message implements ConsumerMessageInterfa
'global' => false,
];

/**
* @var bool
*/
protected $enable = true;
protected bool $enable = true;

/**
* @var int
*/
protected $maxConsumption = 0;
protected int $maxConsumption = 0;

/**
* @var float|int
*/
protected $waitTimeout = 0;
protected int|float $waitTimeout = 0;

protected int $nums = 1;

public function consumeMessage($data, AMQPMessage $message): string
{
Expand Down Expand Up @@ -121,17 +114,28 @@ public function setMaxConsumption(int $maxConsumption)
return $this;
}

public function getWaitTimeout()
public function getWaitTimeout(): int|float
{
return $this->waitTimeout;
}

public function setWaitTimeout($timeout)
public function setWaitTimeout(int|float $timeout)
{
$this->waitTimeout = $timeout;
return $this;
}

public function getNums(): int
{
return $this->nums;
}

public function setNums(int $nums)
{
$this->nums = $nums;
return $this;
}

protected function reply($data, AMQPMessage $message)
{
$packer = ApplicationContext::getContainer()->get(Packer::class);
Expand Down
16 changes: 7 additions & 9 deletions src/amqp/src/Message/ConsumerMessageInterface.php
Expand Up @@ -38,13 +38,11 @@ public function getMaxConsumption(): int;

public function setMaxConsumption(int $maxConsumption);

/**
* @return float|int
*/
public function getWaitTimeout();

/**
* @param float|int $timeout
*/
public function setWaitTimeout($timeout);
public function getWaitTimeout(): int|float;

public function setWaitTimeout(int|float $timeout);

public function setNums(int $nums);

public function getNums(): int;
}
28 changes: 28 additions & 0 deletions src/amqp/tests/ConsumerManagerTest.php
Expand Up @@ -19,6 +19,7 @@
use Hyperf\Process\ProcessManager;
use HyperfTest\Amqp\Stub\ContainerStub;
use HyperfTest\Amqp\Stub\DemoConsumer;
use HyperfTest\Amqp\Stub\NumsConsumer;
use PHPUnit\Framework\TestCase;

/**
Expand Down Expand Up @@ -95,4 +96,31 @@ public function testConsumerAnnotationNotEnable()

$this->assertTrue($hasRegistered);
}

public function testConsumerGetNums()
{
$container = ContainerStub::getContainer();

AnnotationCollector::collectClass(NumsConsumer::class, Consumer::class, new Consumer(
exchange: uniqid(),
routingKey: uniqid(),
queue: uniqid(),
nums: rand(1, 10),
));

$manager = new ConsumerManager($container);
$manager->run();

$hasRegistered = false;
/** @var AbstractProcess $item */
foreach (ProcessManager::all() as $item) {
if (method_exists($item, 'getConsumerMessage')) {
$hasRegistered = true;
$this->assertSame($item->getConsumerMessage()->getNums(), $item->nums);
break;
}
}

$this->assertTrue($hasRegistered);
}
}
37 changes: 37 additions & 0 deletions src/amqp/tests/Stub/NumsConsumer.php
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Amqp\Stub;

use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;

class NumsConsumer extends ConsumerMessage
{
protected string $exchange = 'hyperf';

protected array|string $routingKey = [
'hyperf1',
'hyperf2',
];

protected ?string $queue = 'hyperf';

public function consume($data): string
{
return Result::ACK;
}

public function getNums(): int
{
return 10;
}
}