From 04543ab25edea7fb44de170e8461d2cbc5ef6e36 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Tue, 10 Sep 2019 08:33:52 +0200 Subject: [PATCH] Refactor connection handling --- src/ConnectException.php | 7 - src/Connection.php | 233 ---------------------------------- src/QueryExecutor.php | 11 ++ src/QueryExecutorFactory.php | 3 + src/RemoteExecutor.php | 166 +++++++++++------------- src/RemoteExecutorFactory.php | 8 +- src/RespSocket.php | 106 ++++++++++++++++ src/SocketException.php | 7 + src/SubscribeClient.php | 222 -------------------------------- src/Subscriber.php | 184 +++++++++++++++++++++++++++ src/functions.php | 54 ++++++++ test/AuthTest.php | 20 ++- test/CloseTest.php | 10 ++ test/DownTest.php | 6 +- test/IntegrationTest.php | 2 +- test/Mutex/MutexTest.php | 7 +- test/PubSubTest.php | 40 ++---- 17 files changed, 492 insertions(+), 594 deletions(-) delete mode 100644 src/ConnectException.php delete mode 100644 src/Connection.php create mode 100644 src/RespSocket.php create mode 100644 src/SocketException.php delete mode 100644 src/SubscribeClient.php create mode 100644 src/Subscriber.php diff --git a/src/ConnectException.php b/src/ConnectException.php deleted file mode 100644 index f377dfd..0000000 --- a/src/ConnectException.php +++ /dev/null @@ -1,7 +0,0 @@ -applyUri($uri); - - $this->state = self::STATE_DISCONNECTED; - - $this->handlers = [ - 'connect' => [], - 'response' => [], - 'error' => [], - 'close' => [], - ]; - - $this->parser = new RespParser(function ($response) { - foreach ($this->handlers['response'] as $handler) { - $handler($response); - } - }); - } - - public function addEventHandler($event, callable $callback): void - { - $events = (array) $event; - - /** @noinspection SuspiciousLoopInspection */ - foreach ($events as $event) { - if (!isset($this->handlers[$event])) { - throw new \Error('Unknown event: ' . $event); - } - - $this->handlers[$event][] = $callback; - } - } - - public function send(array $strings): Promise - { - foreach ($strings as $string) { - if (!\is_scalar($string)) { - throw new \TypeError('All elements must be of type string or scalar and convertible to a string.'); - } - } - - return call(function () use ($strings) { - $this->setIdle(false); - - $payload = ''; - foreach ($strings as $string) { - $payload .= '$' . \strlen($string) . "\r\n{$string}\r\n"; - } - $payload = '*' . \count($strings) . "\r\n{$payload}"; - - yield $this->connect(); - yield $this->socket->write($payload); - }); - } - - public function setIdle(bool $idle): void - { - if (!$this->socket) { - return; - } - - if ($idle) { - $this->socket->unreference(); - } else { - $this->socket->reference(); - } - } - - public function close(): void - { - $this->parser->reset(); - - if ($this->socket) { - $this->socket->close(); - $this->socket = null; - } - - foreach ($this->handlers['close'] as $handler) { - $handler(); - } - - $this->state = self::STATE_DISCONNECTED; - } - - public function getState(): int - { - return $this->state; - } - - public function __destruct() - { - $this->close(); - } - - private function applyUri(string $uri): void - { - $parts = Uri\parse($uri); - - $scheme = $parts['scheme'] ?? ''; - - if ($scheme === 'tcp') { - $this->uri = $scheme . '://' . ($parts['host'] ?? '') . ':' . ($parts['port'] ?? 0); - } else { - $this->uri = $scheme . '://' . ($parts['path'] ?? ''); - } - - $pairs = Internal\parseUriQuery($parts['query'] ?? ''); - - $this->timeout = $pairs['timeout'] ?? $this->timeout; - } - - private function connect(): Promise - { - // If we're in the process of connecting already return that same promise - if ($this->connectPromisor) { - return $this->connectPromisor->promise(); - } - - // If a socket exists we know we're already connected - if ($this->socket) { - return new Success; - } - - $this->state = self::STATE_CONNECTING; - $this->connectPromisor = new Deferred; - $connectPromise = $this->connectPromisor->promise(); - /** @noinspection PhpUnhandledExceptionInspection */ - $socketPromise = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout)); - - $socketPromise->onResolve(function ($error, Socket $socket = null) { - $connectPromisor = $this->connectPromisor; - $this->connectPromisor = null; - - if ($error) { - $this->state = self::STATE_DISCONNECTED; - - $connectException = new ConnectException( - 'Connection attempt failed', - $code = 0, - $error - ); - - $this->onError($connectException); - $connectPromisor->fail($connectException); - - return; - } - - $this->state = self::STATE_CONNECTED; - $this->socket = $socket; - - foreach ($this->handlers['connect'] as $handler) { - $pipelinedCommand = $handler(); - - if (!empty($pipelinedCommand)) { - $this->socket->write($pipelinedCommand); - } - } - - asyncCall(function () use ($socket) { - while (null !== $chunk = yield $socket->read()) { - $this->parser->append($chunk); - } - - $this->close(); - }); - - $connectPromisor->resolve(); - }); - - return $connectPromise; - } - - private function onError(\Throwable $exception): void - { - try { - foreach ($this->handlers['error'] as $handler) { - $handler($exception); - } - } finally { - $this->close(); - } - } -} diff --git a/src/QueryExecutor.php b/src/QueryExecutor.php index 404665f..6df5d0c 100644 --- a/src/QueryExecutor.php +++ b/src/QueryExecutor.php @@ -6,5 +6,16 @@ interface QueryExecutor { + /** + * @param string[] $query + * @param callable|null $responseTransform + * + * @return Promise + * + * @see toBool() + * @see toNull() + * @see toFloat() + * @see toMap() + */ public function execute(array $query, callable $responseTransform = null): Promise; } diff --git a/src/QueryExecutorFactory.php b/src/QueryExecutorFactory.php index 1b394d5..3805690 100644 --- a/src/QueryExecutorFactory.php +++ b/src/QueryExecutorFactory.php @@ -4,5 +4,8 @@ interface QueryExecutorFactory { + /** + * @return QueryExecutor New QueryExecutor instance. + */ public function createQueryExecutor(): QueryExecutor; } diff --git a/src/RemoteExecutor.php b/src/RemoteExecutor.php index e1afe45..2bde10b 100644 --- a/src/RemoteExecutor.php +++ b/src/RemoteExecutor.php @@ -2,97 +2,31 @@ namespace Amp\Redis; +use Amp\ByteStream\StreamException; use Amp\Deferred; use Amp\Promise; -use League\Uri; +use Amp\Socket; +use function Amp\asyncCall; use function Amp\call; final class RemoteExecutor implements QueryExecutor { /** @var Deferred[] */ - private $deferreds; + private $queue = []; - /** @var Connection */ - private $connection; - - /** @var string */ - private $password; + /** @var Config */ + private $config; /** @var int */ - private $database = 0; - - /** - * @param string $uri - */ - public function __construct(string $uri) - { - $this->applyUri($uri); - - $this->deferreds = []; - $this->connection = new Connection($uri); - - $this->connection->addEventHandler('response', function ($response) { - $deferred = \array_shift($this->deferreds); - - if (empty($this->deferreds)) { - $this->connection->setIdle(true); - } - - if ($response instanceof \Exception) { - $deferred->fail($response); - } else { - $deferred->resolve($response); - } - }); - - $this->connection->addEventHandler(['close', 'error'], function ($error = null) { - if ($error) { - // Fail any outstanding promises - while ($this->deferreds) { - $deferred = \array_shift($this->deferreds); - $deferred->fail($error); - } - } - }); - - if (!empty($this->password)) { - $this->connection->addEventHandler('connect', function () { - // AUTH must be before any other command, so we unshift it last - \array_unshift($this->deferreds, new Deferred); - - return "*2\r\n$4\r\rAUTH\r\n$" . \strlen($this->password) . "\r\n{$this->password}\r\n"; - }); - } + private $database; - if ($this->database !== 0) { - $this->connection->addEventHandler('connect', function () { - // SELECT must be called for every new connection if another database than 0 is used - \array_unshift($this->deferreds, new Deferred); + /** @var Promise|null */ + private $connect; - return "*2\r\n$6\r\rSELECT\r\n$" . \strlen($this->database) . "\r\n{$this->database}\r\n"; - }); - } - } - - /** - * @return Promise - */ - public function close(): Promise + public function __construct(Config $config) { - $promise = Promise\all(\array_map(static function (Deferred $deferred) { - return $deferred->promise(); - }, $this->deferreds)); - - $promise->onResolve(function () { - $this->connection->close(); - }); - - return $promise; - } - - public function getConnectionState(): int - { - return $this->connection->getState(); + $this->config = $config; + $this->database = $config->getDatabase(); } /** @@ -106,29 +40,83 @@ public function execute(array $args, callable $transform = null): Promise return call(function () use ($args, $transform) { $command = \strtolower($args[0] ?? ''); - $deferred = new Deferred; - $promise = $deferred->promise(); + $connectPromise = $this->connect(); + if ($command === 'quit') { + $this->connect = null; + } - $this->deferreds[] = $deferred; + /** @var RespSocket $resp */ + $resp = yield $connectPromise; - yield $this->connection->send($args); - $response = yield $promise; + $response = yield $this->enqueue($resp, ...$args); if ($command === 'select') { $this->database = (int) $args[1]; - } elseif ($command === 'quit') { - $this->connection->close(); } return $transform ? $transform($response) : $response; }); } - private function applyUri(string $uri): void + private function enqueue(RespSocket $resp, string... $args): Promise { - $pairs = Internal\parseUriQuery(Uri\parse($uri)['query'] ?? ''); + return call(function () use ($resp, $args) { + $deferred = new Deferred; + $this->queue[] = $deferred; + + $resp->reference(); - $this->database = (int) ($pairs['database'] ?? 0); - $this->password = $pairs['password'] ?? null; + try { + yield $resp->write(...$args); + } catch (Socket\SocketException | StreamException $exception) { + throw new SocketException($exception); + } + + return $deferred->promise(); + }); + } + + private function connect(): Promise + { + if ($this->connect) { + return $this->connect; + } + + return $this->connect = call(function () { + /** @var RespSocket $resp */ + $resp = yield connect($this->config); + + asyncCall(function () use ($resp) { + try { + while ([$response] = yield $resp->read()) { + $deferred = \array_shift($this->queue); + if (!$this->queue) { + $resp->unreference(); + } + + if ($response instanceof \Throwable) { + $deferred->fail($response); + } else { + $deferred->resolve($response); + } + } + + throw new SocketException('Socket to redis instance (' . $this->config->getUri() . ') closed unexpectedly'); + } catch (\Throwable $error) { + $queue = $this->queue; + $this->queue = []; + $this->connect = null; + + while ($queue) { + $deferred = \array_shift($queue); + $deferred->fail($error); + } + + throw $error; + } + }); + + return $resp; + }); } } diff --git a/src/RemoteExecutorFactory.php b/src/RemoteExecutorFactory.php index 25b9b90..c39d397 100644 --- a/src/RemoteExecutorFactory.php +++ b/src/RemoteExecutorFactory.php @@ -4,15 +4,15 @@ final class RemoteExecutorFactory implements QueryExecutorFactory { - private $uri; + private $config; - public function __construct(string $uri) + public function __construct(Config $config) { - $this->uri = $uri; + $this->config = $config; } public function createQueryExecutor(): QueryExecutor { - return new RemoteExecutor($this->uri); + return new RemoteExecutor($this->config); } } diff --git a/src/RespSocket.php b/src/RespSocket.php new file mode 100644 index 0000000..9eed64c --- /dev/null +++ b/src/RespSocket.php @@ -0,0 +1,106 @@ +backpressure; + $backpressure = new Success; + + $this->socket = $socket; + $this->iterator = $emitter->iterate(); + $this->parser = new RespParser(static function ($message) use ($emitter, &$backpressure) { + $backpressure = $emitter->emit($message); + }); + + asyncCall(function () use ($socket) { + while (null !== $chunk = yield $socket->read()) { + $this->parser->append($chunk); + yield $this->backpressure; + } + + $this->close(); + }); + } + + public function read(): Promise + { + return call(function () { + if (yield $this->iterator->advance()) { + return [$this->iterator->getCurrent()]; + } + + return null; + }); + } + + public function write(string ...$args): Promise + { + return call(function () use ($args) { + $payload = ''; + foreach ($args as $arg) { + $payload .= '$' . \strlen($arg) . "\r\n{$arg}\r\n"; + } + $payload = '*' . \count($args) . "\r\n{$payload}"; + + yield $this->socket->write($payload); + }); + } + + public function reference(): void + { + if ($this->socket) { + $this->socket->reference(); + } + } + + public function unreference(): void + { + if ($this->socket) { + $this->socket->unreference(); + } + } + + public function close(): void + { + $this->parser->reset(); + + if ($this->socket) { + $this->socket->close(); + $this->socket = null; + } + } + + public function isClosed(): bool + { + return $this->socket === null; + } + + public function __destruct() + { + $this->close(); + } +} diff --git a/src/SocketException.php b/src/SocketException.php new file mode 100644 index 0000000..20a7fc1 --- /dev/null +++ b/src/SocketException.php @@ -0,0 +1,7 @@ +applyUri($uri); - - $this->connection = new Connection($uri); - $this->connection->addEventHandler('response', function ($response) { - if ($this->authDeferred) { - if ($response instanceof \Exception) { - $this->authDeferred->fail($response); - } else { - $this->authDeferred->resolve($response); - } - - $this->authDeferred = null; - - return; - } - - switch ($response[0]) { - case 'message': - foreach ($this->emitters[$response[1]] as $emitter) { - $emitter->emit($response[2]); - } - - break; - - case 'pmessage': - foreach ($this->patternEmitters[$response[1]] as $emitter) { - $emitter->emit([$response[3], $response[2]]); - } - - break; - } - }); - - $this->connection->addEventHandler('error', function ($error) { - if ($error) { - // Fail any outstanding promises - if ($this->authDeferred) { - $this->authDeferred->fail($error); - } - - while ($this->emitters) { - /** @var Emitter[] $emitterGroup */ - $emitterGroup = \array_shift($this->emitters); - - while ($emitterGroup) { - $emitter = \array_shift($emitterGroup); - $emitter->fail($error); - } - } - - while ($this->patternEmitters) { - /** @var Emitter[] $emitterGroup */ - $emitterGroup = \array_shift($this->patternEmitters); - - while ($emitterGroup) { - $emitter = \array_shift($emitterGroup); - $emitter->fail($error); - } - } - } - }); - - if (!empty($this->password)) { - $this->connection->addEventHandler('connect', function () { - // AUTH must be before any other command, so we unshift it here - $this->authDeferred = new Deferred; - - return "*2\r\n$4\r\rAUTH\r\n$" . \strlen($this->password) . "\r\n{$this->password}\r\n"; - }); - } - } - - public function close(): void - { - $this->connection->close(); - } - - /** - * @param string $channel - * - * @return Promise - */ - public function subscribe(string $channel): Promise - { - return call(function () use ($channel) { - yield $this->connection->send(['subscribe', $channel]); - - $emitter = new Emitter; - $this->emitters[$channel][\spl_object_hash($emitter)] = $emitter; - - return new Subscription($emitter->iterate(), function () use ($emitter, $channel) { - $this->unloadEmitter($emitter, $channel); - }); - }); - } - - /** - * @param string $pattern - * - * @return Promise - */ - public function pSubscribe(string $pattern): Promise - { - return call(function () use ($pattern) { - yield $this->connection->send(['psubscribe', $pattern]); - - $emitter = new Emitter; - $this->patternEmitters[$pattern][\spl_object_hash($emitter)] = $emitter; - - return new Subscription($emitter->iterate(), function () use ($emitter, $pattern) { - $this->unloadPatternEmitter($emitter, $pattern); - }); - }); - } - - public function getConnectionState(): int - { - return $this->connection->getState(); - } - - private function applyUri(string $uri): void - { - $parts = Internal\parseUriQuery(Uri\parse($uri)['query'] ?? ''); - $this->password = $parts['password'] ?? null; - } - - private function unloadEmitter(Emitter $emitter, string $channel): void - { - $hash = \spl_object_hash($emitter); - - if (isset($this->emitters[$channel][$hash])) { - unset($this->emitters[$channel][$hash]); - - $emitter->complete(); - - if (empty($this->emitters[$channel])) { - unset($this->emitters[$channel]); - $this->unsubscribe($channel); - - if (!$this->emitters && !$this->patternEmitters) { - $this->connection->setIdle(true); - } - } - } - } - - private function unsubscribe(string $channel = null): Promise - { - if ($channel === null) { - // either unsubscribe succeeds and an unsubscribe message - // will be sent for every channel or promises will fail - // because of a dead connection. - return $this->connection->send(['unsubscribe']); - } - - return $this->connection->send(['unsubscribe', $channel]); - } - - private function unloadPatternEmitter(Emitter $emitter, string $pattern): void - { - $hash = \spl_object_hash($emitter); - - if (isset($this->patternEmitters[$pattern][$hash])) { - unset($this->patternEmitters[$pattern][$hash]); - - $emitter->complete(); - - if (empty($this->patternEmitters[$pattern])) { - unset($this->patternEmitters[$pattern]); - $this->pUnsubscribe($pattern); - - if (!$this->emitters && !$this->patternEmitters) { - $this->connection->setIdle(true); - } - } - } - } - - private function pUnsubscribe(string $pattern = null): Promise - { - if ($pattern === null) { - // either unsubscribe succeeds and an unsubscribe message - // will be sent for every channel or promises will fail - // because of a dead connection. - return $this->connection->send(['punsubscribe']); - } - - return $this->connection->send(['punsubscribe', $pattern]); - } -} diff --git a/src/Subscriber.php b/src/Subscriber.php new file mode 100644 index 0000000..ea87381 --- /dev/null +++ b/src/Subscriber.php @@ -0,0 +1,184 @@ +config = $config; + } + + /** + * @param string $channel + * + * @return Promise + */ + public function subscribe(string $channel): Promise + { + return call(function () use ($channel) { + /** @var RespSocket $resp */ + $resp = yield $this->connect(); + yield $resp->write('subscribe', $channel); + + $emitter = new Emitter; + $this->emitters[$channel][\spl_object_hash($emitter)] = $emitter; + + return new Subscription($emitter->iterate(), function () use ($emitter, $channel) { + $this->unloadEmitter($emitter, $channel); + }); + }); + } + + /** + * @param string $pattern + * + * @return Promise + */ + public function subscribeToPattern(string $pattern): Promise + { + return call(function () use ($pattern) { + /** @var RespSocket $resp */ + $resp = yield $this->connect(); + yield $resp->write('psubscribe', $pattern); + + $emitter = new Emitter; + $this->patternEmitters[$pattern][\spl_object_hash($emitter)] = $emitter; + + return new Subscription($emitter->iterate(), function () use ($emitter, $pattern) { + $this->unloadPatternEmitter($emitter, $pattern); + }); + }); + } + + private function connect(): Promise + { + if ($this->connect) { + return $this->connect; + } + + return $this->connect = call(function () { + /** @var RespSocket $resp */ + $resp = yield connect($this->config); + + asyncCall(function () use ($resp) { + try { + while ([$response] = yield $resp->read()) { + switch ($response[0]) { + case 'message': + $backpressure = []; + foreach ($this->emitters[$response[1]] as $emitter) { + $backpressure[] = $emitter->emit($response[2]); + } + yield Promise\any($backpressure); + + break; + + case 'pmessage': + $backpressure = []; + foreach ($this->patternEmitters[$response[1]] as $emitter) { + $backpressure[] = $emitter->emit([$response[3], $response[2]]); + } + yield Promise\any($backpressure); + + break; + } + } + + throw new SocketException('Socket to redis instance (' . $this->config->getUri() . ') closed unexpectedly'); + } catch (\Throwable $error) { + $emitters = \array_merge($this->emitters, $this->patternEmitters); + + $this->connect = null; + $this->emitters = []; + $this->patternEmitters = []; + + foreach ($emitters as $emitterGroup) { + foreach ($emitterGroup as $emitter) { + $emitter->fail($error); + } + } + + throw $error; + } + }); + + return $resp; + }); + } + + private function unloadEmitter(Emitter $emitter, string $channel): void + { + $hash = \spl_object_hash($emitter); + + if (isset($this->emitters[$channel][$hash])) { + unset($this->emitters[$channel][$hash]); + + $emitter->complete(); + + if (empty($this->emitters[$channel])) { + unset($this->emitters[$channel]); + + asyncCall(function () use ($channel) { + try { + /** @var RespSocket $resp */ + $resp = yield $this->connect(); + $resp->write('unsubscribe', $channel); + + if (!$this->emitters && !$this->patternEmitters) { + $resp->unreference(); + } + } catch (RedisException $exception) { + // if there's an exception, the unsubscribe is implicitly successful, because the connection broke + } + }); + } + } + } + + private function unloadPatternEmitter(Emitter $emitter, string $pattern): void + { + $hash = \spl_object_hash($emitter); + + if (isset($this->patternEmitters[$pattern][$hash])) { + unset($this->patternEmitters[$pattern][$hash]); + + $emitter->complete(); + + if (empty($this->patternEmitters[$pattern])) { + unset($this->patternEmitters[$pattern]); + + asyncCall(function () use ($pattern) { + try { + /** @var RespSocket $resp */ + $resp = yield $this->connect(); + $resp->write('punsubscribe', $pattern); + + if (!$this->emitters && !$this->patternEmitters) { + $resp->unreference(); + } + } catch (RedisException $exception) { + // if there's an exception, the unsubscribe is implicitly successful, because the connection broke + } + }); + } + } + } +} diff --git a/src/functions.php b/src/functions.php index d116d97..6bc2f58 100644 --- a/src/functions.php +++ b/src/functions.php @@ -2,6 +2,11 @@ namespace Amp\Redis; +use Amp\Promise; +use Amp\Socket; +use Amp\Socket\ConnectContext; +use function Amp\call; + const toFloat = __NAMESPACE__ . '\toFloat'; const toBool = __NAMESPACE__ . '\toBool'; const toNull = __NAMESPACE__ . '\toNull'; @@ -45,3 +50,52 @@ function toNull($response): void { // nothing to do } + +/** + * @param Config $config + * + * @return Promise + * + * @throws RedisException + */ +function connect(Config $config): Promise +{ + return call(static function () use ($config) { + try { + $connectContext = (new ConnectContext)->withConnectTimeout($config->getTimeout()); + $resp = new RespSocket(yield Socket\connect($config->getUri(), $connectContext)); + } catch (Socket\SocketException $e) { + throw new SocketException( + 'Failed to connect to redis instance (' . $config->getUri() . ')', + 0, + $e + ); + } + + $promises = []; + + if ($config->hasPassword()) { + // pipeline, don't await + $promises[] = $resp->write('AUTH', $config->getPassword()); + } + + if ($config->getDatabase() !== 0) { + // pipeline, don't await + $promises[] = $resp->write('SELECT', $config->getDatabase()); + } + + foreach ($promises as $promise) { + yield $promise; + + if ([$response] = yield $resp->read()) { + if ($response instanceof \Throwable) { + throw $response; + } + } else { + throw new RedisException('Failed to connect to redis instance (' . $config->getUri() . ')'); + } + } + + return $resp; + }); +} diff --git a/test/AuthTest.php b/test/AuthTest.php index 4534aa6..6c55ddc 100644 --- a/test/AuthTest.php +++ b/test/AuthTest.php @@ -1,4 +1,4 @@ -assertEquals('PONG', yield $redis->echo('PONG')); - $redis->quit(); + $redis = new Redis(new RemoteExecutor(Config::fromUri('tcp://127.0.0.1:25325?password=secret'))); + $this->assertSame('PONG', yield $redis->echo('PONG')); + yield $redis->quit(); + } + + public function testFailure(): \Generator + { + $redis = new Redis(new RemoteExecutor(Config::fromUri('tcp://127.0.0.1:25325?password=wrong'))); + + $this->expectException(QueryException::class); + $this->expectExceptionMessage('ERR invalid password'); + + yield $redis->echo('PONG'); } } diff --git a/test/CloseTest.php b/test/CloseTest.php index e4dd156..4a24609 100644 --- a/test/CloseTest.php +++ b/test/CloseTest.php @@ -11,4 +11,14 @@ public function testReconnect(): \Generator yield $redis->quit(); $this->assertEquals('PONG', yield $redis->echo('PONG')); } + + public function testReconnect2(): \Generator + { + $redis = $this->createInstance(); + $this->assertEquals('PONG', yield $redis->echo('PONG')); + $quitPromise = $redis->quit(); + $promise = $redis->echo('PONG'); + yield $quitPromise; + $this->assertEquals('PONG', yield $promise); + } } diff --git a/test/DownTest.php b/test/DownTest.php index 97c63df..8b93d5d 100644 --- a/test/DownTest.php +++ b/test/DownTest.php @@ -1,4 +1,4 @@ -expectException(ConnectException::class); + $this->expectException(SocketException::class); - $redis = new Redis(new RemoteExecutor('tcp://127.0.0.1:25325')); + $redis = new Redis(new RemoteExecutor(Config::fromUri('tcp://127.0.0.1:25325'))); yield $redis->ping(); } } diff --git a/test/IntegrationTest.php b/test/IntegrationTest.php index 05cdf17..424d4f3 100644 --- a/test/IntegrationTest.php +++ b/test/IntegrationTest.php @@ -26,7 +26,7 @@ protected function setUp(): void final protected function createInstance(): Redis { - return new Redis(new RemoteExecutor($this->getUri())); + return new Redis(new RemoteExecutor(Config::fromUri($this->getUri()))); } final protected function getUri(): ?string diff --git a/test/Mutex/MutexTest.php b/test/Mutex/MutexTest.php index 2b9b635..d18a003 100644 --- a/test/Mutex/MutexTest.php +++ b/test/Mutex/MutexTest.php @@ -3,6 +3,7 @@ namespace Amp\Redis\Mutex; use Amp\Delayed; +use Amp\Redis\Config; use Amp\Redis\IntegrationTest; use Amp\Redis\RemoteExecutorFactory; @@ -10,7 +11,7 @@ class MutexTest extends IntegrationTest { public function testTimeout(): \Generator { - $mutex = new Mutex(new RemoteExecutorFactory($this->getUri())); + $mutex = new Mutex(new RemoteExecutorFactory(Config::fromUri($this->getUri()))); yield $mutex->lock('foo1', '123456789'); @@ -28,7 +29,7 @@ public function testTimeout(): \Generator public function testFree(): \Generator { - $mutex = new Mutex(new RemoteExecutorFactory($this->getUri())); + $mutex = new Mutex(new RemoteExecutorFactory(Config::fromUri($this->getUri()))); yield $mutex->lock('foo2', '123456789'); @@ -47,7 +48,7 @@ public function testFree(): \Generator public function testRenew(): \Generator { - $mutex = new Mutex(new RemoteExecutorFactory($this->getUri())); + $mutex = new Mutex(new RemoteExecutorFactory(Config::fromUri($this->getUri()))); yield $mutex->lock('foo3', '123456789'); diff --git a/test/PubSubTest.php b/test/PubSubTest.php index c289972..0de1685 100644 --- a/test/PubSubTest.php +++ b/test/PubSubTest.php @@ -9,7 +9,7 @@ class PubSubTest extends IntegrationTest { public function testBasic(): \Generator { - $subscriber = new SubscribeClient($this->getUri()); + $subscriber = new Subscriber(Config::fromUri($this->getUri())); /** @var Subscription $subscription */ $subscription = yield $subscriber->subscribe('foo'); @@ -31,7 +31,7 @@ public function testBasic(): \Generator public function testDoubleCancel(): \Generator { - $subscriber = new SubscribeClient($this->getUri()); + $subscriber = new Subscriber(Config::fromUri($this->getUri())); /** @var Subscription $subscription */ $subscription = yield $subscriber->subscribe('foo'); @@ -43,50 +43,36 @@ public function testDoubleCancel(): \Generator public function testMulti(): \Generator { - $subscriber = new SubscribeClient($this->getUri()); + $subscriber = new Subscriber(Config::fromUri($this->getUri())); /** @var Subscription $subscription1 */ $subscription1 = yield $subscriber->subscribe('foo'); /** @var Subscription $subscription2 */ $subscription2 = yield $subscriber->subscribe('foo'); - $result1 = $result2 = null; - - $subscription1->advance()->onResolve(function ($error) use (&$result1, $subscription1) { - $this->assertNull($error); - $result1 = $subscription1->getCurrent(); - }); - - $subscription2->advance()->onResolve(function ($error) use (&$result2, $subscription2) { - $this->assertNull($error); - $result2 = $subscription2->getCurrent(); - }); - yield $this->redis->publish('foo', 'bar'); - yield new Delayed(1000); - $this->assertEquals('bar', $result1); - $this->assertEquals('bar', $result2); + yield $subscription1->advance(); + yield $subscription2->advance(); - $subscription1->cancel(); + $this->assertEquals('bar', $subscription1->getCurrent()); + $this->assertEquals('bar', $subscription2->getCurrent()); - $subscription2->advance()->onResolve(function ($error) use (&$result2, $subscription2) { - $this->assertNull($error); - $result2 = $subscription2->getCurrent(); - }); + $subscription1->cancel(); yield $this->redis->publish('foo', 'xxx'); - yield new Delayed(1000); - $this->assertEquals('bar', $result1); - $this->assertEquals('xxx', $result2); + yield $subscription2->advance(); + + $this->assertEquals('bar', $subscription1->getCurrent()); + $this->assertEquals('xxx', $subscription2->getCurrent()); $subscription2->cancel(); } public function testStream(): \Generator { - $subscriber = new SubscribeClient($this->getUri()); + $subscriber = new Subscriber(Config::fromUri($this->getUri())); /** @var Subscription $subscription */ $subscription = yield $subscriber->subscribe('foo');