Skip to content

Commit

Permalink
feature #28713 [Cache] added support for connecting to Redis clusters…
Browse files Browse the repository at this point in the history
… via DSN (nicolas-grekas)

This PR was merged into the 4.2-dev branch.

Discussion
----------

[Cache] added support for connecting to Redis clusters via DSN

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | -
| License       | MIT
| Doc PR        | -

Replaces #28300 and #28175

This PR allows configuring a cluster of Redis servers using all available options of either the phpredis extension or the Predis package:
- the `redis_cluster=0/1` boolean option configures whether the client should use the Redis cluster protocol;
- several hosts can be provided using a syntax very similar to #28598, enabling consistent hashing distribution of keys;
- `failover=error/distribute/slaves` can be set to direct reads at slave servers;
- extra options are passed as is to the driver (e.g. `profile=2.8`)
- Predis per-server settings are also possible, using e.g. `host[localhost][alias]=foo` in the query string, or `host[localhost]=alias%3Dfoo` (ie PHP query arrays or urlencoded key/value pairs)

Commits
-------

a42e877 [Cache] added support for connecting to Redis clusters via DSN
  • Loading branch information
fabpot committed Oct 10, 2018
2 parents 493c13a + a42e877 commit 620094a
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 60 deletions.
2 changes: 1 addition & 1 deletion composer.json
Expand Up @@ -99,7 +99,7 @@
"doctrine/doctrine-bundle": "~1.4",
"monolog/monolog": "~1.11",
"ocramius/proxy-manager": "~0.4|~1.0|~2.0",
"predis/predis": "~1.0",
"predis/predis": "~1.1",
"egulias/email-validator": "~1.2,>=1.2.8|~2.0",
"symfony/phpunit-bridge": "~3.4|~4.0",
"symfony/security-acl": "~2.8|~3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Cache/Adapter/AbstractAdapter.php
Expand Up @@ -136,7 +136,7 @@ public static function createConnection($dsn, array $options = array())
if (!\is_string($dsn)) {
throw new InvalidArgumentException(sprintf('The %s() method expect argument #1 to be string, %s given.', __METHOD__, \gettype($dsn)));
}
if (0 === strpos($dsn, 'redis://')) {
if (0 === strpos($dsn, 'redis:')) {
return RedisAdapter::createConnection($dsn, $options);
}
if (0 === strpos($dsn, 'memcached:')) {
Expand Down
3 changes: 2 additions & 1 deletion src/Symfony/Component/Cache/CHANGELOG.md
Expand Up @@ -4,7 +4,8 @@ CHANGELOG
4.2.0
-----

* added support for configuring multiple Memcached servers in one DSN
* added support for connecting to Redis clusters via DSN
* added support for configuring multiple Memcached servers via DSN
* added `MarshallerInterface` and `DefaultMarshaller` to allow changing the serializer and provide one that automatically uses igbinary when available
* added `CacheInterface`, which provides stampede protection via probabilistic early expiration and should become the preferred way to use a cache
* added sub-second expiry accuracy for backends that support it
Expand Down
16 changes: 4 additions & 12 deletions src/Symfony/Component/Cache/Tests/Adapter/PredisAdapterTest.php
Expand Up @@ -34,21 +34,13 @@ public function testCreateConnection()

$params = array(
'scheme' => 'tcp',
'host' => $redisHost,
'path' => '',
'dbindex' => '1',
'host' => 'localhost',
'port' => 6379,
'class' => 'Predis\Client',
'timeout' => 3,
'persistent' => 0,
'persistent_id' => null,
'read_timeout' => 0,
'retry_interval' => 0,
'compression' => true,
'tcp_keepalive' => 0,
'lazy' => false,
'timeout' => 3,
'read_write_timeout' => 0,
'tcp_nodelay' => true,
'database' => '1',
'password' => null,
);
$this->assertSame($params, $connection->getParameters()->toArray());
}
Expand Down
Expand Up @@ -11,14 +11,17 @@

namespace Symfony\Component\Cache\Tests\Adapter;

use Symfony\Component\Cache\Adapter\RedisAdapter;

class PredisRedisClusterAdapterTest extends AbstractRedisAdapterTest
{
public static function setupBeforeClass()
{
if (!$hosts = getenv('REDIS_CLUSTER_HOSTS')) {
self::markTestSkipped('REDIS_CLUSTER_HOSTS env var is not defined.');
}
self::$redis = new \Predis\Client(explode(' ', $hosts), array('cluster' => 'redis'));

self::$redis = RedisAdapter::createConnection('redis:?host['.str_replace(' ', ']&host[', $hosts).']', array('class' => \Predis\Client::class, 'redis_cluster' => true));
}

public static function tearDownAfterClass()
Expand Down
Expand Up @@ -33,6 +33,11 @@ public function createCachePool($defaultLifetime = 0)

public function testCreateConnection()
{
$redis = RedisAdapter::createConnection('redis:?host[h1]&host[h2]&host[/foo:]');
$this->assertInstanceOf(\RedisArray::class, $redis);
$this->assertSame(array('h1:6379', 'h2:6379', '/foo'), $redis->_hosts());
@$redis = null; // some versions of phpredis connect on destruct, let's silence the warning

$redisHost = getenv('REDIS_HOST');

$redis = RedisAdapter::createConnection('redis://'.$redisHost);
Expand Down
Expand Up @@ -11,6 +11,10 @@

namespace Symfony\Component\Cache\Tests\Adapter;

use Symfony\Component\Cache\Adapter\AbstractAdapter;
use Symfony\Component\Cache\Adapter\RedisAdapter;
use Symfony\Component\Cache\Traits\RedisClusterProxy;

class RedisClusterAdapterTest extends AbstractRedisAdapterTest
{
public static function setupBeforeClass()
Expand All @@ -22,6 +26,33 @@ public static function setupBeforeClass()
self::markTestSkipped('REDIS_CLUSTER_HOSTS env var is not defined.');
}

self::$redis = new \RedisCluster(null, explode(' ', $hosts));
self::$redis = AbstractAdapter::createConnection('redis:?host['.str_replace(' ', ']&host[', $hosts).']', array('lazy' => true, 'redis_cluster' => true));
}

public function createCachePool($defaultLifetime = 0)
{
$this->assertInstanceOf(RedisClusterProxy::class, self::$redis);
$adapter = new RedisAdapter(self::$redis, str_replace('\\', '.', __CLASS__), $defaultLifetime);

return $adapter;
}

/**
* @dataProvider provideFailedCreateConnection
* @expectedException \Symfony\Component\Cache\Exception\InvalidArgumentException
* @expectedExceptionMessage Redis connection failed
*/
public function testFailedCreateConnection($dsn)
{
RedisAdapter::createConnection($dsn);
}

public function provideFailedCreateConnection()
{
return array(
array('redis://localhost:1234?redis_cluster=1'),
array('redis://foo@localhost?redis_cluster=1'),
array('redis://localhost/123?redis_cluster=1'),
);
}
}
183 changes: 141 additions & 42 deletions src/Symfony/Component/Cache/Traits/RedisTrait.php
Expand Up @@ -13,7 +13,6 @@

use Predis\Connection\Aggregate\ClusterInterface;
use Predis\Connection\Aggregate\RedisCluster;
use Predis\Connection\Factory;
use Predis\Response\Status;
use Symfony\Component\Cache\Exception\CacheException;
use Symfony\Component\Cache\Exception\InvalidArgumentException;
Expand All @@ -37,7 +36,10 @@ trait RedisTrait
'retry_interval' => 0,
'compression' => true,
'tcp_keepalive' => 0,
'lazy' => false,
'lazy' => null,
'redis_cluster' => false,
'dbindex' => 0,
'failover' => 'none',
);
private $redis;
private $marshaller;
Expand All @@ -53,7 +55,7 @@ private function init($redisClient, $namespace, $defaultLifetime, ?MarshallerInt
throw new InvalidArgumentException(sprintf('RedisAdapter namespace contains "%s" but only characters in [-+_.A-Za-z0-9] are allowed.', $match[0]));
}
if (!$redisClient instanceof \Redis && !$redisClient instanceof \RedisArray && !$redisClient instanceof \RedisCluster && !$redisClient instanceof \Predis\Client && !$redisClient instanceof RedisProxy && !$redisClient instanceof RedisClusterProxy) {
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given', __METHOD__, \is_object($redisClient) ? \get_class($redisClient) : \gettype($redisClient)));
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given.', __METHOD__, \is_object($redisClient) ? \get_class($redisClient) : \gettype($redisClient)));
}
$this->redis = $redisClient;
$this->marshaller = $marshaller ?? new DefaultMarshaller();
Expand All @@ -74,57 +76,87 @@ private function init($redisClient, $namespace, $defaultLifetime, ?MarshallerInt
*
* @throws InvalidArgumentException when the DSN is invalid
*
* @return \Redis|\Predis\Client According to the "class" option
* @return \Redis|\RedisCluster|\Predis\Client According to the "class" option
*/
public static function createConnection($dsn, array $options = array())
{
if (0 !== strpos($dsn, 'redis://')) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s does not start with "redis://"', $dsn));
if (0 !== strpos($dsn, 'redis:')) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s does not start with "redis:".', $dsn));
}
$params = preg_replace_callback('#^redis://(?:(?:[^:@]*+:)?([^@]*+)@)?#', function ($m) use (&$auth) {
if (isset($m[1])) {
$auth = $m[1];

if (!\extension_loaded('redis') && !class_exists(\Predis\Client::class)) {
throw new CacheException(sprintf('Cannot find the "redis" extension nor the "predis/predis" package: %s', $dsn));
}

$params = preg_replace_callback('#^redis:(//)?(?:(?:[^:@]*+:)?([^@]*+)@)?#', function ($m) use (&$auth) {
if (isset($m[2])) {
$auth = $m[2];
}

return 'file://';
return 'file:'.($m[1] ?? '');
}, $dsn);
if (false === $params = parse_url($params)) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
}
if (!isset($params['host']) && !isset($params['path'])) {

if (false === $params = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
}
if (isset($params['path']) && preg_match('#/(\d+)$#', $params['path'], $m)) {
$params['dbindex'] = $m[1];
$params['path'] = substr($params['path'], 0, -\strlen($m[0]));
}
if (isset($params['host'])) {
$scheme = 'tcp';
} else {
$scheme = 'unix';
}
$params += array(
'host' => isset($params['host']) ? $params['host'] : $params['path'],
'port' => isset($params['host']) ? 6379 : null,
'dbindex' => 0,
);

$query = $hosts = array();

if (isset($params['query'])) {
parse_str($params['query'], $query);
$params += $query;

if (isset($query['host'])) {
if (!\is_array($hosts = $query['host'])) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
}
foreach ($hosts as $host => $parameters) {
if (\is_string($parameters)) {
parse_str($parameters, $parameters);
}
if (false === $i = strrpos($host, ':')) {
$hosts[$host] = array('scheme' => 'tcp', 'host' => $host, 'port' => 6379) + $parameters;
} elseif ($port = (int) substr($host, 1 + $i)) {
$hosts[$host] = array('scheme' => 'tcp', 'host' => substr($host, 0, $i), 'port' => $port) + $parameters;
} else {
$hosts[$host] = array('scheme' => 'unix', 'path' => substr($host, 0, $i)) + $parameters;
}
}
$hosts = array_values($hosts);
}
}

if (isset($params['host']) || isset($params['path'])) {
if (!isset($params['dbindex']) && isset($params['path']) && preg_match('#/(\d+)$#', $params['path'], $m)) {
$params['dbindex'] = $m[1];
$params['path'] = substr($params['path'], 0, -\strlen($m[0]));
}

if (isset($params['host'])) {
array_unshift($hosts, array('scheme' => 'tcp', 'host' => $params['host'], 'port' => $params['port'] ?? 6379));
} else {
array_unshift($hosts, array('scheme' => 'unix', 'path' => $params['path']));
}
}
$params += $options + self::$defaultConnectionOptions;
if (null === $params['class'] && !\extension_loaded('redis') && !class_exists(\Predis\Client::class)) {
throw new CacheException(sprintf('Cannot find the "redis" extension, and "predis/predis" is not installed: %s', $dsn));

if (!$hosts) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
}

$params += $query + $options + self::$defaultConnectionOptions;

if (null === $params['class'] && \extension_loaded('redis')) {
$class = $params['redis_cluster'] ? \RedisCluster::class : (1 < \count($hosts) ? \RedisArray::class : \Redis::class);
} else {
$class = null === $params['class'] ? \Predis\Client::class : $params['class'];
}
$class = null === $params['class'] ? (\extension_loaded('redis') ? \Redis::class : \Predis\Client::class) : $params['class'];

if (is_a($class, \Redis::class, true)) {
$connect = $params['persistent'] || $params['persistent_id'] ? 'pconnect' : 'connect';
$redis = new $class();

$initializer = function ($redis) use ($connect, $params, $dsn, $auth) {
$initializer = function ($redis) use ($connect, $params, $dsn, $auth, $hosts) {
try {
@$redis->{$connect}($params['host'], $params['port'], $params['timeout'], $params['persistent_id'], $params['retry_interval']);
@$redis->{$connect}($hosts[0]['host'], $hosts[0]['port'], $params['timeout'], (string) $params['persistent_id'], $params['retry_interval']);
} catch (\RedisException $e) {
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
}
Expand Down Expand Up @@ -160,15 +192,82 @@ public static function createConnection($dsn, array $options = array())
} else {
$initializer($redis);
}
} elseif (is_a($class, \RedisArray::class, true)) {
foreach ($hosts as $i => $host) {
$hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path'];
}
$params['lazy_connect'] = $params['lazy'] ?? true;
$params['connect_timeout'] = $params['timeout'];

try {
$redis = new $class($hosts, $params);
} catch (\RedisClusterException $e) {
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
}

if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) {
$redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
}
if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) {
$redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF);
}
} elseif (is_a($class, \RedisCluster::class, true)) {
$initializer = function () use ($class, $params, $dsn, $hosts) {
foreach ($hosts as $i => $host) {
$hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path'];
}

try {
$redis = new $class(null, $hosts, $params['timeout'], $params['read_timeout'], (bool) $params['persistent']);
} catch (\RedisClusterException $e) {
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
}

if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) {
$redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
}
if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) {
$redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF);
}
switch ($params['failover']) {
case 'error': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_ERROR); break;
case 'distribute': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE); break;
case 'slaves': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE_SLAVES); break;
}

return $redis;
};

$redis = $params['lazy'] ? new RedisClusterProxy($initializer) : $initializer();
} elseif (is_a($class, \Predis\Client::class, true)) {
$params['scheme'] = $scheme;
$params['database'] = $params['dbindex'] ?: null;
$params['password'] = $auth;
$redis = new $class((new Factory())->create($params));
if ($params['redis_cluster']) {
$params['cluster'] = 'redis';
}
$params += array('parameters' => array());
$params['parameters'] += array(
'persistent' => $params['persistent'],
'timeout' => $params['timeout'],
'read_write_timeout' => $params['read_timeout'],
'tcp_nodelay' => true,
);
if ($params['dbindex']) {
$params['parameters']['database'] = $params['dbindex'];
}
if (null !== $auth) {
$params['parameters']['password'] = $auth;
}
if (1 === \count($hosts) && !$params['redis_cluster']) {
$hosts = $hosts[0];
} elseif (\in_array($params['failover'], array('slaves', 'distribute'), true) && !isset($params['replication'])) {
$params['replication'] = true;
$hosts[0] += array('alias' => 'master');
}

$redis = new $class($hosts, array_diff_key($params, self::$defaultConnectionOptions));
} elseif (class_exists($class, false)) {
throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis" or "Predis\Client"', $class));
throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis", "RedisArray", "RedisCluster" nor "Predis\Client".', $class));
} else {
throw new InvalidArgumentException(sprintf('Class "%s" does not exist', $class));
throw new InvalidArgumentException(sprintf('Class "%s" does not exist.', $class));
}

return $redis;
Expand All @@ -183,7 +282,6 @@ protected function doFetch(array $ids)
return array();
}

$i = -1;
$result = array();

if ($this->redis instanceof \Predis\Client) {
Expand Down Expand Up @@ -244,6 +342,7 @@ protected function doClear($namespace)
$h->connect($host[0], $host[1]);
}
}

foreach ($hosts as $host) {
if (!isset($namespace[0])) {
$cleared = $host->flushDb() && $cleared;
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Cache/composer.json
Expand Up @@ -32,7 +32,7 @@
"cache/integration-tests": "dev-master",
"doctrine/cache": "~1.6",
"doctrine/dbal": "~2.5",
"predis/predis": "~1.0",
"predis/predis": "~1.1",
"symfony/config": "~4.2",
"symfony/dependency-injection": "~3.4",
"symfony/var-dumper": "^4.1.1"
Expand Down

0 comments on commit 620094a

Please sign in to comment.