diff --git a/phpstan.neon b/phpstan.neon index 8895424..8c76cc9 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -4,3 +4,5 @@ parameters: stubFiles: - tests/stubs/PromiseInterface.stub - tests/stubs/ExtendedPromiseInterface.stub + - tests/stubs/Client.stub + - tests/stubs/EventEmitterInterface.stub diff --git a/src/Client/AsyncClientFactory.php b/src/Client/AsyncClientFactory.php deleted file mode 100644 index 8c311ee..0000000 --- a/src/Client/AsyncClientFactory.php +++ /dev/null @@ -1,82 +0,0 @@ - - * @package FastyBird:RedisDbExchangePlugin! - * @subpackage Client - * @since 0.1.0 - * - * @date 17.12.20 - */ - -namespace FastyBird\RedisDbExchangePlugin\Client; - -use Clue\React\Redis; -use FastyBird\RedisDbExchangePlugin\Connections; -use FastyBird\RedisDbExchangePlugin\Events; -use Psr\EventDispatcher; -use React\EventLoop; -use React\Promise; -use React\Socket; -use Throwable; - -final class AsyncClientFactory -{ - - public function __construct( - private readonly string $channelName, - private readonly Connections\Connection $connection, - private readonly EventDispatcher\EventDispatcherInterface|null $dispatcher = null, - ) - { - } - - public function create( - Socket\ConnectorInterface|null $connector, - EventLoop\LoopInterface|null $eventLoop, - ): Promise\PromiseInterface - { - $factory = new Redis\Factory($eventLoop, $connector); - - $deferred = new Promise\Deferred(); - - $factory->createClient($this->connection->getHost() . ':' . $this->connection->getPort()) - ->then(function (Redis\Client $redis) use ($deferred): void { - // @phpstan-ignore-next-line - $redis->subscribe($this->channelName); - - $redis->on('message', function (string $channel, string $payload) use ($redis): void { - $this->dispatcher?->dispatch(new Events\MessageReceived($channel, $payload, $redis)); - }); - - $redis->on( - 'pmessage', - function (string $pattern, string $channel, string $payload) use ($redis): void { - $this->dispatcher?->dispatch( - new Events\PatternMessageReceived($pattern, $channel, $payload, $redis), - ); - }, - ); - - $redis->on('close', function () use ($redis): void { - $this->dispatcher?->dispatch(new Events\ConnectionClosed($redis)); - }); - - $redis->on('error', function (Throwable $ex) use ($redis): void { - $this->dispatcher?->dispatch(new Events\Error($ex, $redis)); - }); - - $deferred->resolve($redis); - }) - ->otherwise(static function (Throwable $ex) use ($deferred): void { - $deferred->reject($ex); - }); - - return $deferred->promise(); - } - -} diff --git a/src/Client/Client.php b/src/Client/Client.php index f778554..c62fe1f 100644 --- a/src/Client/Client.php +++ b/src/Client/Client.php @@ -38,10 +38,7 @@ class Client /** @var Predis\Client */ private Predis\Client $redis; - public function __construct( - private readonly string $channelName, - private readonly Connections\Connection $connection, - ) + public function __construct(Connections\Connection $connection) { $options = [ 'scheme' => 'tcp', @@ -60,18 +57,13 @@ public function __construct( $this->redis = new Predis\Client($options); } - public function publish(string $content): bool + public function publish(string $channel, string $content): bool { /** @var mixed $response */ - $response = $this->redis->publish($this->channelName, $content); + $response = $this->redis->publish($channel, $content); assert(is_int($response) || $response instanceof PredisResponse\ResponseInterface); return !$response instanceof PredisResponse\ErrorInterface; } - public function getIdentifier(): string - { - return $this->connection->getIdentifier(); - } - } diff --git a/src/Client/Factory.php b/src/Client/Factory.php new file mode 100644 index 0000000..b42d6fc --- /dev/null +++ b/src/Client/Factory.php @@ -0,0 +1,94 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Client + * @since 0.61.0 + * + * @date 09.10.22 + */ + +namespace FastyBird\RedisDbExchangePlugin\Client; + +use Clue\React\Redis; +use FastyBird\RedisDbExchangePlugin\Connections; +use FastyBird\RedisDbExchangePlugin\Events; +use FastyBird\RedisDbExchangePlugin\Publishers; +use Psr\EventDispatcher; +use React\EventLoop; +use React\Promise; +use React\Socket; +use Throwable; + +final class Factory +{ + + public function __construct( + private readonly string $channel, + private readonly Connections\Connection $connection, + private readonly Publishers\Publisher $publisher, + private readonly EventDispatcher\EventDispatcherInterface|null $dispatcher = null, + ) + { + } + + public function create( + EventLoop\LoopInterface|null $eventLoop = null, + Socket\ConnectorInterface|null $connector = null, + ): Promise\PromiseInterface + { + $factory = new Redis\Factory($eventLoop, $connector); + + $deferred = new Promise\Deferred(); + + $factory->createClient($this->connection->getHost() . ':' . $this->connection->getPort()) + ->then( + function (Redis\Client $redis) use ($deferred): void { + $this->publisher->setAsyncClient($redis); + + $redis->subscribe($this->channel); + + $redis->on('message', function (string $channel, string $payload) use ($redis): void { + $this->dispatcher?->dispatch( + new Events\MessageReceived($channel, $payload, $redis), + ); + }); + + $redis->on( + 'pmessage', + function (string $pattern, string $channel, string $payload) use ($redis): void { + $this->dispatcher?->dispatch( + new Events\PatternMessageReceived( + $pattern, + $channel, + $payload, + $redis, + ), + ); + }, + ); + + $redis->on('close', function () use ($redis): void { + $this->dispatcher?->dispatch(new Events\ConnectionClosed($redis)); + }); + + $redis->on('error', function (Throwable $ex) use ($redis): void { + $this->dispatcher?->dispatch(new Events\Error($ex, $redis)); + }); + + $deferred->resolve($redis); + }, + static function (Throwable $ex) use ($deferred): void { + $deferred->reject($ex); + }, + ); + + return $deferred->promise(); + } + +} diff --git a/src/Commands/RedisClient.php b/src/Commands/RedisClient.php new file mode 100644 index 0000000..2889b85 --- /dev/null +++ b/src/Commands/RedisClient.php @@ -0,0 +1,130 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Commands + * @since 0.61.0 + * + * @date 09.10.22 + */ + +namespace FastyBird\RedisDbExchangePlugin\Commands; + +use FastyBird\Metadata\Types as MetadataTypes; +use FastyBird\RedisDbExchangePlugin\Client; +use FastyBird\RedisDbExchangePlugin\Events; +use FastyBird\RedisDbExchangePlugin\Exceptions; +use Nette; +use Psr\EventDispatcher; +use Psr\Log; +use React\EventLoop; +use Symfony\Component\Console; +use Symfony\Component\Console\Input; +use Symfony\Component\Console\Output; +use Throwable; + +/** + * Redis client command + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Commands + * + * @author Adam Kadlec + */ +class RedisClient extends Console\Command\Command +{ + + use Nette\SmartObject; + + public const NAME = 'fb:redis-client:start'; + + private Log\LoggerInterface $logger; + + public function __construct( + private readonly Client\Factory $clientFactory, + private readonly EventLoop\LoopInterface $eventLoop, + private readonly EventDispatcher\EventDispatcherInterface|null $dispatcher = null, + Log\LoggerInterface|null $logger = null, + string|null $name = null, + ) + { + parent::__construct($name); + + $this->logger = $logger ?? new Log\NullLogger(); + } + + protected function configure(): void + { + parent::configure(); + + $this + ->setName(self::NAME) + ->setDescription('Start redis client.'); + } + + protected function execute( + Input\InputInterface $input, + Output\OutputInterface $output, + ): int + { + $this->logger->info( + 'Launching Redis client', + [ + 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, + 'type' => 'command', + ], + ); + + try { + $this->dispatcher?->dispatch(new Events\Startup()); + + $this->clientFactory->create($this->eventLoop); + + $this->eventLoop->run(); + + } catch (Exceptions\Terminate $ex) { + // Log error action reason + $this->logger->error( + 'Redis client was forced to close', + [ + 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, + 'type' => 'command', + 'exception' => [ + 'message' => $ex->getMessage(), + 'code' => $ex->getCode(), + ], + 'cmd' => $this->getName(), + ], + ); + + $this->eventLoop->stop(); + + } catch (Throwable $ex) { + // Log error action reason + $this->logger->error( + 'An unhandled error occurred. Stopping Redis client', + [ + 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, + 'type' => 'command', + 'exception' => [ + 'message' => $ex->getMessage(), + 'code' => $ex->getCode(), + ], + 'cmd' => $this->getName(), + ], + ); + + $this->eventLoop->stop(); + + return self::FAILURE; + } + + return self::SUCCESS; + } + +} diff --git a/src/Connections/Connection.php b/src/Connections/Connection.php index b827fc9..f233f3d 100644 --- a/src/Connections/Connection.php +++ b/src/Connections/Connection.php @@ -31,7 +31,6 @@ final class Connection use Nette\SmartObject; public function __construct( - private readonly string $identifier, private readonly string $host = '127.0.0.1', private readonly int $port = 6_379, private readonly string|null $username = null, @@ -60,9 +59,4 @@ public function getPassword(): string|null return $this->password; } - public function getIdentifier(): string - { - return $this->identifier; - } - } diff --git a/src/DI/RedisDbExchangePluginExtension.php b/src/DI/RedisDbExchangePluginExtension.php index 5a0f0be..95cd0ac 100644 --- a/src/DI/RedisDbExchangePluginExtension.php +++ b/src/DI/RedisDbExchangePluginExtension.php @@ -16,14 +16,14 @@ namespace FastyBird\RedisDbExchangePlugin\DI; use FastyBird\RedisDbExchangePlugin\Client; +use FastyBird\RedisDbExchangePlugin\Commands; use FastyBird\RedisDbExchangePlugin\Connections; -use FastyBird\RedisDbExchangePlugin\Exceptions; use FastyBird\RedisDbExchangePlugin\Publishers; use FastyBird\RedisDbExchangePlugin\Subscribers; +use FastyBird\RedisDbExchangePlugin\Utils; use Nette; use Nette\DI; use Nette\Schema; -use Ramsey\Uuid; use stdClass; use function assert; @@ -38,9 +38,11 @@ class RedisDbExchangePluginExtension extends DI\CompilerExtension { + public const NAME = 'fbRedisDbExchangePlugin'; + public static function register( Nette\Configurator $config, - string $extensionName = 'fbRedisDbExchangePlugin', + string $extensionName = self::NAME, ): void { $config->onCompile[] = static function ( @@ -54,15 +56,15 @@ public static function register( public function getConfigSchema(): Schema\Schema { return Schema\Expect::structure([ - 'connection' => Schema\Expect::arrayOf(Schema\Expect::structure([ + 'client' => Schema\Expect::structure([ 'host' => Schema\Expect::string()->default('127.0.0.1'), 'port' => Schema\Expect::int(6_379), 'username' => Schema\Expect::string()->nullable(), 'password' => Schema\Expect::string()->nullable(), + ]), + 'exchange' => Schema\Expect::structure([ 'channel' => Schema\Expect::string()->default('fb_exchange'), - ])), - 'enableClassic' => Schema\Expect::bool(true), - 'enableAsync' => Schema\Expect::bool(false), + ]), ]); } @@ -72,65 +74,54 @@ public function loadConfiguration(): void $configuration = $this->getConfig(); assert($configuration instanceof stdClass); - $asyncClientService = null; - - foreach ($configuration->connection as $name => $connection) { - $connectionService = $builder->addDefinition( - $this->prefix('connection.' . $name), - new DI\Definitions\ServiceDefinition(), - ) - ->setType(Connections\Connection::class) - ->setArguments([ - 'host' => $connection->host, - 'port' => $connection->port, - 'username' => $connection->username, - 'password' => $connection->password, - 'identifier' => Uuid\Uuid::uuid4()->toString(), - ]) - ->setAutowired(false); - - if ($configuration->enableClassic) { - $clientService = $builder->addDefinition( - $this->prefix('client.' . $name), - new DI\Definitions\ServiceDefinition(), - ) - ->setType(Client\Client::class) - ->setArguments([ - 'channelName' => $connection->channel, - 'connection' => $connectionService, - ]) - ->setAutowired($name === 'default'); - - $builder->addDefinition($this->prefix('publisher.' . $name), new DI\Definitions\ServiceDefinition()) - ->setType(Publishers\Publisher::class) - ->setArguments([ - 'client' => $clientService, - ]); - } - - if ($name === 'default' && $configuration->enableAsync) { - $asyncClientService = $builder->addDefinition( - $this->prefix('asyncClientFactory'), - new DI\Definitions\ServiceDefinition(), - ) - ->setType(Client\AsyncClientFactory::class) - ->setArguments([ - 'channelName' => $connection->channel, - 'connection' => $connectionService, - ]); - } - } - - if ($configuration->enableAsync) { - if ($asyncClientService === null) { - throw new Exceptions\InvalidState( - 'Asynchronous client could not be created missing "default" connection configuration', - ); - } - - $builder->addDefinition($this->prefix('subscribers.asyncClient'), new DI\Definitions\ServiceDefinition()) - ->setType(Subscribers\AsyncClientSubscriber::class); - } + $connectionService = $builder->addDefinition( + $this->prefix('connection'), + new DI\Definitions\ServiceDefinition(), + ) + ->setType(Connections\Connection::class) + ->setArguments([ + 'host' => $configuration->client->host, + 'port' => $configuration->client->port, + 'username' => $configuration->client->username, + 'password' => $configuration->client->password, + ]) + ->setAutowired(false); + + $clientService = $builder->addDefinition( + $this->prefix('client'), + new DI\Definitions\ServiceDefinition(), + ) + ->setType(Client\Client::class) + ->setArguments([ + 'connection' => $connectionService, + ]); + + $publisher = $builder->addDefinition($this->prefix('publisher'), new DI\Definitions\ServiceDefinition()) + ->setType(Publishers\Publisher::class) + ->setArguments([ + 'channel' => $configuration->exchange->channel, + 'client' => $clientService, + ]); + + $builder->addDefinition( + $this->prefix('asyncClientFactory'), + new DI\Definitions\ServiceDefinition(), + ) + ->setType(Client\Factory::class) + ->setArguments([ + 'channel' => $configuration->exchange->channel, + 'connection' => $connectionService, + 'publisher' => $publisher, + ]); + + $builder->addDefinition($this->prefix('subscribers.asyncClient'), new DI\Definitions\ServiceDefinition()) + ->setType(Subscribers\ClientSubscriber::class); + + $builder->addDefinition($this->prefix('command.client'), new DI\Definitions\ServiceDefinition()) + ->setType(Commands\RedisClient::class); + + $builder->addDefinition($this->prefix('utils.identifier'), new DI\Definitions\ServiceDefinition()) + ->setType(Utils\IdentifierGenerator::class); } } diff --git a/src/Events/Startup.php b/src/Events/Startup.php new file mode 100644 index 0000000..053a413 --- /dev/null +++ b/src/Events/Startup.php @@ -0,0 +1,31 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * @since 0.61.0 + * + * @date 09.10.22 + */ + +namespace FastyBird\RedisDbExchangePlugin\Events; + +use Symfony\Contracts\EventDispatcher; + +/** + * After message consumed event + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Events + * + * @author Adam Kadlec + */ +class Startup extends EventDispatcher\Event +{ + +} diff --git a/src/Exceptions/InvalidArgument.php b/src/Exceptions/InvalidArgument.php new file mode 100644 index 0000000..14290c5 --- /dev/null +++ b/src/Exceptions/InvalidArgument.php @@ -0,0 +1,23 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Exceptions + * @since 0.61.0 + * + * @date 09.10.22 + */ + +namespace FastyBird\RedisDbExchangePlugin\Exceptions; + +use InvalidArgumentException as PHPInvalidArgumentException; + +class InvalidArgument extends PHPInvalidArgumentException implements Exception +{ + +} diff --git a/src/Publishers/Publisher.php b/src/Publishers/Publisher.php index bd1372b..f75242b 100644 --- a/src/Publishers/Publisher.php +++ b/src/Publishers/Publisher.php @@ -15,14 +15,17 @@ namespace FastyBird\RedisDbExchangePlugin\Publishers; +use Clue\React\Redis; use FastyBird\DateTimeFactory; use FastyBird\Exchange\Publisher as ExchangePublisher; use FastyBird\Metadata\Entities as MetadataEntities; use FastyBird\Metadata\Types as MetadataTypes; use FastyBird\RedisDbExchangePlugin\Client; +use FastyBird\RedisDbExchangePlugin\Utils; use Nette; -use Nette\Utils; use Psr\Log; +use React\Promise; +use Throwable; use const DATE_ATOM; /** @@ -38,9 +41,13 @@ final class Publisher implements ExchangePublisher\Publisher use Nette\SmartObject; + private Redis\Client|null $asyncClient = null; + private Log\LoggerInterface $logger; public function __construct( + private readonly Utils\IdentifierGenerator $identifier, + private readonly string $channel, private readonly Client\Client $client, private readonly DateTimeFactory\Factory $dateTimeFactory, Log\LoggerInterface|null $logger = null, @@ -49,6 +56,14 @@ public function __construct( $this->logger = $logger ?? new Log\NullLogger(); } + /** + * @internal + */ + public function setAsyncClient(Redis\Client $client): void + { + $this->asyncClient = $client; + } + public function publish( MetadataTypes\ModuleSource|MetadataTypes\PluginSource|MetadataTypes\ConnectorSource $source, MetadataTypes\RoutingKey $routingKey, @@ -56,9 +71,10 @@ public function publish( ): void { try { - $result = $this->client->publish( - Utils\Json::encode([ - 'sender_id' => $this->client->getIdentifier(), + $result = $this->getClient()->publish( + $this->channel, + Nette\Utils\Json::encode([ + 'sender_id' => $this->identifier->getIdentifier(), 'source' => $source->getValue(), 'routing_key' => $routingKey->getValue(), 'created' => $this->dateTimeFactory->getNow()->format(DATE_ATOM), @@ -66,7 +82,7 @@ public function publish( ]), ); - } catch (Utils\JsonException $ex) { + } catch (Nette\Utils\JsonException $ex) { $this->logger->error('Data could not be converted to message', [ 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, 'type' => 'publish', @@ -84,27 +100,67 @@ public function publish( return; } - if ($result) { - $this->logger->debug('Received message was pushed into data exchange', [ - 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, - 'type' => 'publish', - 'message' => [ - 'routingKey' => $routingKey->getValue(), - 'source' => $source->getValue(), - 'data' => $entity?->toArray(), - ], - ]); + if ($result instanceof Promise\PromiseInterface) { + $result->then( + function () use ($routingKey, $source, $entity): void { + $this->logger->debug('Received message was pushed into data exchange', [ + 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, + 'type' => 'publish', + 'message' => [ + 'routingKey' => $routingKey->getValue(), + 'source' => $source->getValue(), + 'data' => $entity?->toArray(), + ], + ]); + }, + function (Throwable $ex) use ($routingKey, $source, $entity): void { + $this->logger->error('Received message could not be pushed into data exchange', [ + 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, + 'type' => 'publish', + 'message' => [ + 'routingKey' => $routingKey->getValue(), + 'source' => $source->getValue(), + 'data' => $entity?->toArray(), + ], + 'exception' => [ + 'message' => $ex->getMessage(), + 'code' => $ex->getCode(), + ], + ]); + }, + ); } else { - $this->logger->error('Received message could not be pushed into data exchange', [ - 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, - 'type' => 'publish', - 'message' => [ - 'routingKey' => $routingKey->getValue(), - 'source' => $source->getValue(), - 'data' => $entity?->toArray(), - ], - ]); + if ($result === true) { + $this->logger->debug('Received message was pushed into data exchange', [ + 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, + 'type' => 'publish', + 'message' => [ + 'routingKey' => $routingKey->getValue(), + 'source' => $source->getValue(), + 'data' => $entity?->toArray(), + ], + ]); + } else { + $this->logger->error('Received message could not be pushed into data exchange', [ + 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, + 'type' => 'publish', + 'message' => [ + 'routingKey' => $routingKey->getValue(), + 'source' => $source->getValue(), + 'data' => $entity?->toArray(), + ], + ]); + } + } + } + + private function getClient(): Client\Client|Redis\Client + { + if ($this->asyncClient !== null) { + return $this->asyncClient; } + + return $this->client; } } diff --git a/src/Subscribers/AsyncClientSubscriber.php b/src/Subscribers/ClientSubscriber.php similarity index 85% rename from src/Subscribers/AsyncClientSubscriber.php rename to src/Subscribers/ClientSubscriber.php index 11c30ea..d153cac 100644 --- a/src/Subscribers/AsyncClientSubscriber.php +++ b/src/Subscribers/ClientSubscriber.php @@ -1,7 +1,7 @@ */ -class AsyncClientSubscriber implements EventDispatcher\EventSubscriberInterface +class ClientSubscriber implements EventDispatcher\EventSubscriberInterface { private Log\LoggerInterface $logger; public function __construct( + private readonly Utils\IdentifierGenerator $identifier, private readonly ExchangeEntities\EntityFactory $entityFactory, private readonly PsrEventDispatcher\EventDispatcherInterface|null $dispatcher = null, private readonly ExchangeConsumer\Consumer|null $consumer = null, @@ -64,7 +66,7 @@ public function handleMessage(Events\MessageReceived $event): void $this->dispatcher?->dispatch(new Events\BeforeMessageHandled($event->getPayload())); try { - $data = Utils\Json::decode($event->getPayload(), Utils\Json::FORCE_ARRAY); + $data = Nette\Utils\Json::decode($event->getPayload(), Nette\Utils\Json::FORCE_ARRAY); if ( is_array($data) @@ -75,7 +77,8 @@ public function handleMessage(Events\MessageReceived $event): void $this->handle( strval($data['source']), MetadataTypes\RoutingKey::get($data['routing_key']), - Utils\Json::encode($data['data']), + Nette\Utils\Json::encode($data['data']), + array_key_exists('sender_id', $data) ? $data['sender_id'] : null, ); } else { @@ -85,7 +88,7 @@ public function handleMessage(Events\MessageReceived $event): void 'type' => 'subscriber', ]); } - } catch (Utils\JsonException $ex) { + } catch (Nette\Utils\JsonException $ex) { // Log error action reason $this->logger->warning('Received message is not valid json', [ 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, @@ -107,12 +110,22 @@ private function handle( string $source, MetadataTypes\RoutingKey $routingKey, string $data, + string|null $senderId = null, ): void { if ($this->consumer === null) { return; } + if ($senderId === $this->identifier->getIdentifier()) { + $this->logger->debug('Received message published by itself', [ + 'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB, + 'type' => 'subscriber', + ]); + + return; + } + $source = $this->validateSource($source); if ($source === null) { diff --git a/src/Utils/IdentifierGenerator.php b/src/Utils/IdentifierGenerator.php new file mode 100644 index 0000000..5bbf929 --- /dev/null +++ b/src/Utils/IdentifierGenerator.php @@ -0,0 +1,46 @@ + + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Utils + * @since 0.61.0 + * + * @date 09.10.22 + */ + +namespace FastyBird\RedisDbExchangePlugin\Utils; + +use Nette; +use Ramsey\Uuid; + +/** + * Pub/Sub messages identifier + * + * @package FastyBird:RedisDbExchangePlugin! + * @subpackage Utils + * + * @author Adam Kadlec + */ +final class IdentifierGenerator +{ + + use Nette\SmartObject; + + private string $identifier; + + public function __construct() + { + $this->identifier = Uuid\Uuid::uuid4()->toString(); + } + + public function getIdentifier(): string + { + return $this->identifier; + } + +} diff --git a/tests/cases/unit/Connections/ConnectionTest.phpt b/tests/cases/unit/Connections/ConnectionTest.phpt index a92b433..40500ad 100644 --- a/tests/cases/unit/Connections/ConnectionTest.phpt +++ b/tests/cases/unit/Connections/ConnectionTest.phpt @@ -16,11 +16,10 @@ final class ConnectionTest extends BaseMockeryTestCase public function testDefaultValues(): void { - $config = new Connections\Connection('identifier', '127.0.0.1', 1_234, null, null); + $config = new Connections\Connection('127.0.0.1', 1_234, null, null); - Assert::same('identifier', $config->getIdentifier()); Assert::same('127.0.0.1', $config->getHost()); - Assert::same(1234, $config->getPort()); + Assert::same(1_234, $config->getPort()); Assert::null($config->getUsername()); Assert::null($config->getPassword()); } diff --git a/tests/cases/unit/DI/ExtensionTest.phpt b/tests/cases/unit/DI/ExtensionTest.phpt index 50761fa..1fedeb1 100644 --- a/tests/cases/unit/DI/ExtensionTest.phpt +++ b/tests/cases/unit/DI/ExtensionTest.phpt @@ -20,9 +20,9 @@ final class ExtensionTest extends BaseTestCase $container = $this->createContainer(); Assert::notNull($container->getByType(Client\Client::class, false)); - Assert::notNull($container->getByType(Client\AsyncClientFactory::class)); + Assert::notNull($container->getByType(Client\Factory::class)); - Assert::notNull($container->getByType(Subscribers\AsyncClientSubscriber::class)); + Assert::notNull($container->getByType(Subscribers\ClientSubscriber::class)); } } diff --git a/tests/cases/unit/Publisher/PublisherTest.phpt b/tests/cases/unit/Publisher/PublisherTest.phpt index c02355e..c871948 100644 --- a/tests/cases/unit/Publisher/PublisherTest.phpt +++ b/tests/cases/unit/Publisher/PublisherTest.phpt @@ -8,8 +8,9 @@ use FastyBird\Metadata\Entities as MetadataEntities; use FastyBird\Metadata\Types as MetadataTypes; use FastyBird\RedisDbExchangePlugin\Client; use FastyBird\RedisDbExchangePlugin\Publishers; +use FastyBird\RedisDbExchangePlugin\Utils; use Mockery; -use Nette\Utils; +use Nette; use Ninjify\Nunjuck\TestCase\BaseMockeryTestCase; use Tester\Assert; @@ -28,8 +29,10 @@ final class PublisherTest extends BaseMockeryTestCase $client = Mockery::mock(Client\Client::class); $client ->shouldReceive('publish') - ->withArgs(function ($data) use ($now): bool { - Assert::same(Utils\Json::encode([ + ->withArgs(function ($channel, $data) use ($now): bool { + Assert::same('exchange_channel', $channel); + + Assert::same(Nette\Utils\Json::encode([ 'sender_id' => 'redis_client_identifier', 'source' => MetadataTypes\ModuleSource::SOURCE_MODULE_DEVICES, 'routing_key' => MetadataTypes\RoutingKey::ROUTE_DEVICE_ENTITY_UPDATED, @@ -46,11 +49,6 @@ final class PublisherTest extends BaseMockeryTestCase return true; }) ->andReturn(true) - ->times(1) - ->getMock() - ->shouldReceive('getIdentifier') - ->withNoArgs() - ->andReturn('redis_client_identifier') ->times(1); $dateTimeFactory = Mockery::mock(DateTimeFactory\Factory::class); @@ -60,7 +58,14 @@ final class PublisherTest extends BaseMockeryTestCase ->andReturn($now) ->times(1); - $publisher = new Publishers\Publisher($client, $dateTimeFactory); + $identifierGenerator = Mockery::mock(Utils\IdentifierGenerator::class); + $identifierGenerator + ->shouldReceive('getIdentifier') + ->withNoArgs() + ->andReturn('redis_client_identifier') + ->times(1); + + $publisher = new Publishers\Publisher($identifierGenerator, 'exchange_channel', $client, $dateTimeFactory); $publisher->publish( MetadataTypes\ModuleSource::get(MetadataTypes\ModuleSource::SOURCE_MODULE_DEVICES), diff --git a/tests/common.neon b/tests/common.neon index c1aa97f..8238a70 100644 --- a/tests/common.neon +++ b/tests/common.neon @@ -19,8 +19,12 @@ extensions: fbExchange : FastyBird\Exchange\DI\ExchangeExtension ipubPhone : IPub\Phone\DI\PhoneExtension +services: + - { + create: React\EventLoop\Factory::create() + type: React\EventLoop\LoopInterface + } + fbRedisDbExchangePlugin: - enableAsync: true - connection: - default: - host: 127.0.0.1 + client: + host: 127.0.0.1 diff --git a/tests/stubs/Client.stub b/tests/stubs/Client.stub new file mode 100644 index 0000000..cfd02c7 --- /dev/null +++ b/tests/stubs/Client.stub @@ -0,0 +1,15 @@ +