Skip to content
Browse files

Separate AnyEvent::RabbitMQ from Net::RabbitFoot. Added a test for re…

…ject method. Reuse Coro.
  • Loading branch information...
1 parent a460738 commit bff751adfe0b8b00f2904686ef5deeef6be1d7e6 @cooldaemon committed
View
2 Changes
@@ -1,5 +1,7 @@
Revision history for Perl extension RabbitFoot
+1.03 Thu Apr 7 03:10:37 2011
+ - Separate AnyEvent::RabbitMQ from Net::RabbitFoot.
- Avoid (additional) issues when in global destruction.
- Do not set reply_to to an empty string in the header frame.
- Implement basic.reject (requires RabbitMQ >= 2.0.0).
View
9 Makefile.PL
@@ -10,12 +10,9 @@ requires 'MooseX::ConfigFromFile';
requires 'Config::Any';
requires 'JSON::XS';
requires 'List::MoreUtils';
-requires 'Net::AMQP';
-requires 'AnyEvent';
-requires 'File::ShareDir';
-
-requires 'Devel::GlobalDestruction';
-requires 'namespace::clean';
+requires 'AnyEvent::RabbitMQ';
+requires 'Coro';
+requires 'Coro::AnyEvent';
tests 't/*.t';
author_tests 'xt';
View
4 README
@@ -7,10 +7,10 @@ You can use Net::RabbitFoot to -
* Declare and delete exchanges
* Declare, delete, bind and unbind queues
* Set QoS
- * Publish, consume, get, ack and recover messages
+ * Publish, consume, get, ack, recover and reject messages
* Select, commit and rollback transactions
-Net::RabbitFoot is known to work with RabbitMQ versions 2.3.1 and version 0-8 of the AMQP specification.
+Net::RabbitFoot is known to work with RabbitMQ versions 2.4.0 and version 0-8 of the AMQP specification.
INSTALLATION
View
3,908 amqp0-8.xml
0 additions, 3,908 deletions not shown because the diff is too large. Please use a local Git client to view these changes.
View
589 lib/AnyEvent/RabbitMQ.pm
@@ -1,589 +0,0 @@
-package AnyEvent::RabbitMQ;
-
-use strict;
-use warnings;
-
-use Data::Dumper;
-use Carp qw/ confess /;
-use List::MoreUtils qw(none);
-
-use AnyEvent::Handle;
-use AnyEvent::Socket;
-
-use Net::AMQP;
-use Net::AMQP::Common qw(:all);
-
-use AnyEvent::RabbitMQ::Channel;
-use AnyEvent::RabbitMQ::LocalQueue;
-
-use Devel::GlobalDestruction;
-use namespace::clean;
-
-our $VERSION = '1.03';
-
-sub new {
- my $class = shift;
- return bless {
- verbose => 0,
- @_,
- _is_open => 0,
- _queue => AnyEvent::RabbitMQ::LocalQueue->new,
- _channels => {},
- _login_user => '',
- _server_properties => {},
- }, $class;
-}
-
-sub channels {
- my $self = shift;
- return $self->{_channels};
-}
-
-sub delete_channel {
- my $self = shift;
- my ($id) = @_;
- return delete $self->{_channels}->{$id};
-}
-
-sub login_user {
- my $self = shift;
- return $self->{_login_user};
-}
-
-sub load_xml_spec {
- my $self = shift;
- Net::AMQP::Protocol->load_xml_spec(@_); # die when fail in this line.
- return $self;
-}
-
-sub connect {
- my $self = shift;
- my %args = $self->_set_cbs(@_);
-
- if ($self->{_is_open}) {
- $args{on_failure}->('Connection has already been opened');
- return $self;
- }
-
- $args{on_close} ||= sub {};
- $args{on_read_failure} ||= sub {warn @_, "\n"};
- $args{timeout} ||= 0;
-
- for (qw/ host port /) {
- confess("No $_ passed to connect to") unless $args{$_};
- }
-
- if ($self->{verbose}) {
- warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
- }
-
- $self->{_connect_guard} = AnyEvent::Socket::tcp_connect(
- $args{host},
- $args{port},
- sub {
- my $fh = shift or return $args{on_failure}->(
- sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!)
- );
-
- $self->{_handle} = AnyEvent::Handle->new(
- fh => $fh,
- on_error => sub {
- my ($handle, $fatal, $message) = @_;
-
- $self->{_channels} = {};
- if (!$self->{_is_open}) {
- $args{on_failure}->(@_);
- }
- $self->{_is_open} = 0;
- $self->_disconnect();
- $args{on_close}->($message);
- },
- on_drain => sub {
- my ($handle) = @_;
- $self->{drain_condvar}->send
- if exists $self->{drain_condvar};
- },
- );
- $self->_read_loop($args{on_close}, $args{on_read_failure});
- $self->_start(%args,);
- },
- sub {
- return $args{timeout};
- },
- );
-
- return $self;
-}
-
-sub server_properties {
- return shift->{_server_properties};
-}
-
-sub _read_loop {
- my ($self, $close_cb, $failure_cb,) = @_;
-
- return if !defined $self->{_handle}; # called on_error
-
- $self->{_handle}->push_read(chunk => 8, sub {
- my $data = $_[1];
- my $stack = $_[1];
-
- if (length($data) <= 7) {
- $failure_cb->('Broken data was received');
- @_ = ($self, $close_cb, $failure_cb,);
- goto &_read_loop;
- }
-
- my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
- if (!defined $type_id || !defined $channel || !defined $length) {
- $failure_cb->('Broken data was received');
- @_ = ($self, $close_cb, $failure_cb,);
- goto &_read_loop;
- }
-
- $self->{_handle}->push_read(chunk => $length, sub {
- $stack .= $_[1];
- my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
-
- if ($self->{verbose}) {
- warn '[C] <-- [S] ' . Dumper($frame);
- warn '-----------', "\n";
- }
-
- my $id = $frame->channel;
- if (0 == $id) {
- return if !$self->_check_close_and_clean($frame, $close_cb,);
- $self->{_queue}->push($frame);
- } else {
- my $channel = $self->{_channels}->{$id};
- if (defined $channel) {
- $channel->push_queue_or_consume($frame, $failure_cb);
- } else {
- $failure_cb->('Unknown channel id: ' . $frame->channel);
- }
- }
-
- @_ = ($self, $close_cb, $failure_cb,);
- goto &_read_loop;
- });
- });
-
- return $self;
-}
-
-sub _check_close_and_clean {
- my $self = shift;
- my ($frame, $close_cb,) = @_;
-
- return 1 if !$frame->isa('Net::AMQP::Frame::Method');
-
- my $method_frame = $frame->method_frame;
- return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
-
- $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
- $self->{_channels} = {};
- $self->{_is_open} = 0;
- $self->_disconnect();
- $close_cb->($frame);
- return;
-}
-
-sub _start {
- my $self = shift;
- my %args = @_;
-
- if ($self->{verbose}) {
- warn 'post header', "\n";
- }
-
- $self->{_handle}->push_write(Net::AMQP::Protocol->header);
-
- $self->_push_read_and_valid(
- 'Connection::Start',
- sub {
- my $frame = shift;
-
- my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
- return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
- if none {$_ eq 'AMQPLAIN'} @mechanisms;
-
- my @locales = split /\s/, $frame->method_frame->locales;
- return $args{on_failure}->('en_US is not found in locales')
- if none {$_ eq 'en_US'} @locales;
-
- $self->{_server_properties} = $frame->method_frame->server_properties;
-
- $self->_push_write(
- Net::AMQP::Protocol::Connection::StartOk->new(
- client_properties => {
- platform => 'Perl',
- product => __PACKAGE__,
- information => 'http://d.hatena.ne.jp/cooldaemon/',
- version => __PACKAGE__->VERSION,
- },
- mechanism => 'AMQPLAIN',
- response => {
- LOGIN => $args{user},
- PASSWORD => $args{pass},
- },
- locale => 'en_US',
- ),
- );
-
- $self->_tune(%args,);
- },
- $args{on_failure},
- );
-
- return $self;
-}
-
-sub _tune {
- my $self = shift;
- my %args = @_;
-
- $self->_push_read_and_valid(
- 'Connection::Tune',
- sub {
- my $frame = shift;
-
- $self->_push_write(
- Net::AMQP::Protocol::Connection::TuneOk->new(
- channel_max => $frame->method_frame->channel_max,
- frame_max => $frame->method_frame->frame_max,
- heartbeat => $frame->method_frame->heartbeat,
- ),
- );
-
- $self->_open(%args,);
- },
- $args{on_failure},
- );
-
- return $self;
-}
-
-sub _open {
- my $self = shift;
- my %args = @_;
-
- $self->_push_write_and_read(
- 'Connection::Open',
- {
- virtual_host => $args{vhost},
- capabilities => '',
- insist => 1,
- },
- 'Connection::OpenOk',
- sub {
- $self->{_is_open} = 1;
- $self->{_login_user} = $args{user};
- $args{on_success}->($self);
- },
- $args{on_failure},
- );
-
- return $self;
-}
-
-sub close {
- my $self = shift;
- my %args = $self->_set_cbs(@_);
-
- if (!$self->{_is_open}) {
- $args{on_success}->(@_);
- return $self;
- }
-
- my $close_cb = sub {
- $self->_close(
- sub {
- $self->_disconnect();
- $args{on_success}->(@_);
- },
- sub {
- $self->_disconnect();
- $args{on_failure}->(@_);
- }
- );
- return $self;
- };
-
- if (0 == scalar keys %{$self->{_channels}}) {
- return $close_cb->();
- }
-
- for my $id (keys %{$self->{_channels}}) {
- my $channel = $self->{_channels}->{$id}
- or next; # Could have already gone away on global destruction..
- $channel->close(
- on_success => $close_cb,
- on_failure => sub {
- $close_cb->();
- $args{on_failure}->(@_);
- },
- );
- }
-
- return $self;
-}
-
-sub _close {
- my $self = shift;
- my ($cb, $failure_cb,) = @_;
-
- return $self if !$self->{_is_open} || 0 < scalar keys %{$self->{_channels}};
-
- $self->_push_write_and_read(
- 'Connection::Close', {}, 'Connection::CloseOk',
- $cb, $failure_cb,
- );
- $self->{_is_open} = 0;
-
- return $self;
-}
-
-sub _disconnect {
- my $self = shift;
- $self->{_handle}->push_shutdown;
- return $self;
-}
-
-sub open_channel {
- my $self = shift;
- my %args = $self->_set_cbs(@_);
-
- return $self if !$self->_check_open($args{on_failure});
-
- $args{on_close} ||= sub {};
-
- my $id = $args{id};
- if ($id && $self->{_channels}->{$id}) {
- $args{on_failure}->("Channel id $id is already in use");
- return $self;
- }
-
- if (!$id) {
- for my $candidate_id (1 .. (2**16 - 1)) {
- next if defined $self->{_channels}->{$candidate_id};
- $id = $candidate_id;
- last;
- }
- if (!$id) {
- $args{on_failure}->('Ran out of channel ids');
- return $self;
- }
- }
-
- my $channel = AnyEvent::RabbitMQ::Channel->new(
- id => $id,
- connection => $self,
- on_close => $args{on_close},
- );
-
- $self->{_channels}->{$id} = $channel;
-
- $channel->open(
- on_success => sub {
- $args{on_success}->($channel);
- },
- on_failure => sub {
- $self->delete_channel($id);
- $args{on_failure}->(@_);
- },
- );
-
- return $self;
-}
-
-sub _push_write_and_read {
- my $self = shift;
- my ($method, $args, $exp, $cb, $failure_cb, $id,) = @_;
-
- $method = 'Net::AMQP::Protocol::' . $method;
- $self->_push_write(
- Net::AMQP::Frame::Method->new(
- method_frame => $method->new(%$args)
- ),
- $id,
- );
-
- return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id,);
-}
-
-sub _push_read_and_valid {
- my $self = shift;
- my ($exp, $cb, $failure_cb, $id,) = @_;
- $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
-
- my $queue;
- if (!$id) {
- $queue = $self->{_queue};
- } elsif (defined $self->{_channels}->{$id}) {
- $queue = $self->{_channels}->{$id}->queue;
- } else {
- $failure_cb->('Unknown channel id: ' . $id);
- }
-
- return unless $queue; # Can go away in global destruction..
- $queue->get(sub {
- my $frame = shift;
-
- return $failure_cb->('Received data is not method frame')
- if !$frame->isa('Net::AMQP::Frame::Method');
-
- my $method_frame = $frame->method_frame;
- for my $exp_elem (@$exp) {
- return $cb->($frame)
- if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
- }
-
- $failure_cb->(
- 'Method is not ' . join(',', @$exp) . "\n"
- . 'Method was ' . ref $method_frame
- );
- });
-}
-
-sub _push_write {
- my $self = shift;
- my ($output, $id,) = @_;
-
- if ($output->isa('Net::AMQP::Protocol::Base')) {
- $output = $output->frame_wrap;
- }
- $output->channel($id || 0);
-
- if ($self->{verbose}) {
- warn '[C] --> [S] ', Dumper($output);
- }
-
- $self->{_handle}->push_write($output->to_raw_frame())
- if $self->{_handle}; # Careful - could have gone (global destruction)
- return;
-}
-
-sub _set_cbs {
- my $self = shift;
- my %args = @_;
-
- $args{on_success} ||= sub {};
- $args{on_failure} ||= sub { return if in_global_destruction; die @_};
-
- return %args;
-}
-
-sub _check_open {
- my $self = shift;
- my ($failure_cb) = @_;
-
- return 1 if $self->{_is_open};
-
- $failure_cb->('Connection has already been closed');
- return 0;
-}
-
-sub drain_writes {
- my ($self, $timeout) = shift;
- $self->{drain_condvar} = AnyEvent->condvar;
- if ($timeout) {
- $self->{drain_timer} = AnyEvent->timer( after => $timeout, sub {
- $self->{drain_condvar}->croak("Timed out after $timeout");
- });
- }
- $self->{drain_condvar}->recv;
- delete $self->{drain_timer};
-}
-
-my $is_gd;
-
-END { $is_gd++ };
-
-sub DESTROY {
- my $self = shift;
- return if $is_gd;
- $self->close() if defined $self;
- return;
-}
-
-1;
-__END__
-
-=head1 NAME
-
-AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
-
-=head1 SYNOPSIS
-
- use AnyEvent::RabbitMQ;
-
- my $cv = AnyEvent->condvar;
-
- my $ar = AnyEvent::RabbitMQ->new->load_xml_spec(
- '/path/to/amqp0-8.xml',
- )->connect(
- host => 'localhost',
- port => 5672,
- user => 'guest',
- pass => 'guest',
- vhost => '/',
- timeout => 1,
- on_success => sub {
- $ar->open_channel(
- on_success => sub {
- my $channel = shift;
- $channel->declare_exchange(
- exchange => 'test_exchange',
- on_success => sub {
- $cv->send('Declared exchange');
- },
- on_failure => $cv,
- );
- },
- on_failure => $cv,
- on_close => sub {
- my $method_frame = shift->method_frame;
- die $method_frame->reply_code, $method_frame->reply_text;
- }
- );
- },
- on_failure => $cv,
- on_read_failure => sub {die @_},
- on_close => sub {
- my $method_frame = shift->method_frame;
- die $method_frame->reply_code, $method_frame->reply_text;
- },
- );
-
- print $cv->recv, "\n";
-
-=head1 DESCRIPTION
-
-AnyEvent::RabbitMQ is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.
-
-You can use AnyEvent::RabbitMQ to -
-
- * Declare and delete exchanges
- * Declare, delete, bind and unbind queues
- * Set QoS
- * Publish, consume, get, ack and recover messages
- * Select, commit and rollback transactions
-
-AnyEvnet::RabbitMQ is known to work with RabbitMQ versions 2.3.1 and version 0-8 of the AMQP specification.
-
-=head1 AUTHOR
-
-Masahito Ikuta E<lt>cooldaemon@gmail.comE<gt>
-
-=head1 COPYRIGHT
-
-Copyright (c) 2010, the above named author(s).
-
-=head1 SEE ALSO
-
-=head1 LICENSE
-
-This library is free software; you can redistribute it and/or modify
-it under the same terms as Perl itself.
-
-=cut
View
885 lib/AnyEvent/RabbitMQ/Channel.pm
@@ -1,885 +0,0 @@
-package AnyEvent::RabbitMQ::Channel;
-
-use strict;
-use warnings;
-
-use Scalar::Util qw(weaken);
-use AnyEvent::RabbitMQ::LocalQueue;
-
-our $VERSION = '1.03';
-
-sub new {
- my $class = shift;
- my $self = bless {
- @_, # id, connection, on_close
- _is_open => 0,
- _is_active => 0,
- _queue => AnyEvent::RabbitMQ::LocalQueue->new,
- _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
- _consumer_cbs => {},
- _return_cbs => {},
- }, $class;
- weaken($self->{connection});
- return $self;
-}
-
-sub queue {
- my $self = shift;
- return $self->{_queue};
-}
-
-sub open {
- my $self = shift;
- my %args = @_;
-
- if ($self->{_is_open}) {
- $args{on_failure}->('Channel has already been opened');
- return $self;
- }
-
- $self->{connection}->_push_write_and_read(
- 'Channel::Open', {}, 'Channel::OpenOk',
- sub {
- $self->{_is_open} = 1;
- $self->{_is_active} = 1;
- $args{on_success}->();
- },
- $args{on_failure},
- $self->{id},
- );
-
- return $self;
-}
-
-sub close {
- my $self = shift;
- my $connection = $self->{connection}
- or return;
- my %args = $connection->_set_cbs(@_);
-
- return $self if !$self->{_is_open};
-
- return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}};
-
- for my $consumer_tag (keys %{$self->{_consumer_cbs}}) {
- $self->cancel(
- consumer_tag => $consumer_tag,
- on_success => sub {
- $self->_close(%args);
- },
- on_failure => sub {
- $self->_close(%args);
- $args{on_failure}->(@_);
- }
- );
- }
-
- return $self;
-}
-
-sub _close {
- my $self = shift;
- my %args = @_;
-
- $self->{connection}->_push_write_and_read(
- 'Channel::Close', {}, 'Channel::CloseOk',
- sub {
- $self->{_is_open} = 0;
- $self->{_is_active} = 0;
- $self->{connection}->delete_channel($self->{id});
- $args{on_success}->();
- },
- sub {
- $self->{_is_open} = 0;
- $self->{_is_active} = 0;
- $self->{connection}->delete_channel($self->{id});
- $args{on_failure}->();
- },
- $self->{id},
- );
-
- return $self;
-}
-
-sub declare_exchange {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Exchange::Declare',
- {
- type => 'direct',
- passive => 0,
- durable => 0,
- auto_delete => 0,
- internal => 0,
- %args, # exchange
- ticket => 0,
- nowait => 0, # FIXME
- },
- 'Exchange::DeclareOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub delete_exchange {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Exchange::Delete',
- {
- if_unused => 0,
- %args, # exchange
- ticket => 0,
- nowait => 0, # FIXME
- },
- 'Exchange::DeleteOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub declare_queue {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Queue::Declare',
- {
- queue => '',
- passive => 0,
- durable => 0,
- exclusive => 0,
- auto_delete => 0,
- no_ack => 1,
- %args,
- ticket => 0,
- nowait => 0, # FIXME
- },
- 'Queue::DeclareOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-}
-
-sub bind_queue {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Queue::Bind',
- {
- %args, # queue, exchange, routing_key
- ticket => 0,
- nowait => 0, # FIXME
- },
- 'Queue::BindOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub unbind_queue {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Queue::Unbind',
- {
- %args, # queue, exchange, routing_key
- ticket => 0,
- },
- 'Queue::UnbindOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub purge_queue {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Queue::Purge',
- {
- %args, # queue
- ticket => 0,
- nowait => 0, # FIXME
- },
- 'Queue::PurgeOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub delete_queue {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Queue::Delete',
- {
- if_unused => 0,
- if_empty => 0,
- %args, # queue
- ticket => 0,
- nowait => 0, # FIXME
- },
- 'Queue::DeleteOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub publish {
- my $self = shift;
- my %args = @_;
-
- return $self if !$self->{_is_active};
-
- my $header_args = delete $args{header} || {};
- my $body = delete $args{body} || '';
- my $return_cb = delete $args{on_return} || sub {};
-
- $self->_publish(
- %args,
- )->_header(
- $header_args, $body,
- )->_body(
- $body,
- );
-
- return $self if !$args{mandatory} && !$args{immediate};
-
- $self->{_return_cbs}->{
- ($args{exchange} || '') . '_' . $args{routing_key}
- } = $return_cb;
-
- return $self;
-}
-
-sub _publish {
- my $self = shift;
- my %args = @_;
-
- $self->{connection}->_push_write(
- Net::AMQP::Protocol::Basic::Publish->new(
- exchange => '',
- mandatory => 0,
- immediate => 0,
- %args, # routing_key
- ticket => 0,
- ),
- $self->{id},
- );
-
- return $self;
-}
-
-sub _header {
- my ($self, $args, $body,) = @_;
-
- $self->{connection}->_push_write(
- Net::AMQP::Frame::Header->new(
- weight => $args->{weight} || 0,
- body_size => length($body),
- header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
- content_type => 'application/octet-stream',
- content_encoding => undef,
- headers => {},
- delivery_mode => 1,
- priority => 1,
- correlation_id => undef,
- expiration => undef,
- message_id => undef,
- timestamp => time,
- type => undef,
- user_id => $self->{connection}->login_user,
- app_id => undef,
- cluster_id => undef,
- %$args,
- ),
- ),
- $self->{id},
- );
-
- return $self;
-}
-
-sub _body {
- my ($self, $body,) = @_;
-
- $self->{connection}->_push_write(
- Net::AMQP::Frame::Body->new(payload => $body),
- $self->{id},
- );
-
- return $self;
-}
-
-sub consume {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- my $consumer_cb = delete $args{on_consume} || sub {};
-
- $self->{connection}->_push_write_and_read(
- 'Basic::Consume',
- {
- consumer_tag => '',
- no_local => 0,
- no_ack => 1,
- exclusive => 0,
- %args, # queue
- ticket => 0,
- nowait => 0, # FIXME
- },
- 'Basic::ConsumeOk',
- sub {
- my $frame = shift;
- $self->{_consumer_cbs}->{
- $frame->method_frame->consumer_tag
- } = $consumer_cb;
- $cb->($frame);
- },
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub cancel {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- if (!defined $args{consumer_tag}) {
- $failure_cb->('consumer_tag is not set');
- return $self;
- }
-
- if (!$self->{_consumer_cbs}->{$args{consumer_tag}}) {
- $failure_cb->('Unknown consumer_tag');
- return $self;
- }
-
- $self->{connection}->_push_write_and_read(
- 'Basic::Cancel',
- {
- %args, # consumer_tag
- nowait => 0,
- },
- 'Basic::CancelOk',
- sub {
- my $frame = shift;
- delete $self->{_consumer_cbs}->{$args{consumer_tag}};
- $cb->($frame);
- },
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub get {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Basic::Get',
- {
- no_ack => 1,
- %args, # queue
- ticket => 0,
- },
- [qw(Basic::GetOk Basic::GetEmpty)],
- sub {
- my $frame = shift;
- return $cb->({empty => $frame})
- if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
- $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
- },
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub ack {
- my $self = shift;
- my %args = @_;
-
- return $self if !$self->_check_open(sub {});
-
- $self->{connection}->_push_write(
- Net::AMQP::Protocol::Basic::Ack->new(
- delivery_tag => 0,
- multiple => (
- defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
- ),
- %args,
- ),
- $self->{id},
- );
-
- return $self;
-}
-
-sub qos {
- my $self = shift;
- my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Basic::Qos',
- {
- prefetch_count => 1,
- %args,
- prefetch_size => 0,
- global => 0,
- },
- 'Basic::QosOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub recover {
- my $self = shift;
- my %args = @_;
-
- return $self if !$self->_check_open(sub {});
-
- $self->{connection}->_push_write(
- Net::AMQP::Protocol::Basic::Recover->new(
- requeue => 1,
- %args,
- ),
- $self->{id},
- );
-
- return $self;
-}
-
-sub reject {
- my $self = shift;
- my %args = @_;
-
- return $self if !$self->_check_open( sub { } );
-
- $self->{connection}->_push_write(
- Net::AMQP::Protocol::Basic::Reject->new(
- delivery_tag => 0,
- requeue => 0,
- %args,
- ),
- $self->{id},
- );
-
- return $self;
-}
-
-sub select_tx {
- my $self = shift;
- my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Tx::Select', {}, 'Tx::SelectOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub commit_tx {
- my $self = shift;
- my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Tx::Commit', {}, 'Tx::CommitOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub rollback_tx {
- my $self = shift;
- my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
-
- return $self if !$self->_check_open($failure_cb);
-
- $self->{connection}->_push_write_and_read(
- 'Tx::Rollback', {}, 'Tx::RollbackOk',
- $cb,
- $failure_cb,
- $self->{id},
- );
-
- return $self;
-}
-
-sub push_queue_or_consume {
- my $self = shift;
- my ($frame, $failure_cb,) = @_;
-
- if ($frame->isa('Net::AMQP::Frame::Method')) {
- my $method_frame = $frame->method_frame;
- if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
- $self->{connection}->_push_write(
- Net::AMQP::Protocol::Channel::CloseOk->new(),
- $self->{id},
- );
- $self->{_is_open} = 0;
- $self->{_is_active} = 0;
- $self->{connection}->delete_channel($self->{id});
- $self->{on_close}->($frame);
- return $self;
- } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
- my $cb = $self->{_consumer_cbs}->{
- $method_frame->consumer_tag
- } || sub {};
- $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
- return $self;
- } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
- my $cb = $self->{_return_cbs}->{
- $method_frame->exchange . '_' . $method_frame->routing_key
- } || sub {};
- $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
- return $self;
- } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
- $self->{_is_active} = $method_frame->active;
- $self->{connection}->_push_write(
- Net::AMQP::Protocol::Channel::FlowOk->new(
- active => $method_frame->active,
- ),
- $self->{id},
- );
- return $self;
- }
- $self->{_queue}->push($frame);
- } else {
- $self->{_content_queue}->push($frame);
- }
-
- return $self;
-}
-
-sub _push_read_header_and_body {
- my $self = shift;
- my ($type, $frame, $cb, $failure_cb,) = @_;
- my $response = {$type => $frame};
-
- $self->{_content_queue}->get(sub{
- my $frame = shift;
-
- return $failure_cb->('Received data is not header frame')
- if !$frame->isa('Net::AMQP::Frame::Header');
-
- my $header_frame = $frame->header_frame;
- return $failure_cb->(
- 'Header is not Protocol::Basic::ContentHeader'
- . 'Header was ' . ref $header_frame
- ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
-
- $response->{header} = $header_frame;
- });
-
- $self->{_content_queue}->get(sub{
- my $frame = shift;
-
- return $failure_cb->('Received data is not body frame')
- if !$frame->isa('Net::AMQP::Frame::Body');
-
- $response->{body} = $frame;
- $cb->($response);
- });
-
- return $self;
-}
-
-sub _delete_cbs {
- my $self = shift;
- my %args = @_;
-
- my $cb = delete $args{on_success} || sub {};
- my $failure_cb = delete $args{on_failure} || sub {die @_};
-
- return $cb, $failure_cb, %args;
-}
-
-sub _check_open {
- my $self = shift;
- my ($failure_cb) = @_;
-
- return 1 if $self->{_is_open};
-
- $failure_cb->('Channel has already been closed');
- return 0;
-}
-
-sub DESTROY {
- my $self = shift;
- $self->close() if defined $self;
- return;
-}
-
-1;
-
-1;
-__END__
-
-=head1 NAME
-
-AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel.
-
-=head1 SYNOPSIS
-
- my $ch = $rf->open_channel();
- $ch->declare_exchange(exchange => 'test_exchange');
-
-=head1 DESRIPTION
-
-=head1 METHODS
-
-=head2 declare_exchange (%args)
-
-Declare an exchange (to publish messages to) on the server.
-
-Arguments:
-
-=over
-
-=item on_success
-
-=item on_failure
-
-=item type
-
-Default 'direct'
-
-=item passive
-
-Default 0
-
-=item durable
-
-Default 0
-
-=item auto_delete
-
-Default 0
-
-=item internal
-
-Default 0
-
-=item exchange
-
-The name of the exchange
-
-=back
-
-=head2 delete_exchange
-
-=head2 declare_queue
-
-=head2 bind_queue
-
-Binds a queue to an exchange, with a routing key.
-
-Arguments:
-
-=over
-
-=item queue
-
-The name of the queue to bind
-
-=item exchange
-
-The name of the exchange to bind
-
-=item routing_key
-
-The routing key to bind with
-
-=head2 unbind_queue
-
-=head2 purge_queue
-
-Flushes the contents of a queue.
-
-=head2 delete_queue
-
-Deletes a queue. The queue may not have any active consumers.
-
-=head2 publish
-
-Publish a message to an exchange
-
-Arguments:
-
-=over
-
-=item body
-
-The text body of the message to send.
-
-=item exchange
-
-The name of the exchange to send the message to.
-
-=item routing_key
-
-The routing key with which to publish the message.
-
-=back
-
-=head2 consume
-
-Subscribe to consume messages from a queue.
-
-Arguments:
-
-=over
-
-=item on_consume
-
-Callback called with an argument of the message which has been consumed.
-
-=item consumer_tag
-
-Identifies this consumer, will be auto-generated if you do not provide it, but you must
-supply a value if you want to be able to later cancel the subscription.
-
-=item on_success
-
-Callback called if the subscription was successfull (before the first message is consumed).
-
-=item on_failure
-
-Callback called if the subscription fails for any reason.
-
-=back
-
-=head2 cancel
-
-Cancel a queue subscription.
-
-Note that the cancellation B<will not> take place at once, and further messages may be
-consumed before the subscription is cancelled. No further messages will be
-consumed after the on_success callback has been called.
-
-Arguments:
-
-=item consumer_tag
-
-Identifies this consumer, needs to be the value supplied when the queue is initially
-consumed from.
-
-=item on_success
-
-Callback called if the subscription was successfully cancelled.
-
-=item on_failure
-
-Callback called if the subscription could not be cancelled for any reason.
-
-=head2 get
-
-Try to get a single message from a queue.
-
-Arguments:
-
-=over
-
-=item queue
-
-Mandatory. Name of the queue to try to recieve a message from.
-
-=item on_success
-
-Will be called either with either a message, or, if the queue is empty,
-a notification that there was nothing to collect from the queue.
-
-=item on_failure
-
-This callback will be called if an error is signaled on this channel.
-
-=back
-
-=head2 ack
-
-=head2 qos
-
-=head2 recover
-
-=head2 select_tx
-
-=head2 commit_tx
-
-=head2 rollback_tx
-
-=head1 AUTHOR, COPYRIGHT AND LICENSE
-
-See L<AnyEvent::RabbitMQ> for author(s), copyright and license.
-
-=cut
-
-
View
49 lib/AnyEvent/RabbitMQ/LocalQueue.pm
@@ -1,49 +0,0 @@
-package AnyEvent::RabbitMQ::LocalQueue;
-
-use strict;
-use warnings;
-
-our $VERSION = '1.03';
-
-sub new {
- my $class = shift;
- return bless {
- _message_queue => [],
- _drain_code_queue => [],
- }, $class;
-}
-
-sub push {
- my $self = shift;
-
- CORE::push @{$self->{_message_queue}}, @_;
- return $self->_drain_queue();
-}
-
-sub get {
- my $self = shift;
-
- CORE::push @{$self->{_drain_code_queue}}, @_;
- return $self->_drain_queue();
-}
-
-sub _drain_queue {
- my $self = shift;
-
- my $message_count = scalar @{$self->{_message_queue}};
- my $drain_code_count = scalar @{$self->{_drain_code_queue}};
-
- my $count = $message_count < $drain_code_count
- ? $message_count : $drain_code_count;
-
- for (1 .. $count) {
- &{shift @{$self->{_drain_code_queue}}}(
- shift @{$self->{_message_queue}}
- );
- }
-
- return $self;
-}
-
-1;
-
View
26 lib/Net/RabbitFoot.pm
@@ -3,10 +3,9 @@ package Net::RabbitFoot;
use strict;
use warnings;
-use AnyEvent;
use AnyEvent::RabbitMQ;
-
-use File::ShareDir ();
+use Coro;
+use Coro::AnyEvent;
use Net::RabbitFoot::Channel;
@@ -40,9 +39,8 @@ sub load_xml_spec {
return $self;
}
-sub default_amqp_spec {
- my $dir = File::ShareDir::dist_dir("Net-RabbitFoot");
- return "$dir/fixed_amqp0-8.xml";
+sub server_properties {
+ return shift->{_ar}->server_properties();
}
sub open_channel {
@@ -53,16 +51,16 @@ sub open_channel {
}
sub _do {
- my $self = shift;
+ my $self = shift;
my $method = shift;
- my %args = @_;
+ my %args = @_;
- my $cv = AnyEvent->condvar;
- $args{on_success} = sub {$cv->send(1, @_);},
- $args{on_failure} = sub {$cv->send(0, @_);},
+ my $cb = Coro::rouse_cb;
+ $args{on_success} = sub {$cb->(1, @_);},
+ $args{on_failure} = sub {$cb->(0, @_);},
$self->{_ar}->$method(%args);
- my ($is_success, @responses) = $cv->recv;
+ my ($is_success, @responses) = Coro::rouse_wait;
die @responses if !$is_success;
return @responses;
}
@@ -78,9 +76,7 @@ Net::RabbitFoot - An Asynchronous and multi channel Perl AMQP client.
use Net::RabbitFoot;
- my $rf = Net::RabbitFoot->new()->load_xml_spec(
- '/path/to/amqp0-8.xml',
- )->connect(
+ my $rf = Net::RabbitFoot->new()->load_xml_spec()->connect(
host => 'localhost',
port => 5672,
user => 'guest',
View
12 lib/Net/RabbitFoot/Channel.pm
@@ -3,7 +3,9 @@ package Net::RabbitFoot::Channel;
use strict;
use warnings;
-use AnyEvent;
+use Coro;
+use Coro::AnyEvent;
+
use AnyEvent::RabbitMQ::Channel;
our $VERSION = '1.03';
@@ -21,12 +23,12 @@ BEGIN {
my $self = shift;
my %args = @_;
- my $cv = AnyEvent->condvar;
- $args{on_success} = sub {$cv->send(1, @_);},
- $args{on_failure} = sub {$cv->send(0, @_);},
+ my $cb = Coro::rouse_cb;
+ $args{on_success} = sub {$cb->(1, @_);},
+ $args{on_failure} = sub {$cb->(0, @_);},
$self->{arc}->$method(%args);
- my ($is_success, @responses) = $cv->recv;
+ my ($is_success, @responses) = Coro::rouse_wait;
die @responses if !$is_success;
return $responses[0];
};
View
2 lib/Net/RabbitFoot/Cmd/Role/Command.pm
@@ -9,7 +9,7 @@ requires qw(_run);
has spec => (
isa => 'Str',
is => 'rw',
- default => Net::RabbitFoot::default_amqp_spec(),
+ default => '',
metaclass => 'MooseX::Getopt::Meta::Attribute',
cmd_aliases => 's',
documentation => 'AMQP specification',
View
6 t/00_compile.t
@@ -1,5 +1,5 @@
use strict;
-use Test::More tests => 15;
+use Test::More tests => 12;
BEGIN {
use_ok 'Net::RabbitFoot';
@@ -15,8 +15,4 @@ BEGIN {
use_ok 'Net::RabbitFoot::Cmd::Command::purge_queue';
use_ok 'Net::RabbitFoot::Cmd::Command::declare_exchange';
use_ok 'Net::RabbitFoot::Cmd::Command::delete_exchange';
-
- use_ok 'AnyEvent::RabbitMQ';
- use_ok 'AnyEvent::RabbitMQ::Channel';
- use_ok 'AnyEvent::RabbitMQ::LocalQueue';
}
View
432 xt/04_anyevent.t
@@ -1,432 +0,0 @@
-use Test::More;
-use Test::Exception;
-
-use FindBin;
-use JSON::XS;
-use version;
-
-my $json_text;
-my %server = (
- product => undef,
- version => undef,
-);
-open my $fh, '<', $FindBin::Bin . '/../config.json' or die $!;
-{undef $/; $json_text = <$fh>;}
-close $fh;
-my $conf = decode_json($json_text);
-
-eval {
- use IO::Socket::INET;
-
- my $socket = IO::Socket::INET->new(
- Proto => 'tcp',
- PeerAddr => $conf->{host},
- PeerPort => $conf->{port},
- Timeout => 1,
- ) or die 'Error connecting to AMQP Server!';
-
- close $socket;
-};
-
-plan skip_all => 'Connection failure: '
- . $conf->{host} . ':' . $conf->{port} if $@;
-plan tests => 25;
-
-use Net::RabbitFoot ();
-use AnyEvent::RabbitMQ;
-
-my $ar = AnyEvent::RabbitMQ->new();
-
-lives_ok sub {
- $ar->load_xml_spec(Net::RabbitFoot::default_amqp_spec())
-}, 'load xml spec';
-
-my $done = AnyEvent->condvar;
-$ar->connect(
- (map {$_ => $conf->{$_}} qw(host port user pass vhost)),
- timeout => 1,
- on_success => sub {
- my $ar = shift;
- isa_ok($ar, 'AnyEvent::RabbitMQ');
- $server{product} = $ar->server_properties->{product};
- $server{version} = version->parse($ar->server_properties->{version});
- $done->send;
- },
- on_failure => failure_cb($done),
- on_close => sub {
- my $method_frame = shift->method_frame;
- die $method_frame->reply_code, $method_frame->reply_text;
- },
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-my $ch;
-$ar->open_channel(
- on_success => sub {
- $ch = shift;
- isa_ok($ch, 'AnyEvent::RabbitMQ::Channel');
- $done->send;
- },
- on_failure => failure_cb($done),
- on_close => sub {
- my $method_frame = shift->method_frame;
- die $method_frame->reply_code, $method_frame->reply_text;
- },
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->declare_exchange(
- exchange => 'test_x',
- on_success => sub {
- pass('declare exchange');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->declare_queue(
- queue => 'test_q',
- on_success => sub {
- pass('declare queue');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->bind_queue(
- queue => 'test_q',
- exchange => 'test_x',
- routing_key => 'test_r',
- on_success => sub {
- pass('bound queue');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-my $consumer_tag;
-$ch->consume(
- queue => 'test_q',
- on_success => sub {
- my $frame = shift;
- $consumer_tag = $frame->method_frame->consumer_tag;
- pass('consume');
- },
- on_consume => sub {
- my $response = shift;
- ok($response->{body}->payload, 'publish');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-publish($ch, 'Hello RabbitMQ.', $done,);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->cancel(
- consumer_tag => $consumer_tag,
- on_success => sub {
- pass('cancel');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-publish($ch, 'I love RabbitMQ.', $done,);
-$ch->get(
- queue => 'test_q',
- on_success => sub {
- my $response = shift;
- ok(defined $response->{ok}, 'getok');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->get(
- queue => 'test_q',
- on_success => sub {
- my $response = shift;
- ok(defined $response->{empty}, 'empty');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->consume(
- queue => 'test_q',
- no_ack => 0,
- on_consume => sub {
- my $response = shift;
- $ch->ack(
- delivery_tag => $response->{deliver}->method_frame->delivery_tag
- );
- pass('ack deliver');
-
- $ch->cancel(
- consumer_tag => $response->{deliver}->method_frame->consumer_tag,
- on_success => sub {
- pass('cancel');
- $done->send;
- },
- on_failure => failure_cb($done),
- );
- },
- on_failure => failure_cb($done),
-);
-publish($ch, 'NO RabbitMQ, NO LIFE.', $done,);
-$done->recv;
-
-$done = AnyEvent->condvar;
-publish($ch, 'RabbitMQ is cool.', $done,);
-$ch->get(
- queue => 'test_q',
- no_ack => 0,
- on_success => sub {
- my $response = shift;
- $ch->ack(
- delivery_tag => $response->{ok}->method_frame->delivery_tag
- );
- pass('ack get');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-my @responses;
-$ch->qos(
- prefetch_count => 2,
- on_success => sub {
- $ch->consume(
- queue => 'test_q',
- no_ack => 0,
- on_consume => sub {
- my $response = shift;
- push @responses, $response;
- return if 2 > scalar @responses;
- $done->send;
- },
- on_failure => failure_cb($done),
- );
- },
- on_failure => failure_cb($done),
-);
-publish($ch, 'RabbitMQ is excellent.', $done,);
-publish($ch, 'RabbitMQ is fantastic.', $done,);
-$done->recv;
-pass('qos');
-
-for my $response (@responses) {
- $ch->ack(
- delivery_tag => $response->{deliver}->method_frame->delivery_tag,
- );
-}
-
-$done = AnyEvent->condvar;
-$ch->cancel(
- consumer_tag => $responses[0]->{deliver}->method_frame->consumer_tag,
- on_success => sub {
- $ch->qos(
- on_success => sub {
- $done->send;
- },
- on_failure => failure_cb($done),
- );
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-my $recover_count = 0;
-$ch->consume(
- queue => 'test_q',
- no_ack => 0,
- on_consume => sub {
- my $response = shift;
-
- if (5 > ++$recover_count) {
- $ch->recover();
- return;
- }
-
- $ch->ack(
- delivery_tag => $response->{deliver}->method_frame->delivery_tag
- );
-
- $ch->cancel(
- consumer_tag => $response->{deliver}->method_frame->consumer_tag,
- on_success => sub {
- $done->send;
- },
- on_failure => failure_cb($done),
- );
- },
- on_failure => failure_cb($done),
-);
-publish($ch, 'RabbitMQ is powerful.', $done,);
-$done->recv;
-pass('recover');
-
-# This only works for RabbitMQ >= 2.0.0
-my $can_reject = $server{product} eq 'RabbitMQ' && $server{version} >= version->parse('2.0.0');
-SKIP: {
- skip 'We need RabbitMQ >= 2.0.0 for the reject test', 1 unless $can_reject;
- $done = AnyEvent->condvar;
- my $reject_count = 0;
- $ch->consume(
- queue => 'test_q',
- no_ack => 0,
- on_consume => sub {
- my $response = shift;
-
- if ( 5 > ++$reject_count ) {
- $ch->reject(
- delivery_tag => $response->{deliver}->method_frame->delivery_tag,
-
- # requeue! Else the server does not deliver the message again to this client.
- requeue => 1,
- );
- return;
- }
-
- $ch->ack( delivery_tag => $response->{deliver}->method_frame->delivery_tag );
-
- $ch->cancel(
- consumer_tag => $response->{deliver}->method_frame->consumer_tag,
- on_success => sub {
- $done->send;
- },
- on_failure => failure_cb($done),
- );
- },
- on_failure => failure_cb($done),
- );
- publish( $ch, 'RabbitMQ is powerful.', $done, );
- $done->recv;
- pass('reject');
-};
-
-$done = AnyEvent->condvar;
-$ch->select_tx(
- on_success => sub {
- pass('select tx');
- publish($ch, 'RabbitMQ is highly reliable systems.', $done,);
-
- $ch->rollback_tx(
- on_success => sub {
- pass('rollback tx');
- publish($ch, 'RabbitMQ is highly reliable systems.', $done,);
-
- $ch->commit_tx(
- on_success => sub {
- pass('commit tx');
- $done->send;
- },
- on_failure => failure_cb($done),
- );
- },
- on_failure => failure_cb($done),
- );
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->purge_queue(
- queue => 'test_q',
- on_success => sub {
- pass('purge queue');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->unbind_queue(
- queue => 'test_q',
- exchange => 'test_x',
- routing_key => 'test_r',
- on_success => sub {
- pass('unbind queue');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->delete_queue(
- queue => 'test_q',
- on_success => sub {
- pass('delete queue');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ch->delete_exchange(
- exchange => 'test_x',
- on_success => sub {
- pass('delete exchange');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-$done = AnyEvent->condvar;
-$ar->close(
- on_success => sub {
- pass('close');
- $done->send;
- },
- on_failure => failure_cb($done),
-);
-$done->recv;
-
-sub failure_cb {
- my ($cv,) = @_;
- return sub {
- fail(join(' ', 'on_failure:', @_));
- $cv->send;
- };
-}
-
-sub publish {
- my ($ch, $message, $cv,) = @_;
-
- $ch->publish(
- exchange => 'test_x',
- routing_key => 'test_r',
- body => $message,
- on_return => sub {
- my $response = shift;
- fail('on_return: ' . Dumper($response));
- $cv->send;
- },
- );
-
- return;
-}
-
View
49 xt/05_coro.t → xt/04_coro.t
@@ -3,6 +3,12 @@ use Test::Exception;
use FindBin;
use JSON::XS;
+use version;
+
+my %server = (
+ product => undef,
+ version => undef,
+);
my $json_text;
open my $fh, '<', $FindBin::Bin . '/../config.json' or die $!;
@@ -25,7 +31,7 @@ eval {
plan skip_all => 'Connection failure: '
. $conf->{host} . ':' . $conf->{port} if $@;
-plan tests => 23;
+plan tests => 24;
use Coro;
use Net::RabbitFoot;
@@ -33,7 +39,7 @@ use Net::RabbitFoot;
my $rf = Net::RabbitFoot->new();
lives_ok sub {
- $rf->load_xml_spec(Net::RabbitFoot::default_amqp_spec())
+ $rf->load_xml_spec()
}, 'load xml spec';
lives_ok sub {
@@ -41,6 +47,8 @@ lives_ok sub {
(map {$_ => $conf->{$_}} qw(host port user pass vhost)),
timeout => 1,
);
+ $server{product} = $rf->server_properties->{product};
+ $server{version} = version->parse($rf->server_properties->{version});
}, 'connect';
my $ch;
@@ -188,6 +196,43 @@ lives_ok sub {
$ch->cancel(consumer_tag => $frame->method_frame->consumer_tag);
}, 'recover';
+# This only works for RabbitMQ >= 2.0.0
+my $can_reject = $server{product} eq 'RabbitMQ' && $server{version} >= version->parse('2.0.0');
+SKIP: {
+ skip 'We need RabbitMQ >= 2.0.0 for the reject test', 1 unless $can_reject;
+ lives_ok sub {
+ my $reject_count = 0;
+ $done = 0;
+ $frame = $ch->consume(
+ queue => 'test_q',
+ no_ack => 0,
+ on_consume => unblock_sub {
+ my $response = shift;
+
+ if (5 > ++$reject_count) {
+ $ch->reject(
+ delivery_tag => $response->{deliver}->method_frame->delivery_tag,
+ # requeue! Else the server does not deliver the message again to this client.
+ requeue => 1,
+ );
+ return;
+ }
+
+ $ch->ack(
+ delivery_tag => $response->{deliver}->method_frame->delivery_tag
+ );
+
+ $done = 1;
+ $main->ready;
+ schedule;
+ }
+ );
+ publish($ch, 'RabbitMQ is powerful.');
+ schedule while !$done;
+ $ch->cancel(consumer_tag => $frame->method_frame->consumer_tag);
+ }, 'reject';
+};
+
lives_ok sub {
$ch->select_tx();
}, 'select_tx';
View
4 xt/06_multi_channel.t → xt/05_multi_channel.t
@@ -30,9 +30,7 @@ plan tests => 6;
use Coro;
use Net::RabbitFoot;
-my $rf = Net::RabbitFoot->new()->load_xml_spec(
- Net::RabbitFoot::default_amqp_spec()
-)->connect(
+my $rf = Net::RabbitFoot->new()->load_xml_spec()->connect(
(map {$_ => $conf->{$_}} qw(host port user pass vhost)),
timeout => 1,
);

0 comments on commit bff751a

Please sign in to comment.
Something went wrong with that request. Please try again.