Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

import Gearman 1.10 from CPAN

git-cpan-module:   Gearman
git-cpan-version:  1.10
git-cpan-authorid: DORMANDO
git-cpan-file:     authors/id/D/DO/DORMANDO/Gearman-1.10.tar.gz
  • Loading branch information...
commit edf9e2970e7ffbdabb50dfef83a81dd05f6350ac 1 parent 71a9506
Alan Kasindorf authored schwern committed
39 CHANGES
... ... @@ -1,3 +1,42 @@
  1 +1.10 (2009-10-04)
  2 +
  3 + -- Make workers wake up periodically for a particular server to make sure they aren't
  4 + stale connections. This happened naturally (although at relatively low interval) in
  5 + previous releases.
  6 +
  7 + -- Help prevent leaks when Gearman::Task->add_hook is used.
  8 +
  9 + -- Change worker to only 'wake up' against a gearmand that has woken it up, this prevents
  10 + a constant flood of pre-sleep command from arriving at an otherwise silent gearmand.
  11 +
  12 + -- Fix issue where prefixes were double-prepended on function names on worker
  13 + upon reconnect.
  14 +
  15 + -- Fix issue where the other response parser code in Util would silently truncate
  16 + argument strings longer than 20*the socket buffer size.
  17 +
  18 + -- Fix issue where the ResponseParser class would not correctly handle messages with
  19 + zero-length bodies.
  20 +
  21 + -- Make the Gearman::Task class autoload Storable and fail gracefully when it's not
  22 + loadable.
  23 +
  24 + -- Initial fold-in of exceptions support in the gearman client, makes an option
  25 + to the gearman server to enable it, and is disabled by default. Workers will
  26 + will attempt to throw exceptions anytime Storable is available.
  27 +
  28 + -- fix jobs of > 32kilobytes in size so they work properly (workers would disconnect
  29 + when a job greater than 32kb would arrive.)
  30 +
  31 + -- expose the time that the last job was processed in the stop_if hook of worker.
  32 + Since a jobserver wakes up all workers in the case of a new job to be processed
  33 + the concept of is_idle doesn't actually reflect if a worker has procssed jobs,
  34 + rather it indicates whether the jobserver has been silent for 10 seconds.
  35 +
  36 + -- change server polling order in workers to start at a random point in the list
  37 + during every worker pass. So we drain jobs from all servers rather than
  38 + working on each of them in order.
  39 +
1 40 1.09 (2007-06-29)
2 41
3 42 -- document the license and copyright
3  MANIFEST
@@ -19,7 +19,10 @@ t/20-leaktest.t
19 19 t/30-maxqueue.t
20 20 t/40-prefix.t
21 21 t/50-wait_timeout.t
  22 +t/51-large_args.t
  23 +t/60-stop-if.t
22 24 t/lib/GearTestLib.pm
23 25 t/TestGearman.pm
24 26 t/worker.pl
  27 +t/65-responseparser.t
25 28 TODO
4 META.yml
... ... @@ -1,11 +1,11 @@
1 1 # http://module-build.sourceforge.net/META-spec.html
2 2 #XXXXXXX This is a prototype!!! It will change in the future!!! XXXXX#
3 3 name: Gearman
4   -version: 1.09
  4 +version: 1.10
5 5 version_from: lib/Gearman/Client.pm
6 6 installdirs: site
7 7 requires:
8 8 String::CRC32: 0
9 9
10 10 distribution_type: module
11   -generated_by: ExtUtils::MakeMaker version 6.17
  11 +generated_by: ExtUtils::MakeMaker version 6.30
43 lib/Gearman/Client.pm
... ... @@ -1,11 +1,9 @@
1 1 #!/usr/bin/perl
2 2
3   -#TODO: timeout isn't supported by this client API yet.
4   -
5 3 package Gearman::Client;
6 4
7 5 our $VERSION;
8   -$VERSION = '1.09';
  6 +$VERSION = '1.10';
9 7
10 8 use strict;
11 9 use IO::Socket::INET;
@@ -26,12 +24,16 @@ sub new {
26 24 $self->{sock_cache} = {};
27 25 $self->{hooks} = {};
28 26 $self->{prefix} = '';
  27 + $self->{exceptions} = 0;
29 28
30 29 $self->debug($opts{debug}) if $opts{debug};
31 30
32 31 $self->set_job_servers(@{ $opts{job_servers} })
33 32 if $opts{job_servers};
34 33
  34 + $self->{exceptions} = delete $opts{exceptions}
  35 + if exists $opts{exceptions};
  36 +
35 37 $self->prefix($opts{prefix}) if $opts{prefix};
36 38
37 39 return $self;
@@ -98,7 +100,7 @@ sub do_task {
98 100
99 101 my $ts = $self->new_task_set;
100 102 $ts->add_task($task);
101   - $ts->wait;
  103 + $ts->wait(timeout => $task->timeout);
102 104
103 105 return $did_err ? undef : $ret;
104 106
@@ -113,12 +115,13 @@ sub dispatch_background {
113 115 my ($jst, $jss) = $self->_get_random_js_sock;
114 116 return 0 unless $jss;
115 117
116   - my $req = $task->pack_submit_packet("background");
  118 + my $req = $task->pack_submit_packet($self, "background");
117 119 my $len = length($req);
118 120 my $rv = $jss->write($req, $len);
119 121
120 122 my $err;
121 123 my $res = Gearman::Util::read_res_packet($jss, \$err);
  124 + $self->_put_js_sock($jst, $jss);
122 125 return 0 unless $res && $res->{type} eq "job_created";
123 126 return "$jst//${$res->{blobref}}";
124 127 }
@@ -175,6 +178,28 @@ sub get_status {
175 178 return Gearman::JobStatus->new(@args);
176 179 }
177 180
  181 +sub _option_request {
  182 + my Gearman::Client $self = shift;
  183 + my $sock = shift;
  184 + my $option = shift;
  185 +
  186 + my $req = Gearman::Util::pack_req_command("option_req",
  187 + $option);
  188 + my $len = length($req);
  189 + my $rv = $sock->write($req, $len);
  190 +
  191 + my $err;
  192 + my $res = Gearman::Util::read_res_packet($sock, \$err);
  193 +
  194 + return unless $res;
  195 +
  196 + return 0 if $res->{type} eq "error";
  197 + return 1 if $res->{type} eq "option_res";
  198 +
  199 + warn "Got unknown response to option request: $res->{type}\n";
  200 + return;
  201 +}
  202 +
178 203 # returns a socket from the cache. it should be returned to the
179 204 # cache with _put_js_sock. the hostport isn't verified. the caller
180 205 # should verify that $hostport is in the set of jobservers.
@@ -192,6 +217,14 @@ sub _get_js_sock {
192 217
193 218 setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
194 219 $sock->autoflush(1);
  220 +
  221 + # If exceptions support is to be requested, and the request fails, disable
  222 + # exceptions for this client.
  223 + if ($self->{exceptions} && ! $self->_option_request($sock, 'exceptions')) {
  224 + warn "Exceptions support denied by server, disabling.\n";
  225 + $self->{exceptions} = 0;
  226 + }
  227 +
195 228 return $sock;
196 229 }
197 230
3  lib/Gearman/Objects.pm
@@ -13,6 +13,7 @@ use fields (
13 13 'hooks', # hookname -> coderef
14 14 'prefix',
15 15 'debug',
  16 + 'exceptions',
16 17 );
17 18
18 19 package Gearman::Taskset;
@@ -41,11 +42,13 @@ use fields (
41 42 'uniq',
42 43 'on_complete',
43 44 'on_fail',
  45 + 'on_exception',
44 46 'on_retry',
45 47 'on_status',
46 48 'on_post_hooks', # used internally, when other hooks are done running, prior to cleanup
47 49 'retry_count',
48 50 'timeout',
  51 + 'try_timeout',
49 52 'high_priority',
50 53
51 54 # from server:
5 lib/Gearman/ResponseParser.pm
@@ -88,6 +88,11 @@ sub parse_data {
88 88 $self->reset;
89 89 }
90 90 }
  91 +
  92 + if (defined($self->{pkt}) && length(${ $self->{pkt}{blobref} }) == $self->{pkt}{len}) {
  93 + $self->on_packet($self->{pkt}, $self);
  94 + $self->reset;
  95 + }
91 96 }
92 97
93 98 # don't override:
53 lib/Gearman/Task.pm
@@ -7,6 +7,20 @@ use String::CRC32 ();
7 7 use Gearman::Taskset;
8 8 use Gearman::Util;
9 9
  10 +BEGIN {
  11 + my $storable = eval { require Storable; 1 }
  12 + if !defined &RECEIVE_EXCEPTIONS || RECEIVE_EXCEPTIONS();
  13 +
  14 + $storable ||= 0;
  15 +
  16 + if (defined &RECEIVE_EXCEPTIONS) {
  17 + die "Exceptions support requires Storable: $@";
  18 + } else {
  19 + eval "sub RECEIVE_EXCEPTIONS () { $storable }";
  20 + die "Couldn't define RECEIVE_EXCEPTIONS: $@\n" if $@;
  21 + }
  22 +}
  23 +
10 24 # constructor, given: ($func, $argref, $opts);
11 25 sub new {
12 26 my $class = shift;
@@ -22,8 +36,8 @@ sub new {
22 36
23 37 my $opts = shift || {};
24 38 for my $k (qw( uniq
25   - on_complete on_fail on_retry on_status
26   - retry_count timeout high_priority
  39 + on_complete on_exception on_fail on_retry on_status
  40 + retry_count timeout high_priority try_timeout
27 41 )) {
28 42 $self->{$k} = delete $opts->{$k};
29 43 }
@@ -111,6 +125,7 @@ sub _hashfunc {
111 125
112 126 sub pack_submit_packet {
113 127 my Gearman::Task $task = shift;
  128 + my Gearman::Client $client = shift;
114 129 my $is_background = shift;
115 130
116 131 my $mode = $is_background ?
@@ -121,7 +136,7 @@ sub pack_submit_packet {
121 136
122 137 my $func = $task->{func};
123 138
124   - if (my $prefix = $task->{taskset} && $task->{taskset}->client && $task->{taskset}->client->prefix) {
  139 + if (my $prefix = $client && $client->prefix) {
125 140 $func = join "\t", $prefix, $task->{func};
126 141 }
127 142
@@ -134,6 +149,7 @@ sub pack_submit_packet {
134 149
135 150 sub fail {
136 151 my Gearman::Task $task = shift;
  152 + my $reason = shift;
137 153 return if $task->{is_finished};
138 154
139 155 # try to retry, if we can
@@ -144,7 +160,7 @@ sub fail {
144 160 return $task->{taskset}->add_task($task);
145 161 }
146 162
147   - $task->final_fail;
  163 + $task->final_fail($reason);
148 164 }
149 165
150 166 sub final_fail {
@@ -156,13 +172,24 @@ sub final_fail {
156 172
157 173 $task->run_hook('final_fail', $task);
158 174
159   - $task->{on_fail}->() if $task->{on_fail};
160   - $task->{on_post_hooks}->() if $task->{on_post_hooks};
  175 + $task->{on_fail}->($reason) if $task->{on_fail};
  176 + $task->{on_post_hooks}->() if $task->{on_post_hooks};
161 177 $task->wipe;
162 178
163 179 return undef;
164 180 }
165 181
  182 +sub exception {
  183 + my Gearman::Task $task = shift;
  184 +
  185 + return unless RECEIVE_EXCEPTIONS;
  186 +
  187 + my $exception_ref = shift;
  188 + my $exception = Storable::thaw($$exception_ref);
  189 + $task->{on_exception}->($$exception) if $task->{on_exception};
  190 + return;
  191 +}
  192 +
166 193 sub complete {
167 194 my Gearman::Task $task = shift;
168 195 return if $task->{is_finished};
@@ -202,7 +229,7 @@ sub set_on_post_hooks {
202 229
203 230 sub wipe {
204 231 my Gearman::Task $task = shift;
205   - foreach my $f (qw(on_post_hooks on_complete on_fail on_retry on_status)) {
  232 + foreach my $f (qw(on_post_hooks on_complete on_fail on_retry on_status hooks)) {
206 233 $task->{$f} = undef;
207 234 }
208 235 }
@@ -211,6 +238,12 @@ sub func {
211 238 my Gearman::Task $task = shift;
212 239 return $task->{func};
213 240 }
  241 +
  242 +sub timeout {
  243 + my Gearman::Task $task = shift;
  244 + return $task->{timeout} unless @_;
  245 + return $task->{timeout} = shift;
  246 +}
214 247 1;
215 248 __END__
216 249
@@ -304,6 +337,12 @@ seconds have elapsed without an on_fail or on_complete being
304 337 called. Defaults to 0, which means never. Bypasses any retry_count
305 338 remaining.
306 339
  340 +=item * try_timeout
  341 +
  342 +Automatically fail, calling your on_retry callback (or on_fail if out of
  343 +retries), after this many seconds have elapsed. Defaults to 0, which means
  344 +never.
  345 +
307 346 =back
308 347
309 348 =head2 $task->is_finished
20 lib/Gearman/Taskset.pm
@@ -20,6 +20,7 @@ sub new {
20 20 $self->{client} = $client;
21 21 $self->{loaned_sock} = {};
22 22 $self->{cancelled} = 0;
  23 + $self->{hooks} = {};
23 24
24 25 return $self;
25 26 }
@@ -113,7 +114,7 @@ sub wait {
113 114 my $timeout;
114 115 if (exists $opts{timeout}) {
115 116 $timeout = delete $opts{timeout};
116   - $timeout += Time::HiRes::time();
  117 + $timeout += Time::HiRes::time() if defined $timeout;
117 118 }
118 119
119 120 Carp::carp "Unknown options: " . join(',', keys %opts) . " passed to Taskset->wait."
@@ -196,7 +197,7 @@ sub add_task {
196 197
197 198 $ts->run_hook('add_task', $ts, $task);
198 199
199   - my $req = $task->pack_submit_packet;
  200 + my $req = $task->pack_submit_packet($ts->client);
200 201 my $len = length($req);
201 202 my $rv = $task->{jssock}->syswrite($req, $len);
202 203 die "Wrote $rv but expected to write $len" unless $rv == $len;
@@ -329,6 +330,21 @@ sub _process_packet {
329 330 return 1;
330 331 }
331 332
  333 + if ($res->{type} eq "work_exception") {
  334 + ${ $res->{'blobref'} } =~ s/^(.+?)\0//
  335 + or die "Bogus work_exception from server";
  336 + my $shandle = $1;
  337 + my $task_list = $ts->{waiting}{$shandle} or
  338 + die "Uhhhh: got work_exception for unknown handle: $shandle\n";
  339 +
  340 + my Gearman::Task $task = $task_list->[0] or
  341 + die "Uhhhh: task_list is empty on work_exception for handle $shandle\n";
  342 +
  343 + $task->exception($res->{'blobref'});
  344 +
  345 + return 1;
  346 + }
  347 +
332 348 if ($res->{type} eq "work_status") {
333 349 my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });
334 350
22 lib/Gearman/Util.pm
@@ -6,7 +6,7 @@ use strict;
6 6 # O: out of job server
7 7 # W: worker
8 8 # C: client of job server
9   -# J : jobserver
  9 +# J: jobserver
10 10 our %cmd = (
11 11 1 => [ 'I', "can_do" ], # from W: [FUNC]
12 12 23 => [ 'I', "can_do_timeout" ], # from W: FUNC[0]TIMEOUT
@@ -15,6 +15,9 @@ our %cmd = (
15 15 22 => [ 'I', "set_client_id" ], # W->J: [RANDOM_STRING_NO_WHITESPACE]
16 16 4 => [ 'I', "pre_sleep" ], # from W: ---
17 17
  18 + 26 => [ 'I', "option_req" ], # C->J: [OPT]
  19 + 27 => [ 'O', "option_res" ], # J->C: [OPT]
  20 +
18 21 6 => [ 'O', "noop" ], # J->W ---
19 22 7 => [ 'I', "submit_job" ], # C->J FUNC[0]UNIQ[0]ARGS
20 23 21 => [ 'I', "submit_job_high" ], # C->J FUNC[0]UNIQ[0]ARGS
@@ -28,6 +31,7 @@ our %cmd = (
28 31 12 => [ 'IO', "work_status" ], # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
29 32 13 => [ 'IO', "work_complete" ], # W->J/C: HANDLE[0]RES
30 33 14 => [ 'IO', "work_fail" ], # W->J/C: HANDLE
  34 + 25 => [ 'IO', "work_exception" ], # W->J/C: HANDLE[0]EXCEPTION
31 35
32 36 15 => [ 'I', "get_status" ], # C->J: HANDLE
33 37 20 => [ 'O', "status_res" ], # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
@@ -100,8 +104,20 @@ sub read_res_packet {
100 104 return $err->("malformed_magic") unless $magic eq "\0RES";
101 105
102 106 if ($len) {
103   - $rv = sysread($sock, $buf, $len);
104   - return $err->("short_body") unless $rv == $len;
  107 + # Start off trying to read the whole buffer. Store the bits in an array
  108 + # one element for each read, then do a big join at the end. This minimizes
  109 + # the number of memory allocations we have to do.
  110 + my $readlen = $len;
  111 + my $lim = 20 + int( $len / 2**10 );
  112 + my @buffers;
  113 + for (my $i = 0; $readlen > 0 && $i < $lim; $i++) {
  114 + my $rv = sysread($sock, $buffers[$i], $readlen);
  115 + return $err->("short_body") unless $rv > 0;
  116 + last unless $rv > 0;
  117 + $readlen -= $rv;
  118 + }
  119 + $buf = join('', @buffers);
  120 + return $err->("short_body") unless length($buf) == $len;
105 121 }
106 122
107 123 $type = $cmd{$type};
178 lib/Gearman/Worker.pm
@@ -69,14 +69,28 @@ use fields (
69 69 'last_connect_fail', # host:port -> unixtime
70 70 'down_since', # host:port -> unixtime
71 71 'connecting', # host:port -> unixtime connect started at
72   - 'can', # func -> subref
73   - 'timeouts', # func -> timeouts
  72 + 'can', # ability -> subref (ability is func with optional prefix)
  73 + 'timeouts', # ability -> timeouts
74 74 'client_id', # random identifer string, no whitespace
75 75 'parent_pipe', # bool/obj: if we're a child process of a gearman server,
76 76 # this is socket to our parent process. also means parent
77 77 # sock can never disconnect or timeout, etc..
78 78 );
79 79
  80 +BEGIN {
  81 + my $storable = eval { require Storable; 1 }
  82 + if !defined &THROW_EXCEPTIONS || THROW_EXCEPTIONS();
  83 +
  84 + $storable ||= 0;
  85 +
  86 + if (defined &THROW_EXCEPTIONS) {
  87 + die "Exceptions support requires Storable: $@";
  88 + } else {
  89 + eval "sub THROW_EXCEPTIONS () { $storable }";
  90 + die "Couldn't define THROW_EXCEPTIONS: $@\n" if $@;
  91 + }
  92 +}
  93 +
80 94 sub new {
81 95 my ($class, %opts) = @_;
82 96 my $self = $class;
@@ -90,7 +104,7 @@ sub new {
90 104 $self->{can} = {};
91 105 $self->{timeouts} = {};
92 106 $self->{client_id} = join("", map { chr(int(rand(26)) + 97) } (1..30));
93   - $self->{prefix} = '';
  107 + $self->{prefix} = undef;
94 108
95 109 $self->debug($opts{debug}) if $opts{debug};
96 110
@@ -113,6 +127,10 @@ sub new {
113 127 sub _get_js_sock {
114 128 my Gearman::Worker $self = shift;
115 129 my $ipport = shift;
  130 + my %opts = @_;
  131 +
  132 + my $on_connect = delete $opts{on_connect};
  133 + # Someday should warn when called with extra opts.
116 134
117 135 warn "getting job server socket: $ipport" if $self->debug;
118 136
@@ -153,7 +171,7 @@ sub _get_js_sock {
153 171
154 172 $self->{sock_cache}{$ipport} = $sock;
155 173
156   - unless ($self->_on_connect($sock)) {
  174 + unless ($self->_on_connect($sock) && $on_connect && $on_connect->($sock)) {
157 175 delete $self->{sock_cache}{$ipport};
158 176 return undef;
159 177 }
@@ -171,9 +189,9 @@ sub _on_connect {
171 189 return undef unless Gearman::Util::send_req($sock, \$cid_req);
172 190
173 191 # get this socket's state caught-up
174   - foreach my $func (keys %{$self->{can}}) {
175   - my $timeout = $self->{timeouts}->{$func};
176   - unless ($self->_set_ability($sock, $func, $timeout)) {
  192 + foreach my $ability (keys %{$self->{can}}) {
  193 + my $timeout = $self->{timeouts}->{$ability};
  194 + unless ($self->_set_ability($sock, $ability, $timeout)) {
177 195 return undef;
178 196 }
179 197 }
@@ -183,15 +201,13 @@ sub _on_connect {
183 201
184 202 sub _set_ability {
185 203 my Gearman::Worker $self = shift;
186   - my ($sock, $func, $timeout) = @_;
187   -
188   - $func = join "\t", $self->prefix, $func if $self->prefix;
  204 + my ($sock, $ability, $timeout) = @_;
189 205
190 206 my $req;
191 207 if (defined $timeout) {
192   - $req = Gearman::Util::pack_req_command("can_do_timeout", "$func\0$timeout");
  208 + $req = Gearman::Util::pack_req_command("can_do_timeout", "$ability\0$timeout");
193 209 } else {
194   - $req = Gearman::Util::pack_req_command("can_do", $func);
  210 + $req = Gearman::Util::pack_req_command("can_do", $ability);
195 211 }
196 212 return Gearman::Util::send_req($sock, \$req);
197 213 }
@@ -236,14 +252,32 @@ sub work {
236 252
237 253 my $grab_req = Gearman::Util::pack_req_command("grab_job");
238 254 my $presleep_req = Gearman::Util::pack_req_command("pre_sleep");
239   - my %fd_map;
  255 +
  256 + my $last_job_time;
  257 +
  258 + # "Active" job servers are servers that have woken us up and should be
  259 + # queried to see if they have jobs for us to handle. On our first pass
  260 + # in the loop we contact all servers.
  261 + my %active_js = map { $_ => 1 } @{$self->{job_servers}};
  262 +
  263 + # ( js => last_update_time, ... )
  264 + my %last_update_time;
240 265
241 266 while (1) {
  267 + # "Jobby" job servers are the set of server which we will contact
  268 + # on this pass through the loop, because we need to clear and use
  269 + # the "Active" set to plan for our next pass through the loop.
  270 + my @jobby_js = keys %active_js;
242 271
243   - my @jss;
244   - my $need_sleep = 1;
  272 + %active_js = ();
  273 +
  274 + my $js_count = @jobby_js;
  275 + my $js_offset = int(rand($js_count));
  276 + my $is_idle = 0;
245 277
246   - foreach my $js (@{ $self->{job_servers} }) {
  278 + for (my $i = 0; $i < $js_count; $i++) {
  279 + my $js_index = ($i + $js_offset) % $js_count;
  280 + my $js = $jobby_js[$js_index];
247 281 my $jss = $self->_get_js_sock($js)
248 282 or next;
249 283
@@ -259,6 +293,7 @@ sub work {
259 293 exit(0);
260 294 }
261 295 $self->uncache_sock($js, "grab_job_timeout");
  296 + delete $last_update_time{$js};
262 297 next;
263 298 }
264 299
@@ -269,6 +304,7 @@ sub work {
269 304 my $timeout = $self->{parent_pipe} ? 5 : 0.50;
270 305 unless (Gearman::Util::wait_for_readability($jss->fileno, $timeout)) {
271 306 $self->uncache_sock($js, "grab_job_timeout");
  307 + delete $last_update_time{$js};
272 308 next;
273 309 }
274 310
@@ -278,13 +314,17 @@ sub work {
278 314 $res = Gearman::Util::read_res_packet($jss, \$err);
279 315 unless ($res) {
280 316 $self->uncache_sock($js, "read_res_error");
  317 + delete $last_update_time{$js};
281 318 next;
282 319 }
283 320 } while ($res->{type} eq "noop");
284 321
285   - push @jss, [$js, $jss];
286   -
287 322 if ($res->{type} eq "no_job") {
  323 + unless (Gearman::Util::send_req($jss, \$presleep_req)) {
  324 + delete $last_update_time{$js};
  325 + $self->uncache_sock($js, "write_presleep_error");
  326 + }
  327 + $last_update_time{$js} = time;
288 328 next;
289 329 }
290 330
@@ -297,20 +337,28 @@ sub work {
297 337 die $msg;
298 338 }
299 339
300   - $need_sleep = 0;
301   -
302 340 ${ $res->{'blobref'} } =~ s/^(.+?)\0(.+?)\0//
303 341 or die "Uh, regexp on job_assign failed";
304   - my ($handle, $func) = ($1, $2);
305   - my $job = Gearman::Job->new($func, $res->{'blobref'}, $handle, $jss);
  342 + my ($handle, $ability) = ($1, $2);
  343 + my $job = Gearman::Job->new($ability, $res->{'blobref'}, $handle, $jss);
306 344
307 345 my $jobhandle = "$js//" . $job->handle;
308 346 $start_cb->($jobhandle) if $start_cb;
309 347
310   - my $handler = $self->{can}{$func};
  348 + my $handler = $self->{can}{$ability};
311 349 my $ret = eval { $handler->($job); };
312   - my $err = $@ || '';
313   - warn "Job '$func' died: $err" if $err;
  350 + my $err = $@;
  351 + warn "Job '$ability' died: $err" if $err;
  352 +
  353 + $last_update_time{$js} = $last_job_time = time();
  354 +
  355 + if (THROW_EXCEPTIONS && $err) {
  356 + my $exception_req = Gearman::Util::pack_req_command("work_exception", join("\0", $handle, Storable::nfreeze(\$err)));
  357 + unless (Gearman::Util::send_req($jss, \$exception_req)) {
  358 + $self->uncache_sock($js, "write_res_error");
  359 + next;
  360 + }
  361 + }
314 362
315 363 my $work_req;
316 364 if (defined $ret) {
@@ -324,29 +372,57 @@ sub work {
324 372
325 373 unless (Gearman::Util::send_req($jss, \$work_req)) {
326 374 $self->uncache_sock($js, "write_res_error");
  375 + next;
327 376 }
  377 +
  378 + $active_js{$js} = 1;
328 379 }
329 380
330   - my $is_idle = 0;
331   - if ($need_sleep) {
332   - $is_idle = 1;
333   - my $wake_vec = '';
  381 + my @jss;
  382 +
  383 + my $on_connect = sub {
  384 + return Gearman::Util::send_req($_[0], \$presleep_req);
  385 + };
  386 +
  387 + foreach my $js (@{$self->{job_servers}}) {
  388 + my $jss = $self->_get_js_sock($js, on_connect => $on_connect)
  389 + or next;
  390 + push @jss, [$js, $jss];
  391 + }
  392 +
  393 + $is_idle = 1;
  394 + my $wake_vec = '';
  395 +
  396 + foreach my $j (@jss) {
  397 + my ($js, $jss) = @$j;
  398 + my $fd = $jss->fileno;
  399 + vec($wake_vec, $fd, 1) = 1;
  400 + }
  401 +
  402 + my $timeout = keys %active_js ? 0 : (10 + rand(2));
  403 +
  404 +
  405 + # chill for some arbitrary time until we're woken up again
  406 + my $nready = select(my $wout = $wake_vec, undef, undef, $timeout);
  407 +
  408 + if ($nready) {
334 409 foreach my $j (@jss) {
335 410 my ($js, $jss) = @$j;
336   - unless (Gearman::Util::send_req($jss, \$presleep_req)) {
337   - $self->uncache_sock($js, "write_presleep_error");
338   - next;
339   - }
340 411 my $fd = $jss->fileno;
341   - vec($wake_vec, $fd, 1) = 1;
  412 + $active_js{$js} = 1
  413 + if vec($wout, $fd, 1);
342 414 }
343   -
344   - # chill for some arbitrary time until we're woken up again
345   - my $nready = select($wake_vec, undef, undef, 10);
346   - $is_idle = 0 if $nready;
347 415 }
348 416
349   - return if $stop_if->($is_idle);
  417 + $is_idle = 0 if keys %active_js;
  418 +
  419 + return if $stop_if->($is_idle, $last_job_time);
  420 +
  421 + my $update_since = time - (15 + rand 60);
  422 +
  423 + while (my ($js, $last_update) = each %last_update_time) {
  424 + $active_js{$js} = 1 if $last_update < $update_since;
  425 + }
350 426 }
351 427
352 428 }
@@ -357,18 +433,32 @@ sub register_function {
357 433 my $timeout = shift unless (ref $_[0] eq 'CODE');
358 434 my $subref = shift;
359 435
360   - $func = join "\t", $self->prefix, $func if $self->prefix;
  436 + my $prefix = $self->prefix;
  437 + my $ability = defined($prefix) ? "$prefix\t$func" : "$func";
361 438
362 439 my $req;
363 440 if (defined $timeout) {
364   - $req = Gearman::Util::pack_req_command("can_do_timeout", "$func\0$timeout");
365   - $self->{timeouts}{$func} = $timeout;
  441 + $req = Gearman::Util::pack_req_command("can_do_timeout", "$ability\0$timeout");
  442 + $self->{timeouts}{$ability} = $timeout;
366 443 } else {
367   - $req = Gearman::Util::pack_req_command("can_do", $func);
  444 + $req = Gearman::Util::pack_req_command("can_do", $ability);
368 445 }
369 446
370 447 $self->_register_all($req);
371   - $self->{can}{$func} = $subref;
  448 + $self->{can}{$ability} = $subref;
  449 +}
  450 +
  451 +sub unregister_function {
  452 + my Gearman::Worker $self = shift;
  453 + my $func = shift;
  454 +
  455 + my $prefix = $self->prefix;
  456 + my $ability = defined($prefix) ? "$prefix\t$func" : "$func";
  457 +
  458 + my $req = Gearman::Util::pack_req_command("cant_do", $ability);
  459 +
  460 + $self->_register_all($req);
  461 + delete $self->{can}{$ability};
372 462 }
373 463
374 464 sub _register_all {
18 t/10-all.t
@@ -8,7 +8,7 @@ use lib 't';
8 8 use TestGearman;
9 9
10 10 if (start_server(PORT)) {
11   - plan tests => 32;
  11 + plan tests => 33;
12 12 } else {
13 13 plan skip_all => "Can't find server to test with";
14 14 exit 0;
@@ -31,7 +31,7 @@ for (0..($NUM_SERVERS-1)) {
31 31 start_worker(PORT, $NUM_SERVERS);
32 32 start_worker(PORT, $NUM_SERVERS);
33 33
34   -my $client = Gearman::Client->new;
  34 +my $client = Gearman::Client->new(exceptions => 1);
35 35 isa_ok($client, 'Gearman::Client');
36 36 $client->job_servers(map { '127.0.0.1:' . (PORT + $_) } 0..$NUM_SERVERS);
37 37
@@ -70,6 +70,16 @@ is($sums[1], 4, 'Second task completed (sum is 4)');
70 70 ## Test some failure conditions:
71 71 ## Normal failure (worker returns undef or dies within eval).
72 72 is($client->do_task('fail'), undef, 'Job that failed naturally returned undef');
  73 +
  74 +## the die message is available in the on_fail sub
  75 +my $msg = undef;
  76 +$tasks = $client->new_task_set;
  77 +$tasks->add_task('fail_die', undef, {
  78 + on_exception => sub { $msg = shift },
  79 +});
  80 +$tasks->wait;
  81 +like($msg, qr/test reason/, 'the die message is available in the on_fail sub');
  82 +
73 83 ## Worker process exits.
74 84 is($client->do_task('fail_exit'), undef,
75 85 'Job that failed via exit returned undef');
@@ -222,7 +232,3 @@ do {
222 232 $status = $client->get_status($handle);
223 233 } until $status->percent == 1;
224 234
225   -
226   -
227   -
228   -
4 t/30-maxqueue.t
@@ -54,8 +54,8 @@ foreach my $iter (1..5) {
54 54 }
55 55 $tasks->wait;
56 56
57   -is($completed, 2, 'number of success'); # One starts immediately and on the queue
58   -is($failed, 3, 'number of failure'); # All the rest
  57 +ok($completed == 2 || $completed == 1, 'number of success'); # One in the queue, plus one that may start immediately
  58 +ok($failed == 3 || $failed== 4, 'number of failure'); # All the rest
59 59
60 60
61 61
18 t/40-prefix.t
@@ -4,6 +4,7 @@ use strict;
4 4 use Gearman::Client;
5 5 use Storable qw( freeze );
6 6 use Test::More;
  7 +use Time::HiRes 'sleep';
7 8
8 9 use lib 't';
9 10 use TestGearman;
@@ -11,7 +12,7 @@ use TestGearman;
11 12
12 13
13 14 if (start_server(PORT)) {
14   - plan tests => 8;
  15 + plan tests => 9;
15 16 } else {
16 17 plan skip_all => "Can't find server to test with";
17 18 exit 0;
@@ -63,4 +64,17 @@ for my $k (sort keys %tasks) {
63 64 is($out{$k}, "$k from prefix_$k", "taskset from client_$k");
64 65 }
65 66
66   -
  67 +## dispatch_background tasks also support prefixing
  68 +my $bg_task = Gearman::Task->new('echo_sleep', \('sleep prefix test'));
  69 +my $handle = $client_a->dispatch_background($bg_task);
  70 +
  71 +## wait for the task to be done
  72 +my $status;
  73 +my $n = 0;
  74 +do {
  75 + sleep 0.1;
  76 + $n++;
  77 + diag "still waiting..." if $n == 12;
  78 + $status = $client_a->get_status($handle);
  79 +} until $status->percent == 1 or $n == 20;
  80 +is $status->percent, 1, "Background task completed using prefix";
55 t/51-large_args.t
... ... @@ -0,0 +1,55 @@
  1 +#!/usr/bin/perl
  2 +
  3 +use strict;
  4 +use Gearman::Client;
  5 +use Storable qw( freeze );
  6 +use Test::More;
  7 +use Time::HiRes qw(time);
  8 +
  9 +use lib 't';
  10 +use TestGearman;
  11 +
  12 +# This is testing the MAXQUEUE feature of gearmand. There's no direct
  13 +# support for it in Gearman::Worker yet, so we connect directly to
  14 +# gearmand to configure it for the test.
  15 +
  16 +if (start_server(PORT)) {
  17 + plan tests => 3;
  18 +} else {
  19 + plan skip_all => "Can't find server to test with";
  20 + exit 0;
  21 +}
  22 +
  23 +wait_for_port(PORT);
  24 +
  25 +start_worker(PORT);
  26 +
  27 +my $client = Gearman::Client->new;
  28 +isa_ok($client, 'Gearman::Client');
  29 +
  30 +$client->job_servers('127.0.0.1:' . PORT);
  31 +
  32 +my $tasks = $client->new_task_set;
  33 +isa_ok($tasks, 'Gearman::Taskset');
  34 +
  35 +my $arg = "x" x ( 5 * 1024 * 1024 );
  36 +
  37 +$tasks->add_task('long', \$arg, {
  38 + on_complete => sub {
  39 + my $rr = shift;
  40 + if (length($$rr) != length($arg)) {
  41 + fail("Large job failed size check: got ".length($$rr).", want ".length($arg));
  42 + } elsif ($$rr ne $arg) {
  43 + fail("Large job failed content check");
  44 + } else {
  45 + pass("Large job succeeded");
  46 + }
  47 + },
  48 + on_fail => sub {
  49 + fail("Large job failed");
  50 + },
  51 +});
  52 +
  53 +$tasks->wait(timeout => 10);
  54 +
  55 +# vim: filetype=perl
84 t/60-stop-if.t
... ... @@ -0,0 +1,84 @@
  1 +#!/usr/bin/perl
  2 +
  3 +use strict;
  4 +use Gearman::Client;
  5 +use Storable qw(thaw);
  6 +use Test::More;
  7 +
  8 +use lib 't';
  9 +use TestGearman;
  10 +
  11 +if (start_server(PORT)) {
  12 + plan tests => 12;
  13 +} else {
  14 + plan skip_all => "Can't find server to test with";
  15 + exit 0;
  16 +}
  17 +
  18 +wait_for_port(PORT);
  19 +
  20 +start_worker(PORT);
  21 +
  22 +my $client = Gearman::Client->new;
  23 +isa_ok($client, 'Gearman::Client');
  24 +
  25 +$client->job_servers('127.0.0.1:' . PORT);
  26 +
  27 +{
  28 + # If we start up too fast, then the worker hasn't gone 'idle' yet.
  29 + sleep 1;
  30 +
  31 + my $result = $client->do_task('check_stop_if');
  32 +
  33 + my ($is_idle, $last_job_time) = @{thaw($$result)};
  34 +
  35 + is($is_idle, 0, "We shouldn't be idle yet");
  36 + is($last_job_time, undef, "No job should have been processed yet");
  37 +}
  38 +
  39 +{
  40 + my $result = $client->do_task('check_stop_if');
  41 +
  42 + my ($is_idle, $last_job_time) = @{thaw($$result)};
  43 +
  44 + is($is_idle, 0, "We still shouldn't be idle yet");
  45 + isnt($last_job_time, undef, "We should have processed a job now");
  46 +
  47 + my $time_diff = time() - $last_job_time;
  48 +
  49 + # On a really slow system this test could fail, maybe.
  50 + ok($time_diff < 3, "That last job should have been within the last 3 seconds");
  51 +}
  52 +
  53 +diag "Sleeping for 5 seconds";
  54 +sleep 5;
  55 +
  56 +{
  57 + my $result = $client->do_task('check_stop_if');
  58 +
  59 + my ($is_idle, $last_job_time) = @{thaw($$result)};
  60 +
  61 + is($is_idle, 0, "We still shouldn't be idle yet");
  62 + isnt($last_job_time, undef, "We should have processed a job now");
  63 +
  64 + my $time_diff = time() - $last_job_time;
  65 +
  66 + # On a really slow system this test could fail, maybe.
  67 + ok($time_diff > 3, "That last job should have been more than 3 seconds ago");
  68 + ok($time_diff < 8, "That last job should have been less than 8 seconds ago");
  69 +}
  70 +
  71 +$client->do_task('work_exit');
  72 +
  73 +sleep 2; # make sure the worker has time to shut down and isn't still in the 'run' loop
  74 +
  75 +{
  76 + my $result = $client->do_task('check_stop_if');
  77 +
  78 + my ($is_idle, $last_job_time) = @{thaw($$result)};
  79 +
  80 + is($is_idle, 0, "We shouldn't be idle yet");
  81 + is($last_job_time, undef, "No job should have been processed yet");
  82 +}
  83 +
  84 +# vim: filetype=perl
100 t/65-responseparser.t
... ... @@ -0,0 +1,100 @@
  1 +use strict;
  2 +use warnings;
  3 +use Test::More tests => 9;
  4 +use Gearman::Client;
  5 +
  6 +our $last_packet = undef;
  7 +our @packets;
  8 +
  9 +my $parser = Gearman::ResponseParser::Test->new();
  10 +
  11 +test_packet("\0RES\0\0\0\x0a\0\0\0\x01!", {
  12 + len => 1,
  13 + blobref => \"!", #"
  14 + type => 'no_job',
  15 +});
  16 +
  17 +test_packet("\0RES\0\0\0\x0a\0\0\0\0", {
  18 + len => 0,
  19 + blobref => \"", #"
  20 + type => 'no_job',
  21 +});
  22 +
  23 +## multiple packets
  24 +my $pkt = "\0RES\0\0\0\x0a\0\0\0\0";
  25 +test_multi_packet("$pkt$pkt", {
  26 + len => 0,
  27 + blobref => \"", #"
  28 + type => 'no_job',
  29 +}, {
  30 + len => 0,
  31 + blobref => \"", #"
  32 + type => 'no_job',
  33 +});
  34 +
  35 +# Message split into two packets
  36 +test_packet("\0RE", undef);
  37 +test_packet("S\0\0\0\x0a\0\0\0\0", {
  38 + len => 0,
  39 + blobref => \"", #"
  40 + type => 'no_job',
  41 +});
  42 +
  43 +# Message with payload split into two packets
  44 +test_packet("\0RES\0\0\0\x0a\0\0\0\x02a", undef);
  45 +test_packet("b", {
  46 + len => 2,
  47 + blobref => \"ab", #"
  48 + type => 'no_job',
  49 +});
  50 +
  51 +# Two packets, with the first containing a full message
  52 +# and a partial message, and the second containing the
  53 +# remainder of the partial message.
  54 +test_packet("\0RES\0\0\0\x0a\0\0\0\x02ab\0RES\0\0\0\x0a\0\0\0\x02b", {
  55 + len => 2,
  56 + blobref => \"ab", #"
  57 + type => 'no_job',
  58 +});
  59 +test_packet("a", {
  60 + len => 2,
  61 + blobref => \"ba", #"
  62 + type => 'no_job',
  63 +});
  64 +
  65 +sub test_packet {
  66 + my ($data, $expected) = @_;
  67 +
  68 + my $test_name = "Parsing ".enc($data);
  69 +
  70 + $last_packet = undef;
  71 + $parser->parse_data(\$data);
  72 + is_deeply($last_packet, $expected, $test_name);
  73 +}
  74 +
  75 +sub test_multi_packet {
  76 + my ($data, @expected) = @_;
  77 +
  78 + my $test_name = "Parsing ".enc($data);
  79 +
  80 + @packets = ();
  81 + $parser->parse_data(\$data);
  82 +
  83 + is_deeply(\@packets, \@expected, $test_name);
  84 +}
  85 +
  86 +sub enc {
  87 + my $data = $_[0];
  88 + $data =~ s/([\W])/"%" . uc(sprintf("%2.2x",ord($1)))/eg;
  89 + return $data;
  90 +}
  91 +
  92 +package Gearman::ResponseParser::Test;
  93 +
  94 +use Gearman::ResponseParser;
  95 +use base qw(Gearman::ResponseParser);
  96 +
  97 +sub on_packet {
  98 + $main::last_packet = $_[1];
  99 + push @main::packets, $_[1];
  100 +}
38 t/worker.pl
@@ -3,7 +3,7 @@
3 3
4 4 use lib 'lib';
5 5 use Gearman::Worker;
6   -use Storable qw( thaw );
  6 +use Storable qw(thaw nfreeze);
7 7 use Getopt::Long qw( GetOptions );
8 8
9 9 GetOptions(
@@ -25,6 +25,7 @@
25 25 });
26 26
27 27 $worker->register_function(fail => sub { undef });
  28 +$worker->register_function(fail_die => sub { die 'test reason' });
28 29 $worker->register_function(fail_exit => sub { exit 255 });
29 30
30 31 $worker->register_function(sleep => sub { sleep $_[0]->arg });
@@ -43,6 +44,13 @@
43 44 join " from ", $_[0]->arg, $prefix;
44 45 });
45 46
  47 +$worker->register_function(echo_sleep => sub {
  48 + my($job) = @_;
  49 + $job->set_status(1, 1);
  50 + sleep 2; ## allow some time to read the status
  51 + join " from ", $_[0]->arg, $prefix;
  52 +});
  53 +
46 54
47 55 $worker->register_function(long => sub {
48 56 my($job) = @_;
@@ -50,9 +58,35 @@
50 58 sleep 2;
51 59 $job->set_status(100, 100);
52 60 sleep 2;
  61 + return $job->arg;
53 62 });
54 63
55 64 my $nsig;
56 65 $nsig = kill 'USR1', $notifypid if $notifypid;
57 66
58   -$worker->work while 1;
  67 +my $work_exit = 0;
  68 +
  69 +$worker->register_function(work_exit => sub {
  70 + $work_exit = 1;
  71 +});
  72 +
  73 +my ($is_idle, $last_job_time);
  74 +
  75 +$worker->register_function(check_stop_if => sub {
  76 + return nfreeze([$is_idle, $last_job_time]);
  77 +});
  78 +
  79 +
  80 +
  81 +my $stop_if = sub {
  82 + ($is_idle, $last_job_time) = @_;
  83 +
  84 + if ($work_exit) {
  85 + $work_exit = 0;
  86 + return 1;
  87 + }
  88 +
  89 + return 0;
  90 +};
  91 +
  92 +$worker->work(stop_if => $stop_if) while (1);

0 comments on commit edf9e29

Please sign in to comment.
Something went wrong with that request. Please try again.