Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Add option to topic to not notify local queues when publishing events #2

Merged
merged 3 commits into from

2 participants

@revmischa

ZeroMQ doesn't want to automatically append events to local listeners when calling ->publish(). This additional optional allows the code to stay clean without changing any existing interfaces.

Much better: revmischa/anymq-zeromq@abe29d0

Also as a result, my pubsub hippie middleware can fully use AnyMQ instead of relying on AnyMQ::ZeroMQ-specific behavior

@clkao
Owner

Hi,

Can you add a test case for this patch?

@revmischa

Test case is in commit 13f5493 in AnyMQ::ZeroMQ. Test fails if publish_to_queues is true for ZeroMQ

@revmischa

Test case attached

@revmischa

Is this chill? Are you waiting on anything from me?

@clkao clkao merged commit cc1c94e into clkao:master
@clkao
Owner

merged, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Nov 1, 2011
  1. @revmischa

    split up publish into dispatch_messages/append_to_queues so topic tra…

    revmischa authored
    …its can override the appropriate behavior
  2. @revmischa
Commits on Nov 2, 2011
  1. @revmischa
This page is out of date. Refresh to see the latest.
Showing with 46 additions and 1 deletion.
  1. +1 −1  lib/AnyMQ.pm
  2. +11 −0 lib/AnyMQ/Topic.pm
  3. +34 −0 t/publish_queues.t
View
2  lib/AnyMQ.pm
@@ -1,7 +1,7 @@
package AnyMQ;
use strict;
use 5.008_001;
-our $VERSION = '0.33';
+our $VERSION = '0.34';
use AnyEvent;
use Any::Moose;
View
11 lib/AnyMQ/Topic.pm
@@ -25,6 +25,7 @@ has queues => (traits => ['Hash'],
);
has recycle => (is => "rw", isa => "Bool", default => sub { 0 });
has 'reaper_interval' => (is => 'ro', isa => 'Int', default => sub { 30 });
+has 'publish_to_queues' => (is => 'rw', isa => 'Bool', default => sub { 1 });
has '_listener_reaper' => (is => 'rw');
has '+_trait_namespace' => (default => 'AnyMQ::Topic::Trait');
@@ -55,6 +56,16 @@ sub reap_destroyed_listeners {
sub publish {
my ($self, @messages) = @_;
+ $self->append_to_queues(@messages) if $self->publish_to_queues;
+ $self->dispatch_messages(@messages);
+}
+
+sub dispatch_messages {
+ my ($self, @messages) = @_;
+}
+
+sub append_to_queues {
+ my ($self, @messages) = @_;
$self->reap_destroyed_listeners;
for (values %{$self->queues}) {
$_->append(@messages);
View
34 t/publish_queues.t
@@ -0,0 +1,34 @@
+use strict;
+use Test::More;
+use AnyMQ;
+use AnyMQ::Topic;
+
+my $bus = AnyMQ->new;
+
+my $t1 = AnyMQ::Topic->new(
+ bus => $bus,
+ publish_to_queues => 1,
+);
+test_topic($t1);
+
+my $t2 = AnyMQ::Topic->new(
+ bus => $bus,
+ publish_to_queues => 0,
+);
+test_topic($t2);
+
+sub test_topic {
+ my ($channel) = @_;
+
+ my $client = AnyMQ->new_listener($channel);
+
+ my $events = 0;
+ $client->poll(sub { $events++; });
+
+ $channel->publish({ data => 1});
+
+ my $expected = $channel->publish_to_queues ? 1 : 0;
+ is($events, $expected, "Got expected events published to queues");
+}
+
+done_testing;
Something went wrong with that request. Please try again.