Skip to content

Commit

Permalink
Now just a consumer of POEx::Role::PSGIServer (thanks nperez!)
Browse files Browse the repository at this point in the history
  • Loading branch information
frodwith committed Sep 10, 2010
1 parent 983fa57 commit 973c0b9
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 329 deletions.
2 changes: 2 additions & 0 deletions Changes
Expand Up @@ -8,3 +8,5 @@
- poll_cb bug fixed (misunderstood the unreleased spec), tests!
0.05 Mon Feb 15 17:17:30 2010
- renamed to POE::Component::Server::PSGI
0.06
- Now just a simple consumer of POEx::Role::PSGIServer (thanks nperez!)
12 changes: 3 additions & 9 deletions dist.ini
@@ -1,5 +1,5 @@
name = POE-Component-Server-PSGI
version = 0.5
version = 0.6
abstract = PSGI Server implementation for POE
author = Paul Driver <frodwith@cpan.org>
license = Perl_5
Expand All @@ -8,11 +8,5 @@ copyright_holder = Paul Driver
[@Classic]

[Prereq]
POE = 0
POE::Component::Server::TCP = 0
POE::Filter::HTTP::Parser = 0
Plack = 0
HTTP::Message::PSGI = 0
HTTP::Status = 0
Test::More = 0
namespace::autoclean = 0
Moose = 0
POEx::Role::PSGIServer = 0
225 changes: 16 additions & 209 deletions lib/POE/Component/Server/PSGI.pm
@@ -1,206 +1,15 @@
package POE::Component::Server::PSGI;

use warnings;
use strict;

use namespace::autoclean;

use HTTP::Message::PSGI;
use HTTP::Status qw(status_message);
use Plack::Util;
use POE;
use POE::Component::Server::TCP;
use POE::Filter::HTTP::Parser;
use POE::Filter::Stream;

sub new {
my $class = shift;
my $opt = ref $_[0] eq 'HASH' ? shift : { @_ };
$opt->{port} ||= 8080,
$opt->{host} ||= '0.0.0.0',

return bless $opt, $class;
}

sub on_client_input {
my ($self, $heap, $req) = @_;
my $client = $heap->{client};

unless ($req->isa('HTTP::Request')) {
$client->put($req->as_string);
POE::Kernel->yield('shutdown');
return;
}

my $version = $req->header('X-HTTP-Version') || '0.9';
my $protocol = "HTTP/$version";

my $env = req_to_psgi($req,
SERVER_NAME => $self->{host},
SERVER_PORT => $self->{port},
SERVER_PROTOCOL => $protocol,
'psgi.streaming' => Plack::Util::TRUE,
'psgi.nonblocking' => Plack::Util::TRUE,
'psgi.runonce' => Plack::Util::FALSE,
);
use Moose;

my $connection = $req->header('Connection') || '';
my $keep_alive = $version eq '1.1' && $connection ne 'close';

my $write = sub { $client->put($_[0]) };
my $close = sub {
delete $heap->{client_flush};
POE::Kernel->yield('shutdown') unless $keep_alive;
return;
};

my $write_chunked = sub {
my $chunk = shift;
my $len = sprintf "%X", do { use bytes; length($chunk) };
$write->("$len\r\n$chunk\r\n");
};

my $close_chunked = sub {
$write->("0\r\n\r\n");
$close->();
};

my $start_response = sub {
my ($code, $headers, $body) = @{+shift};
my ($explicit_length, $chunked);
my $message = status_message($code);
$write->("$protocol $code $message\r\n");

while (@$headers) {
my $k = shift(@$headers);
my $v = shift(@$headers);
if ($k eq 'Connection' && $v eq 'close') {
$keep_alive = 0;
}
elsif ($k eq 'Content-Length') {
$explicit_length = 1;
}
$write->("$k: $v\r\n");
}

my $no_body_allowed = ($req->method =~ /^head$/i)
|| ($code < 200)
|| ($code == 204)
|| ($code == 304);

if ($no_body_allowed) {
$write->("\r\n");
return;
}

$chunked = ($keep_alive && !$explicit_length);
$write->("Transfer-Encoding: chunked\r\n") if $chunked;

$write->("\r\n");

my $w = $chunked ? $write_chunked : $write;
my $c = $chunked ? $close_chunked : $close;

if ($body) {
if (Plack::Util::is_real_fh($body)) {
my ($wheel, $buffer);
my $flusher = sub {
return unless $buffer;
$w->($buffer);
$buffer = '';
$wheel->resume_input() if $wheel;
};
$heap->{client_flush} = $flusher;
POE::Session->create(
inline_states => {
_start => sub {
$wheel = POE::Wheel::ReadWrite->new(
Handle => $body,
Filter => POE::Filter::Stream->new,
InputEvent => 'got_input',
ErrorEvent => 'got_error',
);
},
got_error => sub {
my ($op, $errno, $errstr, $id) = @_[ARG0..ARG3];
if ($op eq 'read') {
delete $_[HEAP]->{wheels}->{$id};
$wheel = undef;
$body->close();
$c->();
}
},
got_input => sub {
my $data = $_[ARG0];
my $already_flushed = !$buffer;
$buffer .= $data;
if ($already_flushed) {
$flusher->();
}
else {
my $len = do { use bytes; length($buffer) };
$wheel->pause_input if $len > 1024;
}
}
}
);
}
else {
Plack::Util::foreach($body, $w);
$c->();
}
return;
}

my $writer; $writer = Plack::Util::inline_object(
write => $w,
close => sub { $c->(@_); undef $writer },
poll_cb => sub {
my $get = shift;
($heap->{client_flush} = sub {
$get->($writer);
})->();
},
);
return $writer;
};

my $response = Plack::Util::run_app($self->{app}, $env);

if (ref $response eq 'CODE') {
$response->($start_response);
}
else {
$start_response->($response);
}
}

sub register_service {
my ($self, $app) = @_;
$self->{app} = $app;

my $filter = POE::Filter::HTTP::Parser->new( type => 'server' );
print STDERR "Listening on $self->{host}:$self->{port}\n";
POE::Component::Server::TCP->new(
Port => $self->{port},
Address => $self->{host},
ClientInput => sub {
$self->on_client_input(@_[HEAP, ARG0]);
},
ClientInputFilter => $filter,
ClientOutputFilter => 'POE::Filter::Stream',
ClientFlushed => sub {
my $cb = $_[HEAP]->{client_flush};
$cb && $cb->();
},
);
}
with 'POEx::Role::PSGIServer';

sub run {
my ($self, $app) = @_;
$self->register_service($app);
POE::Kernel->run;
}
before run => sub {
my $self = shift;
my $host = $self->listen_ip;
my $port = $self->listen_port;
print STDERR "Listening on $host:$port\n";
};

1;

Expand All @@ -212,7 +21,14 @@ POE::Component::Server::PSGI
=head1 DESCRIPTION
PSGI Server implementation for POE
PSGI Server implementation for POE.
=head1 NOTE
We've switched over to using nperez's excellent L<POEx::Role::PSGIServer>,
since it's essentially a (much better) refactor of this module's original
code. Use this if you just want a default implementation of his role with no
modifications.
=head1 SYNOPSIS
Expand Down Expand Up @@ -240,13 +56,4 @@ This module is licensed under the same terms as Perl itself.
L<Plack>
=begin Pod::Coverage
new
on_client_input
register_service
run
=end Pod::Coverage
=cut
104 changes: 0 additions & 104 deletions t/streaming.t

This file was deleted.

7 changes: 0 additions & 7 deletions t/suite.t

This file was deleted.

0 comments on commit 973c0b9

Please sign in to comment.