Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge remote-tracking branch 'origin/master'

  • Loading branch information...
commit 52bbcb7d5edfae4e2b1b9f04bd418120c182d6bc 2 parents f6808e8 + 47dca96
yangming authored
View
5 java/src/main/java/com/appfirst/statsd/AFClient.java
@@ -16,7 +16,7 @@
* @author Yangming Huang
*
*/
-public class AFClient extends AbstractStatsdClient implements StatsdClient {
+public class AFClient extends AbstractStatsdClient {
static Logger log = Logger.getLogger(AFClient.class);
private static String AFCAPIName = "/afcollectorapi";
@@ -32,6 +32,7 @@
* Default Constructor. Initialize AFClient.
*/
public AFClient() {
+ this.setStrategy(new GeyserStrategy(20));
}
private UDPClient _udpClient = null;
@@ -65,7 +66,7 @@ protected final boolean doSend(final String stat) {
// trim msg if over allowed size
String msg = (stat.length() > AFCMaxMsgSize) ? stat.substring(0, AFCMaxMsgSize) : stat;
- // log.info(String.format("Sending stat: %s", stat));
+ log.info(String.format("Sending stat: %s", stat));
try {
int mqd = openQueue();
int rc = this.mqlib.mq_send(mqd, msg, msg.length(), AFCSeverityStatsd);
View
89 java/src/main/java/com/appfirst/statsd/AbstractStatsdClient.java
@@ -1,6 +1,5 @@
package com.appfirst.statsd;
-import java.util.Date;
-import java.util.Random;
+import java.util.Map;
/**
* The Skeleton class of Java Statsd Client with AppFirst Extension.
@@ -11,12 +10,28 @@
* You know... the "Java way."
* <br/>
* Based on Statsd Client of (C) 2011 Meetup, Inc.
- * Author: Andrew Gwozdziewycz <andrew@meetup.com>, @apgwoz
+ * by Andrew Gwozdziewycz <andrew@meetup.com>, @apgwoz
*
* @author Yangming Huang @leonmax
*/
-public abstract class AbstractStatsdClient implements StatsdClient {
- private static Random RNG = new Random();
+public abstract class AbstractStatsdClient implements StatsdClient, Runnable {
+ private Strategy strategy = new InstantStrategy();
+
+ public void setStrategy(Strategy strategy){
+ this.strategy = strategy;
+ this.strategy.setTask(this);
+ }
+
+ private BucketBuffer buffer = new BucketBuffer();
+
+ public void run(){
+ if (!this.buffer.isEmpty()){
+ Map<String, Bucket> dumpcellar = this.buffer.dump();
+ for (Bucket bucket : dumpcellar.values()){
+ this.doSend(bucket.toString());
+ }
+ }
+ }
/* (non-Javadoc)
* @see com.appfirst.statsd.IStatsdClient#gauge(java.lang.String, int)
@@ -28,9 +43,11 @@ public boolean gauge(String bucket, int value) {
/* (non-Javadoc)
* @see com.appfirst.statsd.IStatsdClient#gauge(java.lang.String, int, java.lang.String)
*/
- public boolean gauge(String bucket, int value, String message) {
- String stat = buildMessage(bucket, value, "g", new Date().getTime(), message);
- return send(stat, 1);
+ public synchronized boolean gauge(String bucketname, int value, String message){
+ GaugeBucket bucket = this.buffer.getBucket(bucketname, GaugeBucket.class);
+ bucket.infuse(value, message);
+ strategy.process();
+ return true;
}
/* (non-Javadoc)
@@ -43,9 +60,11 @@ public boolean timing(String bucket, int value) {
/* (non-Javadoc)
* @see com.appfirst.statsd.IStatsdClient#timing(java.lang.String, int, java.lang.String)
*/
- public boolean timing(String bucket, int value, String message) {
- String stat = buildMessage(bucket, value, "ms", 1, message);
- return send(stat, 1);
+ public synchronized boolean timing(String bucketname, int value, String message){
+ TimerBucket bucket = this.buffer.getBucket(bucketname, TimerBucket.class);
+ bucket.infuse(value, message);
+ strategy.process();
+ return true;
}
/* (non-Javadoc)
@@ -82,50 +101,24 @@ public boolean updateStats(int value, double sampleRate, String... buckets){
public boolean updateStats(int value, String message, double sampleRate, String... buckets){
boolean result = true;
for (int i = 0; i < buckets.length; i++) {
- String stat = buildMessage(buckets[i], value, "c", sampleRate, message);
- result = result && send(stat, sampleRate);
+ result = result && this.updateStats(buckets[i], value, sampleRate, message);
}
return result;
}
- private String buildMessage(String bucket, int value, String type, double sampleRate, String message){
- String field2 = "";
- if (sampleRate < 1) {
- field2 = String.format("@%f", sampleRate);
- }
- return buildMessage(bucket, value, type, field2, message);
- }
-
- private String buildMessage(String bucket, int value, String type, long timestamp, String message){
- String field2 = String.valueOf(timestamp);
- return buildMessage(bucket, value, type, field2, message);
- }
-
- private String buildMessage(String bucket, int value, String type, String field2, String message){
- // bucket: field0 | field1 | field2 | field3
- // bucket: value | type | sampele_rate/timestamp | message
- String stat = String.format("%s:%d|%s", bucket, value, type);
- // when message is there, we always keep field2 even if it's blank:
- // bucket:2|c||some_message
- if (message != null && !message.equals("")){
- stat += String.format("|%s|%s", field2, message);
- } else if (!field2.equals("")){
- stat += String.format("|%s", field2);
- }
-
- return stat;
+ /* (non-Javadoc)
+ * @see com.appfirst.statsd.IStatsdClient#updateStats(java.lang.String, int, double, java.lang.String)
+ */
+ public synchronized boolean updateStats(String bucketname, int value, double sampleRate, String message){
+ CounterBucket bucket = this.buffer.getBucket(bucketname, CounterBucket.class);
+ bucket.infuse(value, sampleRate, message);
+ strategy.process();
+ return true;
}
- private boolean send(String stat, double sampleRate) {
- if (sampleRate < 1.0 && RNG.nextDouble() > sampleRate)
- return false;
- else {
- return this.doSend(stat);
- }
- }
-
/**
- * To write a customized client, all you need is to implement this method which sends the stats message to StatsD Server thru your own media.
+ * To write a customized client, all you need is to implement this method which sends the stats
+ * message to StatsD Server thru your own media.
*
* @param stat - the formatted message ready to send to the StatsD Server.
* @return True if success, False otherwise.
View
6 java/src/main/java/com/appfirst/statsd/Bucket.java
@@ -0,0 +1,6 @@
+package com.appfirst.statsd;
+
+public interface Bucket {
+ public abstract String getName();
+ public void setName(String name);
+}
View
39 java/src/main/java/com/appfirst/statsd/BucketBuffer.java
@@ -0,0 +1,39 @@
+package com.appfirst.statsd;
+
+import java.util.Hashtable;
+import java.util.Map;
+
+public class BucketBuffer {
+ private Map<String, Bucket> cellar = new Hashtable<String, Bucket>();
+
+ boolean isEmpty(){
+ return this.cellar.isEmpty();
+ }
+
+ <T extends Bucket> T getBucket(String bucketname, Class<T> clazz){
+ T bucket = null;
+ if (cellar.containsKey(bucketname)){
+ Bucket rawbucket = cellar.get(bucketname);
+ if (clazz.isInstance(rawbucket)){
+ bucket = (T) clazz.cast(rawbucket);
+ } else {
+ return null;
+ }
+ } else {
+ try {
+ bucket = clazz.newInstance();
+ bucket.setName(bucketname);
+ } catch (InstantiationException e) {
+ } catch (IllegalAccessException e) {
+ }
+ cellar.put(bucketname, bucket);
+ }
+ return bucket;
+ }
+
+ synchronized Map<String, Bucket> dump(){
+ Map<String, Bucket> dumpcellar = cellar;
+ cellar = new Hashtable<String, Bucket>();
+ return dumpcellar;
+ }
+}
View
45 java/src/main/java/com/appfirst/statsd/CounterBucket.java
@@ -0,0 +1,45 @@
+package com.appfirst.statsd;
+
+import java.util.Random;
+
+public class CounterBucket implements Bucket{
+ private String name;
+ private int value = 0;
+ private String message = null;
+ private static Random RNG = new Random();
+
+ @Override
+ public void setName(String name){
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString(){
+ String stat = String.format("%s:%d|c", name, this.value);
+ if (message != null && !message.equals("")){
+ stat += String.format("||%s", message);
+ }
+ return stat;
+ }
+
+ public CounterBucket infuse(int value, double sampleRate, String message){
+ if (sampleRate < 1.0 && RNG.nextDouble() > sampleRate)
+ return this;
+ this.value += value/sampleRate;
+ if (message != null && !message.equals("")){
+ if (this.message != null){
+ this.message += "|" + message;
+ } else {
+ this.message = message;
+ }
+ }
+ return this;
+ }
+
+// public void merge
+}
View
47 java/src/main/java/com/appfirst/statsd/GaugeBucket.java
@@ -0,0 +1,47 @@
+package com.appfirst.statsd;
+
+import java.util.Date;
+
+public class GaugeBucket implements Bucket {
+ private String name;
+ private int sumstat = 0;
+ private int count = 0;
+ private long timestamp;
+ private String message = null;
+
+ @Override
+ public void setName(String name){
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString(){
+ String stat = null;
+ int avg = this.sumstat/this.count;
+ if (message != null && !message.equals("")){
+ stat = String.format("%s:%d|g|%s|%s", name, avg, timestamp, message);
+ } else {
+ stat = String.format("%s:%d|g|%s", name, avg, timestamp);
+ }
+ return stat;
+ }
+
+ public GaugeBucket infuse(int value, String message){
+ this.sumstat += value;
+ this.count++;
+ if (message != null && !message.equals("")){
+ if (this.message != null){
+ this.message += "|" + message;
+ } else {
+ this.message = message;
+ }
+ }
+ this.timestamp = new Date().getTime();
+ return this;
+ }
+}
View
53 java/src/main/java/com/appfirst/statsd/GeyserStrategy.java
@@ -0,0 +1,53 @@
+package com.appfirst.statsd;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class GeyserStrategy implements Strategy, Runnable {
+ private int interval = 20;
+ private TimeUnit unit = TimeUnit.SECONDS;
+ private Runnable task;
+ private ScheduledFuture<?> f;
+ private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+ public GeyserStrategy(int interval){
+ this.setInterval(interval);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(this));
+ }
+
+ public void run(){
+ System.out.println("final execution");
+// executor.execute(task);
+ task.run();
+ if (!executor.isShutdown()){
+ executor.shutdown();
+ // if (!executor.awaitTermination(SHUTDOWN_TIME)) { //optional *
+ // Logger.log("Executor did not terminate in the specified time."); //optional *
+ // List<Runnable> droppedTasks = executor.shutdownNow(); //optional **
+ // Logger.log("Executor was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed."); //optional **
+ // }
+ }
+ }
+
+ public void setInterval(int interval){
+ this.interval = interval;
+ }
+
+ public void setTimeUnit(TimeUnit unit){
+ this.unit = unit;
+ }
+
+ public void setTask(Runnable task){
+ this.task = task;
+ }
+
+ public void process(){
+ if (f == null || f.isDone()){
+ System.out.println("scheduling execution");
+ f = executor.schedule(task, interval, unit);
+ }
+ }
+}
View
16 java/src/main/java/com/appfirst/statsd/InstantStrategy.java
@@ -0,0 +1,16 @@
+package com.appfirst.statsd;
+
+public class InstantStrategy implements Strategy {
+ private Runnable task;
+
+ @Override
+ public void setTask(Runnable task){
+ this.task = task;
+ }
+
+ @Override
+ public void process(){
+ this.task.run();
+ }
+
+}
View
12 java/src/main/java/com/appfirst/statsd/StatsdClient.java
@@ -92,4 +92,16 @@
public abstract boolean updateStats(int value, String message,
double sampleRate, String... buckets);
+ /**
+ * Send counter metrics with arbitrary magnitude. This one supports sampling and sending message (AppFirst Extension)
+ *
+ * @param bucket - The bucket name of the counter.
+ * @param value - the updating value of the counter.
+ * @param sampleRate - Rate of sampling. Note this is a counter only feature.
+ * @param message - the message of AppFirst Extended Statsd.
+ * @return True if success, False if fail to send stats.
+ */
+ public abstract boolean updateStats(String bucket, int value,
+ double sampleRate, String message);
+
}
View
7 java/src/main/java/com/appfirst/statsd/Strategy.java
@@ -0,0 +1,7 @@
+package com.appfirst.statsd;
+
+public interface Strategy {
+ public abstract void setTask(Runnable task);
+
+ public abstract void process();
+}
View
43 java/src/main/java/com/appfirst/statsd/TimerBucket.java
@@ -0,0 +1,43 @@
+package com.appfirst.statsd;
+
+public class TimerBucket implements Bucket{
+ private String name;
+ private int sumstat = 0;
+ private int count = 0;
+ private String message = null;
+
+ @Override
+ public void setName(String name){
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString(){
+ String stat = null;
+ int avg = this.sumstat/this.count;
+ if (message != null && !message.equals("")){
+ stat = String.format("%s:%d|ms||%s", name, avg, message);
+ } else {
+ stat = String.format("%s:%d|ms", name, avg);
+ }
+ return stat;
+ }
+
+ public TimerBucket infuse(int value, String message){
+ this.sumstat += value;
+ this.count++;
+ if (message != null && !message.equals("")){
+ if (this.message != null){
+ this.message += "|" + message;
+ } else {
+ this.message = message;
+ }
+ }
+ return this;
+ }
+}
View
1  java/src/main/java/com/appfirst/statsd/UDPClient.java
@@ -60,6 +60,7 @@ public UDPClient(InetAddress host, int port) throws IOException {
* @see com.appfirst.statsd.AbstractStatsdClient#doSend(java.lang.String)
*/
protected final boolean doSend(final String stat) {
+ log.info(String.format("Sending stat: %s", stat));
try {
final byte[] data = stat.getBytes("utf-8");
final ByteBuffer buff = ByteBuffer.wrap(data);
View
72 java/src/test/java/com/appfirst/statsd/test/TestBuckets.java
@@ -0,0 +1,72 @@
+package com.appfirst.statsd.test;
+
+import static org.junit.Assert.*;
+
+import java.util.Date;
+
+import org.junit.Test;
+
+import com.appfirst.statsd.CounterBucket;
+import com.appfirst.statsd.GaugeBucket;
+import com.appfirst.statsd.TimerBucket;
+
+public class TestBuckets {
+
+ @Test
+ public final void testCounter() {
+ CounterBucket bucket = new CounterBucket();
+ bucket.setName("counter");
+ String actual = bucket.infuse(1, 1, "message1")
+ .infuse(2, 1, "message2")
+ .infuse(4, 1, null)
+ .infuse(8, 0, null)
+ .toString();
+ String expected = "counter:7|c||message1|message2";
+ assertEquals("Aggregated stat", expected, actual);
+ }
+
+ @Test
+ public final void testTimer() {
+ TimerBucket bucket = new TimerBucket();
+ bucket.setName("timer");
+ String actual = bucket.infuse(1, "message1")
+ .infuse(3, "message2")
+ .infuse(5, null)
+ .toString();
+ String expected = "timer:3|ms||message1|message2";
+ assertEquals("Aggregated stat", expected, actual);
+ }
+
+ @Test
+ public final void testGauge() {
+ String actual = "";
+ long beforelast = 0L;
+ try {
+ GaugeBucket bucket = new GaugeBucket();
+ bucket.setName("gauge");
+ bucket.infuse(1, "message1")
+ .infuse(3, "message2");
+
+ beforelast = new Date().getTime();
+ Thread.sleep(1);
+
+ actual = bucket
+ .infuse(5, null)
+ .toString();
+
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ }
+
+ String expectedhead = "gauge:3|g|";
+ String expectedtail = "|message1|message2";
+ assertTrue("Aggregated stat", actual.startsWith(expectedhead));
+ assertTrue("Aggregated stat", actual.endsWith(expectedtail));
+ long timestamp = Long.valueOf(actual.substring(
+ actual.indexOf(expectedhead) + expectedhead.length(),
+ actual.indexOf(expectedtail)));
+ long now = new Date().getTime();
+ assertTrue("Some time before now", timestamp < now);
+ assertTrue("Some time after start", timestamp > beforelast);
+ }
+}
View
106 java/src/test/java/com/appfirst/statsd/test/TestStatsdClient.java
@@ -0,0 +1,106 @@
+package com.appfirst.statsd.test;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Random;
+
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Test;
+
+import com.appfirst.statsd.GeyserStrategy;
+import com.appfirst.statsd.UDPClient;
+
+public class TestStatsdClient {
+
+ @Test
+ public final void testSetStrategy() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testDump() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testRun() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testGaugeStringInt() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testGaugeStringIntString() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testTimingStringInt() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testTimingStringIntString() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testDecrement() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testIncrement() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testUpdateStatsIntStringArray() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testUpdateStatsIntDoubleStringArray() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testUpdateStatsIntStringDoubleStringArray() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testUpdateStatsStringIntDoubleString() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testDoSend() {
+ fail("Not yet implemented"); // TODO
+ }
+
+ @Test
+ public final void testUnderPressure() throws UnknownHostException, IOException, InterruptedException {
+ BasicConfigurator.configure();
+// StatsdClient client = new AFClient();
+ UDPClient client = new UDPClient();
+ client.setStrategy(new GeyserStrategy(2));
+ Random r = new Random();
+ for (int i=0; i<1000; i++){
+ Thread.sleep(r.nextInt(5));
+ long start = System.currentTimeMillis();
+ client.increment("multiple1");
+ int elapsedTimeMillis = (int)(System.currentTimeMillis()-start);
+ client.timing("incr_time", elapsedTimeMillis);
+ if (i%3==0){
+ client.increment("multiple3");
+ }
+ }
+ }
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.