From 97863591fd5461500370f2a0679a79acb433ea05 Mon Sep 17 00:00:00 2001 From: Kaveh Mousavi Zamani Date: Tue, 31 May 2016 21:20:12 +0800 Subject: [PATCH 1/3] Assuming the pub/sub happens at the same server makes managing process lifetime easier --- lib/Mojo/Redis/Processor.pm | 15 ++++++++++----- t/01_init.t | 2 -- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/Mojo/Redis/Processor.pm b/lib/Mojo/Redis/Processor.pm index f109627..5c0699a 100644 --- a/lib/Mojo/Redis/Processor.pm +++ b/lib/Mojo/Redis/Processor.pm @@ -20,7 +20,7 @@ Version 0.02 =cut -our $VERSION = '0.04'; +our $VERSION = '0.05'; =head1 DESCRIPTION @@ -257,7 +257,6 @@ sub on_processed { $self->_read->on( message => sub { my ($redis, $msg, $channel) = @_; - $self->_write->expire($self->_unique, $self->{expire}); $code->($msg, $channel); }); $self->_read->subscribe([$self->_processed_channel]); @@ -303,7 +302,9 @@ sub _expired { =head2 C<< on_trigger() >> -Daemon will call this to register a processor code reference that will be called everytime trigger happens. The return value will be passed to Mojo apps which requested it using Redis Pub/Sub system. +Daemon will call this to register a processor code reference that will be called everytime trigger happens. +The return value will be passed to Mojo apps which requested it using Redis Pub/Sub system. +on_trigger will exit the loop when there is no more subscriber to the channel. =cut @@ -314,8 +315,12 @@ sub on_trigger { $self->_daemon_redis->subscription_loop( default_callback => sub { my $c = shift; - $self->_publish($pricer->($self->{data})); - $c->unsubscribe() if $self->_expired; + my $count = $self->_publish($pricer->($self->{data})); + $self->_write->expire($self->_unique, $self->{expire}); + if ($count == 0) { + $c->unsubscribe(); + $self->del($self->_unique); + } }, subscribe => [$self->{trigger}]); } diff --git a/t/01_init.t b/t/01_init.t index 06e5c5f..bd8911c 100644 --- a/t/01_init.t +++ b/t/01_init.t @@ -52,8 +52,6 @@ RedisDB->new->flushall; is($rp->{trigger}, 'R_25', 'got the trigger correct'); is($rp->_expired, undef, 'at first a new job should not be expired'); - RedisDB->new->expire('Redis::Processor::34b18bba480282531e815255f2012110', 0); - is($rp->_expired, 1, 'no activity for sometime should set the expiry'); } done_testing(); From 1cd4a4e98238b3c4f570ee67a5391bf8d65f36b3 Mon Sep 17 00:00:00 2001 From: Kaveh Mousavi Zamani Date: Wed, 1 Jun 2016 10:54:38 +0800 Subject: [PATCH 2/3] correcting the del call --- lib/Mojo/Redis/Processor.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Mojo/Redis/Processor.pm b/lib/Mojo/Redis/Processor.pm index 5c0699a..a78c1e7 100644 --- a/lib/Mojo/Redis/Processor.pm +++ b/lib/Mojo/Redis/Processor.pm @@ -20,7 +20,7 @@ Version 0.02 =cut -our $VERSION = '0.05'; +our $VERSION = '0.06'; =head1 DESCRIPTION @@ -319,7 +319,7 @@ sub on_trigger { $self->_write->expire($self->_unique, $self->{expire}); if ($count == 0) { $c->unsubscribe(); - $self->del($self->_unique); + $self->_write->del($self->_unique); } }, subscribe => [$self->{trigger}]); From 6e2c09d1568d02bab661564d424b0d1dc291b561 Mon Sep 17 00:00:00 2001 From: Kaveh Mousavi Zamani Date: Wed, 1 Jun 2016 10:56:51 +0800 Subject: [PATCH 3/3] removed by mistake --- t/01_init.t | 2 ++ 1 file changed, 2 insertions(+) diff --git a/t/01_init.t b/t/01_init.t index bd8911c..06e5c5f 100644 --- a/t/01_init.t +++ b/t/01_init.t @@ -52,6 +52,8 @@ RedisDB->new->flushall; is($rp->{trigger}, 'R_25', 'got the trigger correct'); is($rp->_expired, undef, 'at first a new job should not be expired'); + RedisDB->new->expire('Redis::Processor::34b18bba480282531e815255f2012110', 0); + is($rp->_expired, 1, 'no activity for sometime should set the expiry'); } done_testing();