Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #10 from arc/pipelining

Support for pipelining
  • Loading branch information...
commit 157cb35105e77c1204174a8c8623771d62be340d 2 parents 5c007ba + 1a9371b
@melo melo authored
View
372 lib/Redis.pm
@@ -51,6 +51,19 @@ our $VERSION = '1.926';
$redis->sort('list', 'DESC');
$redis->sort(qw{list LIMIT 0 5 ALPHA DESC});
+ ## Add a coderef argument to run a command in the background
+ $redis->sort(qw{list LIMIT 0 5 ALPHA DESC}, sub {
+ my ($reply, $error) = @_;
+ die "Oops, got an error: $error\n" if defined $error;
+ print "$_\n" for @$reply;
+ });
+ long_computation();
+ $redis->wait_all_responses;
+
+ ## Or run a large batch of commands in a pipeline
+ $redis->hset('h', $_, $hash{$_}, sub {}) for keys %hash;
+ $redis->wait_all_responses;
+
## Publish/Subscribe
$redis->subscribe(
'topic_1',
@@ -223,49 +236,70 @@ sub __std_cmd {
$self->__is_valid_command($command);
- ## Fast path, no reconnect
- return $self->__run_cmd($command, @_) unless $self->{reconnect};
+ my $ret;
+ my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
+
+ # If this is an EXEC command, in pipelined mode, and one of the commands
+ # executed in the transaction yields an error, we must collect all errors
+ # from that command, rather than throwing an exception immediately.
+ my $collect_errors = $cb && uc($command) eq 'EXEC';
+
+ ## Fast path, no reconnect;
+ return $self->__run_cmd($command, $collect_errors, undef, $cb, @_)
+ unless $self->{reconnect};
my @cmd_args = @_;
- return try {
- $self->__run_cmd($command, @cmd_args);
- }
- catch {
+ $self->__with_reconnect(sub {
+ $self->__run_cmd($command, $collect_errors, undef, $cb, @cmd_args);
+ });
+}
+
+sub __with_reconnect {
+ my ($self, $cb) = @_;
+
+ ## Fast path, no reconnect
+ return $cb->() unless $self->{reconnect};
+
+ return &try($cb, catch {
die $_ unless ref($_) eq 'Redis::X::Reconnect';
$self->__connect;
- $self->__run_cmd($command, @cmd_args);
- };
+ $cb->();
+ });
}
sub __run_cmd {
- my $self = shift;
- my $command = shift;
- my $sock = $self->{sock} || $self->__try_reconnect('Not connected to any server');
- my $enc = $self->{encoding};
- my $deb = $self->{debug};
+ my ($self, $command, $collect_errors, $custom_decode, $cb, @args) = @_;
+
+ my $ret;
+ my $wrapper = $cb && $custom_decode ? sub {
+ my ($reply, $error) = @_;
+ $cb->(scalar $custom_decode->($reply), $error);
+ } : $cb || sub {
+ my ($reply, $error) = @_;
+ confess "[$command] $error, " if defined $error;
+ $ret = $reply;
+ };
- ## PubSub commands use a different answer handling
- if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
- $pr = '' unless $pr;
+ $self->__send_command($command, @args);
+ push @{ $self->{queue} }, [$command, $wrapper, $collect_errors];
- my $cb = pop;
- confess("Missing required callback in call to $command(), ")
- unless ref($cb) eq 'CODE';
+ return 1 if $cb;
- my @subs = @_;
- @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
- if $unsub;
- return unless @subs;
+ $self->wait_all_responses;
+ return $custom_decode ? $custom_decode->($ret, !wantarray)
+ : wantarray && ref $ret eq 'ARRAY' ? @$ret : $ret;
+}
- $self->__send_command($command, @subs);
+sub wait_all_responses {
+ my ($self) = @_;
- my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
- return $self->__process_subscription_changes($command, \%cbs);
+ for my $handler (splice @{ $self->{queue} }) {
+ my ($command, $cb, $collect_errors) = @$handler;
+ $cb->($self->__read_response($command, $collect_errors));
}
- $self->__send_command($command, @_);
- return $self->__read_response($command);
+ return;
}
@@ -274,6 +308,10 @@ sub quit {
my ($self) = @_;
return unless $self->{sock};
+ confess "[quit] only works in synchronous mode, "
+ if @_ && ref $_[-1] eq 'CODE';
+
+ $self->wait_all_responses;
$self->__send_command('QUIT');
close(delete $self->{sock}) || confess("Can't close socket: $!");
@@ -282,8 +320,14 @@ sub quit {
sub shutdown {
my ($self) = @_;
+ $self->__is_valid_command('SHUTDOWN');
+
+ confess "[shutdown] only works in synchronous mode, "
+ if @_ && ref $_[-1] eq 'CODE';
+
return unless $self->{sock};
+ $self->wait_all_responses;
$self->__send_command('SHUTDOWN');
close(delete $self->{sock}) || confess("Can't close socket: $!");
@@ -291,46 +335,69 @@ sub shutdown {
}
sub ping {
- my ($self) = @_;
+ my $self = shift;
+ $self->__is_valid_command('PING');
+
+ confess "[ping] only works in synchronous mode, "
+ if @_ && ref $_[-1] eq 'CODE';
+
return unless exists $self->{sock};
- my $reply;
- eval {
- $self->__send_command('PING');
- $reply = $self->__read_response('PING');
- };
- if ($@) {
+ $self->wait_all_responses;
+ return scalar try {
+ $self->__std_cmd('PING');
+ }
+ catch {
close(delete $self->{sock});
return;
- }
-
- return $reply;
+ };
}
sub info {
- my ($self) = @_;
+ my $self = shift;
$self->__is_valid_command('INFO');
- $self->__send_command('INFO');
+ my $custom_decode = sub {
+ my ($reply) = @_;
+ return $reply if !defined $reply || ref $reply;
+ return { map { split(/:/, $_, 2) } split(/\r\n/, $reply) };
+ };
+
+ my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
- my $info = $self->__read_response('INFO');
+ ## Fast path, no reconnect
+ return $self->__run_cmd('INFO', 0, $custom_decode, $cb, @_)
+ unless $self->{reconnect};
- return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
+ my @cmd_args = @_;
+ $self->__with_reconnect(sub {
+ $self->__run_cmd('INFO', 0, $custom_decode, $cb, @cmd_args);
+ });
}
sub keys {
my $self = shift;
$self->__is_valid_command('KEYS');
- $self->__send_command('KEYS', @_);
+ my $custom_decode = sub {
+ my ($reply, $synchronous_scalar) = @_;
- my @keys = $self->__read_response('KEYS', \my $type);
- ## Support redis > 1.26
- return @keys if $type eq '*';
+ ## Support redis <= 1.2.6
+ $reply = [split(/\s/, $reply)] if defined $reply && !ref $reply;
- ## Support redis <= 1.2.6
- return split(/\s/, $keys[0]) if $keys[0];
- return;
+ return ref $reply && ($synchronous_scalar || wantarray) ? @$reply : $reply;
+ };
+
+ my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
+
+ ## Fast path, no reconnect
+ return $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @_)
+ unless $self->{reconnect};
+
+ my @cmd_args = @_;
+ $self->__with_reconnect(sub {
+ $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @cmd_args);
+ });
}
@@ -345,8 +412,9 @@ sub wait_for_messages {
my $count = 0;
while ($s->can_read($timeout)) {
while (__try_read_sock($sock)) {
- my @m = $self->__read_response('WAIT_FOR_MESSAGES');
- $self->__process_pubsub_msg(\@m);
+ my ($reply, $error) = $self->__read_response('WAIT_FOR_MESSAGES');
+ confess "[WAIT_FOR_MESSAGES] $error, " if defined $error;
+ $self->__process_pubsub_msg($reply);
$count++;
}
}
@@ -354,6 +422,39 @@ sub wait_for_messages {
return $count;
}
+sub __subscription_cmd {
+ my $self = shift;
+ my $pr = shift;
+ my $unsub = shift;
+ my $command = shift;
+ my $cb = pop;
+
+ confess("Missing required callback in call to $command(), ")
+ unless ref($cb) eq 'CODE';
+
+ $self->wait_all_responses;
+
+ my @subs = @_;
+ $self->__with_reconnect(sub {
+ $self->__throw_reconnect('Not connected to any server')
+ unless $self->{sock};
+
+ @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
+ if $unsub;
+ return unless @subs;
+
+ $self->__send_command($command, @subs);
+
+ my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
+ return $self->__process_subscription_changes($command, \%cbs);
+ });
+}
+
+sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) }
+sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) }
+sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) }
+sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
+
sub __process_unsubscribe_requests {
my ($self, $cb, $pr, @unsubs) = @_;
my $subs = $self->{subscribers};
@@ -376,21 +477,22 @@ sub __process_subscription_changes {
my $subs = $self->{subscribers};
while (%$expected) {
- my @m = $self->__read_response($cmd);
+ my ($m, $error) = $self->__read_response($cmd);
+ confess "[$cmd] $error, " if defined $error;
## Deal with pending PUBLISH'ed messages
- if ($m[0] =~ /^p?message$/) {
- $self->__process_pubsub_msg(\@m);
+ if ($m->[0] =~ /^p?message$/) {
+ $self->__process_pubsub_msg($m);
next;
}
- my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
- $key .= "message:$m[1]";
+ my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
+ $key .= "message:$m->[1]";
my $cb = delete $expected->{$key};
push @{$subs->{$key}}, $cb unless $unsub;
- $self->{is_subscriber} = $m[2];
+ $self->{is_subscriber} = $m->[2];
}
}
@@ -419,9 +521,8 @@ sub __process_pubsub_msg {
sub __is_valid_command {
my ($self, $cmd) = @_;
- return unless $self->{is_subscriber};
- return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i;
- confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
+ confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
+ if $self->{is_subscriber};
}
@@ -430,6 +531,11 @@ sub __connect {
my ($self) = @_;
delete $self->{sock};
+ # Suppose we have at least one command response pending, but we're about
+ # to reconnect. The new connection will never get a response to any of
+ # the pending commands, so delete all those pending responses now.
+ $self->{queue} = [];
+
## Fast path, no reconnect
return $self->__build_sock() unless $self->{reconnect};
@@ -465,7 +571,7 @@ sub __send_command {
my $deb = $self->{debug};
my $sock = $self->{sock}
- || $self->__try_reconnect('Not connected to any server');
+ || $self->__throw_reconnect('Not connected to any server');
warn "[SEND] $cmd ", Dumper([@_]) if $deb;
@@ -479,14 +585,14 @@ sub __send_command {
## Check to see if socket was closed: reconnect on EOF
my $status = __try_read_sock($sock);
- $self->__try_reconnect('Not connected to any server')
+ $self->__throw_reconnect('Not connected to any server')
if defined $status && $status == 0;
## Send command, take care for partial writes
warn "[SEND RAW] $buf" if $deb;
while ($buf) {
my $len = syswrite $sock, $buf, length $buf;
- $self->__try_reconnect("Could not write to Redis server: $!")
+ $self->__throw_reconnect("Could not write to Redis server: $!")
unless $len;
substr $buf, 0, $len, "";
}
@@ -495,55 +601,50 @@ sub __send_command {
}
sub __read_response {
- my ($self, $cmd) = @_;
+ my ($self, $cmd, $collect_errors) = @_;
confess("Not connected to any server") unless $self->{sock};
local $/ = "\r\n";
## no debug => fast path
- return __read_response_r(@_) unless $self->{debug};
+ return $self->__read_response_r($cmd, $collect_errors) unless $self->{debug};
- if (wantarray) {
- my @r = __read_response_r(@_);
- warn "[RECV] $cmd ", Dumper(\@r);
- return @r;
- }
- else {
- my $r = __read_response_r(@_);
- warn "[RECV] $cmd ", Dumper($r);
- return $r;
- }
+ my ($result, $error) = $self->__read_response_r($cmd, $collect_errors);
+ warn "[RECV] $cmd ", Dumper($result, $error) if $self->{debug};
+ return $result, $error;
}
sub __read_response_r {
- my ($self, $command, $type_r) = @_;
+ my ($self, $command, $collect_errors) = @_;
my ($type, $result) = $self->__read_line;
- $$type_r = $type if $type_r;
if ($type eq '-') {
- confess "[$command] $result, ";
+ return undef, $result;
}
- elsif ($type eq '+') {
- return $result;
+ elsif ($type eq '+' || $type eq ':') {
+ return $result, undef;
}
elsif ($type eq '$') {
- return if $result < 0;
- return $self->__read_len($result + 2);
+ return undef, undef if $result < 0;
+ return $self->__read_len($result + 2), undef;
}
elsif ($type eq '*') {
- return if $result < 0;
+ return undef, undef if $result < 0;
my @list;
while ($result--) {
- push @list, scalar($self->__read_response_r($command));
+ my @nested = $self->__read_response_r($command, $collect_errors);
+ if ($collect_errors) {
+ push @list, \@nested;
+ }
+ else {
+ confess "[$command] $nested[1], " if defined $nested[1];
+ push @list, $nested[0];
+ }
}
- return @list if wantarray;
- return \@list;
- }
- elsif ($type eq ':') {
- return $result;
+ return \@list, undef;
}
else {
confess "unknown answer type: $type ($result), ";
@@ -644,7 +745,7 @@ BEGIN {
##########################
# I take exception to that
-sub __try_reconnect {
+sub __throw_reconnect {
my ($self, $m) = @_;
die bless(\$m, 'Redis::X::Reconnect') if $self->{reconnect};
die $m;
@@ -655,16 +756,27 @@ sub __try_reconnect {
__END__
+=head1 Pipeline management
+
+=head2 wait_all_responses
+
+Waits until all pending pipelined responses have been received, and invokes
+the pipeline callback for each one. See L</PIPELINING>.
+
=head1 Connection Handling
=head2 quit
$r->quit;
+The C<quit> method does not support pipelined operation.
+
=head2 ping
$r->ping || die "no server?";
+The C<ping> method does not support pipelined operation.
+
=head1 Commands operating on string values
=head2 set
@@ -710,6 +822,13 @@ __END__
=head2 keys
my @keys = $r->keys( '*glob_pattern*' );
+ my $keys = $r->keys( '*glob_pattern*' ); # count of matching keys
+
+Note that synchronous C<keys> calls in a scalar context return the number of
+matching keys (not an array ref of matching keys as you might expect). This
+does not apply in pipelined mode: assuming the server returns a list of
+keys, as expected, it is always passed to the pipeline callback as an array
+ref.
=head2 randomkey
@@ -872,12 +991,85 @@ See also L<Redis::List> for tie interface.
$r->shutdown;
+The C<shutdown> method does not support pipelined operation.
+
=head1 Remote server control commands
=head2 info
my $info_hash = $r->info;
+The C<info> method is unique in that it decodes the server's response into a
+hashref, if possible. This decoding happens in both synchronous and
+pipelined modes.
+
+=head1 Transaction-handling commands
+
+=head2 multi
+
+ $r->multi;
+
+=head2 discard
+
+ $r->discard;
+
+=head2 exec
+
+ my @individual_replies = $r->exec;
+
+C<exec> has special behaviour when run in a pipeline: the C<$reply> argument
+to the pipeline callback is an array ref whose elements are themselves
+C<[$reply, $error]> pairs. This means that you can accurately detect errors
+yielded by any command in the transaction, and without any exceptions being
+thrown.
+
+
+=head1 PIPELINING
+
+Usually, running a command will wait for a response. However, if you're
+doing large numbers of requests, it can be more efficient to use what Redis
+calls I<pipelining>: send multiple commands to Redis without waiting for a
+response, then wait for the responses that come in.
+
+To use pipelining, add a coderef argument as the last argument to a command
+method call:
+
+ $r->set('foo', 'bar', sub {});
+
+Pending responses to pipelined commands are processed in a single batch, as
+soon as at least one of the following conditions holds:
+
+=over 4
+
+=item *
+
+A non-pipelined (synchronous) command has been sent on the same connection
+
+=item *
+
+A pub/sub subscription command (one of C<subscribe>, C<unsubscribe>,
+C<psubscribe>, or C<punsubscribe>) is about to be sent on the same
+connection.
+
+=item *
+
+The L</wait_all_responses> method is called explicitly.
+
+=back
+
+The coderef you supply to a pipelined command method is invoked once the
+response is available. It takes two arguments, C<$reply> and C<$error>. If
+C<$error> is defined, it contains the text of an error reply sent by the
+Redis server. Otherwise, C<$reply> is the non-error reply. For almost all
+commands, that means it's C<undef>, or a defined but non-reference scalar,
+or an array ref of any of those; but see L</keys>, L</info>, and L</exec>.
+
+Note the contrast with synchronous commands, which throw an exception on
+receipt of an error reply, or return a non-error reply directly.
+
+The fact that pipelined commands never throw an exception can be
+particularly useful for Redis transactions; see L</exec>.
+
=head1 ENCODING
View
4 t/01-basic.t
@@ -92,6 +92,10 @@ ok($@, 'rename to existing key');
ok(my $nr_keys = $o->dbsize, 'dbsize');
+throws_ok sub { $o->lpush('foo', 'bar') },
+ qr/\[lpush\] ERR Operation against a key holding the wrong kind of value,/,
+ 'Error responses throw exception';
+
## Commands operating on lists
View
66 t/02-responses.t 100644 → 100755
@@ -22,67 +22,75 @@ sub r {
## -ERR responses
r('-you must die!!');
-throws_ok sub { $r->__read_response('cmd') }, qr/\[cmd\] you must die!!/,
- 'Error response must throw exception';
+is_deeply([$r->__read_response('cmd')], [undef, 'you must die!!'],
+ 'Error response detected');
## +TEXT responses
my $m;
r('+all your text are belong to us');
-lives_ok sub { $m = $r->__read_response('cmd') }, 'Text response ok';
-is($m, 'all your text are belong to us', '... with the expected message');
+is_deeply([$r->__read_response('cmd')],
+ ['all your text are belong to us', undef],
+ 'Text response ok');
## :NUMBER responses
r(':234');
-lives_ok sub { $m = $r->__read_response('cmd') }, 'Integer response ok';
-is($m, 234, '... with the expected value');
+is_deeply([$r->__read_response('cmd')], [234, undef],
+ 'Integer response ok');
## $SIZE PAYLOAD responses
r('$19', "Redis\r\nis\r\ngreat!\r\n");
-lives_ok sub { $m = $r->__read_response('cmd') }, 'Size+payload response ok';
-is($m, "Redis\r\nis\r\ngreat!\r\n", '... with the expected message');
+is_deeply([$r->__read_response('cmd')], ["Redis\r\nis\r\ngreat!\r\n", undef],
+ 'Size+payload response ok');
r('$0', "");
-lives_ok sub { $m = $r->__read_response('cmd') },
- 'Zero-size+payload response ok';
-is($m, "", '... with the expected message');
+is_deeply([$r->__read_response('cmd')], ['', undef],
+ 'Zero-size+payload response ok');
r('$-1');
-lives_ok sub { $m = $r->__read_response('cmd') },
- 'Negative-size+payload response ok';
-ok(!defined($m), '... with the expected undefined message');
+is_deeply([$r->__read_response('cmd')], [undef, undef],
+ 'Negative-size+payload response ok');
## Multi-bulk responses
my @m;
r('*4', '$5', 'Redis', ':42', '$-1', '+Cool stuff');
-lives_ok sub { @m = $r->__read_response('cmd') },
- 'Simple multi-bulk response ok';
-cmp_deeply(
- \@m,
- ['Redis', 42, undef, 'Cool stuff'],
- '... with the expected list of values'
-);
+cmp_deeply([$r->__read_response('cmd')],
+ [['Redis', 42, undef, 'Cool stuff'], undef],
+ 'Simple multi-bulk response ok');
## Nested Multi-bulk responses
r('*5', '$5', 'Redis', ':42', '*4', ':1', ':2', '$4', 'hope', '*2', ':4',
':5', '$-1', '+Cool stuff');
-lives_ok sub { @m = $r->__read_response('cmd') },
- 'Nested multi-bulk response ok';
cmp_deeply(
- \@m,
- ['Redis', 42, [1, 2, 'hope', [4, 5]], undef, 'Cool stuff'],
- '... with the expected list of values'
+ [$r->__read_response('cmd')],
+ [['Redis', 42, [1, 2, 'hope', [4, 5]], undef, 'Cool stuff'], undef],
+ 'Nested multi-bulk response ok'
);
## Nil multi-bulk responses
r('*-1');
-lives_ok sub { $m = $r->__read_response('blpop') },
- 'Read a NIL multi-bulk response';
-is($m, undef, '... with the expected "undef" value');
+is_deeply([$r->__read_response('cmd')], [undef, undef],
+ 'Read a NIL multi-bulk response');
+
+
+## Multi-bulk responses with nested error
+r('*3', '$5', 'Redis', '-you must die!!', ':42');
+throws_ok sub { $r->__read_response('cmd') },
+ qr/\[cmd\] you must die!!/,
+ 'Nested errors must usually throw exceptions';
+
+r('*3', '$5', 'Redis', '-you must die!!', ':42');
+is_deeply([$r->__read_response('cmd', 1)], [
+ [['Redis', undef],
+ [undef, 'you must die!!'],
+ [42, undef]],
+ undef,
+], 'Nested errors must be collected in collect-errors mode');
+
done_testing();
View
13 t/03-pubsub.t 100644 → 100755
@@ -25,11 +25,16 @@ ok(
is($pub->publish('aa', 'v1'), 0, "No subscribers to 'aa' topic");
+my $db_size = -1;
+$sub->dbsize(sub { $db_size = $_[0] });
+
## Basic pubsub
my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
$sub->subscribe('aa', 'bb', $sub_cb);
is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");
+is($db_size, 0, 'subscribing processes pending queued commands');
+
is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
cmp_deeply(\%got, {'aa' => 'v1:aa'}, "... for the expected topic, 'aa'");
@@ -116,9 +121,11 @@ is($sub->wait_for_messages(1), 0, '... yep, no messages delivered');
cmp_deeply(\%got, {}, '... and an empty messages recorded set');
is($sub->is_subscriber, 1, 'Still some pending subcriptions active');
-throws_ok sub { $sub->info },
- qr/Cannot use command 'INFO' while in SUBSCRIBE mode/,
- '... still an error to try commands in subscribe mode';
+for my $cmd (qw<ping info keys dbsize shutdown>) {
+ throws_ok sub { $sub->$cmd },
+ qr/Cannot use command '(?i:$cmd)' while in SUBSCRIBE mode/,
+ ".. still an error to try \U$cmd\E while in SUBSCRIBE mode";
+}
$sub->punsubscribe('c*', $psub_cb);
is($sub->is_subscriber, 0, '... but none anymore');
View
100 t/04-pipeline.t
@@ -0,0 +1,100 @@
+#!perl
+
+use warnings;
+use strict;
+use Test::More;
+use Redis;
+use lib 't/tlib';
+use Test::SpawnRedisServer;
+use Test::Exception;
+
+my ($c, $srv) = redis();
+END { $c->() if $c }
+
+ok(my $r = Redis->new(server => $srv), 'connected to our test redis-server');
+
+sub pipeline_ok {
+ my ($desc, @commands) = @_;
+ my (@responses, @expected_responses);
+ for my $cmd (@commands) {
+ my ($method, $args, $expected, $expected_err) = @$cmd;
+ push @expected_responses, [$expected, $expected_err];
+ $r->$method(@$args, sub { push @responses, [@_] });
+ }
+ $r->wait_all_responses;
+
+ # An expected response consisting of a hashref means that any non-empty
+ # hashref should be accepted. But reimplementing is_deeply() sounds like
+ # a pain, so fake it:
+ for my $i (0 .. $#expected_responses) {
+ $expected_responses[$i] = $responses[$i]
+ if ref $expected_responses[$i][0] eq 'HASH'
+ && ref $responses[$i][0] eq 'HASH'
+ && keys %{ $responses[$i][0] };
+ }
+
+ is_deeply(\@responses, \@expected_responses, $desc);
+}
+
+pipeline_ok 'single-command pipeline', (
+ [set => [foo => 'bar'], 'OK'],
+);
+
+pipeline_ok 'pipeline with embedded error', (
+ [set => [clunk => 'eth'], 'OK'],
+ [oops => [], undef, q[ERR unknown command 'OOPS']],
+ [get => ['clunk'], 'eth'],
+);
+
+pipeline_ok 'keys in pipelined mode', (
+ [keys => ['*'], [qw<foo clunk>]],
+ [keys => [], undef, q[ERR wrong number of arguments for 'keys' command]],
+);
+
+pipeline_ok 'info in pipelined mode', (
+ [info => [], {}], # any non-empty hashref
+ [info => ['oops'], undef, q[ERR wrong number of arguments for 'info' command]],
+);
+
+pipeline_ok 'pipeline with multi-bulk reply', (
+ [hmset => [kapow => (a => 1, b => 2, c => 3)], 'OK'],
+ [hmget => [kapow => qw<c b a>], [3, 2, 1]],
+);
+
+pipeline_ok 'large pipeline', (
+ (map { [hset => [zzapp => $_ => -$_], 1] } 1 .. 5000),
+ [hmget => [zzapp => (1 .. 5000)], [reverse -5000 .. -1]],
+ [del => ['zzapp'], 1],
+);
+
+subtest 'synchronous request with pending pipeline' => sub {
+ my $clunk;
+ is($r->get('clunk', sub { $clunk = $_[0] }), 1, 'queue a request');
+ is($r->set('kapow', 'zzapp', sub {}), 1, 'queue another request');
+ is($r->get('kapow'), 'zzapp', 'synchronous request has expected return');
+ is($clunk, 'eth', 'synchronous request processes pending ones');
+};
+
+pipeline_ok 'transaction', (
+ [multi => [], 'OK'],
+ [set => ['clunk' => 'eth'], 'QUEUED'],
+ [rpush => ['clunk' => 'oops'], 'QUEUED'],
+ [get => ['clunk'], 'QUEUED'],
+ [exec => [], [
+ ['OK', undef],
+ [undef, 'ERR Operation against a key holding the wrong kind of value'],
+ ['eth', undef],
+ ]],
+);
+
+subtest 'transaction with error and no pipeline' => sub {
+ is($r->multi, 'OK', 'multi');
+ is($r->set('clunk', 'eth'), 'QUEUED', 'transactional SET');
+ is($r->rpush('clunk', 'oops'), 'QUEUED', 'transactional bad RPUSH');
+ is($r->get('clunk'), 'QUEUED', 'transactional GET');
+ throws_ok sub { $r->exec },
+ qr/\[exec\] ERR Operation against a key holding the wrong kind of value,/,
+ 'synchronous EXEC dies for intervening error';
+};
+
+done_testing();
View
39 t/07-reconnect.t 100644 → 100755
@@ -42,6 +42,45 @@ subtest 'Command without connection or timeout, with reconnect' => sub {
};
+subtest 'Reconnection discards pending commands' => sub {
+ ok(my $r = Redis->new(reconnect => 2, server => $srv),
+ 'connected to our test redis-server');
+
+ my $processed_pending = 0;
+ $r->dbsize(sub { $processed_pending++ });
+
+ ok(close(delete $r->{sock}), 'evilly close connection to the server');
+ ok($r->set(foo => 'bar'), 'send command with reconnect');
+
+ is($processed_pending, 0, 'pending command discarded on reconnect');
+};
+
+
+subtest 'INFO commands with extra logic triggers reconnect' => sub {
+ ok(my $r = Redis->new(reconnect => 2, server => $srv),
+ 'connected to our test redis-server');
+
+ ok($r->quit, 'close connection to the server');
+
+ my $info = $r->info;
+ is(ref $info, 'HASH', 'reconnect on INFO command');
+};
+
+
+subtest 'KEYS commands with extra logic triggers reconnect' => sub {
+ ok(my $r = Redis->new(reconnect => 2, server => $srv),
+ 'connected to our test redis-server');
+
+ ok($r->flushdb, 'delete all keys');
+ ok($r->set(reconnect => $$), 'set known key');
+
+ ok($r->quit, 'close connection to the server');
+
+ my @keys = $r->keys('*');
+ is_deeply(\@keys, ['reconnect'], 'reconnect on KEYS command');
+};
+
+
subtest "Bad commnands don't trigger reconnect" => sub {
ok(my $r = Redis->new(reconnect => 2, server => $srv),
'connected to our test redis-server');
Please sign in to comment.
Something went wrong with that request. Please try again.