Permalink
Browse files

Did some unholy things to BigMemory to get support for the deliver-af…

…ter header.
  • Loading branch information...
1 parent 298bc6e commit fc4dd8be02af65dc97b7f8b898cbbf4618606969 @dsnopek committed Oct 6, 2008
Showing with 125 additions and 11 deletions.
  1. +38 −0 examples/mq_big_memory.pl
  2. +87 −11 lib/POE/Component/MessageQueue/Storage/BigMemory.pm
View
@@ -0,0 +1,38 @@
+
+use POE;
+use POE::Component::MessageQueue;
+use POE::Component::MessageQueue::Storage::BigMemory;
+use POE::Component::MessageQueue::Logger;
+use Getopt::Long;
+use Carp;
+use strict;
+
+$SIG{__DIE__} = sub {
+ Carp::confess(@_);
+};
+
+#use POE::Component::DebugShell;
+#POE::Component::DebugShell->spawn();
+
+# Force some logger output without using the real logger.
+$POE::Component::MessageQueue::Logger::LEVEL = 0;
+
+my $port = 61613;
+my $hostname = undef;
+
+GetOptions(
+ "port|p=i" => \$port,
+ "hostname|h=s" => \$hostname
+);
+
+POE::Component::MessageQueue->new({
+ port => $port,
+ hostname => $hostname,
+
+ storage => POE::Component::MessageQueue::Storage::BigMemory->new(),
+ pump_frequency => 1
+});
+
+POE::Kernel->run();
+exit;
+
@@ -16,7 +16,6 @@
#
package POE::Component::MessageQueue::Storage::BigMemory::MessageElement;
-use Heap::Elem;
use base qw(Heap::Elem);
sub new
@@ -36,6 +35,26 @@ sub cmp
1;
+package POE::Component::MessageQueue::Storage::BigMemory::DelayedMessageElement;
+use base qw(Heap::Elem);
+
+sub new
+{
+ my ($class, $message) = @_;
+ my $self = $class->SUPER::new;
+
+ $self->val($message);
+ bless($self, $class);
+}
+
+sub cmp
+{
+ my ($self, $other) = @_;
+ return $self->val->deliver_at <=> $other->val->deliver_at;
+}
+
+1;
+
package POE::Component::MessageQueue::Storage::BigMemory;
use Moose;
with qw(POE::Component::MessageQueue::Storage);
@@ -48,6 +67,8 @@ use constant empty_hashref => (is => 'ro', default => sub { {} });
has 'claimed' => empty_hashref;
# queue_name => heap of messages
has 'unclaimed' => empty_hashref;
+# queue_name => heap of messages
+has 'delayed' => empty_hashref;
# message_id => info hash
has 'messages' => empty_hashref;
@@ -59,6 +80,7 @@ has 'message_heap' => (
# Where messages are stored:
# -- A heap of all unclaimed messages sorted by timestamp
# -- Per destination heaps for unclaimed messages
+# -- Per destination heaps for delayed messages
# -- A hash of claimant => messages.
#
# There is also a hash of ids to info about heap elements and such.
@@ -68,11 +90,15 @@ sub _make_heap_elem
POE::Component::MessageQueue::Storage::BigMemory::MessageElement->new(@_);
}
+sub _make_delayed_heap_elem
+{
+ POE::Component::MessageQueue::Storage::BigMemory::DelayedMessageElement->new(@_);
+}
+
sub store
{
my ($self, $msg, $callback) = @_;
- my $elem = _make_heap_elem($msg);
my $main = _make_heap_elem($msg);
$self->message_heap->add($main);
@@ -81,16 +107,28 @@ sub store
main => $main,
};
- if ($msg->claimed)
+ if ($msg->has_delay && $msg->deliver_at > time())
{
- $self->claimed->{$msg->claimant}->{$msg->destination} = $elem;
+ my $elem = _make_delayed_heap_elem($msg);
+ my $heap =
+ ($self->delayed->{$msg->destination} ||= Heap::Fibonacci->new);
+ $heap->add($elem);
+ $info->{delayed} = $elem;
}
else
{
- my $heap =
- ($self->unclaimed->{$msg->destination} ||= Heap::Fibonacci->new);
- $heap->add($elem);
- $info->{unclaimed} = $elem;
+ my $elem = _make_heap_elem($msg);
+ if ($msg->claimed)
+ {
+ $self->claimed->{$msg->claimant}->{$msg->destination} = $elem;
+ }
+ else
+ {
+ my $heap =
+ ($self->unclaimed->{$msg->destination} ||= Heap::Fibonacci->new);
+ $heap->add($elem);
+ $info->{unclaimed} = $elem;
+ }
}
my $id = $msg->id;
@@ -124,6 +162,34 @@ sub get_oldest
sub claim_and_retrieve
{
my ($self, $destination, $client_id, $callback) = @_;
+
+ # move delayed messages to normal storage
+ if (my $delayed = $self->delayed->{$destination})
+ {
+ my $time = time();
+
+ while (my $elem = $delayed->top)
+ {
+ my $msg = $elem->val;
+ last unless ($msg->deliver_at <= $time);
+
+ my $info = $self->messages->{$msg->id};
+
+ # remove from the delayed heap
+ $delayed->delete($elem);
+ delete $info->{delayed};
+
+ # make a normal heap element
+ $elem = _make_heap_elem($msg);
+
+ # add to the unclaimed heap
+ my $unclaimed =
+ ($self->unclaimed->{$msg->destination} ||= Heap::Fibonacci->new);
+ $unclaimed->add($elem);
+ $info->{unclaimed} = $elem;
+ }
+ }
+
my $message;
my $heap = $self->unclaimed->{$destination};
if ($heap)
@@ -151,7 +217,11 @@ sub remove
{
delete $self->claimed->{$msg->claimant}->{$msg->destination};
}
- else
+ elsif ($info->{delayed})
+ {
+ $self->delayed->{$msg->destination}->delete($info->{delayed});
+ }
+ elsif ($info->{unclaimed})
{
$self->unclaimed->{$msg->destination}->delete($info->{unclaimed});
}
@@ -164,7 +234,7 @@ sub empty
{
my ($self, $callback) = @_;
- %{$self->$_} = () foreach qw(messages claimed unclaimed);
+ %{$self->$_} = () foreach qw(messages claimed unclaimed delayed);
$self->message_heap(Heap::Fibonacci->new);
goto $callback if $callback;
}
@@ -185,7 +255,13 @@ sub claim
$self->claimed->{$client_id}->{$destination} =
delete $self->claimed->{$message->claimant}->{$destination}
}
- else
+ elsif ($info->{delayed})
+ {
+ my $elem = $self->claimed->{$client_id}->{$destination} =
+ delete $self->messages->{$message->id}->{delayed};
+ $self->delayed->{$destination}->delete($elem);
+ }
+ elsif ($info->{unclaimed})
{
my $elem = $self->claimed->{$client_id}->{$destination} =
delete $self->messages->{$message->id}->{unclaimed};

0 comments on commit fc4dd8b

Please sign in to comment.