Skip to content

Commit

Permalink
Refactoring async client
Browse files Browse the repository at this point in the history
  • Loading branch information
akadlec committed Oct 9, 2022
1 parent c5b710e commit 4e92665
Show file tree
Hide file tree
Showing 18 changed files with 547 additions and 213 deletions.
2 changes: 2 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ parameters:
stubFiles:
- tests/stubs/PromiseInterface.stub
- tests/stubs/ExtendedPromiseInterface.stub
- tests/stubs/Client.stub
- tests/stubs/EventEmitterInterface.stub
82 changes: 0 additions & 82 deletions src/Client/AsyncClientFactory.php

This file was deleted.

14 changes: 3 additions & 11 deletions src/Client/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ class Client
/** @var Predis\Client<mixed> */
private Predis\Client $redis;

public function __construct(
private readonly string $channelName,
private readonly Connections\Connection $connection,
)
public function __construct(Connections\Connection $connection)
{
$options = [
'scheme' => 'tcp',
Expand All @@ -60,18 +57,13 @@ public function __construct(
$this->redis = new Predis\Client($options);
}

public function publish(string $content): bool
public function publish(string $channel, string $content): bool
{
/** @var mixed $response */
$response = $this->redis->publish($this->channelName, $content);
$response = $this->redis->publish($channel, $content);
assert(is_int($response) || $response instanceof PredisResponse\ResponseInterface);

return !$response instanceof PredisResponse\ErrorInterface;
}

public function getIdentifier(): string
{
return $this->connection->getIdentifier();
}

}
94 changes: 94 additions & 0 deletions src/Client/Factory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?php declare(strict_types = 1);

/**
* AsyncClient.php
*
* @license More in license.md
* @copyright https://www.fastybird.com
* @author Adam Kadlec <adam.kadlec@fastybird.com>
* @package FastyBird:RedisDbExchangePlugin!
* @subpackage Client
* @since 0.61.0
*
* @date 09.10.22
*/

namespace FastyBird\RedisDbExchangePlugin\Client;

use Clue\React\Redis;
use FastyBird\RedisDbExchangePlugin\Connections;
use FastyBird\RedisDbExchangePlugin\Events;
use FastyBird\RedisDbExchangePlugin\Publishers;
use Psr\EventDispatcher;
use React\EventLoop;
use React\Promise;
use React\Socket;
use Throwable;

final class Factory
{

public function __construct(
private readonly string $channel,
private readonly Connections\Connection $connection,
private readonly Publishers\Publisher $publisher,
private readonly EventDispatcher\EventDispatcherInterface|null $dispatcher = null,
)
{
}

public function create(
EventLoop\LoopInterface|null $eventLoop = null,
Socket\ConnectorInterface|null $connector = null,
): Promise\PromiseInterface
{
$factory = new Redis\Factory($eventLoop, $connector);

$deferred = new Promise\Deferred();

$factory->createClient($this->connection->getHost() . ':' . $this->connection->getPort())
->then(
function (Redis\Client $redis) use ($deferred): void {
$this->publisher->setAsyncClient($redis);

$redis->subscribe($this->channel);

$redis->on('message', function (string $channel, string $payload) use ($redis): void {
$this->dispatcher?->dispatch(
new Events\MessageReceived($channel, $payload, $redis),
);
});

$redis->on(
'pmessage',
function (string $pattern, string $channel, string $payload) use ($redis): void {
$this->dispatcher?->dispatch(
new Events\PatternMessageReceived(
$pattern,
$channel,
$payload,
$redis,
),
);
},
);

$redis->on('close', function () use ($redis): void {
$this->dispatcher?->dispatch(new Events\ConnectionClosed($redis));
});

$redis->on('error', function (Throwable $ex) use ($redis): void {
$this->dispatcher?->dispatch(new Events\Error($ex, $redis));
});

$deferred->resolve($redis);
},
static function (Throwable $ex) use ($deferred): void {
$deferred->reject($ex);
},
);

return $deferred->promise();
}

}
130 changes: 130 additions & 0 deletions src/Commands/RedisClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
<?php declare(strict_types = 1);

/**
* RedisClient.php
*
* @license More in LICENSE.md
* @copyright https://www.fastybird.com
* @author Adam Kadlec <adam.kadlec@fastybird.com>
* @package FastyBird:RedisDbExchangePlugin!
* @subpackage Commands
* @since 0.61.0
*
* @date 09.10.22
*/

namespace FastyBird\RedisDbExchangePlugin\Commands;

use FastyBird\Metadata\Types as MetadataTypes;
use FastyBird\RedisDbExchangePlugin\Client;
use FastyBird\RedisDbExchangePlugin\Events;
use FastyBird\RedisDbExchangePlugin\Exceptions;
use Nette;
use Psr\EventDispatcher;
use Psr\Log;
use React\EventLoop;
use Symfony\Component\Console;
use Symfony\Component\Console\Input;
use Symfony\Component\Console\Output;
use Throwable;

/**
* Redis client command
*
* @package FastyBird:RedisDbExchangePlugin!
* @subpackage Commands
*
* @author Adam Kadlec <adam.kadlec@fastybird.com>
*/
class RedisClient extends Console\Command\Command
{

use Nette\SmartObject;

public const NAME = 'fb:redis-client:start';

private Log\LoggerInterface $logger;

public function __construct(
private readonly Client\Factory $clientFactory,
private readonly EventLoop\LoopInterface $eventLoop,
private readonly EventDispatcher\EventDispatcherInterface|null $dispatcher = null,
Log\LoggerInterface|null $logger = null,
string|null $name = null,
)
{
parent::__construct($name);

$this->logger = $logger ?? new Log\NullLogger();
}

protected function configure(): void
{
parent::configure();

$this
->setName(self::NAME)
->setDescription('Start redis client.');
}

protected function execute(
Input\InputInterface $input,
Output\OutputInterface $output,
): int
{
$this->logger->info(
'Launching Redis client',
[
'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB,
'type' => 'command',
],
);

try {
$this->dispatcher?->dispatch(new Events\Startup());

$this->clientFactory->create($this->eventLoop);

$this->eventLoop->run();

} catch (Exceptions\Terminate $ex) {
// Log error action reason
$this->logger->error(
'Redis client was forced to close',
[
'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB,
'type' => 'command',
'exception' => [
'message' => $ex->getMessage(),
'code' => $ex->getCode(),
],
'cmd' => $this->getName(),
],
);

$this->eventLoop->stop();

} catch (Throwable $ex) {
// Log error action reason
$this->logger->error(
'An unhandled error occurred. Stopping Redis client',
[
'source' => MetadataTypes\PluginSource::SOURCE_PLUGIN_EXCHANGE_REDISDB,
'type' => 'command',
'exception' => [
'message' => $ex->getMessage(),
'code' => $ex->getCode(),
],
'cmd' => $this->getName(),
],
);

$this->eventLoop->stop();

return self::FAILURE;
}

return self::SUCCESS;
}

}
6 changes: 0 additions & 6 deletions src/Connections/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ final class Connection
use Nette\SmartObject;

public function __construct(
private readonly string $identifier,
private readonly string $host = '127.0.0.1',
private readonly int $port = 6_379,
private readonly string|null $username = null,
Expand Down Expand Up @@ -60,9 +59,4 @@ public function getPassword(): string|null
return $this->password;
}

public function getIdentifier(): string
{
return $this->identifier;
}

}
Loading

0 comments on commit 4e92665

Please sign in to comment.