Skip to content

Commit

Permalink
POD and implementation of emit_direct.
Browse files Browse the repository at this point in the history
  • Loading branch information
Cory G Watson authored and Cory G Watson committed Oct 11, 2011
1 parent 1df1e0d commit fa6b69b
Showing 1 changed file with 93 additions and 2 deletions.
95 changes: 93 additions & 2 deletions lib/IO/Storm.pm
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ has '_stdin' => (
}
);

=method read_string_message
Read a message from the ShellBolt. Reads until it finds a "end" line.
=cut
sub read_string_message {
my ($self) = @_;

Expand All @@ -68,18 +73,36 @@ sub read_string_message {
return join("\n", @messages);
}

=method read_message
Read a message from the ShellBolt and decode it from JSON.
=cut

sub read_message {
my ($self) = @_;

return decode_json($self->read_string_message);
}

=method send_message_to_parent
Sent a message to the ShellBolt, encoding it as JSON.
=cut

sub send_message_to_parent {
my ($self, $href) = @_;

$self->send_to_parent(encode_json($href));
}

=method send_to_parent
Send a message to the ShellBolt.
=cut

sub send_to_parent {
my ($self, $s) = @_;

Expand All @@ -89,12 +112,24 @@ sub send_to_parent {
print "end\n";
}

=method sync
Send a sync.
=cut

sub sync {
my ($self) = @_;
$logger->debug('sending sync');
print "sync\n";
}

=method send_pid
Send this processes PID.
=cut

sub send_pid {
my ($self, $hbdir) = @_;

Expand All @@ -110,6 +145,12 @@ sub send_pid {
$logger->debug("wrote pid to $hbdir/$pid");
}

=method emit_tuple
Send a tuple to the ShellBolt.
=cut

sub emit_tuple {
my ($self, $tuple, $stream, $anchors, $direct_task) = @_;

Expand All @@ -129,6 +170,12 @@ sub emit_tuple {
$self->send_message_to_parent(\%message);
}

=method emit
Emit a tuple to the the ShellBolt and return the response.
=cut

sub emit {
my ($self, $tuple, $stream, $anchors) = @_;

Expand All @@ -137,28 +184,60 @@ sub emit {
return $self->read_message;
}

sub emitDirect {
# XXX
=method emit_direct
Emit a tuple to the Shell bolt, but do not get a response.
=cut

sub emit_direct {
my ($self, $task, $tuple, $stream, $anchors) = @_;

emit_tuple($tuple, $stream, $anchors, $task);
}

=method ack
Acknowledge a tuple.
=cut

sub ack {
my ($self, $tuple) = @_;

$self->send_message_to_parent({ command => 'ack', id => $tuple->id })
}

=method fail
Fail a tuple.
=cut

sub fail {
my ($self, $tuple) = @_;

$self->send_message_to_parent({ command => 'fail', id => $tuple->id })
}

=method log
Send a log command to the ShellBolt
=cut

sub log {
my ($self, $message) = @_;

$self->send_message_to_parent({ command => 'log', msg => $message })
}

=method read_env
Read the configuration and context from the ShellBolt.
=cut

sub read_env {
my ($self) = @_;
$logger->debug('read_env');
Expand All @@ -168,6 +247,12 @@ sub read_env {
return [ $conf, $context ];
}

=method read_tuple
Turn the incoming Tuple structure into an L<IO::Storm::Tuple>.
=cut

sub read_tuple {
my ($self) = @_;

Expand All @@ -182,6 +267,12 @@ sub read_tuple {
);
}

=method init_bolt
Initialize this bolt.
=cut

sub init_bolt {
my ($self) = @_;

Expand Down

0 comments on commit fa6b69b

Please sign in to comment.