Skip to content
Browse files

New version - v1.31

  - [BUG] Perl does not call flush() automatically before socket shutdown(). It
    sometimes (unstable!) causes unexpected SIGPIPEs and data loss. Fixed: now flush()
    is called manually.
  - [BUG] STATS command is not processed twice anymore.
  - [NEW] Ability to limit memory usage and auto-restart the daemon if it
    consumes too much memory. (Note that unsent data is lost during this restart.)
  - [NEW} PHP API: cmdOnlineWithCounters(): for each online ID also returns
    the number of browsers connected just now (it is NOT a "number of online
    users who listen this channel", but its approximation).
  • Loading branch information...
1 parent af15ba0 commit 7ba2fd52da7c71c2a3fac3f46b10d9313e73420c @DmitryKoterov committed
View
33 Connection/In.pm
@@ -119,6 +119,7 @@ sub try_process_cmd {
$self->{data} =~ m/(?: ^ | \r?\n\r?\n) (ONLINE|STATS|WATCH) (?:\s+ ([^\r\n]*) )? (?: $tail_re )/six or return 0;
# Cmd extracted, process it.
$self->{pairs} = undef;
+ $self->{data} = undef;
# Assert authorization.
$self->assert_auth();
my $cmd = uc $1;
@@ -209,7 +210,7 @@ sub cmd_online {
$self->_async(sub {
my $rids = $online_timers->get_ids_ref($self->_id_prefixes_to_re($id_prefixes));
$self->debug("sending " . scalar(@$rids) . " online identifiers");
- $self->_send_response(@$rids? join(",", @$rids) . "\n" : "");
+ $self->_send_response(join("", map { "$_ " . $connected_fhs->get_num_fhs_by_id($_) . "\n" } @$rids));
});
}
@@ -231,21 +232,19 @@ sub cmd_watch {
sub cmd_stats {
my ($self) = @_;
return if $self->{cred};
- $self->_async(sub {
- $self->debug("sending stats");
- $self->_send_response(
- "[data_to_send]\n" .
- $data_to_send->get_stats() .
- "\n[connected_fhs]\n" .
- $connected_fhs->get_stats() .
- "\n[online_timers]\n" .
- $online_timers->get_stats() .
- "\n[cleanup_timers]\n" .
- $cleanup_timers->get_stats() .
- "\n[pairs_by_fhs]\n" .
- $pairs_by_fhs->get_stats()
- );
- });
+ $self->debug("sending stats");
+ $self->_send_response(
+ "[data_to_send]\n" .
+ $data_to_send->get_stats() .
+ "\n[connected_fhs]\n" .
+ $connected_fhs->get_stats() .
+ "\n[online_timers]\n" .
+ $online_timers->get_stats() .
+ "\n[cleanup_timers]\n" .
+ $cleanup_timers->get_stats() .
+ "\n[pairs_by_fhs]\n" .
+ $pairs_by_fhs->get_stats()
+ );
}
# Send response anc close the connection.
@@ -256,7 +255,9 @@ sub _send_response {
print $fh "Content-Type: text/plain\r\n";
print $fh "Content-Length: " . length($$rdata) . "\r\n\r\n";
print $fh $$rdata;
+ $fh->flush(); # MUST be executed! else SIGPIPE may be issued
shutdown($fh, 2);
+ $self->{data} = undef;
}
return 1;
View
1 Connection/Wait.pm
@@ -111,6 +111,7 @@ sub ontimeout {
my ($self) = @_;
my $fh = $self->fh;
if ($fh) {
+ $fh->flush();
shutdown($fh, 2);
}
$self->SUPER::ontimeout();
View
13 README.txt
@@ -1,4 +1,4 @@
-Dklab Realplexor v1.30: Comet server which handles 1000000+ parallel browser connections.
+Dklab Realplexor v1.31: Comet server which handles 1000000+ parallel browser connections.
Author: Dmitry Koterov, dkLab (C)
Home page: http://dklab.ru/lib/dklab_realplexor/
Changelog: http://github.com/DmitryKoterov/dklab_realplexor/commits/master/
@@ -94,6 +94,17 @@ events
CHANGELOG
---------
+* Dklab Realplexor 2010-04-16: v1.31
+ - [BUG] Perl does not call flush() automatically before socket shutdown(). It
+ sometimes (unstable!) causes unexpected SIGPIPEs and data loss. Fixed: now flush()
+ is called manually.
+ - [BUG] STATS command is not processed twice anymore.
+ - [NEW] Ability to limit memory usage and auto-restart the daemon if it
+ consumes too much memory. (Note that unsent data is lost during this restart.)
+ - [NEW} PHP API: cmdOnlineWithCounters(): for each online ID also returns
+ the number of browsers connected just now (it is NOT a "number of online
+ users who listen this channel", but its approximation).
+
* Dklab Realplexor 2010-02-27: v1.30
- [SPD] Use EV library (http://search.cpan.org/~mlehmann/EV-3.9/EV.pm)
instead of libevent. It is faster and has no memory leaks.
View
2 Realplexor/Common.pm
@@ -97,6 +97,7 @@ sub _shutdown_fh {
$connected_fhs->del_from_id_by_fh($pair->[1], $fh);
}
$pairs_by_fhs->remove_by_fh($fh);
+ $fh->flush(); # MUST be executed! shutdown() does not issue flush()!
return shutdown($fh, 2);
}
@@ -238,6 +239,7 @@ sub send_static {
print $fh "Cache-Control: public\r\n";
print $fh "\r\n";
print $fh $CONFIG{"${param}_CONTENT"};
+ $fh->flush(); # MUST be executed! shutdown() does not issue flush()!
shutdown($fh, 2); # don't use close, it breaks event machine!
}
View
4 Realplexor/Config.pm
@@ -20,7 +20,7 @@ my $root = dirname(dirname(abs_path(__FILE__)));
# Load config.
sub load {
- my ($add) = @_;
+ my ($add, $silent) = @_;
# Reset config.
%CONFIG = ();
# Read default config.
@@ -28,7 +28,7 @@ sub load {
# Read custom config.
if ($add) {
if (-f $add) {
- Realplexor::Common::logger("CONFIG: appending configuration from $add");
+ Realplexor::Common::logger("CONFIG: appending configuration from $add") if !$silent;
do($add); die $@ if $@;
} else {
Realplexor::Common::logger("CONFIG: file $add does not exist, skipping");
View
4 Realplexor/Event/Server.pm
@@ -101,12 +101,12 @@ sub handle_read {
$connection->onerror("An error returned to event handler");
return 0;
}
-
+
# Read the next data chunk.
local $/;
my $data = <$fh>;
- # End of the request reached.
+ # End of the request reached (must never reach be cause of eof() check above?).
if (!defined $data) {
return 0;
}
View
43 Realplexor/Tools.pm
@@ -5,6 +5,7 @@ package Realplexor::Tools;
use strict;
use Time::HiRes;
use Math::BigFloat;
+use POSIX ":sys_wait_h";
# Counter to make the time unique.
my $time_counter = 0;
@@ -36,4 +37,46 @@ sub rerun_unlimited {
}
}
+# Returns amount of used memory by pid (in megabytes).
+sub get_memory_usage {
+ my ($pid) = @_;
+ my $mem = `ps -p $pid -o rss --no-headers`;
+ return 0 if !$mem;
+ $mem =~ s/\s+//sg;
+ return $mem / 1024;
+}
+
+# Wait for a process termination.
+# If the process limits memory usage, kills it.
+sub wait_pid_with_memory_limit {
+ my ($pid, $limit) = @_;
+ while (waitpid($pid, WNOHANG) != -1) {
+ sleep(1);
+ my $mem = get_memory_usage($pid);
+ if ($limit && int($mem) > int($limit)) {
+ print STDERR sprintf "Daemon process uses %d MB of memory which is larger than %d MB. Killing...\n", $mem, $limit;
+ graceful_kill($pid);
+ last;
+ }
+ }
+
+}
+
+# Gracefully kills a process.
+sub graceful_kill {
+ my ($pid, $pid_file) = @_;
+ kill(2, $pid);
+ sleep(1);
+ if (kill(0, $pid)) {
+ kill(9, $pid);
+ print STDERR "Killed the child using a heavy SIGKILL.\n";
+ } else {
+ print STDERR "Normally terminated.\n";
+ }
+ if ($pid_file) {
+ unlink($pid_file);
+ }
+ $pid = undef;
+}
+
return 1;
View
5 Storage/ConnectedFhs.pm
@@ -38,6 +38,11 @@ sub get_num_items {
return scalar(keys %$this);
}
+sub get_num_fhs_by_id {
+ my ($this, $id) = @_;
+ return $this->{$id}? scalar(keys %{$this->{$id}}) : 0;
+}
+
sub get_stats {
my ($this) = @_;
my @result = ();
View
41 api/php/Dklab/Realplexor.php
@@ -2,7 +2,7 @@
/**
* Dklab_Realplexor PHP API.
*
- * @version 1.24
+ * @version 1.31
*/
class Dklab_Realplexor
{
@@ -93,12 +93,14 @@ public function send($idsAndCursors, $data, $showOnlyForIds = null)
}
/**
- * Return list of online IDs.
+ * Return list of online IDs (keys) and number of online browsers
+ * for each ID. (Now "online" means "connected just now", it is
+ * very approximate; more precision is in TODO.)
*
* @param array $idPrefixes If set, only online IDs with these prefixes are returned.
- * @return array List of matched online IDs.
+ * @return array List of matched online IDs (keys) and online counters (values).
*/
- public function cmdOnline($idPrefixes = null)
+ public function cmdOnlineWithCounters($idPrefixes = null)
{
// Add namespace.
$idPrefixes = $idPrefixes !== null? (array)$idPrefixes : array();
@@ -111,18 +113,29 @@ public function cmdOnline($idPrefixes = null)
// Send command.
$resp = $this->_sendCmd("online" . ($idPrefixes? " " . join(" ", $idPrefixes) : ""));
if (!strlen(trim($resp))) return array();
- $resp = explode(",", trim($resp));
- // Cut off namespaces.
- if (strlen($this->_namespace)) {
- foreach ($resp as $i => $id) {
- if (strpos($id, $this->_namespace) === 0) {
- $resp[$i] = substr($id, strlen($this->_namespace));
- }
+ // Parse the result and trim namespace.
+ $result = array();
+ foreach (explode("\n", $resp) as $line) {
+ @list ($id, $counter) = explode(" ", $line);
+ if (!strlen($id)) continue;
+ if (strlen($this->_namespace) && strpos($id, $this->_namespace) === 0) {
+ $id = substr($id, strlen($this->_namespace));
}
+ $result[$id] = $counter;
}
- return $resp;
+ return $result;
+ }
+
+ /**
+ * Return list of online IDs.
+ *
+ * @param array $idPrefixes If set, only online IDs with these prefixes are returned.
+ * @return array List of matched online IDs.
+ */
+ public function cmdOnline($idPrefixes = null)
+ {
+ return array_keys($this->cmdOnlineWithCounters($idPrefixes));
}
-
/**
* Return all Realplexor events (e.g. ID offline/offline changes)
@@ -183,7 +196,7 @@ private function _sendCmd($cmd)
{
return $this->_send(null, "$cmd\n");
}
-
+
/**
* Internal method.
* Send specified data to IN channel. Return response data.
View
26 api/php/t/541_phpapi_online_with_counters.phpt
@@ -0,0 +1,26 @@
+--TEST--
+dklab_realplexor: PHP API test, online IDs with online browsers counters
+
+--FILE--
+<?php
+require dirname(__FILE__) . '/init.php';
+
+send_wait("
+ identifier=5:abc,10:def,6:abc
+ aaa
+");
+
+printr($mpl->cmdOnlineWithCounters());
+
+disconnect_wait();
+
+?>
+--EXPECT--
+WA <-- identifier=5:abc,10:def,6:abc
+WA <-- aaa
+array (
+ 'abc' => '1',
+ 'def' => '1',
+)
+WA :: Disconnecting.
+# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=2 cleanup_timers=0 events=*]
View
4 dklab_realplexor.conf
@@ -99,6 +99,10 @@
# 2: show messages only, with timestamps
# 3: show messages, timestamps and storage statistics
VERBOSITY => 3,
+
+ # If a realplexor daemon consumes more memory than specified here,
+ # it is cruelly restarted. Specify 0 to disable restarting.
+ MAX_MEM_MB => 0,
);
return 1;
View
2 dklab_realplexor.js
@@ -3,7 +3,7 @@
function Dklab_Realplexor(fullUrl, namespace, viaDocumentWrite)
{
// Current JS library version.
- var VERSION = "1.24";
+ var VERSION = "1.31";
// Detect current page hostname.
var host = document.location.host;
View
18 dklab_realplexor.pl
@@ -124,6 +124,10 @@ sub mainloop {
my $pid_file;
GetOptions("p=s" => \$pid_file);
+# Load config: it is also needed by parent watchdog.
+my $additional_conf = $ARGV[0];
+Realplexor::Config::load($additional_conf, 1);
+
# Save PID?
if ($pid_file) {
open(local *F, ">", $pid_file) or die "Cannot create $pid_file: $!\n";
@@ -158,22 +162,12 @@ sub mainloop {
};
# Waid for child termination.
- while (wait() != -1) {}
+ Realplexor::Tools::wait_pid_with_memory_limit($pid, $CONFIG{MAX_MEM_MB});
sleep(1);
}
# Called if process dies.
END {
return if !$pid; # children
- kill(2, $pid);
- sleep(1);
- if (kill(0, $pid)) {
- kill(9, $pid);
- print STDERR "Killed the child using a heavy SIGKILL.\n";
- } else {
- print STDERR "Normally terminated.\n";
- }
- if ($pid_file) {
- unlink($pid_file);
- }
+ Realplexor::Tools::graceful_kill($pid, $pid_file);
}
View
38 t/servertest/215_online_cmd.phpt
@@ -0,0 +1,38 @@
+--TEST--
+dklab_realplexor: online cmd
+
+--FILE--
+<?php
+require dirname(__FILE__) . '/init.php';
+
+send_wait("
+ identifier=abc
+ aaa
+");
+disconnect_wait();
+
+send_wait("
+ identifier=def
+ bbb
+");
+
+send_in(null, "online");
+
+disconnect_wait();
+
+?>
+--EXPECT--
+WA <-- identifier=abc
+WA <-- aaa
+WA :: Disconnecting.
+WA <-- identifier=def
+WA <-- bbb
+IN <== online
+IN ==> HTTP/1.0 200 OK
+IN ==> Content-Type: text/plain
+IN ==> Content-Length: 12
+IN ==>
+IN ==> abc 0
+IN ==> def 1
+WA :: Disconnecting.
+# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=2 cleanup_timers=0 events=*]
View
4 t/servertest/535_login_online_no_id.phpt
@@ -24,7 +24,7 @@ IN <==
IN <== online user_
IN ==> HTTP/1.0 200 OK
IN ==> Content-Type: text/plain
-IN ==> Content-Length: 9
+IN ==> Content-Length: 11
IN ==>
-IN ==> user_abc
+IN ==> user_abc 0
# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=1 cleanup_timers=0 events=*]
View
4 t/servertest/540_login_online_not_own.phpt
@@ -44,7 +44,7 @@ IN <==
IN <== online user_
IN ==> HTTP/1.0 200 OK
IN ==> Content-Type: text/plain
-IN ==> Content-Length: 9
+IN ==> Content-Length: 11
IN ==>
-IN ==> user_abc
+IN ==> user_abc 0
# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=1 cleanup_timers=0 events=*]
View
27 t/servertest/700_max_mem_mb.phpt
@@ -0,0 +1,27 @@
+--TEST--
+dklab_realplexor: killing on memory overrun
+
+--FILE--
+<?php
+$VERBOSE = 1;
+$REALPLEXOR_CONF = "max_mem_mb.conf";
+require dirname(__FILE__) . '/init.php';
+
+expect("/Normally/");
+
+?>
+--EXPECT--
+# Starting.
+# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=0 cleanup_timers=0 events=*]
+# CONFIG: appending configuration from ***
+# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=0 cleanup_timers=0 events=*]
+# WAIT: listening 0.0.0.0:8088
+# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=0 cleanup_timers=0 events=*]
+# IN: listening 127.0.0.1:10010
+# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=0 cleanup_timers=0 events=*]
+# Switching current user to unprivileged "nobody"
+# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=0 cleanup_timers=0 events=*]
+# Daemon process uses * MB of memory which is larger than * MB. Killing...
+# SIGINT received, exiting
+# [pairs_by_fhs=0 data_to_send=0 connected_fhs=0 online_timers=0 cleanup_timers=0 events=*]
+# Normally terminated.
View
3 t/servertest/fixture/max_mem_mb.conf
@@ -0,0 +1,3 @@
+$CONFIG{MAX_MEM_MB} = 1;
+
+return 1;
View
3 t/servertest/init.php
@@ -54,6 +54,7 @@ function start_realplexor()
s/(appending configuration from ).*/$1***/mg;
s/(\[)\d+\.\d+/$1*/sg;
s/(events=)\d+/$1*/sg;
+ s/\d+( MB)/*$1/sg;
s/^/# /sg;
if ($del) {
$_ = "";
@@ -64,7 +65,7 @@ function start_realplexor()
if ($REALPLEXOR_CONF) {
$args = escapeshellarg(dirname(__FILE__) . '/fixture/' . $REALPLEXOR_CONF);
}
- run("cd ../.. && perl dklab_realplexor.pl $args | tee -a $OUT_TMP " .
+ run("cd ../.. && perl dklab_realplexor.pl $args 2>&1 | tee -a $OUT_TMP " .
"| perl -pe " . escapeshellarg($filter) .
($GLOBALS['VERBOSE'] ? "" : " | tail -n1")
);

0 comments on commit 7ba2fd5

Please sign in to comment.
Something went wrong with that request. Please try again.