Skip to content

Commit

Permalink
connect & subscribe with custom headers & ids
Browse files Browse the repository at this point in the history
  • Loading branch information
dakkar committed Jul 14, 2011
1 parent 5d0e35b commit bf4a216
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 34 deletions.
245 changes: 212 additions & 33 deletions lib/Catalyst/Engine/Stomp.pm
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ Catalyst::Engine::Stomp - write message handling apps with Catalyst.
{
'hostname' => 'localhost',
'port' => '61613'
connect_headers => {
login => 'myuser',
passcode => 'mypassword',
},
},
{
'hostname' => 'stomp.yourmachine.com',
Expand Down Expand Up @@ -98,6 +102,134 @@ compatibility)
=back
=head2 Connction and Subscription Headers
You can specify custom headers to send with the C<CONNECT> and
C<SUBSCRIBE> STOMP messages. You can specify them globally:
MyApp->config(
Engine::Stomp' = {
'servers' => [
{
'hostname' => 'localhost',
'port' => '61613'
},
],
subscribe_headers => {
transformation => 'jms-to-json',
},
connect_headers => {
login => 'myuser',
passcode => 'mypassword',
},
},
);
per server:
MyApp->config(
Engine::Stomp' = {
'servers' => [
{
'hostname' => 'localhost',
'port' => '61613'
subscribe_headers => {
strange_stuff => 'something',
},
connect_headers => {
login => 'myuser',
passcode => 'mypassword',
},
},
],
},
);
or per-controller (subscribe headers only):
package MyApp::Controller::Special;
use Moose;
BEGIN { extends 'Catalyst::Controller::MessageDriven' };
has stomp_destination => (
is => 'ro',
isa => 'Str',
default => '/topic/crowded',
);
has stomp_subscribe_headers => (
is => 'ro',
isa => 'HashRef',
default => sub { +{
selector => q{custom_header = '1' or JMSType = 'test_foo'},
} },
);
This is very useful to set filters / selectors on the subscription.
There are a few caveats, mostly summarized by "if you do confusing
things, the program may not work".
=over 4
=item *
you can have the C<stomp_destination> and the C<action_namespace>
different in a single controller, but this may become confusing if you
have more than one controller subscribed to the same destination; you
can remove some of the confusion by restricting the kind of messages
that each subscription receives
=item *
if you filter out some messages, don't be surprised if they are never
received by your application
=item *
you can set persistent topic subscriptions, to prevent message loss
during reconnects (the broker will remember your subscription and keep
the messages while you are not connected):
MyApp->config(
Engine::Stomp' = {
'servers' => [
{
'hostname' => 'localhost',
'port' => '61613'
},
],
connect_headers => {
'client-id' => 'myapp',
},
},
);
package MyApp::Controller::Persistent;
use Moose;
BEGIN { extends 'Catalyst::Controller::MessageDriven' };
has stomp_destination => (
is => 'ro',
isa => 'Str',
default => '/topic/important',
);
has stomp_subscribe_headers => (
is => 'ro',
isa => 'HashRef',
default => sub { +{
'activemq.subscriptionName' => 'important-subscription',
} },
);
According to the ActiveMQ docs, the C<client-id> must be globally
unique, and C<activemq.subscriptionName> must be unique within the
client. Non-ActiveMQ brokers may use different headers to specify the
subscription name.
=back
=head1 FAILOVER
You can specify one or more servers in a list for the apps config.
Expand Down Expand Up @@ -173,21 +305,51 @@ Only after handling a request does it check the flag.
=cut

sub _qualify_destination {
my ($self,$dest) = @_;
my ($self,$unq_dest) = @_;

my $ret = $dest;
if ($dest !~ m{^ /? (?: queue | topic ) / }x) {
$ret = "/queue/$dest";
my $q_dest = $unq_dest;
if ($unq_dest !~ m{^ /? (?: queue | topic ) / }x) {
$q_dest = "/queue/$unq_dest";
}

# normalize slashes
$dest =~ s{^/}{};
$ret =~ s{/+}{/};
$ret = "/$ret" unless $ret =~ m{^/};
$unq_dest =~ s{^/}{};
$q_dest =~ s{/+}{/};
$q_dest = "/$q_dest" unless $q_dest =~ m{^/};

return ($unq_dest,$q_dest);
}

sub _collect_destinations {
my ($self,$app) = @_;

my $sub_id=1;

my @dests;
for my $ctrl_name ($app->controllers) {
my $ctrl = $app->controller($ctrl_name);
my $dest_call = $ctrl->can('stomp_destination');
my $subh_call = $ctrl->can('stomp_subscribe_headers');
my ($unq_dest,$q_dest,$subh);
$unq_dest = $dest_call ? $ctrl->$dest_call() : $ctrl->action_namespace();
($unq_dest,$q_dest) = $self->_qualify_destination($unq_dest);
$subh = $subh_call ? $ctrl->$subh_call() : { };

push @dests,{
destination => $q_dest,
subscribe_headers => {
%$subh,
id => $sub_id,
}
};

$self->destination_namespace_map->{$ret} = $dest;
$self->destination_namespace_map->{$q_dest} =
$self->destination_namespace_map->{"/subscription/$sub_id"} =
$ctrl->action_namespace();
++$sub_id;
}

return $ret;
return @dests;
}

sub run {
Expand All @@ -198,12 +360,7 @@ sub run {
die 'No Engine::Stomp configuration found'
unless ref $app->config->{'Engine::Stomp'} eq 'HASH';

my @destinations =
uniq
map { $self->_qualify_destination($_) }
grep { length $_ }
map { $app->controller($_)->action_namespace }
$app->controllers;
my @destinations = $self->_collect_destinations($app);

# connect up
my $config = $app->config->{'Engine::Stomp'};
Expand All @@ -212,9 +369,11 @@ sub run {
# munge the configuration to make it easier to write
$config->{tries_per_server} ||= 1;
$config->{connect_retry_delay} ||= 15;
$config->{subscribe_headers} ||= {};
die("subscribe_headers config for Engine::Stomp must be a hashref!\n")
if (ref($config->{subscribe_headers}) ne 'HASH');
for my $h (qw(connect subscribe)) {
$config->{"${h}_headers"} ||= {};
die("${h}_headers config for Engine::Stomp must be a hashref!\n")
if (ref($config->{"${h}_headers"}) ne 'HASH');
}

if (! $config->{servers} ) {
$config->{servers} = [ {
Expand Down Expand Up @@ -245,32 +404,46 @@ sub run {
++$tries;

eval {
$template{subscribe_headers} ||= {};
die("subscribe_headers config for server $config->{hostname}:$config->{port} in Engine::Stomp must be a hashref!\n")
if (ref($template{subscribe_headers}) ne 'HASH');
for my $h (qw(connect subscribe)) {
$template{"${h}_headers"} ||= {};
die("${h}_headers config for for server $config->{hostname}:$config->{port} in Engine::Stomp must be a hashref!\n")
if (ref($template{"${h}_headers"}) ne 'HASH');
}

my $per_server_connect_headers = {
%{$config->{connect_headers}},
%{$template{connect_headers}},
};

my $subscribe_headers = {
my $per_server_subscribe_headers = {
%{$config->{subscribe_headers}},
%{$template{subscribe_headers}},
};

$app->log->info("Connecting to STOMP Q at " . $template{hostname}.':'.$template{port});
$app->log->debug("Connecting to STOMP Q at " . $template{hostname}.':'.$template{port})
if $app->debug;

$self->connection(Net::Stomp->new(\%template));
$self->connection->connect();
$self->connection->connect($per_server_connect_headers);
$self->conn_desc($template{hostname}.':'.$template{port});

# subscribe, with client ack.
foreach my $destination (@destinations) {
$app->log->info(
"subscribing to $destination",
'which is mapped to',
$self->destination_namespace_map->{$destination},
);
my $dest_name = $destination->{destination};
my $local_subscribe_headers =
$destination->{subscribe_headers};
my $id = $local_subscribe_headers->{id};
$app->log->debug(
"subscribing to $dest_name ($id) ".
'which is mapped to '.
$self->destination_namespace_map->{"/subscription/$id"}
)
if $app->debug;

$self->connection->subscribe({
%$subscribe_headers,
destination => $destination,
%$per_server_subscribe_headers,
%$local_subscribe_headers,
destination => $dest_name,
ack => 'client',
});
}
Expand All @@ -282,7 +455,7 @@ sub run {
while (1) {
my $frame = $self->connection->receive_frame(); # block
$self->handle_stomp_frame($app, $frame);

if ( $ENV{ENGINE_ONESHOT} || $stop ){
# Perl does not like 'last QUITLOOP' inside an eval, hence we die and do it
die "QUITLOOP\n";
Expand Down Expand Up @@ -381,7 +554,13 @@ sub handle_stomp_message {

# destination -> controller
my $destination = $frame->headers->{destination};
my ($controller) = $self->destination_namespace_map->{$destination};
my $subscription = $frame->headers->{subscription};

$app->log->debug("message from $destination ($subscription)")
if $app->debug;

my $controller = $self->destination_namespace_map->{"/subscription/$subscription"}
|| $self->destination_namespace_map->{$destination};

# set up request
my $config = $app->config->{'Engine::Stomp'};
Expand Down
Loading

0 comments on commit bf4a216

Please sign in to comment.