diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..aa7964e --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +META.yml +Makefile +inc/ +pm_to_blib +*~ diff --git a/.shipit b/.shipit new file mode 100644 index 0000000..d2778c7 --- /dev/null +++ b/.shipit @@ -0,0 +1,2 @@ +steps = FindVersion, ChangeVersion, CheckChangeLog, DistTest, Commit, Tag, MakeDist, UploadCPAN +git.push_to = origin diff --git a/Changes b/Changes new file mode 100644 index 0000000..91edf8d --- /dev/null +++ b/Changes @@ -0,0 +1,4 @@ +Revision history for Perl extension AnyMQ::AMQP + +0.01 Mon Jan 18 21:12:35 2010 + - original version diff --git a/MANIFEST b/MANIFEST new file mode 100644 index 0000000..5d28338 --- /dev/null +++ b/MANIFEST @@ -0,0 +1,30 @@ +.gitignore +Changes +inc/Module/Install.pm +inc/Module/Install/AuthorTests.pm +inc/Module/Install/Base.pm +inc/Module/Install/Can.pm +inc/Module/Install/Fetch.pm +inc/Module/Install/Include.pm +inc/Module/Install/Makefile.pm +inc/Module/Install/Metadata.pm +inc/Module/Install/ReadmeFromPod.pm +inc/Module/Install/TestBase.pm +inc/Module/Install/Win32.pm +inc/Module/Install/WriteAll.pm +inc/Spiffy.pm +inc/Test/Base.pm +inc/Test/Base/Filter.pm +inc/Test/Builder.pm +inc/Test/Builder/Module.pm +inc/Test/More.pm +lib/AnyMQ/AMQP.pm +Makefile.PL +MANIFEST This list of files +META.yml +README +t/00_compile.t +xt/perlcritic.t +xt/pod.t +xt/podspell.t +xt/synopsis.t diff --git a/MANIFEST.SKIP b/MANIFEST.SKIP new file mode 100644 index 0000000..54c53b5 --- /dev/null +++ b/MANIFEST.SKIP @@ -0,0 +1,14 @@ +\bRCS\b +\bCVS\b +\.svn/ +\.git/ +^MANIFEST\. +^Makefile$ +~$ +\.old$ +^blib/ +^pm_to_blib +^MakeMaker-\d +\.gz$ +\.cvsignore +\.shipit diff --git a/Makefile.PL b/Makefile.PL new file mode 100644 index 0000000..14c654f --- /dev/null +++ b/Makefile.PL @@ -0,0 +1,10 @@ +use inc::Module::Install; +name 'AnyMQ-AMQP'; +all_from 'lib/AnyMQ/AMQP.pm'; +readme_from 'lib/AnyMQ/AMQP.pm'; +build_requires 'Test::More'; +use_test_base; +auto_include_deps; +author_tests('xt'); +auto_set_repository; +WriteAll; diff --git a/README b/README new file mode 100644 index 0000000..7165f8d --- /dev/null +++ b/README @@ -0,0 +1,17 @@ +NAME + AnyMQ::AMQP - + +SYNOPSIS + use AnyMQ::AMQP; + +DESCRIPTION + AnyMQ::AMQP is + +AUTHOR + Chia-liang Kao + +LICENSE + This library is free software; you can redistribute it and/or modify it + under the same terms as Perl itself. + +SEE ALSO diff --git a/lib/AnyMQ/AMQP.pm b/lib/AnyMQ/AMQP.pm new file mode 100644 index 0000000..ff2d4c2 --- /dev/null +++ b/lib/AnyMQ/AMQP.pm @@ -0,0 +1,37 @@ +package AnyMQ::AMQP; + +use strict; +use 5.008_001; +our $VERSION = '0.01'; + +1; +__END__ + +=encoding utf-8 + +=for stopwords + +=head1 NAME + +AnyMQ::AMQP - + +=head1 SYNOPSIS + + use AnyMQ::AMQP; + +=head1 DESCRIPTION + +AnyMQ::AMQP is + +=head1 AUTHOR + +Chia-liang Kao Eclkao@clkao.orgE + +=head1 LICENSE + +This library is free software; you can redistribute it and/or modify +it under the same terms as Perl itself. + +=head1 SEE ALSO + +=cut diff --git a/lib/AnyMQ/Trait/AMQP.pm b/lib/AnyMQ/Trait/AMQP.pm new file mode 100644 index 0000000..301278a --- /dev/null +++ b/lib/AnyMQ/Trait/AMQP.pm @@ -0,0 +1,81 @@ +package AnyMQ::Trait::AMQP; +use Moose::Role; + +use AnyEvent; +use AnyEvent::RabbitMQ; + +has host => (is => "ro", isa => "Str"); +has port => (is => "ro", isa => "Int"); +has user => (is => "ro", isa => "Str"); +has pass => (is => "ro", isa => "Str"); +has vhost => (is => "ro", isa => "Str"); +has exchange => (is => "ro", isa => "Str"); + +has bind_mode => (is => "ro", isa => "Str", default => sub { 'exchange' }); + +has _rf => (is => "rw"); +has _rf_channel => (is => "rw"); +has _rf_queue => (is => "rw"); + +sub BUILD { + my $self = shift; + + my $rf = AnyEvent::RabbitMQ->new({timeout => 1, verbose => 0,}); + $rf->load_xml_spec('fixed_amqp0-8.xml'); + + my $cv = AE::cv; + $rf->connect((map { $_ => $self->$_ } + qw(host port user pass vhost)), + on_success => sub { + $rf->open_channel( + on_success => sub { + my $channel = shift; + $self->_rf_channel($channel); + $channel->qos(); + $channel->declare_queue( + exclusive => 1, + on_success => sub { + my $method = shift; + my $queue = $method->method_frame->queue; + $self->_rf_queue($queue); + $channel->consume(queue => $queue, + no_ack => 1, + on_success => sub { + $cv->send('init'); + }, + on_consume => sub { + my $frame = shift; + my $payload = $frame->{body}->payload; + my $reply_to = $frame->{header}->reply_to; + next if $reply_to && $reply_to eq $self->_queue; + my $topic = $frame->{deliver}->method_frame->routing_key; + $self->topics->{$topic}->publish($payload); + + }, + on_failure => $cv, + ); + }, + on_failure => $cv, + ), + }, + on_failure => $cv, + ); + }, + on_failure => $cv, + ); + $cv->recv; + +} + +around 'new_topic' => sub { + my ($next, $self, @args) = @_; + my $topic = $self->$next(@args); + warn "topic: ".$topic->name; + $self->_rf_channel->bind_queue( + queue => $self->_rf_queue, + routing_key => $topic->name, + ); + return $topic; +}; + +1; diff --git a/t/00_compile.t b/t/00_compile.t new file mode 100644 index 0000000..5abb539 --- /dev/null +++ b/t/00_compile.t @@ -0,0 +1,4 @@ +use strict; +use Test::More tests => 1; + +BEGIN { use_ok 'AnyMQ::AMQP' } diff --git a/t/basic.t b/t/basic.t new file mode 100644 index 0000000..e630780 --- /dev/null +++ b/t/basic.t @@ -0,0 +1,43 @@ +use Test::More; +use Time::HiRes 'time'; +use strict; +use AnyEvent; +use AnyMQ; + +my $bus = AnyMQ->new_with_traits(traits => ['AMQP'], + host => 'localhost', + port => 5672, + user => 'guest', + pass => 'guest', + vhost => '/', + exchange => '', + ); + +is($bus->host, 'localhost'); + +#$cv->recv; +my $test = $bus->topic('test_q'); + +my $q = AnyEvent->condvar; + +my $client = $bus->new_listener; $client->subscribe($test); +my $cnt = 0; +$client->poll( sub { + my $timestamp = shift; + diag "latency: ".(time() - $timestamp); + ++$cnt; + $q->send([1]) if $cnt == 10; + }); + +# simulate messages coming to the exchange +$bus->_rf_channel->publish( routing_key => 'test_q', + body => time(), + ) for 1..10; + +my $w; $w = AnyEvent->timer( after => 5, + cb => sub { + $q->send([0, 'test timeout']); + } ); +ok(@{ $q->recv }); + +done_testing; diff --git a/xt/perlcritic.t b/xt/perlcritic.t new file mode 100644 index 0000000..950f64f --- /dev/null +++ b/xt/perlcritic.t @@ -0,0 +1,5 @@ +use strict; +use Test::More; +eval q{ use Test::Perl::Critic }; +plan skip_all => "Test::Perl::Critic is not installed." if $@; +all_critic_ok("lib"); diff --git a/xt/pod.t b/xt/pod.t new file mode 100644 index 0000000..437887a --- /dev/null +++ b/xt/pod.t @@ -0,0 +1,4 @@ +use Test::More; +eval "use Test::Pod 1.00"; +plan skip_all => "Test::Pod 1.00 required for testing POD" if $@; +all_pod_files_ok(); diff --git a/xt/podspell.t b/xt/podspell.t new file mode 100644 index 0000000..c4f1f21 --- /dev/null +++ b/xt/podspell.t @@ -0,0 +1,9 @@ +use Test::More; +eval q{ use Test::Spelling }; +plan skip_all => "Test::Spelling is not installed." if $@; +add_stopwords(); +set_spell_cmd("aspell -l en list"); +all_pod_files_spelling_ok('lib'); +__DATA__ +Tatsuhiko +Miyagawa diff --git a/xt/synopsis.t b/xt/synopsis.t new file mode 100644 index 0000000..07aa750 --- /dev/null +++ b/xt/synopsis.t @@ -0,0 +1,4 @@ +use Test::More; +eval "use Test::Synopsis"; +plan skip_all => "Test::Synopsis required" if $@; +all_synopsis_ok();