Permalink
Browse files

First commit.

  • Loading branch information...
0 parents commit 6fdb7a779970c7be305131e2a6cce9f007131b2d Cory G Watson committed Oct 6, 2011
Showing with 284 additions and 0 deletions.
  1. +5 −0 Changes
  2. +8 −0 dist.ini
  3. +170 −0 lib/IO/Storm.pm
  4. +29 −0 lib/IO/Storm/BasicBolt.pm
  5. +25 −0 lib/IO/Storm/Bolt.pm
  6. +24 −0 lib/IO/Storm/Tuple.pm
  7. +23 −0 weaver.ini
@@ -0,0 +1,5 @@
+Revision history for IO-Storm
+
+{{$NEXT}}
+ - First version, released on an unsuspecting world.
+
@@ -0,0 +1,8 @@
+name = IO-Storm
+author = Cory G Watson <gphat@cpan.org>
+license = Perl_5
+copyright_holder= Infinity Interacive, Inc
+
+[@GPHAT]
+
+[Prereqs]
@@ -0,0 +1,170 @@
+package IO::Storm;
+use Moose;
+
+use IO::Handle qw(autoflush);
+use IO::File;
+use JSON::XS qw(decode_json encode_json);
+use Log::Log4perl;
+use Storm::Tuple;
+
+my $logger = Log::Log4perl->get_logger('storm');
+
+has 'stdin' => (
+ is => 'rw',
+ default => sub {
+ my $io = IO::Handle->new;
+ $io->fdopen(fileno(STDIN), 'r');
+ }
+);
+
+sub read_string_message {
+ my ($self) = @_;
+
+ my @messages = ();
+ while(1) {
+ $logger->debug("reading");
+ my $line = $self->stdin->getline;
+ chomp($line);
+ $logger->debug("got $line");
+ if($line eq 'end') {
+ last;
+ }
+ push(@messages, $line);
+ }
+ return join("\n", @messages);
+}
+
+sub read_message {
+ my ($self) = @_;
+
+ return decode_json($self->read_string_message);
+}
+
+sub send_message_to_parent {
+ my ($self, $href) = @_;
+
+ $self->send_to_parent(encode_json($href));
+}
+
+sub send_to_parent {
+ my ($self, $s) = @_;
+
+ $logger->debug("sending $s");
+ print "$s\n";;
+ $logger->debug("sending end");
+ print "end\n";
+}
+
+sub sync {
+ my ($self) = @_;
+ $logger->debug("sending sync");
+ print "sync\n";
+}
+
+sub send_pid {
+ my ($self, $hbdir) = @_;
+
+ my $pid = $$;
+ print "$pid\n";
+ $logger->debug("sent $pid");
+
+ # XXX error handling
+ my $fh = IO::File->new;
+ $fh->open('> '.$hbdir.'/'.$pid);
+ $fh->close;
+
+ $logger->debug("### wrote pid to $hbdir/$pid");
+}
+
+sub emit_tuple {
+ my ($self, $tuple, $stream, $anchors, $direct_task) = @_;
+
+ # global ANCHOR_TUPLE
+ # if ANCHOR_TUPLE is not None:
+ # anchors = [ANCHOR_TUPLE]
+ # m = {"command": "emit"}
+ # if stream is not None:
+ # m["stream"] = stream
+ # m["anchors"] = map(lambda a: a.id, anchors)
+ # if directTask is not None:
+ # m["task"] = directTask
+ # m["tuple"] = tup
+ # sendMsgToParent(m)
+
+ my %message = ( command => 'emit' );
+ if(defined($stream)) {
+ $message{stream} = $stream;
+ }
+ if(defined($direct_task)) {
+ $message{task} = $direct_task;
+ }
+ $message{tuple} = $tuple;
+ $self->send_message_to_parent(\%message);
+}
+
+sub emit {
+ my ($self, $tuple, $stream, $anchors) = @_;
+
+ $anchors = [];
+
+ $self->emit_tuple($tuple, $stream, $anchors);
+ return $self->read_message;
+}
+
+sub emitDirect {
+ # XXX
+}
+
+sub ack {
+ my ($self, $tuple) = @_;
+
+ $self->send_message_to_parent({ command => 'ack', id => $tuple->id })
+}
+
+sub fail {
+ my ($self, $tuple) = @_;
+
+ $self->send_message_to_parent({ command => 'fail', id => $tuple->id })
+}
+
+sub log {
+ my ($self, $message) = @_;
+
+ $self->send_message_to_parent({ command => 'log', msg => $message })
+}
+
+sub read_env {
+ my ($self) = @_;
+ $logger->debug("### read_env");
+ my $conf = $self->read_message;
+ my $context = $self->read_message;
+
+ return [ $conf, $context ];
+}
+
+sub read_tuple {
+ my ($self) = @_;
+
+ my $tupmap = $self->read_message;
+
+ return Storm::Tuple->new(
+ id => $tupmap->{id},
+ component => $tupmap->{comp},
+ stream => $tupmap->{stream},
+ task => $tupmap->{task},
+ values => $tupmap->{tuple}
+ );
+}
+
+sub init_bolt {
+ my ($self) = @_;
+
+ autoflush STDOUT 1;
+
+ $logger->debug("init_bolt");
+ my $hbdir = $self->read_string_message;
+ $self->send_pid($hbdir);
+ return $self->read_env;
+}
+
+1;
@@ -0,0 +1,29 @@
+package IO::Storm::BasicBolt;
+use Moose;
+
+extends 'Storm';
+
+use Log::Log4perl;
+
+my $logger = Log::Log4perl->get_logger('storm.basicbolt');
+
+sub process {
+ my ($self, $tuple) = @_;
+}
+
+sub run {
+ my ($self) = @_;
+ # XXX
+ my $ANCHOR_TUPLE;
+
+ my ($conf, $context) = $self->init_bolt;
+ while(1) {
+ my $tup = $self->read_tuple;
+ $ANCHOR_TUPLE = $tup;
+ $self->process($tup);
+ $self->ack($tup);
+ $self->sync;
+ }
+}
+
+1;
@@ -0,0 +1,25 @@
+package IO::Storm::Bolt;
+use Moose;
+
+extends 'Storm';
+
+use Log::Log4perl;
+
+my $logger = Log::Log4perl->get_logger('storm.bolt');
+
+sub process {
+ my ($self, $tuple) = @_;
+}
+
+sub run {
+ my ($self) = @_;
+
+ my ($conf, $context) = $self->initbolt;
+ while(1) {
+ my $tup = $self->readtuple;
+ $self->process($tup);
+ $self->sync;
+ }
+}
+
+1;
@@ -0,0 +1,24 @@
+package IO::Storm::Tuple;
+use Moose;
+
+has 'id' => (
+ is => 'rw'
+);
+
+has 'component' => (
+ is => 'rw'
+);
+
+has 'stream' => (
+ is => 'rw'
+);
+
+has 'task' => (
+ is => 'rw'
+);
+
+has 'values' => (
+ is => 'rw'
+);
+
+1;
@@ -0,0 +1,23 @@
+[@CorePrep]
+
+[Name]
+[Version]
+
+[Generic / SYNOPSIS]
+[Generic / DESCRIPTION]
+[Generic / OVERVIEW]
+
+[Region / prelude]
+
+[Leftovers]
+
+[Collect / ATTRIBUTES]
+command = attr
+
+[Collect / METHODS]
+command = method
+
+[Region / postlude]
+
+[Authors]
+[Legal]

0 comments on commit 6fdb7a7

Please sign in to comment.