Skip to content

Commit

Permalink
Channel create and close improvements
Browse files Browse the repository at this point in the history
AMQP->channel() now allows user opts.
Waiting for synchronous method responses is now keyed on channel.
Server input that is not channel 0 will now go to the Channel, regardless of other handling by the server_input() method.
Added 'CascadeFailure' and 'CloseCallback' to Channel->create() opts to allow for a)
 the server connection to be closed if the channel closes and b) a user callback to
 trigger if the channel closes.  Better handling for channel close events, allowing a
  channel number to be reused.  Added explicit method close().
  • Loading branch information
ewaters committed Oct 9, 2009
1 parent 35b5a3d commit b9036fe
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 34 deletions.
63 changes: 35 additions & 28 deletions lib/POE/Component/Client/AMQP.pm
Expand Up @@ -271,7 +271,8 @@ Call with an optional argument $id (1 - 65536). Returns a L<POE::Component::Cli
=cut

sub channel {
my ($self, $id) = @_;
my ($self, $id, $opts) = @_;
$opts ||= {};

if (defined $id && $self->{channels}{$id}) {
return $self->{channels}{$id};
Expand All @@ -280,6 +281,7 @@ sub channel {
my $channel = POE::Component::Client::AMQP::Channel->create(
id => $id,
server => $self,
%$opts,
);

# We don't need to record the channel, as the Channel->create() did so already in our 'channels' hash
Expand Down Expand Up @@ -536,28 +538,31 @@ sub server_send {

my $output_class = ref($output->method_frame);

$self->{wait_synchronous}{ $output->channel } ||= {};
my $wait_synchronous = $self->{wait_synchronous}{ $output->channel };

# FIXME: It appears that RabbitMQ won't let us do two disimilar synchronous requests at once
if (my @waiting_classes = keys %{ $self->{wait_synchronous} }) {
if (my @waiting_classes = keys %$wait_synchronous) {
$self->{Logger}->debug("Class $waiting_classes[0] is already waiting; do nothing else until it's complete; defering")
if $self->{Debug}{logic};
push @{ $self->{wait_synchronous}{$waiting_classes[0]}{process_after} }, [ $output, @output ];
push @{ $wait_synchronous->{ $waiting_classes[0] }{process_after} }, [ $output, @output ];
return;
}

if ($self->{wait_synchronous}{$output_class}) {
# There are already other things waiting; enqueue this output
$self->{Logger}->debug("Class $output_class is already synchronously waiting; defering this and subsequent output")
if $self->{Debug}{logic};
push @{ $self->{wait_synchronous}{$output_class}{process_after} }, [ $output, @output ];
return;
}
# if ($self->{wait_synchronous}{$output_class}) {
# # There are already other things waiting; enqueue this output
# $self->{Logger}->debug("Class $output_class is already synchronously waiting; defering this and subsequent output")
# if $self->{Debug}{logic};
# push @{ $self->{wait_synchronous}{$output_class}{process_after} }, [ $output, @output ];
# return;
# }

my $responses = $output_class->method_spec->{responses};

if (keys %$responses) {
$self->{Logger}->debug("Setting up synchronous callback for $output_class")
if $self->{Debug}{logic};
$self->{wait_synchronous}{$output_class} = {
$wait_synchronous->{$output_class} = {
request => $output,
responses => $responses,
process_after => [],
Expand All @@ -571,9 +576,8 @@ sub server_send {
. ($self->{Debug}{frame_output} ? $self->{Debug}{frame_dumper}($output) : '')
. ($self->{Debug}{raw_output} ? $self->{Debug}{raw_dumper}($raw_output) : '')
);
$output = $raw_output;

$self->{HeapTCP}{server}->put($output);
$self->{HeapTCP}{server}->put($raw_output);
$self->{last_server_put} = time;
}
}
Expand Down Expand Up @@ -653,6 +657,17 @@ sub tcp_server_input {
. ($self->{Debug}{frame_input} ? $self->{Debug}{frame_dumper}($frame) : '')
);

my $handled = 0;
if ($frame->channel != 0) {
my $channel = $self->{channels}{ $frame->channel };
if (! $channel) {
$self->{Logger}->error("Received frame on channel ".$frame->channel." which we didn't request the creation of");
next FRAMES;
}
$kernel->post($channel->{Alias}, server_input => $frame);
$handled++;
}

if ($frame->isa('Net::AMQP::Frame::Method')) {
my $method_frame = $frame->method_frame;

Expand All @@ -662,7 +677,7 @@ sub tcp_server_input {
$self->{Logger}->debug("Checking 'wait_synchronous' hash against $method_frame_class") if $self->{Debug}{logic};

my $matching_output_class;
while (my ($output_class, $details) = each %{ $self->{wait_synchronous} }) {
while (my ($output_class, $details) = each %{ $self->{wait_synchronous}{ $frame->channel } }) {
next unless $details->{responses}{ $method_frame_class };
$matching_output_class = $output_class;
last;
Expand All @@ -672,7 +687,7 @@ sub tcp_server_input {
$self->{Logger}->debug("Response type '$method_frame_class' found from waiting request '$matching_output_class'")
if $self->{Debug}{logic};

my $details = delete $self->{wait_synchronous}{$matching_output_class};
my $details = delete $self->{wait_synchronous}{ $frame->channel }{$matching_output_class};

# Call the asynch callback if there is one
if (my $callback = delete $details->{request}{synchronous_callback}) {
Expand All @@ -687,12 +702,12 @@ sub tcp_server_input {
}

# Consider this frame handled
next FRAMES;
$handled++;
}
}

# Act upon connection-level methods
if ($frame->channel == 0) {
if (! $handled && $frame->channel == 0) {
if ($method_frame->isa('Net::AMQP::Protocol::Connection::Start')) {
$kernel->post($self->{Alias}, server_send =>
Net::AMQP::Protocol::Connection::StartOk->new(
Expand All @@ -707,7 +722,7 @@ sub tcp_server_input {
locale => 'en_US',
),
);
next FRAMES;
$handled++;
}
elsif ($method_frame->isa('Net::AMQP::Protocol::Connection::Tune')) {
$kernel->post($self->{Alias}, server_send =>
Expand All @@ -727,20 +742,12 @@ sub tcp_server_input {
),
),
);
next FRAMES;
$handled++;
}
}
}

if ($frame->channel != 0) {
my $channel = $self->{channels}{ $frame->channel };
if (! $channel) {
$self->{Logger}->error("Received frame on channel ".$frame->channel." which we didn't request the creation of");
next FRAMES;
}
$kernel->post($channel->{Alias}, server_input => $frame);
}
else {
if (! $handled) {
$self->{Logger}->error("Unhandled input frame ".ref($frame));
}
}
Expand Down
89 changes: 83 additions & 6 deletions lib/POE/Component/Client/AMQP/Channel.pm
Expand Up @@ -47,6 +47,10 @@ The L<POE::Session> alias so you can post to it's POE states.
At the moment, only 'Created' is used.
=item I<CascadeFailure> (default: 1)
If this channel is closed, close also the server connection.
=back
Returns an object in this class.
Expand All @@ -55,6 +59,8 @@ Returns an object in this class.
=cut

my $_ids = 0;

sub create {
my $class = shift;

Expand All @@ -65,6 +71,8 @@ sub create {
# User definable
Alias => 0,
Callbacks => { default => {} },
CascadeFailure => { default => 1 },
CloseCallback => { default => sub {} },

# Private
consumers => { default => {} },
Expand All @@ -90,7 +98,7 @@ sub create {

# Ensure we have a unique alias name

$args{Alias} ||= $args{server}{Alias} . '-channel-' . $args{id};
$args{Alias} ||= $args{server}{Alias} . '-channel-' . $_ids++;

# Create the object and session

Expand Down Expand Up @@ -248,6 +256,16 @@ sub queue {
return $queue;
}

sub close {
my $self = shift;

$self->do_when_created(sub {
$poe_kernel->post($self->{Alias}, server_send =>
Net::AMQP::Protocol::Channel::Close->new()
);
});
}

=head1 POE STATES
The following are states you can post to to interact with the client. Use the alias defined in the C<create()> call above.
Expand Down Expand Up @@ -289,6 +307,10 @@ sub channel_created {
sub server_input {
my ($self, $kernel, $frame) = @_[OBJECT, KERNEL, ARG0];

if ($self->{should_be_dead}) {
$self->server->{Logger}->error("!! I should be dead !!");
}

if ($frame->isa('Net::AMQP::Frame::Method')) {
my $method_frame = $frame->method_frame;

Expand All @@ -304,11 +326,66 @@ sub server_input {
return;
}
elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
$self->server->{Logger}->error(
"Channel ".$self->id." has been closed by the server: " .
$method_frame->reply_code . ': ' . $method_frame->reply_text
);
$self->server->stop();
# Come up with a descriptive reason why the channel closed

my $close_reason;
# If the Channel.Close event gives class and method id indicating what event cause the closure, find a
# friendly name from this and use it in the close reason
if ($method_frame->class_id && $method_frame->method_id) {
my $method_class = Net::AMQP::Frame::Method->registered_method_class($method_frame->class_id, $method_frame->method_id);
my ($class_name, $method_name) = $method_class =~ m{^Net::AMQP::Protocol::(.+)::(.+)$};
$close_reason = "The method $class_name.$method_name caused channel " . $self->id . ' to be';
}
else {
$close_reason = "The channel has been";
}
$close_reason .= ' closed by the server: ' . $method_frame->reply_code . ': ' . $method_frame->reply_text;

$self->server->{Logger}->error($close_reason);

if ($self->{CloseCallback}) {
$self->{CloseCallback}->($close_reason);
}

if ($self->{CascadeFailure}) {
$self->server->stop()
}
else {
$kernel->call($self->{Alias}, 'server_send',
Net::AMQP::Protocol::Channel::CloseOk->new()
);
}

# Delete references to myself in the server
delete $self->server->{channels}{ $self->id };
delete $self->server->{wait_synchronous}{ $self->id };

# Delete the reference to the server to reduce circular references
#$self->{server} = undef;;
$self->{should_be_dead} = 1;

$kernel->alias_remove( $self->{Alias} );

return;
}
elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) {
if ($self->{CloseOkCallback}) {
$self->{CloseOkCallback}->();
}

# Delete references to myself in the server
delete $self->server->{channels}{ $self->id };
delete $self->server->{wait_synchronous}{ $self->id };

$self->server->{Logger}->info("Closing channel ".$self->id."; now have channels " . join(', ', sort { $a <=> $b } keys %{ $self->server->{channels} }) . " open");

# Delete the reference to the server to reduce circular references
#$self->{server} = undef;;
$self->{should_be_dead} = 1;

$kernel->alias_remove( $self->{Alias} );

return;
}

}
Expand Down

0 comments on commit b9036fe

Please sign in to comment.