Skip to content

Commit

Permalink
Add --wakeup and --wakeup-delay options
Browse files Browse the repository at this point in the history
git-svn-id: http://code.sixapart.com/svn/gearman/trunk@434 011c6a6d-750f-0410-a5f6-93fdcd050bc4
  • Loading branch information
hachi committed Apr 13, 2009
1 parent e38f63f commit ba54bfb
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
11 changes: 10 additions & 1 deletion CHANGES
Original file line number Original file line Diff line number Diff line change
@@ -1,5 +1,14 @@
* Add command-line option to adjust a delay before more workers are started up.
This acts as an anti-starvation mechanism in case of lower wake up counts.
-Default is .1 seconds, formerly this option was not needed because all workers
were woken up at the time of job submission.

* Add command-line option to adjust number of workers to wake up per job injected.
-Default is 3, formerly was -1 (wake up all as fast as possible)

* Add command-line option to change the number of sockets accepted at once per * Add command-line option to change the number of sockets accepted at once per
listener socket. Default is now 10, formerly used to be 1. listener socket.
-Default is now 10, formerly used to be 1.


* Add exceptions passing support to gearman server classes, using new options * Add exceptions passing support to gearman server classes, using new options
support. support.
Expand Down
31 changes: 30 additions & 1 deletion gearmand
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -40,6 +40,28 @@ Enable debugging (currently the only debug output is when a client or worker con
Number of new connections to accept each time we see a listening socket ready. This doesn't usually Number of new connections to accept each time we see a listening socket ready. This doesn't usually
need to be tuned by anyone, however in dire circumstances you may need to do it quickly. need to be tuned by anyone, however in dire circumstances you may need to do it quickly.
=item --wakeup=3
Number of workers to wake up per job inserted into the queue.
Zero (0) is a perfectly acceptable answer, and can be used if you don't care much about job latency.
This would bank on the base idea of a worker checking in with the server every so often.
Negative One (-1) indicates that all sleeping workers should be woken up.
All other negative numbers will cause the server to throw exception and not start.
=item --wakeup-delay=
Time interval before waking up more workers (the value specified by --wakeup) when jobs are still in
the queue.
Zero (0) means go as fast as possible, but not all at the same time. Similar to -1 on --wakeup, but
is more cooperative in gearmand's multitasking model.
Negative One (-1) means that this event won't happe, so only the initial workers will be woken up to
handle jobs in the queue.
=back =back
=head1 COPYRIGHT =head1 COPYRIGHT
Expand Down Expand Up @@ -97,6 +119,8 @@ my (
$notify_pid, $notify_pid,
$opt_pidfile, $opt_pidfile,
$accept, $accept,
$wakeup,
$wakeup_delay,
); );
my $conf_port = 7003; my $conf_port = 7003;


Expand All @@ -106,6 +130,8 @@ Getopt::Long::GetOptions(
'debug=i' => \$DEBUG, 'debug=i' => \$DEBUG,
'pidfile=s' => \$opt_pidfile, 'pidfile=s' => \$opt_pidfile,
'accept=i' => \$accept, 'accept=i' => \$accept,
'wakeup=i' => \$wakeup,
'wakeup-delay=f' => \$wakeup_delay,
'notifypid|n=i' => \$notify_pid, # for test suite only. 'notifypid|n=i' => \$notify_pid, # for test suite only.
); );


Expand All @@ -117,7 +143,10 @@ our $graceful_shutdown = 0;


$SIG{'PIPE'} = "IGNORE"; # handled manually $SIG{'PIPE'} = "IGNORE"; # handled manually


my $server = Gearman::Server->new; my $server = Gearman::Server->new(
wakeup => $wakeup,
wakeup_delay => $wakeup_delay,
);
my $ssock = $server->create_listening_sock($conf_port, accept_per_loop => $accept); my $ssock = $server->create_listening_sock($conf_port, accept_per_loop => $accept);


if ($opt_pidfile) { if ($opt_pidfile) {
Expand Down
55 changes: 52 additions & 3 deletions lib/Gearman/Server.pm
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ use fields (
'handle_ct', # atomic counter 'handle_ct', # atomic counter
'handle_base', # atomic counter 'handle_base', # atomic counter
'listeners', # arrayref of listener objects 'listeners', # arrayref of listener objects
'wakeup', # number of workers to wake
'wakeup_delay', # seconds to wait before waking more workers
'wakeup_timers', # func -> timer, timer to be canceled or adjusted when job grab/inject is called
); );


our $VERSION = "1.09"; our $VERSION = "1.09";
Expand Down Expand Up @@ -79,11 +82,31 @@ sub new {
$self->{max_queue} = {}; $self->{max_queue} = {};
$self->{job_of_uniq} = {}; $self->{job_of_uniq} = {};
$self->{listeners} = []; $self->{listeners} = [];
$self->{wakeup} = 3;
$self->{wakeup_delay} = .1;
$self->{wakeup_timers} = {};


$self->{handle_ct} = 0; $self->{handle_ct} = 0;
$self->{handle_base} = "H:" . Sys::Hostname::hostname() . ":"; $self->{handle_base} = "H:" . Sys::Hostname::hostname() . ":";


my $port = delete $opts{port}; my $port = delete $opts{port};

my $wakeup = delete $opts{wakeup};

if (defined $wakeup) {
die "Invalid value passed in wakeup option"
if $wakeup < 0 && $wakeup != -1;
$self->{wakeup} = $wakeup;
}

my $wakeup_delay = delete $opts{wakeup_delay};

if (defined $wakeup_delay) {
die "Invalid value passed in wakeup_delay option"
if $wakeup_delay < 0 && $wakeup_delay != -1;
$self->{wakeup_delay} = $wakeup_delay;
}

croak("Unknown options") if %opts; croak("Unknown options") if %opts;
$self->create_listening_sock($port); $self->create_listening_sock($port);


Expand Down Expand Up @@ -247,20 +270,46 @@ sub enqueue_job {


sub wake_up_sleepers { sub wake_up_sleepers {
my ($self, $func) = @_; my ($self, $func) = @_;

if (my $existing_timer = delete($self->{wakeup_timers}->{$func})) {
$existing_timer->cancel();
}

return unless $self->_wake_up_some($func);

my $delay = $self->{wakeup_delay};

# -1 means don't setup a timer. 0 actually means go as fast as we can, cooperatively.
return if $delay == -1;

# If we're only going to wakeup 0 workers anyways, don't set up a timer.
return if $self->{wakeup} == 0;

my $timer = Danga::Socket->AddTimer($delay, sub { $self->wake_up_sleepers($func) });
$self->{wakeup_timers}->{$func} = $timer;
}

# Returns true when there are still more workers to wake up
# False if there are no sleepers
sub _wake_up_some {
my ($self, $func) = @_;
my $sleepmap = $self->{sleepers}{$func} or return; my $sleepmap = $self->{sleepers}{$func} or return;
my $sleeporder = $self->{sleepers_list}{$func} or return; my $sleeporder = $self->{sleepers_list}{$func} or return;


# TODO SYNC UP STATE HERE IN CASE TWO LISTS END UP OUT OF SYNC # TODO SYNC UP STATE HERE IN CASE TWO LISTS END UP OUT OF SYNC


my $max = 3; my $max = $self->{wakeup};


while (@$sleeporder) { while (@$sleeporder) {
my Gearman::Server::Client $c = shift @$sleeporder; my Gearman::Server::Client $c = shift @$sleeporder;
delete $sleepmap->{"$c"};
next if $c->{closed} || ! $c->{sleeping}; next if $c->{closed} || ! $c->{sleeping};
if ($max-- <= 0) {
unshift @$sleeporder, $c;
return 1;
}
delete $sleepmap->{"$c"};
$c->res_packet("noop"); $c->res_packet("noop");
$c->{sleeping} = 0; $c->{sleeping} = 0;
return if $max-- <= 0;
} }


delete $self->{sleepers}{$func}; delete $self->{sleepers}{$func};
Expand Down

0 comments on commit ba54bfb

Please sign in to comment.