Skip to content

Commit

Permalink
Refactoring consumer logic
Browse files Browse the repository at this point in the history
  • Loading branch information
akadlec committed Oct 9, 2021
1 parent a35b66d commit 17de5aa
Show file tree
Hide file tree
Showing 20 changed files with 667 additions and 390 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"clue/redis-react": "^2.5",
"contributte/console": "^0.9",
"fastybird/datetime-factory": "^0.2",
"fastybird/exchange-plugin": "^0.3.1",
"fastybird/exchange-plugin": "^0.4",
"fastybird/modules-metadata": "^0.7",
"fastybird/socket-server-factory": "^0.1",
"nette/bootstrap": "^3.0",
Expand Down
4 changes: 2 additions & 2 deletions redisdb_exchange_plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

"""
Redis DB exchange
Redis DB exchange plugin
"""

__version__ = "0.1.5"
__version__ = "0.2.0"
103 changes: 18 additions & 85 deletions src/Client/AsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@

namespace FastyBird\RedisDbExchangePlugin\Client;

use Closure;
use Clue\Redis\Protocol as RedisProtocol;
use FastyBird\ModulesMetadata\Types as ModulesMetadataTypes;
use FastyBird\RedisDbExchangePlugin\Connections;
use FastyBird\RedisDbExchangePlugin\Consumer;
use FastyBird\RedisDbExchangePlugin\Events;
use FastyBird\RedisDbExchangePlugin\Exceptions;
use Nette;
use Nette\Utils;
use Psr\EventDispatcher;
use Psr\Log;
use Ramsey\Uuid;
use React\EventLoop;
Expand All @@ -38,41 +36,12 @@
* @subpackage Client
*
* @author Adam Kadlec <adam.kadlec@fastybird.com>
*
* @method onOpen(IAsyncClient $client)
* @method onClose(IAsyncClient $client)
* @method onMessage(string $channel, string $payload, IAsyncClient $client)
* @method onPmessage(string $patern, string $channel, string $payload, IAsyncClient $client)
* @method onError(Throwable $ex, IAsyncClient $client)
* @method onBeforeConsumeMessage(string $payload)
* @method onAfterConsumeMessage(string $payload)
*/
class AsyncClient implements IAsyncClient
{

use Nette\SmartObject;

/** @var Closure[] */
public array $onOpen = [];

/** @var Closure[] */
public array $onClose = [];

/** @var Closure[] */
public array $onMessage = [];

/** @var Closure[] */
public array $onPmessage = [];

/** @var Closure[] */
public array $onError = [];

/** @var Closure[] */
public array $onBeforeConsumeMessage = [];

/** @var Closure[] */
public array $onAfterConsumeMessage = [];

/** @var string */
private string $channelName;

Expand All @@ -94,9 +63,6 @@ class AsyncClient implements IAsyncClient
/** @var Connections\IConnection */
private Connections\IConnection $connection;

/** @var Consumer\IConsumer */
private Consumer\IConsumer $consumer;

/** @var Promise\Deferred[] */
private array $requests = [];

Expand All @@ -109,6 +75,9 @@ class AsyncClient implements IAsyncClient
/** @var RedisProtocol\Serializer\SerializerInterface */
private RedisProtocol\Serializer\SerializerInterface $serializer;

/** @var EventDispatcher\EventDispatcherInterface */
private EventDispatcher\EventDispatcherInterface $eventDispatcher;

/** @var EventLoop\LoopInterface */
private EventLoop\LoopInterface $eventLoop;

Expand All @@ -118,14 +87,13 @@ class AsyncClient implements IAsyncClient
public function __construct(
string $channelName,
Connections\IConnection $connection,
Consumer\IConsumer $consumer,
EventLoop\LoopInterface $eventLoop,
EventDispatcher\EventDispatcherInterface $eventDispatcher,
?Log\LoggerInterface $logger = null
) {
$this->channelName = $channelName;

$this->connection = $connection;
$this->consumer = $consumer;

$factory = new RedisProtocol\Factory();

Expand All @@ -134,6 +102,8 @@ public function __construct(

$this->eventLoop = $eventLoop;

$this->eventDispatcher = $eventDispatcher;

$this->logger = $logger ?? new Log\NullLogger();

$this->identifier = Uuid\Uuid::uuid4()->toString();
Expand Down Expand Up @@ -166,14 +136,14 @@ public function connect(): Promise\ExtendedPromiseInterface
function (Socket\ConnectionInterface $stream) use ($deferred): void {
$this->stream = $stream;

$this->onOpen($this);
$this->eventDispatcher->dispatch(new Events\ConnectionOpenedEvent($this));

$deferred->resolve($this);
},
function (Throwable $ex) use ($deferred): void {
$this->isConnecting = false;

$this->onError($ex, $this);
$this->eventDispatcher->dispatch(new Events\ErrorEvent($ex, $this));

$deferred->reject($ex);
}
Expand Down Expand Up @@ -201,7 +171,7 @@ public function close(): void
$this->stream->close();
}

$this->onClose($this);
$this->eventDispatcher->dispatch(new Events\ConnectionClosedEvent($this));

// Reject all remaining requests in the queue
while ($this->requests) {
Expand Down Expand Up @@ -294,45 +264,6 @@ public function initialize(): void
->connect()
->then(function (AsyncClient $client): void {
$client->subscribe($this->channelName);

$client->onMessage[] = function (string $channel, string $payload) use ($client): void {
if ($channel === $this->channelName) {
$this->onBeforeConsumeMessage($payload);

try {
$data = Utils\ArrayHash::from(Utils\Json::decode($payload, Utils\Json::FORCE_ARRAY));

if (
$data->offsetExists('origin')
&& $data->offsetExists('routing_key')
&& $data->offsetExists('data')
) {
$this->consumer->consume(
ModulesMetadataTypes\ModuleOriginType::get($data->offsetGet('origin')),
ModulesMetadataTypes\RoutingKeyType::get($data->offsetGet('routing_key')),
$data->offsetGet('data')
);

} else {
// Log error action reason
$this->logger->warning('[FB:PLUGIN:REDISDB_EXCHANGE] Received message is not in valid format');
}
} catch (Utils\JsonException $ex) {
// Log error action reason
$this->logger->warning('[FB:PLUGIN:REDISDB_EXCHANGE] Received message is not valid json', [
'exception' => [
'message' => $ex->getMessage(),
'code' => $ex->getCode(),
],
]);

} catch (Exceptions\TerminateException $ex) {
$client->close();
}

$this->onAfterConsumeMessage($payload);
}
};
});

if ($promise instanceof Promise\ExtendedPromiseInterface) {
Expand Down Expand Up @@ -385,7 +316,7 @@ function (Socket\ConnectionInterface $stream) use ($deferred, $timer): void {
$models = $this->parser->pushIncoming($chunk);

} catch (RedisProtocol\Parser\ParserException $ex) {
$this->onError($ex, $this);
$this->eventDispatcher->dispatch(new Events\ErrorEvent($ex, $this));

$this->close();

Expand All @@ -397,7 +328,7 @@ function (Socket\ConnectionInterface $stream) use ($deferred, $timer): void {
$this->handleMessage($data);

} catch (UnderflowException $ex) {
$this->onError($ex, $this);
$this->eventDispatcher->dispatch(new Events\ErrorEvent($ex, $this));

$this->close();

Expand All @@ -411,7 +342,7 @@ function (Socket\ConnectionInterface $stream) use ($deferred, $timer): void {
});

$stream->on('error', function (Throwable $ex): void {
$this->onError($ex, $this);
$this->eventDispatcher->dispatch(new Events\ErrorEvent($ex, $this));
});

$deferred->resolve($stream);
Expand Down Expand Up @@ -440,7 +371,7 @@ private function handleMessage(RedisProtocol\Model\ModelInterface $message): voi
// Pub/Sub messages are to be forwarded and should not be processed as request responses
if ($type === 'message') {
if (isset($array[0]) && isset($array[1])) {
$this->onMessage($array[0], $array[1], $this);
$this->eventDispatcher->dispatch(new Events\MessageReceivedEvent($array[0], $array[1], $this));

return;

Expand All @@ -449,7 +380,9 @@ private function handleMessage(RedisProtocol\Model\ModelInterface $message): voi
}
} elseif ($type === 'pmessage') {
if (isset($array[0]) && isset($array[1]) && isset($array[2])) {
$this->onPmessage($array[0], $array[1], $array[2], $this);
$this->eventDispatcher->dispatch(
new Events\PatternMessageReceivedEvent($array[0], $array[1], $array[2], $this)
);

return;
} else {
Expand Down
156 changes: 0 additions & 156 deletions src/Consumer/ConsumerProxy.php

This file was deleted.

Loading

0 comments on commit 17de5aa

Please sign in to comment.