Skip to content

Commit

Permalink
Merge pull request #418 from limingxinleo/ws-server
Browse files Browse the repository at this point in the history
Allows send WebSocket message to any fd in current server, even the worker process does  not hold the fd
  • Loading branch information
limingxinleo committed Aug 19, 2019
2 parents fe6e59c + c79912b commit 3f1d518
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -3,6 +3,7 @@
## Added

- [#402](https://github.com/hyperf-cloud/hyperf/pull/402) Added Annotation AsyncQueueMessage.
- [#418](https://github.com/hyperf-cloud/hyperf/pull/418) Allows send WebSocket message to any fd in current server, even the worker process does not hold the fd

## Deleted

Expand Down
2 changes: 2 additions & 0 deletions src/server/src/ConfigProvider.php
Expand Up @@ -13,13 +13,15 @@
namespace Hyperf\Server;

use Hyperf\Server\Listener\InitProcessTitleListener;
use Swoole\Server as SwooleServer;

class ConfigProvider
{
public function __invoke(): array
{
return [
'dependencies' => [
SwooleServer::class => SwooleServerFactory::class,
],
'commands' => [
],
Expand Down
25 changes: 25 additions & 0 deletions src/server/src/SwooleServerFactory.php
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\Server;

use Psr\Container\ContainerInterface;

class SwooleServerFactory
{
public function __invoke(ContainerInterface $container)
{
$factory = $container->get(ServerFactory::class);

return $factory->getServer()->getServer();
}
}
4 changes: 3 additions & 1 deletion src/websocket-server/src/ConfigProvider.php
Expand Up @@ -19,7 +19,9 @@ public function __invoke(): array
return [
'dependencies' => [
],
'commands' => [
'listeners' => [
Listener\InitSenderListener::class,
Listener\OnPipeMessageListener::class,
],
'scan' => [
'paths' => [
Expand Down
19 changes: 19 additions & 0 deletions src/websocket-server/src/Exception/InvalidMethodException.php
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\WebSocketServer\Exception;

use Hyperf\Server\Exception\ServerException;

class InvalidMethodException extends ServerException
{
}
Expand Up @@ -12,43 +12,38 @@

namespace Hyperf\WebSocketServer\Listener;

use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeMainServerStart;
use Hyperf\WebSocketServer\Server;
use Hyperf\Framework\Event\AfterWorkerStart;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerInterface;

/**
* @Listener
*/
class InitWebSocketServerListener implements ListenerInterface
class InitSenderListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
protected $container;
private $container;

public function __construct(ContainerInterface $container)
{
$this->container = $container;
}

/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
BeforeMainServerStart::class,
AfterWorkerStart::class,
];
}

/**
* @param BeforeMainServerStart $event
*/
public function process(object $event)
{
if (! $this->container->has(Server::class)) {
return;
if ($this->container->has(Sender::class)) {
$sender = $this->container->get(Sender::class);
$sender->setWorkerId($event->workerId);
}
$server = $this->container->get(Server::class);
$event->server instanceof \Swoole\WebSocket\Server && $server->setServer($event->server);
}
}
75 changes: 75 additions & 0 deletions src/websocket-server/src/Listener/OnPipeMessageListener.php
@@ -0,0 +1,75 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\WebSocketServer\Listener;

use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Hyperf\Framework\Event\OnPipeMessage;
use Hyperf\WebSocketServer\Sender;
use Hyperf\WebSocketServer\SenderPipeMessage;
use Psr\Container\ContainerInterface;

class OnPipeMessageListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
private $container;

/**
* @var StdoutLoggerInterface
*/
private $logger;

/**
* @var Sender
*/
private $sender;

public function __construct(ContainerInterface $container, StdoutLoggerInterface $logger, Sender $sender)
{
$this->container = $container;
$this->logger = $logger;
$this->sender = $sender;
}

/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
OnPipeMessage::class,
];
}

/**
* Handle the Event when the event is triggered, all listeners will
* complete before the event is returned to the EventDispatcher.
*/
public function process(object $event)
{
if ($event instanceof OnPipeMessage && $event->data instanceof SenderPipeMessage) {
/** @var SenderPipeMessage $message */
$message = $event->data;

try {
$this->sender->proxy($message->name, $message->arguments);
} catch (\Throwable $exception) {
$formatter = $this->container->get(FormatterInterface::class);
$this->logger->warning($formatter->format($exception));
}
}
}
}
107 changes: 107 additions & 0 deletions src/websocket-server/src/Sender.php
@@ -0,0 +1,107 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\WebSocketServer;

use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\WebSocketServer\Exception\InvalidMethodException;
use Psr\Container\ContainerInterface;
use Swoole\Server;

/**
* @method push(int $fd, $data, int $opcode = null, $finish = null)
*/
class Sender
{
/**
* @var ContainerInterface
*/
protected $container;

/**
* @var StdoutLoggerInterface
*/
protected $logger;

/**
* @var int
*/
protected $workerId;

public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->logger = $container->get(StdoutLoggerInterface::class);
}

public function __call($name, $arguments)
{
if (! $this->proxy($name, $arguments)) {
$this->sendPipeMessage($name, $arguments);
}
}

public function proxy(string $name, array $arguments): bool
{
$fd = $this->getFdFromProxyMethod($name, $arguments);

$result = $this->check($fd);
if ($result) {
$this->getServer()->push(...$arguments);
$this->logger->debug("[WebSocket] Worker.{$this->workerId} send to #{$fd}");
}

return $result;
}

public function setWorkerId(int $workerId): void
{
$this->workerId = $workerId;
}

public function check($fd): bool
{
$info = $this->getServer()->connection_info($fd);

if ($info && $info['websocket_status'] === WEBSOCKET_STATUS_ACTIVE) {
return true;
}

return false;
}

protected function getFdFromProxyMethod(string $method, array $arguments): int
{
if (! in_array($method, ['push', 'send', 'sendto'])) {
throw new InvalidMethodException(sprintf('Method [%s] is not allowed.', $method));
}

return (int) $arguments[0];
}

protected function getServer(): Server
{
return $this->container->get(Server::class);
}

protected function sendPipeMessage(string $name, array $arguments): void
{
$server = $this->getServer();
$workerCount = $server->setting['worker_num'] - 1;
for ($workerId = 0; $workerId <= $workerCount; ++$workerId) {
if ($workerId !== $this->workerId) {
$server->sendMessage(new SenderPipeMessage($name, $arguments), $workerId);
$this->logger->debug("[WebSocket] Let Worker.{$workerId} try to {$name}.");
}
}
}
}
32 changes: 32 additions & 0 deletions src/websocket-server/src/SenderPipeMessage.php
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\WebSocketServer;

class SenderPipeMessage
{
/**
* @var string
*/
public $name;

/**
* @var array
*/
public $arguments;

public function __construct(string $name, array $arguments)
{
$this->name = $name;
$this->arguments = $arguments;
}
}

0 comments on commit 3f1d518

Please sign in to comment.