Permalink
Browse files

updated the Redis class to support pipelining. Closes #785

Also added a config option to override the system defined socket timeout
  • Loading branch information...
WanWizard committed Jul 29, 2012
1 parent a14840c commit 11b6a890cbfcd7b4a5e98529b9d778dc2eacbdb3
Showing with 156 additions and 69 deletions.
  1. +156 −69 classes/redis.php
View
@@ -15,9 +15,9 @@
*
* It has been modified to work with Fuel and to improve the code slightly.
*
* @author Justin Poliey <jdp34@njit.edu>
* @copyright 2009 Justin Poliey <jdp34@njit.edu>
* @license http://www.opensource.org/licenses/mit-license.php The MIT License
* @author Justin Poliey <justin@getglue.com>
* @copyright 2009-2012 Justin Poliey <justin@getglue.com>
* @license http://www.opensource.org/licenses/ISC The ISC License
*/
namespace Fuel\Core;
@@ -31,144 +31,231 @@ class RedisException extends \FuelException {}
*/
class Redis
{
/**
* Multiton pattern, keep track of all created instances
*/
protected static $instances = array();
/**
* Get or create an instance of the Redis class
*/
public static function instance($name = 'default')
{
if (\array_key_exists($name, static::$instances))
if ( ! array_key_exists($name, static::$instances))
{
return static::$instances[$name];
}
empty(static::$instances) and \Config::load('db', true);
if (empty(static::$instances))
{
\Config::load('db', true);
}
if ( ! ($config = \Config::get('db.redis.'.$name)))
{
throw new \RedisException('Invalid instance name given.');
}
if ( ! ($config = \Config::get('db.redis.'.$name)))
{
throw new \RedisException('Invalid instance name given.');
}
static::$instances[$name] = new static($config);
static::$instances[$name] = new static($config);
}
return static::$instances[$name];
}
/**
* @var resource
*/
protected $connection = false;
/**
* Flag indicating whether or not commands are being pipelined
*
* @var boolean
*/
protected $pipelined = false;
/**
* The queue of commands to be sent to the Redis server
*
* @var array
*/
protected $queue = array();
/**
* Create a new Redis instance using the configuration values supplied
*/
public function __construct(array $config = array())
{
$this->connection = @fsockopen($config['hostname'], $config['port'], $errno, $errstr);
empty($config['timeout']) and $config['timeout'] = ini_get("default_socket_timeout");
$this->connection = @fsockopen($config['hostname'], $config['port'], $errno, $errstr, $config['timeout']);
if ( ! $this->connection)
{
throw new \RedisException($errstr, $errno);
}
}
/**
* Close the open connection on class destruction
*/
public function __destruct()
{
fclose($this->connection);
$this->connection and fclose($this->connection);
}
public function __call($name, $args)
/**
* Returns the Redisent instance ready for pipelining.
*
* Redis commands can now be chained, and the array of the responses will be
* returned when {@link uncork} is called.
* @see uncork
*
*/
public function pipeline()
{
$response = null;
$this->pipelined = true;
$name = strtoupper($name);
return $this;
}
$command = '*'.(count($args) + 1).CRLF;
$command .= '$'.strlen($name).CRLF;
$command .= $name.CRLF;
/**
* Flushes the commands in the pipeline queue to Redis and returns the responses.
* @see pipeline
*/
public function uncork()
{
// open a Redis connection and execute the queued commands
foreach ($this->queue as $command)
{
for ($written = 0; $written < strlen($command); $written += $fwrite)
{
$fwrite = fwrite($this->connection, substr($command, $written));
if ($fwrite === false)
{
throw new \RedisException('Failed to write entire command to stream');
}
}
}
foreach ($args as $arg)
// Read in the results from the pipelined commands
$responses = array();
for ($i = 0; $i < count($this->queue); $i++)
{
$command .= '$'.strlen($arg).CRLF;
$command .= $arg.CRLF;
$responses[] = $this->readResponse();
}
fwrite($this->connection, $command);
// Clear the queue and return the response
$this->queue = array();
if ($this->pipelined)
{
$this->pipelined = false;
return $responses;
}
else
{
return $responses[0];
}
}
/**
*/
public function __call($name, $args)
{
// build the Redis unified protocol command
array_unshift($args, strtoupper($name));
$command = sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(function($arg) {
return sprintf('$%d%s%s', strlen($arg), CRLF, $arg);
}, $args), CRLF), CRLF);
// add it to the pipeline queue
$this->queue[] = $command;
if ($this->pipelined)
{
return $this;
}
else
{
return $this->uncork();
}
}
protected function readResponse()
{
// parse the response based on the reply identifier
$reply = trim(fgets($this->connection, 512));
switch (substr($reply, 0, 1))
{
// Error
// error reply
case '-':
throw new \RedisException(substr(trim($reply), 4));
break;
throw new \RedisException(trim(substr($reply, 4)));
break;
// In-line reply
// inline reply
case '+':
$response = substr(trim($reply), 1);
break;
if ($response === 'OK')
{
$response = true;
}
break;
// Bulk reply
// bulk reply
case '$':
$response = null;
if ($reply == '$-1')
{
$response = null;
break;
}
$read = 0;
$size = substr($reply, 1);
$size = intval(substr($reply, 1));
if ($size > 0)
{
do
{
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
$response .= fread($this->connection, $block_size);
$read += $block_size;
} while ($read < $size);
$r = fread($this->connection, $block_size);
if ($r === false)
{
throw new \RedisException('Failed to read response from stream');
}
else
{
$read += strlen($r);
$response .= $r;
}
}
while ($read < $size);
}
// discard the crlf
fread($this->connection, 2);
break;
break;
// Mult-Bulk reply
// multi-bulk reply
case '*':
$count = substr($reply, 1);
$count = intval(substr($reply, 1));
if ($count == '-1')
{
return null;
}
$response = array();
for ($i = 0; $i < $count; $i++)
{
$bulk_head = trim(fgets($this->connection, 512));
$size = substr($bulk_head, 1);
if ($size == '-1')
{
$response[] = null;
}
else
{
$read = 0;
$block = "";
do
{
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
$block .= fread($this->connection, $block_size);
$read += $block_size;
} while ($read < $size);
fread($this->connection, 2); /* discard crlf */
$response[] = $block;
}
$response[] = $this->readResponse();
}
break;
break;
// Integer Reply
// integer reply
case ':':
$response = substr(trim($reply), 1);
break;
$response = intval(substr(trim($reply), 1));
break;
// Don't know what to do? Throw it outta here
default:
throw new \RedisException("invalid server response: {$reply}");
break;
throw new \RedisException("Unknown response: {$reply}");
break;
}
// party on...
return $response;
}
}
}

0 comments on commit 11b6a89

Please sign in to comment.