Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Support for pipelining #10

Merged
merged 18 commits into from

3 participants

@arc
arc commented

See also the existing issue #3.

This pull request provides minimal support for pipelining. The API looks like this:

my @responses = $redis->pipeline(sub {
    # many requests here
});
# Now @responses are the gathered responses, including unthrown errors.

The "unthrown errors" bit seems important to me: even if one command yielded an error status, you can still look at the responses to all the other commands.

My measurements are noisier than I'd prefer, but for batches of 1000 requests with status-only replies, I see execution time in pipelined mode about 60% or 70% of the execution time in synchronous mode. Put another way: amortised per-request time on a fairly old Linux machine, running an old, threaded vendor Perl goes from ≥64 µs (with encoding => undef) to ≥41 µs. Combining pipelining with my AUTOLOAD cache code from pull request #9 reduces the minimum time to 36 µs.

There's also an additional commit which slightly improves transaction handling; it's implemented using pipelining's ability to suppress the exceptions which are ordinarily raised for error statuses, so again, this lets you look at the responses to all the commands which did succeed. (This feature is the itch I'm really scratching; from my point of view, the pipelining support is less important, though the speedup is pleasing nonetheless.)

My APIs for both pipelining and transaction execution are to some extent straw men; if you think there's a better approach, I'd love to hear your ideas, and I'd be happy to rework my changes to accommodate them.

@melo
Owner

Hi,

I mentioned in the previous release notes posted on the Redis ML that the next release would include pipeline so I thank you for tackling the issue.

I'll look at the internals of your implementation and reuse as most as possible, but the API I want to implement doesn't require a pipeline() method nor a special mode.

My current design is this: to enable pipeline, add a coderef callback as the last parameter on any API call.

That is it. If Redis sees the codeRef as the last parameter, it sends the command to the servers, sticks the coderef in a internal queue, and returns. The response parser, when a full response is detected, takes the first item from that queue and calls it with the response. Any exceptions detected can be send in a second argument.

This allows you to mix pipeline and non-pipelined commands. You could:

$redis->incr('i', sub { ... });
$redis->get('a', sub { ... });
$redis->get('b');

This would pipeline the first two, send the third, and then way for the answer to the third command. This would in effect make sure all the previous command responses would complete.

A special $redis->wait_all_responses would allow you to make sure all responses were accounted for.

A side-benefit of this API would be to enable support for MULTI/EXEC easily, given that in those sequences, the answer is also delayed.

What do you think?

@arc
arc commented

Hi, Pedro. I hadn't realised you were announcing releases on the Redis mailing list — I'll clearly have to subscribe.

Your API proposal sounds good to me. I'll see what I can come up with by way of an implementation, and submit a new set of changes today or tomorrow.

arc added some commits
@arc arc Factor out new __with_reconnect method fb5f5bd
@arc arc Bug fix: apply reconnect logic to KEYS and INFO 4054b4d
@arc arc Bug fix: forbid PING and SHUTDOWN in SUBSCRIBE mode 2c3b57a
@arc arc Refactor: don't conflate subscription commands with standard commands d8a1856
@arc arc Document scalar-context KEYS return
It's tested for in t/01-basic.t, so I assume it's deliberate.
6a61b12
@arc arc Rename __try_reconnect to __throw_reconnect
Since it never tries to do anything, but always throws an exception, the new
name is more descriptive of what it does.
f859ef4
@arc arc Refactor to use __run_cmd where possible 3b1c2e8
@arc arc Remove final argument in __read_response, __read_response_r
There are no remaining callers that use it.
5cc8b55
@arc arc Always call __read_response_r as a method ba03498
@arc arc Pipelining support
Most command methods now take an optional trailing coderef; if a coderef is
supplied, we don't wait for the command's response, but merely schedule the
coderef to be called once the response has been read.
ad922d7
@arc
arc commented

Hi, Pedro. Sorry that took me a little longer than expected, but I've updated this pull request to use the approach you suggest. (The code from my original pull request is still available as arc@18a81d8 if you want to compare it.)

This new branch builds on my AUTOLOAD-caching branch.

I've included tests, but I haven't yet updated the documentation, because I want to confirm that this looks good to you first. If you like this patch set, I'll add documentation and resubmit.

In this code, QUIT, SHUTDOWN, and PING are always executed synchronously, on the assumption that pipelining them is never going to be useful; but let me know if you disagree, and I'll look at making them pipelinable. For that matter, I'd be happy to tweak any of my changes and resubmit if you'd prefer them to be implemented differently.

Finally, I've also included fixes for a couple of bugs I spotted by inspection while working on this:

  • KEYS and INFO weren't honouring the reconnect setting
  • PING and SHUTDOWN were incorrectly accepted in subscriber mode
@jzawodn

Just wanted to drop in and say I like the direction this is headed. Thanks for the work on this!

@melo
Owner

Hey @arc, excellent stuff!

I'm currently migrating one of my sites to a new server, should be done by Friday. Do you mind waiting for the weekend to merge this?

Again, thanks!

@arc
arc commented

Sure, that's great. I'll work on getting some documentation submitted today or tomorrow.

I'm also making a couple of clarity and performance improvements. This current version has a best-case per-request cost about 15–20 µs (30–50%) longer on my test platform than my original version in arc@18a81d8ea2be8b76de3c9187dae76d9b21553089. That seems to be entirely due to my new __with_reconnect method, which adds a method call and a closure call to the fast path. I'm now looking for a way to remove that additional overhead, without unduly harming maintainability.

arc added some commits
@arc arc Refactor __run_cmd as __queue_cmd
- Takes an additional argument indicating whether nested errors should be
  collected (so callers don't have to manually fix up the relevant data
  structure)

- Renamed to better reflect its purpose
dea5540
@arc arc Add tests for pipelined INFO and KEYS
Since they're the two pipeline-capable commands that have unusual return
values, it seems particularly valuable to test them.
89241f4
@arc arc Factor out new __run_cmd method
Unlike the previous method of the same name, this one actually sends a
request and enqueues a response handler.
8c6f8c8
@arc arc Refactor: inline __queue_cmd in its only caller 9fc61b4
@arc arc Refactor: save one level of closure invocation 9c18872
@arc arc Performance enhancement in non-reconnect mode
If we aren't in reconnect mode, using __with_reconnect adds a method call
and a closure invocation to every request, no matter how trivial.  This
change adds a fast path for directly calling __run_cmd in the three relevant
locations.  In my tests, this saves up to a third of the best-case amortised
per-request execution time.
05a2973
@arc arc Documentation for pipelining 1a9371b
@arc
arc commented

Hi. These changes add documentation, as promised, and match performance with my original pipelining code (at least to within the precision of my measurements). I think the code's also rather cleaner than the slower version.

Let me know if you'd like anything else from me before you merge.

Thanks.

@melo melo merged commit 157cb35 into PerlRedis:master
@melo
Owner

Hey @arc, I've merged your code and will release monday (don't want to do it over the weekend, one of my kids is sick so I ended up with less free time than expected).

The code looks very good, I only have one lingering doubt that I want to discuss with you (and with @jzawodn given that he is listening in :) ). Its about the MULTI() API. Right now, it returns the list of responses for each command we sent.

I was wondering if, when in pipeline() mode, if commands inside MULTI/EXEC are called with a callback I expect those answer will not be present in the exec() return value, correct?

The relevant part of the documentation is this: arc@1a9371b#L0R1016

As I said, I have little time right now, monday morning at the office I'll do a quick test to see if it works like that, as it would be my expected behavior, but I wanted to have your opinion.

Please disregard this if that is the actual current behavior :)

@arc

Hi. Sure, releasing on Monday sounds great, and I hope your kid feels better soon.

As for EXEC, here's more detail on how it works. When nothing's pipelined, MULTI/EXEC work the way they do in 1.926:

$redis->multi;              # +OK
$redis->set(foo => 'bar');  # +QUEUED
$redis->get('baz');         # +QUEUED
$redis->exec;               # ['OK', $value_of_baz]

where, in protocol terms, the response to the EXEC looks like this:

*2
+OK
$12
value of baz

This also means that an exception is thrown if there's an error executing any of the commands in the transaction:

$redis->multi;               # +OK
$redis->set(foo => 'bar');   # +QUEUED
$redis->lpush(foo => 'bar'); # +QUEUED
$redis->exec;                # dies when decoding LPUSH reply

That is, the EXEC itself gets a multi-bulk reply looking like this:

*2
+OK
-ERR Operation against a key holding the wrong kind of value

In the current implementation, using pipelining for the individual commands doesn't affect any of that, only (a) whether we wait for the response before returning, and (b) how the replies in the protocol are provided to the caller. For example:

$redis->multi(sub {});             # ('OK', undef)
$redis->set(foo => 'bar', sub {}); # ('QUEUED', undef)
$redis->get('baz', sub {});        # ('QUEUED', undef)
$redis->exec(sub { ... });         # ([  ['OK', undef],
                                   #     [$value_of_baz, undef]  ],
                                   #  undef)

The MULTI and the individual transactional commands never produce an interesting return, just +OK and +QUEUED respectively; so using a no-op callback to trigger pipelining on them is the obvious approach. But the EXEC produces all the replies, so its callback will (almost always) need to look at the data.

The same holds true when part of the transaction produces an error:

$redis->multi(sub {});               # ('OK', undef)
$redis->set(foo => 'bar', sub {});   # ('QUEUED', undef)
$redis->lpush(foo => 'bar', sub {}); # ('QUEUED', undef)
$redis->exec(sub { ... });           # ([  ['OK', undef],
                                     #     [undef, 'ERR…']  ],
                                     #  undef)

And mixing and matching pipelined and synchronous commands follows the same rules:

$redis->multi(sub {});       # cb:  ('OK', undef)
$redis->set(foo => 'bar');   # ret: 'QUEUED'
$redis->get('baz', sub {});  # cb:  ('QUEUED', undef)
$redis->exec;                # ret: ['OK', $value_of_baz]

I'm not quite sure I understand what you're asking, though. I think you're suggesting it would be better if, for a pipelined in-transaction command, the callback were invoked with the data gathered from the ultimate EXEC, rather than the intermediate placeholder +QUEUED response. That does sound better for users, to be honest, though a little painful to implement. Is that what you had in mind?

@melo
Owner

Yes to the last paragraph, that is what I had in mind.

This morning at the office I'll fix #11 and prepare the release. At that time I'll try and see the difficulties of implementing your last paragraph.

I want to cut a release today, and I don't want to rush this so I'll probably document that for now, during MULTI/EXEC, you cannot use callbacks, and all the responses will be available in the EXEC response.

This allows us more time to see if its feasible to implement it in the way that (IMHO) makes more sense to the end user. The QUEUED responses are important for the client lib, but irrelevant to the programmer, and using a callback to collect the errors later would make more sense to me.

After this release I would also like to use your last comment and paste it into the POD section for the transaction stuff, given that others might have the same doubts about how it all works. :)

Given that I want to fix the test suite to use Test::TCP, and I also want to convert to Dist::Zilla, and I want to rip out the encoding stuff, :) I plan to have at least 2 or 3 releases more before putting 2.0 out there.

Again, thanks for your work.

PS: and yes, the kids are better - classic three days bug

@melo melo referenced this pull request
Open

multi()/exec() + pipelining #17

@melo
Owner

Moved the discussion to a new issue, #17.

@arc

Sounds like a pretty decent plan, and feel free to copy anything I've said here into the documentation.

I've got an idea about how to implement the smarter EXEC handling, but it's got some nasty edge cases, so maybe you'll come up with something better. If you want someone to review anything you work on for #17, I'd gladly take a look.

@melo
Owner

FYI: release 1.950 uploaded to CPAN

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 2, 2012
  1. @arc
Commits on Mar 7, 2012
  1. @arc
  2. @arc
  3. @arc
  4. @arc
  5. @arc

    Document scalar-context KEYS return

    arc authored
    It's tested for in t/01-basic.t, so I assume it's deliberate.
  6. @arc

    Rename __try_reconnect to __throw_reconnect

    arc authored
    Since it never tries to do anything, but always throws an exception, the new
    name is more descriptive of what it does.
  7. @arc
  8. @arc

    Remove final argument in __read_response, __read_response_r

    arc authored
    There are no remaining callers that use it.
  9. @arc
  10. @arc

    Pipelining support

    arc authored
    Most command methods now take an optional trailing coderef; if a coderef is
    supplied, we don't wait for the command's response, but merely schedule the
    coderef to be called once the response has been read.
Commits on Mar 8, 2012
  1. @arc

    Refactor __run_cmd as __queue_cmd

    arc authored
    - Takes an additional argument indicating whether nested errors should be
      collected (so callers don't have to manually fix up the relevant data
      structure)
    
    - Renamed to better reflect its purpose
  2. @arc

    Add tests for pipelined INFO and KEYS

    arc authored
    Since they're the two pipeline-capable commands that have unusual return
    values, it seems particularly valuable to test them.
  3. @arc

    Factor out new __run_cmd method

    arc authored
    Unlike the previous method of the same name, this one actually sends a
    request and enqueues a response handler.
  4. @arc
  5. @arc
  6. @arc

    Performance enhancement in non-reconnect mode

    arc authored
    If we aren't in reconnect mode, using __with_reconnect adds a method call
    and a closure invocation to every request, no matter how trivial.  This
    change adds a fast path for directly calling __run_cmd in the three relevant
    locations.  In my tests, this saves up to a third of the best-case amortised
    per-request execution time.
  7. @arc

    Documentation for pipelining

    arc authored
This page is out of date. Refresh to see the latest.
View
388 lib/Redis.pm
@@ -51,6 +51,19 @@ our $VERSION = '1.926';
$redis->sort('list', 'DESC');
$redis->sort(qw{list LIMIT 0 5 ALPHA DESC});
+ ## Add a coderef argument to run a command in the background
+ $redis->sort(qw{list LIMIT 0 5 ALPHA DESC}, sub {
+ my ($reply, $error) = @_;
+ die "Oops, got an error: $error\n" if defined $error;
+ print "$_\n" for @$reply;
+ });
+ long_computation();
+ $redis->wait_all_responses;
+
+ ## Or run a large batch of commands in a pipeline
+ $redis->hset('h', $_, $hash{$_}, sub {}) for keys %hash;
+ $redis->wait_all_responses;
+
## Publish/Subscribe
$redis->subscribe(
'topic_1',
@@ -205,55 +218,88 @@ sub DESTROY { }
our $AUTOLOAD;
sub AUTOLOAD {
- my $self = shift;
-
my $command = $AUTOLOAD;
$command =~ s/.*://;
+
+ my $method = sub { shift->__std_cmd($command, @_) };
+
+ # Save this method for future calls
+ no strict 'refs';
+ *$AUTOLOAD = $method;
+
+ goto $method;
+}
+
+sub __std_cmd {
+ my $self = shift;
+ my $command = shift;
+
$self->__is_valid_command($command);
- ## Fast path, no reconnect
- return $self->__run_cmd($command, @_) unless $self->{reconnect};
+ my $ret;
+ my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
+
+ # If this is an EXEC command, in pipelined mode, and one of the commands
+ # executed in the transaction yields an error, we must collect all errors
+ # from that command, rather than throwing an exception immediately.
+ my $collect_errors = $cb && uc($command) eq 'EXEC';
+
+ ## Fast path, no reconnect;
+ return $self->__run_cmd($command, $collect_errors, undef, $cb, @_)
+ unless $self->{reconnect};
my @cmd_args = @_;
- return try {
- $self->__run_cmd($command, @cmd_args);
- }
- catch {
+ $self->__with_reconnect(sub {
+ $self->__run_cmd($command, $collect_errors, undef, $cb, @cmd_args);
+ });
+}
+
+sub __with_reconnect {
+ my ($self, $cb) = @_;
+
+ ## Fast path, no reconnect
+ return $cb->() unless $self->{reconnect};
+
+ return &try($cb, catch {
die $_ unless ref($_) eq 'Redis::X::Reconnect';
$self->__connect;
- $self->__run_cmd($command, @cmd_args);
- };
+ $cb->();
+ });
}
sub __run_cmd {
- my $self = shift;
- my $command = shift;
- my $sock = $self->{sock} || $self->__try_reconnect('Not connected to any server');
- my $enc = $self->{encoding};
- my $deb = $self->{debug};
+ my ($self, $command, $collect_errors, $custom_decode, $cb, @args) = @_;
+
+ my $ret;
+ my $wrapper = $cb && $custom_decode ? sub {
+ my ($reply, $error) = @_;
+ $cb->(scalar $custom_decode->($reply), $error);
+ } : $cb || sub {
+ my ($reply, $error) = @_;
+ confess "[$command] $error, " if defined $error;
+ $ret = $reply;
+ };
- ## PubSub commands use a different answer handling
- if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
- $pr = '' unless $pr;
+ $self->__send_command($command, @args);
+ push @{ $self->{queue} }, [$command, $wrapper, $collect_errors];
- my $cb = pop;
- confess("Missing required callback in call to $command(), ")
- unless ref($cb) eq 'CODE';
+ return 1 if $cb;
- my @subs = @_;
- @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
- if $unsub;
- return unless @subs;
+ $self->wait_all_responses;
+ return $custom_decode ? $custom_decode->($ret, !wantarray)
+ : wantarray && ref $ret eq 'ARRAY' ? @$ret : $ret;
+}
- $self->__send_command($command, @subs);
+sub wait_all_responses {
+ my ($self) = @_;
- my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
- return $self->__process_subscription_changes($command, \%cbs);
+ for my $handler (splice @{ $self->{queue} }) {
+ my ($command, $cb, $collect_errors) = @$handler;
+ $cb->($self->__read_response($command, $collect_errors));
}
- $self->__send_command($command, @_);
- return $self->__read_response($command);
+ return;
}
@@ -262,6 +308,10 @@ sub quit {
my ($self) = @_;
return unless $self->{sock};
+ confess "[quit] only works in synchronous mode, "
+ if @_ && ref $_[-1] eq 'CODE';
+
+ $self->wait_all_responses;
$self->__send_command('QUIT');
close(delete $self->{sock}) || confess("Can't close socket: $!");
@@ -270,8 +320,14 @@ sub quit {
sub shutdown {
my ($self) = @_;
+ $self->__is_valid_command('SHUTDOWN');
+
+ confess "[shutdown] only works in synchronous mode, "
+ if @_ && ref $_[-1] eq 'CODE';
+
return unless $self->{sock};
+ $self->wait_all_responses;
$self->__send_command('SHUTDOWN');
close(delete $self->{sock}) || confess("Can't close socket: $!");
@@ -279,46 +335,69 @@ sub shutdown {
}
sub ping {
- my ($self) = @_;
+ my $self = shift;
+ $self->__is_valid_command('PING');
+
+ confess "[ping] only works in synchronous mode, "
+ if @_ && ref $_[-1] eq 'CODE';
+
return unless exists $self->{sock};
- my $reply;
- eval {
- $self->__send_command('PING');
- $reply = $self->__read_response('PING');
- };
- if ($@) {
+ $self->wait_all_responses;
+ return scalar try {
+ $self->__std_cmd('PING');
+ }
+ catch {
close(delete $self->{sock});
return;
- }
-
- return $reply;
+ };
}
sub info {
- my ($self) = @_;
+ my $self = shift;
$self->__is_valid_command('INFO');
- $self->__send_command('INFO');
+ my $custom_decode = sub {
+ my ($reply) = @_;
+ return $reply if !defined $reply || ref $reply;
+ return { map { split(/:/, $_, 2) } split(/\r\n/, $reply) };
+ };
+
+ my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
- my $info = $self->__read_response('INFO');
+ ## Fast path, no reconnect
+ return $self->__run_cmd('INFO', 0, $custom_decode, $cb, @_)
+ unless $self->{reconnect};
- return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
+ my @cmd_args = @_;
+ $self->__with_reconnect(sub {
+ $self->__run_cmd('INFO', 0, $custom_decode, $cb, @cmd_args);
+ });
}
sub keys {
my $self = shift;
$self->__is_valid_command('KEYS');
- $self->__send_command('KEYS', @_);
+ my $custom_decode = sub {
+ my ($reply, $synchronous_scalar) = @_;
- my @keys = $self->__read_response('KEYS', \my $type);
- ## Support redis > 1.26
- return @keys if $type eq '*';
+ ## Support redis <= 1.2.6
+ $reply = [split(/\s/, $reply)] if defined $reply && !ref $reply;
- ## Support redis <= 1.2.6
- return split(/\s/, $keys[0]) if $keys[0];
- return;
+ return ref $reply && ($synchronous_scalar || wantarray) ? @$reply : $reply;
+ };
+
+ my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
+
+ ## Fast path, no reconnect
+ return $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @_)
+ unless $self->{reconnect};
+
+ my @cmd_args = @_;
+ $self->__with_reconnect(sub {
+ $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @cmd_args);
+ });
}
@@ -333,8 +412,9 @@ sub wait_for_messages {
my $count = 0;
while ($s->can_read($timeout)) {
while (__try_read_sock($sock)) {
- my @m = $self->__read_response('WAIT_FOR_MESSAGES');
- $self->__process_pubsub_msg(\@m);
+ my ($reply, $error) = $self->__read_response('WAIT_FOR_MESSAGES');
+ confess "[WAIT_FOR_MESSAGES] $error, " if defined $error;
+ $self->__process_pubsub_msg($reply);
$count++;
}
}
@@ -342,6 +422,39 @@ sub wait_for_messages {
return $count;
}
+sub __subscription_cmd {
+ my $self = shift;
+ my $pr = shift;
+ my $unsub = shift;
+ my $command = shift;
+ my $cb = pop;
+
+ confess("Missing required callback in call to $command(), ")
+ unless ref($cb) eq 'CODE';
+
+ $self->wait_all_responses;
+
+ my @subs = @_;
+ $self->__with_reconnect(sub {
+ $self->__throw_reconnect('Not connected to any server')
+ unless $self->{sock};
+
+ @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
+ if $unsub;
+ return unless @subs;
+
+ $self->__send_command($command, @subs);
+
+ my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
+ return $self->__process_subscription_changes($command, \%cbs);
+ });
+}
+
+sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) }
+sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) }
+sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) }
+sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
+
sub __process_unsubscribe_requests {
my ($self, $cb, $pr, @unsubs) = @_;
my $subs = $self->{subscribers};
@@ -364,21 +477,22 @@ sub __process_subscription_changes {
my $subs = $self->{subscribers};
while (%$expected) {
- my @m = $self->__read_response($cmd);
+ my ($m, $error) = $self->__read_response($cmd);
+ confess "[$cmd] $error, " if defined $error;
## Deal with pending PUBLISH'ed messages
- if ($m[0] =~ /^p?message$/) {
- $self->__process_pubsub_msg(\@m);
+ if ($m->[0] =~ /^p?message$/) {
+ $self->__process_pubsub_msg($m);
next;
}
- my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
- $key .= "message:$m[1]";
+ my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
+ $key .= "message:$m->[1]";
my $cb = delete $expected->{$key};
push @{$subs->{$key}}, $cb unless $unsub;
- $self->{is_subscriber} = $m[2];
+ $self->{is_subscriber} = $m->[2];
}
}
@@ -407,9 +521,8 @@ sub __process_pubsub_msg {
sub __is_valid_command {
my ($self, $cmd) = @_;
- return unless $self->{is_subscriber};
- return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i;
- confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
+ confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
+ if $self->{is_subscriber};
}
@@ -418,6 +531,11 @@ sub __connect {
my ($self) = @_;
delete $self->{sock};
+ # Suppose we have at least one command response pending, but we're about
+ # to reconnect. The new connection will never get a response to any of
+ # the pending commands, so delete all those pending responses now.
+ $self->{queue} = [];
+
## Fast path, no reconnect
return $self->__build_sock() unless $self->{reconnect};
@@ -453,7 +571,7 @@ sub __send_command {
my $deb = $self->{debug};
my $sock = $self->{sock}
- || $self->__try_reconnect('Not connected to any server');
+ || $self->__throw_reconnect('Not connected to any server');
warn "[SEND] $cmd ", Dumper([@_]) if $deb;
@@ -467,14 +585,14 @@ sub __send_command {
## Check to see if socket was closed: reconnect on EOF
my $status = __try_read_sock($sock);
- $self->__try_reconnect('Not connected to any server')
+ $self->__throw_reconnect('Not connected to any server')
if defined $status && $status == 0;
## Send command, take care for partial writes
warn "[SEND RAW] $buf" if $deb;
while ($buf) {
my $len = syswrite $sock, $buf, length $buf;
- $self->__try_reconnect("Could not write to Redis server: $!")
+ $self->__throw_reconnect("Could not write to Redis server: $!")
unless $len;
substr $buf, 0, $len, "";
}
@@ -483,55 +601,50 @@ sub __send_command {
}
sub __read_response {
- my ($self, $cmd) = @_;
+ my ($self, $cmd, $collect_errors) = @_;
confess("Not connected to any server") unless $self->{sock};
local $/ = "\r\n";
## no debug => fast path
- return __read_response_r(@_) unless $self->{debug};
+ return $self->__read_response_r($cmd, $collect_errors) unless $self->{debug};
- if (wantarray) {
- my @r = __read_response_r(@_);
- warn "[RECV] $cmd ", Dumper(\@r);
- return @r;
- }
- else {
- my $r = __read_response_r(@_);
- warn "[RECV] $cmd ", Dumper($r);
- return $r;
- }
+ my ($result, $error) = $self->__read_response_r($cmd, $collect_errors);
+ warn "[RECV] $cmd ", Dumper($result, $error) if $self->{debug};
+ return $result, $error;
}
sub __read_response_r {
- my ($self, $command, $type_r) = @_;
+ my ($self, $command, $collect_errors) = @_;
my ($type, $result) = $self->__read_line;
- $$type_r = $type if $type_r;
if ($type eq '-') {
- confess "[$command] $result, ";
+ return undef, $result;
}
- elsif ($type eq '+') {
- return $result;
+ elsif ($type eq '+' || $type eq ':') {
+ return $result, undef;
}
elsif ($type eq '$') {
- return if $result < 0;
- return $self->__read_len($result + 2);
+ return undef, undef if $result < 0;
+ return $self->__read_len($result + 2), undef;
}
elsif ($type eq '*') {
- return if $result < 0;
+ return undef, undef if $result < 0;
my @list;
while ($result--) {
- push @list, scalar($self->__read_response_r($command));
+ my @nested = $self->__read_response_r($command, $collect_errors);
+ if ($collect_errors) {
+ push @list, \@nested;
+ }
+ else {
+ confess "[$command] $nested[1], " if defined $nested[1];
+ push @list, $nested[0];
+ }
}
- return @list if wantarray;
- return \@list;
- }
- elsif ($type eq ':') {
- return $result;
+ return \@list, undef;
}
else {
confess "unknown answer type: $type ($result), ";
@@ -632,7 +745,7 @@ BEGIN {
##########################
# I take exception to that
-sub __try_reconnect {
+sub __throw_reconnect {
my ($self, $m) = @_;
die bless(\$m, 'Redis::X::Reconnect') if $self->{reconnect};
die $m;
@@ -643,16 +756,27 @@ sub __try_reconnect {
__END__
+=head1 Pipeline management
+
+=head2 wait_all_responses
+
+Waits until all pending pipelined responses have been received, and invokes
+the pipeline callback for each one. See L</PIPELINING>.
+
=head1 Connection Handling
=head2 quit
$r->quit;
+The C<quit> method does not support pipelined operation.
+
=head2 ping
$r->ping || die "no server?";
+The C<ping> method does not support pipelined operation.
+
=head1 Commands operating on string values
=head2 set
@@ -698,6 +822,13 @@ __END__
=head2 keys
my @keys = $r->keys( '*glob_pattern*' );
+ my $keys = $r->keys( '*glob_pattern*' ); # count of matching keys
+
+Note that synchronous C<keys> calls in a scalar context return the number of
+matching keys (not an array ref of matching keys as you might expect). This
+does not apply in pipelined mode: assuming the server returns a list of
+keys, as expected, it is always passed to the pipeline callback as an array
+ref.
=head2 randomkey
@@ -860,12 +991,85 @@ See also L<Redis::List> for tie interface.
$r->shutdown;
+The C<shutdown> method does not support pipelined operation.
+
=head1 Remote server control commands
=head2 info
my $info_hash = $r->info;
+The C<info> method is unique in that it decodes the server's response into a
+hashref, if possible. This decoding happens in both synchronous and
+pipelined modes.
+
+=head1 Transaction-handling commands
+
+=head2 multi
+
+ $r->multi;
+
+=head2 discard
+
+ $r->discard;
+
+=head2 exec
+
+ my @individual_replies = $r->exec;
+
+C<exec> has special behaviour when run in a pipeline: the C<$reply> argument
+to the pipeline callback is an array ref whose elements are themselves
+C<[$reply, $error]> pairs. This means that you can accurately detect errors
+yielded by any command in the transaction, and without any exceptions being
+thrown.
+
+
+=head1 PIPELINING
+
+Usually, running a command will wait for a response. However, if you're
+doing large numbers of requests, it can be more efficient to use what Redis
+calls I<pipelining>: send multiple commands to Redis without waiting for a
+response, then wait for the responses that come in.
+
+To use pipelining, add a coderef argument as the last argument to a command
+method call:
+
+ $r->set('foo', 'bar', sub {});
+
+Pending responses to pipelined commands are processed in a single batch, as
+soon as at least one of the following conditions holds:
+
+=over 4
+
+=item *
+
+A non-pipelined (synchronous) command has been sent on the same connection
+
+=item *
+
+A pub/sub subscription command (one of C<subscribe>, C<unsubscribe>,
+C<psubscribe>, or C<punsubscribe>) is about to be sent on the same
+connection.
+
+=item *
+
+The L</wait_all_responses> method is called explicitly.
+
+=back
+
+The coderef you supply to a pipelined command method is invoked once the
+response is available. It takes two arguments, C<$reply> and C<$error>. If
+C<$error> is defined, it contains the text of an error reply sent by the
+Redis server. Otherwise, C<$reply> is the non-error reply. For almost all
+commands, that means it's C<undef>, or a defined but non-reference scalar,
+or an array ref of any of those; but see L</keys>, L</info>, and L</exec>.
+
+Note the contrast with synchronous commands, which throw an exception on
+receipt of an error reply, or return a non-error reply directly.
+
+The fact that pipelined commands never throw an exception can be
+particularly useful for Redis transactions; see L</exec>.
+
=head1 ENCODING
View
4 t/01-basic.t
@@ -92,6 +92,10 @@ ok($@, 'rename to existing key');
ok(my $nr_keys = $o->dbsize, 'dbsize');
+throws_ok sub { $o->lpush('foo', 'bar') },
+ qr/\[lpush\] ERR Operation against a key holding the wrong kind of value,/,
+ 'Error responses throw exception';
+
## Commands operating on lists
View
66 t/02-responses.t 100644 → 100755
@@ -22,67 +22,75 @@ sub r {
## -ERR responses
r('-you must die!!');
-throws_ok sub { $r->__read_response('cmd') }, qr/\[cmd\] you must die!!/,
- 'Error response must throw exception';
+is_deeply([$r->__read_response('cmd')], [undef, 'you must die!!'],
+ 'Error response detected');
## +TEXT responses
my $m;
r('+all your text are belong to us');
-lives_ok sub { $m = $r->__read_response('cmd') }, 'Text response ok';
-is($m, 'all your text are belong to us', '... with the expected message');
+is_deeply([$r->__read_response('cmd')],
+ ['all your text are belong to us', undef],
+ 'Text response ok');
## :NUMBER responses
r(':234');
-lives_ok sub { $m = $r->__read_response('cmd') }, 'Integer response ok';
-is($m, 234, '... with the expected value');
+is_deeply([$r->__read_response('cmd')], [234, undef],
+ 'Integer response ok');
## $SIZE PAYLOAD responses
r('$19', "Redis\r\nis\r\ngreat!\r\n");
-lives_ok sub { $m = $r->__read_response('cmd') }, 'Size+payload response ok';
-is($m, "Redis\r\nis\r\ngreat!\r\n", '... with the expected message');
+is_deeply([$r->__read_response('cmd')], ["Redis\r\nis\r\ngreat!\r\n", undef],
+ 'Size+payload response ok');
r('$0', "");
-lives_ok sub { $m = $r->__read_response('cmd') },
- 'Zero-size+payload response ok';
-is($m, "", '... with the expected message');
+is_deeply([$r->__read_response('cmd')], ['', undef],
+ 'Zero-size+payload response ok');
r('$-1');
-lives_ok sub { $m = $r->__read_response('cmd') },
- 'Negative-size+payload response ok';
-ok(!defined($m), '... with the expected undefined message');
+is_deeply([$r->__read_response('cmd')], [undef, undef],
+ 'Negative-size+payload response ok');
## Multi-bulk responses
my @m;
r('*4', '$5', 'Redis', ':42', '$-1', '+Cool stuff');
-lives_ok sub { @m = $r->__read_response('cmd') },
- 'Simple multi-bulk response ok';
-cmp_deeply(
- \@m,
- ['Redis', 42, undef, 'Cool stuff'],
- '... with the expected list of values'
-);
+cmp_deeply([$r->__read_response('cmd')],
+ [['Redis', 42, undef, 'Cool stuff'], undef],
+ 'Simple multi-bulk response ok');
## Nested Multi-bulk responses
r('*5', '$5', 'Redis', ':42', '*4', ':1', ':2', '$4', 'hope', '*2', ':4',
':5', '$-1', '+Cool stuff');
-lives_ok sub { @m = $r->__read_response('cmd') },
- 'Nested multi-bulk response ok';
cmp_deeply(
- \@m,
- ['Redis', 42, [1, 2, 'hope', [4, 5]], undef, 'Cool stuff'],
- '... with the expected list of values'
+ [$r->__read_response('cmd')],
+ [['Redis', 42, [1, 2, 'hope', [4, 5]], undef, 'Cool stuff'], undef],
+ 'Nested multi-bulk response ok'
);
## Nil multi-bulk responses
r('*-1');
-lives_ok sub { $m = $r->__read_response('blpop') },
- 'Read a NIL multi-bulk response';
-is($m, undef, '... with the expected "undef" value');
+is_deeply([$r->__read_response('cmd')], [undef, undef],
+ 'Read a NIL multi-bulk response');
+
+
+## Multi-bulk responses with nested error
+r('*3', '$5', 'Redis', '-you must die!!', ':42');
+throws_ok sub { $r->__read_response('cmd') },
+ qr/\[cmd\] you must die!!/,
+ 'Nested errors must usually throw exceptions';
+
+r('*3', '$5', 'Redis', '-you must die!!', ':42');
+is_deeply([$r->__read_response('cmd', 1)], [
+ [['Redis', undef],
+ [undef, 'you must die!!'],
+ [42, undef]],
+ undef,
+], 'Nested errors must be collected in collect-errors mode');
+
done_testing();
View
13 t/03-pubsub.t 100644 → 100755
@@ -25,11 +25,16 @@ ok(
is($pub->publish('aa', 'v1'), 0, "No subscribers to 'aa' topic");
+my $db_size = -1;
+$sub->dbsize(sub { $db_size = $_[0] });
+
## Basic pubsub
my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
$sub->subscribe('aa', 'bb', $sub_cb);
is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");
+is($db_size, 0, 'subscribing processes pending queued commands');
+
is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
cmp_deeply(\%got, {'aa' => 'v1:aa'}, "... for the expected topic, 'aa'");
@@ -116,9 +121,11 @@ is($sub->wait_for_messages(1), 0, '... yep, no messages delivered');
cmp_deeply(\%got, {}, '... and an empty messages recorded set');
is($sub->is_subscriber, 1, 'Still some pending subcriptions active');
-throws_ok sub { $sub->info },
- qr/Cannot use command 'INFO' while in SUBSCRIBE mode/,
- '... still an error to try commands in subscribe mode';
+for my $cmd (qw<ping info keys dbsize shutdown>) {
+ throws_ok sub { $sub->$cmd },
+ qr/Cannot use command '(?i:$cmd)' while in SUBSCRIBE mode/,
+ ".. still an error to try \U$cmd\E while in SUBSCRIBE mode";
+}
$sub->punsubscribe('c*', $psub_cb);
is($sub->is_subscriber, 0, '... but none anymore');
View
100 t/04-pipeline.t
@@ -0,0 +1,100 @@
+#!perl
+
+use warnings;
+use strict;
+use Test::More;
+use Redis;
+use lib 't/tlib';
+use Test::SpawnRedisServer;
+use Test::Exception;
+
+my ($c, $srv) = redis();
+END { $c->() if $c }
+
+ok(my $r = Redis->new(server => $srv), 'connected to our test redis-server');
+
+sub pipeline_ok {
+ my ($desc, @commands) = @_;
+ my (@responses, @expected_responses);
+ for my $cmd (@commands) {
+ my ($method, $args, $expected, $expected_err) = @$cmd;
+ push @expected_responses, [$expected, $expected_err];
+ $r->$method(@$args, sub { push @responses, [@_] });
+ }
+ $r->wait_all_responses;
+
+ # An expected response consisting of a hashref means that any non-empty
+ # hashref should be accepted. But reimplementing is_deeply() sounds like
+ # a pain, so fake it:
+ for my $i (0 .. $#expected_responses) {
+ $expected_responses[$i] = $responses[$i]
+ if ref $expected_responses[$i][0] eq 'HASH'
+ && ref $responses[$i][0] eq 'HASH'
+ && keys %{ $responses[$i][0] };
+ }
+
+ is_deeply(\@responses, \@expected_responses, $desc);
+}
+
+pipeline_ok 'single-command pipeline', (
+ [set => [foo => 'bar'], 'OK'],
+);
+
+pipeline_ok 'pipeline with embedded error', (
+ [set => [clunk => 'eth'], 'OK'],
+ [oops => [], undef, q[ERR unknown command 'OOPS']],
+ [get => ['clunk'], 'eth'],
+);
+
+pipeline_ok 'keys in pipelined mode', (
+ [keys => ['*'], [qw<foo clunk>]],
+ [keys => [], undef, q[ERR wrong number of arguments for 'keys' command]],
+);
+
+pipeline_ok 'info in pipelined mode', (
+ [info => [], {}], # any non-empty hashref
+ [info => ['oops'], undef, q[ERR wrong number of arguments for 'info' command]],
+);
+
+pipeline_ok 'pipeline with multi-bulk reply', (
+ [hmset => [kapow => (a => 1, b => 2, c => 3)], 'OK'],
+ [hmget => [kapow => qw<c b a>], [3, 2, 1]],
+);
+
+pipeline_ok 'large pipeline', (
+ (map { [hset => [zzapp => $_ => -$_], 1] } 1 .. 5000),
+ [hmget => [zzapp => (1 .. 5000)], [reverse -5000 .. -1]],
+ [del => ['zzapp'], 1],
+);
+
+subtest 'synchronous request with pending pipeline' => sub {
+ my $clunk;
+ is($r->get('clunk', sub { $clunk = $_[0] }), 1, 'queue a request');
+ is($r->set('kapow', 'zzapp', sub {}), 1, 'queue another request');
+ is($r->get('kapow'), 'zzapp', 'synchronous request has expected return');
+ is($clunk, 'eth', 'synchronous request processes pending ones');
+};
+
+pipeline_ok 'transaction', (
+ [multi => [], 'OK'],
+ [set => ['clunk' => 'eth'], 'QUEUED'],
+ [rpush => ['clunk' => 'oops'], 'QUEUED'],
+ [get => ['clunk'], 'QUEUED'],
+ [exec => [], [
+ ['OK', undef],
+ [undef, 'ERR Operation against a key holding the wrong kind of value'],
+ ['eth', undef],
+ ]],
+);
+
+subtest 'transaction with error and no pipeline' => sub {
+ is($r->multi, 'OK', 'multi');
+ is($r->set('clunk', 'eth'), 'QUEUED', 'transactional SET');
+ is($r->rpush('clunk', 'oops'), 'QUEUED', 'transactional bad RPUSH');
+ is($r->get('clunk'), 'QUEUED', 'transactional GET');
+ throws_ok sub { $r->exec },
+ qr/\[exec\] ERR Operation against a key holding the wrong kind of value,/,
+ 'synchronous EXEC dies for intervening error';
+};
+
+done_testing();
View
39 t/07-reconnect.t 100644 → 100755
@@ -42,6 +42,45 @@ subtest 'Command without connection or timeout, with reconnect' => sub {
};
+subtest 'Reconnection discards pending commands' => sub {
+ ok(my $r = Redis->new(reconnect => 2, server => $srv),
+ 'connected to our test redis-server');
+
+ my $processed_pending = 0;
+ $r->dbsize(sub { $processed_pending++ });
+
+ ok(close(delete $r->{sock}), 'evilly close connection to the server');
+ ok($r->set(foo => 'bar'), 'send command with reconnect');
+
+ is($processed_pending, 0, 'pending command discarded on reconnect');
+};
+
+
+subtest 'INFO commands with extra logic triggers reconnect' => sub {
+ ok(my $r = Redis->new(reconnect => 2, server => $srv),
+ 'connected to our test redis-server');
+
+ ok($r->quit, 'close connection to the server');
+
+ my $info = $r->info;
+ is(ref $info, 'HASH', 'reconnect on INFO command');
+};
+
+
+subtest 'KEYS commands with extra logic triggers reconnect' => sub {
+ ok(my $r = Redis->new(reconnect => 2, server => $srv),
+ 'connected to our test redis-server');
+
+ ok($r->flushdb, 'delete all keys');
+ ok($r->set(reconnect => $$), 'set known key');
+
+ ok($r->quit, 'close connection to the server');
+
+ my @keys = $r->keys('*');
+ is_deeply(\@keys, ['reconnect'], 'reconnect on KEYS command');
+};
+
+
subtest "Bad commnands don't trigger reconnect" => sub {
ok(my $r = Redis->new(reconnect => 2, server => $srv),
'connected to our test redis-server');
Something went wrong with that request. Please try again.