Skip to content


Subversion checkout URL

You can clone with
Download ZIP
Fetching contributors…
Cannot retrieve contributors at this time
1277 lines (862 sloc) 30.1 KB
package Redis;
# ABSTRACT: Perl binding for Redis database
use warnings;
use strict;
use IO::Socket::INET;
use IO::Socket::UNIX;
use IO::Select;
use IO::Handle;
use Fcntl qw( O_NONBLOCK F_SETFL );
use Data::Dumper;
use Carp qw/confess/;
use Encode;
use Try::Tiny;
use Scalar::Util ();
use constant WIN32 => $^O =~ /mswin32/i;
sub new {
my $class = shift;
my %args = @_;
my $self = bless {}, $class;
$self->{debug} = $args{debug} || $ENV{REDIS_DEBUG};
## default to lax utf8
$self->{encoding} = exists $args{encoding}? $args{encoding} : 'utf8';
if ($ENV{REDIS_SERVER} && !$args{sock} && !$args{server}) {
if ($ENV{REDIS_SERVER} =~ m!^/!) {
$args{sock} = $ENV{REDIS_SERVER};
elsif ($ENV{REDIS_SERVER} =~ m!^unix:(.+)!) {
$args{sock} = $1;
elsif ($ENV{REDIS_SERVER} =~ m!^(tcp:)?(.+)!) {
$args{server} = $2;
$self->{password} = $args{password} if $args{password};
if ($args{sock}) {
$self->{server} = $args{sock};
$self->{builder} = sub { IO::Socket::UNIX->new($_[0]->{server}) };
else {
$self->{server} = $args{server} || '';
$self->{builder} = sub {
PeerAddr => $_[0]->{server},
Proto => 'tcp',
$self->{is_subscriber} = 0;
$self->{subscribers} = {};
$self->{reconnect} = $args{reconnect} || 0;
$self->{every} = $args{every} || 1000;
return $self;
sub is_subscriber { $_[0]{is_subscriber} }
### we don't want DESTROY to fallback into AUTOLOAD
sub DESTROY { }
### Deal with common, general case, Redis commands
my $command = $AUTOLOAD;
$command =~ s/.*://;
my $method = sub { shift->__std_cmd($command, @_) };
# Save this method for future calls
no strict 'refs';
*$AUTOLOAD = $method;
goto $method;
sub __std_cmd {
my $self = shift;
my $command = shift;
my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
# If this is an EXEC command, in pipelined mode, and one of the commands
# executed in the transaction yields an error, we must collect all errors
# from that command, rather than throwing an exception immediately.
my $collect_errors = $cb && uc($command) eq 'EXEC';
## Fast path, no reconnect;
return $self->__run_cmd($command, $collect_errors, undef, $cb, @_)
unless $self->{reconnect};
my @cmd_args = @_;
$self->__with_reconnect(sub {
$self->__run_cmd($command, $collect_errors, undef, $cb, @cmd_args);
sub __with_reconnect {
my ($self, $cb) = @_;
## Fast path, no reconnect
return $cb->() unless $self->{reconnect};
return &try($cb, catch {
die $_ unless ref($_) eq 'Redis::X::Reconnect';
sub __run_cmd {
my ($self, $command, $collect_errors, $custom_decode, $cb, @args) = @_;
my $ret;
my $wrapper = $cb && $custom_decode ? sub {
my ($reply, $error) = @_;
$cb->(scalar $custom_decode->($reply), $error);
} : $cb || sub {
my ($reply, $error) = @_;
confess "[$command] $error, " if defined $error;
$ret = $reply;
$self->__send_command($command, @args);
push @{ $self->{queue} }, [$command, $wrapper, $collect_errors];
return 1 if $cb;
return $custom_decode ? $custom_decode->($ret, !wantarray)
: wantarray && ref $ret eq 'ARRAY' ? @$ret : $ret;
sub wait_all_responses {
my ($self) = @_;
for my $handler (splice @{ $self->{queue} }) {
my ($command, $cb, $collect_errors) = @$handler;
$cb->($self->__read_response($command, $collect_errors));
### Commands with extra logic
sub quit {
my ($self) = @_;
return unless $self->{sock};
confess "[quit] only works in synchronous mode, "
if @_ && ref $_[-1] eq 'CODE';
close(delete $self->{sock}) || confess("Can't close socket: $!");
return 1;
sub shutdown {
my ($self) = @_;
confess "[shutdown] only works in synchronous mode, "
if @_ && ref $_[-1] eq 'CODE';
return unless $self->{sock};
close(delete $self->{sock}) || confess("Can't close socket: $!");
return 1;
sub ping {
my $self = shift;
confess "[ping] only works in synchronous mode, "
if @_ && ref $_[-1] eq 'CODE';
return unless exists $self->{sock};
return scalar try {
catch {
close(delete $self->{sock});
sub info {
my $self = shift;
my $custom_decode = sub {
my ($reply) = @_;
return $reply if !defined $reply || ref $reply;
return { map { split(/:/, $_, 2) } grep { /^[^#]/ } split(/\r\n/, $reply) };
my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
## Fast path, no reconnect
return $self->__run_cmd('INFO', 0, $custom_decode, $cb, @_)
unless $self->{reconnect};
my @cmd_args = @_;
$self->__with_reconnect(sub {
$self->__run_cmd('INFO', 0, $custom_decode, $cb, @cmd_args);
sub keys {
my $self = shift;
my $custom_decode = sub {
my ($reply, $synchronous_scalar) = @_;
## Support redis <= 1.2.6
$reply = [split(/\s/, $reply)] if defined $reply && !ref $reply;
return ref $reply && ($synchronous_scalar || wantarray) ? @$reply : $reply;
my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
## Fast path, no reconnect
return $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @_)
unless $self->{reconnect};
my @cmd_args = @_;
$self->__with_reconnect(sub {
$self->__run_cmd('KEYS', 0, $custom_decode, $cb, @cmd_args);
### PubSub
sub wait_for_messages {
my ($self, $timeout) = @_;
my $sock = $self->{sock};
my $s = IO::Select->new;
my $count = 0;
while ($s->can_read($timeout)) {
while (__try_read_sock($sock)) {
my ($reply, $error) = $self->__read_response('WAIT_FOR_MESSAGES');
confess "[WAIT_FOR_MESSAGES] $error, " if defined $error;
return $count;
sub __subscription_cmd {
my $self = shift;
my $pr = shift;
my $unsub = shift;
my $command = shift;
my $cb = pop;
confess("Missing required callback in call to $command(), ")
unless ref($cb) eq 'CODE';
my @subs = @_;
$self->__with_reconnect(sub {
$self->__throw_reconnect('Not connected to any server')
unless $self->{sock};
@subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
if $unsub;
return unless @subs;
$self->__send_command($command, @subs);
my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
return $self->__process_subscription_changes($command, \%cbs);
sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) }
sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) }
sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) }
sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
sub __process_unsubscribe_requests {
my ($self, $cb, $pr, @unsubs) = @_;
my $subs = $self->{subscribers};
my @subs_to_unsubscribe;
for my $sub (@unsubs) {
my $key = "${pr}message:$sub";
my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}];
next if @$cbs;
delete $subs->{$key};
push @subs_to_unsubscribe, $sub;
return @subs_to_unsubscribe;
sub __process_subscription_changes {
my ($self, $cmd, $expected) = @_;
my $subs = $self->{subscribers};
while (%$expected) {
my ($m, $error) = $self->__read_response($cmd);
confess "[$cmd] $error, " if defined $error;
## Deal with pending PUBLISH'ed messages
if ($m->[0] =~ /^p?message$/) {
my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
$key .= "message:$m->[1]";
my $cb = delete $expected->{$key};
push @{$subs->{$key}}, $cb unless $unsub;
$self->{is_subscriber} = $m->[2];
sub __process_pubsub_msg {
my ($self, $m) = @_;
my $subs = $self->{subscribers};
my $sub = $m->[1];
my $cbid = "$m->[0]:$sub";
my $data = pop @$m;
my $topic = $m->[2] || $sub;
if (!exists $subs->{$cbid}) {
warn "Message for topic '$topic' ($cbid) without expected callback, ";
$_->($data, $topic, $sub) for @{$subs->{$cbid}};
return 1;
### Mode validation
sub __is_valid_command {
my ($self, $cmd) = @_;
confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
if $self->{is_subscriber};
### Socket operations
sub __connect {
my ($self) = @_;
delete $self->{sock};
# Suppose we have at least one command response pending, but we're about
# to reconnect. The new connection will never get a response to any of
# the pending commands, so delete all those pending responses now.
$self->{queue} = [];
## Fast path, no reconnect
return $self->__build_sock() unless $self->{reconnect};
## Use precise timers on reconnections
require Time::HiRes;
my $t0 = [Time::HiRes::gettimeofday()];
## Reconnect...
while (1) {
eval { $self->__build_sock };
last unless $@; ## Connected!
die if Time::HiRes::tv_interval($t0) > $self->{reconnect}; ## Timeout
Time::HiRes::usleep($self->{every}); ## Retry in...
sub __build_sock {
my ($self) = @_;
$self->{sock} = $self->{builder}->($self)
|| confess("Could not connect to Redis server at $self->{server}: $!");
if (exists $self->{password}) {
try { $self->auth($self->{password}) }
catch {
$self->{reconnect} = 0;
confess("Redis server refused password");
sub __send_command {
my $self = shift;
my $cmd = uc(shift);
my $enc = $self->{encoding};
my $deb = $self->{debug};
my $sock = $self->{sock}
|| $self->__throw_reconnect('Not connected to any server');
warn "[SEND] $cmd ", Dumper([@_]) if $deb;
## Encode command using multi-bulk format
my $n_elems = scalar(@_) + 1;
my $buf = "\*$n_elems\r\n";
for my $elem ($cmd, @_) {
my $bin = $enc ? encode($enc, $elem) : $elem;
$buf .= defined($bin) ? '$' . length($bin) . "\r\n$bin\r\n" : "\$-1\r\n";
## Check to see if socket was closed: reconnect on EOF
my $status = __try_read_sock($sock);
$self->__throw_reconnect('Not connected to any server')
if defined $status && $status == 0;
## Send command, take care for partial writes
warn "[SEND RAW] $buf" if $deb;
while ($buf) {
my $len = syswrite $sock, $buf, length $buf;
$self->__throw_reconnect("Could not write to Redis server: $!")
unless $len;
substr $buf, 0, $len, "";
sub __read_response {
my ($self, $cmd, $collect_errors) = @_;
confess("Not connected to any server") unless $self->{sock};
local $/ = "\r\n";
## no debug => fast path
return $self->__read_response_r($cmd, $collect_errors) unless $self->{debug};
my ($result, $error) = $self->__read_response_r($cmd, $collect_errors);
warn "[RECV] $cmd ", Dumper($result, $error) if $self->{debug};
return $result, $error;
sub __read_response_r {
my ($self, $command, $collect_errors) = @_;
my ($type, $result) = $self->__read_line;
if ($type eq '-') {
return undef, $result;
elsif ($type eq '+' || $type eq ':') {
return $result, undef;
elsif ($type eq '$') {
return undef, undef if $result < 0;
return $self->__read_len($result + 2), undef;
elsif ($type eq '*') {
return undef, undef if $result < 0;
my @list;
while ($result--) {
my @nested = $self->__read_response_r($command, $collect_errors);
if ($collect_errors) {
push @list, \@nested;
else {
confess "[$command] $nested[1], " if defined $nested[1];
push @list, $nested[0];
return \@list, undef;
else {
confess "unknown answer type: $type ($result), ";
sub __read_line {
my $self = $_[0];
my $sock = $self->{sock};
my $data = <$sock>;
confess("Error while reading from Redis server: $!")
unless defined $data;
chomp $data;
warn "[RECV RAW] '$data'" if $self->{debug};
my $type = substr($data, 0, 1, '');
return ($type, $data) unless $self->{encoding};
return ($type, decode($self->{encoding}, $data));
sub __read_len {
my ($self, $len) = @_;
my $data = '';
my $offset = 0;
while ($len) {
my $bytes = read $self->{sock}, $data, $len, $offset;
confess("Error while reading from Redis server: $!")
unless defined $bytes;
confess("Redis server closed connection") unless $bytes;
$offset += $bytes;
$len -= $bytes;
chomp $data;
warn "[RECV RAW] '$data'" if $self->{debug};
return $data unless $self->{encoding};
return decode($self->{encoding}, $data);
# The reason for this code:
# IO::Select and buffered reads like <$sock> and read() dont mix
# For example, if I receive two MESSAGE messages (from Redis PubSub),
# the first read for the first message will probably empty to socket
# buffer and move the data to the perl IO buffer.
# This means that IO::Select->can_read will return false (after all
# the socket buffer is empty) but from the application point of view
# there is still data to be read and process
# Hence this code. We try to do a non-blocking read() of 1 byte, and if
# we succeed, we put it back and signal "yes, Virginia, there is still
# stuff out there"
# We could just use sysread and leave the socket buffer with the second
# message, and then use IO::Select as intended, and previous versions of
# this code did that (check the git history for this file), but
# performance suffers, about 20/30% slower, mostly because we do a lot
# of "read one line", where <$sock> beats the crap of anything you can
# write on Perl-land.
sub __try_read_sock {
my $sock = shift;
my $data = '';
__fh_nonblocking($sock, 1);
my $result = read($sock, $data, 1);
my $err = 0 + $!;
__fh_nonblocking($sock, 0);
## No errno?? This happens sometimes on my tests when the server
## timesout the client. I traced the system calls and I see the read()
## system call return 0 for EOF, but on this side of perl, we get
## undef...
return 0 if !defined($result) && $err == 0;
return $result unless $result;
return 1;
### Copied from AnyEvent::Util
*__fh_nonblocking = (WIN32)
? sub($$) { ioctl $_[0], 0x8004667e, pack "L", $_[1]; } # FIONBIO
: sub($$) { fcntl $_[0], F_SETFL, $_[1] ? O_NONBLOCK : 0; };
# I take exception to that
sub __throw_reconnect {
my ($self, $m) = @_;
die bless(\$m, 'Redis::X::Reconnect') if $self->{reconnect};
die $m;
1; # End of
## Defaults to $ENV{REDIS_SERVER} or
my $redis = Redis->new;
my $redis = Redis->new(server => '');
## Use UNIX domain socket
my $redis = Redis->new(sock => '/path/to/socket');
## Enable auto-reconnect
## Try to reconnect every 500ms up to 60 seconds until success
## Die if you can't after that
my $redis = Redis->new(reconnect => 60);
## Try each 100ms upto 2 seconds (every is in milisecs)
my $redis = Redis->new(reconnect => 2, every => 100);
## Disable the automatic utf8 encoding => much more performance
## !!!! This will be the default after 2.000, see ENCODING below
my $redis = Redis->new(encoding => undef);
## Use all the regular Redis commands, they all accept a list of
## arguments
## See for full list
$redis->set('key' => 'value');
$redis->sort('list', 'DESC');
$redis->sort(qw{list LIMIT 0 5 ALPHA DESC});
## Add a coderef argument to run a command in the background
$redis->sort(qw{list LIMIT 0 5 ALPHA DESC}, sub {
my ($reply, $error) = @_;
die "Oops, got an error: $error\n" if defined $error;
print "$_\n" for @$reply;
## Or run a large batch of commands in a pipeline
my %hash = _get_large_batch_of_commands();
$redis->hset('h', $_, $hash{$_}, sub {}) for keys %hash;
## Publish/Subscribe
sub {
my ($message, $topic, $subscribed_topic) = @_
## $subscribed_topic can be different from topic if
## you use psubscribe() with wildcards
$redis->psubscribe('nasdaq.*', sub {...});
## Blocks and waits for messages, calls subscribe() callbacks
## ... forever
my $timeout = 10;
$redis->wait_for_messages($timeout) while 1;
## ... until some condition
my $keep_going = 1; ## other code will set to false to quit
$redis->wait_for_messages($timeout) while $keep_going;
$redis->publish('topic_1', 'message');
Pure perl bindings for L<>
This version supports protocol 2.x (multi-bulk) or later of Redis
available at L<>.
This documentation lists commands which are exercised in test suite, but
additional commands will work correctly since protocol specifies enough
information to support almost all commands with same piece of code with
a little help of C<AUTOLOAD>.
Usually, running a command will wait for a response. However, if you're
doing large numbers of requests, it can be more efficient to use what Redis
calls I<pipelining>: send multiple commands to Redis without waiting for a
response, then wait for the responses that come in.
To use pipelining, add a coderef argument as the last argument to a command
method call:
$r->set('foo', 'bar', sub {});
Pending responses to pipelined commands are processed in a single batch, as
soon as at least one of the following conditions holds:
=over 4
=item *
A non-pipelined (synchronous) command has been sent on the same connection
=item *
A pub/sub subscription command (one of C<subscribe>, C<unsubscribe>,
C<psubscribe>, or C<punsubscribe>) is about to be sent on the same
=item *
The L</wait_all_responses> method is called explicitly.
The coderef you supply to a pipelined command method is invoked once the
response is available. It takes two arguments, C<$reply> and C<$error>. If
C<$error> is defined, it contains the text of an error reply sent by the
Redis server. Otherwise, C<$reply> is the non-error reply. For almost all
commands, that means it's C<undef>, or a defined but non-reference scalar,
or an array ref of any of those; but see L</keys>, L</info>, and L</exec>.
Note the contrast with synchronous commands, which throw an exception on
receipt of an error reply, or return a non-error reply directly.
The fact that pipelined commands never throw an exception can be
particularly useful for Redis transactions; see L</exec>.
B<This feature is deprecated and will be removed before 2.000>. You
should start testing your code with C<< encoding => undef >> because
that will be the new default with 2.000.
Since Redis knows nothing about encoding, we are forcing utf-8 flag on
all data received from Redis. This change was introduced in 1.2001
version. B<Please note> that this encoding option severely degrades
You can disable this automatic encoding by passing an option to
L</new>: C<< encoding => undef >>.
This allows us to round-trip utf-8 encoded characters correctly, but
might be problem if you push binary junk into Redis and expect to get it
back without utf-8 flag turned on.
=head1 METHODS
=head2 Constructors
=head3 new
my $r = Redis->new; # $ENV{REDIS_SERVER} or
my $r = Redis->new( server => '', debug => 0 );
my $r = Redis->new( server => '', encoding => undef );
my $r = Redis->new( sock => '/path/to/sock' );
my $r = Redis->new( reconnect => 60, every => 5000 );
my $r = Redis->new( password => 'boo' );
The C<< server >> parameter specifies the Redis server we should connect
to, via TCP. Use the 'IP:PORT' format. If no C<< server >> option is
present, we will attempt to use the C<< REDIS_SERVER >> environment
variable. If neither of those options are present, it defaults to
Alternatively you can use the C<< sock >> parameter to specify the path
of the UNIX domain socket where the Redis server is listening.
The C<< REDIS_SERVER >> can be used for UNIX domain sockets too. The following formats are supported:
=over 4
=item *
=item *
=item *
=item *
The C<< encoding >> parameter speficies the encoding we will use to
decode all the data we receive and encode all the data sent to the redis
server. Due to backwards-compatibility we default to C<< utf8 >>. To
disable all this encoding/decoding, you must use C<<encoding => undef>>.
B<< This is the recommended option >>.
B<< Warning >>: this option has several problems and it is
B<deprecated>. A future version might add other filtering options though.
The C<< reconnect >> option enables auto-reconnection mode. If we cannot
connect to the Redis server, or if a network write fails, we enter retry
mode. We will try a new connection every C<< every >> miliseconds
(1000ms by default), up-to C<< reconnect >> seconds.
Be aware that read errors will always thrown an exception, and will not
trigger a retry until the new command is sent.
If we cannot re-establish a connection after C<< reconnect >> seconds,
an exception will be thrown.
If your Redis server requires authentication, you can use the
C<< password >> attribute. After each established connection (at the
start or when reconnecting), the Redis C<< AUTH >> command will be send
to the server. If the password is wrong, an exception will be thrown and
reconnect will be disabled.
The C<< debug >> parameter enables debug information to STDERR,
including all interactions with the server. You can also enable debug
with the C<REDIS_DEBUG> environment variable.
=head2 Connection Handling
=head3 quit
Closes the connection to the server. The C<quit> method does not support
pipelined operation.
=head3 ping
$r->ping || die "no server?";
The C<ping> method does not support pipelined operation.
=head2 Pipeline management
=head3 wait_all_responses
Waits until all pending pipelined responses have been received, and invokes
the pipeline callback for each one. See L</PIPELINING>.
=head2 Transaction-handling commands
B<Warning:> the behaviour of these commands when combined with
pipelining is still under discussion, and you should B<NOT> use them at
the same time just now.
You can L<follow the discussion to see the open issues with this|>.
=head3 multi
=head3 discard
=head3 exec
my @individual_replies = $r->exec;
C<exec> has special behaviour when run in a pipeline: the C<$reply> argument
to the pipeline callback is an array ref whose elements are themselves
C<[$reply, $error]> pairs. This means that you can accurately detect errors
yielded by any command in the transaction, and without any exceptions being
=head2 Commands operating on string values
=head3 set
$r->set( foo => 'bar' );
$r->setnx( foo => 42 );
=head3 get
my $value = $r->get( 'foo' );
=head3 mget
my @values = $r->mget( 'foo', 'bar', 'baz' );
=head3 incr
$r->incrby('tripplets', 3);
=head3 decr
$r->decrby('tripplets', 3);
=head3 exists
$r->exists( 'key' ) && print "got key!";
=head3 del
$r->del( 'key' ) || warn "key doesn't exist";
=head3 type
$r->type( 'key' ); # = string
=head2 Commands operating on the key space
=head3 keys
my @keys = $r->keys( '*glob_pattern*' );
my $keys = $r->keys( '*glob_pattern*' ); # count of matching keys
Note that synchronous C<keys> calls in a scalar context return the number of
matching keys (not an array ref of matching keys as you might expect). This
does not apply in pipelined mode: assuming the server returns a list of
keys, as expected, it is always passed to the pipeline callback as an array
=head3 randomkey
my $key = $r->randomkey;
=head3 rename
my $ok = $r->rename( 'old-key', 'new-key', $new );
=head3 dbsize
my $nr_keys = $r->dbsize;
=head2 Commands operating on lists
See also L<Redis::List> for tie interface.
=head3 rpush
$r->rpush( $key, $value );
=head3 lpush
$r->lpush( $key, $value );
=head3 llen
$r->llen( $key );
=head3 lrange
my @list = $r->lrange( $key, $start, $end );
=head3 ltrim
my $ok = $r->ltrim( $key, $start, $end );
=head3 lindex
$r->lindex( $key, $index );
=head3 lset
$r->lset( $key, $index, $value );
=head3 lrem
my $modified_count = $r->lrem( $key, $count, $value );
=head3 lpop
my $value = $r->lpop( $key );
=head3 rpop
my $value = $r->rpop( $key );
=head2 Commands operating on sets
=head3 sadd
my $ok = $r->sadd( $key, $member );
=head3 scard
my $n_elements = $r->scard( $key );
=head3 sdiff
my @elements = $r->sdiff( $key1, $key2, ... );
my $elements = $r->sdiff( $key1, $key2, ... ); # ARRAY ref
=head3 sdiffstore
my $ok = $r->sdiffstore( $dstkey, $key1, $key2, ... );
=head3 sinter
my @elements = $r->sinter( $key1, $key2, ... );
my $elements = $r->sinter( $key1, $key2, ... ); # ARRAY ref
=head3 sinterstore
my $ok = $r->sinterstore( $dstkey, $key1, $key2, ... );
=head3 sismember
my $bool = $r->sismember( $key, $member );
=head3 smembers
my @elements = $r->smembers( $key );
my $elements = $r->smembers( $key ); # ARRAY ref
=head3 smove
my $ok = $r->smove( $srckey, $dstkey, $element );
=head3 spop
my $element = $r->spop( $key );
=head3 spop
my $element = $r->srandmember( $key );
=head3 srem
$r->srem( $key, $member );
=head3 sunion
my @elements = $r->sunion( $key1, $key2, ... );
my $elements = $r->sunion( $key1, $key2, ... ); # ARRAY ref
=head3 sunionstore
my $ok = $r->sunionstore( $dstkey, $key1, $key2, ... );
=head2 Sorting
=head3 sort
$r->sort("key BY pattern LIMIT start end GET pattern ASC|DESC ALPHA');
=head2 Publish/Subscribe commands
When one of L</subscribe> or L</psubscribe> is used, the Redis object
will enter I<PubSub> mode. When in I<PubSub> mode only commands in this
section, plus L</quit>, will be accepted.
If you plan on using PubSub and other Redis functions, you should
use two Redis objects, one dedicated to PubSub and the other for
regular commands.
All Pub/Sub commands receive a callback as the last parameter. This callback receives three arguments:
=over 4
=item *
The published message.
=item *
The topic over which the message was sent.
=item *
The subscribed topic that matched the topic for the message. With
L</subscribe> these last two are the same, always. But with
L</psubscribe>, this parameter tells you the pattern that matched.
See the L<Pub/Sub notes|> for more
information about the messages you will receive on your callbacks after
each L</subscribe>, L</unsubscribe>, L</psubscribe> and
=head3 publish
$r->publish($topic, $message);
Publishes the C<< $message >> to the C<< $topic >>.
=head3 subscribe
sub {
my ($message, $topic, $subscribed_topic) = @_;
Subscribe one or more topics. Messages published into one of them will
be received by Redis, and the specificed callback will be executed.
=head3 unsubscribe
$r->unsubscribe(@topic_list, sub { my ($m, $t, $s) = @_; ... });
Stops receiving messages for all the topics in C<@topic_list>.
=head3 psubscribe
my @topic_matches = ('prefix1.*', 'prefix2.*');
$r->psubscribe(@topic_matches, sub { my ($m, $t, $s) = @_; ... });
Subscribes a pattern of topics. All messages to topics that match the
pattern will be delivered to the callback.
=head3 punsubscribe
my @topic_matches = ('prefix1.*', 'prefix2.*');
$r->punsubscribe(@topic_matches, sub { my ($m, $t, $s) = @_; ... });
Stops receiving messages for all the topics pattern matches in C<@topic_list>.
=head3 is_subscriber
if ($r->is_subscriber) { say "We are in Pub/Sub mode!" }
Returns true if we are in I<Pub/Sub> mode.
=head3 wait_for_messages
my $keep_going = 1; ## Set to false somewhere to leave the loop
my $timeout = 5;
$r->wait_for_messages($timeout) while $keep_going;
Blocks, waits for incoming messages and delivers them to the appropriate callbacks.
Requires a single parameter, the number of seconds to wait for messages.
Use 0 to wait for ever. If a positive non-zero value is used, it will
return after that ammount of seconds without a single notification.
Please note that the timeout is not a commitement to return control to
the caller at most each C<timeout> seconds, but more a idle timeout,
were control will return to the caller if Redis is idle (as in no
messages were received during the timeout period) for more than
C<timeout> seconds.
The L</wait_for_messages> call returns the number of messages processed
during the run.
=head2 Persistence control commands
=head3 save
=head3 bgsave
=head3 lastsave
=head2 Remote server control commands
=head3 info
my $info_hash = $r->info;
The C<info> method is unique in that it decodes the server's response into a
hashref, if possible. This decoding happens in both synchronous and
pipelined modes.
=head3 shutdown
The C<shutdown> method does not support pipelined operation.
=head2 Multiple databases handling commands
=head3 select
$r->select( $dbindex ); # 0 for new clients
=head3 move
$r->move( $key, $dbindex );
=head3 flushdb
=head3 flushall
The following persons contributed to this project (alphabetical order):
=over 4
=item *
Aaron Crane (pipelining and AUTOLOAD caching support)
=item *
Dirk Vleugels
=item *
Flavio Poletti
=item *
Jeremy Zawodny
=item *
sunnavy at
=item *
Thiago Berlitz Rondon
=item *
Ulrich Habel
Jump to Line
Something went wrong with that request. Please try again.