Permalink
Browse files

Pipelining support

Most command methods now take an optional trailing coderef; if a coderef is
supplied, we don't wait for the command's response, but merely schedule the
coderef to be called once the response has been read.
  • Loading branch information...
1 parent ba03498 commit ad922d728de69a067608bfd1fcc85af4a900fb26 @arc committed Mar 7, 2012
Showing with 271 additions and 76 deletions.
  1. +132 −47 lib/Redis.pm
  2. +4 −0 t/01-basic.t
  3. +37 −29 t/02-responses.t
  4. +5 −0 t/03-pubsub.t
  5. +79 −0 t/04-pipeline.t
  6. +14 −0 t/07-reconnect.t
View
@@ -223,10 +223,27 @@ sub __std_cmd {
$self->__is_valid_command($command);
+ my $ret;
+ my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
my @cmd_args = @_;
- return $self->__with_reconnect(sub {
- $self->__run_cmd($command, @cmd_args);
+ $self->__with_reconnect(sub {
+ $self->__run_cmd($command, @cmd_args, $cb || sub {
+ my ($reply, $error) = @_;
+ confess "[$command] $error, " if defined $error;
+ $ret = $reply;
+ });
+
+ # If this is an EXEC command, in pipelined mode, and one of the commands
+ # executed in the transaction yields an error, we must collect the error
+ # from that command, rather than throwing an exception immediately.
+ $self->{queue}[-1][2] = 1
+ if $cb && uc($command) eq 'EXEC';
});
+
+ return 1 if $cb;
+
+ $self->wait_all_responses;
+ return wantarray && ref $ret eq 'ARRAY' ? @$ret : $ret;
}
sub __with_reconnect {
@@ -249,9 +266,22 @@ sub __with_reconnect {
sub __run_cmd {
my $self = shift;
my $command = shift;
+ my $cb = pop;
$self->__send_command($command, @_);
- return $self->__read_response($command);
+ push @{ $self->{queue} }, [$command, $cb];
+ return;
+}
+
+sub wait_all_responses {
+ my ($self) = @_;
+
+ for my $handler (splice @{ $self->{queue} }) {
+ my ($command, $cb, $collect_errors) = @$handler;
+ $cb->($self->__read_response($command, $collect_errors));
+ }
+
+ return;
}
@@ -260,6 +290,10 @@ sub quit {
my ($self) = @_;
return unless $self->{sock};
+ confess "[quit] only works in synchronous mode, "
+ if @_ && ref $_[-1] eq 'CODE';
+
+ $self->wait_all_responses;
$self->__send_command('QUIT');
close(delete $self->{sock}) || confess("Can't close socket: $!");
@@ -270,55 +304,101 @@ sub shutdown {
my ($self) = @_;
$self->__is_valid_command('SHUTDOWN');
+ confess "[shutdown] only works in synchronous mode, "
+ if @_ && ref $_[-1] eq 'CODE';
+
return unless $self->{sock};
+ $self->wait_all_responses;
$self->__send_command('SHUTDOWN');
close(delete $self->{sock}) || confess("Can't close socket: $!");
return 1;
}
sub ping {
- my ($self) = @_;
+ my $self = shift;
$self->__is_valid_command('PING');
+ confess "[ping] only works in synchronous mode, "
+ if @_ && ref $_[-1] eq 'CODE';
+
return unless exists $self->{sock};
+ $self->wait_all_responses;
return scalar try {
- $self->__run_cmd('PING');
+ $self->__std_cmd('PING');
}
catch {
close(delete $self->{sock});
return;
};
}
+sub __decode_info {
+ my ($self, $raw) = @_;
+ return @$raw if wantarray && ref $raw eq 'ARRAY'; # very unlikely
+ return { map { split(/:/, $_, 2) } split(/\r\n/, $raw) } if defined $raw;
+ return $raw;
+}
+
sub info {
my ($self) = @_;
$self->__is_valid_command('INFO');
- $self->__with_reconnect(sub {
- my $info = $self->__run_cmd('INFO');
+ my $info;
+ my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
+ my $real_cb = $cb ? sub {
+ my ($reply, $error) = @_;
+ my $decoded = $self->__decode_info($reply);
+ $cb->($decoded, $error);
+ } : sub {
+ my ($reply, $error) = @_;
+ confess "[INFO] $error, " if defined $error;
+ $info = $reply;
+ };
- return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
+ $self->__with_reconnect(sub {
+ $self->__run_cmd('INFO', $real_cb);
});
+
+ return 1 if $cb;
+
+ $self->wait_all_responses;
+ return $self->__decode_info($info);
}
sub keys {
my $self = shift;
$self->__is_valid_command('KEYS');
- my @cmd_args = @_;
- $self->__with_reconnect(sub {
- my $keys = $self->__run_cmd('KEYS', @cmd_args);
+ my $keys;
+ my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
+ my $real_cb = $cb ? sub {
+ my ($reply, $error) = @_;
## Support redis > 1.26
- return @$keys if ref $keys eq 'ARRAY';
+ my $decoded = $reply;
## Support redis <= 1.2.6
- return split(/\s/, $keys) if $keys;
- return;
+ $decoded = [split(/\s/, $reply)] if defined $reply && !ref $reply;
+
+ $cb->($decoded, $error);
+ } : sub {
+ my ($reply, $error) = @_;
+ confess "[KEYS] $error, " if defined $error;
+ $keys = defined $reply && !ref $reply ? [split(/\s/, $reply)] : $reply;
+ };
+
+ my @cmd_args = @_;
+ $self->__with_reconnect(sub {
+ $self->__run_cmd('KEYS', @cmd_args, $real_cb);
});
+
+ return 1 if $cb;
+
+ $self->wait_all_responses;
+ return ref $keys eq 'ARRAY' ? @$keys : $keys;
}
@@ -333,8 +413,9 @@ sub wait_for_messages {
my $count = 0;
while ($s->can_read($timeout)) {
while (__try_read_sock($sock)) {
- my @m = $self->__read_response('WAIT_FOR_MESSAGES');
- $self->__process_pubsub_msg(\@m);
+ my ($reply, $error) = $self->__read_response('WAIT_FOR_MESSAGES');
+ confess "[WAIT_FOR_MESSAGES] $error, " if defined $error;
+ $self->__process_pubsub_msg($reply);
$count++;
}
}
@@ -352,6 +433,8 @@ sub __subscription_cmd {
confess("Missing required callback in call to $command(), ")
unless ref($cb) eq 'CODE';
+ $self->wait_all_responses;
+
my @subs = @_;
$self->__with_reconnect(sub {
$self->__throw_reconnect('Not connected to any server')
@@ -395,21 +478,22 @@ sub __process_subscription_changes {
my $subs = $self->{subscribers};
while (%$expected) {
- my @m = $self->__read_response($cmd);
+ my ($m, $error) = $self->__read_response($cmd);
+ confess "[$cmd] $error, " if defined $error;
## Deal with pending PUBLISH'ed messages
- if ($m[0] =~ /^p?message$/) {
- $self->__process_pubsub_msg(\@m);
+ if ($m->[0] =~ /^p?message$/) {
+ $self->__process_pubsub_msg($m);
next;
}
- my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
- $key .= "message:$m[1]";
+ my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
+ $key .= "message:$m->[1]";
my $cb = delete $expected->{$key};
push @{$subs->{$key}}, $cb unless $unsub;
- $self->{is_subscriber} = $m[2];
+ $self->{is_subscriber} = $m->[2];
}
}
@@ -448,6 +532,11 @@ sub __connect {
my ($self) = @_;
delete $self->{sock};
+ # Suppose we have at least one command response pending, but we're about
+ # to reconnect. The new connection will never get a response to any of
+ # the pending commands, so delete all those pending responses now.
+ $self->{queue} = [];
+
## Fast path, no reconnect
return $self->__build_sock() unless $self->{reconnect};
@@ -513,54 +602,50 @@ sub __send_command {
}
sub __read_response {
- my ($self, $cmd) = @_;
+ my ($self, $cmd, $collect_errors) = @_;
confess("Not connected to any server") unless $self->{sock};
local $/ = "\r\n";
## no debug => fast path
- return $self->__read_response_r($cmd) unless $self->{debug};
+ return $self->__read_response_r($cmd, $collect_errors) unless $self->{debug};
- if (wantarray) {
- my @r = $self->__read_response_r($cmd);
- warn "[RECV] $cmd ", Dumper(\@r);
- return @r;
- }
- else {
- my $r = $self->__read_response_r($cmd);
- warn "[RECV] $cmd ", Dumper($r);
- return $r;
- }
+ my ($result, $error) = $self->__read_response_r($cmd, $collect_errors);
+ warn "[RECV] $cmd ", Dumper($result, $error) if $self->{debug};
+ return $result, $error;
}
sub __read_response_r {
- my ($self, $command) = @_;
+ my ($self, $command, $collect_errors) = @_;
my ($type, $result) = $self->__read_line;
if ($type eq '-') {
- confess "[$command] $result, ";
+ return undef, $result;
}
- elsif ($type eq '+') {
- return $result;
+ elsif ($type eq '+' || $type eq ':') {
+ return $result, undef;
}
elsif ($type eq '$') {
- return if $result < 0;
- return $self->__read_len($result + 2);
+ return undef, undef if $result < 0;
+ return $self->__read_len($result + 2), undef;
}
elsif ($type eq '*') {
- return if $result < 0;
+ return undef, undef if $result < 0;
my @list;
while ($result--) {
- push @list, scalar($self->__read_response_r($command));
+ my @nested = $self->__read_response_r($command, $collect_errors);
+ if ($collect_errors) {
+ push @list, \@nested;
+ }
+ else {
+ confess "[$command] $nested[1], " if defined $nested[1];
+ push @list, $nested[0];
+ }
}
- return @list if wantarray;
- return \@list;
- }
- elsif ($type eq ':') {
- return $result;
+ return \@list, undef;
}
else {
confess "unknown answer type: $type ($result), ";
View
@@ -92,6 +92,10 @@ ok($@, 'rename to existing key');
ok(my $nr_keys = $o->dbsize, 'dbsize');
+throws_ok sub { $o->lpush('foo', 'bar') },
+ qr/\[lpush\] ERR Operation against a key holding the wrong kind of value,/,
+ 'Error responses throw exception';
+
## Commands operating on lists
Oops, something went wrong.

0 comments on commit ad922d7

Please sign in to comment.