Skip to content

Commit

Permalink
first cut of AnyMQ::AMQP.
Browse files Browse the repository at this point in the history
  • Loading branch information
clkao committed Feb 11, 2010
0 parents commit c3fe068
Show file tree
Hide file tree
Showing 15 changed files with 269 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -0,0 +1,5 @@
META.yml
Makefile
inc/
pm_to_blib
*~
2 changes: 2 additions & 0 deletions .shipit
@@ -0,0 +1,2 @@
steps = FindVersion, ChangeVersion, CheckChangeLog, DistTest, Commit, Tag, MakeDist, UploadCPAN
git.push_to = origin
4 changes: 4 additions & 0 deletions Changes
@@ -0,0 +1,4 @@
Revision history for Perl extension AnyMQ::AMQP

0.01 Mon Jan 18 21:12:35 2010
- original version
30 changes: 30 additions & 0 deletions MANIFEST
@@ -0,0 +1,30 @@
.gitignore
Changes
inc/Module/Install.pm
inc/Module/Install/AuthorTests.pm
inc/Module/Install/Base.pm
inc/Module/Install/Can.pm
inc/Module/Install/Fetch.pm
inc/Module/Install/Include.pm
inc/Module/Install/Makefile.pm
inc/Module/Install/Metadata.pm
inc/Module/Install/ReadmeFromPod.pm
inc/Module/Install/TestBase.pm
inc/Module/Install/Win32.pm
inc/Module/Install/WriteAll.pm
inc/Spiffy.pm
inc/Test/Base.pm
inc/Test/Base/Filter.pm
inc/Test/Builder.pm
inc/Test/Builder/Module.pm
inc/Test/More.pm
lib/AnyMQ/AMQP.pm
Makefile.PL
MANIFEST This list of files
META.yml
README
t/00_compile.t
xt/perlcritic.t
xt/pod.t
xt/podspell.t
xt/synopsis.t
14 changes: 14 additions & 0 deletions MANIFEST.SKIP
@@ -0,0 +1,14 @@
\bRCS\b
\bCVS\b
\.svn/
\.git/
^MANIFEST\.
^Makefile$
~$
\.old$
^blib/
^pm_to_blib
^MakeMaker-\d
\.gz$
\.cvsignore
\.shipit
10 changes: 10 additions & 0 deletions Makefile.PL
@@ -0,0 +1,10 @@
use inc::Module::Install;
name 'AnyMQ-AMQP';
all_from 'lib/AnyMQ/AMQP.pm';
readme_from 'lib/AnyMQ/AMQP.pm';
build_requires 'Test::More';
use_test_base;
auto_include_deps;
author_tests('xt');
auto_set_repository;
WriteAll;
17 changes: 17 additions & 0 deletions README
@@ -0,0 +1,17 @@
NAME
AnyMQ::AMQP -

SYNOPSIS
use AnyMQ::AMQP;

DESCRIPTION
AnyMQ::AMQP is

AUTHOR
Chia-liang Kao <clkao@clkao.org>

LICENSE
This library is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

SEE ALSO
37 changes: 37 additions & 0 deletions lib/AnyMQ/AMQP.pm
@@ -0,0 +1,37 @@
package AnyMQ::AMQP;

use strict;
use 5.008_001;
our $VERSION = '0.01';

1;
__END__
=encoding utf-8
=for stopwords
=head1 NAME
AnyMQ::AMQP -
=head1 SYNOPSIS
use AnyMQ::AMQP;
=head1 DESCRIPTION
AnyMQ::AMQP is
=head1 AUTHOR
Chia-liang Kao E<lt>clkao@clkao.orgE<gt>
=head1 LICENSE
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=head1 SEE ALSO
=cut
81 changes: 81 additions & 0 deletions lib/AnyMQ/Trait/AMQP.pm
@@ -0,0 +1,81 @@
package AnyMQ::Trait::AMQP;
use Moose::Role;

use AnyEvent;
use AnyEvent::RabbitMQ;

has host => (is => "ro", isa => "Str");
has port => (is => "ro", isa => "Int");
has user => (is => "ro", isa => "Str");
has pass => (is => "ro", isa => "Str");
has vhost => (is => "ro", isa => "Str");
has exchange => (is => "ro", isa => "Str");

has bind_mode => (is => "ro", isa => "Str", default => sub { 'exchange' });

has _rf => (is => "rw");
has _rf_channel => (is => "rw");
has _rf_queue => (is => "rw");

sub BUILD {
my $self = shift;

my $rf = AnyEvent::RabbitMQ->new({timeout => 1, verbose => 0,});
$rf->load_xml_spec('fixed_amqp0-8.xml');

my $cv = AE::cv;
$rf->connect((map { $_ => $self->$_ }
qw(host port user pass vhost)),
on_success => sub {
$rf->open_channel(
on_success => sub {
my $channel = shift;
$self->_rf_channel($channel);
$channel->qos();
$channel->declare_queue(
exclusive => 1,
on_success => sub {
my $method = shift;
my $queue = $method->method_frame->queue;
$self->_rf_queue($queue);
$channel->consume(queue => $queue,
no_ack => 1,
on_success => sub {
$cv->send('init');
},
on_consume => sub {
my $frame = shift;
my $payload = $frame->{body}->payload;
my $reply_to = $frame->{header}->reply_to;
next if $reply_to && $reply_to eq $self->_queue;
my $topic = $frame->{deliver}->method_frame->routing_key;
$self->topics->{$topic}->publish($payload);

},
on_failure => $cv,
);
},
on_failure => $cv,
),
},
on_failure => $cv,
);
},
on_failure => $cv,
);
$cv->recv;

}

around 'new_topic' => sub {
my ($next, $self, @args) = @_;
my $topic = $self->$next(@args);
warn "topic: ".$topic->name;
$self->_rf_channel->bind_queue(
queue => $self->_rf_queue,
routing_key => $topic->name,
);
return $topic;
};

1;
4 changes: 4 additions & 0 deletions t/00_compile.t
@@ -0,0 +1,4 @@
use strict;
use Test::More tests => 1;

BEGIN { use_ok 'AnyMQ::AMQP' }
43 changes: 43 additions & 0 deletions t/basic.t
@@ -0,0 +1,43 @@
use Test::More;
use Time::HiRes 'time';
use strict;
use AnyEvent;
use AnyMQ;

my $bus = AnyMQ->new_with_traits(traits => ['AMQP'],
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
exchange => '',
);

is($bus->host, 'localhost');

#$cv->recv;
my $test = $bus->topic('test_q');

my $q = AnyEvent->condvar;

my $client = $bus->new_listener; $client->subscribe($test);
my $cnt = 0;
$client->poll( sub {
my $timestamp = shift;
diag "latency: ".(time() - $timestamp);
++$cnt;
$q->send([1]) if $cnt == 10;
});

# simulate messages coming to the exchange
$bus->_rf_channel->publish( routing_key => 'test_q',
body => time(),
) for 1..10;

my $w; $w = AnyEvent->timer( after => 5,
cb => sub {
$q->send([0, 'test timeout']);
} );
ok(@{ $q->recv });

done_testing;
5 changes: 5 additions & 0 deletions xt/perlcritic.t
@@ -0,0 +1,5 @@
use strict;
use Test::More;
eval q{ use Test::Perl::Critic };
plan skip_all => "Test::Perl::Critic is not installed." if $@;
all_critic_ok("lib");
4 changes: 4 additions & 0 deletions xt/pod.t
@@ -0,0 +1,4 @@
use Test::More;
eval "use Test::Pod 1.00";
plan skip_all => "Test::Pod 1.00 required for testing POD" if $@;
all_pod_files_ok();
9 changes: 9 additions & 0 deletions xt/podspell.t
@@ -0,0 +1,9 @@
use Test::More;
eval q{ use Test::Spelling };
plan skip_all => "Test::Spelling is not installed." if $@;
add_stopwords(<DATA>);
set_spell_cmd("aspell -l en list");
all_pod_files_spelling_ok('lib');
__DATA__
Tatsuhiko
Miyagawa
4 changes: 4 additions & 0 deletions xt/synopsis.t
@@ -0,0 +1,4 @@
use Test::More;
eval "use Test::Synopsis";
plan skip_all => "Test::Synopsis required" if $@;
all_synopsis_ok();

0 comments on commit c3fe068

Please sign in to comment.