Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions lib/Mojo/Redis/Processor.pm
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Version 0.02

=cut

our $VERSION = '0.04';
our $VERSION = '0.06';

=head1 DESCRIPTION

Expand Down Expand Up @@ -257,7 +257,6 @@ sub on_processed {
$self->_read->on(
message => sub {
my ($redis, $msg, $channel) = @_;
$self->_write->expire($self->_unique, $self->{expire});
$code->($msg, $channel);
});
$self->_read->subscribe([$self->_processed_channel]);
Expand Down Expand Up @@ -303,7 +302,9 @@ sub _expired {

=head2 C<< on_trigger() >>

Daemon will call this to register a processor code reference that will be called everytime trigger happens. The return value will be passed to Mojo apps which requested it using Redis Pub/Sub system.
Daemon will call this to register a processor code reference that will be called everytime trigger happens.
The return value will be passed to Mojo apps which requested it using Redis Pub/Sub system.
on_trigger will exit the loop when there is no more subscriber to the channel.

=cut

Expand All @@ -314,8 +315,12 @@ sub on_trigger {
$self->_daemon_redis->subscription_loop(
default_callback => sub {
my $c = shift;
$self->_publish($pricer->($self->{data}));
$c->unsubscribe() if $self->_expired;
my $count = $self->_publish($pricer->($self->{data}));
$self->_write->expire($self->_unique, $self->{expire});
if ($count == 0) {
$c->unsubscribe();
$self->_write->del($self->_unique);
}
},
subscribe => [$self->{trigger}]);
}
Expand Down