Permalink
Browse files

Import AnyMQ::MQ, forked from Tatsumaki::MessageQueue.

  • Loading branch information...
0 parents commit 8be9fc8cfcb605311f2d220b58ea1d8ee3160c51 @clkao committed Jan 2, 2010
Showing with 437 additions and 0 deletions.
  1. +5 −0 .gitignore
  2. +2 −0 .shipit
  3. +4 −0 Changes
  4. +31 −0 MANIFEST
  5. +14 −0 MANIFEST.SKIP
  6. +10 −0 Makefile.PL
  7. +17 −0 README
  8. +37 −0 lib/AnyMQ.pm
  9. +215 −0 lib/AnyMQ/MQ.pm
  10. +4 −0 t/00_compile.t
  11. +19 −0 t/basic.t
  12. +23 −0 t/cycle.t
  13. +34 −0 t/events_before_long_poll.t
  14. +5 −0 xt/perlcritic.t
  15. +4 −0 xt/pod.t
  16. +9 −0 xt/podspell.t
  17. +4 −0 xt/synopsis.t
@@ -0,0 +1,5 @@
+META.yml
+Makefile
+inc/
+pm_to_blib
+*~
@@ -0,0 +1,2 @@
+steps = FindVersion, ChangeVersion, CheckChangeLog, DistTest, Commit, Tag, MakeDist, UploadCPAN
+git.push_to = origin
@@ -0,0 +1,4 @@
+Revision history for Perl extension AnyMQ
+
+0.01 Sat Jan 2 11:54:48 2010
+ - original version
@@ -0,0 +1,31 @@
+.gitignore
+Changes
+inc/Module/Install.pm
+inc/Module/Install/AuthorTests.pm
+inc/Module/Install/Base.pm
+inc/Module/Install/Can.pm
+inc/Module/Install/Fetch.pm
+inc/Module/Install/Include.pm
+inc/Module/Install/Makefile.pm
+inc/Module/Install/Metadata.pm
+inc/Module/Install/ReadmeFromPod.pm
+inc/Module/Install/Repository.pm
+inc/Module/Install/TestBase.pm
+inc/Module/Install/Win32.pm
+inc/Module/Install/WriteAll.pm
+inc/Spiffy.pm
+inc/Test/Base.pm
+inc/Test/Base/Filter.pm
+inc/Test/Builder.pm
+inc/Test/Builder/Module.pm
+inc/Test/More.pm
+lib/AnyMQ.pm
+Makefile.PL
+MANIFEST This list of files
+META.yml
+README
+t/00_compile.t
+xt/perlcritic.t
+xt/pod.t
+xt/podspell.t
+xt/synopsis.t
@@ -0,0 +1,14 @@
+\bRCS\b
+\bCVS\b
+\.svn/
+\.git/
+^MANIFEST\.
+^Makefile$
+~$
+\.old$
+^blib/
+^pm_to_blib
+^MakeMaker-\d
+\.gz$
+\.cvsignore
+\.shipit
@@ -0,0 +1,10 @@
+use inc::Module::Install;
+name 'AnyMQ';
+all_from 'lib/AnyMQ.pm';
+readme_from 'lib/AnyMQ.pm';
+build_requires 'Test::More';
+use_test_base;
+auto_include_deps;
+author_tests('xt');
+auto_set_repository;
+WriteAll;
17 README
@@ -0,0 +1,17 @@
+NAME
+ AnyMQ -
+
+SYNOPSIS
+ use AnyMQ;
+
+DESCRIPTION
+ AnyMQ is
+
+AUTHOR
+ Chia-liang Kao <clkao@clkao.org>
+
+LICENSE
+ This library is free software; you can redistribute it and/or modify it
+ under the same terms as Perl itself.
+
+SEE ALSO
@@ -0,0 +1,37 @@
+package AnyMQ;
+
+use strict;
+use 5.008_001;
+our $VERSION = '0.01';
+
+1;
+__END__
+
+=encoding utf-8
+
+=for stopwords
+
+=head1 NAME
+
+AnyMQ -
+
+=head1 SYNOPSIS
+
+ use AnyMQ;
+
+=head1 DESCRIPTION
+
+AnyMQ is
+
+=head1 AUTHOR
+
+Chia-liang Kao E<lt>clkao@clkao.orgE<gt>
+
+=head1 LICENSE
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself.
+
+=head1 SEE ALSO
+
+=cut
@@ -0,0 +1,215 @@
+package AnyMQ::MQ;;
+use strict;
+
+use AnyEvent;
+use Any::Moose;
+use Try::Tiny;
+use Scalar::Util;
+use Time::HiRes;
+use constant DEBUG => 0;
+
+has channel => (is => 'rw', isa => 'Str');
+has backlog => (is => 'rw', isa => 'ArrayRef', default => sub { [] });
+has clients => (is => 'rw', isa => 'HashRef', default => sub { +{} });
+
+our $BacklogLength = 30; # TODO configurable
+
+my %instances;
+
+sub channels {
+ values %instances;
+}
+
+sub instance {
+ my($class, $name) = @_;
+ $instances{$name} ||= $class->new(channel => $name);
+}
+
+sub backlog_events {
+ my $self = shift;
+ reverse grep defined, @{$self->backlog};
+}
+
+sub append_backlog {
+ my($self, @events) = @_;
+ my @new_backlog = (reverse(@events), @{$self->backlog});
+ $self->backlog([ splice @new_backlog, 0, $BacklogLength ]);
+}
+
+sub publish {
+ my($self, @events) = @_;
+
+ for my $client_id (keys %{$self->clients}) {
+ my $client = $self->clients->{$client_id};
+ if ($client->{cv}->cb) {
+ # currently listening: flush and send the events right away
+ $self->flush_events($client_id, @events);
+ } else {
+ # between long poll comet: buffer the events
+ # TODO: limit buffer length
+ warn "Buffering new events for $client_id" if DEBUG;
+ push @{$client->{buffer}}, @events;
+ }
+ }
+ $self->append_backlog(@events);
+}
+
+sub flush_events {
+ my($self, $client_id, @events) = @_;
+
+ my $client = $self->clients->{$client_id} or return;
+ try {
+ my $cb = $client->{cv}->cb;
+ $client->{cv}->send(@events);
+ $client->{cv} = AE::cv;
+ $client->{buffer} = [];
+
+ if ($client->{persistent}) {
+ $client->{cv}->cb($cb);
+ } else {
+ $client->{timer} = AE::timer 30, 0, sub {
+ Scalar::Util::weaken $self;
+ warn "Sweep $client_id (no long-poll reconnect)" if DEBUG;
+ undef $client;
+ delete $self->clients->{$client_id};
+ };
+ Scalar::Util::weaken $client->{timer};
+ }
+ } catch {
+ warn $_;
+ };
+}
+
+sub poll_once {
+ my($self, $client_id, $cb, $timeout) = @_;
+
+ my $is_new;
+ my $client = $self->clients->{$client_id} ||= do {
+ $is_new = 1;
+ + { cv => AE::cv, persistent => 0, buffer => [] };
+ };
+
+ $client->{cv}->cb(sub { $cb->($_[0]->recv) });
+
+ # reset garbage collection timeout with the long-poll timeout
+ # $timeout = 0 is a valid timeout for interval-polling
+ $timeout = 55 unless defined $timeout;
+ $client->{timer} = AE::timer $timeout || 55, 0, sub {
+ Scalar::Util::weaken $self;
+ warn "Timing out $client_id long-poll" if DEBUG;
+ $self->flush_events($client_id);
+ };
+ Scalar::Util::weaken $client->{timer};
+
+ if ($is_new) {
+ # flush backlog for a new client
+ my @events = $self->backlog_events;
+ $self->flush_events($client_id, @events) if @events;
+ }elsif ( @{ $client->{buffer} } ) {
+ # flush buffer for a long-poll client
+ $self->flush_events($client_id, @{ $client->{buffer} });
+ }
+}
+
+sub poll {
+ my($self, $client_id, $cb) = @_;
+
+ # TODO register client info like names and remote host in $client
+ my $cv = AE::cv;
+ $cv->cb(sub { $cb->($_[0]->recv) });
+ my $s = $self->clients->{$client_id} = {
+ cv => $cv, persistent => 1, buffer => [],
+ };
+
+ my @events = $self->backlog_events;
+ $self->flush_events($client_id, @events) if @events;
+}
+
+1;
+
+__END__
+
+=encoding utf-8
+
+=for stopwords
+
+=head1 NAME
+
+AnyMQ::MQ - AnyMQ Message Queue
+
+=head1 SYNOPSIS
+
+To publish a message, you first create an instance of the message queue on
+a specific channel:
+
+ my $mq = AnyMQ::MQ->instance($channel);
+ $mq->publish({
+ type => "message", data => $your_data,
+ address => $self->request->address,
+ time => scalar Time::HiRes::gettimeofday,
+ });
+
+Later, in a handler, you can poll for new messages:
+
+ my $mq = AnyMQ::MQ->instance($channel);
+ my $client_id = $self->request->param('client_id')
+ or die;
+ $mq->poll_once($client_id, sub { $self->write(\@_); $self->finish; });
+
+Additionally, if you are using Multipart XmlHttpRequest (MXHR) you can use
+the event API, and run a callback each time a new message is published:
+
+ my $mq = AnyMQ::MQ->instance($channel);
+ $mq->poll($client_id, sub {
+ my @events = @_;
+ for my $event (@events) {
+ $self->stream_write($event);
+ }
+ });
+
+=head1 DESCRIPTION
+
+AnyMQ::MQ is a simple message queue, storing all messages in
+memory, and keeping track of a configurable backlog. All polling requests
+are made with a C<$client_id>, and the message queue keeps track of a buffer
+per client, to ensure proper message delivery.
+
+=head1 CONFIGURATION
+
+=over
+
+=item BacklogLength
+
+To configure the number of messages in the backlog, set
+C<$AnyMQ::MQ::BacklogLength>. By default, this is set to 30.
+
+=back
+
+=head1 METHODS
+
+=head2 publish
+
+This method publishes a message into the message queue, for immediate
+consumption by all polling clients.
+
+=head2 poll($client_id, $code_ref)
+
+This is the event-driven poll mechanism, which accepts a callback as the
+second parameter. It will stream messages to the code ref passed in.
+
+=head2 poll_once($client_id, $code_ref)
+
+This method returns all messages since the last poll to the code reference
+passed as the second parameter.
+
+=head1 AUTHOR
+
+Tatsuhiko Miyagawa
+
+Chia-liang Kao
+
+=head1 SEE ALSO
+
+L<Tatsumaki>
+
+=cut
@@ -0,0 +1,4 @@
+use strict;
+use Test::More tests => 1;
+
+BEGIN { use_ok 'AnyMQ' }
@@ -0,0 +1,19 @@
+use Test::More;
+use AnyMQ::MQ;
+
+my $channel = 'test1';
+
+my $clients = 5;
+my $inc = 0;
+
+for my $client ( 1 .. $clients ) {
+ my $sub = AnyMQ::MQ->instance( $channel );
+ $sub->poll_once($client, sub { $inc++ });
+}
+
+my $pub = AnyMQ::MQ->instance( $channel );
+$pub->publish({ data => 'hello' });
+
+is( $inc, $clients, 'messagequeue publish' );
+
+done_testing;
@@ -0,0 +1,23 @@
+use Test::More;
+use Test::Requires qw(Test::Memory::Cycle);
+use AnyMQ::MQ;
+
+my $channel = 'test1';
+
+my $client_id = rand(1);
+
+my $sub = AnyMQ::MQ->instance( $channel );
+$sub->poll_once($client_id, sub { ok(1, 'got message') });
+
+memory_cycle_ok( $sub, 'no leaks' );
+
+my $pub = AnyMQ::MQ->instance( $channel );
+$pub->publish({ data => 'hello' });
+
+memory_cycle_ok( $sub, 'no leaks in subscriber' );
+memory_cycle_ok( $pub, 'no leaks in publisher' );
+
+# We''re actually relying on the poll_once test, hacky but not sure how to
+# verify
+
+done_testing;
Oops, something went wrong.

0 comments on commit 8be9fc8

Please sign in to comment.