Skip to content

Commit

Permalink
Merge pull request tokuhirom#3 from maxatome/master
Browse files Browse the repository at this point in the history
Comply with AnyEvent read_type view
  • Loading branch information
tokuhirom committed Jul 31, 2012
2 parents de15f98 + 4f6db62 commit a8c1fe2
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 73 deletions.
44 changes: 25 additions & 19 deletions lib/AnyEvent/MPRPC/Client.pm
Expand Up @@ -105,11 +105,7 @@ sub BUILD {
fh => $fh,
);

$handle->on_read(sub {
shift->unshift_read(msgpack => sub {
$self->_handle_response( $_[1] );
});
});
$handle->unshift_read(msgpack => $self->_handle_response_cb);

while (my $pooled = shift @{ $self->_request_pool }) {
$handle->push_write( msgpack => $pooled );
Expand Down Expand Up @@ -145,25 +141,35 @@ sub call {
$self->_callbacks->{ $msgid } = AnyEvent->condvar;
}

sub _handle_response {
my ($self, $res) = @_;
sub _handle_response_cb {
my $self = shift;

weaken $self;

my $d = delete $self->_callbacks->{ $res->[MP_RES_MSGID] };
return sub {
$self || return;

my ($handle, $res) = @_;

my $d = delete $self->_callbacks->{ $res->[MP_RES_MSGID] };

if (my $error = $res->[MP_RES_ERROR]) {
if ($d) {
$d->croak($error);
} else {
Carp::croak($error);
}
}

$handle->unshift_read(msgpack => $self->_handle_response_cb);

if (my $error = $res->[MP_RES_ERROR]) {
if ($d) {
$d->croak($error);
$d->send($res->[MP_RES_RESULT]);
} else {
Carp::croak($error);
warn q/Invalid response from server/;
return;
}
}

if ($d) {
$d->send($res->[MP_RES_RESULT]);
} else {
warn q/Invalid response from server/;
return;
}
};
}

__PACKAGE__->meta->make_immutable;
Expand Down
69 changes: 38 additions & 31 deletions lib/AnyEvent/MPRPC/Server.pm
Expand Up @@ -114,11 +114,8 @@ sub BUILD {
%{ $self->handler_options },
fh => $fh,
);
$handle->on_read(sub {
shift->unshift_read( msgpack => sub {
$self->_dispatch($indicator, @_);
}),
});

$handle->unshift_read(msgpack => $self->_dispatch_cb($indicator));

$self->_handlers->[ fileno($fh) ] = $handle;
}) unless defined $self->server;
Expand All @@ -135,39 +132,49 @@ sub reg_cb {
}
}

sub _dispatch {
my ($self, $indicator, $handle, $request) = @_;
$self->on_dispatch->($indicator, $handle, $request);
return if $handle->destroyed;
sub _dispatch_cb {
my ($self, $indicator) = @_;

return unless $request and ref $request eq 'ARRAY';
weaken $self;

my $target = $self->_callbacks->{ $request->[MP_REQ_METHOD] };
return sub {
$self || return;

my $id = $request->[MP_REQ_MSGID];
$indicator = "$indicator:$id";
my ($handle, $request) = @_;
$self->on_dispatch->($indicator, $handle, $request);
return if $handle->destroyed;

my $res_cb = sub {
my $type = shift;
my $result = @_ > 1 ? \@_ : $_[0];
$handle->unshift_read(msgpack => $self->_dispatch_cb($indicator));

$handle->push_write( msgpack => [
MP_TYPE_RESPONSE,
int($id), # should be IV.
$type eq 'error' ? $result : undef,
$type eq 'result' ? $result : undef,
]) if $handle;
};
weaken $handle;
return unless $request and ref $request eq 'ARRAY';

my $cv = AnyEvent::MPRPC::CondVar->new;
$cv->_cb(
sub { $res_cb->( result => $_[0]->recv ) },
sub { $res_cb->( error => $_[0]->recv ) },
);
my $target = $self->_callbacks->{ $request->[MP_REQ_METHOD] };

my $id = $request->[MP_REQ_MSGID];
$indicator = "$indicator:$id";

my $res_cb = sub {
my $type = shift;
my $result = @_ > 1 ? \@_ : $_[0];

$target ||= sub { shift->error(qq/No such method "@{[ $request->[MP_REQ_METHOD] ]}" found/) };
$target->( $cv, $request->[MP_REQ_PARAMS] );
$handle->push_write( msgpack => [
MP_TYPE_RESPONSE,
int($id), # should be IV.
$type eq 'error' ? $result : undef,
$type eq 'result' ? $result : undef,
]) if $handle;
};
weaken $handle;

my $cv = AnyEvent::MPRPC::CondVar->new;
$cv->_cb(
sub { $res_cb->( result => $_[0]->recv ) },
sub { $res_cb->( error => $_[0]->recv ) },
);

$target ||= sub { shift->error(qq/No such method "@{[ $request->[MP_REQ_METHOD] ]}" found/) };
$target->( $cv, $request->[MP_REQ_PARAMS] );
};
}

__PACKAGE__->meta->make_immutable;
Expand Down
11 changes: 5 additions & 6 deletions lib/AnyEvent/MessagePack.pm
Expand Up @@ -25,15 +25,14 @@ use AnyEvent::Handle;

sub {
my $buffer = delete $_[0]{rbuf};
return if $buffer eq '';

my $complete = 0;
$unpacker->feed($buffer);
while ($unpacker->next) {
$unpacker->feed($buffer) if defined $buffer;

if ($unpacker->next) {
$cb->( $_[0], $unpacker->data );
$complete++;
return 1;
}
return $complete;
return 0;
}
});
}
Expand Down
32 changes: 17 additions & 15 deletions t/01_handle.t
Expand Up @@ -3,37 +3,39 @@ use warnings;
use AE;
use AnyEvent::MessagePack;
use AnyEvent::Handle;
use File::Temp qw(tempfile);
use AnyEvent::Util;
use Test::More;

my ($fh, $fname) = tempfile(UNLINK => 0);
my($read_fh, $write_fh) = portable_pipe;
my @data = ( [ 1, 2, 3 ], [ 4, 5, 6 ] );

my $cv = AE::cv;

{
my $hdl = AnyEvent::Handle->new(fh => $fh, on_error => sub { die 'wtf' });
my $hdl = AnyEvent::Handle->new(fh => $write_fh, on_error => sub { die 'wtf' });
for my $d (@data) {
$hdl->push_write(msgpack => $d);
}
close $fh;
}

sub read_handler;
sub read_handler
{
my ($hdl, $data) = @_;

my $e = shift @data;
is_deeply $data, $e;
$cv->send() unless @data;

$hdl->push_read(msgpack => \&read_handler) if @data;
}

my $hdl = do {
open my $fh, '<', $fname or die $!;
my $hdl = AnyEvent::Handle->new(fh => $fh, on_error => sub { die 'wtf' });
$hdl->push_read(msgpack => sub {
my ($hdl, $data) = @_;

my $e = shift @data;
is_deeply $data, $e;
$cv->send() unless @data;
});
my $hdl = AnyEvent::Handle->new(fh => $read_fh, on_error => sub { die 'wtf' });
$hdl->push_read(msgpack => \&read_handler);
$hdl;
};

$cv->recv();
unlink $fname;

done_testing;

2 changes: 1 addition & 1 deletion t/02_connect.t
Expand Up @@ -10,7 +10,7 @@ test_tcp(
my $port = shift;
my $w = AnyEvent->signal( signal => 'PIPE', cb => sub { warn "SIGPIPE" } );

my $server = AnyEvent::MPRPC::Server->new(host => '127.0.0.1', port => $port);
my $server = AnyEvent::MPRPC::Server->new(host => '127.0.0.1', port => $port, on_error => sub {});
$server->reg_cb(
sum => sub {
my ($res_cv, $args) = @_;
Expand Down
2 changes: 1 addition & 1 deletion t/03_client.t
Expand Up @@ -10,7 +10,7 @@ test_tcp(
my $port = shift;
my $w = AnyEvent->signal( signal => 'PIPE', cb => sub { warn "SIGPIPE" } );

my $server = AnyEvent::MPRPC::Server->new(host => '127.0.0.1', port => $port);
my $server = AnyEvent::MPRPC::Server->new(host => '127.0.0.1', port => $port, on_error => sub {});
$server->reg_cb(
sum => sub {
my ($res_cv, $args) = @_;
Expand Down

0 comments on commit a8c1fe2

Please sign in to comment.