Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Java client adding support for multi_metrics + timer flush

  • Loading branch information...
commit 00b1d10b34225194c51b469762aaa3ca9295669b 1 parent fb2ed48
@charlesdaniel charlesdaniel authored
Showing with 100 additions and 9 deletions.
  1. +100 −9 examples/StatsdClient.java
View
109 examples/StatsdClient.java
@@ -22,6 +22,14 @@
* // multiple keys with a sample rate
* client.increment(10, .1, "foo.bar.baz", "foo.bar.boo", "foo.baz.bar");
*
+ * // To enable multi metrics (aka more than 1 metric in a UDP packet) (disabled by default)
+ * client.enableMultiMetrics(true); //disable by passing in false
+ * // To fine-tune udp packet buffer size (default=1500)
+ * client.setBufferSize((short) 1500);
+ * // To force flush the buffer out (good idea to add to your shutdown path)
+ * client.flush();
+ *
+ *
* Note: For best results, and greater availability, you'll probably want to
* create a wrapper class which creates a static client and proxies to it.
*
@@ -36,10 +44,16 @@
import java.nio.channels.DatagramChannel;
import java.util.Locale;
import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.log4j.Logger;
-public class StatsdClient {
+public class StatsdClient extends TimerTask {
+ private ByteBuffer sendBuffer;
+ private Timer flushTimer;
+ private boolean multi_metrics = false;
+
private static final Random RNG = new Random();
private static final Logger log = Logger.getLogger(StatsdClient.class.getName());
@@ -53,8 +67,49 @@ public StatsdClient(String host, int port) throws UnknownHostException, IOExcept
public StatsdClient(InetAddress host, int port) throws IOException {
_address = new InetSocketAddress(host, port);
_channel = DatagramChannel.open();
+ setBufferSize((short) 1500);
}
+ protected void finalize() {
+ flush();
+ }
+
+ public synchronized void setBufferSize(short packetBufferSize) {
+ if(sendBuffer != null) {
+ flush();
+ }
+ sendBuffer = ByteBuffer.allocate(packetBufferSize);
+ }
+
+ public synchronized void enableMultiMetrics(boolean enable) {
+ multi_metrics = enable;
+ }
+
+ public synchronized boolean startFlushTimer(long period) {
+ if(flushTimer == null) {
+ // period is in msecs
+ if(period <= 0) { period = 2000; }
+ flushTimer = new Timer();
+
+ // We pass this object in as the TimerTask (which calls run())
+ flushTimer.schedule((TimerTask)this, period, period);
+ return true;
+ }
+ return false;
+ }
+
+ public synchronized void stopFlushTimer() {
+ if(flushTimer != null) {
+ flushTimer.cancel();
+ flushTimer = null;
+ }
+ }
+
+ public void run() { // used by Timer, we're a Runnable TimerTask
+ flush();
+ }
+
+
public boolean timing(String key, int value) {
return timing(key, value, 1.0);
}
@@ -143,24 +198,60 @@ private boolean send(double sampleRate, String... stats) {
return retval;
}
- private boolean doSend(final String stat) {
+ private synchronized boolean doSend(String stat) {
+ try {
+ final byte[] data = stat.getBytes("utf-8");
+
+ // If we're going to go past the threshold of the buffer then flush.
+ // the +1 is for the potential '\n' in multi_metrics below
+ if(sendBuffer.remaining() < (data.length + 1)) {
+ flush();
+ }
+
+ if(sendBuffer.position() > 0) { // multiple metrics are separated by '\n'
+ sendBuffer.put( (byte) '\n');
+ }
+
+ sendBuffer.put(data); // append the data
+
+ if(! multi_metrics) {
+ flush();
+ }
+
+ return true;
+
+ } catch (IOException e) {
+ log.error(
+ String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
+ _address.getPort()), e);
+ return false;
+ }
+ }
+
+ public synchronized boolean flush() {
try {
- final byte[] data = stat.getBytes("utf-8");
- final ByteBuffer buff = ByteBuffer.wrap(data);
- final int nbSentBytes = _channel.send(buff, _address);
+ final int sizeOfBuffer = sendBuffer.position();
+
+ if(sizeOfBuffer <= 0) { return false; } // empty buffer
+
+ // send and reset the buffer
+ sendBuffer.flip();
+ final int nbSentBytes = _channel.send(sendBuffer, _address);
+ sendBuffer.limit(sendBuffer.capacity());
+ sendBuffer.rewind();
- if (data.length == nbSentBytes) {
+ if (sizeOfBuffer == nbSentBytes) {
return true;
} else {
log.error(String.format(
- "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", stat,
- _address.getHostName(), _address.getPort(), nbSentBytes, data.length));
+ "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", sendBuffer.toString(),
+ _address.getHostName(), _address.getPort(), nbSentBytes, sizeOfBuffer));
return false;
}
} catch (IOException e) {
log.error(
- String.format("Could not send stat %s to host %s:%d", stat, _address.getHostName(),
+ String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
_address.getPort()), e);
return false;
}
Please sign in to comment.
Something went wrong with that request. Please try again.