Skip to content

Commit

Permalink
Change listening socket to be a real Danga::Socket subclass, this allows
Browse files Browse the repository at this point in the history
pausing for a period of time when we run into accept errors. This will
fix the problem of gearmand spinning 100% cpu in those cases.

Make gearmand a little more vocal about socket accept errors as well.

git-svn-id: http://code.sixapart.com/svn/gearman/trunk@340 011c6a6d-750f-0410-a5f6-93fdcd050bc4
  • Loading branch information
hachi committed Oct 14, 2007
1 parent 2b816c2 commit 6184ac6
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGES
@@ -1,3 +1,9 @@
* Change listening socket to be a real Danga::Socket subclass, this allows
pausing for a period of time when we run into accept errors. This will
fix the problem of gearmand spinning 100% cpu in those cases.

* Make gearmand a little more vocal about socket accept errors.

* add fast read concept to server reading from client codepath. This drastically
improves performance of jobs over 1k in size.

Expand Down
27 changes: 7 additions & 20 deletions lib/Gearman/Server.pm
Expand Up @@ -25,8 +25,9 @@ script, and not use Gearman::Server directly.

use strict;
use Gearman::Server::Client;
use Gearman::Server::Listener;
use Gearman::Server::Job;
use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET SOCK_STREAM AF_UNIX SOCK_STREAM PF_UNSPEC);
use Socket qw(IPPROTO_TCP SOL_SOCKET SOCK_STREAM AF_UNIX SOCK_STREAM PF_UNSPEC);
use Carp qw(croak);
use Sys::Hostname ();
use IO::Handle ();
Expand All @@ -40,6 +41,7 @@ use fields (
'job_of_uniq', # func -> uniq -> Job
'handle_ct', # atomic counter
'handle_base', # atomic counter
'listeners', # arrayref of listener objects
);

our $VERSION = "1.09";
Expand Down Expand Up @@ -74,6 +76,7 @@ sub new {
$self->{job_of_handle} = {};
$self->{max_queue} = {};
$self->{job_of_uniq} = {};
$self->{listeners} = [];

$self->{handle_ct} = 0;
$self->{handle_base} = "H:" . Sys::Hostname::hostname() . ":";
Expand Down Expand Up @@ -108,26 +111,10 @@ sub create_listening_sock {
Listen => 10 )
or die "Error creating socket: $@\n";

$self->setup_listening_sock($ssock);
return $ssock;
}

sub setup_listening_sock {
my ($self, $ssock) = @_;

# make sure provided listening socket is non-blocking
IO::Handle::blocking($ssock, 0);
Danga::Socket->AddOtherFds(fileno($ssock) => sub {
my $csock = $ssock->accept
or return;
my $listeners = $self->{listeners};
push @$listeners, Gearman::Server::Listener->new($ssock, $self);

$self->debug(sprintf("Listen child making a Client for %d.", fileno($csock)));

IO::Handle::blocking($csock, 0);
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

$self->new_client($csock);
});
return $ssock;
}

sub new_client {
Expand Down
57 changes: 57 additions & 0 deletions lib/Gearman/Server/Listener.pm
@@ -0,0 +1,57 @@
package Gearman::Server::Listener;

use strict;
use base 'Danga::Socket';
use fields qw(server);

use Errno qw(EAGAIN);
use Socket qw(IPPROTO_TCP TCP_NODELAY);

sub new {
my Gearman::Server::Listener $self = shift;
my $sock = shift;
my $server = shift;

$self = fields::new($self) unless ref $self;

# make sure provided listening socket is non-blocking
IO::Handle::blocking($sock, 0);

$self->SUPER::new($sock);

$self->{server} = $server;
$self->watch_read(1);

return $self;
}

sub event_read {
my Gearman::Server::Listener $self = shift;

my $listen_sock = $self->sock;

local $!;

if (my $csock = $listen_sock->accept) {
IO::Handle::blocking($csock, 0);
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

my $server = $self->{server};

$server->debug(sprintf("Listen child making a Client for %d.", fileno($csock)));
$server->new_client($csock);
return;
}

return if $! == EAGAIN;

warn "Error accepting incoming connection: $!\n";

$self->watch_read(0);

Danga::Socket->AddTimer( .1, sub {
$self->watch_read(1);
});
}

1;

0 comments on commit 6184ac6

Please sign in to comment.