Permalink
Browse files

Merge pull request #2 from revmischa/master

  • Loading branch information...
2 parents 054d4e5 + cc1c94e commit 52526e7e1272c07ed715159cfca946d520646d62 @clkao committed Feb 2, 2012
Showing with 45 additions and 0 deletions.
  1. +11 −0 lib/AnyMQ/Topic.pm
  2. +34 −0 t/publish_queues.t
View
@@ -25,6 +25,7 @@ has queues => (traits => ['Hash'],
);
has recycle => (is => "rw", isa => "Bool", default => sub { 0 });
has 'reaper_interval' => (is => 'ro', isa => 'Int', default => sub { 30 });
+has 'publish_to_queues' => (is => 'rw', isa => 'Bool', default => sub { 1 });
has '_listener_reaper' => (is => 'rw');
has '+_trait_namespace' => (default => 'AnyMQ::Topic::Trait');
@@ -54,6 +55,16 @@ sub reap_destroyed_listeners {
}
sub publish {
+ my ($self, @messages) = @_;
+ $self->append_to_queues(@messages) if $self->publish_to_queues;
+ $self->dispatch_messages(@messages);
+}
+
+sub dispatch_messages {
+ my ($self, @messages) = @_;
+}
+
+sub append_to_queues {
my ($self, @messages) = @_;
$self->reap_destroyed_listeners;
for (values %{$self->queues}) {
View
@@ -0,0 +1,34 @@
+use strict;
+use Test::More;
+use AnyMQ;
+use AnyMQ::Topic;
+
+my $bus = AnyMQ->new;
+
+my $t1 = AnyMQ::Topic->new(
+ bus => $bus,
+ publish_to_queues => 1,
+);
+test_topic($t1);
+
+my $t2 = AnyMQ::Topic->new(
+ bus => $bus,
+ publish_to_queues => 0,
+);
+test_topic($t2);
+
+sub test_topic {
+ my ($channel) = @_;
+
+ my $client = AnyMQ->new_listener($channel);
+
+ my $events = 0;
+ $client->poll(sub { $events++; });
+
+ $channel->publish({ data => 1});
+
+ my $expected = $channel->publish_to_queues ? 1 : 0;
+ is($events, $expected, "Got expected events published to queues");
+}
+
+done_testing;

0 comments on commit 52526e7

Please sign in to comment.