Skip to content

Commit

Permalink
bug #21908 [Cache] Fix Redis pipelining/multi-ops (nicolas-grekas)
Browse files Browse the repository at this point in the history
This PR was merged into the 3.2 branch.

Discussion
----------

[Cache] Fix Redis pipelining/multi-ops

| Q             | A
| ------------- | ---
| Branch?       | 3.2
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #21858
| License       | MIT
| Doc PR        | -

`MSET` doesn't work on clustered connections. Let's use pipelining instead.

Commits
-------

f1648e2 [Cache] Fix Redis pipelining/multi-ops
  • Loading branch information
nicolas-grekas committed Mar 7, 2017
2 parents 17f717c + f1648e2 commit 3265ed4
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 48 deletions.
102 changes: 54 additions & 48 deletions src/Symfony/Component/Cache/Adapter/RedisAdapter.php
Expand Up @@ -14,6 +14,7 @@
use Predis\Connection\Factory;
use Predis\Connection\Aggregate\PredisCluster;
use Predis\Connection\Aggregate\RedisCluster;
use Predis\Response\Status;
use Symfony\Component\Cache\Exception\InvalidArgumentException;

/**
Expand Down Expand Up @@ -136,11 +137,14 @@ public static function createConnection($dsn, array $options = array())
protected function doFetch(array $ids)
{
if ($ids) {
$values = $this->redis->mGet($ids);
$index = 0;
foreach ($ids as $id) {
if ($value = $values[$index++]) {
yield $id => parent::unserialize($value);
$values = $this->pipeline(function () use ($ids) {
foreach ($ids as $id) {
yield 'get' => array($id);
}
});
foreach ($values as $id => $v) {
if ($v) {
yield $id => parent::unserialize($v);
}
}
}
Expand Down Expand Up @@ -251,61 +255,63 @@ protected function doSave(array $values, $lifetime)
return $failed;
}

if (0 >= $lifetime) {
$this->redis->mSet($serialized);

return $failed;
}

$this->pipeline(function ($pipe) use (&$serialized, $lifetime) {
$results = $this->pipeline(function () use ($serialized, $lifetime) {
foreach ($serialized as $id => $value) {
$pipe('setEx', $id, array($lifetime, $value));
if (0 >= $lifetime) {
yield 'set' => array($id, $value);
} else {
yield 'setEx' => array($id, $lifetime, $value);
}
}
});
foreach ($results as $id => $result) {
if (true !== $result && (!$result instanceof Status || $result !== Status::get('OK'))) {
$failed[] = $id;
}
}

return $failed;
}

private function execute($command, $id, array $args, $redis = null)
private function pipeline(\Closure $generator)
{
array_unshift($args, $id);
call_user_func_array(array($redis ?: $this->redis, $command), $args);
}
$ids = array();

private function pipeline(\Closure $callback)
{
$redis = $this->redis;

try {
if ($redis instanceof \Predis\Client) {
$redis->pipeline(function ($pipe) use ($callback) {
$this->redis = $pipe;
$callback(array($this, 'execute'));
});
} elseif ($redis instanceof \RedisArray) {
$connections = array();
$callback(function ($command, $id, $args) use (&$connections) {
if (!isset($connections[$h = $this->redis->_target($id)])) {
$connections[$h] = $this->redis->_instance($h);
$connections[$h]->multi(\Redis::PIPELINE);
}
$this->execute($command, $id, $args, $connections[$h]);
});
foreach ($connections as $c) {
$c->exec();
if ($this->redis instanceof \Predis\Client) {
$results = $this->redis->pipeline(function ($redis) use ($generator, &$ids) {
foreach ($generator() as $command => $args) {
call_user_func_array(array($redis, $command), $args);
$ids[] = $args[0];
}
} else {
$pipe = $redis->multi(\Redis::PIPELINE);
try {
$callback(array($this, 'execute'));
} finally {
if ($pipe) {
$redis->exec();
}
});
} elseif ($this->redis instanceof \RedisArray) {
$connections = $results = $ids = array();
foreach ($generator() as $command => $args) {
if (!isset($connections[$h = $this->redis->_target($args[0])])) {
$connections[$h] = array($this->redis->_instance($h), -1);
$connections[$h][0]->multi(\Redis::PIPELINE);
}
call_user_func_array(array($connections[$h][0], $command), $args);
$results[] = array($h, ++$connections[$h][1]);
$ids[] = $args[0];
}
foreach ($connections as $h => $c) {
$connections[$h] = $c[0]->exec();
}
foreach ($results as $k => list($h, $c)) {
$results[$k] = $connections[$h][$c];
}
} finally {
$this->redis = $redis;
} else {
$this->redis->multi(\Redis::PIPELINE);
foreach ($generator() as $command => $args) {
call_user_func_array(array($this->redis, $command), $args);
$ids[] = $args[0];
}
$results = $this->redis->exec();
}

foreach ($ids as $k => $id) {
yield $id => $results[$k];
}
}
}
@@ -0,0 +1,27 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Cache\Tests\Adapter;

class PredisClusterAdapterTest extends AbstractRedisAdapterTest
{
public static function setupBeforeClass()
{
parent::setupBeforeClass();
self::$redis = new \Predis\Client(array(getenv('REDIS_HOST')));
}

public static function tearDownAfterClass()
{
self::$redis->getConnection()->getConnectionByKey('foo')->executeCommand(self::$redis->createCommand('FLUSHDB'));
self::$redis = null;
}
}

0 comments on commit 3265ed4

Please sign in to comment.