Skip to content

Commit

Permalink
Added ConsumerMessageInterface::getNums() to change the number of a…
Browse files Browse the repository at this point in the history
…mqp consumer by dynamically. (#4909)


Co-authored-by: 李铭昕 <715557344@qq.com>
  • Loading branch information
her-cat and limingxinleo committed Jul 8, 2022
1 parent 13ab808 commit 8b67301
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Expand Up @@ -52,6 +52,7 @@ composer analyse
- [#4852](https://github.com/hyperf/hyperf/pull/4852) Added `NullDisableEventDispatcher` to disable event dispatcher by default.
- [#4866](https://github.com/hyperf/hyperf/pull/4866) [#4869](https://github.com/hyperf/hyperf/pull/4869) Added Annotation `Scene` which use scene in FormRequest easily.
- [#4908](https://github.com/hyperf/hyperf/pull/4908) Added `Db::beforeExecuting()` to register a hook which to be run just before a database query is executed.
- [#4909](https://github.com/hyperf/hyperf/pull/4909) Added `ConsumerMessageInterface::getNums()` to change the number of amqp consumer by dynamically.

## Optimized

Expand Down
2 changes: 1 addition & 1 deletion src/amqp/src/Annotation/Consumer.php
Expand Up @@ -22,7 +22,7 @@ public function __construct(
public string $routingKey = '',
public string $queue = '',
public string $name = 'Consumer',
public int $nums = 1,
public ?int $nums = null,
public ?bool $enable = null,
public int $maxConsumption = 0
) {
Expand Down
4 changes: 2 additions & 2 deletions src/amqp/src/ConsumerManager.php
Expand Up @@ -43,9 +43,9 @@ public function run()
! is_null($annotation->enable) && $instance->setEnable($annotation->enable);
property_exists($instance, 'container') && $instance->container = $this->container;
$annotation->maxConsumption && $instance->setMaxConsumption($annotation->maxConsumption);
$nums = $annotation->nums;
! is_null($annotation->nums) && $instance->setNums($annotation->nums);
$process = $this->createProcess($instance);
$process->nums = (int) $nums;
$process->nums = $instance->getNums();
$process->name = $annotation->name . '-' . $instance->getQueue();
ProcessManager::register($process);
}
Expand Down
32 changes: 18 additions & 14 deletions src/amqp/src/Message/ConsumerMessage.php
Expand Up @@ -35,20 +35,13 @@ abstract class ConsumerMessage extends Message implements ConsumerMessageInterfa
'global' => false,
];

/**
* @var bool
*/
protected $enable = true;
protected bool $enable = true;

/**
* @var int
*/
protected $maxConsumption = 0;
protected int $maxConsumption = 0;

/**
* @var float|int
*/
protected $waitTimeout = 0;
protected int|float $waitTimeout = 0;

protected int $nums = 1;

public function consumeMessage($data, AMQPMessage $message): string
{
Expand Down Expand Up @@ -121,17 +114,28 @@ public function setMaxConsumption(int $maxConsumption)
return $this;
}

public function getWaitTimeout()
public function getWaitTimeout(): int|float
{
return $this->waitTimeout;
}

public function setWaitTimeout($timeout)
public function setWaitTimeout(int|float $timeout)
{
$this->waitTimeout = $timeout;
return $this;
}

public function getNums(): int
{
return $this->nums;
}

public function setNums(int $nums)
{
$this->nums = $nums;
return $this;
}

protected function reply($data, AMQPMessage $message)
{
$packer = ApplicationContext::getContainer()->get(Packer::class);
Expand Down
16 changes: 7 additions & 9 deletions src/amqp/src/Message/ConsumerMessageInterface.php
Expand Up @@ -38,13 +38,11 @@ public function getMaxConsumption(): int;

public function setMaxConsumption(int $maxConsumption);

/**
* @return float|int
*/
public function getWaitTimeout();

/**
* @param float|int $timeout
*/
public function setWaitTimeout($timeout);
public function getWaitTimeout(): int|float;

public function setWaitTimeout(int|float $timeout);

public function setNums(int $nums);

public function getNums(): int;
}
28 changes: 28 additions & 0 deletions src/amqp/tests/ConsumerManagerTest.php
Expand Up @@ -19,6 +19,7 @@
use Hyperf\Process\ProcessManager;
use HyperfTest\Amqp\Stub\ContainerStub;
use HyperfTest\Amqp\Stub\DemoConsumer;
use HyperfTest\Amqp\Stub\NumsConsumer;
use PHPUnit\Framework\TestCase;

/**
Expand Down Expand Up @@ -95,4 +96,31 @@ public function testConsumerAnnotationNotEnable()

$this->assertTrue($hasRegistered);
}

public function testConsumerGetNums()
{
$container = ContainerStub::getContainer();

AnnotationCollector::collectClass(NumsConsumer::class, Consumer::class, new Consumer(
exchange: uniqid(),
routingKey: uniqid(),
queue: uniqid(),
nums: rand(1, 10),
));

$manager = new ConsumerManager($container);
$manager->run();

$hasRegistered = false;
/** @var AbstractProcess $item */
foreach (ProcessManager::all() as $item) {
if (method_exists($item, 'getConsumerMessage')) {
$hasRegistered = true;
$this->assertSame($item->getConsumerMessage()->getNums(), $item->nums);
break;
}
}

$this->assertTrue($hasRegistered);
}
}
37 changes: 37 additions & 0 deletions src/amqp/tests/Stub/NumsConsumer.php
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Amqp\Stub;

use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;

class NumsConsumer extends ConsumerMessage
{
protected string $exchange = 'hyperf';

protected array|string $routingKey = [
'hyperf1',
'hyperf2',
];

protected ?string $queue = 'hyperf';

public function consume($data): string
{
return Result::ACK;
}

public function getNums(): int
{
return 10;
}
}

0 comments on commit 8b67301

Please sign in to comment.