Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 14 commits
  • 6 files changed
  • 0 commit comments
  • 3 contributors
Commits on Aug 11, 2012
@timbunce timbunce Remove redundant lines from MockServer.pm. Add sample rate to test di…
…ag message.
4439cca
@timbunce timbunce Add simple benchmark utility. Currently ~6k/sec to 'localhost', ~10k/…
…sec to 127.0.0.1.

Using lib/Net/Statsd.pm sending to 127.0.0.1:9
Benchmark: timing 10000 iterations of decrement, gauge, increment, timing_001, timing_100...
 decrement:  0 wallclock secs ( 0.67 usr +  0.24 sys =  0.91 CPU) @ 10989.01/s (n=10000)
     gauge:  1 wallclock secs ( 0.65 usr +  0.27 sys =  0.92 CPU) @ 10869.57/s (n=10000)
 increment:  1 wallclock secs ( 0.68 usr +  0.34 sys =  1.02 CPU) @ 9803.92/s (n=10000)
timing_001:  1 wallclock secs ( 0.65 usr +  0.26 sys =  0.91 CPU) @ 10989.01/s (n=10000)
timing_100:  1 wallclock secs ( 0.66 usr +  0.27 sys =  0.93 CPU) @ 10752.69/s (n=10000)

Press ENTER or type command to continue
Using lib/Net/Statsd.pm sending to localhost:9
Benchmark: timing 10000 iterations of decrement, gauge, increment, timing_001, timing_100...
 decrement:  3 wallclock secs ( 1.03 usr +  0.63 sys =  1.66 CPU) @ 6024.10/s (n=10000)
     gauge:  2 wallclock secs ( 0.99 usr +  0.62 sys =  1.61 CPU) @ 6211.18/s (n=10000)
 increment:  3 wallclock secs ( 1.02 usr +  0.63 sys =  1.65 CPU) @ 6060.61/s (n=10000)
timing_001:  2 wallclock secs ( 1.00 usr +  0.62 sys =  1.62 CPU) @ 6172.84/s (n=10000)
timing_100:  3 wallclock secs ( 1.00 usr +  0.62 sys =  1.62 CPU) @ 6172.84/s (n=10000)
a7c2801
@timbunce timbunce Cache the socket. Boosts performance from ~6k to >60k sends/sec! 62f96d4
@timbunce timbunce Use send() builtin and each. Now over 13 times faster than 0.07.
When sending to a remote host:
 increment: 31 wallclock secs (11.70 usr +  9.27 sys = 20.97 CPU) @  4768.72/s (n=100000)
 increment:  1 wallclock secs ( 1.05 usr +  0.49 sys =  1.54 CPU) @ 64935.06/s (n=100000)
85b6984
@cosimo Merge pull request #8 from timbunce/tim-perf1
This branch is 13 times faster than the 0.07 release :)
21e56f3
@cosimo Documented changes that went into 0.08 071bba5
@cosimo Bumped up version 8c00c7a
Commits on Jun 05, 2014
@cosimo RT#96097: fixed FSF address
Thanks to David Dick for reporting.

The non-completely-obvious way this is fixed is by rerunning
'dzil build', which adds a LICENSE file automatically.

That LICENSE file is generated by Software::License though,
so that module must be updated before doing this.
05efd89
@cosimo RT#94294: experiment with multi-metric packets
Ville Mattila sent a patch to make gauge() send multi-metric gauges.
I improved that patch and added test cases for same-metric multi value,
and different metric multi-value.

Still trying to understand if this is the correct and wanted
behaviour. Stay tuned here:

https://rt.cpan.org/Ticket/Display.html?id=94294
34d5944
@cosimo Mark as development (multi-metric values) version 5c928d8
Commits on Apr 23, 2015
Zefram RT#103913: diag() is used to indicate problems 68fa7f8
@cosimo send() could be interpreted as our own send method
which is not. It's the socket CORE::send().
87354ca
@cosimo Bump version for CPAN release (0.10) c363a4e
@cosimo CPAN release 0.11:
Changes file wasn't updated.
ed3d113
View
24 Changes
@@ -1,5 +1,28 @@
Change history for Net::Statsd
+0.11 - Thu Apr 23 16:27:58 CEST 2015
+
+ Fixed RT#103913, use note() instead of diag() in the test suite.
+ Thanks to Zefram for reporting it and sending a patch.
+
+ Implemented experimental multi-metric gauge sending.
+ Thanks to Ville Mattila for the initial patch.
+
+0.09 - Wed Jun 04 16:23:01 CEST 2014
+
+ Fixed RT#96097 (fsf address, thanks to David Dick for reporting)
+
+0.08 - Sat Aug 11 22:55:53 CEST 2012
+
+ Improved performance by a ~13x factor, optimizing the
+ send code and reusing the same socket instead of creating
+ a new one every time.
+
+ Added a benchmarking script, to measure Net::Statsd
+ throughput in msg/s.
+
+ Thanks to Tim Bunce for these great patches.
+
0.07 - Fri Aug 10 09:16:20 CEST 2012
Fixed tests failure when an already existing statsd server
@@ -16,4 +39,3 @@ Change history for Net::Statsd
Thanks to Andreas Marienborg.
0.05 and below, let's consider them initial release
-
View
35 benchmark.pl
@@ -0,0 +1,35 @@
+#!/usr/bin/perl
+
+=head1 NAME
+
+benchmark.pl - report max send rate
+
+=head1 DESCRIPTION
+
+A basic test of the time is takes to send stats.
+
+=cut
+
+use strict;
+use warnings;
+
+use Net::Statsd;
+use Benchmark;
+use Getopt::Long;
+
+GetOptions(
+ 'host|h=s' => \($Net::Statsd::HOST = 'localhost'),
+ 'port|p=i' => \($Net::Statsd::PORT = 9), # port 9 is the standard discard service
+) or exit 1;
+
+my $count = shift || 10_000;
+
+print "Using $INC{'Net/Statsd.pm'} sending to $Net::Statsd::HOST:$Net::Statsd::PORT\n";
+
+timethese($count, {
+ increment => sub { Net::Statsd::increment('foo.bar.i') },
+ decrement => sub { Net::Statsd::increment('foo.bar.d') },
+ timing_100 => sub { Net::Statsd::timing('foo.bar.t', 1) },
+ timing_001 => sub { Net::Statsd::timing('foo.bar.t', 0.1) },
+ gauge => sub { Net::Statsd::gauge('foo.bar.g', 42) },
+});
View
4 dist.ini
@@ -2,9 +2,9 @@ name = Net-Statsd
author = Cosimo Streppone <cosimo@cpan.org>
license = Perl_5
copyright_holder = Cosimo Streppone
-copyright_year = 2012
+copyright_year = 2015
-version = 0.07
+version = 0.11
[@Basic]
[PkgVersion]
View
124 lib/Net/Statsd.pm
@@ -11,6 +11,9 @@ use IO::Socket ();
our $HOST = 'localhost';
our $PORT = 8125;
+my $SOCK;
+my $SOCK_PEER;
+
=head1 NAME
Net::Statsd - Perl client for Etsy's statsd daemon
@@ -207,17 +210,48 @@ Log arbitrary values, as a temperature, or server load.
Net::Statsd::gauge('core.temperature', 55);
+Statsd interprets gauge values with C<+> or C<-> sign as increment/decrement.
+Therefore, to explicitly set a gauge to a negative number it has to be set
+to zero first.
+
+However, if either the zero or the actual negative value is lost in UDP
+transport to statsd server because of e.g. network congestion or packet loss,
+your gauge will become skewed.
+
+To ensure network problems will not skew your data, C<Net::Statsd::gauge()>
+supports packing multiple values in single UDP packet sent to statsd:
+
+ Net::Statsd::gauge(
+ 'core.temperature' => 55,
+ 'freezer.temperature' => -18
+ );
+
+Make sure you don't supply too many values, or you might risk exceeding the
+MTU of the network interface and cause the resulting UDP packet to be dropped.
+
+In general, a safe limit should be 512 bytes. Related to the example
+above, C<core.temperature> of 55 will be likely packed as a string:
+
+ core.temperature:55|g
+
+which is 21 characters, plus a newline used as delimiter (22).
+Using this example, you can pack at least 20 distinct gauge values without
+problems. That will result in a UDP message of 440 bytes (22 times 20),
+which is well below the I<safe> threshold of 512.
+
+In reality, if the communication happens on a local interface, or over
+a 10G link, you are allowed much more than that.
+
=cut
sub gauge {
- my ($name, $value) = @_;
-
- $value = 0 unless defined $value;
+ my $stats = {};
- # Didn't use '%d' because values might be floats
- my $stats = {
- $name => sprintf "%s|g", $value
- };
+ while (my($name, $value) = splice(@_, 0, 2)) {
+ $value = 0 unless defined $value;
+ # Didn't use '%d' because values might be floats
+ push @{ $stats->{$name} }, sprintf("%s|g", $value);
+ }
return Net::Statsd::send($stats, 1);
}
@@ -242,31 +276,50 @@ sub send {
return;
}
- my $udp_sock = IO::Socket::INET->new(
- Proto => 'udp',
- PeerAddr => $HOST,
- PeerPort => $PORT,
- ) or do {
- # warn perhaps?
- return
- };
-
- # We don't want to die if Net::Statsd::send() doesn't work...
- # We could though:
- #
- # or die "Could not create UDP socket: $!\n";
+ # Cache the socket to avoid dns and socket creation overheads
+ # (this boosts performance from ~6k to >60k sends/sec)
+ if (!$SOCK || !$SOCK_PEER || "$HOST:$PORT" ne $SOCK_PEER) {
+
+ $SOCK = IO::Socket::INET->new(
+ Proto => 'udp',
+ PeerAddr => $HOST,
+ PeerPort => $PORT,
+ ) or do {
+ Carp::carp("Net::Statsd can't create a socket to $HOST:$PORT: $!")
+ unless our $_warn_once->{"$HOST:$PORT"}++;
+ return
+ };
+ $SOCK_PEER = "$HOST:$PORT";
+
+ # We don't want to die if Net::Statsd::send() doesn't work...
+ # We could though:
+ #
+ # or die "Could not create UDP socket: $!\n";
+ }
my $all_sent = 1;
- for my $stat (keys %{ $sampled_data }) {
- my $value = $sampled_data->{$stat};
- my $packet = "$stat:$value";
- $udp_sock->send($packet);
- # XXX If you want warnings...
- # or do {
- # warn "[" . localtime() . "] UDP packet '$packet' send failed\n";
- # $all_sent = 0;
- #};
+ keys %{ $sampled_data }; # reset iterator
+ while (my ($stat, $value) = each %{ $sampled_data }) {
+ my $packet;
+ if (ref $value eq 'ARRAY') {
+ # https://github.com/etsy/statsd/blob/master/docs/metric_types.md#multi-metric-packets
+ $packet = join("\n", map { "$stat:$_" } @{ $value });
+ }
+ else {
+ # Single value as scalar
+ $packet = "$stat:$value";
+ }
+ # send() returns the number of characters sent, or undef on error.
+ my $r = CORE::send($SOCK, $packet, 0);
+ if (!defined $r) {
+ #warn "Net::Statsd send error: $!";
+ $all_sent = 0;
+ }
+ elsif ($r != length($packet)) {
+ #warn "Net::Statsd send truncated: $!";
+ $all_sent = 0;
+ }
}
return $all_sent;
@@ -320,7 +373,18 @@ sub _sample_data {
# Uglier, but if there's no data to be sampled,
# we get a clean undef as returned value
$sampled_data ||= {};
- $sampled_data->{$stat} = sprintf "%s|@%s", $value, $sample_rate;
+
+ # Multi-metric packet:
+ # https://github.com/etsy/statsd/blob/master/docs/metric_types.md#multi-metric-packets
+ if (ref $value eq 'ARRAY') {
+ foreach my $v ( @{ $value } ) {
+ push @{ $sampled_data->{$stat} }, sprintf("%s|@%s", $v, $sample_rate);
+ }
+ }
+ # Single value as scalar
+ else {
+ $sampled_data->{$stat} = sprintf "%s|@%s", $value, $sample_rate;
+ }
}
}
View
96 t/MockServer.pm
@@ -5,7 +5,7 @@ use IO::Select;
$| = 1;
-use vars qw ($socket @messages $select);
+use vars qw ($socket @messages $select $PACKETS_RECEIVED);
sub start {
@@ -15,6 +15,8 @@ sub start {
Proto => 'udp',
) or die "unable to create socket: $!\n";
+ $PACKETS_RECEIVED = 0;
+
$select = IO::Select->new($socket);
reset_messages();
return $socket->sockport();
@@ -26,53 +28,59 @@ sub run {
while (1) {
my @ready = $select->can_read($timeout);
last unless @ready;
-
+
my $msg = {};
$socket->recv($_data, 1024);
+
$_data =~ s/^\s+//;
$_data =~ s/\s+$//;
$msg->{_raw_data} = $_data;
+
+ # Don't count 'quit' as a received packet
last if $_data =~ /^quit/i;
-
- my @bits = split(':', $_data);
- my $key = shift @bits;
- $key =~ s/\s+/_/g;
- $key =~ s/\//-/g;
- $key =~ s/[^a-zA-Z_\-0-9\.]//g;
- $msg->{key} = $key;
+ $PACKETS_RECEIVED++;
- if (@bits == 0 || ! defined $bits[0]) {
- push @bits, 1;
- }
+ # Multi-metric packets are separated by a newline
+ for my $pkt (split "\n", $_data) {
- for (@bits) {
- my @fields = split m{\|};
+ my @bits = split(':', $pkt);
- if (@fields == 1 || ! defined $fields[1]) {
- $msg->{error} = "bad line";
- next;
- }
+ my $key = shift @bits;
+ $key =~ s/\s+/_/g;
+ $key =~ s/\//-/g;
+ $key =~ s/[^a-zA-Z_\-0-9\.]//g;
+ $msg->{key} = $key;
- # Timer
- if ($fields[1] eq 'ms') {
- $msg->{timers} = [] unless $msg->{timers};
- push @{$msg->{timers}}, $fields[0];
+ if (@bits == 0 || ! defined $bits[0]) {
+ push @bits, 1;
}
- # Gauge (FIXME I'll just pretend this is correct)
- elsif ($fields[1] eq 'g') {
- $msg->{gauges} = [] unless $msg->{gauges};
- push @{$msg->{gauges}}, $fields[0];
- }
+ for (@bits) {
+ my @fields = split m{\|};
+
+ if (@fields == 1 || ! defined $fields[1]) {
+ $msg->{error} = "bad line";
+ next;
+ }
+
+ # Timer
+ if ($fields[1] eq 'ms') {
+ push @{$msg->{timers}}, $fields[0];
+ }
+
+ # Gauge
+ elsif ($fields[1] eq 'g') {
+ push @{$msg->{gauges}}, $fields[0];
+ }
- # Counter, evt. sampled
- else {
- if ($fields[2] && $fields[2] =~ /^\s*@([\d\.]+)/) {
- $msg->{sample_rate} = $1;
+ # Counter, evt. sampled
+ else {
+ if ($fields[2] && $fields[2] =~ /^\s*@([\d\.]+)/) {
+ $msg->{sample_rate} = $1;
+ }
+ push @{$msg->{counters}}, $fields[0];
}
- $msg->{counters} = [] unless $msg->{counters};
- push @{$msg->{counters}}, $fields[0];
}
}
push @messages, $msg;
@@ -88,10 +96,23 @@ sub get_messages {
sub get_and_reset_messages {
my $ret = get_messages();
reset_messages();
- return $ret
+ return $ret;
}
-sub reset_messages { @messages = () }
+sub packets_received {
+ my $curr_value = $PACKETS_RECEIVED;
+ $PACKETS_RECEIVED = 0;
+ return $curr_value;
+}
+
+sub process {
+ stop();
+ run();
+}
+
+sub reset_messages {
+ @messages = ();
+}
sub stop {
my $s_send = IO::Socket::INET->new(
@@ -102,9 +123,4 @@ sub stop {
$s_send->close();
}
-sub process {
- stop();
- run();
-}
-
1;
View
54 t/mock-server.t
@@ -19,7 +19,7 @@ BEGIN {
use strict;
use warnings;
-use Test::More tests => 10;
+use Test::More tests => 17;
use Net::Statsd;
my $dirname;
@@ -97,6 +97,56 @@ is_deeply($msgs, [ {
_raw_data => 'oxygen.level:0.98|g',
} ], "Gauge message was stored correctly");
+# -----------------------------------------------------------------------------
+note("Multi-metric packet test 1");
+
+my @multi_metric_values = (
+ 'oxygen.level' => 0.98,
+ 'hydrogen.level' => 0.95,
+ 'helium.level' => 0.92);
+
+Net::Statsd::gauge(@multi_metric_values);
+
+$msgs = MockServer::get_and_reset_messages();
+ok(ref $msgs eq 'ARRAY');
+is(scalar @{ $msgs } => 3, "We should have got 3 messages in a single packet");
+my %expected_msg = @multi_metric_values;
+for (@{ $msgs }) {
+ my $key = $_->{key};
+ my $value = $_->{gauges}->[0];
+ # If we get the expected value, remove it
+ if ($value == $expected_msg{$key}) {
+ delete $expected_msg{$key};
+ }
+}
+is(scalar keys %expected_msg, 0, "Got all three metrics back");
+
+# -----------------------------------------------------------------------------
+note("Multi-metric packet test 2. Same metric twice.");
+
+MockServer::packets_received(); # reset
+
+Net::Statsd::gauge(
+ 'core.temperature', 55,
+ 'core.temperature', 56,
+);
+
+$msgs = MockServer::get_and_reset_messages();
+ok(ref $msgs eq 'ARRAY');
+is(scalar @{ $msgs } => 1,
+ "Multiple values for same metric should be aggregated in the same messge");
+
+#use Data::Dumper; note(Dumper($msgs));
+is_deeply($msgs, [ {
+ key => 'core.temperature',
+ gauges => [ 55, 56 ],
+ _raw_data => "core.temperature:55|g\ncore.temperature:56|g",
+} ]);
+
+is(MockServer::packets_received(), 1,
+ "Two metric values should have been sent in one packet");
+
+# -----------------------------------------------------------------------------
note("The following test validates sent data");
my $tries = 10000;
@@ -113,7 +163,7 @@ for (1 .. $tries) {
my $expected_seen = $tries * $sample_rate;
my $num_seen = scalar @messages;
-diag("Got $num_seen samples out of $tries tries");
+note("Got $num_seen samples out of $tries tries (sample rate $sample_rate)");
cmp_ok(
int(abs($num_seen - $expected_seen)), '<=', (int($expected_seen * 0.05) | 1),
"5% delta or less"

No commit comments for this range

Something went wrong with that request. Please try again.