Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for RedisURL according to IANA spec #56

Closed
wants to merge 10 commits into from
88 changes: 6 additions & 82 deletions lib/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,85 +2,23 @@

namespace Amp\Redis;

use Amp\Deferred;
use Amp\Promise;
use Amp\Uri\Uri;
use Exception;
use Amp\Uri\InvalidUriException;
use function Amp\call;

class Client extends Redis
{
/** @var Deferred[] */
private $deferreds;

/** @var Connection */
private $connection;

/** @var string */
private $password;

/** @var int */
private $database = 0;

/**
* @param string $uri
* @throws InvalidUriException
* @throws ConnectionConfigException
*/
public function __construct(string $uri)
{
$this->applyUri($uri);

$this->deferreds = [];
$this->connection = new Connection($uri);

$this->connection->addEventHandler("response", function ($response) {
$deferred = \array_shift($this->deferreds);

if (empty($this->deferreds)) {
$this->connection->setIdle(true);
}

if ($response instanceof Exception) {
$deferred->fail($response);
} else {
$deferred->resolve($response);
}
});

$this->connection->addEventHandler(["close", "error"], function ($error = null) {
if ($error) {
// Fail any outstanding promises
while ($this->deferreds) {
$deferred = \array_shift($this->deferreds);
$deferred->fail($error);
}
}
});

if (!empty($this->password)) {
$this->connection->addEventHandler("connect", function () {
// AUTH must be before any other command, so we unshift it last
\array_unshift($this->deferreds, new Deferred);

return "*2\r\n$4\r\rAUTH\r\n$" . \strlen($this->password) . "\r\n{$this->password}\r\n";
});
}

if ($this->database !== 0) {
$this->connection->addEventHandler("connect", function () {
// SELECT must be called for every new connection if another database than 0 is used
\array_unshift($this->deferreds, new Deferred);

return "*2\r\n$6\r\rSELECT\r\n$" . \strlen($this->database) . "\r\n{$this->database}\r\n";
});
}
}

private function applyUri(string $uri)
{
$uri = new Uri($uri);

$this->database = (int) ($uri->getQueryParameter("database") ?? 0);
$this->password = $uri->getQueryParameter("password") ?? null;
$this->connection = new Connection(ConnectionConfig::parse($uri));
}

/**
Expand All @@ -96,15 +34,7 @@ public function transaction(): Transaction
*/
public function close(): Promise
{
$promise = Promise\all(\array_map(function (Deferred $deferred) {
return $deferred->promise();
}, $this->deferreds));

$promise->onResolve(function () {
$this->connection->close();
});

return $promise;
return $this->connection->close();
}

/**
Expand All @@ -116,13 +46,7 @@ public function close(): Promise
protected function send(array $args, callable $transform = null): Promise
{
return call(function () use ($args, $transform) {
$deferred = new Deferred;
$promise = $deferred->promise();

$this->deferreds[] = $deferred;

yield $this->connection->send($args);
$response = yield $promise;
$response = yield $this->connection->send($args);

return $transform ? $transform($response) : $response;
});
Expand Down
117 changes: 81 additions & 36 deletions lib/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
use Amp\Socket\ClientConnectContext;
use Amp\Socket\Socket;
use Amp\Success;
use Amp\Uri\InvalidUriException;
use Amp\Uri\Uri;
use Exception;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\Socket\connect;
Expand All @@ -25,12 +24,6 @@ class Connection
/** @var RespParser */
private $parser;

/** @var string */
private $uri;

/** @var int */
private $timeout = 5000;

/** @var Socket */
private $socket;

Expand All @@ -40,17 +33,19 @@ class Connection
/** @var int */
private $state;

/** @var ConnectionConfig */
private $config;

/** @var Deferred[] */
private $deferreds;

/**
* @param string $uri
* @param ConnectionConfig $config
*/
public function __construct(string $uri)
public function __construct(ConnectionConfig $config)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should roll ConnectionConfig::parse in here for now to keep it backward compatible.

{
if (\strpos($uri, "tcp://") !== 0 && \strpos($uri, "unix://") !== 0) {
throw new InvalidUriException("URI must start with tcp:// or unix://");
}

$this->applyUri($uri);

$this->deferreds = [];
$this->config = $config;
$this->state = self::STATE_DISCONNECTED;

$this->handlers = [
Expand All @@ -65,19 +60,53 @@ public function __construct(string $uri)
$handler($response);
}
});
}

private function applyUri(string $uri)
{
$uri = new Uri($uri);
$this->addEventHandler("response", function ($response) {
$deferred = \array_shift($this->deferreds);

if ($uri->getScheme() === "tcp") {
$this->uri = $uri->getScheme() . "://" . $uri->getHost() . ":" . $uri->getPort();
} else {
$this->uri = $uri->getScheme() . "://" . $uri->getPath();
if (empty($this->deferreds)) {
$this->setIdle(true);
}
if (!$deferred instanceof Deferred) {
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't happen, so it should throw instead of returning silently.


if ($response instanceof Exception) {
$deferred->fail($response);
} else {
$deferred->resolve($response);
}
});

$this->addEventHandler(["close", "error"], function ($error = null) {
if ($error) {
// Fail any outstanding promises
while ($this->deferreds) {
$deferred = \array_shift($this->deferreds);
$deferred->fail($error);
}
}
});

if ($this->config->hasPassword()) {
$this->addEventHandler("connect", function () {
// AUTH must be before any other command, so we unshift it last
\array_unshift($this->deferreds, new Deferred);
$password = $this->config->getPassword();

return "*2\r\n$4\r\rAUTH\r\n$" . \strlen($password) . "\r\n{$password}\r\n";
});
}

$this->timeout = $uri->getQueryParameter("timeout") ?? $this->timeout;
if ($this->config->getDatabase() !== 0) {
$this->addEventHandler("connect", function () {
// SELECT must be called for every new connection if another database than 0 is used
\array_unshift($this->deferreds, new Deferred);
$database = $this->config->getDatabase();

return "*2\r\n$6\r\rSELECT\r\n$" . \strlen($database) . "\r\n{$database}\r\n";
});
}
}

public function addEventHandler($event, callable $callback)
Expand All @@ -100,13 +129,16 @@ public function addEventHandler($event, callable $callback)
*/
public function send(array $strings): Promise
{
$deferred = new Deferred;
$this->deferreds[] = $deferred;

foreach ($strings as $string) {
if (!\is_scalar($string)) {
throw new \TypeError("All elements must be of type string or scalar and convertible to a string.");
}
}

return call(function () use ($strings) {
call(function () use ($strings) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return value here shouldn't be ignored. I'd move return $deferred->promise(); into the closure.

$this->setIdle(false);

$payload = "";
Expand All @@ -118,6 +150,8 @@ public function send(array $strings): Promise
yield $this->connect();
yield $this->socket->write($payload);
});

return $deferred->promise();
}

private function connect(): Promise
Expand All @@ -135,7 +169,10 @@ private function connect(): Promise
$this->state = self::STATE_CONNECTING;
$this->connectPromisor = new Deferred;
$connectPromise = $this->connectPromisor->promise();
$socketPromise = connect($this->uri, (new ClientConnectContext)->withConnectTimeout($this->timeout));
$socketPromise = connect(
$this->config->getUri(),
(new ClientConnectContext)->withConnectTimeout($this->config->getTimeout())
);

$socketPromise->onResolve(function ($error, Socket $socket = null) {
$connectPromisor = $this->connectPromisor;
Expand Down Expand Up @@ -205,18 +242,26 @@ public function setIdle(bool $idle)

public function close()
{
$this->parser->reset();
$promise = Promise\all(\array_map(function (Deferred $deferred) {
return $deferred->promise();
}, $this->deferreds));

if ($this->socket) {
$this->socket->close();
$this->socket = null;
}
$promise->onResolve(function () {
$this->parser->reset();

foreach ($this->handlers["close"] as $handler) {
$handler();
}
if ($this->socket) {
$this->socket->close();
$this->socket = null;
}

$this->state = self::STATE_DISCONNECTED;
foreach ($this->handlers["close"] as $handler) {
$handler();
}

$this->state = self::STATE_DISCONNECTED;
});

return $promise;
}

public function getState(): int
Expand Down