Skip to content

Commit

Permalink
Adjust subscriber() role to recover the subscriptions after reconnect
Browse files Browse the repository at this point in the history
Also updated the callbacks to the new hooks of {Net|Protocol}::SAPO::Broker.

Signed-off-by: Pedro Melo <melo@simplicidade.org>
  • Loading branch information
melo committed Sep 1, 2008
1 parent 43792ec commit 6bcdac0
Showing 1 changed file with 47 additions and 40 deletions.
87 changes: 47 additions & 40 deletions App-SAPO-Broker-Utils/script/sapo-broker-client
Expand Up @@ -146,45 +146,49 @@ else {
### Roles

sub subscriber {
my $sb = get_agent_connection();

foreach my $topic (@topics) {
$sb->subscribe({
topic => $topic,
on_success => sub {
diag("%% Subscribed topic '$topic'.");
},
on_error => sub {
diag("ERROR subscribing to topic '$topic':");
foreach my $e (@_) {
next if ref($e);
diag(" $e");
my $on_connected = sub {
my ($sb) = @_;

foreach my $topic (@topics) {
$sb->subscribe({
topic => $topic,
on_success => sub {
diag("%% Subscribed topic '$topic'.");
},
on_error => sub {
diag("!! ERROR subscribing to topic '$topic':");
foreach my $e (@_) {
next if ref($e);
diag(" $e");
}
},
on_message => sub {
my (undef, $notif) = @_;
my $payload = $notif->payload;
my $topic = $notif->topic;
my ($now, $count, $rate, $delta) = _calc_rate($topic);

my $rt = sprintf('%0.3f mesg/sec (%d messages)', $rate, $count);
my $ts = localtime($now->[0]);
substr($ts, -5, 0, '.'.substr($now->[1], 0, 4));
$ts .= sprintf(' (%0.3f elapsed since last message)', $delta) if $delta;

my $len = length($payload);

diag("** Got message on topic '$topic':");
diag("** at $ts");
diag("** rate for topic is $rt");
diag();
diag("-- Message starts (length $len)");
print $payload,"\n";
diag("-- Message ends");
diag();
}
},
on_message => sub {
my (undef, $notif) = @_;
my $payload = $notif->payload;
my $topic = $notif->topic;
my ($now, $count, $rate, $delta) = _calc_rate($topic);
})
}
};

my $rt = sprintf('%0.3f mesg/sec (%d messages)', $rate, $count);
my $ts = localtime($now->[0]);
substr($ts, -5, 0, '.'.substr($now->[1], 0, 4));
$ts .= sprintf(' (%0.3f elapsed since last message)', $delta) if $delta;

my $len = length($payload);

diag("** Got message on topic '$topic':");
diag("** at $ts");
diag("** rate for topic is $rt");
diag();
diag("-- Message starts (length $len)");
print $payload,"\n";
diag("-- Message ends");
diag();
}
})
}
my $sb = get_agent_connection($on_connected);

# Temp hack, deliver_messages() should wait forever
while ($sb->state eq 'connected') { $sb->deliver_messages(5) };
Expand Down Expand Up @@ -444,6 +448,8 @@ sub max_rate {
### Agent connection

sub get_agent_connection {
my ($on_connected_cb) = @_;

my $sb = Net::SAPO::Broker->new({
host => $host,
port => $port,
Expand All @@ -453,16 +459,17 @@ sub get_agent_connection {
my $agent = $sb->host;
diag("## Connecting to agent at '$agent'");
},
on_connected => sub {
on_state_connected => sub {
diag("## Connected!");
$on_connected_cb->(@_) if $on_connected_cb;
},
on_eof => sub {
on_state_eof => sub {
diag("## Lost connection to agent");
},
on_state_disconnecting => sub {
diag("## Disconnecting");
},
on_reconnect => sub {
on_state_reconnecting => sub {
diag("## Reconnecting");
},
on_read_error => sub {
Expand Down

0 comments on commit 6bcdac0

Please sign in to comment.