Skip to content
This repository

Support for pipelining #10

Merged
merged 18 commits into from about 2 years ago

3 participants

Aaron Crane Pedro Melo Jeremy Zawodny
Aaron Crane
arc commented

See also the existing issue #3.

This pull request provides minimal support for pipelining. The API looks like this:

my @responses = $redis->pipeline(sub {
    # many requests here
});
# Now @responses are the gathered responses, including unthrown errors.

The "unthrown errors" bit seems important to me: even if one command yielded an error status, you can still look at the responses to all the other commands.

My measurements are noisier than I'd prefer, but for batches of 1000 requests with status-only replies, I see execution time in pipelined mode about 60% or 70% of the execution time in synchronous mode. Put another way: amortised per-request time on a fairly old Linux machine, running an old, threaded vendor Perl goes from ≥64 µs (with encoding => undef) to ≥41 µs. Combining pipelining with my AUTOLOAD cache code from pull request #9 reduces the minimum time to 36 µs.

There's also an additional commit which slightly improves transaction handling; it's implemented using pipelining's ability to suppress the exceptions which are ordinarily raised for error statuses, so again, this lets you look at the responses to all the commands which did succeed. (This feature is the itch I'm really scratching; from my point of view, the pipelining support is less important, though the speedup is pleasing nonetheless.)

My APIs for both pipelining and transaction execution are to some extent straw men; if you think there's a better approach, I'd love to hear your ideas, and I'd be happy to rework my changes to accommodate them.

Pedro Melo
Owner
melo commented

Hi,

I mentioned in the previous release notes posted on the Redis ML that the next release would include pipeline so I thank you for tackling the issue.

I'll look at the internals of your implementation and reuse as most as possible, but the API I want to implement doesn't require a pipeline() method nor a special mode.

My current design is this: to enable pipeline, add a coderef callback as the last parameter on any API call.

That is it. If Redis sees the codeRef as the last parameter, it sends the command to the servers, sticks the coderef in a internal queue, and returns. The response parser, when a full response is detected, takes the first item from that queue and calls it with the response. Any exceptions detected can be send in a second argument.

This allows you to mix pipeline and non-pipelined commands. You could:

$redis->incr('i', sub { ... });
$redis->get('a', sub { ... });
$redis->get('b');

This would pipeline the first two, send the third, and then way for the answer to the third command. This would in effect make sure all the previous command responses would complete.

A special $redis->wait_all_responses would allow you to make sure all responses were accounted for.

A side-benefit of this API would be to enable support for MULTI/EXEC easily, given that in those sequences, the answer is also delayed.

What do you think?

Aaron Crane
arc commented

Hi, Pedro. I hadn't realised you were announcing releases on the Redis mailing list — I'll clearly have to subscribe.

Your API proposal sounds good to me. I'll see what I can come up with by way of an implementation, and submit a new set of changes today or tomorrow.

added some commits
Aaron Crane Factor out new __with_reconnect method fb5f5bd
Aaron Crane Bug fix: apply reconnect logic to KEYS and INFO 4054b4d
Aaron Crane Bug fix: forbid PING and SHUTDOWN in SUBSCRIBE mode 2c3b57a
Aaron Crane Refactor: don't conflate subscription commands with standard commands d8a1856
Aaron Crane Document scalar-context KEYS return
It's tested for in t/01-basic.t, so I assume it's deliberate.
6a61b12
Aaron Crane Rename __try_reconnect to __throw_reconnect
Since it never tries to do anything, but always throws an exception, the new
name is more descriptive of what it does.
f859ef4
Aaron Crane Refactor to use __run_cmd where possible 3b1c2e8
Aaron Crane Remove final argument in __read_response, __read_response_r
There are no remaining callers that use it.
5cc8b55
Aaron Crane Always call __read_response_r as a method ba03498
Aaron Crane Pipelining support
Most command methods now take an optional trailing coderef; if a coderef is
supplied, we don't wait for the command's response, but merely schedule the
coderef to be called once the response has been read.
ad922d7
Aaron Crane
arc commented

Hi, Pedro. Sorry that took me a little longer than expected, but I've updated this pull request to use the approach you suggest. (The code from my original pull request is still available as arc@18a81d8 if you want to compare it.)

This new branch builds on my AUTOLOAD-caching branch.

I've included tests, but I haven't yet updated the documentation, because I want to confirm that this looks good to you first. If you like this patch set, I'll add documentation and resubmit.

In this code, QUIT, SHUTDOWN, and PING are always executed synchronously, on the assumption that pipelining them is never going to be useful; but let me know if you disagree, and I'll look at making them pipelinable. For that matter, I'd be happy to tweak any of my changes and resubmit if you'd prefer them to be implemented differently.

Finally, I've also included fixes for a couple of bugs I spotted by inspection while working on this:

  • KEYS and INFO weren't honouring the reconnect setting
  • PING and SHUTDOWN were incorrectly accepted in subscriber mode
Jeremy Zawodny

Just wanted to drop in and say I like the direction this is headed. Thanks for the work on this!

Pedro Melo
Owner
melo commented

Hey @arc, excellent stuff!

I'm currently migrating one of my sites to a new server, should be done by Friday. Do you mind waiting for the weekend to merge this?

Again, thanks!

Aaron Crane
arc commented

Sure, that's great. I'll work on getting some documentation submitted today or tomorrow.

I'm also making a couple of clarity and performance improvements. This current version has a best-case per-request cost about 15–20 µs (30–50%) longer on my test platform than my original version in arc@18a81d8ea2be8b76de3c9187dae76d9b21553089. That seems to be entirely due to my new __with_reconnect method, which adds a method call and a closure call to the fast path. I'm now looking for a way to remove that additional overhead, without unduly harming maintainability.

added some commits
Aaron Crane Refactor __run_cmd as __queue_cmd
- Takes an additional argument indicating whether nested errors should be
  collected (so callers don't have to manually fix up the relevant data
  structure)

- Renamed to better reflect its purpose
dea5540
Aaron Crane Add tests for pipelined INFO and KEYS
Since they're the two pipeline-capable commands that have unusual return
values, it seems particularly valuable to test them.
89241f4
Aaron Crane Factor out new __run_cmd method
Unlike the previous method of the same name, this one actually sends a
request and enqueues a response handler.
8c6f8c8
Aaron Crane Refactor: inline __queue_cmd in its only caller 9fc61b4
Aaron Crane Refactor: save one level of closure invocation 9c18872
Aaron Crane Performance enhancement in non-reconnect mode
If we aren't in reconnect mode, using __with_reconnect adds a method call
and a closure invocation to every request, no matter how trivial.  This
change adds a fast path for directly calling __run_cmd in the three relevant
locations.  In my tests, this saves up to a third of the best-case amortised
per-request execution time.
05a2973
Aaron Crane Documentation for pipelining 1a9371b
Aaron Crane
arc commented

Hi. These changes add documentation, as promised, and match performance with my original pipelining code (at least to within the precision of my measurements). I think the code's also rather cleaner than the slower version.

Let me know if you'd like anything else from me before you merge.

Thanks.

Pedro Melo melo merged commit 157cb35 into from
Pedro Melo melo closed this
Pedro Melo
Owner
melo commented

Hey @arc, I've merged your code and will release monday (don't want to do it over the weekend, one of my kids is sick so I ended up with less free time than expected).

The code looks very good, I only have one lingering doubt that I want to discuss with you (and with @jzawodn given that he is listening in :) ). Its about the MULTI() API. Right now, it returns the list of responses for each command we sent.

I was wondering if, when in pipeline() mode, if commands inside MULTI/EXEC are called with a callback I expect those answer will not be present in the exec() return value, correct?

The relevant part of the documentation is this: arc@1a9371b#L0R1016

As I said, I have little time right now, monday morning at the office I'll do a quick test to see if it works like that, as it would be my expected behavior, but I wanted to have your opinion.

Please disregard this if that is the actual current behavior :)

Aaron Crane
arc commented

Hi. Sure, releasing on Monday sounds great, and I hope your kid feels better soon.

As for EXEC, here's more detail on how it works. When nothing's pipelined, MULTI/EXEC work the way they do in 1.926:

$redis->multi;              # +OK
$redis->set(foo => 'bar');  # +QUEUED
$redis->get('baz');         # +QUEUED
$redis->exec;               # ['OK', $value_of_baz]

where, in protocol terms, the response to the EXEC looks like this:

*2
+OK
$12
value of baz

This also means that an exception is thrown if there's an error executing any of the commands in the transaction:

$redis->multi;               # +OK
$redis->set(foo => 'bar');   # +QUEUED
$redis->lpush(foo => 'bar'); # +QUEUED
$redis->exec;                # dies when decoding LPUSH reply

That is, the EXEC itself gets a multi-bulk reply looking like this:

*2
+OK
-ERR Operation against a key holding the wrong kind of value

In the current implementation, using pipelining for the individual commands doesn't affect any of that, only (a) whether we wait for the response before returning, and (b) how the replies in the protocol are provided to the caller. For example:

$redis->multi(sub {});             # ('OK', undef)
$redis->set(foo => 'bar', sub {}); # ('QUEUED', undef)
$redis->get('baz', sub {});        # ('QUEUED', undef)
$redis->exec(sub { ... });         # ([  ['OK', undef],
                                   #     [$value_of_baz, undef]  ],
                                   #  undef)

The MULTI and the individual transactional commands never produce an interesting return, just +OK and +QUEUED respectively; so using a no-op callback to trigger pipelining on them is the obvious approach. But the EXEC produces all the replies, so its callback will (almost always) need to look at the data.

The same holds true when part of the transaction produces an error:

$redis->multi(sub {});               # ('OK', undef)
$redis->set(foo => 'bar', sub {});   # ('QUEUED', undef)
$redis->lpush(foo => 'bar', sub {}); # ('QUEUED', undef)
$redis->exec(sub { ... });           # ([  ['OK', undef],
                                     #     [undef, 'ERR…']  ],
                                     #  undef)

And mixing and matching pipelined and synchronous commands follows the same rules:

$redis->multi(sub {});       # cb:  ('OK', undef)
$redis->set(foo => 'bar');   # ret: 'QUEUED'
$redis->get('baz', sub {});  # cb:  ('QUEUED', undef)
$redis->exec;                # ret: ['OK', $value_of_baz]

I'm not quite sure I understand what you're asking, though. I think you're suggesting it would be better if, for a pipelined in-transaction command, the callback were invoked with the data gathered from the ultimate EXEC, rather than the intermediate placeholder +QUEUED response. That does sound better for users, to be honest, though a little painful to implement. Is that what you had in mind?

Pedro Melo
Owner
melo commented

Yes to the last paragraph, that is what I had in mind.

This morning at the office I'll fix #11 and prepare the release. At that time I'll try and see the difficulties of implementing your last paragraph.

I want to cut a release today, and I don't want to rush this so I'll probably document that for now, during MULTI/EXEC, you cannot use callbacks, and all the responses will be available in the EXEC response.

This allows us more time to see if its feasible to implement it in the way that (IMHO) makes more sense to the end user. The QUEUED responses are important for the client lib, but irrelevant to the programmer, and using a callback to collect the errors later would make more sense to me.

After this release I would also like to use your last comment and paste it into the POD section for the transaction stuff, given that others might have the same doubts about how it all works. :)

Given that I want to fix the test suite to use Test::TCP, and I also want to convert to Dist::Zilla, and I want to rip out the encoding stuff, :) I plan to have at least 2 or 3 releases more before putting 2.0 out there.

Again, thanks for your work.

PS: and yes, the kids are better - classic three days bug

Pedro Melo
Owner
melo commented

Moved the discussion to a new issue, #17.

Aaron Crane
arc commented

Sounds like a pretty decent plan, and feel free to copy anything I've said here into the documentation.

I've got an idea about how to implement the smarter EXEC handling, but it's got some nasty edge cases, so maybe you'll come up with something better. If you want someone to review anything you work on for #17, I'd gladly take a look.

Pedro Melo
Owner
melo commented

FYI: release 1.950 uploaded to CPAN

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 18 unique commits by 2 authors.

Mar 02, 2012
Aaron Crane Cache AUTOLOAD-generated methods for future use 4dabb72
Mar 07, 2012
Aaron Crane Factor out new __with_reconnect method fb5f5bd
Aaron Crane Bug fix: apply reconnect logic to KEYS and INFO 4054b4d
Aaron Crane Bug fix: forbid PING and SHUTDOWN in SUBSCRIBE mode 2c3b57a
Aaron Crane Refactor: don't conflate subscription commands with standard commands d8a1856
Aaron Crane Document scalar-context KEYS return
It's tested for in t/01-basic.t, so I assume it's deliberate.
6a61b12
Aaron Crane Rename __try_reconnect to __throw_reconnect
Since it never tries to do anything, but always throws an exception, the new
name is more descriptive of what it does.
f859ef4
Aaron Crane Refactor to use __run_cmd where possible 3b1c2e8
Aaron Crane Remove final argument in __read_response, __read_response_r
There are no remaining callers that use it.
5cc8b55
Aaron Crane Always call __read_response_r as a method ba03498
Aaron Crane Pipelining support
Most command methods now take an optional trailing coderef; if a coderef is
supplied, we don't wait for the command's response, but merely schedule the
coderef to be called once the response has been read.
ad922d7
Mar 08, 2012
Aaron Crane Refactor __run_cmd as __queue_cmd
- Takes an additional argument indicating whether nested errors should be
  collected (so callers don't have to manually fix up the relevant data
  structure)

- Renamed to better reflect its purpose
dea5540
Aaron Crane Add tests for pipelined INFO and KEYS
Since they're the two pipeline-capable commands that have unusual return
values, it seems particularly valuable to test them.
89241f4
Aaron Crane Factor out new __run_cmd method
Unlike the previous method of the same name, this one actually sends a
request and enqueues a response handler.
8c6f8c8
Aaron Crane Refactor: inline __queue_cmd in its only caller 9fc61b4
Aaron Crane Refactor: save one level of closure invocation 9c18872
Aaron Crane Performance enhancement in non-reconnect mode
If we aren't in reconnect mode, using __with_reconnect adds a method call
and a closure invocation to every request, no matter how trivial.  This
change adds a fast path for directly calling __run_cmd in the three relevant
locations.  In my tests, this saves up to a third of the best-case amortised
per-request execution time.
05a2973
Aaron Crane Documentation for pipelining 1a9371b
This page is out of date. Refresh to see the latest.
388  lib/Redis.pm
@@ -51,6 +51,19 @@ our $VERSION = '1.926';
51 51
     $redis->sort('list', 'DESC');
52 52
     $redis->sort(qw{list LIMIT 0 5 ALPHA DESC});
53 53
     
  54
+    ## Add a coderef argument to run a command in the background
  55
+    $redis->sort(qw{list LIMIT 0 5 ALPHA DESC}, sub {
  56
+      my ($reply, $error) = @_;
  57
+      die "Oops, got an error: $error\n" if defined $error;
  58
+      print "$_\n" for @$reply;
  59
+    });
  60
+    long_computation();
  61
+    $redis->wait_all_responses;
  62
+    
  63
+    ## Or run a large batch of commands in a pipeline
  64
+    $redis->hset('h', $_, $hash{$_}, sub {}) for keys %hash;
  65
+    $redis->wait_all_responses;
  66
+    
54 67
     ## Publish/Subscribe
55 68
     $redis->subscribe(
56 69
       'topic_1',
@@ -205,55 +218,88 @@ sub DESTROY { }
205 218
 our $AUTOLOAD;
206 219
 
207 220
 sub AUTOLOAD {
208  
-  my $self = shift;
209  
-
210 221
   my $command = $AUTOLOAD;
211 222
   $command =~ s/.*://;
  223
+
  224
+  my $method = sub { shift->__std_cmd($command, @_) };
  225
+
  226
+  # Save this method for future calls
  227
+  no strict 'refs';
  228
+  *$AUTOLOAD = $method;
  229
+
  230
+  goto $method;
  231
+}
  232
+
  233
+sub __std_cmd {
  234
+  my $self    = shift;
  235
+  my $command = shift;
  236
+
212 237
   $self->__is_valid_command($command);
213 238
 
214  
-  ## Fast path, no reconnect
215  
-  return $self->__run_cmd($command, @_) unless $self->{reconnect};
  239
+  my $ret;
  240
+  my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
  241
+
  242
+  # If this is an EXEC command, in pipelined mode, and one of the commands
  243
+  # executed in the transaction yields an error, we must collect all errors
  244
+  # from that command, rather than throwing an exception immediately.
  245
+  my $collect_errors = $cb && uc($command) eq 'EXEC';
  246
+
  247
+  ## Fast path, no reconnect;
  248
+  return $self->__run_cmd($command, $collect_errors, undef, $cb, @_)
  249
+    unless $self->{reconnect};
216 250
 
217 251
   my @cmd_args = @_;
218  
-  return try {
219  
-    $self->__run_cmd($command, @cmd_args);
220  
-  }
221  
-  catch {
  252
+  $self->__with_reconnect(sub {
  253
+    $self->__run_cmd($command, $collect_errors, undef, $cb, @cmd_args);
  254
+  });
  255
+}
  256
+
  257
+sub __with_reconnect {
  258
+  my ($self, $cb) = @_;
  259
+
  260
+  ## Fast path, no reconnect
  261
+  return $cb->() unless $self->{reconnect};
  262
+
  263
+  return &try($cb, catch {
222 264
     die $_ unless ref($_) eq 'Redis::X::Reconnect';
223 265
 
224 266
     $self->__connect;
225  
-    $self->__run_cmd($command, @cmd_args);
226  
-  };
  267
+    $cb->();
  268
+  });
227 269
 }
228 270
 
229 271
 sub __run_cmd {
230  
-  my $self    = shift;
231  
-  my $command = shift;
232  
-  my $sock    = $self->{sock} || $self->__try_reconnect('Not connected to any server');
233  
-  my $enc     = $self->{encoding};
234  
-  my $deb     = $self->{debug};
  272
+  my ($self, $command, $collect_errors, $custom_decode, $cb, @args) = @_;
  273
+
  274
+  my $ret;
  275
+  my $wrapper = $cb && $custom_decode ? sub {
  276
+    my ($reply, $error) = @_;
  277
+    $cb->(scalar $custom_decode->($reply), $error);
  278
+  } : $cb || sub {
  279
+    my ($reply, $error) = @_;
  280
+    confess "[$command] $error, " if defined $error;
  281
+    $ret = $reply;
  282
+  };
235 283
 
236  
-  ## PubSub commands use a different answer handling
237  
-  if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
238  
-    $pr = '' unless $pr;
  284
+  $self->__send_command($command, @args);
  285
+  push @{ $self->{queue} }, [$command, $wrapper, $collect_errors];
239 286
 
240  
-    my $cb = pop;
241  
-    confess("Missing required callback in call to $command(), ")
242  
-      unless ref($cb) eq 'CODE';
  287
+  return 1 if $cb;
243 288
 
244  
-    my @subs = @_;
245  
-    @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
246  
-      if $unsub;
247  
-    return unless @subs;
  289
+  $self->wait_all_responses;
  290
+  return $custom_decode ? $custom_decode->($ret, !wantarray)
  291
+       : wantarray && ref $ret eq 'ARRAY' ? @$ret : $ret;
  292
+}
248 293
 
249  
-    $self->__send_command($command, @subs);
  294
+sub wait_all_responses {
  295
+  my ($self) = @_;
250 296
 
251  
-    my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
252  
-    return $self->__process_subscription_changes($command, \%cbs);
  297
+  for my $handler (splice @{ $self->{queue} }) {
  298
+    my ($command, $cb, $collect_errors) = @$handler;
  299
+    $cb->($self->__read_response($command, $collect_errors));
253 300
   }
254 301
 
255  
-  $self->__send_command($command, @_);
256  
-  return $self->__read_response($command);
  302
+  return;
257 303
 }
258 304
 
259 305
 
@@ -262,6 +308,10 @@ sub quit {
262 308
   my ($self) = @_;
263 309
   return unless $self->{sock};
264 310
 
  311
+  confess "[quit] only works in synchronous mode, "
  312
+      if @_ && ref $_[-1] eq 'CODE';
  313
+
  314
+  $self->wait_all_responses;
265 315
   $self->__send_command('QUIT');
266 316
   close(delete $self->{sock}) || confess("Can't close socket: $!");
267 317
 
@@ -270,8 +320,14 @@ sub quit {
270 320
 
271 321
 sub shutdown {
272 322
   my ($self) = @_;
  323
+  $self->__is_valid_command('SHUTDOWN');
  324
+
  325
+  confess "[shutdown] only works in synchronous mode, "
  326
+      if @_ && ref $_[-1] eq 'CODE';
  327
+
273 328
   return unless $self->{sock};
274 329
 
  330
+  $self->wait_all_responses;
275 331
   $self->__send_command('SHUTDOWN');
276 332
   close(delete $self->{sock}) || confess("Can't close socket: $!");
277 333
 
@@ -279,46 +335,69 @@ sub shutdown {
279 335
 }
280 336
 
281 337
 sub ping {
282  
-  my ($self) = @_;
  338
+  my $self = shift;
  339
+  $self->__is_valid_command('PING');
  340
+
  341
+  confess "[ping] only works in synchronous mode, "
  342
+      if @_ && ref $_[-1] eq 'CODE';
  343
+
283 344
   return unless exists $self->{sock};
284 345
 
285  
-  my $reply;
286  
-  eval {
287  
-    $self->__send_command('PING');
288  
-    $reply = $self->__read_response('PING');
289  
-  };
290  
-  if ($@) {
  346
+  $self->wait_all_responses;
  347
+  return scalar try {
  348
+    $self->__std_cmd('PING');
  349
+  }
  350
+  catch {
291 351
     close(delete $self->{sock});
292 352
     return;
293  
-  }
294  
-
295  
-  return $reply;
  353
+  };
296 354
 }
297 355
 
298 356
 sub info {
299  
-  my ($self) = @_;
  357
+  my $self = shift;
300 358
   $self->__is_valid_command('INFO');
301 359
 
302  
-  $self->__send_command('INFO');
  360
+  my $custom_decode = sub {
  361
+    my ($reply) = @_;
  362
+    return $reply if !defined $reply || ref $reply;
  363
+    return { map { split(/:/, $_, 2) } split(/\r\n/, $reply) };
  364
+  };
  365
+
  366
+  my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
303 367
 
304  
-  my $info = $self->__read_response('INFO');
  368
+  ## Fast path, no reconnect
  369
+  return $self->__run_cmd('INFO', 0, $custom_decode, $cb, @_)
  370
+    unless $self->{reconnect};
305 371
 
306  
-  return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
  372
+  my @cmd_args = @_;
  373
+  $self->__with_reconnect(sub {
  374
+    $self->__run_cmd('INFO', 0, $custom_decode, $cb, @cmd_args);
  375
+  });
307 376
 }
308 377
 
309 378
 sub keys {
310 379
   my $self = shift;
311 380
   $self->__is_valid_command('KEYS');
312 381
 
313  
-  $self->__send_command('KEYS', @_);
  382
+  my $custom_decode = sub {
  383
+    my ($reply, $synchronous_scalar) = @_;
314 384
 
315  
-  my @keys = $self->__read_response('KEYS', \my $type);
316  
-  ## Support redis > 1.26
317  
-  return @keys if $type eq '*';
  385
+    ## Support redis <= 1.2.6
  386
+    $reply = [split(/\s/, $reply)] if defined $reply && !ref $reply;
318 387
 
319  
-  ## Support redis <= 1.2.6
320  
-  return split(/\s/, $keys[0]) if $keys[0];
321  
-  return;
  388
+    return ref $reply && ($synchronous_scalar || wantarray) ? @$reply : $reply;
  389
+  };
  390
+
  391
+  my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
  392
+
  393
+  ## Fast path, no reconnect
  394
+  return $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @_)
  395
+    unless $self->{reconnect};
  396
+
  397
+  my @cmd_args = @_;
  398
+  $self->__with_reconnect(sub {
  399
+    $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @cmd_args);
  400
+  });
322 401
 }
323 402
 
324 403
 
@@ -333,8 +412,9 @@ sub wait_for_messages {
333 412
   my $count = 0;
334 413
   while ($s->can_read($timeout)) {
335 414
     while (__try_read_sock($sock)) {
336  
-      my @m = $self->__read_response('WAIT_FOR_MESSAGES');
337  
-      $self->__process_pubsub_msg(\@m);
  415
+      my ($reply, $error) = $self->__read_response('WAIT_FOR_MESSAGES');
  416
+      confess "[WAIT_FOR_MESSAGES] $error, " if defined $error;
  417
+      $self->__process_pubsub_msg($reply);
338 418
       $count++;
339 419
     }
340 420
   }
@@ -342,6 +422,39 @@ sub wait_for_messages {
342 422
   return $count;
343 423
 }
344 424
 
  425
+sub __subscription_cmd {
  426
+  my $self    = shift;
  427
+  my $pr      = shift;
  428
+  my $unsub   = shift;
  429
+  my $command = shift;
  430
+  my $cb      = pop;
  431
+
  432
+  confess("Missing required callback in call to $command(), ")
  433
+    unless ref($cb) eq 'CODE';
  434
+
  435
+  $self->wait_all_responses;
  436
+
  437
+  my @subs = @_;
  438
+  $self->__with_reconnect(sub {
  439
+    $self->__throw_reconnect('Not connected to any server')
  440
+      unless $self->{sock};
  441
+
  442
+    @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
  443
+      if $unsub;
  444
+    return unless @subs;
  445
+
  446
+    $self->__send_command($command, @subs);
  447
+
  448
+    my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
  449
+    return $self->__process_subscription_changes($command, \%cbs);
  450
+  });
  451
+}
  452
+
  453
+sub    subscribe { shift->__subscription_cmd('',  0,    subscribe => @_) }
  454
+sub   psubscribe { shift->__subscription_cmd('p', 0,   psubscribe => @_) }
  455
+sub  unsubscribe { shift->__subscription_cmd('',  1,  unsubscribe => @_) }
  456
+sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
  457
+
345 458
 sub __process_unsubscribe_requests {
346 459
   my ($self, $cb, $pr, @unsubs) = @_;
347 460
   my $subs = $self->{subscribers};
@@ -364,21 +477,22 @@ sub __process_subscription_changes {
364 477
   my $subs = $self->{subscribers};
365 478
 
366 479
   while (%$expected) {
367  
-    my @m = $self->__read_response($cmd);
  480
+    my ($m, $error) = $self->__read_response($cmd);
  481
+    confess "[$cmd] $error, " if defined $error;
368 482
 
369 483
     ## Deal with pending PUBLISH'ed messages
370  
-    if ($m[0] =~ /^p?message$/) {
371  
-      $self->__process_pubsub_msg(\@m);
  484
+    if ($m->[0] =~ /^p?message$/) {
  485
+      $self->__process_pubsub_msg($m);
372 486
       next;
373 487
     }
374 488
 
375  
-    my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
376  
-    $key .= "message:$m[1]";
  489
+    my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
  490
+    $key .= "message:$m->[1]";
377 491
     my $cb = delete $expected->{$key};
378 492
 
379 493
     push @{$subs->{$key}}, $cb unless $unsub;
380 494
 
381  
-    $self->{is_subscriber} = $m[2];
  495
+    $self->{is_subscriber} = $m->[2];
382 496
   }
383 497
 }
384 498
 
@@ -407,9 +521,8 @@ sub __process_pubsub_msg {
407 521
 sub __is_valid_command {
408 522
   my ($self, $cmd) = @_;
409 523
 
410  
-  return unless $self->{is_subscriber};
411  
-  return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i;
412  
-  confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
  524
+  confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
  525
+    if $self->{is_subscriber};
413 526
 }
414 527
 
415 528
 
@@ -418,6 +531,11 @@ sub __connect {
418 531
   my ($self) = @_;
419 532
   delete $self->{sock};
420 533
 
  534
+  # Suppose we have at least one command response pending, but we're about
  535
+  # to reconnect.  The new connection will never get a response to any of
  536
+  # the pending commands, so delete all those pending responses now.
  537
+  $self->{queue} = [];
  538
+
421 539
   ## Fast path, no reconnect
422 540
   return $self->__build_sock() unless $self->{reconnect};
423 541
 
@@ -453,7 +571,7 @@ sub __send_command {
453 571
   my $deb  = $self->{debug};
454 572
 
455 573
   my $sock = $self->{sock}
456  
-    || $self->__try_reconnect('Not connected to any server');
  574
+    || $self->__throw_reconnect('Not connected to any server');
457 575
 
458 576
   warn "[SEND] $cmd ", Dumper([@_]) if $deb;
459 577
 
@@ -467,14 +585,14 @@ sub __send_command {
467 585
 
468 586
   ## Check to see if socket was closed: reconnect on EOF
469 587
   my $status = __try_read_sock($sock);
470  
-  $self->__try_reconnect('Not connected to any server')
  588
+  $self->__throw_reconnect('Not connected to any server')
471 589
     if defined $status && $status == 0;
472 590
 
473 591
   ## Send command, take care for partial writes
474 592
   warn "[SEND RAW] $buf" if $deb;
475 593
   while ($buf) {
476 594
     my $len = syswrite $sock, $buf, length $buf;
477  
-    $self->__try_reconnect("Could not write to Redis server: $!")
  595
+    $self->__throw_reconnect("Could not write to Redis server: $!")
478 596
       unless $len;
479 597
     substr $buf, 0, $len, "";
480 598
   }
@@ -483,55 +601,50 @@ sub __send_command {
483 601
 }
484 602
 
485 603
 sub __read_response {
486  
-  my ($self, $cmd) = @_;
  604
+  my ($self, $cmd, $collect_errors) = @_;
487 605
 
488 606
   confess("Not connected to any server") unless $self->{sock};
489 607
 
490 608
   local $/ = "\r\n";
491 609
 
492 610
   ## no debug => fast path
493  
-  return __read_response_r(@_) unless $self->{debug};
  611
+  return $self->__read_response_r($cmd, $collect_errors) unless $self->{debug};
494 612
 
495  
-  if (wantarray) {
496  
-    my @r = __read_response_r(@_);
497  
-    warn "[RECV] $cmd ", Dumper(\@r);
498  
-    return @r;
499  
-  }
500  
-  else {
501  
-    my $r = __read_response_r(@_);
502  
-    warn "[RECV] $cmd ", Dumper($r);
503  
-    return $r;
504  
-  }
  613
+  my ($result, $error) = $self->__read_response_r($cmd, $collect_errors);
  614
+  warn "[RECV] $cmd ", Dumper($result, $error) if $self->{debug};
  615
+  return $result, $error;
505 616
 }
506 617
 
507 618
 sub __read_response_r {
508  
-  my ($self, $command, $type_r) = @_;
  619
+  my ($self, $command, $collect_errors) = @_;
509 620
 
510 621
   my ($type, $result) = $self->__read_line;
511  
-  $$type_r = $type if $type_r;
512 622
 
513 623
   if ($type eq '-') {
514  
-    confess "[$command] $result, ";
  624
+    return undef, $result;
515 625
   }
516  
-  elsif ($type eq '+') {
517  
-    return $result;
  626
+  elsif ($type eq '+' || $type eq ':') {
  627
+    return $result, undef;
518 628
   }
519 629
   elsif ($type eq '$') {
520  
-    return if $result < 0;
521  
-    return $self->__read_len($result + 2);
  630
+    return undef, undef if $result < 0;
  631
+    return $self->__read_len($result + 2), undef;
522 632
   }
523 633
   elsif ($type eq '*') {
524  
-    return if $result < 0;
  634
+    return undef, undef if $result < 0;
525 635
 
526 636
     my @list;
527 637
     while ($result--) {
528  
-      push @list, scalar($self->__read_response_r($command));
  638
+      my @nested = $self->__read_response_r($command, $collect_errors);
  639
+      if ($collect_errors) {
  640
+        push @list, \@nested;
  641
+      }
  642
+      else {
  643
+        confess "[$command] $nested[1], " if defined $nested[1];
  644
+        push @list, $nested[0];
  645
+      }
529 646
     }
530  
-    return @list if wantarray;
531  
-    return \@list;
532  
-  }
533  
-  elsif ($type eq ':') {
534  
-    return $result;
  647
+    return \@list, undef;
535 648
   }
536 649
   else {
537 650
     confess "unknown answer type: $type ($result), ";
@@ -632,7 +745,7 @@ BEGIN {
632 745
 ##########################
633 746
 # I take exception to that
634 747
 
635  
-sub __try_reconnect {
  748
+sub __throw_reconnect {
636 749
   my ($self, $m) = @_;
637 750
   die bless(\$m, 'Redis::X::Reconnect') if $self->{reconnect};
638 751
   die $m;
@@ -643,16 +756,27 @@ sub __try_reconnect {
643 756
 
644 757
 __END__
645 758
 
  759
+=head1 Pipeline management
  760
+
  761
+=head2 wait_all_responses
  762
+
  763
+Waits until all pending pipelined responses have been received, and invokes
  764
+the pipeline callback for each one.  See L</PIPELINING>.
  765
+
646 766
 =head1 Connection Handling
647 767
 
648 768
 =head2 quit
649 769
 
650 770
   $r->quit;
651 771
 
  772
+The C<quit> method does not support pipelined operation.
  773
+
652 774
 =head2 ping
653 775
 
654 776
   $r->ping || die "no server?";
655 777
 
  778
+The C<ping> method does not support pipelined operation.
  779
+
656 780
 =head1 Commands operating on string values
657 781
 
658 782
 =head2 set
@@ -698,6 +822,13 @@ __END__
698 822
 =head2 keys
699 823
 
700 824
   my @keys = $r->keys( '*glob_pattern*' );
  825
+  my $keys = $r->keys( '*glob_pattern*' ); # count of matching keys
  826
+
  827
+Note that synchronous C<keys> calls in a scalar context return the number of
  828
+matching keys (not an array ref of matching keys as you might expect).  This
  829
+does not apply in pipelined mode: assuming the server returns a list of
  830
+keys, as expected, it is always passed to the pipeline callback as an array
  831
+ref.
701 832
 
702 833
 =head2 randomkey
703 834
 
@@ -860,12 +991,85 @@ See also L<Redis::List> for tie interface.
860 991
 
861 992
   $r->shutdown;
862 993
 
  994
+The C<shutdown> method does not support pipelined operation.
  995
+
863 996
 =head1 Remote server control commands
864 997
 
865 998
 =head2 info
866 999
 
867 1000
   my $info_hash = $r->info;
868 1001
 
  1002
+The C<info> method is unique in that it decodes the server's response into a
  1003
+hashref, if possible.  This decoding happens in both synchronous and
  1004
+pipelined modes.
  1005
+
  1006
+=head1 Transaction-handling commands
  1007
+
  1008
+=head2 multi
  1009
+
  1010
+  $r->multi;
  1011
+
  1012
+=head2 discard
  1013
+
  1014
+  $r->discard;
  1015
+
  1016
+=head2 exec
  1017
+
  1018
+  my @individual_replies = $r->exec;
  1019
+
  1020
+C<exec> has special behaviour when run in a pipeline: the C<$reply> argument
  1021
+to the pipeline callback is an array ref whose elements are themselves
  1022
+C<[$reply, $error]> pairs.  This means that you can accurately detect errors
  1023
+yielded by any command in the transaction, and without any exceptions being
  1024
+thrown.
  1025
+
  1026
+
  1027
+=head1 PIPELINING
  1028
+
  1029
+Usually, running a command will wait for a response.  However, if you're
  1030
+doing large numbers of requests, it can be more efficient to use what Redis
  1031
+calls I<pipelining>: send multiple commands to Redis without waiting for a
  1032
+response, then wait for the responses that come in.
  1033
+
  1034
+To use pipelining, add a coderef argument as the last argument to a command
  1035
+method call:
  1036
+
  1037
+  $r->set('foo', 'bar', sub {});
  1038
+
  1039
+Pending responses to pipelined commands are processed in a single batch, as
  1040
+soon as at least one of the following conditions holds:
  1041
+
  1042
+=over 4
  1043
+
  1044
+=item *
  1045
+
  1046
+A non-pipelined (synchronous) command has been sent on the same connection
  1047
+
  1048
+=item *
  1049
+
  1050
+A pub/sub subscription command (one of C<subscribe>, C<unsubscribe>,
  1051
+C<psubscribe>, or C<punsubscribe>) is about to be sent on the same
  1052
+connection.
  1053
+
  1054
+=item *
  1055
+
  1056
+The L</wait_all_responses> method is called explicitly.
  1057
+
  1058
+=back
  1059
+
  1060
+The coderef you supply to a pipelined command method is invoked once the
  1061
+response is available.  It takes two arguments, C<$reply> and C<$error>.  If
  1062
+C<$error> is defined, it contains the text of an error reply sent by the
  1063
+Redis server.  Otherwise, C<$reply> is the non-error reply.  For almost all
  1064
+commands, that means it's C<undef>, or a defined but non-reference scalar,
  1065
+or an array ref of any of those; but see L</keys>, L</info>, and L</exec>.
  1066
+
  1067
+Note the contrast with synchronous commands, which throw an exception on
  1068
+receipt of an error reply, or return a non-error reply directly.
  1069
+
  1070
+The fact that pipelined commands never throw an exception can be
  1071
+particularly useful for Redis transactions; see L</exec>.
  1072
+
869 1073
 
870 1074
 =head1 ENCODING
871 1075
 
4  t/01-basic.t
@@ -92,6 +92,10 @@ ok($@, 'rename to existing key');
92 92
 
93 93
 ok(my $nr_keys = $o->dbsize, 'dbsize');
94 94
 
  95
+throws_ok sub { $o->lpush('foo', 'bar') },
  96
+  qr/\[lpush\] ERR Operation against a key holding the wrong kind of value,/,
  97
+  'Error responses throw exception';
  98
+
95 99
 
96 100
 ## Commands operating on lists
97 101
 
66  t/02-responses.t 100644 → 100755
@@ -22,67 +22,75 @@ sub r {
22 22
 
23 23
 ## -ERR responses
24 24
 r('-you must die!!');
25  
-throws_ok sub { $r->__read_response('cmd') }, qr/\[cmd\] you must die!!/,
26  
-  'Error response must throw exception';
  25
+is_deeply([$r->__read_response('cmd')], [undef, 'you must die!!'],
  26
+          'Error response detected');
27 27
 
28 28
 
29 29
 ## +TEXT responses
30 30
 my $m;
31 31
 r('+all your text are belong to us');
32  
-lives_ok sub { $m = $r->__read_response('cmd') }, 'Text response ok';
33  
-is($m, 'all your text are belong to us', '... with the expected message');
  32
+is_deeply([$r->__read_response('cmd')],
  33
+          ['all your text are belong to us', undef],
  34
+          'Text response ok');
34 35
 
35 36
 
36 37
 ## :NUMBER responses
37 38
 r(':234');
38  
-lives_ok sub { $m = $r->__read_response('cmd') }, 'Integer response ok';
39  
-is($m, 234, '... with the expected value');
  39
+is_deeply([$r->__read_response('cmd')], [234, undef],
  40
+          'Integer response ok');
40 41
 
41 42
 
42 43
 ## $SIZE PAYLOAD responses
43 44
 r('$19', "Redis\r\nis\r\ngreat!\r\n");
44  
-lives_ok sub { $m = $r->__read_response('cmd') }, 'Size+payload response ok';
45  
-is($m, "Redis\r\nis\r\ngreat!\r\n", '... with the expected message');
  45
+is_deeply([$r->__read_response('cmd')], ["Redis\r\nis\r\ngreat!\r\n", undef],
  46
+          'Size+payload response ok');
46 47
 
47 48
 r('$0', "");
48  
-lives_ok sub { $m = $r->__read_response('cmd') },
49  
-  'Zero-size+payload response ok';
50  
-is($m, "", '... with the expected message');
  49
+is_deeply([$r->__read_response('cmd')], ['', undef],
  50
+          'Zero-size+payload response ok');
51 51
 
52 52
 r('$-1');
53  
-lives_ok sub { $m = $r->__read_response('cmd') },
54  
-  'Negative-size+payload response ok';
55  
-ok(!defined($m), '... with the expected undefined message');
  53
+is_deeply([$r->__read_response('cmd')], [undef, undef],
  54
+          'Negative-size+payload response ok');
56 55
 
57 56
 
58 57
 ## Multi-bulk responses
59 58
 my @m;
60 59
 r('*4', '$5', 'Redis', ':42', '$-1', '+Cool stuff');
61  
-lives_ok sub { @m = $r->__read_response('cmd') },
62  
-  'Simple multi-bulk response ok';
63  
-cmp_deeply(
64  
-  \@m,
65  
-  ['Redis', 42, undef, 'Cool stuff'],
66  
-  '... with the expected list of values'
67  
-);
  60
+cmp_deeply([$r->__read_response('cmd')],
  61
+           [['Redis', 42, undef, 'Cool stuff'], undef],
  62
+           'Simple multi-bulk response ok');
68 63
 
69 64
 
70 65
 ## Nested Multi-bulk responses
71 66
 r('*5', '$5', 'Redis', ':42', '*4', ':1', ':2', '$4', 'hope', '*2', ':4',
72 67
   ':5', '$-1', '+Cool stuff');
73  
-lives_ok sub { @m = $r->__read_response('cmd') },
74  
-  'Nested multi-bulk response ok';
75 68
 cmp_deeply(
76  
-  \@m,
77  
-  ['Redis', 42, [1, 2, 'hope', [4, 5]], undef, 'Cool stuff'],
78  
-  '... with the expected list of values'
  69
+  [$r->__read_response('cmd')],
  70
+  [['Redis', 42, [1, 2, 'hope', [4, 5]], undef, 'Cool stuff'], undef],
  71
+  'Nested multi-bulk response ok'
79 72
 );
80 73
 
81 74
 
82 75
 ## Nil multi-bulk responses
83 76
 r('*-1');
84  
-lives_ok sub { $m = $r->__read_response('blpop') },
85  
-  'Read a NIL multi-bulk response';
86  
-is($m, undef, '... with the expected "undef" value');
  77
+is_deeply([$r->__read_response('cmd')], [undef, undef],
  78
+          'Read a NIL multi-bulk response');
  79
+
  80
+
  81
+## Multi-bulk responses with nested error
  82
+r('*3', '$5', 'Redis', '-you must die!!', ':42');
  83
+throws_ok sub { $r->__read_response('cmd') },
  84
+  qr/\[cmd\] you must die!!/,
  85
+  'Nested errors must usually throw exceptions';
  86
+
  87
+r('*3', '$5', 'Redis', '-you must die!!', ':42');
  88
+is_deeply([$r->__read_response('cmd', 1)], [
  89
+  [['Redis', undef],
  90
+   [undef, 'you must die!!'],
  91
+   [42, undef]],
  92
+  undef,
  93
+], 'Nested errors must be collected in collect-errors mode');
  94
+
87 95
 
88 96
 done_testing();
13  t/03-pubsub.t 100644 → 100755
@@ -25,11 +25,16 @@ ok(
25 25
 
26 26
 is($pub->publish('aa', 'v1'), 0, "No subscribers to 'aa' topic");
27 27
 
  28
+my $db_size = -1;
  29
+$sub->dbsize(sub { $db_size = $_[0] });
  30
+
28 31
 ## Basic pubsub
29 32
 my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
30 33
 $sub->subscribe('aa', 'bb', $sub_cb);
31 34
 is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");
32 35
 
  36
+is($db_size, 0, 'subscribing processes pending queued commands');
  37
+
33 38
 is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
34 39
 cmp_deeply(\%got, {'aa' => 'v1:aa'}, "... for the expected topic, 'aa'");
35 40
 
@@ -116,9 +121,11 @@ is($sub->wait_for_messages(1), 0, '... yep, no messages delivered');
116 121
 cmp_deeply(\%got, {}, '... and an empty messages recorded set');
117 122
 
118 123
 is($sub->is_subscriber, 1, 'Still some pending subcriptions active');
119  
-throws_ok sub { $sub->info },
120  
-  qr/Cannot use command 'INFO' while in SUBSCRIBE mode/,
121  
-  '... still an error to try commands in subscribe mode';
  124
+for my $cmd (qw<ping info keys dbsize shutdown>) {
  125
+  throws_ok sub { $sub->$cmd },
  126
+  qr/Cannot use command '(?i:$cmd)' while in SUBSCRIBE mode/,
  127
+  ".. still an error to try \U$cmd\E while in SUBSCRIBE mode";
  128
+}
122 129
 $sub->punsubscribe('c*', $psub_cb);
123 130
 is($sub->is_subscriber, 0, '... but none anymore');
124 131
 
100  t/04-pipeline.t
... ...
@@ -0,0 +1,100 @@
  1
+#!perl
  2
+
  3
+use warnings;
  4
+use strict;
  5
+use Test::More;
  6
+use Redis;
  7
+use lib 't/tlib';
  8
+use Test::SpawnRedisServer;
  9
+use Test::Exception;
  10
+
  11
+my ($c, $srv) = redis();
  12
+END { $c->() if $c }
  13
+
  14
+ok(my $r = Redis->new(server => $srv), 'connected to our test redis-server');
  15
+
  16
+sub pipeline_ok {
  17
+  my ($desc, @commands) = @_;
  18
+  my (@responses, @expected_responses);
  19
+  for my $cmd (@commands) {
  20
+    my ($method, $args, $expected, $expected_err) = @$cmd;
  21
+    push @expected_responses, [$expected, $expected_err];
  22
+    $r->$method(@$args, sub { push @responses, [@_] });
  23
+  }
  24
+  $r->wait_all_responses;
  25
+
  26
+  # An expected response consisting of a hashref means that any non-empty
  27
+  # hashref should be accepted.  But reimplementing is_deeply() sounds like
  28
+  # a pain, so fake it:
  29
+  for my $i (0 .. $#expected_responses) {
  30
+    $expected_responses[$i] = $responses[$i]
  31
+      if ref $expected_responses[$i][0] eq 'HASH'
  32
+      && ref $responses[$i][0] eq 'HASH'
  33
+      && keys %{ $responses[$i][0] };
  34
+  }
  35
+
  36
+  is_deeply(\@responses, \@expected_responses, $desc);
  37
+}
  38
+
  39
+pipeline_ok 'single-command pipeline', (
  40
+  [set => [foo => 'bar'], 'OK'],
  41
+);
  42
+
  43
+pipeline_ok 'pipeline with embedded error', (
  44
+  [set  => [clunk => 'eth'], 'OK'],
  45
+  [oops => [], undef, q[ERR unknown command 'OOPS']],
  46
+  [get  => ['clunk'], 'eth'],
  47
+);
  48
+
  49
+pipeline_ok 'keys in pipelined mode', (
  50
+  [keys => ['*'], [qw<foo clunk>]],
  51
+  [keys => [], undef, q[ERR wrong number of arguments for 'keys' command]],
  52
+);
  53
+
  54
+pipeline_ok 'info in pipelined mode', (
  55
+  [info => [], {}],             # any non-empty hashref
  56
+  [info => ['oops'], undef, q[ERR wrong number of arguments for 'info' command]],
  57
+);
  58
+
  59
+pipeline_ok 'pipeline with multi-bulk reply', (
  60
+  [hmset => [kapow => (a => 1, b => 2, c => 3)], 'OK'],
  61
+  [hmget => [kapow => qw<c b a>], [3, 2, 1]],
  62
+);
  63
+
  64
+pipeline_ok 'large pipeline', (
  65
+  (map { [hset => [zzapp => $_ => -$_], 1] } 1 .. 5000),
  66
+  [hmget => [zzapp => (1 .. 5000)], [reverse -5000 .. -1]],
  67
+  [del => ['zzapp'], 1],
  68
+);
  69
+
  70
+subtest 'synchronous request with pending pipeline' => sub {
  71
+  my $clunk;
  72
+  is($r->get('clunk', sub { $clunk = $_[0] }), 1, 'queue a request');
  73
+  is($r->set('kapow', 'zzapp', sub {}), 1, 'queue another request');
  74
+  is($r->get('kapow'), 'zzapp', 'synchronous request has expected return');
  75
+  is($clunk, 'eth', 'synchronous request processes pending ones');
  76
+};
  77
+
  78
+pipeline_ok 'transaction', (
  79
+  [multi => [],                  'OK'],
  80
+  [set   => ['clunk' => 'eth'],  'QUEUED'],
  81
+  [rpush => ['clunk' => 'oops'], 'QUEUED'],
  82
+  [get   => ['clunk'],           'QUEUED'],
  83
+  [exec  => [], [
  84
+    ['OK', undef],
  85
+    [undef, 'ERR Operation against a key holding the wrong kind of value'],
  86
+    ['eth', undef],
  87
+  ]],
  88
+);
  89
+
  90
+subtest 'transaction with error and no pipeline' => sub {
  91
+  is($r->multi, 'OK', 'multi');
  92
+  is($r->set('clunk', 'eth'), 'QUEUED',  'transactional SET');
  93
+  is($r->rpush('clunk', 'oops'), 'QUEUED', 'transactional bad RPUSH');
  94
+  is($r->get('clunk'), 'QUEUED', 'transactional GET');
  95
+  throws_ok sub { $r->exec },
  96
+    qr/\[exec\] ERR Operation against a key holding the wrong kind of value,/,
  97
+    'synchronous EXEC dies for intervening error';
  98
+};
  99
+
  100
+done_testing();
39  t/07-reconnect.t 100644 → 100755
@@ -42,6 +42,45 @@ subtest 'Command without connection or timeout, with reconnect' => sub {
42 42
 };
43 43
 
44 44
 
  45
+subtest 'Reconnection discards pending commands' => sub {
  46
+  ok(my $r = Redis->new(reconnect => 2, server => $srv),
  47
+     'connected to our test redis-server');
  48
+
  49
+  my $processed_pending = 0;
  50
+  $r->dbsize(sub { $processed_pending++ });
  51
+
  52
+  ok(close(delete $r->{sock}), 'evilly close connection to the server');
  53
+  ok($r->set(foo => 'bar'), 'send command with reconnect');
  54
+
  55
+  is($processed_pending, 0, 'pending command discarded on reconnect');
  56
+};
  57
+
  58
+
  59
+subtest 'INFO commands with extra logic triggers reconnect' => sub {
  60
+  ok(my $r = Redis->new(reconnect => 2, server => $srv),
  61
+     'connected to our test redis-server');
  62
+
  63
+  ok($r->quit, 'close connection to the server');
  64
+
  65
+  my $info = $r->info;
  66
+  is(ref $info, 'HASH', 'reconnect on INFO command');
  67
+};
  68
+
  69
+
  70
+subtest 'KEYS commands with extra logic triggers reconnect' => sub {
  71
+  ok(my $r = Redis->new(reconnect => 2, server => $srv),
  72
+     'connected to our test redis-server');
  73
+
  74
+  ok($r->flushdb, 'delete all keys');
  75
+  ok($r->set(reconnect => $$), 'set known key');
  76
+
  77
+  ok($r->quit, 'close connection to the server');
  78
+
  79
+  my @keys = $r->keys('*');
  80
+  is_deeply(\@keys, ['reconnect'], 'reconnect on KEYS command');
  81
+};
  82
+
  83
+
45 84
 subtest "Bad commnands don't trigger reconnect" => sub {
46 85
   ok(my $r = Redis->new(reconnect => 2, server => $srv),
47 86
     'connected to our test redis-server');
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.