Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allows send WebSocket message to any fd in current server, even the worker process does not hold the fd #418

Merged
merged 12 commits into from Aug 19, 2019
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -3,6 +3,7 @@
## Added

- [#402](https://github.com/hyperf-cloud/hyperf/pull/402) Added Annotation AsyncQueueMessage.
- [#418](https://github.com/hyperf-cloud/hyperf/pull/418) Added `Sender` to fix fd not found when use method `server::send`.

## Deleted

Expand Down
2 changes: 2 additions & 0 deletions src/server/src/ConfigProvider.php
Expand Up @@ -13,13 +13,15 @@
namespace Hyperf\Server;

use Hyperf\Server\Listener\InitProcessTitleListener;
use Swoole\Server as SwooleServer;

class ConfigProvider
{
public function __invoke(): array
{
return [
'dependencies' => [
SwooleServer::class => SwooleServerFactory::class,
],
'commands' => [
],
Expand Down
25 changes: 25 additions & 0 deletions src/server/src/SwooleServerFactory.php
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\Server;

use Psr\Container\ContainerInterface;

class SwooleServerFactory
{
public function __invoke(ContainerInterface $container)
{
$factory = $container->get(ServerFactory::class);

return $factory->getServer()->getServer();
}
}
4 changes: 3 additions & 1 deletion src/websocket-server/src/ConfigProvider.php
Expand Up @@ -19,7 +19,9 @@ public function __invoke(): array
return [
'dependencies' => [
],
'commands' => [
'listeners' => [
Listener\InitSenderListener::class,
Listener\OnPipeMessageListener::class,
],
'scan' => [
'paths' => [
Expand Down
19 changes: 19 additions & 0 deletions src/websocket-server/src/Exception/InvalidMethodException.php
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\WebSocketServer\Exception;

use Hyperf\Server\Exception\ServerException;

class InvalidMethodException extends ServerException
{
}
Expand Up @@ -12,43 +12,38 @@

namespace Hyperf\WebSocketServer\Listener;

use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeMainServerStart;
use Hyperf\WebSocketServer\Server;
use Hyperf\Framework\Event\AfterWorkerStart;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerInterface;

/**
* @Listener
*/
class InitWebSocketServerListener implements ListenerInterface
class InitSenderListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
protected $container;
private $container;

public function __construct(ContainerInterface $container)
{
$this->container = $container;
}

/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
BeforeMainServerStart::class,
AfterWorkerStart::class,
];
}

/**
* @param BeforeMainServerStart $event
*/
public function process(object $event)
{
if (! $this->container->has(Server::class)) {
return;
if ($this->container->has(Sender::class)) {
$sender = $this->container->get(Sender::class);
$sender->setWorkerId($event->workerId);
}
$server = $this->container->get(Server::class);
$event->server instanceof \Swoole\WebSocket\Server && $server->setServer($event->server);
}
}
75 changes: 75 additions & 0 deletions src/websocket-server/src/Listener/OnPipeMessageListener.php
@@ -0,0 +1,75 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\WebSocketServer\Listener;

use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Hyperf\Framework\Event\OnPipeMessage;
use Hyperf\WebSocketServer\Sender;
use Hyperf\WebSocketServer\SenderPipeMessage;
use Psr\Container\ContainerInterface;

class OnPipeMessageListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
private $container;

/**
* @var StdoutLoggerInterface
*/
private $logger;

/**
* @var Sender
*/
private $sender;

public function __construct(ContainerInterface $container, StdoutLoggerInterface $logger, Sender $sender)
{
$this->container = $container;
$this->logger = $logger;
$this->sender = $sender;
}

/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
OnPipeMessage::class,
];
}

/**
* Handle the Event when the event is triggered, all listeners will
* complete before the event is returned to the EventDispatcher.
*/
public function process(object $event)
{
if ($event instanceof OnPipeMessage && $event->data instanceof SenderPipeMessage) {
/** @var SenderPipeMessage $message */
$message = $event->data;

try {
$this->sender->proxy($message->name, $message->arguments);
} catch (\Throwable $exception) {
$formatter = $this->container->get(FormatterInterface::class);
$this->logger->warning($formatter->format($exception));
}
}
}
}
107 changes: 107 additions & 0 deletions src/websocket-server/src/Sender.php
@@ -0,0 +1,107 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\WebSocketServer;

use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\WebSocketServer\Exception\InvalidMethodException;
use Psr\Container\ContainerInterface;
use Swoole\Server;

/**
* @method push(int $fd, $data, int $opcode = null, $finish = null)
*/
class Sender
{
/**
* @var ContainerInterface
*/
protected $container;

/**
* @var StdoutLoggerInterface
*/
protected $logger;

/**
* @var int
*/
protected $workerId;

public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->logger = $container->get(StdoutLoggerInterface::class);
}

public function __call($name, $arguments)
{
if (! $this->proxy($name, $arguments)) {
$this->sendPipeMessage($name, $arguments);
}
}

public function proxy(string $name, array $arguments): bool
{
$fd = $this->getFdFromProxyMethod($name, $arguments);
huangzhhui marked this conversation as resolved.
Show resolved Hide resolved

$result = $this->check($fd);
if ($result) {
$this->getServer()->push(...$arguments);
$this->logger->debug("[WebSocket] Worker.{$this->workerId} send to #{$fd}");
}

return $result;
}

public function setWorkerId(int $workerId): void
{
$this->workerId = $workerId;
}

public function check($fd): bool
{
$info = $this->getServer()->connection_info($fd);

if ($info && $info['websocket_status'] === WEBSOCKET_STATUS_ACTIVE) {
return true;
}

return false;
}

protected function getFdFromProxyMethod(string $method, array $arguments): int
{
if (! in_array($method, ['push', 'send', 'sendto'])) {
throw new InvalidMethodException(sprintf('Method [%s] is not allowed.', $method));
}

return (int) $arguments[0];
}

protected function getServer(): Server
{
return $this->container->get(Server::class);
}

protected function sendPipeMessage(string $name, array $arguments): void
{
$server = $this->getServer();
$workerCount = $server->setting['worker_num'] - 1;
for ($workerId = 0; $workerId <= $workerCount; ++$workerId) {
if ($workerId !== $this->workerId) {
$server->sendMessage(new SenderPipeMessage($name, $arguments), $workerId);
huangzhhui marked this conversation as resolved.
Show resolved Hide resolved
$this->logger->debug("[WebSocket] Let Worker.{$workerId} try to {$name}.");
}
}
}
}
32 changes: 32 additions & 0 deletions src/websocket-server/src/SenderPipeMessage.php
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\WebSocketServer;

class SenderPipeMessage
{
/**
* @var string
*/
public $name;

/**
* @var array
*/
public $arguments;

public function __construct(string $name, array $arguments)
{
$this->name = $name;
$this->arguments = $arguments;
}
}
12 changes: 4 additions & 8 deletions src/websocket-server/src/Server.php
Expand Up @@ -24,6 +24,7 @@
use Hyperf\HttpMessage\Server\Request as Psr7Request;
use Hyperf\HttpMessage\Server\Response as Psr7Response;
use Hyperf\HttpServer\MiddlewareManager;
use Hyperf\Server\ServerFactory;
use Hyperf\Utils\Context;
use Hyperf\WebSocketServer\Collector\FdCollector;
use Hyperf\WebSocketServer\Exception\Handler\WebSocketExceptionHandler;
Expand Down Expand Up @@ -64,11 +65,6 @@ class Server implements MiddlewareInitializerInterface, OnHandShakeInterface, On
*/
protected $logger;

/**
* @var WebSocketServer
*/
protected $server;

/**
* @var array
*/
Expand Down Expand Up @@ -98,9 +94,9 @@ public function initCoreMiddleware(string $serverName): void
]);
}

public function setServer(WebSocketServer $server): void
public function getServer(): WebSocketServer
{
$this->server = $server;
return $this->container->get(ServerFactory::class)->getServer()->getServer();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里应该改成获取 Swoole\Server

}

public function onHandShake(SwooleRequest $request, SwooleResponse $response): void
Expand Down Expand Up @@ -130,7 +126,7 @@ public function onHandShake(SwooleRequest $request, SwooleResponse $response): v
defer(function () use ($request, $class) {
$instance = $this->container->get($class);
if ($instance instanceof OnOpenInterface) {
$instance->onOpen($this->server, $request);
$instance->onOpen($this->getServer(), $request);
}
});
}
Expand Down