Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added methods for get and ack.

  • Loading branch information...
commit e9ba12bb1f95fab6ffa85bac750dad490f30baf5 1 parent 23e94c5
@cooldaemon authored
View
1  .gitignore
@@ -5,4 +5,5 @@ blib
inc
pm_to_blib
MANIFEST
+MANIFEST.bak
Makefile.old
View
2  MANIFEST.SKIP
@@ -14,6 +14,8 @@
^t/perlcritic
^tools/
\.svn/
+\.git/
^[^/]+\.yaml$
^[^/]+\.pl$
^\.shipit$
+^\.gitignore$
View
13 README
@@ -1,11 +1,16 @@
This is Perl module RabbitFoot.
-INSTALLATION
+RabbitFoot is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in a synchronous fashion.
+
+You can use RabbitFoot to -
-RabbitFoot installation is straightforward. If your CPAN shell is set up,
-you should just be able to do
+ * Declare and delete exchanges
+ * Declare, delete and bind queues
+ * Publish, consume, get and ack messages
- % cpan RabbitFoot
+RabbitFoot is known to work with RabbitMQ versions 1.7.0 and version 0-8 of the AMQP specification.
+
+INSTALLATION
Download it, unpack it, then build it as per the usual:
View
54 lib/RabbitFoot.pm
@@ -443,17 +443,55 @@ sub cancel {
}
sub poll {
- my ($self, $args) = @_;
+ my ($self, $args,) = @_;
my $timeout = $args && $args->{timeout} ? $args->{timeout} : 'infinite';
- $self->_read_and_valid('Basic::Deliver', $timeout);
+ return {
+ deliver => $self->_read_and_valid('Basic::Deliver', $timeout),
+ header => $self->_read_header_and_valid(),
+ body => $self->_read_body_and_valid(),
+ };
+}
+
+sub get {
+ my ($self, $args,) = @_;
+
+ my $frame = $self->_post_and_read(
+ 'Basic::Get',
+ {
+ no_ack => 1,
+ %$args, # queue
+ ticket => 0,
+ },
+ [qw(Basic::GetOk Basic::GetEmpty)],
+ 1,
+ );
+
+ return $frame
+ if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
return {
+ getok => $frame,
header => $self->_read_header_and_valid(),
body => $self->_read_body_and_valid(),
};
}
+sub ack {
+ my ($self, $args,) = @_;
+
+ $self->_post(
+ Net::AMQP::Protocol::Basic::Ack->new(
+ delivery_tag => 0,
+ multiple => (
+ defined $args->{delivery_tag} && $args->{delivery_tag} != 0 ? 0 : 1
+ ),
+ %$args,
+ ),
+ 1,
+ );
+}
+
sub _post_and_read {
my ($self, $method, $args, $exp, $id,) = @_;
@@ -469,17 +507,21 @@ sub _post_and_read {
sub _read_and_valid {
my ($self, $exp, $timeout,) = @_;
+ $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
my ($frame) = $self->_read($timeout);
die 'Received data is not method frame', "\n"
if !$frame->isa('Net::AMQP::Frame::Method');
my $method_frame = $frame->method_frame;
- return $frame if $method_frame->isa('Net::AMQP::Protocol::' . $exp);
+ for my $exp_elem (@$exp) {
+ return $frame if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
+ }
$self->_check_close_and_clean($frame);
- die 'Method is not ', $exp, "\n", 'Method was ', ref $method_frame, "\n"
- if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
+ die 'Method is not ', join(',', @$exp), "\n",
+ 'Method was ', ref $method_frame, "\n"
+ if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
}
sub _read_header_and_valid {
@@ -639,7 +681,7 @@ You can use RabbitFoot to -
* Declare and delete exchanges
* Declare, delete and bind queues
- * Publish and consume messages
+ * Publish, consume, get and ack messages
RabbitFoot is known to work with RabbitMQ versions 1.7.0 and version 0-8 of the AMQP specification.
View
61 xt/04_use_server.t
@@ -21,7 +21,7 @@ eval {
plan skip_all => 'Connection failure: '
. $conf->{host} . ':' . $conf->{port} if $@;
-plan tests => 13;
+plan tests => 17;
use RabbitFoot;
@@ -52,15 +52,8 @@ lives_ok sub {
}, 'bind_queue';
lives_ok sub {
- $rf->publish(
- {
- exchange => 'test_x',
- routing_key => 'test_r',
- },
- {},
- 'Hello RabbitMQ.'
- );
-}, 'publish';
+ publish($rf, 'Hello RabbitMQ.');
+}, 'publish for consume';
lives_ok sub {
$rf->consume({queue => 'test_q'});
@@ -75,6 +68,39 @@ lives_ok sub {
}, 'cancel';
lives_ok sub {
+ publish($rf, 'I love RabbitMQ.');
+ $rf->get({queue => 'test_q'});
+}, 'get';
+
+lives_ok sub {
+ $rf->get({queue => 'test_q'});
+}, 'get empty';
+
+lives_ok sub {
+ publish($rf, 'NO RabbitMQ, NO LIFE.');
+ $rf->consume({
+ queue => 'test_q',
+ no_ack => 0,
+ });
+ my $response = $rf->poll({timeout => 1});
+ $rf->ack({
+ delivery_tag => $response->{deliver}->method_frame->delivery_tag,
+ });
+ $rf->cancel();
+}, 'ack deliver';
+
+lives_ok sub {
+ publish($rf, 'RabbitMQ is cool.');
+ my $response = $rf->get({
+ queue => 'test_q',
+ no_ack => 0,
+ });
+ $rf->ack({
+ delivery_tag => $response->{getok}->method_frame->delivery_tag,
+ });
+}, 'ack get';
+
+lives_ok sub {
$rf->purge_queue({queue => 'test_q'});
}, 'purge_queue';
@@ -90,3 +116,18 @@ lives_ok sub {
$rf->close();
}, 'close';
+sub publish {
+ my ($rf, $message,) = @_;
+
+ $rf->publish(
+ {
+ exchange => 'test_x',
+ routing_key => 'test_r',
+ },
+ {},
+ $message,
+ );
+
+ return;
+}
+
Please sign in to comment.
Something went wrong with that request. Please try again.