Skip to content
Browse files

Use Time::HiRes to allow floating seconds in worker interval attribute

  • Loading branch information...
1 parent a1fd43e commit 7dee55ea2f350bc861e731aa7caf4e1ff176ba2d @diegok committed Apr 12, 2012
Showing with 36 additions and 24 deletions.
  1. +5 −0 Changes
  2. +31 −24 lib/Resque/Worker.pm
View
5 Changes
@@ -1,5 +1,10 @@
{{$NEXT}}
+ - Worker interval attribute accept floating seconds (Time::HiRes::sleep).
+ - Worker interval attribute is documented now.
+
+0.07 2012-04-03 00:39:06 Europe/Madrid
+
- Back to Redis dependency
- Using reconnect facility of Redis module
- encoding undef as recommended on Redis docs
View
55 lib/Resque/Worker.pm
@@ -6,13 +6,14 @@ use Resque::Stat;
use POSIX ":sys_wait_h";
use Sys::Hostname;
use Scalar::Util qw(blessed weaken);
-use List::MoreUtils qw{ uniq any };
+use List::MoreUtils qw(uniq any);
+use Time::HiRes qw(sleep);
use DateTime;
use Try::Tiny;
# ABSTRACT: Does the hard work of babysitting Resque::Job's
-use overload
+use overload
'""' => \&_string,
'==' => \&_is_equal,
'eq' => \&_is_equal;
@@ -72,10 +73,10 @@ has verbose => ( is => 'rw', default => sub {0} );
=attr cant_fork
-Set it to a true value to stop this worker from fork jobs.
+Set it to a true value to stop this worker from fork jobs.
-By default, the worker will fork the job out and control the
-children process. This make the worker more resilient to
+By default, the worker will fork the job out and control the
+children process. This make the worker more resilient to
memory leaks.
=cut
@@ -101,6 +102,12 @@ When true, this worker won't proccess more jobs till false.
=cut
has paused => ( is => 'rw', default => sub{0} );
+
+=attr interval
+
+Float representing the polling frequency. The default is 5 seconds, but for a semi-active app you may want to use a smaller value.
+
+=cut
has interval => ( is => 'rw', default => sub{5} );
=method pause
@@ -124,9 +131,9 @@ Schedule this worker for shutdown. Will finish processing the
current job.
=cut
-sub shutdown_please {
+sub shutdown_please {
print "Shutting down...\n";
- $_[0]->shutdown(1);
+ $_[0]->shutdown(1);
}
=method shutdown_now
@@ -153,10 +160,10 @@ sub work {
$self->work_tick($job);
}
elsif( $self->interval ) {
- my $status = $self->paused ? "Paused" : 'Waiting for ' . join( ', ', @{$self->queues} );
+ my $status = $self->paused ? "Paused" : 'Waiting for ' . join( ', ', @{$self->queues} );
$self->procline( $status );
$self->log( $status );
- sleep $self->interval;
+ sleep( $self->interval );
}
}
$self->unregister_worker;
@@ -251,9 +258,9 @@ Stop listening to the given queue.
sub del_queue {
my ( $self, $queue ) = @_;
return unless $queue;
-
- return
- @{$self->queues}
+
+ return
+ @{$self->queues}
-
@{$self->queues( [ grep {$_} map { $_ eq $queue ? undef : $_ } @{$self->queues} ] )};
}
@@ -290,13 +297,13 @@ sub reserve {
=method working_on
-Set worker and working status on the given L<Resque::Job>.
+Set worker and working status on the given L<Resque::Job>.
=cut
sub working_on {
my ( $self, $job ) = @_;
- $self->redis->set(
- $self->key( worker => $self->id ),
+ $self->redis->set(
+ $self->key( worker => $self->id ),
$self->encoder->encode({
queue => $job->queue,
run_at => DateTime->now->strftime("%Y/%m/%d %H:%M:%S %Z"),
@@ -319,7 +326,7 @@ sub done_working {
=method started
-What time did this worker start?
+What time did this worker start?
Returns an instance of DateTime.
TODO: not working in this release. This is returning
@@ -412,7 +419,7 @@ sub procline {
=method startup
Helper method called by work() to:
-
+
1. register_signal_handlers()
2. prune_dead_workers();
3. register_worker();
@@ -431,7 +438,7 @@ sub startup {
=method register_signal_handlers
Registers the various signal handlers a worker responds to.
-
+
TERM: Shutdown immediately, stop processing jobs.
INT: Shutdown immediately, stop processing jobs.
QUIT: Shutdown after the current job has finished processing.
@@ -455,12 +462,12 @@ sub register_signal_handlers {
Looks for any workers which should be running on this server
and, if they're not, removes them from Redis.
-
+
This is a form of garbage collection. If a server is killed by a
hard shutdown, power failure, or something else beyond our
control, the Resque workers will not die gracefully and therefore
will leave stale state information in Redis.
-
+
By checking the current Redis state against the actual
environment, we can determine if Redis is old and clean it up a bit.
@@ -505,10 +512,10 @@ sub unregister_worker {
if ( %$hr ) {
# Ensure the proper worker is attached to this job, even if
# it's not the precise instance that died.
- my $job = $self->resque->new_job({
+ my $job = $self->resque->new_job({
worker => $self,
- queue => $hr->{queue},
- payload => $hr->{payload}
+ queue => $hr->{queue},
+ payload => $hr->{payload}
});
$job->fail( 'Dirty exit' );
}
@@ -590,7 +597,7 @@ sub find {
my ( $self, $worker_id ) = @_;
if ( $self->exists( $worker_id ) ) {
my @queues = split ',', (split( ':', $worker_id))[-1];
- return __PACKAGE__->new(
+ return __PACKAGE__->new(
resque => $self->resque,
queues => \@queues,
id => $worker_id

0 comments on commit 7dee55e

Please sign in to comment.
Something went wrong with that request. Please try again.