Skip to content

Commit

Permalink
Support for Reconnect & round robin RemoteAddress
Browse files Browse the repository at this point in the history
You can now provide 'Reconnect => 1' and 'Callbacks => { Reconnected => [ sub {} ] }' in the new() args to the Client.
Any queues, channels, subscriptions, etc. that were created synchronously will be recreated, however I'm unclear at this point how dynamically generated queue names will be handled.  For this reason, adding callbacks to the 'Disconnected' and 'Reconnected' triggers to resetup your queue/exchange/channel topology would be best practice, as is checking $amq->is_started before doing server_send events.
If $amq->server_send is called while in the progress of reconnecting, the messages will be enqueued to $amq->{failed_server_send}, which the user is at this point expected to do something with.
  • Loading branch information
ewaters committed Nov 6, 2009
1 parent 2333f2d commit f5e5cbc
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 22 deletions.
5 changes: 5 additions & 0 deletions examples/clock.pl
Expand Up @@ -35,6 +35,11 @@
my ($kernel, $heap) = @_[KERNEL, HEAP];
$kernel->delay(clock => 1);

if (! $amq->is_started) {
$amq->Logger->error("Server not started; not publishing time to 'amq.fanout'");
return;
}

my $message = time . '';
$amq->Logger->info("Sending '$message' to exchange 'amq.fanout'");

Expand Down
12 changes: 11 additions & 1 deletion examples/examples.pm
Expand Up @@ -22,7 +22,17 @@ my $debug = defined $ENV{DEBUG} ? $ENV{DEBUG} : 1;
Net::AMQP::Protocol->load_xml_spec($ARGV[0] || $FindBin::Bin . '/../../net-amqp/spec/amqp0-8.xml');

our $amq = POE::Component::Client::AMQP->create(
RemoteAddress => '127.0.0.1',
RemoteAddress => [qw(127.0.0.1 127.0.0.2)],

Reconnect => 1,
Callbacks => {
Reconnected => [
sub {
my $amq = shift;
$amq->Logger->info("We have been reconnected");
},
],
},

($debug ? (
Debug => {
Expand Down
8 changes: 7 additions & 1 deletion examples/pingpong.pl
Expand Up @@ -27,9 +27,15 @@

ping => sub {
my ($kernel, $heap) = @_[KERNEL, HEAP];
$kernel->delay(ping => 1);

if (! $amq->is_started) {
$amq->Logger->error("Server not started; not sending 'ping'");
return;
}

$channel->queue('one')->publish('ping');
$amq->Logger->info("Sending 'ping' to queue 'one'");
$kernel->delay(ping => 1);
},
},
);
Expand Down
8 changes: 6 additions & 2 deletions examples/stocks.pl
Expand Up @@ -56,6 +56,12 @@

publish_stock_prices => sub {
my ($kernel, $heap) = @_[KERNEL, HEAP];
$kernel->delay(publish_stock_prices => 1);

if (! $amq->is_started) {
$amq->Logger->error("Server not started; not publishing stock prices");
return;
}

my %stocks = (
appl => 170 + rand(1000) / 100.0,
Expand All @@ -74,8 +80,6 @@
)
);
}

$kernel->delay(publish_stock_prices => 1);
},
},
);
Expand Down
125 changes: 108 additions & 17 deletions lib/POE/Component/Client/AMQP.pm
Expand Up @@ -171,6 +171,7 @@ sub create {
Callbacks => { default => {} },
SSL => { default => 0 },
Keepalive => { default => 0 },
Reconnect => { default => 0 },

channels => { default => {} },
is_started => { default => 0 },
Expand Down Expand Up @@ -221,17 +222,41 @@ sub create {
],
) unless $self->{is_testing};

# If the user passed an arrayref as the RemoteAddress, pick one
# at random to connect to.
if (ref $self->{RemoteAddress}) {
# Shuffle the RemoteAddress array (thanks http://community.livejournal.com/perl/101830.html)
my $array = $self->{RemoteAddress};
for (my $i = @$array; --$i; ) {
my $j = int rand ($i+1);
next if $i == $j;
@$array[$i,$j] = @$array[$j,$i];
}

# Take the first shuffled address and move it to the back
$self->{current_RemoteAddress} = shift @{ $self->{RemoteAddress} };
push @{ $self->{RemoteAddress} }, $self->{current_RemoteAddress};
}
else {
$self->{current_RemoteAddress} = $self->{RemoteAddress};
}

POE::Component::Client::AMQP::TCP->new(
Alias => $self->{AliasTCP},
RemoteAddress => $self->{RemoteAddress},
RemotePort => $self->{RemotePort},
Connected => sub { $self->tcp_connected(@_) },
Disconnected => sub { $self->Logger->info("TCP connection is disconnected") },
ServerInput => sub { $self->tcp_server_input(@_) },
ServerFlushed => sub { $self->tcp_server_flush(@_) },
ServerError => sub { $self->tcp_server_error(@_) },
Filter => 'POE::Filter::Stream',
SSL => $self->{SSL},
Alias => $self->{AliasTCP},
RemoteAddress => $self->{current_RemoteAddress},
RemotePort => $self->{RemotePort},
Connected => sub { $self->tcp_connected(@_) },
Disconnected => sub { $self->tcp_disconnected(@_) },
ConnectError => sub { $self->tcp_connect_error(@_) },
ConnectTimeout => 20,
ServerInput => sub { $self->tcp_server_input(@_) },
ServerFlushed => sub { $self->tcp_server_flush(@_) },
ServerError => sub { $self->tcp_server_error(@_) },
Filter => 'POE::Filter::Stream',
SSL => $self->{SSL},
InlineStates => {
reconnect_delayed => sub { $self->tcp_reconnect_delayed(@_) },
},
) unless $self->{is_testing};

return $self;
Expand Down Expand Up @@ -498,12 +523,7 @@ sub server_connected {

$self->{Logger}->info("Connected to the AMQP server ".($self->{SSL} ? '(over SSL) ' : '')."and ready to act");

# Call the callbacks if present
if ($self->{Callbacks}{Startup}) {
foreach my $subref (@{ $self->{Callbacks}{Startup} }) {
$subref->();
}
}
$self->do_callback('Startup');

$self->{is_started} = 1;

Expand All @@ -525,7 +545,13 @@ Pass one or more L<Net::AMQP::Frame> objects. For short hand, you may pass L<Ne
sub server_send {
my ($self, $kernel, @output) = @_[OBJECT, KERNEL, ARG0 .. $#_];

return if $self->{is_stopped};
if ($self->{is_stopped}) {
$self->{Logger}->error("Server send called while stopped with ".int(@output)." messages");
push @{ $self->{pending_server_send} }, @output;
# FIXME: nothing is currently done with this pending server send queue; users can choose
# to resend them in their Reconnected callback
return;
}

while (my $output = shift @output) {
if (! defined $output || ! ref $output) {
Expand Down Expand Up @@ -654,7 +680,14 @@ sub tcp_connected {
#$self->{Logger}->debug("Sending 4.2.2 Protocol Header");
$heap->{server}->put( Net::AMQP::Protocol->header );

# If 'reconnect_attempt' has a value, we have reconnected
if ($self->{reconnect_attempt}) {
$self->{reconnect_attempt} = 0;
$self->do_callback('Reconnected');
}

$self->{HeapTCP} = $heap;
$self->{is_stopped} = 0;
}

sub tcp_server_flush {
Expand Down Expand Up @@ -801,6 +834,64 @@ sub tcp_server_error {
$self->{Logger}->error("TCP error: $name (num: $num, string: $string)");
}

sub tcp_connect_error {
my $self = shift;
my ($kernel, $heap, $name, $num, $string) = @_[KERNEL, HEAP, ARG0, ARG1, ARG2];

$self->{Logger}->error("TCP connect error: $name (num: $num, string: $string)");
$kernel->post($self->{AliasTCP}, 'reconnect_delayed') if $self->{Reconnect};
}

sub tcp_disconnected {
my $self = shift;
my ($kernel, $heap) = @_[KERNEL, HEAP];

$self->{Logger}->error("TCP connection is disconnected");

# The flag 'is_stopping' will be 1 if server_disconnect was explicitly called
return if $self->{is_stopping};

# We are here due to an error; we should record that we're stopped, and try and reconnect
$self->{is_stopped} = 1;
$self->{is_started} = 0;
$self->{wait_synchronous} = {};

if ($self->{Reconnect}) {
$kernel->post($self->{AliasTCP}, 'reconnect_delayed');
}

$self->do_callback('Disconnected');
}

sub tcp_reconnect_delayed {
my $self = shift;
my ($kernel, $heap) = @_[KERNEL, HEAP];

return unless $self->{Reconnect};

# Pick a new RemoteAddress if there's more than one
if (ref $self->{RemoteAddress}) {
$self->{current_RemoteAddress} = shift @{ $self->{RemoteAddress} };
push @{ $self->{RemoteAddress} }, $self->{current_RemoteAddress};
}

my $delay = 2 ** ++$self->{reconnect_attempt};
$self->{Logger}->info("Reconnecting to '$$self{current_RemoteAddress}' in $delay sec");

# This state is in the TCP session, so we can call 'reconnect' directly
$kernel->delay('reconnect', $delay, $self->{current_RemoteAddress}, $self->{RemotePort});
}

sub do_callback {
my ($self, $callback) = @_;

return unless $self->{Callbacks}{$callback};
foreach my $subref (@{ $self->{Callbacks}{$callback} }) {
$subref->($self);
}
return;
}

{
package POE::Component::Client::AMQP::FakeLogger;

Expand Down
2 changes: 1 addition & 1 deletion lib/POE/Component/Client/AMQP/Queue.pm
Expand Up @@ -101,7 +101,7 @@ sub created {
my $self = shift;

$self->{is_created} = 1;
while (my $callback = shift @{ $self->{on_created} }) {
foreach my $callback (@{ $self->{on_created} }) {
$callback->();
}
}
Expand Down

0 comments on commit f5e5cbc

Please sign in to comment.