diff --git a/lib/Mojo/Redis/Processor.pm b/lib/Mojo/Redis/Processor.pm index f109627..a78c1e7 100644 --- a/lib/Mojo/Redis/Processor.pm +++ b/lib/Mojo/Redis/Processor.pm @@ -20,7 +20,7 @@ Version 0.02 =cut -our $VERSION = '0.04'; +our $VERSION = '0.06'; =head1 DESCRIPTION @@ -257,7 +257,6 @@ sub on_processed { $self->_read->on( message => sub { my ($redis, $msg, $channel) = @_; - $self->_write->expire($self->_unique, $self->{expire}); $code->($msg, $channel); }); $self->_read->subscribe([$self->_processed_channel]); @@ -303,7 +302,9 @@ sub _expired { =head2 C<< on_trigger() >> -Daemon will call this to register a processor code reference that will be called everytime trigger happens. The return value will be passed to Mojo apps which requested it using Redis Pub/Sub system. +Daemon will call this to register a processor code reference that will be called everytime trigger happens. +The return value will be passed to Mojo apps which requested it using Redis Pub/Sub system. +on_trigger will exit the loop when there is no more subscriber to the channel. =cut @@ -314,8 +315,12 @@ sub on_trigger { $self->_daemon_redis->subscription_loop( default_callback => sub { my $c = shift; - $self->_publish($pricer->($self->{data})); - $c->unsubscribe() if $self->_expired; + my $count = $self->_publish($pricer->($self->{data})); + $self->_write->expire($self->_unique, $self->{expire}); + if ($count == 0) { + $c->unsubscribe(); + $self->_write->del($self->_unique); + } }, subscribe => [$self->{trigger}]); }