Skip to content

Commit

Permalink
Added afterRefreshed() to register a hook which to be run after ref…
Browse files Browse the repository at this point in the history
…resh nodes. (#4918)
  • Loading branch information
limingxinleo committed Jul 12, 2022
1 parent 52bcee0 commit 1f709b7
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Expand Up @@ -53,6 +53,7 @@ composer analyse
- [#4866](https://github.com/hyperf/hyperf/pull/4866) [#4869](https://github.com/hyperf/hyperf/pull/4869) Added Annotation `Scene` which use scene in FormRequest easily.
- [#4908](https://github.com/hyperf/hyperf/pull/4908) Added `Db::beforeExecuting()` to register a hook which to be run just before a database query is executed.
- [#4909](https://github.com/hyperf/hyperf/pull/4909) Added `ConsumerMessageInterface::getNums()` to change the number of amqp consumer by dynamically.
- [#4918](https://github.com/hyperf/hyperf/pull/4918) Added `LoadBalancerInterface::afterRefreshed()` to register a hook which to be run after refresh nodes.

## Optimized

Expand Down
4 changes: 3 additions & 1 deletion src/load-balancer/composer.json
Expand Up @@ -18,7 +18,9 @@
"require": {
"php": ">=8.0",
"hyperf/coordinator": "~3.0.0",
"markrogoyski/math-php": "^2.0"
"hyperf/utils": "~3.0.0",
"markrogoyski/math-php": "^2.0",
"psr/log": "^1.0|^2.0|^3.0"
},
"autoload": {
"psr-4": {
Expand Down
58 changes: 47 additions & 11 deletions src/load-balancer/src/AbstractLoadBalancer.php
Expand Up @@ -11,15 +11,25 @@
*/
namespace Hyperf\LoadBalancer;

use Closure;
use Hyperf\Coordinator\Constants;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Utils\Coroutine;
use Psr\Log\LoggerInterface;

abstract class AbstractLoadBalancer implements LoadBalancerInterface
{
/**
* @var array<string, ?Closure>
*/
protected array $afterRefreshCallbacks = [];

protected bool $autoRefresh = false;

/**
* @param Node[] $nodes
*/
public function __construct(protected array $nodes = [])
public function __construct(protected array $nodes = [], protected ?LoggerInterface $logger = null)
{
}

Expand Down Expand Up @@ -53,17 +63,43 @@ public function removeNode(Node $node): bool

public function refresh(callable $callback, int $tickMs = 5000): void
{
while (true) {
try {
$exited = CoordinatorManager::until(Constants::WORKER_EXIT)->yield($tickMs / 1000);
if ($exited) {
break;
}
$this->autoRefresh = true;
Coroutine::create(function () use ($callback, $tickMs) {
while (true) {
try {
$exited = CoordinatorManager::until(Constants::WORKER_EXIT)->yield($tickMs / 1000);
if ($exited) {
break;
}

$nodes = call($callback);
is_array($nodes) && $this->setNodes($nodes);
} catch (\Throwable) {
$origin = $this->getNodes();
$nodes = call($callback);
if (is_array($nodes)) {
$this->setNodes($nodes);
foreach ($this->afterRefreshCallbacks as $refreshCallback) {
! is_null($refreshCallback) && $refreshCallback($origin, $nodes);
}
}
} catch (\Throwable $exception) {
$this->logger?->error((string) $exception);
}
}
}
$this->autoRefresh = false;
});
}

public function isAutoRefresh(): bool
{
return $this->autoRefresh;
}

public function afterRefreshed(string $key, ?Closure $callback): void
{
$this->afterRefreshCallbacks[$key] = $callback;
}

public function clearAfterRefreshedCallbacks(): void
{
$this->afterRefreshCallbacks = [];
}
}
8 changes: 8 additions & 0 deletions src/load-balancer/src/LoadBalancerInterface.php
Expand Up @@ -11,6 +11,8 @@
*/
namespace Hyperf\LoadBalancer;

use Closure;

interface LoadBalancerInterface
{
/**
Expand All @@ -34,4 +36,10 @@ public function getNodes(): array;
public function removeNode(Node $node): bool;

public function refresh(callable $callback, int $tickMs = 5000): void;

public function isAutoRefresh(): bool;

public function afterRefreshed(string $key, ?Closure $callback): void;

public function clearAfterRefreshedCallbacks(): void;
}
34 changes: 34 additions & 0 deletions src/load-balancer/tests/RandomTest.php
Expand Up @@ -11,6 +11,9 @@
*/
namespace HyperfTest\LoadBalancer;

use Hyperf\Coordinator\Constants;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Engine\Channel;
use Hyperf\LoadBalancer\Node;
use Hyperf\LoadBalancer\Random;
use PHPUnit\Framework\TestCase;
Expand All @@ -33,4 +36,35 @@ public function testRandom()
$this->assertTrue(in_array($node, $nodes));
$this->assertSame($nodes, $random->getNodes());
}

public function testAfterRefreshed()
{
$nodes = [
new Node('127.0.0.1', 80),
new Node('127.0.0.2', 81),
new Node('127.0.0.3', 82),
];
$random = new Random($nodes);
$this->assertFalse($random->isAutoRefresh());
$random->refresh(static function () {
return [
new Node('127.0.0.1', 80),
new Node('127.0.0.4', 81),
new Node('127.0.0.3', 81),
];
}, 200);
$chan = new Channel(1);
$random->afterRefreshed('test', function ($old, $new) use ($chan) {
$this->assertSame('127.0.0.1', $old[0]->host);
$this->assertSame('127.0.0.4', $new[1]->host);
$chan->push(true);
});

$chan->pop(-1);

$this->assertTrue($random->isAutoRefresh());
CoordinatorManager::until(Constants::WORKER_EXIT)->resume();
CoordinatorManager::clear(Constants::WORKER_EXIT);
$random->clearAfterRefreshedCallbacks();
}
}

0 comments on commit 1f709b7

Please sign in to comment.