Skip to content
This repository
Browse code

Merge pull request #15 from gittex/multi-socket

Add multi socket support
  • Loading branch information...
commit 580a9f040e4a7ef8309d79f08b62f547f9603ec1 2 parents 18d1a36 + a61cbf8
Martin Atkins apparentlymart authored

Showing 2 changed files with 264 additions and 18 deletions. Show diff stats Hide diff stats

  1. +129 18 lib/DJabberd.pm
  2. +135 0 t/new-connects-multiport.t
147 lib/DJabberd.pm
@@ -53,23 +53,45 @@ $SIG{USR2} = sub { Carp::cluck("USR2") };
53 53
54 54 sub new {
55 55 my ($class, %opts) = @_;
  56 +
  57 + my $s2s_port = delete $opts{s2s_port};
  58 + my $c2s_port = delete $opts{c2s_port};
  59 + my $ssl_port = delete $opts{ssl_port};
56 60
57 61 my $self = {
58 62 'daemonize' => delete $opts{daemonize},
59   - 's2s_port' => delete $opts{s2s_port},
60   - 'c2s_port' => delete($opts{c2s_port}) || 5222, # {=clientportnumber}
61 63 'old_ssl' => delete $opts{old_ssl},
62 64 'vhosts' => {},
63 65 'fake_peers' => {}, # for s2s testing. $hostname => "ip:port"
64 66 'share_parsers' => 1,
65 67 'monitor_host' => {},
66 68 };
67   -
  69 +
68 70 # if they set s2s_port to explicitly 0, it's disabled for all vhosts
69 71 # but not setting it means 5269 still listens, if vhosts are configured
70 72 # for s2s.
71 73 # {=serverportnumber}
72   - $self->{s2s_port} = 5269 unless defined $self->{s2s_port};
  74 + if(defined($s2s_port) && ref($s2s_port) eq 'HASH') { # hashref
  75 + $self->{s2s_port} = $s2s_port;
  76 + } elsif($s2s_port && !ref($s2s_port)) { # scalar
  77 + $self->{s2s_port}->{$s2s_port}++;
  78 + } elsif(!defined($s2s_port)) { # default
  79 + $self->{s2s_port}->{5269}++;
  80 + }
  81 +
  82 + if(defined($c2s_port) && ref($c2s_port) eq 'HASH') { # hashref
  83 + $self->{c2s_port} = $c2s_port;
  84 + } elsif($c2s_port && !ref($c2s_port)) { # scalar
  85 + $self->{c2s_port}->{$c2s_port}++;
  86 + } else { # default
  87 + $self->{c2s_port}->{5222}++;
  88 + }
  89 +
  90 + if(defined($ssl_port) && ref($ssl_port) eq 'HASH') {
  91 + $self->{ssl_port} = $ssl_port;
  92 + } elsif($ssl_port && !ref($ssl_port)) { # scalar
  93 + $self->{ssl_port}->{$ssl_port}++;
  94 + }
73 95
74 96 croak("Unknown server parameters: " . join(", ", keys %opts)) if %opts;
75 97
@@ -133,29 +155,88 @@ sub set_config_oldssl {
133 155 $self->{old_ssl} = as_bool($val);
134 156 }
135 157
  158 +sub add_config_sslport {
  159 + my ($self, $val) = @_;
  160 + $self->{ssl_port}->{as_bind_addr($val)}++;
  161 +}
  162 +
  163 +sub set_config_sslport {
  164 + my ($self, $val) = @_;
  165 + $self->{ssl_port} = {
  166 + as_bind_addr($val) => 1,
  167 + };
  168 +}
  169 +
  170 +sub add_config_unixdomainsocket {
  171 + my ($self, $val) = @_;
  172 + $self->{unixdomainsocket}->{$val}++;
  173 +}
  174 +
136 175 sub set_config_unixdomainsocket {
137 176 my ($self, $val) = @_;
138   - $self->{unixdomainsocket} = $val;
  177 + $self->{unixdomainsocket} = {
  178 + $val => 1,
  179 + };
139 180 }
140 181
141 182 sub set_config_clientport {
142 183 my ($self, $val) = @_;
143   - $self->{c2s_port} = as_bind_addr($val);
  184 +
  185 + $self->{c2s_port} = {
  186 + as_bind_addr($val) => 1,
  187 + };
  188 +}
  189 +
  190 +sub add_config_clientport {
  191 + my ($self, $val) = @_;
  192 + # necessary because the default in the constructor
  193 + # will be 5222 and subsequent binds would fail
  194 + if($self->{c2s_port} && ref($self->{c2s_port}) && exists $self->{c2s_port}->{5222}) {
  195 + delete $self->{c2s_port}->{5222};
  196 + }
  197 + $self->{c2s_port}->{as_bind_addr($val)}++;
144 198 }
145 199
146 200 sub set_config_serverport {
147 201 my ($self, $val) = @_;
148   - $self->{s2s_port} = as_bind_addr($val);
  202 +
  203 + $self->{s2s_port} = {
  204 + as_bind_addr($val) => 1,
  205 + };
  206 +}
  207 +
  208 +sub add_config_serverport {
  209 + my ($self, $val) = @_;
  210 + # necessary because the default in the constructor
  211 + # will be 5269 and subsequent binds would fail
  212 + if($self->{s2s_port} && ref($self->{s2s_port}) && exists $self->{s2s_port}->{5269}) {
  213 + delete $self->{s2s_port}->{5269};
  214 + }
  215 + $self->{s2s_port}->{as_bind_addr($val)}++;
149 216 }
150 217
151 218 sub set_config_adminport {
152 219 my ($self, $val) = @_;
153   - $self->{admin_port} = as_bind_addr($val);
  220 + $self->{admin_port} = {
  221 + as_bind_addr($val) => 1,
  222 + };
  223 +}
  224 +
  225 +sub add_config_adminport {
  226 + my ($self, $val) = @_;
  227 + $self->{admin_port}->{as_bind_addr($val)}++;
154 228 }
155 229
156 230 sub set_config_intradomainlisten {
157 231 my ($self, $val) = @_;
158   - $self->{cluster_listen} = $val;
  232 + $self->{cluster_listen} = {
  233 + $val => 1,
  234 + };
  235 +}
  236 +
  237 +sub add_config_intradomainlisten {
  238 + my ($self, $val) = @_;
  239 + $self->{cluster_listen}->{$val}++;
159 240 }
160 241
161 242 sub set_config_pidfile {
@@ -294,13 +375,29 @@ sub run {
294 375
295 376 $self->start_cluster_server() if $self->{cluster_listen};
296 377
297   - $self->_start_server($self->{admin_port}, "DJabberd::Connection::Admin") if $self->{admin_port};
  378 + $self->_start_servers($self->{admin_port}, "DJabberd::Connection::Admin") if $self->{admin_port};
298 379
299 380 DJabberd::Connection::Admin->on_startup;
300 381 Danga::Socket->EventLoop();
301 382 unlink($self->{pid_file}) if (-f $self->{pid_file});
302 383 }
303 384
  385 +# this will start up one server for each defined listening socket
  386 +sub _start_servers {
  387 + my ($self, $localaddrs, $class) = @_;
  388 +
  389 + my @addrs;
  390 + if(ref($localaddrs) eq 'HASH') {
  391 + @addrs = sort keys %$localaddrs;
  392 + } else {
  393 + @addrs = ($localaddrs);
  394 + }
  395 +
  396 + foreach my $localaddr (@addrs) {
  397 + $self->_start_server($localaddr, $class);
  398 + }
  399 +}
  400 +
304 401 sub _start_server {
305 402 my ($self, $localaddr, $class) = @_;
306 403
@@ -326,7 +423,7 @@ sub _start_server {
326 423 Proto => IPPROTO_TCP,
327 424 Reuse => 1,
328 425 Listen => 10 )
329   - or $logger->logdie("Error creating socket: $@\n");
  426 + or $logger->logdie("Error creating socket for $localaddr: $@\n");
330 427
331 428 my $success = $server->blocking(0);
332 429
@@ -374,27 +471,36 @@ sub _start_server {
374 471
375 472 sub start_c2s_server {
376 473 my $self = shift;
377   - $self->_start_server($self->{c2s_port},
  474 + $self->_start_servers($self->{c2s_port},
378 475 "DJabberd::Connection::ClientIn");
379 476
380   - if ($self->{old_ssl}) {
381   - $self->_start_server(5223, "DJabberd::Connection::OldSSLClientIn");
  477 + # usually setting OldSSL 1 in the config file
  478 + # the server would magically listen on port 5223
  479 + # however this conflicts a litte with the new
  480 + # multi socket capabilities so add a new keyword
  481 + # SSLPort which works like e.g. ClientPort and
  482 + # have a fallback for old configs here
  483 + if ($self->{old_ssl} || $self->{ssl_port}) {
  484 + if(!$self->{ssl_port}) {
  485 + $self->{ssl_port}->{5223}++;
  486 + }
  487 + $self->_start_servers($self->{ssl_port}, "DJabberd::Connection::OldSSLClientIn");
382 488 }
383 489
384 490 if ($self->{unixdomainsocket}) {
385   - $self->_start_server($self->{unixdomainsocket}, "DJabberd::Connection::ClientIn");
  491 + $self->_start_servers($self->{unixdomainsocket}, "DJabberd::Connection::ClientIn");
386 492 }
387 493 }
388 494
389 495 sub start_s2s_server {
390 496 my $self = shift;
391   - $self->_start_server($self->{s2s_port},
  497 + $self->_start_servers($self->{s2s_port},
392 498 "DJabberd::Connection::ServerIn");
393 499 }
394 500
395 501 sub start_cluster_server {
396 502 my $self = shift;
397   - $self->_start_server($self->{cluster_listen},
  503 + $self->_start_servers($self->{cluster_listen},
398 504 "DJabberd::Connection::ClusterIn");
399 505 }
400 506
@@ -458,7 +564,12 @@ sub _load_config_ref {
458 564 my $key = lc $1;
459 565 my $val = $2;
460 566 my $inv = $plugin || $vhost || $self;
461   - my $meth = "set_config_$key";
  567 + my $meth = "add_config_$key";
  568 + if ($inv->can($meth)) {
  569 + $inv->$meth($val);
  570 + return;
  571 + }
  572 + $meth = "set_config_$key";
462 573 if ($inv->can($meth)) {
463 574 $inv->$meth($val);
464 575 return;
135 t/new-connects-multiport.t
... ... @@ -0,0 +1,135 @@
  1 +#!/usr/bin/perl
  2 +
  3 +use strict;
  4 +use Test::More 'no_plan';
  5 +use IO::Socket::INET6;
  6 +use Devel::Peek;
  7 +use lib 't/lib';
  8 +require 'djabberd-test.pl';
  9 +
  10 +my @client_ports = qw(127.0.0.3:5222 127.0.0.1:11001 127.0.0.2:11001 [::1]:11001 11002 11003);
  11 +my @server_ports = qw(127.0.0.3:5269 127.0.0.1:12001 127.0.0.2:12001 [::1]:12001 12002 12003);
  12 +my @admin_ports = qw(127.0.0.3:5200 127.0.0.1:14001 127.0.0.2:14001 [::1]:14001 14002 14003);
  13 +
  14 +my $vhost = DJabberd::VHost->new(
  15 + server_name => 'jabber.example.com',
  16 + s2s => 0,
  17 + plugins => [
  18 + DJabberd::Authen::AllowedUsers->new(
  19 + policy => "deny",
  20 + allowedusers => [qw(tester)]
  21 + ),
  22 + DJabberd::Authen::StaticPassword->new(
  23 + password => "password"
  24 + ),
  25 + DJabberd::PresenceChecker::Local->new(),
  26 + DJabberd::Delivery::Local->new(),
  27 + DJabberd::Delivery::S2S->new(),
  28 + DJabberd::RosterStorage::InMemoryOnly->new(),
  29 + ],
  30 +);
  31 +
  32 +my $server = DJabberd->new();
  33 +
  34 +foreach my $client_port (@client_ports) {
  35 + $server->add_config_clientport($client_port);
  36 +}
  37 +
  38 +foreach my $server_port (@server_ports) {
  39 + $server->add_config_serverport($server_port);
  40 +}
  41 +
  42 +foreach my $admin_port (@admin_ports) {
  43 + $server->add_config_adminport($admin_port);
  44 +}
  45 +
  46 +$server->add_vhost($vhost);
  47 +
  48 +my $childpid = fork;
  49 +if (!$childpid) {
  50 + $0 = "[djabberd]";
  51 + $server->run;
  52 + exit 0;
  53 +}
  54 +
  55 +my $err = sub {
  56 + kill 9, $childpid;
  57 + die $_[0];
  58 +};
  59 +
  60 +my $conn;
  61 +
  62 +foreach my $addr (@client_ports, @server_ports, @admin_ports) {
  63 + foreach my $try ( 1 .. 3 ) {
  64 + unless ($addr =~ /:\d+$/) {
  65 + $addr = '[::1]:'.$addr;
  66 + }
  67 + diag("Connecting to $addr ...");
  68 + $conn = IO::Socket::INET6->new(
  69 + PeerAddr => $addr,
  70 + Timeout => 1,
  71 + );
  72 + last if $conn;
  73 + sleep 1;
  74 + }
  75 + $err->("Can't connect to server")
  76 + unless $conn;
  77 +
  78 + $conn->close;
  79 +}
  80 +
  81 +END {
  82 + kill 9, $childpid;
  83 +}
  84 +
  85 +foreach my $client_port (@client_ports) {
  86 + my $max_connections = 200;
  87 + foreach my $n (1 .. $max_connections) {
  88 +
  89 + my @events;
  90 + my ($handler, $p);
  91 + $handler = DJabberd::TestSAXHandler->new(\@events);
  92 + $p = DJabberd::XMLParser->new( Handler => $handler );
  93 +
  94 + my $get_event = sub {
  95 + while (! @events) {
  96 + my $byte;
  97 + my $rv = sysread($conn, $byte, 1);
  98 + $p->parse_more($byte);
  99 + }
  100 + return shift @events;
  101 + };
  102 +
  103 + my $get_stream_start = sub {
  104 + my $ev = $get_event->();
  105 + die unless $ev && $ev->isa("DJabberd::StreamStart");
  106 + return $ev;
  107 + };
  108 +
  109 + diag("connect $n/$max_connections\n") if $n % 50 == 0;
  110 + unless ($client_port =~ /:\d+$/) {
  111 + $client_port = '[::1]:'.$client_port;
  112 + }
  113 + $conn = IO::Socket::INET6->new(
  114 + PeerAddr => $client_port,
  115 + Timeout => 1,
  116 + );
  117 +
  118 + print $conn qq{
  119 + <stream:stream
  120 + xmlns:stream='http://etherx.jabber.org/streams' to='jabber.example.com'
  121 + xmlns='jabber:client'>
  122 + };
  123 +
  124 + my $ss = $get_stream_start->();
  125 + die unless $ss && $ss->id;
  126 +
  127 + if ($n == 1) {
  128 + ok($ss, "got a stream back");
  129 + ok($ss->id, "got a stream id back");
  130 + }
  131 +
  132 + $p->finish_push;
  133 + $conn->close;
  134 + }
  135 +}

0 comments on commit 580a9f0

Please sign in to comment.
Something went wrong with that request. Please try again.