Permalink
Browse files

S4-86 Fix metrics config and enable PE timing

- fix matcher for metrics configuration pattern
- enable optional metric for PE processing timing
  • Loading branch information...
1 parent bfe0498 commit d6b89c3074082c5cbe1a7bec7169d5c63ebfcd5c @matthieumorel matthieumorel committed Mar 4, 2013
@@ -71,17 +71,21 @@
private ReceiverImpl receiver;
@Inject
- RemoteSenders remoteSenders;
+ private RemoteSenders remoteSenders;
@Inject
- Hasher hasher;
+ private Hasher hasher;
@Inject
- RemoteStreams remoteStreams;
+ private RemoteStreams remoteStreams;
@Inject
@Named("s4.cluster.name")
- String clusterName;
+ private String clusterName;
+
+ @Inject(optional = true)
+ @Named("s4.metrics.peProcessingTime")
+ boolean measurePEProcessingTime = true;
// default is NoOpCheckpointingFramework
@Inject
@@ -54,6 +54,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
/**
* <p>
@@ -171,7 +172,6 @@ public ProcessingElement load(String key) throws Exception {
*/
this.pePrototype = this;
- processingTimer = Metrics.newTimer(getClass(), getClass().getName() + "-pe-processing-time");
}
/**
@@ -183,6 +183,10 @@ public ProcessingElement load(String key) throws Exception {
public ProcessingElement(App app) {
this();
setApp(app);
+ if (app.measurePEProcessingTime) {
+ processingTimer = Metrics.newTimer(getClass(), getClass().getName() + "-pe-processing-time");
+ }
+
}
/**
@@ -436,8 +440,11 @@ public boolean isThreadSafe() {
}
protected void handleInputEvent(Event event) {
-
- // TimerContext timerContext = processingTimer.time();
+ TimerContext timerContext = null;
+ if (processingTimer != null) {
+ // if timing enabled
+ timerContext = processingTimer.time();
+ }
Object object;
if (isThreadSafe) {
object = new Object(); // a dummy object TODO improve this.
@@ -466,7 +473,10 @@ protected void handleInputEvent(Event event) {
checkpoint();
}
}
- // timerContext.stop();
+ if (timerContext != null) {
+ // if timing enabled
+ timerContext.stop();
+ }
}
protected boolean isCheckpointable() {
@@ -83,7 +83,6 @@ private void init() {
"Invalid metrics configuration [{}]. Metrics configuration must match the pattern [{}]. Metrics reporting disabled.",
metricsConfig, METRICS_CONFIG_PATTERN);
} else {
- matcher.find();
String group1 = matcher.group(1);
if ("csv".equals(group1)) {

0 comments on commit d6b89c3

Please sign in to comment.