From 4c48d022a7aeff3785bd5260aa4b795c69fa8188 Mon Sep 17 00:00:00 2001 From: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com> Date: Thu, 5 Dec 2019 16:42:31 -0800 Subject: [PATCH] Simply and enhance the RebalanceLatencyGauge so it can be used in multi-threads. (#636) The previous design of RebalanceLatencyGauge won't support asynchronous metric data emitting. This PR adds support by using a ThreadLocal object. The metric logic is not changed. --- .../implementation/RebalanceLatencyGauge.java | 25 +++++++++---------- .../metrics/model/LatencyMetric.java | 2 -- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java index 365f0dc1ea..b0c563b11b 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java @@ -19,9 +19,10 @@ * under the License. */ +import java.util.concurrent.TimeUnit; + import com.codahale.metrics.Histogram; import com.codahale.metrics.SlidingTimeWindowArrayReservoir; -import java.util.concurrent.TimeUnit; import org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +31,8 @@ public class RebalanceLatencyGauge extends LatencyMetric { private static final Logger LOG = LoggerFactory.getLogger(RebalanceLatencyGauge.class); private static final long VALUE_NOT_SET = -1; private long _lastEmittedMetricValue = VALUE_NOT_SET; + // Use threadlocal here so the start time can be updated and recorded in multi-threads. + private final ThreadLocal _startTime; /** * Instantiates a new Histogram dynamic metric. @@ -39,10 +42,10 @@ public RebalanceLatencyGauge(String metricName, long slidingTimeWindow) { super(metricName, new Histogram( new SlidingTimeWindowArrayReservoir(slidingTimeWindow, TimeUnit.MILLISECONDS))); _metricName = metricName; + _startTime = ThreadLocal.withInitial(() -> VALUE_NOT_SET); } /** - * WARNING: this method is not thread-safe. * Calling this method multiple times would simply overwrite the previous state. This is because * the rebalancer could fail at any point, and we want it to recover gracefully by resetting the * internal state of this metric. @@ -50,24 +53,21 @@ public RebalanceLatencyGauge(String metricName, long slidingTimeWindow) { @Override public void startMeasuringLatency() { reset(); - _startTime = System.currentTimeMillis(); + _startTime.set(System.currentTimeMillis()); } - /** - * WARNING: this method is not thread-safe. - */ @Override public void endMeasuringLatency() { - if (_startTime == VALUE_NOT_SET || _endTime != VALUE_NOT_SET) { + if (_startTime.get() == VALUE_NOT_SET) { LOG.error( "Needs to call startMeasuringLatency first! Ignoring and resetting the metric. Metric name: {}", _metricName); - reset(); return; } - _endTime = System.currentTimeMillis(); - _lastEmittedMetricValue = _endTime - _startTime; - updateValue(_lastEmittedMetricValue); + synchronized (this) { + _lastEmittedMetricValue = System.currentTimeMillis() - _startTime.get(); + updateValue(_lastEmittedMetricValue); + } reset(); } @@ -84,7 +84,6 @@ public Long getLastEmittedMetricValue() { * Resets the internal state of this metric. */ private void reset() { - _startTime = VALUE_NOT_SET; - _endTime = VALUE_NOT_SET; + _startTime.set(VALUE_NOT_SET); } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java index de63d0fba9..733635ef4e 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java @@ -28,8 +28,6 @@ * how long a particular stage in the logic took in milliseconds. */ public abstract class LatencyMetric extends HistogramDynamicMetric implements Metric { - protected long _startTime; - protected long _endTime; protected String _metricName; /**