diff --git a/composer.json b/composer.json index a7b0efd..bf09b60 100644 --- a/composer.json +++ b/composer.json @@ -38,7 +38,7 @@ "clue/redis-react": "^2.5", "contributte/console": "^0.9", "fastybird/datetime-factory": "^0.2", - "fastybird/exchange-plugin": "^0.3.1", + "fastybird/exchange-plugin": "^0.4", "fastybird/modules-metadata": "^0.7", "fastybird/socket-server-factory": "^0.1", "nette/bootstrap": "^3.0", diff --git a/redisdb_exchange_plugin/__init__.py b/redisdb_exchange_plugin/__init__.py index 4e5fe3c..6242af1 100644 --- a/redisdb_exchange_plugin/__init__.py +++ b/redisdb_exchange_plugin/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. """ -Redis DB exchange +Redis DB exchange plugin """ -__version__ = "0.1.5" +__version__ = "0.2.0" diff --git a/src/Client/AsyncClient.php b/src/Client/AsyncClient.php index 88b8b1d..358ba3f 100644 --- a/src/Client/AsyncClient.php +++ b/src/Client/AsyncClient.php @@ -15,14 +15,12 @@ namespace FastyBird\RedisDbExchangePlugin\Client; -use Closure; use Clue\Redis\Protocol as RedisProtocol; -use FastyBird\ModulesMetadata\Types as ModulesMetadataTypes; use FastyBird\RedisDbExchangePlugin\Connections; -use FastyBird\RedisDbExchangePlugin\Consumer; +use FastyBird\RedisDbExchangePlugin\Events; use FastyBird\RedisDbExchangePlugin\Exceptions; use Nette; -use Nette\Utils; +use Psr\EventDispatcher; use Psr\Log; use Ramsey\Uuid; use React\EventLoop; @@ -38,41 +36,12 @@ * @subpackage Client * * @author Adam Kadlec - * - * @method onOpen(IAsyncClient $client) - * @method onClose(IAsyncClient $client) - * @method onMessage(string $channel, string $payload, IAsyncClient $client) - * @method onPmessage(string $patern, string $channel, string $payload, IAsyncClient $client) - * @method onError(Throwable $ex, IAsyncClient $client) - * @method onBeforeConsumeMessage(string $payload) - * @method onAfterConsumeMessage(string $payload) */ class AsyncClient implements IAsyncClient { use Nette\SmartObject; - /** @var Closure[] */ - public array $onOpen = []; - - /** @var Closure[] */ - public array $onClose = []; - - /** @var Closure[] */ - public array $onMessage = []; - - /** @var Closure[] */ - public array $onPmessage = []; - - /** @var Closure[] */ - public array $onError = []; - - /** @var Closure[] */ - public array $onBeforeConsumeMessage = []; - - /** @var Closure[] */ - public array $onAfterConsumeMessage = []; - /** @var string */ private string $channelName; @@ -94,9 +63,6 @@ class AsyncClient implements IAsyncClient /** @var Connections\IConnection */ private Connections\IConnection $connection; - /** @var Consumer\IConsumer */ - private Consumer\IConsumer $consumer; - /** @var Promise\Deferred[] */ private array $requests = []; @@ -109,6 +75,9 @@ class AsyncClient implements IAsyncClient /** @var RedisProtocol\Serializer\SerializerInterface */ private RedisProtocol\Serializer\SerializerInterface $serializer; + /** @var EventDispatcher\EventDispatcherInterface */ + private EventDispatcher\EventDispatcherInterface $eventDispatcher; + /** @var EventLoop\LoopInterface */ private EventLoop\LoopInterface $eventLoop; @@ -118,14 +87,13 @@ class AsyncClient implements IAsyncClient public function __construct( string $channelName, Connections\IConnection $connection, - Consumer\IConsumer $consumer, EventLoop\LoopInterface $eventLoop, + EventDispatcher\EventDispatcherInterface $eventDispatcher, ?Log\LoggerInterface $logger = null ) { $this->channelName = $channelName; $this->connection = $connection; - $this->consumer = $consumer; $factory = new RedisProtocol\Factory(); @@ -134,6 +102,8 @@ public function __construct( $this->eventLoop = $eventLoop; + $this->eventDispatcher = $eventDispatcher; + $this->logger = $logger ?? new Log\NullLogger(); $this->identifier = Uuid\Uuid::uuid4()->toString(); @@ -166,14 +136,14 @@ public function connect(): Promise\ExtendedPromiseInterface function (Socket\ConnectionInterface $stream) use ($deferred): void { $this->stream = $stream; - $this->onOpen($this); + $this->eventDispatcher->dispatch(new Events\ConnectionOpenedEvent($this)); $deferred->resolve($this); }, function (Throwable $ex) use ($deferred): void { $this->isConnecting = false; - $this->onError($ex, $this); + $this->eventDispatcher->dispatch(new Events\ErrorEvent($ex, $this)); $deferred->reject($ex); } @@ -201,7 +171,7 @@ public function close(): void $this->stream->close(); } - $this->onClose($this); + $this->eventDispatcher->dispatch(new Events\ConnectionClosedEvent($this)); // Reject all remaining requests in the queue while ($this->requests) { @@ -294,45 +264,6 @@ public function initialize(): void ->connect() ->then(function (AsyncClient $client): void { $client->subscribe($this->channelName); - - $client->onMessage[] = function (string $channel, string $payload) use ($client): void { - if ($channel === $this->channelName) { - $this->onBeforeConsumeMessage($payload); - - try { - $data = Utils\ArrayHash::from(Utils\Json::decode($payload, Utils\Json::FORCE_ARRAY)); - - if ( - $data->offsetExists('origin') - && $data->offsetExists('routing_key') - && $data->offsetExists('data') - ) { - $this->consumer->consume( - ModulesMetadataTypes\ModuleOriginType::get($data->offsetGet('origin')), - ModulesMetadataTypes\RoutingKeyType::get($data->offsetGet('routing_key')), - $data->offsetGet('data') - ); - - } else { - // Log error action reason - $this->logger->warning('[FB:PLUGIN:REDISDB_EXCHANGE] Received message is not in valid format'); - } - } catch (Utils\JsonException $ex) { - // Log error action reason - $this->logger->warning('[FB:PLUGIN:REDISDB_EXCHANGE] Received message is not valid json', [ - 'exception' => [ - 'message' => $ex->getMessage(), - 'code' => $ex->getCode(), - ], - ]); - - } catch (Exceptions\TerminateException $ex) { - $client->close(); - } - - $this->onAfterConsumeMessage($payload); - } - }; }); if ($promise instanceof Promise\ExtendedPromiseInterface) { @@ -385,7 +316,7 @@ function (Socket\ConnectionInterface $stream) use ($deferred, $timer): void { $models = $this->parser->pushIncoming($chunk); } catch (RedisProtocol\Parser\ParserException $ex) { - $this->onError($ex, $this); + $this->eventDispatcher->dispatch(new Events\ErrorEvent($ex, $this)); $this->close(); @@ -397,7 +328,7 @@ function (Socket\ConnectionInterface $stream) use ($deferred, $timer): void { $this->handleMessage($data); } catch (UnderflowException $ex) { - $this->onError($ex, $this); + $this->eventDispatcher->dispatch(new Events\ErrorEvent($ex, $this)); $this->close(); @@ -411,7 +342,7 @@ function (Socket\ConnectionInterface $stream) use ($deferred, $timer): void { }); $stream->on('error', function (Throwable $ex): void { - $this->onError($ex, $this); + $this->eventDispatcher->dispatch(new Events\ErrorEvent($ex, $this)); }); $deferred->resolve($stream); @@ -440,7 +371,7 @@ private function handleMessage(RedisProtocol\Model\ModelInterface $message): voi // Pub/Sub messages are to be forwarded and should not be processed as request responses if ($type === 'message') { if (isset($array[0]) && isset($array[1])) { - $this->onMessage($array[0], $array[1], $this); + $this->eventDispatcher->dispatch(new Events\MessageReceivedEvent($array[0], $array[1], $this)); return; @@ -449,7 +380,9 @@ private function handleMessage(RedisProtocol\Model\ModelInterface $message): voi } } elseif ($type === 'pmessage') { if (isset($array[0]) && isset($array[1]) && isset($array[2])) { - $this->onPmessage($array[0], $array[1], $array[2], $this); + $this->eventDispatcher->dispatch( + new Events\PatternMessageReceivedEvent($array[0], $array[1], $array[2], $this) + ); return; } else { diff --git a/src/Consumer/ConsumerProxy.php b/src/Consumer/ConsumerProxy.php deleted file mode 100644 index 91ad0c7..0000000 --- a/src/Consumer/ConsumerProxy.php +++ /dev/null @@ -1,156 +0,0 @@ - - * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Consumer - * @since 0.1.0 - * - * @date 17.09.21 - */ - -namespace FastyBird\RedisDbExchangePlugin\Consumer; - -use FastyBird\ExchangePlugin\Consumer as ExchangePluginConsumer; -use FastyBird\ExchangePlugin\Events as ExchangePluginEvents; -use FastyBird\ModulesMetadata\Exceptions as ModulesMetadataExceptions; -use FastyBird\ModulesMetadata\Loaders as ModulesMetadataLoaders; -use FastyBird\ModulesMetadata\Schemas as ModulesMetadataSchemas; -use FastyBird\ModulesMetadata\Types as ModulesMetadataTypes; -use FastyBird\RedisDbExchangePlugin\Exceptions; -use Nette; -use Nette\Utils; -use Psr\Log; -use SplObjectStorage; -use Symfony\Contracts\EventDispatcher; -use Throwable; - -/** - * Exchange message consumer proxy - * - * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Consumer - * - * @author Adam Kadlec - */ -final class ConsumerProxy implements IConsumer -{ - - use Nette\SmartObject; - - /** @var SplObjectStorage */ - private SplObjectStorage $consumers; - - /** @var ModulesMetadataLoaders\ISchemaLoader */ - private ModulesMetadataLoaders\ISchemaLoader $schemaLoader; - - /** @var ModulesMetadataSchemas\IValidator */ - private ModulesMetadataSchemas\IValidator $validator; - - /** @var EventDispatcher\EventDispatcherInterface */ - private EventDispatcher\EventDispatcherInterface $dispatcher; - - /** @var Log\LoggerInterface */ - private Log\LoggerInterface $logger; - - public function __construct( - ModulesMetadataLoaders\ISchemaLoader $schemaLoader, - ModulesMetadataSchemas\IValidator $validator, - EventDispatcher\EventDispatcherInterface $dispatcher, - ?Log\LoggerInterface $logger = null - ) { - $this->schemaLoader = $schemaLoader; - $this->validator = $validator; - - $this->dispatcher = $dispatcher; - - $this->consumers = new SplObjectStorage(); - - $this->logger = $logger ?? new Log\NullLogger(); - } - - /** - * {@inheritDoc} - */ - public function registerConsumer(ExchangePluginConsumer\IConsumer $consumer): void - { - if (!$this->consumers->contains($consumer)) { - $this->consumers->attach($consumer); - } - } - - /** - * {@inheritDoc} - * - * @throws Exceptions\TerminateException - */ - public function consume( - ModulesMetadataTypes\ModuleOriginType $origin, - ModulesMetadataTypes\RoutingKeyType $routingKey, - Utils\ArrayHash $data - ): void { - try { - $schema = $this->schemaLoader->load($origin->getValue(), $routingKey->getValue()); - - } catch (ModulesMetadataExceptions\InvalidArgumentException $ex) { - return; - } - - try { - $data = $this->validator->validate(Utils\Json::encode($data), $schema); - - } catch (Throwable $ex) { - return; - } - - /** @var ExchangePluginConsumer\IConsumer $consumer */ - foreach ($this->consumers as $consumer) { - try { - $this->processMessage($origin, $routingKey, $data, $consumer); - - } catch (Exceptions\UnprocessableMessageException $ex) { - // Log error consume reason - $this->logger->error('[FB:PLUGIN:REDISDB_EXCHANGE] Message could not be consumed', [ - 'exception' => [ - 'message' => $ex->getMessage(), - 'code' => $ex->getCode(), - ], - ]); - - return; - } - } - - $this->dispatcher->dispatch(new ExchangePluginEvents\MessageConsumedEvent($origin, $routingKey, $data)); - } - - /** - * @param ModulesMetadataTypes\ModuleOriginType $origin - * @param ModulesMetadataTypes\RoutingKeyType $routingKey - * @param Utils\ArrayHash $data - * @param ExchangePluginConsumer\IConsumer $consumer - * - * @return void - */ - private function processMessage( - ModulesMetadataTypes\ModuleOriginType $origin, - ModulesMetadataTypes\RoutingKeyType $routingKey, - Utils\ArrayHash $data, - ExchangePluginConsumer\IConsumer $consumer - ): void { - try { - $consumer->consume($origin, $routingKey, $data); - - } catch (Exceptions\TerminateException $ex) { - throw $ex; - - } catch (Throwable $ex) { - throw new Exceptions\UnprocessableMessageException('Received message could not be consumed', $ex->getCode(), $ex); - } - } - -} diff --git a/src/Consumer/IConsumer.php b/src/Consumer/IConsumer.php deleted file mode 100644 index 18afa8b..0000000 --- a/src/Consumer/IConsumer.php +++ /dev/null @@ -1,53 +0,0 @@ - - * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Consumer - * @since 0.1.0 - * - * @date 17.09.21 - */ - -namespace FastyBird\RedisDbExchangePlugin\Consumer; - -use FastyBird\ExchangePlugin\Consumer as ExchangePluginConsumer; -use FastyBird\ModulesMetadata\Types as ModulesMetadataTypes; -use Nette\Utils; - -/** - * Exchange messages consumer interface - * - * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Consumer - * - * @author Adam Kadlec - */ -interface IConsumer -{ - - /** - * @param ExchangePluginConsumer\IConsumer $consumer - * - * @return void - */ - public function registerConsumer(ExchangePluginConsumer\IConsumer $consumer): void; - - /** - * @param ModulesMetadataTypes\ModuleOriginType $origin - * @param ModulesMetadataTypes\RoutingKeyType $routingKey - * @param Utils\ArrayHash $data - * - * @return void - */ - public function consume( - ModulesMetadataTypes\ModuleOriginType $origin, - ModulesMetadataTypes\RoutingKeyType $routingKey, - Utils\ArrayHash $data - ): void; - -} diff --git a/src/DI/RedisDbExchangePluginExtension.php b/src/DI/RedisDbExchangePluginExtension.php index c933611..61906b4 100644 --- a/src/DI/RedisDbExchangePluginExtension.php +++ b/src/DI/RedisDbExchangePluginExtension.php @@ -15,12 +15,10 @@ namespace FastyBird\RedisDbExchangePlugin\DI; -use FastyBird\ExchangePlugin\Consumer as ExchangePluginConsumer; use FastyBird\RedisDbExchangePlugin\Client; use FastyBird\RedisDbExchangePlugin\Connections; -use FastyBird\RedisDbExchangePlugin\Consumer; use FastyBird\RedisDbExchangePlugin\Exceptions; -use FastyBird\RedisDbExchangePlugin\Publisher; +use FastyBird\RedisDbExchangePlugin\Publishers; use FastyBird\RedisDbExchangePlugin\Subscribers; use Nette; use Nette\DI; @@ -106,7 +104,7 @@ public function loadConfiguration(): void ->setAutowired($name === 'default'); $builder->addDefinition($this->prefix('publisher.' . $name), new DI\Definitions\ServiceDefinition()) - ->setType(Publisher\Publisher::class) + ->setType(Publishers\Publisher::class) ->setArguments([ 'client' => $clientService, ]) @@ -123,7 +121,7 @@ public function loadConfiguration(): void ->setAutowired(true); $builder->addDefinition($this->prefix('asyncPublisher'), new DI\Definitions\ServiceDefinition()) - ->setType(Publisher\AsyncPublisher::class) + ->setType(Publishers\AsyncPublisher::class) ->setArguments([ 'client' => $asyncClientService, ]) @@ -131,9 +129,6 @@ public function loadConfiguration(): void } } - $builder->addDefinition($this->prefix('consumer'), new DI\Definitions\ServiceDefinition()) - ->setType(Consumer\ConsumerProxy::class); - if ($configuration->enableAsync) { if ($asyncClientService === null) { throw new Exceptions\InvalidStateException('Asynchronous client could not be created missing "default" connection configuration'); @@ -141,31 +136,9 @@ public function loadConfiguration(): void $builder->addDefinition($this->prefix('subscribers.application'), new DI\Definitions\ServiceDefinition()) ->setType(Subscribers\ApplicationSubscriber::class); - } - } - - /** - * {@inheritDoc} - */ - public function beforeCompile(): void - { - parent::beforeCompile(); - - $builder = $this->getContainerBuilder(); - - /** @var string $consumerProxyServiceName */ - $consumerProxyServiceName = $builder->getByType(Consumer\ConsumerProxy::class, true); - - /** @var DI\Definitions\ServiceDefinition $consumerProxyService */ - $consumerProxyService = $builder->getDefinition($consumerProxyServiceName); - - $consumerServices = $builder->findByType(ExchangePluginConsumer\IConsumer::class); - foreach ($consumerServices as $consumerService) { - $consumerProxyService->addSetup('?->registerConsumer(?)', [ - '@self', - $consumerService, - ]); + $builder->addDefinition($this->prefix('subscribers.asyncClient'), new DI\Definitions\ServiceDefinition()) + ->setType(Subscribers\AsyncClientSubscriber::class); } } diff --git a/src/Events/AfterMessageHandledEvent.php b/src/Events/AfterMessageHandledEvent.php new file mode 100644 index 0000000..8a62ddc --- /dev/null +++ b/src/Events/AfterMessageHandledEvent.php @@ -0,0 +1,47 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * @since 0.2.0 + * + * @date 09.10.21 + */ + +namespace FastyBird\RedisDbExchangePlugin\Events; + +use Symfony\Contracts\EventDispatcher; + +/** + * After message handled event + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * + * @author Adam Kadlec + */ +class AfterMessageHandledEvent extends EventDispatcher\Event +{ + + /** @var string */ + private string $payload; + + public function __construct(string $payload) + { + $this->payload = $payload; + } + + /** + * @return string + */ + public function getPayload(): string + { + return $this->payload; + } + +} diff --git a/src/Events/BeforeMessageHandledEvent.php b/src/Events/BeforeMessageHandledEvent.php new file mode 100644 index 0000000..abf784a --- /dev/null +++ b/src/Events/BeforeMessageHandledEvent.php @@ -0,0 +1,47 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * @since 0.2.0 + * + * @date 09.10.21 + */ + +namespace FastyBird\RedisDbExchangePlugin\Events; + +use Symfony\Contracts\EventDispatcher; + +/** + * Before message handled event + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * + * @author Adam Kadlec + */ +class BeforeMessageHandledEvent extends EventDispatcher\Event +{ + + /** @var string */ + private string $payload; + + public function __construct(string $payload) + { + $this->payload = $payload; + } + + /** + * @return string + */ + public function getPayload(): string + { + return $this->payload; + } + +} diff --git a/src/Events/ConnectionClosedEvent.php b/src/Events/ConnectionClosedEvent.php new file mode 100644 index 0000000..d424dfa --- /dev/null +++ b/src/Events/ConnectionClosedEvent.php @@ -0,0 +1,48 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * @since 0.2.0 + * + * @date 09.10.21 + */ + +namespace FastyBird\RedisDbExchangePlugin\Events; + +use FastyBird\RedisDbExchangePlugin\Client; +use Symfony\Contracts\EventDispatcher; + +/** + * After connection closed event + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * + * @author Adam Kadlec + */ +class ConnectionClosedEvent extends EventDispatcher\Event +{ + + /** @var Client\IAsyncClient */ + private Client\IAsyncClient $client; + + public function __construct(Client\IAsyncClient $client) + { + $this->client = $client; + } + + /** + * @return Client\IAsyncClient + */ + public function getClient(): Client\IAsyncClient + { + return $this->client; + } + +} diff --git a/src/Events/ConnectionOpenedEvent.php b/src/Events/ConnectionOpenedEvent.php new file mode 100644 index 0000000..a4448df --- /dev/null +++ b/src/Events/ConnectionOpenedEvent.php @@ -0,0 +1,48 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * @since 0.2.0 + * + * @date 09.10.21 + */ + +namespace FastyBird\RedisDbExchangePlugin\Events; + +use FastyBird\RedisDbExchangePlugin\Client; +use Symfony\Contracts\EventDispatcher; + +/** + * After connection opened event + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * + * @author Adam Kadlec + */ +class ConnectionOpenedEvent extends EventDispatcher\Event +{ + + /** @var Client\IAsyncClient */ + private Client\IAsyncClient $client; + + public function __construct(Client\IAsyncClient $client) + { + $this->client = $client; + } + + /** + * @return Client\IAsyncClient + */ + public function getClient(): Client\IAsyncClient + { + return $this->client; + } + +} diff --git a/src/Events/ErrorEvent.php b/src/Events/ErrorEvent.php new file mode 100644 index 0000000..c602aab --- /dev/null +++ b/src/Events/ErrorEvent.php @@ -0,0 +1,61 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * @since 0.2.0 + * + * @date 09.10.21 + */ + +namespace FastyBird\RedisDbExchangePlugin\Events; + +use FastyBird\RedisDbExchangePlugin\Client; +use Symfony\Contracts\EventDispatcher; +use Throwable; + +/** + * Connection error event + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * + * @author Adam Kadlec + */ +class ErrorEvent extends EventDispatcher\Event +{ + + /** @var Throwable */ + private Throwable $ex; + + /** @var Client\IAsyncClient */ + private Client\IAsyncClient $client; + + public function __construct(Throwable $ex, Client\IAsyncClient $client) + { + $this->ex = $ex; + $this->client = $client; + } + + /** + * @return Throwable + */ + public function getException(): Throwable + { + return $this->ex; + } + + /** + * @return Client\IAsyncClient + */ + public function getClient(): Client\IAsyncClient + { + return $this->client; + } + +} diff --git a/src/Events/MessageReceivedEvent.php b/src/Events/MessageReceivedEvent.php new file mode 100644 index 0000000..36e648c --- /dev/null +++ b/src/Events/MessageReceivedEvent.php @@ -0,0 +1,72 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * @since 0.2.0 + * + * @date 09.10.21 + */ + +namespace FastyBird\RedisDbExchangePlugin\Events; + +use FastyBird\RedisDbExchangePlugin\Client; +use Symfony\Contracts\EventDispatcher; + +/** + * Exchange message received event + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * + * @author Adam Kadlec + */ +class MessageReceivedEvent extends EventDispatcher\Event +{ + + /** @var string */ + private string $channel; + + /** @var string */ + private string $payload; + + /** @var Client\IAsyncClient */ + private Client\IAsyncClient $client; + + public function __construct(string $channel, string $payload, Client\IAsyncClient $client) + { + $this->channel = $channel; + $this->payload = $payload; + $this->client = $client; + } + + /** + * @return string + */ + public function getChannel(): string + { + return $this->channel; + } + + /** + * @return string + */ + public function getPayload(): string + { + return $this->payload; + } + + /** + * @return Client\IAsyncClient + */ + public function getClient(): Client\IAsyncClient + { + return $this->client; + } + +} diff --git a/src/Events/PatternMessageReceivedEvent.php b/src/Events/PatternMessageReceivedEvent.php new file mode 100644 index 0000000..1155342 --- /dev/null +++ b/src/Events/PatternMessageReceivedEvent.php @@ -0,0 +1,84 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * @since 0.2.0 + * + * @date 09.10.21 + */ + +namespace FastyBird\RedisDbExchangePlugin\Events; + +use FastyBird\RedisDbExchangePlugin\Client; +use Symfony\Contracts\EventDispatcher; + +/** + * Exchange pattern message received event + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * + * @author Adam Kadlec + */ +class PatternMessageReceivedEvent extends EventDispatcher\Event +{ + + /** @var string */ + private string $pattern; + + /** @var string */ + private string $channel; + + /** @var string */ + private string $payload; + + /** @var Client\IAsyncClient */ + private Client\IAsyncClient $client; + + public function __construct(string $pattern, string $channel, string $payload, Client\IAsyncClient $client) + { + $this->pattern = $pattern; + $this->channel = $channel; + $this->payload = $payload; + $this->client = $client; + } + + /** + * @return string + */ + public function getPattern(): string + { + return $this->pattern; + } + + /** + * @return string + */ + public function getChannel(): string + { + return $this->channel; + } + + /** + * @return string + */ + public function getPayload(): string + { + return $this->payload; + } + + /** + * @return Client\IAsyncClient + */ + public function getClient(): Client\IAsyncClient + { + return $this->client; + } + +} diff --git a/src/Publisher/IPublisher.php b/src/Publisher/IPublisher.php deleted file mode 100644 index 52d27f2..0000000 --- a/src/Publisher/IPublisher.php +++ /dev/null @@ -1,31 +0,0 @@ - - * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Publisher - * @since 0.1.0 - * - * @date 17.09.21 - */ - -namespace FastyBird\RedisDbExchangePlugin\Publisher; - -use FastyBird\ExchangePlugin\Publisher as ExchangePluginPublisher; - -/** - * Redis DB exchange publisher interface - * - * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Publisher - * - * @author Adam Kadlec - */ -interface IPublisher extends ExchangePluginPublisher\IPublisher -{ - -} diff --git a/src/Publisher/AsyncPublisher.php b/src/Publishers/AsyncPublisher.php similarity index 75% rename from src/Publisher/AsyncPublisher.php rename to src/Publishers/AsyncPublisher.php index 74e3757..770b1a6 100644 --- a/src/Publisher/AsyncPublisher.php +++ b/src/Publishers/AsyncPublisher.php @@ -7,15 +7,16 @@ * @copyright https://www.fastybird.com * @author Adam Kadlec * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Publisher + * @subpackage Publishers * @since 0.1.0 * * @date 17.09.21 */ -namespace FastyBird\RedisDbExchangePlugin\Publisher; +namespace FastyBird\RedisDbExchangePlugin\Publishers; use FastyBird\DateTimeFactory; +use FastyBird\ExchangePlugin\Publisher as ExchangePluginPublisher; use FastyBird\ModulesMetadata\Types as ModulesMetadataTypes; use FastyBird\RedisDbExchangePlugin\Client; use Nette; @@ -27,11 +28,11 @@ * Redis DB exchange asynchronous publisher * * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Publisher + * @subpackage Publishers * * @author Adam Kadlec */ -final class AsyncPublisher implements IPublisher +final class AsyncPublisher implements ExchangePluginPublisher\IPublisher { use Nette\SmartObject; @@ -61,7 +62,7 @@ public function __construct( public function publish( ModulesMetadataTypes\ModuleOriginType $origin, ModulesMetadataTypes\RoutingKeyType $routingKey, - array $data + ?Utils\ArrayHash $data ): void { try { $result = $this->client->publish( @@ -70,7 +71,7 @@ public function publish( 'origin' => $origin->getValue(), 'routing_key' => $routingKey->getValue(), 'created' => $this->dateTimeFactory->getNow()->format(DATE_ATOM), - 'data' => $data, + 'data' => $data !== null ? $this->dataToArray($data) : null, ]), ); @@ -79,7 +80,7 @@ public function publish( 'message' => [ 'routingKey' => $routingKey->getValue(), 'origin' => $origin->getValue(), - 'data' => $data, + 'data' => $data !== null ? $this->dataToArray($data) : null, ], ]); }); @@ -90,7 +91,7 @@ public function publish( 'message' => [ 'routingKey' => $routingKey->getValue(), 'origin' => $origin->getValue(), - 'data' => $data, + 'data' => $data !== null ? $this->dataToArray($data) : null, ], ]); }); @@ -100,7 +101,7 @@ public function publish( 'message' => [ 'routingKey' => $routingKey->getValue(), 'origin' => $origin->getValue(), - 'data' => $data, + 'data' => $data !== null ? $this->dataToArray($data) : null, ], 'exception' => [ 'message' => $ex->getMessage(), @@ -112,4 +113,22 @@ public function publish( } } + /** + * @param Utils\ArrayHash $data + * + * @return mixed[] + */ + private function dataToArray(Utils\ArrayHash $data): array + { + $transformed = (array) $data; + + foreach ($transformed as $key => $value) { + if ($value instanceof Utils\ArrayHash) { + $transformed[$key] = $this->dataToArray($value); + } + } + + return $transformed; + } + } diff --git a/src/Publisher/Publisher.php b/src/Publishers/Publisher.php similarity index 73% rename from src/Publisher/Publisher.php rename to src/Publishers/Publisher.php index 905771b..a18bd4d 100644 --- a/src/Publisher/Publisher.php +++ b/src/Publishers/Publisher.php @@ -1,21 +1,22 @@ * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Publisher + * @subpackage Publishers * @since 0.1.0 * * @date 17.09.21 */ -namespace FastyBird\RedisDbExchangePlugin\Publisher; +namespace FastyBird\RedisDbExchangePlugin\Publishers; use FastyBird\DateTimeFactory; +use FastyBird\ExchangePlugin\Publisher as ExchangePluginPublisher; use FastyBird\ModulesMetadata\Types as ModulesMetadataTypes; use FastyBird\RedisDbExchangePlugin\Client; use Nette; @@ -26,11 +27,11 @@ * Redis DB exchange publisher * * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Publisher + * @subpackage Publishers * * @author Adam Kadlec */ -final class Publisher implements IPublisher +final class Publisher implements ExchangePluginPublisher\IPublisher { use Nette\SmartObject; @@ -60,7 +61,7 @@ public function __construct( public function publish( ModulesMetadataTypes\ModuleOriginType $origin, ModulesMetadataTypes\RoutingKeyType $routingKey, - array $data + ?Utils\ArrayHash $data ): void { try { $result = $this->client->publish( @@ -69,7 +70,7 @@ public function publish( 'origin' => $origin->getValue(), 'routing_key' => $routingKey->getValue(), 'created' => $this->dateTimeFactory->getNow()->format(DATE_ATOM), - 'data' => $data, + 'data' => $data !== null ? $this->dataToArray($data) : null, ]), ); @@ -78,7 +79,7 @@ public function publish( 'message' => [ 'routingKey' => $routingKey->getValue(), 'origin' => $origin->getValue(), - 'data' => $data, + 'data' => $data !== null ? $this->dataToArray($data) : null, ], 'exception' => [ 'message' => $ex->getMessage(), @@ -94,7 +95,7 @@ public function publish( 'message' => [ 'routingKey' => $routingKey->getValue(), 'origin' => $origin->getValue(), - 'data' => $data, + 'data' => $data !== null ? $this->dataToArray($data) : null, ], ]); } else { @@ -102,10 +103,28 @@ public function publish( 'message' => [ 'routingKey' => $routingKey->getValue(), 'origin' => $origin->getValue(), - 'data' => $data, + 'data' => $data !== null ? $this->dataToArray($data) : null, ], ]); } } + /** + * @param Utils\ArrayHash $data + * + * @return mixed[] + */ + private function dataToArray(Utils\ArrayHash $data): array + { + $transformed = (array) $data; + + foreach ($transformed as $key => $value) { + if ($value instanceof Utils\ArrayHash) { + $transformed[$key] = $this->dataToArray($value); + } + } + + return $transformed; + } + } diff --git a/src/Subscribers/AsyncClientSubscriber.php b/src/Subscribers/AsyncClientSubscriber.php new file mode 100644 index 0000000..f85a412 --- /dev/null +++ b/src/Subscribers/AsyncClientSubscriber.php @@ -0,0 +1,168 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Subscribers + * @since 0.2.0 + * + * @date 09.10.21 + */ + +namespace FastyBird\RedisDbExchangePlugin\Subscribers; + +use FastyBird\ExchangePlugin\Consumer as ExchangePluginConsumer; +use FastyBird\ExchangePlugin\Events as ExchangePluginEvents; +use FastyBird\ModulesMetadata\Exceptions as ModulesMetadataExceptions; +use FastyBird\ModulesMetadata\Loaders as ModulesMetadataLoaders; +use FastyBird\ModulesMetadata\Schemas as ModulesMetadataSchemas; +use FastyBird\ModulesMetadata\Types as ModulesMetadataTypes; +use FastyBird\RedisDbExchangePlugin\Events; +use FastyBird\RedisDbExchangePlugin\Exceptions; +use Nette\Utils; +use Psr\EventDispatcher as PsrEventDispatcher; +use Psr\Log; +use Symfony\Component\EventDispatcher; +use Throwable; + +/** + * Redis async clients subscriber + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Subscribers + * + * @author Adam Kadlec + */ +class AsyncClientSubscriber implements EventDispatcher\EventSubscriberInterface +{ + + /** @var ExchangePluginConsumer\IConsumer|null */ + private ?ExchangePluginConsumer\IConsumer $consumer; + + /** @var ModulesMetadataLoaders\ISchemaLoader */ + private ModulesMetadataLoaders\ISchemaLoader $schemaLoader; + + /** @var ModulesMetadataSchemas\IValidator */ + private ModulesMetadataSchemas\IValidator $validator; + + /** @var PsrEventDispatcher\EventDispatcherInterface */ + private PsrEventDispatcher\EventDispatcherInterface $dispatcher; + + /** @var Log\LoggerInterface */ + private Log\LoggerInterface $logger; + + public function __construct( + ModulesMetadataLoaders\ISchemaLoader $schemaLoader, + ModulesMetadataSchemas\IValidator $validator, + PsrEventDispatcher\EventDispatcherInterface $dispatcher, + ?ExchangePluginConsumer\IConsumer $consumer = null, + ?Log\LoggerInterface $logger = null + ) { + $this->schemaLoader = $schemaLoader; + $this->validator = $validator; + + $this->dispatcher = $dispatcher; + + $this->consumer = $consumer; + + $this->logger = $logger ?? new Log\NullLogger(); + } + + /** + * @return array + */ + public static function getSubscribedEvents(): array + { + return [ + Events\MessageReceivedEvent::class => 'handleMessage', + ]; + } + + public function handleMessage(Events\MessageReceivedEvent $event): void + { + $this->dispatcher->dispatch(new Events\BeforeMessageHandledEvent($event->getPayload())); + + try { + $data = Utils\ArrayHash::from(Utils\Json::decode($event->getPayload(), Utils\Json::FORCE_ARRAY)); + + if ( + $data->offsetExists('origin') + && $data->offsetExists('routing_key') + && $data->offsetExists('data') + ) { + $this->handle( + ModulesMetadataTypes\ModuleOriginType::get($data->offsetGet('origin')), + ModulesMetadataTypes\RoutingKeyType::get($data->offsetGet('routing_key')), + $data->offsetGet('data') + ); + + } else { + // Log error action reason + $this->logger->warning('[FB:PLUGIN:REDISDB_EXCHANGE] Received message is not in valid format'); + } + } catch (Utils\JsonException $ex) { + // Log error action reason + $this->logger->warning('[FB:PLUGIN:REDISDB_EXCHANGE] Received message is not valid json', [ + 'exception' => [ + 'message' => $ex->getMessage(), + 'code' => $ex->getCode(), + ], + ]); + + } catch (Exceptions\TerminateException $ex) { + $event->getClient()->close(); + } + + $this->dispatcher->dispatch(new Events\AfterMessageHandledEvent($event->getPayload())); + } + + /** + * @param ModulesMetadataTypes\ModuleOriginType $origin + * @param ModulesMetadataTypes\RoutingKeyType $routingKey + * @param Utils\ArrayHash $data + * + * @throws Utils\JsonException + */ + private function handle( + ModulesMetadataTypes\ModuleOriginType $origin, + ModulesMetadataTypes\RoutingKeyType $routingKey, + Utils\ArrayHash $data + ): void { + try { + $schema = $this->schemaLoader->load($origin->getValue(), $routingKey->getValue()); + + } catch (ModulesMetadataExceptions\InvalidArgumentException $ex) { + return; + } + + try { + $data = $this->validator->validate(Utils\Json::encode($data), $schema); + + } catch (Throwable $ex) { + return; + } + + try { + if ($this->consumer !== null) { + $this->consumer->consume($origin, $routingKey, $data); + } + } catch (Exceptions\UnprocessableMessageException $ex) { + // Log error consume reason + $this->logger->error('[FB:PLUGIN:REDISDB_EXCHANGE] Message could not be handled', [ + 'exception' => [ + 'message' => $ex->getMessage(), + 'code' => $ex->getCode(), + ], + ]); + + return; + } + + $this->dispatcher->dispatch(new ExchangePluginEvents\MessageReceivedEvent($origin, $routingKey, $data)); + } + +} diff --git a/tests/cases/Unit/DI/ExtensionTest.phpt b/tests/cases/Unit/DI/ExtensionTest.phpt index bf620e2..79d7e38 100644 --- a/tests/cases/Unit/DI/ExtensionTest.phpt +++ b/tests/cases/Unit/DI/ExtensionTest.phpt @@ -3,7 +3,6 @@ namespace Tests\Cases; use FastyBird\RedisDbExchangePlugin\Client; -use FastyBird\RedisDbExchangePlugin\Consumer; use FastyBird\RedisDbExchangePlugin\Subscribers; use Tester\Assert; @@ -23,9 +22,8 @@ final class ExtensionTest extends BaseTestCase Assert::notNull($container->getByType(Client\IClient::class)); Assert::notNull($container->getByType(Client\IAsyncClient::class)); - Assert::notNull($container->getByType(Consumer\IConsumer::class)); - Assert::notNull($container->getByType(Subscribers\ApplicationSubscriber::class)); + Assert::notNull($container->getByType(Subscribers\AsyncClientSubscriber::class)); } } diff --git a/tests/cases/Unit/Publisher/AsyncPublisherTest.phpt b/tests/cases/Unit/Publisher/AsyncPublisherTest.phpt index 17185b0..20efe21 100644 --- a/tests/cases/Unit/Publisher/AsyncPublisherTest.phpt +++ b/tests/cases/Unit/Publisher/AsyncPublisherTest.phpt @@ -6,7 +6,7 @@ use DateTime; use FastyBird\DateTimeFactory; use FastyBird\ModulesMetadata\Types as ModulesMetadataTypes; use FastyBird\RedisDbExchangePlugin\Client; -use FastyBird\RedisDbExchangePlugin\Publisher; +use FastyBird\RedisDbExchangePlugin\Publishers; use Mockery; use Nette\Utils; use Ninjify\Nunjuck\TestCase\BaseMockeryTestCase; @@ -60,15 +60,15 @@ final class AsyncPublisherTest extends BaseMockeryTestCase ->andReturn($now) ->times(1); - $publisher = new Publisher\AsyncPublisher($asyncClient, $dateTimeFactory); + $publisher = new Publishers\AsyncPublisher($asyncClient, $dateTimeFactory); $publisher->publish( ModulesMetadataTypes\ModuleOriginType::get(ModulesMetadataTypes\ModuleOriginType::ORIGIN_MODULE_DEVICES), ModulesMetadataTypes\RoutingKeyType::get(ModulesMetadataTypes\RoutingKeyType::ROUTE_DEVICES_ENTITY_UPDATED), - [ + Utils\ArrayHash::from([ 'key_one' => 'value_one', 'key_two' => 'value_two', - ] + ]) ); } diff --git a/tests/cases/Unit/Publisher/PublisherTest.phpt b/tests/cases/Unit/Publisher/PublisherTest.phpt index 5fdb91d..b036a1a 100644 --- a/tests/cases/Unit/Publisher/PublisherTest.phpt +++ b/tests/cases/Unit/Publisher/PublisherTest.phpt @@ -6,7 +6,7 @@ use DateTime; use FastyBird\DateTimeFactory; use FastyBird\ModulesMetadata\Types as ModulesMetadataTypes; use FastyBird\RedisDbExchangePlugin\Client; -use FastyBird\RedisDbExchangePlugin\Publisher; +use FastyBird\RedisDbExchangePlugin\Publishers; use Mockery; use Nette\Utils; use Ninjify\Nunjuck\TestCase\BaseMockeryTestCase; @@ -56,15 +56,15 @@ final class PublisherTest extends BaseMockeryTestCase ->andReturn($now) ->times(1); - $publisher = new Publisher\Publisher($client, $dateTimeFactory); + $publisher = new Publishers\Publisher($client, $dateTimeFactory); $publisher->publish( ModulesMetadataTypes\ModuleOriginType::get(ModulesMetadataTypes\ModuleOriginType::ORIGIN_MODULE_DEVICES), ModulesMetadataTypes\RoutingKeyType::get(ModulesMetadataTypes\RoutingKeyType::ROUTE_DEVICES_ENTITY_UPDATED), - [ + Utils\ArrayHash::from([ 'key_one' => 'value_one', 'key_two' => 'value_two', - ] + ]) ); }