Skip to content

Commit

Permalink
Merge pull request statsd#224 from charlesdaniel/master
Browse files Browse the repository at this point in the history
Adding multi metric support to Java client (multiple metrics in a single datagram)
  • Loading branch information
mrtazz committed Dec 25, 2012
2 parents 86ad6ed + 00b1d10 commit d03221f
Showing 1 changed file with 100 additions and 9 deletions.
109 changes: 100 additions & 9 deletions examples/StatsdClient.java
Expand Up @@ -22,6 +22,14 @@
* // multiple keys with a sample rate
* client.increment(10, .1, "foo.bar.baz", "foo.bar.boo", "foo.baz.bar");
*
* // To enable multi metrics (aka more than 1 metric in a UDP packet) (disabled by default)
* client.enableMultiMetrics(true); //disable by passing in false
* // To fine-tune udp packet buffer size (default=1500)
* client.setBufferSize((short) 1500);
* // To force flush the buffer out (good idea to add to your shutdown path)
* client.flush();
*
*
* Note: For best results, and greater availability, you'll probably want to
* create a wrapper class which creates a static client and proxies to it.
*
Expand All @@ -36,10 +44,16 @@
import java.nio.channels.DatagramChannel;
import java.util.Locale;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.log4j.Logger;

public class StatsdClient {
public class StatsdClient extends TimerTask {
private ByteBuffer sendBuffer;
private Timer flushTimer;
private boolean multi_metrics = false;

private static final Random RNG = new Random();
private static final Logger log = Logger.getLogger(StatsdClient.class.getName());

Expand All @@ -53,8 +67,49 @@ public StatsdClient(String host, int port) throws UnknownHostException, IOExcept
public StatsdClient(InetAddress host, int port) throws IOException {
_address = new InetSocketAddress(host, port);
_channel = DatagramChannel.open();
setBufferSize((short) 1500);
}

protected void finalize() {
flush();
}

public synchronized void setBufferSize(short packetBufferSize) {
if(sendBuffer != null) {
flush();
}
sendBuffer = ByteBuffer.allocate(packetBufferSize);
}

public synchronized void enableMultiMetrics(boolean enable) {
multi_metrics = enable;
}

public synchronized boolean startFlushTimer(long period) {
if(flushTimer == null) {
// period is in msecs
if(period <= 0) { period = 2000; }
flushTimer = new Timer();

// We pass this object in as the TimerTask (which calls run())
flushTimer.schedule((TimerTask)this, period, period);
return true;
}
return false;
}

public synchronized void stopFlushTimer() {
if(flushTimer != null) {
flushTimer.cancel();
flushTimer = null;
}
}

public void run() { // used by Timer, we're a Runnable TimerTask
flush();
}


public boolean timing(String key, int value) {
return timing(key, value, 1.0);
}
Expand Down Expand Up @@ -143,24 +198,60 @@ private boolean send(double sampleRate, String... stats) {
return retval;
}

private boolean doSend(final String stat) {
private synchronized boolean doSend(String stat) {
try {
final byte[] data = stat.getBytes("utf-8");

// If we're going to go past the threshold of the buffer then flush.
// the +1 is for the potential '\n' in multi_metrics below
if(sendBuffer.remaining() < (data.length + 1)) {
flush();
}

if(sendBuffer.position() > 0) { // multiple metrics are separated by '\n'
sendBuffer.put( (byte) '\n');
}

sendBuffer.put(data); // append the data

if(! multi_metrics) {
flush();
}

return true;

} catch (IOException e) {
log.error(
String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
_address.getPort()), e);
return false;
}
}

public synchronized boolean flush() {
try {
final byte[] data = stat.getBytes("utf-8");
final ByteBuffer buff = ByteBuffer.wrap(data);
final int nbSentBytes = _channel.send(buff, _address);
final int sizeOfBuffer = sendBuffer.position();

if(sizeOfBuffer <= 0) { return false; } // empty buffer

// send and reset the buffer
sendBuffer.flip();
final int nbSentBytes = _channel.send(sendBuffer, _address);
sendBuffer.limit(sendBuffer.capacity());
sendBuffer.rewind();

if (data.length == nbSentBytes) {
if (sizeOfBuffer == nbSentBytes) {
return true;
} else {
log.error(String.format(
"Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", stat,
_address.getHostName(), _address.getPort(), nbSentBytes, data.length));
"Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", sendBuffer.toString(),
_address.getHostName(), _address.getPort(), nbSentBytes, sizeOfBuffer));
return false;
}

} catch (IOException e) {
log.error(
String.format("Could not send stat %s to host %s:%d", stat, _address.getHostName(),
String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
_address.getPort()), e);
return false;
}
Expand Down

0 comments on commit d03221f

Please sign in to comment.