Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Preparing for release on CPAN

  • Loading branch information...
commit c0df2b65746850379f409160d1c7bcea44263c79 1 parent ec72009
Eric Waters authored
22 Build.PL
... ... @@ -0,0 +1,22 @@
  1 +use Module::Build;
  2 +
  3 +my $build = Module::Build->new(
  4 + module_name => 'POE::Component::Client::AMQP',
  5 + license => 'perl',
  6 + build_requires => {
  7 + 'Test::More' => 0,
  8 + 'Test::Deep' => 0,
  9 + },
  10 + requires => {
  11 + 'Params::Validate' => 0,
  12 + 'Class::Accessor' => 0,
  13 + 'POE' => 0,
  14 + 'POE::Component::Client::TCP' => 0,
  15 + 'Net::AMQP' => 0,
  16 + },
  17 + sign => 1,
  18 + create_makefile_pl => 'passthrough',
  19 + create_readme => 1,
  20 +);
  21 +
  22 +$build->create_build_script;
14 MANIFEST
... ... @@ -0,0 +1,14 @@
  1 +Build.PL
  2 +examples/ack.pl
  3 +examples/examples.pm
  4 +examples/pingpong.pl
  5 +examples/README
  6 +lib/POE/Component/Client/AMQP.pm
  7 +lib/POE/Component/Client/AMQP/Channel.pm
  8 +lib/POE/Component/Client/AMQP/Queue.pm
  9 +LICENSE
  10 +Makefile.PL
  11 +MANIFEST This list of files
  12 +META.yml
  13 +README
  14 +t/01_basic.t
48 MANIFEST.SKIP
... ... @@ -0,0 +1,48 @@
  1 +# Avoid version control files.
  2 +\bRCS\b
  3 +\bCVS\b
  4 +,v$
  5 +\B\.svn\b
  6 +\B\.git\b
  7 +\B\.cvsignore$
  8 +
  9 +# Avoid Makemaker generated and utility files.
  10 +\bMakefile$
  11 +\bblib
  12 +\bMakeMaker-\d
  13 +\bpm_to_blib$
  14 +\bblibdirs$
  15 +^MANIFEST\.SKIP$
  16 +
  17 +# Avoid VMS specific Makmaker generated files
  18 +\bDescrip.MMS$
  19 +\bDESCRIP.MMS$
  20 +\bdescrip.mms$
  21 +
  22 +# Avoid Module::Build generated and utility files.
  23 +\bBuild$
  24 +\bBuild.bat$
  25 +\b_build
  26 +\bBuild.COM$
  27 +\bBUILD.COM$
  28 +\bbuild.com$
  29 +
  30 +# Avoid Devel::Cover generated files
  31 +\bcover_db
  32 +
  33 +# Avoid temp and backup files.
  34 +~$
  35 +\.tmp$
  36 +\.old$
  37 +\.bak$
  38 +\#$
  39 +\.#
  40 +\.rej$
  41 +
  42 +# Avoid OS-specific files/dirs
  43 +# Mac OSX metadata
  44 +\B\.DS_Store
  45 +# Mac OSX SMB mount metadata files
  46 +\B\._
  47 +# Avoid archives of this distribution
  48 +\bPOE-Component-Client-AMQP-[\d\.\_]+
34 META.yml
... ... @@ -0,0 +1,34 @@
  1 +---
  2 +name: POE-Component-Client-AMQP
  3 +version: 0.01
  4 +author:
  5 + - 'Eric Waters <ewaters@gmail.com>'
  6 +abstract: Asynchronous AMQP client implementation in POE
  7 +license: perl
  8 +resources:
  9 + license: http://dev.perl.org/licenses/
  10 +requires:
  11 + Class::Accessor: 0
  12 + Net::AMQP: 0
  13 + POE: 0
  14 + POE::Component::Client::TCP: 0
  15 + Params::Validate: 0
  16 +build_requires:
  17 + Test::Deep: 0
  18 + Test::More: 0
  19 +provides:
  20 + POE::Component::Client::AMQP:
  21 + file: lib/POE/Component/Client/AMQP.pm
  22 + version: 0.01
  23 + POE::Component::Client::AMQP::Channel:
  24 + file: lib/POE/Component/Client/AMQP/Channel.pm
  25 + version: 0.01
  26 + POE::Component::Client::AMQP::FakeLogger:
  27 + file: lib/POE/Component/Client/AMQP.pm
  28 + POE::Component::Client::AMQP::Queue:
  29 + file: lib/POE/Component/Client/AMQP/Queue.pm
  30 + version: 0.01
  31 +generated_by: Module::Build version 0.33
  32 +meta-spec:
  33 + url: http://module-build.sourceforge.net/META-spec-v1.4.html
  34 + version: 1.4
34 Makefile.PL
... ... @@ -0,0 +1,34 @@
  1 +# Note: this file was auto-generated by Module::Build::Compat version 0.33
  2 +
  3 + unless (eval "use Module::Build::Compat 0.02; 1" ) {
  4 + print "This module requires Module::Build to install itself.\n";
  5 +
  6 + require ExtUtils::MakeMaker;
  7 + my $yn = ExtUtils::MakeMaker::prompt
  8 + (' Install Module::Build now from CPAN?', 'y');
  9 +
  10 + unless ($yn =~ /^y/i) {
  11 + die " *** Cannot install without Module::Build. Exiting ...\n";
  12 + }
  13 +
  14 + require Cwd;
  15 + require File::Spec;
  16 + require CPAN;
  17 +
  18 + # Save this 'cause CPAN will chdir all over the place.
  19 + my $cwd = Cwd::cwd();
  20 +
  21 + CPAN::Shell->install('Module::Build::Compat');
  22 + CPAN::Shell->expand("Module", "Module::Build::Compat")->uptodate
  23 + or die "Couldn't install Module::Build, giving up.\n";
  24 +
  25 + chdir $cwd or die "Cannot chdir() back to $cwd: $!";
  26 + }
  27 + eval "use Module::Build::Compat 0.02; 1" or die $@;
  28 +
  29 + Module::Build::Compat->run_build_pl(args => \@ARGV);
  30 + my $build_script = 'Build';
  31 + $build_script .= '.com' if $^O eq 'VMS';
  32 + exit(0) unless(-e $build_script); # cpantesters convention
  33 + require Module::Build;
  34 + Module::Build::Compat->write_makefile(build_class => 'Module::Build');
141 README
... ... @@ -1 +1,140 @@
1   -Placeholder
  1 +NAME
  2 + POE::Component::Client::AMQP - Asynchronous AMQP client implementation
  3 + in POE
  4 +
  5 +SYNOPSIS
  6 + use POE::Component::Client::AMQP;
  7 +
  8 + Net::AMQP::Protocol->load_xml_spec('amqp0-8.xml');
  9 +
  10 + my $amq = Component::Client::AMQP->create(
  11 + RemoteAddress => 'mq.domain.tld',
  12 + );
  13 +
  14 + $amq->channel(1)->queue('frank')->subscribe(sub {
  15 + my ($payload, $meta) = @_;
  16 +
  17 + my $reply_to = $meta->{header_frame}->reply_to;
  18 +
  19 + $amq->channel(1)->queue($reply_to)->publish("Message received");
  20 + });
  21 +
  22 + $amq->run();
  23 +
  24 +DESCRIPTION
  25 + This module implements the Advanced Message Queue Protocol (AMQP) TCP/IP
  26 + client. It's goal is to provide users with a quick and easy way of using
  27 + AMQP while at the same time exposing the advanced functionality of the
  28 + protocol if needed.
  29 +
  30 + The (de)serialization and representation logic is handled by Net::AMQP,
  31 + which needs to be setup (via load_xml_spec()) prior to this client
  32 + software running. Please see the docs there for further information on
  33 + this.
  34 +
  35 +USAGE
  36 + create (...)
  37 + Create a new AMQP client. Arguments to this method:
  38 +
  39 + *RemoteAddress* (default: 127.0.0.1)
  40 + Connect to this host
  41 +
  42 + *RemotePort* (default: 5672)
  43 + *Username* (default: guest)
  44 + *Password* (default: guest)
  45 + *VirtualHost* (default: /)
  46 + *Logger* (default: simple screen logger)
  47 + Provide an object which implements 'debug', 'info' and 'error'
  48 + logging methods (such as Log::Log4perl).
  49 +
  50 + *Debug*
  51 + This module provides extensive debugging options. These are
  52 + specified as a hash as follows:
  53 +
  54 + *logic* (boolean)
  55 + Display decisions the code is making
  56 +
  57 + *frame_input* (boolean)
  58 + *frame_output* (boolean)
  59 + Use the *frame_dumper* code to display frames that come in
  60 + from or out to the server.
  61 +
  62 + *frame_dumper* (coderef)
  63 + A coderef which, given a Net::AMQP::Frame object, will
  64 + return a string representation of it, prefixed with "\n".
  65 +
  66 + *raw_input* (boolean)
  67 + *raw_output* (boolean)
  68 + Use the *raw_dumper* code to display raw data that comes in
  69 + from or out to the server.
  70 +
  71 + *raw_dumper* (coderef)
  72 + A coderef which, given a raw string, will return a byte
  73 + representation of it, prefixed with "\n".
  74 +
  75 + *Alias* (default: amqp_client)
  76 + The POE session alias of the main client session
  77 +
  78 + *AliasTCP* (default: tcp_client)
  79 + The POE session alias of the TCP client
  80 +
  81 + *Callbacks* (default: {})
  82 + Provide callbacks. At the moment, 'Startup' is the only
  83 + recognized callback.
  84 +
  85 + *is_testing*
  86 + Set to '1' to avoid creating POE::Sessions (mainly useful in t/
  87 + scripts)
  88 +
  89 + Returns a class object.
  90 +
  91 +CLASS METHODS
  92 + do_when_startup (...)
  93 + Pass a subref that should be executed after the client has connected
  94 + and authenticated with the remote AMQP server. If the client is
  95 + already connected and authenticated, the subref will be called
  96 + immediately. Think: deferred.
  97 +
  98 + channel ($id)
  99 + Call with an optional argument $id (1 - 65536). Returns a
  100 + POE::Component::Client::AMQP::Channel object which can be used
  101 + immediately.
  102 +
  103 + run ()
  104 + Shortcut to calling $poe_kernel->run
  105 +
  106 + stop ()
  107 + Shortcut to calling the POE state 'disconnect'
  108 +
  109 +POE STATES
  110 + The following are states you can post to to interact with the client.
  111 + Use the alias defined in the "create()" call above.
  112 +
  113 + server_disconnect
  114 + Send a Connection.Close request
  115 +
  116 + server_send (@output)
  117 + Pass one or more Net::AMQP::Frame objects. For short hand, you may
  118 + pass Net::AMQP::Protocol::Base objects, which will be automatically
  119 + wrapped in the appropriate frame type, with channel 0. These frames
  120 + will be written to the server. In the case of
  121 + Net::AMQP::Frame::Method objects which are calling a synchronous
  122 + method, the client will handle them one at a time, waiting until a
  123 + synchronous method returns properly before sending further
  124 + synchronous frames. This happens automatically.
  125 +
  126 +SEE ALSO
  127 + POE, Net::AMQP
  128 +
  129 +COPYRIGHT
  130 + Copyright (c) 2009 Eric Waters and XMission LLC
  131 + (http://www.xmission.com/). All rights reserved. This program is free
  132 + software; you can redistribute it and/or modify it under the same terms
  133 + as Perl itself.
  134 +
  135 + The full text of the license can be found in the LICENSE file included
  136 + with this module.
  137 +
  138 +AUTHOR
  139 + Eric Waters <ewaters@gmail.com>
  140 +
18 examples/README
... ... @@ -0,0 +1,18 @@
  1 +These examples are perl versions of some examples found in the Ruby/EventMachine
  2 +AMQP implementation (http://github.com/tmm1/amqp). To run them, first start up
  3 +an AMQP server, such as RabbitMQ:
  4 +
  5 + hg clone http://hg.rabbitmq.com/rabbitmq-codegen
  6 + hg clone http://hg.rabbitmq.com/rabbitmq-server
  7 + cd rabbitmq-server
  8 + make run
  9 +
  10 +cd into this directory and type the example name:
  11 +
  12 + ./pingpong.pl # 1-1 communication with amq.direct
  13 + ./ack.pl # using ack
  14 +
  15 +By default they require YAML::XS and Term::ANSIColor, and display colored debug
  16 +output of the operations of the program. To turn this off, prefix the program
  17 +with DEBUG=0. You can edit 'examples.pm' to customize what debug output you'd
  18 +like to see.
62 examples/ack.pl
... ... @@ -0,0 +1,62 @@
  1 +#!/usr/bin/perl
  2 +
  3 +use strict;
  4 +use warnings;
  5 +use examples;
  6 +
  7 +# Create a queue, 'awesome', that will exist after this program ends
  8 +my $queue = $channel->queue(
  9 + 'awesome',
  10 + {
  11 + auto_delete => 0, # will remain after all consumers part
  12 + exclusive => 0, # not limited to just this connection
  13 + },
  14 +);
  15 +
  16 +# Publish a few things
  17 +$queue->publish('Totally rad 1');
  18 +$queue->publish('Totally rad 2');
  19 +$queue->publish('Totally rad 3');
  20 +
  21 +my $i = 0;
  22 +
  23 +$queue->subscribe(
  24 + sub {
  25 + my ($message, $meta) = @_;
  26 +
  27 + if (++$i == 3) {
  28 + $amq->Logger->info("Shutting down...");
  29 + $amq->stop();
  30 + }
  31 +
  32 + if ($amq->is_stopping) {
  33 + $amq->Logger->info("Got $message (ignored, redelivered later)");
  34 + return 0; # don't ack it
  35 + }
  36 + else {
  37 + $amq->Logger->info("Got $message");
  38 + return 1; # ack it
  39 + }
  40 + },
  41 + {
  42 + no_ack => 0,
  43 + }
  44 +);
  45 +
  46 +$amq->run();
  47 +
  48 +__DATA__
  49 +
  50 +Got Totally rad 1
  51 +Got Totally rad 2
  52 +Shutting down...
  53 +Got Totally rad 3 (ignored, redelivered later)
  54 +
  55 + and upon a second run:
  56 +
  57 +Got Totally rad 3
  58 +Got Totally rad 1
  59 +Shutting down...
  60 +Got Totally rad 2 (ignored, redelivered later)
  61 +Got Totally rad 3 (ignored, redelivered later)
  62 +
50 examples/examples.pm
... ... @@ -0,0 +1,50 @@
  1 +package examples;
  2 +
  3 +use strict;
  4 +use warnings;
  5 +use FindBin;
  6 +use lib (
  7 + $FindBin::Bin . '/../lib',
  8 + $FindBin::Bin . '/../../net-amqp/lib',
  9 +);
  10 +use POE qw(Component::Client::AMQP);
  11 +use base qw(Exporter);
  12 +
  13 +our @EXPORT = qw($amq $channel);
  14 +
  15 +# Libraries for the dumper() calls
  16 +use YAML::XS;
  17 +use Net::AMQP::Common qw(show_ascii);
  18 +use Term::ANSIColor qw(:constants);
  19 +
  20 +my $debug = $ENV{DEBUG} || 1;
  21 +
  22 +Net::AMQP::Protocol->load_xml_spec($ARGV[0] || $FindBin::Bin . '/../../net-amqp/spec/amqp0-8.xml');
  23 +
  24 +our $amq = POE::Component::Client::AMQP->create(
  25 + RemoteAddress => '127.0.0.1',
  26 +
  27 + ($debug ? (
  28 + Debug => {
  29 + logic => 1,
  30 +
  31 + frame_input => 1,
  32 + frame_output => 1,
  33 + frame_dumper => sub {
  34 + my $output = YAML::XS::Dump(shift);
  35 + chomp $output;
  36 + return "\n" . BLUE . $output . RESET;
  37 + },
  38 +
  39 + raw_input => 1,
  40 + raw_output => 1,
  41 + raw_dumper => sub {
  42 + my $raw = shift;
  43 + my $output = "raw [".length($raw)."]: ".show_ascii($raw);
  44 + return "\n" . YELLOW . $output . RESET;
  45 + },
  46 + },
  47 + ) : ()),
  48 +);
  49 +
  50 +our $channel = $amq->channel();
40 examples/pingpong.pl
@@ -2,44 +2,8 @@
2 2
3 3 use strict;
4 4 use warnings;
5   -use POE qw(Component::Client::AMQP);
6   -
7   -# Libraries for the dumper() calls
8   -use YAML::XS;
9   -use Net::AMQP::Common qw(show_ascii);
10   -use Term::ANSIColor qw(:constants);
11   -
12   -my $debug = $ENV{DEBUG};
13   -
14   -Net::AMQP::Protocol->load_xml_spec($ARGV[0]);
15   -
16   -my $amq = POE::Component::Client::AMQP->create(
17   - RemoteAddress => '127.0.0.1',
18   -
19   - ($debug ? (
20   - Debug => {
21   - logic => 1,
22   -
23   - frame_input => 1,
24   - frame_output => 1,
25   - frame_dumper => sub {
26   - my $output = YAML::XS::Dump(shift);
27   - chomp $output;
28   - return "\n" . BLUE . $output . RESET;
29   - },
30   -
31   - raw_input => 1,
32   - raw_output => 1,
33   - raw_dumper => sub {
34   - my $raw = shift;
35   - my $output = "raw [".length($raw)."]: ".show_ascii($raw);
36   - return "\n" . YELLOW . $output . RESET;
37   - },
38   - },
39   - ) : ()),
40   -);
41   -
42   -my $channel = $amq->channel();
  5 +use examples;
  6 +use POE;
43 7
44 8 $channel->queue('one')->subscribe(sub {
45 9 my $msg = shift;
63 lib/POE/Component/Client/AMQP.pm
@@ -43,13 +43,12 @@ use POE qw(
43 43 use Params::Validate qw(validate_with);
44 44 use Net::AMQP;
45 45 use Net::AMQP::Common qw(:all);
46   -use Scalar::Util qw(blessed);
47 46 use Carp;
48 47
49 48 use base qw(Class::Accessor);
50   -__PACKAGE__->mk_accessors(qw(Logger));
  49 +__PACKAGE__->mk_accessors(qw(Logger is_stopped is_started is_stopping));
51 50
52   -our $VERSION = 0.1;
  51 +our $VERSION = 0.01;
53 52
54 53 =head1 USAGE
55 54
@@ -61,7 +60,7 @@ Create a new AMQP client. Arguments to this method:
61 60
62 61 =over 4
63 62
64   -=item I<RemoteAddress> (required)
  63 +=item I<RemoteAddress> (default: 127.0.0.1)
65 64
66 65 Connect to this host
67 66
@@ -121,6 +120,10 @@ The POE session alias of the TCP client
121 120
122 121 Provide callbacks. At the moment, 'Startup' is the only recognized callback.
123 122
  123 +=item I<is_testing>
  124 +
  125 +Set to '1' to avoid creating POE::Sessions (mainly useful in t/ scripts)
  126 +
124 127 =back
125 128
126 129 Returns a class object.
@@ -135,7 +138,7 @@ sub create {
135 138 my %self = validate_with(
136 139 params => \@_,
137 140 spec => {
138   - RemoteAddress => 1,
  141 + RemoteAddress => { default => '127.0.0.1' },
139 142 RemotePort => { default => 5672 },
140 143 Username => { default => 'guest' },
141 144 Password => { default => 'guest' },
@@ -150,6 +153,7 @@ sub create {
150 153
151 154 channels => { default => {} },
152 155 is_started => { default => 0 },
  156 + is_testing => { default => 0 },
153 157 },
154 158 allow_extra => 1,
155 159 );
@@ -185,18 +189,20 @@ sub create {
185 189 _start
186 190 server_send
187 191 server_connected
  192 + server_disconnect
188 193 )],
189 194 ],
190   - );
  195 + ) unless $self->{is_testing};
191 196
192 197 POE::Component::Client::TCP->new(
193 198 Alias => $self->{AliasTCP},
194 199 RemoteAddress => $self->{RemoteAddress},
195 200 RemotePort => $self->{RemotePort},
196 201 Connected => sub { $self->tcp_connected(@_) },
  202 + Disconnected => sub { $self->Logger->info("TCP connection is disconnected") },
197 203 ServerInput => sub { $self->tcp_server_input(@_) },
198 204 Filter => 'POE::Filter::Stream',
199   - );
  205 + ) unless $self->{is_testing};
200 206
201 207 return $self;
202 208 }
@@ -267,6 +273,21 @@ sub run {
267 273 $poe_kernel->run();
268 274 }
269 275
  276 +=head2 stop ()
  277 +
  278 +=over 4
  279 +
  280 +Shortcut to calling the POE state 'disconnect'
  281 +
  282 +=back
  283 +
  284 +=cut
  285 +
  286 +sub stop {
  287 + my $self = shift;
  288 + $poe_kernel->call($self->{Alias}, 'server_disconnect');
  289 +}
  290 +
270 291 =head1 POE STATES
271 292
272 293 The following are states you can post to to interact with the client. Use the alias defined in the C<create()> call above.
@@ -279,6 +300,32 @@ sub _start {
279 300 $kernel->alias_set($self->{Alias});
280 301 }
281 302
  303 +=head2 server_disconnect
  304 +
  305 +=over 4
  306 +
  307 +Send a Connection.Close request
  308 +
  309 +=back
  310 +
  311 +=cut
  312 +
  313 +sub server_disconnect {
  314 + my ($self, $kernel) = @_[OBJECT, KERNEL];
  315 +
  316 + $self->{is_stopping} = 1;
  317 +
  318 + $kernel->yield(server_send =>
  319 + Net::AMQP::Frame::Method->new(
  320 + synchronous_callback => sub {
  321 + $self->{is_stopped} = 1;
  322 + $self->{is_started} = 0;
  323 + },
  324 + method_frame => Net::AMQP::Protocol::Connection::Close->new(),
  325 + )
  326 + );
  327 +}
  328 +
282 329 sub server_connected {
283 330 my ($self, $kernel) = @_[OBJECT, KERNEL];
284 331
@@ -308,7 +355,7 @@ sub server_send {
308 355 my ($self, $kernel, @output) = @_[OBJECT, KERNEL, ARG0 .. $#_];
309 356
310 357 while (my $output = shift @output) {
311   - if (! defined $output || ! ref $output || ! blessed $output) {
  358 + if (! defined $output || ! ref $output) {
312 359 $self->{Logger}->error("Server send called with invalid output (".(defined $output ? $output : 'undef').")");
313 360 next;
314 361 }
22 lib/POE/Component/Client/AMQP/Channel.pm
@@ -14,11 +14,12 @@ use strict;
14 14 use warnings;
15 15 use POE;
16 16 use Params::Validate;
17   -use Data::Dumper;
18 17 use Carp;
19 18 use base qw(Class::Accessor);
20 19 __PACKAGE__->mk_accessors(qw(id server Alias));
21 20
  21 +our $VERSION = 0.01;
  22 +
22 23 =head1 CLASS METHODS
23 24
24 25 =head2 create (...)
@@ -163,7 +164,7 @@ sub do_when_created {
163 164
164 165 ### Deferred methods ###
165 166
166   -=head2 queue ($name, %opts)
  167 +=head2 queue ($name, \%opts)
167 168
168 169 =over 4
169 170
@@ -176,7 +177,8 @@ If you pass %opts, the values you pass will override defaults in the L<Net::AMQP
176 177 =cut
177 178
178 179 sub queue {
179   - my ($self, $name, %user_opts) = @_;
  180 + my ($self, $name, $user_opts) = @_;
  181 + $user_opts ||= {};
180 182
181 183 if (defined $name && $self->{queues}{$name}) {
182 184 return $self->{queues}{$name};
@@ -193,7 +195,7 @@ sub queue {
193 195 auto_delete => 1, # queue is deleted after the last consumer
194 196 #nowait => 0, # do not send a DeclareOk response
195 197 #arguments => {},
196   - %user_opts,
  198 + %$user_opts,
197 199 );
198 200
199 201 # TODO: if user sets $opts{nowait}, we can't do the synchronous_callback below
@@ -319,7 +321,17 @@ sub server_input {
319 321 $content_meta->{$_} = $consumer_data->{$_} foreach qw(queue opts);
320 322
321 323 # Let the consumer know via the recorded callback
322   - $consumer_data->{callback}($content_meta->{payload}, $content_meta);
  324 + my $callback_return = $consumer_data->{callback}($content_meta->{payload}, $content_meta);
  325 +
  326 + # The return value is normally ignored unless the Consume call had 'no_ack => 0',
  327 + # in which case a 'true' response from the callback will automatically ack
  328 + if ($callback_return && ! $consumer_data->{opts}{no_ack}) {
  329 + $kernel->call($self->{Alias}, server_send =>
  330 + Net::AMQP::Protocol::Basic::Ack->new(
  331 + delivery_tag => $content_meta->{method_frame}->delivery_tag
  332 + )
  333 + );
  334 + }
323 335 }
324 336 }
325 337 }
26 lib/POE/Component/Client/AMQP/Queue.pm
@@ -18,6 +18,8 @@ use Params::Validate qw(validate_with);
18 18 use base qw(Class::Accessor);
19 19 __PACKAGE__->mk_accessors(qw(name channel is_created));
20 20
  21 +our $VERSION = 0.01;
  22 +
21 23 =head1 CLASS METHODS
22 24
23 25 =head2 create (...)
@@ -104,7 +106,7 @@ sub created {
104 106 }
105 107 }
106 108
107   -=head2 subscribe ($subref, %opts)
  109 +=head2 subscribe ($subref, \%opts)
108 110
109 111 =over 4
110 112
@@ -114,10 +116,14 @@ Optionally provide %opts which will override defaults for the Basic.Consume call
114 116
115 117 The argument signature of the callback is like so:
116 118
117   - $subref->($message, $meta)
  119 + my $do_ack = $subref->($message, $meta)
118 120
119 121 =over 4
120 122
  123 +=item I<$do_ack>
  124 +
  125 +If in the %opts hash you choose 'no_ack => 0', then messages have to be explicitly ack'ed once handled. If your callback returns true in this condition, an ack message will automatically be sent for you.
  126 +
121 127 =item I<$message>
122 128
123 129 Opaque payload of the content body.
@@ -165,7 +171,8 @@ The options used to create the Basic.Consume call (merge of default values and %
165 171 =cut
166 172
167 173 sub subscribe {
168   - my ($self, $callback, %user_opts) = @_;
  174 + my ($self, $callback, $user_opts) = @_;
  175 + $user_opts ||= {};
169 176
170 177 my %opts = (
171 178 ticket => 0,
@@ -175,7 +182,7 @@ sub subscribe {
175 182 no_ack => 1,
176 183 #exclusive => 0,
177 184 #nowait => 0, # do not send the ConsumeOk response
178   - %user_opts,
  185 + %$user_opts,
179 186 );
180 187
181 188 # TODO: if user sets $opts{nowait}, we can't do the synchronous_callback or even know the consumer_tag.
@@ -202,7 +209,7 @@ sub subscribe {
202 209 });
203 210 }
204 211
205   -=head2 publish ($message, %opts)
  212 +=head2 publish ($message, \%opts)
206 213
207 214 =over 4
208 215
@@ -215,7 +222,8 @@ Optionally pass %opts, which can override any option in the L<Net::AMQP::Protoco
215 222 =cut
216 223
217 224 sub publish {
218   - my ($self, $message, %user_opts) = @_;
  225 + my ($self, $message, $user_opts) = @_;
  226 + $user_opts ||= {};
219 227
220 228 my %method_opts = (
221 229 ticket => 0,
@@ -223,7 +231,7 @@ sub publish {
223 231 routing_key => $self->{name}, # route to my queue
224 232 mandatory => 1,
225 233 #immediate => 0,
226   - %user_opts,
  234 + %$user_opts,
227 235 );
228 236
229 237 my %content_opts = (
@@ -241,14 +249,14 @@ sub publish {
241 249 #user_id => '',
242 250 #app_id => '',
243 251 #cluster_id => '',
244   - %user_opts,
  252 + %$user_opts,
245 253 );
246 254
247 255 $self->do_when_created(sub {
248 256 $poe_kernel->post($self->{channel}{Alias}, server_send =>
249 257 Net::AMQP::Protocol::Basic::Publish->new(%method_opts),
250 258 Net::AMQP::Frame::Header->new(
251   - weight => $user_opts{weight} || 0,
  259 + weight => $user_opts->{weight} || 0,
252 260 body_size => length($message),
253 261 header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(%content_opts),
254 262 ),
14 t/01_basic.t
... ... @@ -0,0 +1,14 @@
  1 +use strict;
  2 +use warnings;
  3 +use Test::More tests => 2;
  4 +
  5 +BEGIN {
  6 + use_ok('POE::Component::Client::AMQP');
  7 +}
  8 +
  9 +my $client = POE::Component::Client::AMQP->create(
  10 + RemoteAddress => '127.0.0.1',
  11 + is_testing => 1, # don't create POE sessions
  12 +);
  13 +
  14 +isa_ok($client, 'POE::Component::Client::AMQP');

0 comments on commit c0df2b6

Please sign in to comment.
Something went wrong with that request. Please try again.