Skip to content

Commit

Permalink
Streaming from filehandles and poll_cb
Browse files Browse the repository at this point in the history
  • Loading branch information
frodwith committed Oct 21, 2009
1 parent f872f37 commit bcd08d9
Showing 1 changed file with 162 additions and 94 deletions.
256 changes: 162 additions & 94 deletions lib/Plack/Server/POE.pm
Expand Up @@ -20,113 +20,181 @@ sub new {
my $class = shift;
my $opt = ref $_[0] eq 'HASH' ? shift : { @_ };
$opt->{port} ||= 8080,
$opt->{host} ||= 'localhost',
$opt->{host} ||= '0.0.0.0',

return bless $opt, $class;
}

sub on_client_input {
my ($self, $heap, $req) = @_;
my $client = $heap->{client};

unless ($req->isa('HTTP::Request')) {
$client->put($req->as_string);
$poe_kernel->yield('shutdown');
return;
}

my $version = $req->header('X-HTTP-Version') || '0.9';
my $protocol = "HTTP/$version";

my $env = req_to_psgi($req,
SERVER_NAME => $self->{host},
SERVER_PORT => $self->{port},
SERVER_PROTOCOL => $protocol,
'psgi.streaming' => Plack::Util::TRUE,
'psgi.nonblocking' => Plack::Util::TRUE,
'psgi.runonce' => Plack::Util::FALSE,
);

my $connection = $req->header('Connection') || '';
my $keep_alive = $version eq '1.1' && $connection ne 'close';

my $write = sub { $client->put($_[0]) };
my $close = sub {
delete $heap->{client_flush};
$poe_kernel->yield('shutdown') unless $keep_alive;
return;
};

my $write_chunked = sub {
my $chunk = shift;
my $len = sprintf "%X", do { use bytes; length($chunk) };
$write->("$len\r\n$chunk\r\n");
};

my $close_chunked = sub {
$write->("0\r\n\r\n");
$close->();
};

my $start_response = sub {
my ($code, $headers, $body) = @{+shift};
my ($explicit_length, $chunked);
my $message = status_message($code);
$write->("$protocol $code $message\r\n");

while (@$headers) {
my $k = shift(@$headers);
my $v = shift(@$headers);
if ($k eq 'Connection' && $v eq 'close') {
$keep_alive = 0;
}
elsif ($k eq 'Content-Length') {
$explicit_length = 1;
}
$write->("$k: $v\r\n");
}

my $no_body_allowed = ($req->method =~ /^head$/i)
|| ($code < 200)
|| ($code == 204)
|| ($code == 304);

if ($no_body_allowed) {
$write->("\r\n");
return;
}

$chunked = ($keep_alive && !$explicit_length);
$write->("Transfer-Encoding: chunked\r\n") if $chunked;

$write->("\r\n");

my $w = $chunked ? $write_chunked : $write;
my $c = $chunked ? $close_chunked : $close;

if ($body) {
if (Plack::Util::is_real_fh($body)) {
my ($wheel, $buffer);
my $flusher = sub {
return unless $buffer;
$w->($buffer);
$buffer = '';
$wheel->resume_input() if $wheel;
};
$heap->{client_flush} = $flusher;
POE::Session->create(
inline_states => {
_start => sub {
$wheel = POE::Wheel::ReadWrite->new(
Handle => $body,
Filter => POE::Filter::Stream->new,
InputEvent => 'got_input',
ErrorEvent => 'got_error',
);
},
got_error => sub {
my ($op, $errno, $errstr, $id) = @_[ARG0..ARG3];
if ($op eq 'read') {
delete $_[HEAP]->{wheels}->{$id};
$wheel = undef;
$body->close();
$c->();
}
},
got_input => sub {
my $data = $_[ARG0];
my $already_flushed = !$buffer;
$buffer .= $data;
if ($already_flushed) {
$flusher->();
}
else {
my $len = do { use bytes; length($buffer) };
$wheel->pause_input if $len > 1024;
}
}
}
);
}
else {
Plack::Util::foreach($body, $w);
$c->();
}
return;
}

return Plack::Util::inline_object(
write => $w,
close => $c,
poll_cb => sub {
my $get = shift;
($heap->{client_flush} = sub {
my $chunk = $get->();
$w->($chunk) if defined $chunk;
})->();
},
);
};

my $response = Plack::Util::run_app($self->{app}, $env);

if (ref $response eq 'CODE') {
$response->($start_response);
}
else {
$start_response->($response);
}
}

sub register_service {
my ($self, $app) = @_;
$self->{app} = $app;

my $filter = POE::Filter::HTTP::Parser->new( type => 'server' );
print STDERR "Listening on $self->{host}:$self->{port}\n";
POE::Component::Server::TCP->new(
Port => $self->{port},
Address => $self->{host},
ClientInput => sub {
$self->on_client_input(@_[HEAP, ARG0]);
},
ClientInputFilter => $filter,
ClientOutputFilter => 'POE::Filter::Stream',
ClientInput => sub {
my ($kernel, $heap, $req) = @_[KERNEL, HEAP, ARG0];
my $client = $heap->{client};

unless ($req->isa('HTTP::Request')) {
$client->put($req->as_string);
$poe_kernel->yield('shutdown');
return;
}

my $version = $req->header('X-HTTP-Version') || '0.9';
my $protocol = "HTTP/$version";

my $env = req_to_psgi($req,
SERVER_NAME => $self->{host},
SERVER_PORT => $self->{port},
'psgi.streaming' => Plack::Util::TRUE,
'psgi.nonblocking' => Plack::Util::TRUE,
'psgi.runonce' => Plack::Util::FALSE,
);

my $connection = $req->header('Connection') || '';
my $keep_alive = $version eq '1.1' && $connection ne 'close';

my $write = sub { $client->put($_[0]) };
my $close = sub {
$poe_kernel->yield('shutdown') unless $keep_alive;
};

my $write_chunked = sub {
my $chunk = shift;
my $len = sprintf "%X", do { use bytes; length($chunk) };
$write->("$len\r\n$chunk\r\n");
};

my $close_chunked = sub {
$write->("0\r\n\r\n");
$close->();
};

my $start_response = sub {
my ($code, $headers, $body) = @{+shift};
my ($explicit_length, $chunked);
my $message = status_message($code);
$write->("$protocol $code $message\r\n");

while (@$headers) {
my $k = shift(@$headers);
my $v = shift(@$headers);
if ($k eq 'Connection' && $v eq 'close') {
$keep_alive = 0;
}
elsif ($k eq 'Content-Length') {
$explicit_length = 1;
}
$write->("$k: $v\r\n");
}

my $no_body_allowed = ($req->method =~ /^head$/i)
|| ($code < 200)
|| ($code == 204)
|| ($code == 304);

if ($no_body_allowed) {
$write->("\r\n");
return;
}

$chunked = ($keep_alive && !$explicit_length);
$write->("Transfer-Encoding: chunked\r\n") if $chunked;

$write->("\r\n");

my $w = $chunked ? $write_chunked : $write;
my $c = $chunked ? $close_chunked : $close;

if ($body) {
Plack::Util::foreach($body, $w);
$c->();
return;
}

return Plack::Util::inline_object(write => $w, close => $c);
};

my $response = Plack::Util::run_app($app, $env);

if (ref $response eq 'CODE') {
$response->($start_response);
}
else {
$start_response->($response);
}
ClientFlushed => sub {
my $cb = $_[HEAP]->{client_flush};
$cb && $cb->();
},
);
}
Expand Down

0 comments on commit bcd08d9

Please sign in to comment.