Browse files

Using RedisDB instead of Redis for backend management with auto recon…

…nect
  • Loading branch information...
1 parent e63f17f commit b9d79cf56f52286aed5a7ad055c8c8e172985999 @diegok committed Jan 4, 2012
Showing with 43 additions and 17 deletions.
  1. +7 −0 Changes
  2. +16 −13 lib/Resque.pm
  3. +1 −1 lib/Resque/Worker.pm
  4. +12 −0 t/02-queue.t
  5. +7 −1 t/03-worker.t
  6. +0 −1 t/lib/Test/FailWorker.pm
  7. +0 −1 t/lib/Test/Worker.pm
View
7 Changes
@@ -1,5 +1,12 @@
{{$NEXT}}
+ - Replaced dependency on Redis for RedisDB
+ - Handle redis re-connection thank's to RedisDB
+ - More tests on workerland
+ - Stop requiring 5.10 on test libs
+
+0.05 2012-01-03 20:30:46 Europe/Madrid
+
- Relaxed regex to catch pid's of running workers
0.04 2011-12-30 22:19:55 Europe/Madrid
View
29 lib/Resque.pm
@@ -4,14 +4,14 @@ use Any::Moose '::Util::TypeConstraints';
# ABSTRACT: Redis-backed library for creating background jobs, placing them on multiple queues, and processing them later.
-use Redis;
+use RedisDB;
use Resque::Job;
use Resque::Worker;
use Resque::Failures;
=head1 SYNOPSIS
-First you create a Resque instance where you configure the L<Redis> backend and then you can
+First you create a Resque instance where you configure the L<RedisDB> backend and then you can
start sending jobs to be done by workers:
use Resque;
@@ -75,22 +75,25 @@ A lot more about Resque can be read on the original blog post: L<http://github.c
=attr redis
Redis instance for this Resque instance.
-Accept a Redis object or string. When a string is
+Accept a L<RedisDB> object or string. When a string is
passed in, it will be used as Redis server argument.
=cut
subtype 'Sugar::Redis'
- => as class_type('Redis');
+ => as class_type('RedisDB');
coerce 'Sugar::Redis'
=> from 'Str'
- => via { Redis->new( server => $_ ) };
+ => via {
+ my ( $host, $port ) = split /:/;
+ RedisDB->new( server => $host, port => $port )
+ };
has redis => (
is => 'ro',
lazy => 1,
coerce => 1,
isa => 'Sugar::Redis',
- default => sub { Redis->new }
+ default => sub { RedisDB->new }
);
=attr namespace
@@ -212,8 +215,8 @@ Returns an array of all known Resque queues.
=cut
sub queues {
my $self = shift;
- my @queues = $self->redis->smembers( $self->key('queues') );
- return wantarray ? @queues : \@queues;
+ my $queues = $self->redis->smembers( $self->key('queues') );
+ return wantarray ? @$queues : $queues;
}
=method remove_queue
@@ -275,7 +278,7 @@ sub mass_dequeue {
$removed += $self->redis->lrem( $queue, 0, $self->new_job($target)->encode );
}
else {
- for my $item ( $self->redis->lrange( $queue, 0, -1 ) ) {
+ for my $item ( @{ $self->redis->lrange( $queue, 0, -1 ) } ) {
if ( $self->new_job( $item )->class eq $target->{class} ) {
$removed += $self->redis->lrem( $queue, 0, $item );
}
@@ -324,8 +327,8 @@ is O(N) for the keyspace, so be careful - this can be slow for big databases.
=cut
sub keys {
my $self = shift;
- my @keys = $self->redis->keys( $self->key('*') );
- return wantarray ? @keys : \@keys;
+ my $keys = $self->redis->keys( $self->key('*') );
+ return wantarray ? @$keys : $keys;
}
=method flush_namespace
@@ -350,8 +353,8 @@ Does the dirty work of fetching a range of items from a Redis list.
sub list_range {
my ( $self, $key, $start, $count ) = @_;
my $stop = $count > 0 ? $start + $count - 1 : $count;
- my @items = $self->redis->lrange( $key, $start, $stop );
- return \@items;
+ my $items = $self->redis->lrange( $key, $start, $stop );
+ return $items;
}
# Used internally to keep track of which queues we've created.
View
2 lib/Resque/Worker.pm
@@ -605,7 +605,7 @@ Returns all worker registered on the backend.
=cut
sub all {
my $self = shift;
- my @w = grep {$_} map { $self->find($_) } $self->redis->smembers( $self->key('workers') );
+ my @w = grep {$_} map { $self->find($_) } @{ $self->redis->smembers( $self->key('workers') ) };
return wantarray ? @w : \@w;
}
View
12 t/02-queue.t
@@ -60,6 +60,12 @@ $r->remove_queue('test');
is( @{$r->queues}, 0, 'There is no queues after remove_queue()');
ok( ! $r->pop('test'), "Removed queue's don't pop()" );
+# mass dequeue on empty set
+is( $r->mass_dequeue({
+ queue => 'test',
+ class => 'OtherTask'
+}), 0, 'dequeue no jobs on non existant queue' );
+
# dequeue
push_jobs($r);
is( $r->size('test'), 2, 'Test queue has two jobs again');
@@ -75,6 +81,12 @@ is( $r->mass_dequeue({
}), 2, 'dequeue two jobs' );
is( $r->size('test'), 1, 'Test queue has one job');
+# mass dequeue on non empty without match
+is( $r->mass_dequeue({
+ queue => 'test',
+ class => 'OtherTask'
+}), 0, 'dequeue no jobs on non existant queue' );
+
sub push_jobs {
my $r = shift;
my $class = shift || 'TestWorker';
View
8 t/03-worker.t
@@ -30,20 +30,26 @@ $r->flush_namespace;
ok( $worker == $worker, 'Worker respond to ==' );
ok( $worker eq $worker, 'Worker respond to eq' );
+
+ is( @{ $worker->all }, 0, 'No workers registered' );
+ $worker->register_worker;
+ is( @{ $worker->all }, 1, 'One worker registered' );
+
ok( ! $worker->reserve, 'Nothing to reserve()' );
push_job($r);
ok( my $job = $worker->reserve, 'reserve() a job' );
is( $job->args->[0], 'bazinga!', 'Is first job in first queue' );
is( $job->queue, 'test2', 'Job object known about queue' );
ok( ! $job->has_worker, 'No worker set on job' );
-
+
$worker->working_on($job);
ok( $job->has_worker, 'Worker set on job after working_on' );
is( $worker->processing->{queue}, 'test2', 'processing() know what worker is doing');
ok( $worker->is_working, 'Worker is working' );
ok( ! $worker->is_idle, 'Worker is not idle' );
is( $worker->perform($job), 'bazinga!', 'Worker can make a job to perform()');
$worker->done_working;
+
ok( !$worker->is_working, 'Worker is not working' );
ok( $worker->is_idle, 'Worker is idle' );
View
1 t/lib/Test/FailWorker.pm
@@ -2,7 +2,6 @@ package # hide from cpan
Test::FailWorker;
use strict;
-use 5.10.1;
sub perform {
my $job = shift;
View
1 t/lib/Test/Worker.pm
@@ -2,7 +2,6 @@ package # hide from cpan
Test::Worker;
use strict;
-use 5.10.1;
sub perform {
my $job = shift;

0 comments on commit b9d79cf

Please sign in to comment.